Source code for satpy.readers.olci_nc

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Sentinel-3 OLCI reader
"""

import logging
from datetime import datetime

import dask.array as da
import numpy as np
import xarray as xr

from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import angle2xyz, xyz2angle
from satpy import CHUNK_SIZE
from functools import reduce

logger = logging.getLogger(__name__)

PLATFORM_NAMES = {'S3A': 'Sentinel-3A',
                  'S3B': 'Sentinel-3B'}


[docs]class BitFlags(object): """Manipulate flags stored bitwise. """ flag_list = ['INVALID', 'WATER', 'LAND', 'CLOUD', 'SNOW_ICE', 'INLAND_WATER', 'TIDAL', 'COSMETIC', 'SUSPECT', 'HISOLZEN', 'SATURATED', 'MEGLINT', 'HIGHGLINT', 'WHITECAPS', 'ADJAC', 'WV_FAIL', 'PAR_FAIL', 'AC_FAIL', 'OC4ME_FAIL', 'OCNN_FAIL', 'Extra_1', 'KDM_FAIL', 'Extra_2', 'CLOUD_AMBIGUOUS', 'CLOUD_MARGIN', 'BPAC_ON', 'WHITE_SCATT', 'LOWRW', 'HIGHRW'] meaning = {f: i for i, f in enumerate(flag_list)} def __init__(self, value): self._value = value def __getitem__(self, item): pos = self.meaning[item] return ((self._value >> pos) % 2).astype(np.bool)
[docs]class NCOLCIBase(BaseFileHandler): def __init__(self, filename, filename_info, filetype_info): super(NCOLCIBase, self).__init__(filename, filename_info, filetype_info) self.nc = xr.open_dataset(self.filename, decode_cf=True, mask_and_scale=True, engine='h5netcdf', chunks={'columns': CHUNK_SIZE, 'rows': CHUNK_SIZE}) self.nc = self.nc.rename({'columns': 'x', 'rows': 'y'}) # TODO: get metadata from the manifest file (xfdumanifest.xml) self.platform_name = PLATFORM_NAMES[filename_info['mission_id']] self.sensor = 'olci' @property def start_time(self): return datetime.strptime(self.nc.attrs['start_time'], '%Y-%m-%dT%H:%M:%S.%fZ') @property def end_time(self): return datetime.strptime(self.nc.attrs['stop_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
[docs] def get_dataset(self, key, info): """Load a dataset.""" logger.debug('Reading %s.', key.name) variable = self.nc[key.name] return variable
[docs]class NCOLCICal(NCOLCIBase): pass
[docs]class NCOLCIGeo(NCOLCIBase): pass
[docs]class NCOLCIChannelBase(NCOLCIBase): def __init__(self, filename, filename_info, filetype_info): super(NCOLCIChannelBase, self).__init__(filename, filename_info, filetype_info) self.channel = filename_info.get('dataset_name')
[docs]class NCOLCI1B(NCOLCIChannelBase): def __init__(self, filename, filename_info, filetype_info, cal): super(NCOLCI1B, self).__init__(filename, filename_info, filetype_info) self.cal = cal.nc def _get_solar_flux_old(self, band): # TODO: this could be replaced with vectorized indexing in the future. from dask.base import tokenize blocksize = CHUNK_SIZE solar_flux = self.cal['solar_flux'].isel(bands=band).values d_index = self.cal['detector_index'].fillna(0).astype(int) shape = d_index.shape vchunks = range(0, shape[0], blocksize) hchunks = range(0, shape[1], blocksize) token = tokenize(band, d_index, solar_flux) name = 'solar_flux_' + token def get_items(array, slices): return solar_flux[d_index[slices].values] dsk = {(name, i, j): (get_items, d_index, (slice(vcs, min(vcs + blocksize, shape[0])), slice(hcs, min(hcs + blocksize, shape[1])))) for i, vcs in enumerate(vchunks) for j, hcs in enumerate(hchunks) } res = da.Array(dsk, name, shape=shape, chunks=(blocksize, blocksize), dtype=solar_flux.dtype) return res @staticmethod def _get_items(idx, solar_flux): return solar_flux[idx] def _get_solar_flux(self, band): """Get the solar flux for the band.""" solar_flux = self.cal['solar_flux'].isel(bands=band).values d_index = self.cal['detector_index'].fillna(0).astype(int) return da.map_blocks(self._get_items, d_index.data, solar_flux=solar_flux, dtype=solar_flux.dtype)
[docs] def get_dataset(self, key, info): """Load a dataset.""" if self.channel != key.name: return logger.debug('Reading %s.', key.name) radiances = self.nc[self.channel + '_radiance'] if key.calibration == 'reflectance': idx = int(key.name[2:]) - 1 sflux = self._get_solar_flux(idx) radiances = radiances / sflux * np.pi * 100 radiances.attrs['units'] = '%' radiances.attrs['platform_name'] = self.platform_name radiances.attrs['sensor'] = self.sensor radiances.attrs.update(key.to_dict()) return radiances
[docs]class NCOLCI2(NCOLCIChannelBase):
[docs] def get_dataset(self, key, info): """Load a dataset """ if self.channel is not None and self.channel != key.name: return logger.debug('Reading %s.', key.name) if self.channel is not None and self.channel.startswith('Oa'): dataset = self.nc[self.channel + '_reflectance'] else: dataset = self.nc[info['nc_key']] if key.name == 'wqsf': dataset.attrs['_FillValue'] = 1 elif key.name == 'mask': mask = self.getbitmask(dataset.to_masked_array().data) dataset = dataset * np.nan dataset = dataset.where(~ mask, True) dataset.attrs['platform_name'] = self.platform_name dataset.attrs['sensor'] = self.sensor dataset.attrs.update(key.to_dict()) return dataset
[docs] def getbitmask(self, wqsf, items=[]): """ """ items = ["INVALID", "SNOW_ICE", "INLAND_WATER", "SUSPECT", "AC_FAIL", "CLOUD", "HISOLZEN", "OCNN_FAIL", "CLOUD_MARGIN", "CLOUD_AMBIGUOUS", "LOWRW", "LAND"] bflags = BitFlags(wqsf) return reduce(np.logical_or, [bflags[item] for item in items])
[docs]class NCOLCIAngles(BaseFileHandler): datasets = {'satellite_azimuth_angle': 'OAA', 'satellite_zenith_angle': 'OZA', 'solar_azimuth_angle': 'SAA', 'solar_zenith_angle': 'SZA'} def __init__(self, filename, filename_info, filetype_info): super(NCOLCIAngles, self).__init__(filename, filename_info, filetype_info) self.nc = None # TODO: get metadata from the manifest file (xfdumanifest.xml) self.platform_name = PLATFORM_NAMES[filename_info['mission_id']] self.sensor = 'olci' self.cache = {} self._start_time = filename_info['start_time'] self._end_time = filename_info['end_time']
[docs] def get_dataset(self, key, info): """Load a dataset.""" if key.name not in self.datasets: return if self.nc is None: self.nc = xr.open_dataset(self.filename, decode_cf=True, mask_and_scale=True, engine='h5netcdf', chunks={'tie_columns': CHUNK_SIZE, 'tie_rows': CHUNK_SIZE}) self.nc = self.nc.rename({'tie_columns': 'x', 'tie_rows': 'y'}) logger.debug('Reading %s.', key.name) l_step = self.nc.attrs['al_subsampling_factor'] c_step = self.nc.attrs['ac_subsampling_factor'] if (c_step != 1 or l_step != 1) and self.cache.get(key.name) is None: if key.name.startswith('satellite'): zen = self.nc[self.datasets['satellite_zenith_angle']] zattrs = zen.attrs azi = self.nc[self.datasets['satellite_azimuth_angle']] aattrs = azi.attrs elif key.name.startswith('solar'): zen = self.nc[self.datasets['solar_zenith_angle']] zattrs = zen.attrs azi = self.nc[self.datasets['solar_azimuth_angle']] aattrs = azi.attrs else: raise NotImplementedError("Don't know how to read " + key.name) x, y, z = angle2xyz(azi, zen) shape = x.shape from geotiepoints.interpolator import Interpolator tie_lines = np.arange( 0, (shape[0] - 1) * l_step + 1, l_step) tie_cols = np.arange(0, (shape[1] - 1) * c_step + 1, c_step) lines = np.arange((shape[0] - 1) * l_step + 1) cols = np.arange((shape[1] - 1) * c_step + 1) along_track_order = 1 cross_track_order = 3 satint = Interpolator([x.values, y.values, z.values], (tie_lines, tie_cols), (lines, cols), along_track_order, cross_track_order) (x, y, z, ) = satint.interpolate() del satint x = xr.DataArray(da.from_array(x, chunks=(CHUNK_SIZE, CHUNK_SIZE)), dims=['y', 'x']) y = xr.DataArray(da.from_array(y, chunks=(CHUNK_SIZE, CHUNK_SIZE)), dims=['y', 'x']) z = xr.DataArray(da.from_array(z, chunks=(CHUNK_SIZE, CHUNK_SIZE)), dims=['y', 'x']) azi, zen = xyz2angle(x, y, z) azi.attrs = aattrs zen.attrs = zattrs if 'zenith' in key.name: values = zen elif 'azimuth' in key.name: values = azi else: raise NotImplementedError("Don't know how to read " + key.name) if key.name.startswith('satellite'): self.cache['satellite_zenith_angle'] = zen self.cache['satellite_azimuth_angle'] = azi elif key.name.startswith('solar'): self.cache['solar_zenith_angle'] = zen self.cache['solar_azimuth_angle'] = azi elif key.name in self.cache: values = self.cache[key.name] else: values = self.nc[self.datasets[key.name]] values.attrs['platform_name'] = self.platform_name values.attrs['sensor'] = self.sensor values.attrs.update(key.to_dict()) return values
@property def start_time(self): return self._start_time @property def end_time(self): return self._end_time