Source code for satpy.readers.hrpt

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2009-2016 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Reading and calibrating hrpt avhrr data.

Todo:
- AMSU
- Compare output with AAPP

Reading:
http://www.ncdc.noaa.gov/oa/pod-guide/ncdc/docs/klm/html/c4/sec4-1.htm#t413-1

Calibration:
http://www.ncdc.noaa.gov/oa/pod-guide/ncdc/docs/klm/html/c7/sec7-1.htm

"""

import logging
from datetime import datetime

import numpy as np
try:
    from pygac.calibration import calibrate_solar, calibrate_thermal
except ImportError:
    from pygac.gac_calibration import calibrate_solar, calibrate_thermal

from satpy.dataset import Dataset
from satpy.readers.file_handlers import BaseFileHandler

logger = logging.getLogger(__name__)

AVHRR_CHANNEL_NAMES = ("1", "2", "3a", "3b", "4", "5")

dtype = np.dtype([('frame_sync', '>u2', (6, )),
                  ('id', [('id', '>u2'),
                          ('spare', '>u2')]),
                  ('timecode', '>u2', (4, )),
                  ('telemetry', [("ramp_calibration", '>u2', (5, )),
                                 ("PRT", '>u2', (3, )),
                                 ("ch3_patch_temp", '>u2'),
                                 ("spare", '>u2'), ]),
                  ('back_scan', '>u2', (10, 3)),
                  ('space_data', '>u2', (10, 5)),
                  ('sync', '>u2'),
                  ('TIP_data', '>u2', (520, )),
                  ('spare', '>u2', (127, )),
                  ('image_data', '>u2', (2048, 5)),
                  ('aux_sync', '>u2', (100, ))])


[docs]def time_seconds(tc_array, year): """Return the time object from the timecodes """ tc_array = np.array(tc_array, copy=True) word = tc_array[:, 0] day = word >> 1 word = tc_array[:, 1].astype(np.uint64) msecs = ((127) & word) * 1024 word = tc_array[:, 2] msecs += word & 1023 msecs *= 1024 word = tc_array[:, 3] msecs += word & 1023 return (np.datetime64( str(year) + '-01-01T00:00:00Z', 's') + msecs[:].astype('timedelta64[ms]') + (day - 1)[:].astype('timedelta64[D]'))
[docs]def bfield(array, bit): """return the bit array. """ return (array & 2**(9 - bit + 1)).astype(np.bool)
spacecrafts = {7: "NOAA 15", 3: "NOAA 16", 13: "NOAA 18", 15: "NOAA 19"}
[docs]def geo_interpolate(lons32km, lats32km): from geotiepoints import SatelliteInterpolator cols32km = np.arange(0, 2048, 32) cols1km = np.arange(2048) lines = lons32km.shape[0] rows32km = np.arange(lines) rows1km = np.arange(lines) along_track_order = 1 cross_track_order = 3 satint = SatelliteInterpolator( (lons32km, lats32km), (rows32km, cols32km), (rows1km, cols1km), along_track_order, cross_track_order) lons, lats = satint.interpolate() return lons, lats
[docs]class HRPTFile(BaseFileHandler): """Reader for HRPT Minor Frame, 10 bits data expanded to 16 bits. """ def __init__(self, filename, filename_info, filetype_info): super(HRPTFile, self).__init__(filename, filename_info, filetype_info) self.channels = {i: None for i in AVHRR_CHANNEL_NAMES} self.units = {i: 'counts' for i in AVHRR_CHANNEL_NAMES} self._data = None self._is3b = None self.lons = None self.lats = None self.area = None self.platform_name = None self.year = filename_info.get('start_time', datetime.utcnow()).year self.times = None self.prt = None self.ict = None self.space = None self.read()
[docs] def read(self): with open(self.filename, "rb") as fp_: self._data = np.memmap(fp_, dtype=dtype, mode="r") if np.all(self._data['frame_sync'][0] > 1024): self._data = self._data.newbyteorder() self.platform_name = spacecrafts[ (self._data["id"]["id"][0] >> 3) & 15]
[docs] def get_dataset(self, key, info): if self._data is None: self.read() if key.name in ['latitude', 'longitude']: lons, lats = self.get_lonlats() if key.name == 'latitude': return Dataset(lats, id=key) else: return Dataset(lons, id=key) avhrr_channel_index = {'1': 0, '2': 1, '3a': 2, '3b': 2, '4': 3, '5': 4} index = avhrr_channel_index[key.name] mask = False if key.name in ['3a', '3b'] and self._is3b is None: ch3a = bfield(self._data["id"]["id"], 10) self._is3b = np.logical_not(ch3a) if key.name == '3a': mask = np.tile(self._is3b, (1, 2048)) elif key.name == '3b': mask = np.tile(np.logical_not(self._is3b), (1, 2048)) data = self._data["image_data"][:, :, index] if key.calibration == 'counts': return Dataset(data, mask=mask, area=self.get_lonlats(), units='1') pg_spacecraft = ''.join(self.platform_name.split()).lower() jdays = (np.datetime64(self.start_time) - np.datetime64(str( self.year) + '-01-01T00:00:00Z')) / np.timedelta64(1, 'D') if index < 2 or key.name == '3a': data = calibrate_solar(data, index, self.year, jdays, pg_spacecraft) units = '%' if index > 2 or key.name == '3b': if self.times is None: self.times = time_seconds(self._data["timecode"], self.year) line_numbers = ( np.round((self.times - self.times[-1]) / np.timedelta64(166666667, 'ns'))).astype(np.int) line_numbers -= line_numbers[0] if self.prt is None: self.prt, self.ict, self.space = self.get_telemetry() chan = index + 1 data = calibrate_thermal(data, self.prt, self.ict[:, chan - 3], self.space[:, chan - 3], line_numbers, chan, pg_spacecraft) units = 'K' # TODO: check if entirely masked before returning return Dataset(data, mask=mask, units=units)
[docs] def get_telemetry(self): prt = np.mean(self._data["telemetry"]['PRT'], axis=1) ict = np.empty((len(self._data), 3)) for i in range(3): ict[:, i] = np.mean(self._data['back_scan'][:, :, i], axis=1) space = np.empty((len(self._data), 3)) for i in range(3): space[:, i] = np.mean(self._data['space_data'][ :, :, i + 2], axis=1) return prt, ict, space
[docs] def get_lonlats(self): if self.lons is not None and self.lats is not None: return self.lons, self.lats from pyorbital.orbital import Orbital from pyorbital.geoloc import compute_pixels, get_lonlatalt from pyorbital.geoloc_instrument_definitions import avhrr if self.times is None: self.times = time_seconds(self._data["timecode"], self.year) scanline_nb = len(self.times) scan_points = np.arange(0, 2048, 32) # scan_points = np.arange(2048) sgeom = avhrr(scanline_nb, scan_points, apply_offset=False) # no attitude error rpy = [0, 0, 0] s_times = sgeom.times( self.times[:, np.newaxis]).ravel() # s_times = (np.tile(sgeom._times[0, :], (scanline_nb, 1)).astype( # 'timedelta64[s]') + self.times[:, np.newaxis]).ravel() orb = Orbital(self.platform_name) pixels_pos = compute_pixels(orb, sgeom, s_times, rpy) lons, lats, alts = get_lonlatalt(pixels_pos, s_times) self.lons, self.lats = geo_interpolate( lons.reshape((scanline_nb, -1)), lats.reshape((scanline_nb, -1))) return self.lons, self.lats
@property def start_time(self): return time_seconds(self._data["timecode"][0, np.newaxis, :], self.year).astype(datetime)[0] @property def end_time(self): return time_seconds(self._data["timecode"][-1, np.newaxis, :], self.year).astype(datetime)[0]