#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
# Copyright (c) 2016--2019.
# Author(s):
#
# Thomas Leppelt <thomas.leppelt@gmail.com>
# Sauli Joro <sauli.joro@icloud.com>
# Gerrit Holl <gerrit.holl@dwd.de>
# This file is part of satpy.
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Interface to MTG-FCI-FDHSI L1C NetCDF files
This module defines the :class:`FCIFDHSIFileHandler` file handler, to
be used for reading Meteosat Third Generation (MTG) Flexible Combined
Imager (FCI) Full Disk High Spectral Imagery (FDHSI) data. FCI will fly
on the MTG Imager (MTG-I) series of satellites, scheduled to be launched
in 2021 by the earliest. For more information about FCI, see `EUMETSAT`_.
.. _EUMETSAT: https://www.eumetsat.int/website/home/Satellites/FutureSatellites/MeteosatThirdGeneration/MTGDesign/index.html#fci # noqa: E501
"""
from __future__ import (division, absolute_import, print_function,
unicode_literals)
import logging
import numpy as np
import dask.array as da
import xarray as xr
from pyresample import geometry
from netCDF4 import default_fillvals
from .netcdf_utils import NetCDF4FileHandler
logger = logging.getLogger(__name__)
[docs]class FCIFDHSIFileHandler(NetCDF4FileHandler):
"""Class implementing the MTG FCI FDHSI File Reader
This class implements the Meteosat Third Generation (MTG) Flexible
Combined Imager (FCI) Full Disk High Spectral Imagery (FDHSI) reader.
It is designed to be used through the :class:`~satpy.Scene`
class using the :mod:`~satpy.Scene.load` method with the reader
``"fci_l1c_fdhsi"``.
"""
def __init__(self, filename, filename_info, filetype_info):
super(FCIFDHSIFileHandler, self).__init__(filename, filename_info,
filetype_info)
logger.debug('Reading: {}'.format(self.filename))
logger.debug('Start: {}'.format(self.start_time))
logger.debug('End: {}'.format(self.end_time))
self.cache = {}
@property
def start_time(self):
return self.filename_info['start_time']
@property
def end_time(self):
return self.filename_info['end_time']
[docs] def get_dataset(self, key, info=None):
"""Load a dataset."""
logger.debug('Reading {} from {}'.format(key.name, self.filename))
# Get the dataset
# Get metadata for given dataset
measured, root = self.get_channel_dataset(key.name)
radlab = measured + "/effective_radiance"
data = self[radlab]
attrs = data.attrs.copy()
info = info.copy()
fv = attrs.pop(
"FillValue",
default_fillvals.get(data.dtype.str[1:], np.nan))
vr = attrs.pop("valid_range", [-np.inf, np.inf])
if key.calibration == "counts":
attrs["_FillValue"] = fv
nfv = fv
else:
nfv = np.nan
data = data.where(data >= vr[0], nfv)
data = data.where(data <= vr[1], nfv)
if key.calibration == "counts":
# from package description, this just means not applying add_offset
# and scale_factor
attrs.pop("scale_factor")
attrs.pop("add_offset")
data.attrs["units"] = "1"
res = data
else:
data = (data * attrs.pop("scale_factor", 1) +
attrs.pop("add_offset", 0))
if key.calibration in ("brightness_temperature", "reflectance"):
res = self.calibrate(data, key, measured, root)
else:
res = data
data.attrs["units"] = attrs["units"]
# pre-calibration units no longer apply
info.pop("units")
attrs.pop("units")
self.nlines, self.ncols = res.shape
res.attrs.update(key.to_dict())
res.attrs.update(info)
res.attrs.update(attrs)
return res
[docs] def get_channel_dataset(self, channel):
root_group = 'data/{}'.format(channel)
group = 'data/{}/measured'.format(channel)
return group, root_group
[docs] def calc_area_extent(self, key):
"""Calculate area extent for a dataset."""
# Calculate the area extent of the swath based on start line and column
# information, total number of segments and channel resolution
# numbers from Package Description, Table 8
xyres = {500: 22272, 1000: 11136, 2000: 5568}
chkres = xyres[key.resolution]
# Get metadata for given dataset
measured, root = self.get_channel_dataset(key.name)
# Get start/end line and column of loaded swath.
self.startline = int(self[measured + "/start_position_row"])
self.endline = int(self[measured + "/end_position_row"])
self.startcol = int(self[measured + "/start_position_column"])
self.endcol = int(self[measured + "/end_position_column"])
self.nlines, self.ncols = self[measured + "/effective_radiance/shape"]
logger.debug('Channel {} resolution: {}'.format(key.name, chkres))
logger.debug('Row/Cols: {} / {}'.format(self.nlines, self.ncols))
logger.debug('Start/End row: {} / {}'.format(self.startline, self.endline))
logger.debug('Start/End col: {} / {}'.format(self.startcol, self.endcol))
# total_segments = 70
# Calculate full globe line extent
max_y = 5432229.9317116784
min_y = -5429229.5285458621
full_y = max_y + abs(min_y)
# Single swath line extent
res_y = full_y / chkres # Extent per pixel resolution
startl = min_y + res_y * self.startline - 0.5 * (res_y)
endl = min_y + res_y * self.endline + 0.5 * (res_y)
logger.debug('Start / end extent: {} / {}'.format(startl, endl))
chk_extent = (-5432229.9317116784, endl,
5429229.5285458621, startl)
return(chk_extent)
_fallback_area_def = {
"reference_altitude": 35786400, # metre
}
[docs] def get_area_def(self, key, info=None):
"""Calculate on-fly area definition for 0 degree geos-projection for a dataset."""
# TODO Projection information are hard coded for 0 degree geos projection
# Test dataset doen't provide the values in the file container.
# Only fill values are inserted
a = float(self["state/processor/earth_equatorial_radius"])
b = float(self["state/processor/earth_polar_radius"])
h = float(self["state/processor/reference_altitude"])
lon_0 = float(self["state/processor/projection_origin_longitude"])
if h == default_fillvals[
self["state/processor/reference_altitude"].dtype.str[1:]]:
logger.warning(
"Reference altitude in {:s} set to "
"fill value, using {:d}".format(
self.filename,
self._fallback_area_def["reference_altitude"]))
h = self._fallback_area_def["reference_altitude"]
# Channel dependent swath resoultion
area_extent = self.calc_area_extent(key)
logger.debug('Calculated area extent: {}'
.format(''.join(str(area_extent))))
proj_dict = {'a': float(a),
'b': float(b),
'lon_0': float(lon_0),
'h': float(h),
'proj': 'geos',
'units': 'm'}
area = geometry.AreaDefinition(
'some_area_name',
"On-the-fly area",
'geosfci',
proj_dict,
self.ncols,
self.nlines,
area_extent)
self.area = area
return area
[docs] def calibrate(self, data, key, measured, root):
"""Data calibration."""
if key.calibration == 'brightness_temperature':
data = self._ir_calibrate(data, measured, root)
elif key.calibration == 'reflectance':
data = self._vis_calibrate(data, measured)
else:
raise RuntimeError(
"Received unknown calibration key. Expected "
"'brightness_temperature' or 'reflectance', got "
+ key.calibration)
return data
def _ir_calibrate(self, radiance, measured, root):
"""IR channel calibration."""
coef = self[measured + "/radiance_unit_conversion_coefficient"]
wl_c = self[root + "/central_wavelength_actual"]
a = self[measured + "/radiance_to_bt_conversion_coefficient_a"]
b = self[measured + "/radiance_to_bt_conversion_coefficient_b"]
c1 = self[measured + "/radiance_to_bt_conversion_constant_c1"]
c2 = self[measured + "/radiance_to_bt_conversion_constant_c2"]
for v in (coef, wl_c, a, b, c1, c2):
if v == v.attrs.get("FillValue",
default_fillvals.get(v.dtype.str[1:])):
logger.error(
"{:s} set to fill value, cannot produce "
"brightness temperatures for {:s}.".format(
v.attrs.get("long_name",
"at least one necessary coefficient"),
root))
return xr.DataArray(
da.full(shape=radiance.shape,
chunks=radiance.chunks,
fill_value=np.nan),
dims=radiance.dims,
coords=radiance.coords,
attrs=radiance.attrs)
Lv = radiance * coef
vc = 1e6/wl_c # from wl in um to wn in m^-1
nom = c2 * vc
denom = a * np.log(1 + (c1 * vc**3) / Lv)
res = nom / denom - b / a
res.attrs["units"] = "K"
return res
def _vis_calibrate(self, radiance, measured):
"""VIS channel calibration."""
# radiance to reflectance taken as in mipp/xrit/MSG.py
# again FCI User Guide is not clear on how to do this
cesilab = measured + "/channel_effective_solar_irradiance"
cesi = self[cesilab]
if cesi == cesi.attrs.get(
"FillValue", default_fillvals.get(cesi.dtype.str[1:])):
logger.error(
"channel effective solar irradiance set to fill value, "
"cannot produce reflectance for {:s}.".format(measured))
return xr.DataArray(
da.full(shape=radiance.shape,
chunks=radiance.chunks,
fill_value=np.nan),
dims=radiance.dims,
coords=radiance.coords,
attrs=radiance.attrs)
sirr = float(cesi)
res = radiance / sirr * 100
res.attrs["units"] = "%"
return res