Source code for gms_preprocessing.model.metadata

# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Module 'metadata' for handling any type of metadata of GeoMultiSens compatible sensor systems."""

from __future__ import (division, print_function, unicode_literals, absolute_import)

import collections
import datetime
import glob
import math
import os
import re
import sys
import warnings
import iso8601
import xml.etree.ElementTree as ET
from typing import List, TYPE_CHECKING  # noqa F401  # flake8 issue

import numpy as np
import pyproj
from matplotlib import dates as mdates
from pyorbital import astronomy
from natsort import natsorted

from py_tools_ds.geo.map_info import geotransform2mapinfo
from py_tools_ds.geo.projection import WKT2EPSG
from pyrsr import RSR
from sicor.options import get_options as get_ac_options

from ..options.config import GMS_config as CFG
from ..io.input_reader import open_specific_file_within_archive, Solar_Irradiance_reader, SRF_Reader
from ..io.output_writer import enviHdr_keyOrder
from ..algorithms import geoprocessing as GEOP
from ..misc import helper_functions as HLP_F
from ..misc import database_tools as DB_T
from ..misc.path_generator import path_generator, get_path_ac_options
from ..misc.definition_dicts import get_GMS_sensorcode, is_dataset_provided_as_fullScene, datasetid_to_sat_sen
from ..misc.exceptions import ACNotSupportedError

if TYPE_CHECKING:
    from ..model.gms_object import GMS_identifier  # noqa F401  # flake8 issue

__author__ = 'Daniel Scheffler', 'Robert Behling'


class METADATA(object):

    def __init__(self, GMS_id):
        # private attributes
        self._AcqDateTime = None

        # unpack GMS_identifier
        self.GMS_identifier = GMS_id
        self.image_type = GMS_id.image_type
        self.Satellite = GMS_id.satellite
        self.Sensor = GMS_id.sensor
        self.Subsystem = GMS_id.subsystem
        self.proc_level = GMS_id.proc_level
        self.logger = GMS_id.logger

        self.Dataname = ''
        self.FolderOrArchive = ''
        self.Metafile = ""  # File containing image metadata (automatically found)
        self.EntityID = ""  # ID to identify the original scene
        self.SceneID = ''  # postgreSQL-database identifier
        self.Sensormode = ""
        self.gResolution = -99.  # resolution [m]
        self.AcqDate = ""  # YYYY-MM-DD
        self.AcqTime = ""  # HH:MM:SS
        self.Offsets = {}  # Dict with offset for each band (radiance)
        self.Gains = {}  # Dict with gain for each band (radiance)
        self.OffsetsRef = {}  # Dict with offset for each band for conversion to Reflectance (Landsat8)
        self.GainsRef = {}  # Dict with gain for each band for conversion to Reflectance (Landsat8)
        self.CWL = {}
        self.FWHM = {}
        self.ProcLCode = ""  # processing Level: Sensor specific Code
        self.ProcLStr = ""  # processing Level: Sensor independent String (raw, rad cal, rad+geom cal, ortho)
        self.SunElevation = -99.0  # Sun elevation angle at center of product [Deg]
        # Sun azimuth angle at center of product, in degrees from North (clockwise) at the time of the first image line
        self.SunAzimuth = -99.0
        self.SolIrradiance = []
        self.ThermalConstK1 = {}
        self.ThermalConstK2 = {}
        self.EarthSunDist = 1.0
        # viewing zenith angle of the Sensor (offNadir) [Deg] (usually) in Case of RapidEye "+"
        # being East and "-" being West
        self.ViewingAngle = -99.0
        self.ViewingAngle_arrProv = {}
        # viewing azimuth angle. The angle between the view direction of the satellite and a line perpendicular to
        # the image or tile center. [Deg]
        self.IncidenceAngle = -9999.99
        self.IncidenceAngle_arrProv = {}
        self.FOV = 9999.99  # field of view of the sensor [Deg]
        # Sensor specific quality code: See ro2/behling/Satelliten/doc_allg/ReadOutMetadata/SatMetadata.xls
        self.Quality = []
        self.PhysUnit = "DN"
        # Factor to get reflectance values in [0-1]. Sentinel2A provides scaling factors for the Level1C
        # TOA-reflectance products
        self.ScaleFactor = -99
        self.CS_EPSG = -99.  # EPSG-Code of coordinate system
        self.CS_TYPE = ""
        self.CS_DATUM = ""
        self.CS_UTM_ZONE = -99.
        self.WRS_path = -99.
        self.WRS_row = -99.
        # List of Corner Coordinates in order of Lon/Lat/DATA_X/Data_Y for all given image corners
        self.CornerTieP_LonLat = []
        self.CornerTieP_UTM = []
        self.LayerBandsAssignment = []  # List of spectral bands in the same order as they are stored in the rasterfile
        self.additional = []
        self.image_type = 'RSD'
        self.orbitParams = {}
        self.overpassDurationSec = -99.
        self.scene_length = -99.
        self.rows = -99.
        self.cols = -99.
        self.bands = -99.
        self.nBands = -99.
        self.map_info = []
        self.projection = ""
        self.wvlUnit = ""
        self.spec_vals = {'fill': None, 'zero': None, 'saturated': None}
        self.version_gms_preprocessing = CFG.version
        self.versionalias_gms_preprocessing = CFG.versionalias

    def read_meta(self, scene_ID, stacked_image, data_folderOrArchive, LayerBandsAssignment=None):
        """Read the metadata of the given scene by dispatching to the appropriate sensor-specific reader."""
        self.SceneID = scene_ID
        self.Dataname = stacked_image
        self.FolderOrArchive = data_folderOrArchive
        self.LayerBandsAssignment = LayerBandsAssignment if LayerBandsAssignment else []

        if re.search(r"SPOT", self.Satellite, re.I):
            self.Read_SPOT_dimap2()
        elif re.search(r"Terra", self.Satellite, re.I):
            self.Read_ASTER_hdffile(self.Subsystem)
        elif re.search(r"Sentinel-2A", self.Satellite, re.I) or re.search(r"Sentinel-2B", self.Satellite, re.I):
            self.Read_Sentinel2_xmls()
        elif re.search(r"LANDSAT", self.Satellite, re.I):
            self.Read_LANDSAT_mtltxt(self.LayerBandsAssignment)
        elif re.search(r"RapidEye", self.Satellite, re.I):
            self.Read_RE_metaxml()
        elif re.search(r"ALOS", self.Satellite, re.I):
            self.Read_ALOS_summary()
            self.Read_ALOS_LEADER()  # for gains and offsets
        else:
            raise NotImplementedError("%s is not a supported sensor or sensorname is misspelled." % self.Satellite)

        return self

    def __getstate__(self):
        """Defines how the attributes of MetaObj instances are pickled."""
        if self.logger:
            self.logger.close()

        return self.__dict__

    @property
    def AcqDateTime(self):
        """Returns a datetime.datetime object containing date, time and timezone (UTC time)."""
        if not self._AcqDateTime and self.AcqDate and self.AcqTime:
            self._AcqDateTime = datetime.datetime.strptime(
                '%s %s%s' % (self.AcqDate, self.AcqTime, '.000000+0000'), '%Y-%m-%d %H:%M:%S.%f%z')

        return self._AcqDateTime

    @AcqDateTime.setter
    def AcqDateTime(self, DateTime):
        # type: (datetime.datetime) -> None
        if isinstance(DateTime, str):
            self._AcqDateTime = datetime.datetime.strptime(DateTime, '%Y-%m-%d %H:%M:%S.%f%z')
        elif isinstance(DateTime, datetime.datetime):
            self._AcqDateTime = DateTime

        self.AcqDate = DateTime.strftime('%Y-%m-%d')
        self.AcqTime = DateTime.strftime('%H:%M:%S')

    @property
    def overview(self):
        attr2include = \
            ['Dataname', 'Metafile', 'EntityID', 'Satellite', 'Sensor', 'Subsystem', 'gResolution', 'AcqDate',
             'AcqTime', 'CWL', 'FWHM', 'Offsets', 'Gains', 'ProcLCode', 'ProcLStr', 'SunElevation', 'SunAzimuth',
             'ViewingAngle', 'IncidenceAngle', 'FOV', 'SolIrradiance', 'ThermalConstK1', 'ThermalConstK2',
             'EarthSunDist', 'Quality', 'PhysUnit', 'additional', 'GainsRef', 'OffsetsRef', 'CornerTieP_LonLat',
             'CS_EPSG', 'CS_TYPE', 'CS_DATUM', 'CS_UTM_ZONE', 'LayerBandsAssignment']
        return collections.OrderedDict((attr, getattr(self, attr)) for attr in attr2include)

    @property
    def LayerBandsAssignment_full(self):
        # type: () -> list
        """Return complete LayerBandsAssignment without excluding thermal or panchromatic bands.

        NOTE: CFG.sort_bands_by_cwl is respected, so the returned list may be sorted by central wavelength.
        """
        return get_LayerBandsAssignment(self.GMS_identifier, no_thermal=False, no_pan=False, return_fullLBA=True)

    @property
    def bandnames(self):
        return [('Band %s' % i) for i in self.LayerBandsAssignment]
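
    # --- Editor's note (illustrative sketch, not part of the original module): the AcqDateTime getter above combines
    # AcqDate and AcqTime with a fixed '.000000+0000' suffix, so strptime() with '%Y-%m-%d %H:%M:%S.%f%z' yields a
    # timezone-aware UTC datetime. A minimal standalone example with hypothetical values:
    #
    #   >>> import datetime
    #   >>> datetime.datetime.strptime('2003-05-01 10:15:44.000000+0000', '%Y-%m-%d %H:%M:%S.%f%z')
    #   datetime.datetime(2003, 5, 1, 10, 15, 44, tzinfo=datetime.timezone.utc)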

    def Read_SPOT_dimap2(self):
        """Read metadata from a SPOT DIMAP file (METHOD 2)."""
        # self.default_attr()
        if os.path.isdir(self.FolderOrArchive):
            glob_res = glob.glob(os.path.join(self.FolderOrArchive, '*/scene01/metadata.dim'))
            assert len(glob_res) > 0, 'No metadata.dim file can be found in %s!' % self.FolderOrArchive
            self.Metafile = glob_res[0]
            dim_ = open(self.Metafile, "r").read()
        else:  # archive
            dim_, self.Metafile = open_specific_file_within_archive(self.FolderOrArchive, '*/scene01/metadata.dim')

        # special values
        h1 = re.findall(r"<SPECIAL_VALUE_INDEX>([a-zA-Z0-9]*)</SPECIAL_VALUE_INDEX>", dim_, re.I)
        h2 = re.findall(r"<SPECIAL_VALUE_TEXT>([a-zA-Z0-9]*)</SPECIAL_VALUE_TEXT>", dim_, re.I)
        self.additional.append(["SpecialValues:"])
        for ii, ind in enumerate(h1):
            self.additional[0].append(["%s:%s" % (ind, h2[ii])])

        # EntityID
        h3 = re.search(r"<SOURCE_ID>([a-zA-Z0-9]*)</SOURCE_ID>", dim_, re.I)
        self.EntityID = h3.group(1)

        # AcqDate
        h4 = re.search(r"<IMAGING_DATE>([0-9-]*)</IMAGING_DATE>", dim_, re.I)
        AcqDate = h4.group(1)

        # Earth sun distance
        self.EarthSunDist = self.get_EarthSunDistance(AcqDate)

        # AcqTime
        h5 = re.search(r"<IMAGING_TIME>([0-9:]*)</IMAGING_TIME>", dim_, re.I)
        AcqTime = h5.group(1)

        # set self.AcqDateTime as well as self.AcqDate and self.AcqTime
        self.AcqDateTime = datetime.datetime.strptime(
            '%s %s%s' % (AcqDate, AcqTime, '.000000+0000'), '%Y-%m-%d %H:%M:%S.%f%z')

        # Satellite + Sensor
        h6 = re.search(r"<MISSION>([a-zA-Z]*)</MISSION>[a-zA-Z0-9\s]*"
                       r"<MISSION_INDEX>([0-9]*)</MISSION_INDEX>[a-zA-Z0-9\s]*"
                       r"<INSTRUMENT>([a-zA-Z]*)</INSTRUMENT>[a-zA-Z0-9\s]*"
                       r"<INSTRUMENT_INDEX>([0-9]*)</INSTRUMENT_INDEX>[a-zA-Z0-9\s]*"
                       r"<SENSOR_CODE>([a-zA-Z0-9]*)</SENSOR_CODE>", dim_, re.I)
        self.Satellite = "-".join([h6.group(1), h6.group(2)])
        self.Sensor = "".join([h6.group(3), h6.group(4), h6.group(5)])

        # Angles: incidence angle, sunAzimuth, sunElevation
        h7 = re.search(r"<INCIDENCE_ANGLE>(.*)</INCIDENCE_ANGLE>[\s\S]*"
                       r"<SUN_AZIMUTH>(.*)</SUN_AZIMUTH>[\s\S]"
                       r"*<SUN_ELEVATION>(.*)</SUN_ELEVATION>", dim_, re.I)
        self.IncidenceAngle = float(h7.group(1))
        self.SunAzimuth = float(h7.group(2))
        self.SunElevation = float(h7.group(3))

        # Viewing Angle: not always included in the metadata.dim file
        h8 = re.search(r"<VIEWING_ANGLE>(.*)</VIEWING_ANGLE>", dim_, re.I)
        if type(h8).__name__ == 'NoneType':
            self.ViewingAngle = 0
        else:
            self.ViewingAngle = float(h8.group(1))

        # Field of View
        self.FOV = get_FieldOfView(self.GMS_identifier)

        # Units
        self.ScaleFactor = 1
        self.PhysUnit = "DN"

        # ProcLevel
        h11 = re.search(r"<PROCESSING_LEVEL>([a-zA-Z0-9]*)</PROCESSING_LEVEL>", dim_, re.I)
        self.ProcLCode = h11.group(1)

        # Quality
        h12 = re.findall(r"<Bad_Pixel>[\s]*"
                         r"<PIXEL_INDEX>([0-9]*)</PIXEL_INDEX>[\s]*"
                         r"<BAD_PIXEL_STATUS>([^<]*)</BAD_PIXEL_STATUS>"
                         r"</Bad_Pixel>", dim_, re.I)
        self.Quality = h12

        # Coordinate Reference System
        re_CS_EPSG = re.search(r'<HORIZONTAL_CS_CODE>epsg:([0-9]*)</HORIZONTAL_CS_CODE>', dim_, re.I)
        re_CS_TYPE = re.search(r'<HORIZONTAL_CS_TYPE>([a-zA-Z0-9]*)</HORIZONTAL_CS_TYPE>', dim_, re.I)
        re_CS_DATUM = re.search(r'<HORIZONTAL_CS_NAME>([\w+\s]*)</HORIZONTAL_CS_NAME>', dim_, re.I)
        self.CS_EPSG = int(re_CS_EPSG.group(1)) if re_CS_EPSG is not None else self.CS_EPSG
        self.CS_TYPE = 'LonLat' if re_CS_TYPE is not None and re_CS_TYPE.group(1) == 'GEOGRAPHIC' else 'UTM' \
            if re_CS_TYPE is not None and re_CS_TYPE.group(1) == 'UTM' else self.CS_TYPE
        self.CS_DATUM = 'WGS84' if re_CS_DATUM is not None and re_CS_DATUM.group(1) == 'WGS 84' else self.CS_DATUM

        # Corner Coordinates
        h121 = re.findall(r"<TIE_POINT_CRS_X>(.*)</TIE_POINT_CRS_X>", dim_, re.I)
        h122 = re.findall(r"<TIE_POINT_CRS_Y>(.*)</TIE_POINT_CRS_Y>", dim_, re.I)
        if len(h121) == 4 and self.CS_TYPE == 'LonLat':
            # Set Corner Tie Point Coordinates (order = UL,UR,LL,LR)
            self.CornerTieP_LonLat.append(tuple([float(h121[0]), float(h122[0])]))  # UL
            self.CornerTieP_LonLat.append(tuple([float(h121[1]), float(h122[1])]))  # UR
            self.CornerTieP_LonLat.append(tuple([float(h121[3]), float(h122[3])]))  # LL
            self.CornerTieP_LonLat.append(tuple([float(h121[2]), float(h122[2])]))  # LR
        # ul_lon,ul_lat = self.inDs.GetGCPs()[0].GCPX,self.inDs.GetGCPs()[0].GCPY  # works only for SPOT
        # lr_lon,lr_lat = self.inDs.GetGCPs()[2].GCPX,self.inDs.GetGCPs()[2].GCPY

        ##########################
        # band specific metadata #
        ##########################

        LBA_full_sorted = natsorted(self.LayerBandsAssignment_full)

        # Gains and Offsets
        h9 = re.search(r"<Image_Interpretation>[\s\S]*</Image_Interpretation>", dim_, re.I)
        h9_ = h9.group(0)
        h91 = re.findall(r"<PHYSICAL_UNIT>([^<]*)</PHYSICAL_UNIT>", h9_, re.I)
        h92 = re.findall(r"<PHYSICAL_BIAS>([^<]*)</PHYSICAL_BIAS>", h9_, re.I)
        h93 = re.findall(r"<PHYSICAL_GAIN>([^<]*)</PHYSICAL_GAIN>", h9_, re.I)
        self.additional.append(["Physical Units per band:"])
        for ii, ind in enumerate(h91):  # FIXME does not respect sort_bands_by_cwl
            self.additional[-1].append(ind)
        # Offsets
        for ii, (ind, band) in enumerate(zip(h92, LBA_full_sorted)):
            self.Offsets[band] = float(ind)
        # Gains
        for ii, (ind, band) in enumerate(zip(h93, LBA_full_sorted)):
            # gains in dimap file represent reciprocal 1/gain
            self.Gains[band] = 1 / float(ind)

        # Solar irradiance, central wavelengths, full width half maximum per band
        self.wvlUnit = 'Nanometers'

        # derive number of bands from number of given gains/offsets in header file or from raster data
        # noinspection PyBroadException
        try:
            self.nBands = (np.mean([len(self.Gains), len(self.Offsets)])
                           if np.std([len(self.Gains), len(self.Offsets)]) == 0 and len(self.Gains) != 0
                           else GEOP.GEOPROCESSING(self.Dataname, self.logger).bands)
        except Exception:
            self.logger.warning('Unable to get number of bands for the dataset %s. Provider values are used for '
                                'solar irradiation, CWL and FWHM!' % self.Dataname)
        self.LayerBandsAssignment = get_LayerBandsAssignment(self.GMS_identifier, self.nBands)

        # Exact values calculated based on SRFs
        self.SolIrradiance, self.CWL, self.FWHM = self.calc_solar_irradiance_CWL_FWHM_per_band()

        # Provider values
        if not self.SolIrradiance:
            h10 = re.search(r"<Solar_Irradiance>[\s\S]*"
                            r"</Solar_Irradiance>", dim_, re.I)
            h10_ = h10.group(0)
            h101 = re.findall(r"<SOLAR_IRRADIANCE_VALUE>([^<]*)"
                              r"</SOLAR_IRRADIANCE_VALUE>", h10_, re.I)
            if h101:
                self.SolIrradiance = dict(zip(LBA_full_sorted, h101))
                # self.additional.append(["Solar Irradiance per band:"])
                # for ii,ind in enumerate(h101):
                #     self.additional[-1].append(ind)
            else:  # Preconfigured Irradiation values
                spot_irr_dic = {
                    'SPOT1a': dict(zip(LBA_full_sorted, [1855., 1615., 1090., 1680.])),
                    'SPOT1b': dict(zip(LBA_full_sorted, [1845., 1575., 1040., 1690.])),
                    'SPOT2a': dict(zip(LBA_full_sorted, [1865., 1620., 1085., 1705.])),
                    'SPOT2b': dict(zip(LBA_full_sorted, [1865., 1615., 1090., 1670.])),
                    'SPOT3a': dict(zip(LBA_full_sorted, [1854., 1580., 1065., 1668.])),
                    'SPOT3b': dict(zip(LBA_full_sorted, [1855., 1597., 1067., 1667.])),
                    'SPOT4a': dict(zip(LBA_full_sorted, [1843., 1568., 1052., 233., 1568.])),
                    'SPOT4b': dict(zip(LBA_full_sorted, [1851., 1586., 1054., 240., 1586.])),
                    'SPOT5a': dict(zip(LBA_full_sorted, [1858., 1573., 1043., 236., 1762.])),
                    'SPOT5b': dict(zip(LBA_full_sorted, [1858., 1575., 1047., 234., 1773.]))}
                self.SolIrradiance = spot_irr_dic[get_GMS_sensorcode(self.GMS_identifier)]

            # Preconfigured CWLs
            # ref: Guyot & Gu (1994): 'Effect of Radiometric Corrections on NDVI Determined from SPOT-HRV and
            # Landsat-TM Data'; Hill (1990): 'Comparative Analysis of Landsat-5 TM and SPOT HRV-1 Data for Use in
            # Multiple Sensor Approaches'; resource: SPOT technical report 'Resolutions and spectral modes'
            sensorcode = get_GMS_sensorcode(self.GMS_identifier)[:2]
            if sensorcode in ['SPOT1a', 'SPOT1b', 'SPOT2a', 'SPOT2b', 'SPOT3a', 'SPOT3b']:
                self.CWL = dict(zip(LBA_full_sorted, [545., 638., 819., 615.]))
            elif sensorcode in ['SPOT4a', 'SPOT4b']:
                self.CWL = dict(zip(LBA_full_sorted, [540., 650., 835., 1630., 645.]))
            elif sensorcode in ['SPOT5a', 'SPOT5b']:
                self.CWL = dict(zip(LBA_full_sorted, [540., 650., 835., 1630., 595.]))

        self.orbitParams = get_orbit_params(self.GMS_identifier)
        self.filter_layerdependent_metadata()
        self.spec_vals = get_special_values(self.GMS_identifier)
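
    # --- Editor's note (illustrative sketch, not part of the original module): DIMAP stores PHYSICAL_GAIN as the
    # reciprocal of a multiplicative gain, which is why Read_SPOT_dimap2() keeps 1 / float(PHYSICAL_GAIN) in
    # self.Gains, so that radiance = DN * gain + offset. A sketch with hypothetical numbers:
    #
    #   >>> physical_gain, physical_bias, dn = 2.0, 0.0, 150
    #   >>> dn * (1 / physical_gain) + physical_bias
    #   75.0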
[docs] def Read_LANDSAT_mtltxt(self, LayerBandsAssignment): """----METHOD_3------------------------------------------------------------ read metadata from LANDSAT metafile: <dataname>.MTL.txt. Metadatafile of LPGS processing chain :param LayerBandsAssignment: """ # self.default_attr() self.LayerBandsAssignment = LayerBandsAssignment self.nBands = len(LayerBandsAssignment) if os.path.isdir(self.FolderOrArchive): glob_res = glob.glob(os.path.join(self.FolderOrArchive, '*MTL.txt')) assert len(glob_res) > 0, 'No *.MTL metadata file can be found in %s!' % self.FolderOrArchive self.Metafile = glob_res[0] mtl_ = open(self.Metafile, "r").read() else: # archive mtl_, self.Metafile = open_specific_file_within_archive(self.FolderOrArchive, '*MTL.txt') # Processing Level h1 = re.search(r'PRODUCT_TYPE = "([a-zA-Z0-9]*)"', mtl_, re.I) if h1 is None: h1 = re.search(r'DATA_TYPE = "([a-zA-Z0-9]*)"', mtl_, re.I) self.ProcLCode = h1.group(1) # Satellite + Sensor + Sensor Mode h2 = re.search(r'SPACECRAFT_ID = "([a-zA-Z0-9_]*)"[\s]*' r'SENSOR_ID = "([a-zA-Z0-9+]*)"[\s]*' r'SENSOR_MODE = "([\S]*)"', mtl_, re.I) if h2: self.Satellite = 'Landsat-%s' % re.search(r'LANDSAT[\D]*([0-9])', h2.group(1), re.I).group(1) self.Sensor = h2.group(2) self.Sensormode = h2.group(3) else: h2a = re.search(r'SPACECRAFT_ID = "([a-zA-Z0-9_]*)"', mtl_, re.I) h2b = re.search(r'SENSOR_ID = "([a-zA-Z0-9_+]*)"', mtl_, re.I) h2c = re.search(r'SENSOR_MODE = "([a-zA-Z0-9_+]*)"', mtl_, re.I) self.Satellite = 'Landsat-%s' % re.search(r'LANDSAT[\D]*([0-9])', h2a.group(1), re.I).group(1) self.Sensor = h2b.group(1) self.Sensormode = h2c.group(1) if h2c is not None else self.Sensormode # Landsat-8 self.Sensor = 'ETM+' if self.Sensor == 'ETM' else self.Sensor # EntityID h2c = re.search(r'LANDSAT_SCENE_ID = "([A-Z0-9]*)"', mtl_, re.I) if h2c: self.EntityID = h2c.group(1) # Acquisition Date + Time h3 = re.search(r'ACQUISITION_DATE = ([0-9-]*)[\s]*' r'SCENE_CENTER_SCAN_TIME = "?([0-9:]*)"?', mtl_, re.I) if h3 is None: h3 = re.search(r'DATE_ACQUIRED = ([0-9-]*)[\s]*' r'SCENE_CENTER_TIME = "?([0-9:]*)"?', mtl_, re.I) AcqDate = h3.group(1) AcqTime = h3.group(2) # set self.AcqDateTime as well as self.AcqDate and self.AcqTime self.AcqDateTime = datetime.datetime.strptime( '%s %s%s' % (AcqDate, AcqTime, '.000000+0000'), '%Y-%m-%d %H:%M:%S.%f%z') # Earth sun distance self.EarthSunDist = self.get_EarthSunDistance(self.AcqDate) # Units self.ScaleFactor = 1 self.PhysUnit = "DN" # Angles: incidence angle, sunAzimuth, sunElevation, field of view h5 = re.search(r"SUN_AZIMUTH = ([\S]*)[\s]*" r"SUN_ELEVATION = ([\S]*)", mtl_, re.I) self.SunAzimuth = float(h5.group(1)) self.SunElevation = float(h5.group(2)) self.FOV = get_FieldOfView(self.GMS_identifier) # Quality h6 = re.search(r"GROUP = CORRECTIONS_APPLIED[\s\S]*" r"END_GROUP = CORRECTIONS_APPLIED", mtl_, re.I) if h6 is None: h6 = re.search(r"GROUP = IMAGE_ATTRIBUTES[\s\S]*" r"END_GROUP = IMAGE_ATTRIBUTES", mtl_, re.I) h6_ = h6.group(0) h61 = (h6_.split("\n")) x = 0 for i in h61: if x == 0 or x + 1 == len(h61): pass else: i_ = i.strip().replace('"', "") self.Quality.append(i_.split(" = ")) x += 1 # Additional: coordinate system, geom. 
Resolution h7 = re.search(r"GROUP = PROJECTION_PARAMETERS[\s\S]*END_GROUP = L1_METADATA_FILE", mtl_, re.I) h7_ = h7.group(0) h71 = (h7_.split("\n")) for x, i in enumerate(h71): if re.search(r"Group", i, re.I): pass else: i_ = i.strip().replace('"', "") self.additional.append(i_.split(" = ")) re_CS_TYPE = re.search(r'MAP_PROJECTION = "([\w+\s]*)"', h7_, re.I) re_CS_DATUM = re.search(r'DATUM = "([\w+\s]*)"', h7_, re.I) re_CS_UTM_ZONE = re.search(r'ZONE_NUMBER = ([0-9]*)\n', mtl_, re.I) re_CS_UTM_ZONE = re.search(r'UTM_ZONE = ([0-9]*)\n', h7_, re.I) if re_CS_UTM_ZONE is None else re_CS_UTM_ZONE # Landsat8 self.CS_TYPE = 'LonLat' if re_CS_TYPE is not None and re_CS_TYPE.group(1) == 'GEOGRAPHIC' else 'UTM' \ if re_CS_TYPE is not None and re_CS_TYPE.group(1) == 'UTM' else self.CS_TYPE self.CS_DATUM = 'WGS84' if re_CS_DATUM is not None and re_CS_DATUM.group(1) == 'WGS84' else self.CS_DATUM self.CS_UTM_ZONE = int(re_CS_UTM_ZONE.group(1)) if re_CS_UTM_ZONE is not None else self.CS_UTM_ZONE # viewing Angle self.ViewingAngle = 0 # Landsat8 h8 = re.search(r"ROLL_ANGLE = ([\S]*)", mtl_, re.I) if h8: self.ViewingAngle = float(h8.group(1)) # Corner Coordinates ## Lon/Lat for all given image corners UL,UR,LL,LR (tuples) h10_UL = re.findall(r"PRODUCT_UL_CORNER_[A-Z]+ = (.*)[\S]*", mtl_, re.I) h10_UR = re.findall(r"PRODUCT_UR_CORNER_[A-Z]+ = (.*)[\S]*", mtl_, re.I) h10_LL = re.findall(r"PRODUCT_LL_CORNER_[A-Z]+ = (.*)[\S]*", mtl_, re.I) h10_LR = re.findall(r"PRODUCT_LR_CORNER_[A-Z]+ = (.*)[\S]*", mtl_, re.I) if not h10_UL: # Landsat8 h10_UL = re.findall(r"CORNER_UL_[\S]+ = (.*)[\S]*", mtl_, re.I) h10_UR = re.findall(r"CORNER_UR_[\S]+ = (.*)[\S]*", mtl_, re.I) h10_LL = re.findall(r"CORNER_LL_[\S]+ = (.*)[\S]*", mtl_, re.I) h10_LR = re.findall(r"CORNER_LR_[\S]+ = (.*)[\S]*", mtl_, re.I) if h10_UL: # Set Corner Tie Point Coordinates (order = UL,UR,LL,LR) self.CornerTieP_LonLat.append(tuple([float(h10_UL[i]) for i in [1, 0]])) self.CornerTieP_LonLat.append(tuple([float(h10_UR[i]) for i in [1, 0]])) self.CornerTieP_LonLat.append(tuple([float(h10_LL[i]) for i in [1, 0]])) self.CornerTieP_LonLat.append(tuple([float(h10_LR[i]) for i in [1, 0]])) self.CornerTieP_UTM.append(tuple([float(h10_UL[i]) for i in [2, 3]])) self.CornerTieP_UTM.append(tuple([float(h10_UR[i]) for i in [2, 3]])) self.CornerTieP_UTM.append(tuple([float(h10_LL[i]) for i in [2, 3]])) self.CornerTieP_UTM.append(tuple([float(h10_LR[i]) for i in [2, 3]])) # WRS path/row h11_p = re.search(r'WRS_PATH = ([0-9]*)', mtl_, re.I) if h11_p: self.WRS_path = h11_p.group(1) h11_r1 = re.search(r'STARTING_ROW = ([0-9]*)', mtl_, re.I) h11_r2 = re.search(r'ENDING_ROW = ([0-9]*)', mtl_, re.I) if h11_r1 is None: # Landsat-8 h11_r = re.search(r'WRS_ROW = ([0-9]*)', mtl_, re.I) self.WRS_row = int(h11_r.group(1)) else: self.WRS_row = int(h11_r1.group(1)) if h11_r1.group(1) == h11_r2.group(1) else self.WRS_row # Fill missing values if self.EntityID == '': self.logger.info('Scene-ID could not be extracted and has to be retrieved from %s metadata database...' % self.Satellite) result = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityID'], {'id': self.SceneID}) if len(result) == 1: # e.g. [('LE71950282003121EDC00',)] self.EntityID = result[0][0] elif len(result) == 0: self.logger.warning("Scene ID could not be assigned because no dataset matching to the query " "parameters could be found in metadata database.") else: # e.g. 
[('LE71950282003121EDC00',), ('LE71950282003105ASN00',)] self.logger.warning("Scene ID could not be assigned because %s datasets matching to the query " "parameters were found in metadata database." % len(result)) # if self.EntityID=='': # dataset = 'LANDSAT_TM' if self.Satellite=='Landsat-5' else \ # 'LANDSAT_ETM' if self.Satellite=='Landsat-7' else 'LANDSAT_8' if self.Satellite=='Landsat-8' else '' # if dataset != '': # webmeta = list(usgsapi.search(dataset,'EE',distance=0,ll={'longitude':self.CornerTieP_LonLat[2][0], \ # 'latitude':self.CornerTieP_LonLat[2][1]},ur={'longitude':self.CornerTieP_LonLat[1][0], \ # 'latitude':self.CornerTieP_LonLat[1][1]},start_date=self.AcqDate,end_date=self.AcqDate)) # if len(webmeta)==1: # self.EntityID=webmeta[0]['entityId'] # else: # sen = {'MSS':'M','TM':'T','ETM+':'E','OLI_TIRS':'C','OLI':'O'}[self.Sensor] # sat = self.Satellite.split('-')[1] # path_res = re.search(r'WRS_PATH = ([0-9]+)',mtl_, re.I) # path_str = "%03d"%int(path_res.group(1)) if path_res!=None else '000' # row_res = re.search(r'STARTING_ROW = ([0-9]+)',mtl_, re.I) # if row_res is None: row_res = re.search(r'WRS_ROW = ([0-9]+)',mtl_, re.I) # row_str = "%03d"%int(row_res.group(1)) if row_res!=None else '000' # tt = datetime.datetime.strptime(self.AcqDate, '%Y-%m-%d').timetuple() # julianD = '%d%03d'%(tt.tm_year,tt.tm_yday) # station_res = re.search(r'GROUND_STATION = "([\S]+)"',mtl_, re.I) # if station_res is None: station_res = re.search(r'STATION_ID = "([\S]+)"',mtl_, re.I) # station_str = station_res.group(1) if station_res is not None else 'XXX' # idWithoutVersion = 'L%s%s%s%s%s%s'%(sen,sat,path_str,row_str,julianD,station_str) # filt_webmeta = [i for i in webmeta if i['entityId'].startswith(idWithoutVersion)] # if len(filt_webmeta) == 1: # self.EntityID = filt_webmeta[0]['entityId'] ########################## # band specific metadata # ########################## LBA_full_sorted = natsorted(self.LayerBandsAssignment_full) # Gains and Offsets h4 = re.search(r"GROUP = MIN_MAX_RADIANCE[\s\S]*END_GROUP = MIN_MAX_PIXEL_VALUE", mtl_, re.I) h4_ = h4.group(0) h4max_rad = re.findall(r" LMAX_BAND[0-9]+ = ([\S]*)", h4_, re.I) # space in front IS needed h4min_rad = re.findall(r" LMIN_BAND[0-9]+ = ([\S]*)", h4_, re.I) # space in front IS needed h4max_pix = re.findall(r"QCALMAX_BAND[0-9]+ = ([\S]*)", h4_, re.I) h4min_pix = re.findall(r"QCALMIN_BAND[0-9]+ = ([\S]*)", h4_, re.I) if not h4max_rad: h4max_rad = re.findall(r" RADIANCE_MAXIMUM_BAND_[0-9_VCID]+ = ([\S]*)", h4_, re.I) # space in front IS needed h4min_rad = re.findall(r" RADIANCE_MINIMUM_BAND_[0-9_VCID]+ = ([\S]*)", h4_, re.I) # space in front IS needed h4max_pix = re.findall(r"QUANTIZE_CAL_MAX_BAND_[0-9_VCID]+ = ([\S]*)", h4_, re.I) h4min_pix = re.findall(r"QUANTIZE_CAL_MIN_BAND_[0-9_VCID]+ = ([\S]*)", h4_, re.I) # additional: LMAX, LMIN, QCALMAX, QCALMIN self.additional.append([["LMAX"], ["LMIN"], ["QCALMAX"], ["QCALMIN"]]) # Offsets + Gains Gains, Offsets = [], [] for ii, ind in enumerate(h4min_rad): Gains.append( (float(h4max_rad[ii]) - float(h4min_rad[ii])) / (float(h4max_pix[ii]) - float(h4min_pix[ii]))) Offsets.append(float(ind) - float(h4min_pix[ii]) * Gains[-1]) self.additional[-1][-4].append(h4max_rad[ii]) self.additional[-1][-3].append(h4min_rad[ii]) self.additional[-1][-2].append(h4max_pix[ii]) self.additional[-1][-1].append(h4min_pix[ii]) self.Gains = {bN: Gains[i] for i, bN in enumerate(LBA_full_sorted)} self.Offsets = {bN: Offsets[i] for i, bN in enumerate(LBA_full_sorted)} # Solar irradiance, central wavelengths , full width 
half maximum per band self.wvlUnit = 'Nanometers' # Exact values calculated based in SRFs self.SolIrradiance, self.CWL, self.FWHM = self.calc_solar_irradiance_CWL_FWHM_per_band() # 3x dict # Provider values if not self.SolIrradiance: # Preconfigured Irradiation and CWL values if re.search(r"[\D]*5", self.Satellite): # Landsat 5; resource for center wavelength (6 = thermal) self.SolIrradiance = {'1': 1957., '2': 1826., '3': 1554., '4': 1036., '5': 215., '6': 0.0, '7': 80.67} self.CWL = {'1': 485., '2': 560., '3': 660., '4': 830., '5': 1650., '6': 11450., '7': 2215.} if re.search(r"[\D]*7", self.Satellite): # Landsat 7; resource for center wavelength: # https://opticks.org/display/opticksDev/Sensor+Wavelength+Definitions # 6L(thermal),6H(thermal),B8(pan) self.SolIrradiance = {'1': 1969., '2': 1840., '3': 1551., '4': 1044., '5': 225.7, '6L': 0.0, '6H': 0.0, '7': 82.07, '8': 1368.} self.CWL = {'1': 482.5, '2': 565., '3': 660., '4': 825., '5': 1650., '6L': 11450., '6H': 11450., '7': 2215., '8': 710.} if re.search(r"[\D]*8", self.Satellite): # Landsat 8 # no irradiation values available self.CWL = {'1': 443., '2': 482.6, '3': 561.3, '4': 654.6, '5': 864.6, '6': 1609.1, '7': 2201.2, '8': 591.7, '9': 1373.5, '10': 10903.6, '11': 12003.} # if None in SolIrradiance: # Thermal Constants (K1 = [W*m-2*um-1]; K1 = [K]) if re.search(r"[\D]*5", self.Satellite): # resource: http://geography.middlebury.edu/data/gg1002/Handouts/Landsat_DN_Temp.pdf ThermalConstK1 = [0., 0., 0., 0., 0., 607.76, 0.] ThermalConstK2 = [0., 0., 0., 0., 0., 1260.56, 0.] if re.search(r"[\D]*7", self.Satellite): # resource: http://geography.middlebury.edu/data/gg1002/Handouts/Landsat_DN_Temp.pdf ThermalConstK1 = [0., 0., 0., 0., 0., 666.09, 666.09, 0., 0.] ThermalConstK2 = [0., 0., 0., 0., 0., 1282.71, 1282.71, 0., 0.] if re.search(r"[\D]*8", self.Satellite): # Landsat 8 K1_res = re.findall(r"K1_CONSTANT_BAND_[0-9]+ = ([\S]*)", mtl_, re.I) K2_res = re.findall(r"K2_CONSTANT_BAND_[0-9]+ = ([\S]*)", mtl_, re.I) if len(K1_res) == 2: ThermalConstK1 = [0., 0., 0., 0., 0., 0., 0., 0., 0., float(K1_res[0]), float(K1_res[1])] ThermalConstK2 = [0., 0., 0., 0., 0., 0., 0., 0., 0., float(K2_res[0]), float(K2_res[1])] else: self.logger.error('Unable to set thermal constants. Expected to derive 2 values for K1 and K2. 
' 'Found %s' % len(K1_res)) self.ThermalConstK1 = {bN: ThermalConstK1[i] for i, bN in enumerate(LBA_full_sorted)} self.ThermalConstK2 = {bN: ThermalConstK2[i] for i, bN in enumerate(LBA_full_sorted)} # reflectance coefficients (Landsat8) h9 = re.search(r"GROUP = RADIOMETRIC_RESCALING[\s\S]*END_GROUP = RADIOMETRIC_RESCALING", mtl_, re.I) if h9: h9_ = h9.group(0) h9gain_ref = re.findall(r" REFLECTANCE_MULT_BAND_[0-9]+ = ([\S]*)", h9_, re.I) h9gain_ref_bandNr = re.findall(r" REFLECTANCE_MULT_BAND_([0-9]+) = [\S]*", h9_, re.I) h9offs_ref = re.findall(r" REFLECTANCE_ADD_BAND_[0-9]+ = ([\S]*)", h9_, re.I) h9offs_ref_bandNr = re.findall(r" REFLECTANCE_ADD_BAND_([0-9]+) = [\S]*", h9_, re.I) if h9gain_ref: GainsRef = [None] * len(self.Gains) OffsetsRef = [None] * len(self.Offsets) for ii, ind in zip(h9gain_ref_bandNr, h9gain_ref): GainsRef[LBA_full_sorted.index(ii)] = ind for ii, ind in zip(h9offs_ref_bandNr, h9offs_ref): OffsetsRef[LBA_full_sorted.index(ii)] = ind self.GainsRef = {bN: GainsRef[i] for i, bN in enumerate(LBA_full_sorted)} self.OffsetsRef = {bN: OffsetsRef[i] for i, bN in enumerate(LBA_full_sorted)} self.orbitParams = get_orbit_params(self.GMS_identifier) self.filter_layerdependent_metadata() self.spec_vals = get_special_values(self.GMS_identifier)
# mtl.close()
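
    # --- Editor's note (illustrative sketch, not part of the original module): Read_LANDSAT_mtltxt() derives
    # radiometric gains and offsets from the LMIN/LMAX radiance limits and the QCALMIN/QCALMAX quantization limits as
    # gain = (LMAX - LMIN) / (QCALMAX - QCALMIN) and offset = LMIN - QCALMIN * gain. A sketch with hypothetical
    # MTL values:
    #
    #   >>> lmin, lmax, qcalmin, qcalmax = -1.52, 193.0, 1.0, 255.0
    #   >>> gain = (lmax - lmin) / (qcalmax - qcalmin)
    #   >>> offset = lmin - qcalmin * gain
    #   >>> round(gain, 4), round(offset, 4)
    #   (0.7658, -2.2858)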
[docs] def Read_RE_metaxml(self): """----METHOD_4------------------------------------------------------------ read metadata from RapidEye metafile: <dataname>metadata.xml """ # self.default_attr() if os.path.isdir(self.FolderOrArchive): glob_res = glob.glob(os.path.join(self.FolderOrArchive, '*/*_metadata.xml')) assert len(glob_res) > 0, 'No *metadata.xml file can be found in %s/*/!' % self.FolderOrArchive self.Metafile = glob_res[0] mxml_ = open(self.Metafile, "r").read() else: # archive mxml_, self.Metafile = open_specific_file_within_archive(self.FolderOrArchive, '*/*_metadata.xml') # EntityID h1 = re.search(r"<[a-z]*:identifier>([\S]*)</[a-z]*:identifier>", mxml_, re.I) self.EntityID = h1.group(1) if h1 else "Not found" # Processing Level h2 = re.search(r"<[a-z]*:productType>([a-zA-Z0-9]*)</[a-z]*:productType>", mxml_, re.I) self.ProcLCode = h2.group(1) if h2 else "Not found" # Satellite h3 = re.search(r"<[a-z]*:serialIdentifier>([a-zA-Z0-9-]*)</[a-z]*:serialIdentifier>", mxml_, re.I) self.Satellite = 'RapidEye-%s' % re.search(r'[\D]*([0-9])', h3.group(1), re.I).group(1) if h3 else "Not found" # Sensor (Instrument shortname) h4 = re.search(r"<[a-z]*:Instrument>[\s]*<eop:shortName>([\S]*)</[a-z]*:shortName>", mxml_, re.I) self.Sensor = h4.group(1) if h4 else "Not found" # Acquisition Parameters: Incidence Angle, SunAzimuth, SunElevation, ViewingAngle, FieldOfView, AcqDate, AcqTime try: h5 = re.search(r'<[a-z]*:incidenceAngle uom="deg">([\S]*)' r'</[a-z]*:incidenceAngle>[\s]*' r'<opt:illuminationAzimuthAngle uom="deg">([\S]*)' r'</opt:illuminationAzimuthAngle>[\s]*' r'<opt:illuminationElevationAngle uom="deg">([\S]*)' r'</opt:illuminationElevationAngle>[\s]*' r'<re:azimuthAngle uom="deg">([\S]*)' r'</re:azimuthAngle>[\s]*' r'<re:spaceCraftViewAngle uom="deg">([\S]*)' r'</re:spaceCraftViewAngle>[\s]*' r'<re:acquisitionDateTime>([0-9-]*)T([0-9:]*)[\S]*' r'</re:acquisitionDateTime>', mxml_, re.I) self.IncidenceAngle = float(h5.group(1)) self.SunAzimuth = float(h5.group(2)) self.SunElevation = float(h5.group(3)) # angle from true north at the image or tile center to the scan (line) direction at image center, # in clockwise positive degrees. 
self.additional.append(["spaceCraftAzimuthAngle:", str(round(float(h5.group(4)), 1))]) self.ViewingAngle = float(h5.group(5)) self.FOV = get_FieldOfView(self.GMS_identifier) AcqDate = h5.group(6) AcqTime = h5.group(7) except AttributeError: h5 = re.search(r'<hma:acrossTrackIncidenceAngle uom="deg">([\S]*)' r'</hma:acrossTrackIncidenceAngle>[\s]*' r'<ohr:illuminationAzimuthAngle uom="deg">([\S]*)' r'</ohr:illuminationAzimuthAngle>[\s]*' r'<ohr:illuminationElevationAngle uom="deg">([\S]*)' r'</ohr:illuminationElevationAngle>[\s]*' r'<re:azimuthAngle uom="deg">([\S]*)' r'</re:azimuthAngle>[\s]*' r'<re:acquisitionDateTime>([0-9-]*)' r'T([0-9:]*)[\S]*' r'</re:acquisitionDateTime>', mxml_, re.I) # self.IncidenceAngle = float(h5.group(1)) self.SunAzimuth = float(h5.group(2)) self.SunElevation = float(h5.group(3)) self.additional.append(["spaceCraftAzimuthAngle:", str(round(float(h5.group(4)), 1))]) self.ViewingAngle = 9999.99 self.FOV = get_FieldOfView(self.GMS_identifier) AcqDate = h5.group(5) AcqTime = h5.group(6) # set self.AcqDateTime as well as self.AcqDate and self.AcqTime self.AcqDateTime = datetime.datetime.strptime( '%s %s%s' % (AcqDate, AcqTime, '.000000+0000'), '%Y-%m-%d %H:%M:%S.%f%z') # Earth sun distance self.EarthSunDist = self.get_EarthSunDistance(self.AcqDate) # Additional: Projection h6 = re.search(r"<re:geodeticDatum>([\S]*)" r"</re:geodeticDatum>[\s]*" r"<re:projection>([\S^<\s]*)" r"</re:projection>", mxml_, re.I) try: self.additional.append(["SpatialReference", ["geodeticDatum", h6.group(1)], ["projection", h6.group(2)] ]) except AttributeError: # Level1-B data has no geodaticDatum pass # Corrections applied + Quality h7 = re.search(r"<re:radiometricCorrectionApplied>([\w]*)" r"</re:radiometricCorrectionApplied>[\s]*" r"<re:radiometricCalibrationVersion>([\S]*)" r"</re:radiometricCalibrationVersion>[\s]*" r"<re:geoCorrectionLevel>([\S\s^<]*)" r"</re:geoCorrectionLevel>[\s]*" r"<re:elevationCorrectionApplied>([\S]*)" r"</re:elevationCorrectionApplied>[\s]*" r"<re:atmosphericCorrectionApplied>([\w]*)" r"</re:atmosphericCorrectionApplied>[\s]*" r"<re:productAccuracy>([\S\s^<]*)" r"</re:productAccuracy>", mxml_, re.I) # fuer IRIS ihre Daten if h7 is None: h7 = re.search(r"<re:radiometricCorrectionApplied>([\w]*)" r"</re:radiometricCorrectionApplied>[\s]*" r"<re:geoCorrectionLevel>([\S\s^<]*)" r"</re:geoCorrectionLevel>[\s]*" r"<re:elevationCorrectionApplied>([\S]*)" r"</re:elevationCorrectionApplied>[\s]*" r"<re:atmosphericCorrectionApplied>([\w]*)" r"</re:atmosphericCorrectionApplied>", mxml_, re.I) self.additional.append( ["Corrections", ["radiometricCorrectionApplied", h7.group(1)], ["geoCorrectionLevel", h7.group(2)], ["elevationCorrectionApplied", h7.group(3)], ["atmosphericCorrectionApplied", h7.group(4)] ]) else: self.additional.append( ["Corrections", ["radiometricCorrectionApplied", h7.group(1)], ["radiometricCalibrationVersion", h7.group(2)], ["geoCorrectionLevel", h7.group(3)], ["elevationCorrectionApplied", h7.group(4)], ["atmosphericCorrectionApplied", h7.group(5)] ]) self.Quality.append(["geomProductAccuracy[m]:", str( round(float(h7.group(6)), 1))]) # Estimated product horizontal CE90 uncertainty [m] # additional. 
missing lines, suspectlines, binning, shifting, masking h81 = re.findall(r"<re:percentMissingLines>([^<]*)</re:percentMissingLines>", mxml_, re.I) h82 = re.findall(r"<re:percentSuspectLines>([^<]*)</re:percentSuspectLines>", mxml_, re.I) h83 = re.findall(r"<re:binning>([^<]*)</re:binning>", mxml_, re.I) h84 = re.findall(r"<re:shifting>([^<]*)</re:shifting>", mxml_, re.I) h85 = re.findall(r"<re:masking>([^<]*)</re:masking>", mxml_, re.I) self.Quality.append( ["MissingLines[%]perBand", h81]) # Percentage of missing lines in the source data of this band # Percentage of suspect lines (lines that contained downlink errors) in the source data for the band self.Quality.append(["SuspectLines[%]perBand", h82]) self.additional.append( ["binning_perBand", h83]) # Indicates the binning used (across track x along track) [1x1,2x2,3x3,1x2,2x1] self.additional.append( ["shifting_perBand", h84]) # Indicates the sensor applied right shifting [none, 1bit, 2bits, 3bits, 4bits] self.additional.append(["masking_perBand", h85]) # Indicates the sensor applied masking [111, 110, 100, 000] # Units self.ScaleFactor = 1 self.PhysUnit = "DN" # Coordinate Reference System re_CS_EPSG = re.search( r'<re:ProductInformation>[\s\S]*' r'<re:epsgCode>([0-9]*)</re:epsgCode>[\s\S]*' r'</re:ProductInformation>', mxml_, re.I) re_CS_TYPE = re.search( r'<re:ProductInformation>[\s\S]*' r'<re:projection>([\s\S]*)</re:projection>[\s\S]*' r'</re:ProductInformation>', mxml_, re.I) re_CS_DATUM = re.search( r'<re:ProductInformation>[\s\S]*' r'<re:geodeticDatum>([\w+\s]*)</re:geodeticDatum>[\s\S]*' r'</re:ProductInformation>', mxml_, re.I) re_CS_UTM_ZONE = re.search( r'<re:ProductInformation>[\s\S]*' r'<re:projectionZone>([0-9]*)</re:projectionZone>[\s\S]*' r'</re:ProductInformation>', mxml_, re.I) self.CS_EPSG = int(re_CS_EPSG.group(1)) if re_CS_EPSG else self.CS_EPSG self.CS_TYPE = 'LonLat' if re_CS_TYPE and not re.search(r'UTM', re_CS_TYPE.group(1)) else 'UTM' \ if re_CS_TYPE and re.search(r'UTM', re_CS_TYPE.group(1)) else self.CS_TYPE self.CS_DATUM = 'WGS84' if re_CS_DATUM is not None and re_CS_DATUM.group(1) == 'WGS_1984' else self.CS_DATUM self.CS_UTM_ZONE = int(re_CS_UTM_ZONE.group(1)) if re_CS_UTM_ZONE is not None else self.CS_UTM_ZONE # Corner Coordinates ## Lon/Lat for all given image corners UL,UR,LL,LR (tuples) h10_UL = re.findall( r'<re:topLeft>' r'<re:latitude>(.*)</re:latitude>' r'<re:longitude>(.*)</re:longitude>' r'</re:topLeft>', mxml_, re.I) h10_UR = re.findall( r'<re:topRight>' r'<re:latitude>(.*)</re:latitude>' r'<re:longitude>(.*)</re:longitude>' r'</re:topRight>', mxml_, re.I) h10_LL = re.findall( r'<re:bottomLeft>' r'<re:latitude>(.*)</re:latitude>' r'<re:longitude>(.*)</re:longitude>' r'</re:bottomLeft>', mxml_, re.I) h10_LR = re.findall( r'<re:bottomRight>' r'<re:latitude>(.*)</re:latitude>' r'<re:longitude>(.*)</re:longitude>' r'</re:bottomRight>', mxml_, re.I) if h10_UL: # Set Corner Tie Point Coordinates (order = UL,UR,LL,LR) self.CornerTieP_LonLat.append(tuple([float(h10_UL[0][1]), float(h10_UL[0][0])])) self.CornerTieP_LonLat.append(tuple([float(h10_UR[0][1]), float(h10_UR[0][0])])) self.CornerTieP_LonLat.append(tuple([float(h10_LL[0][1]), float(h10_LL[0][0])])) self.CornerTieP_LonLat.append(tuple([float(h10_LR[0][1]), float(h10_LR[0][0])])) ########################## # band specific metadata # ########################## LBA_full_sorted = natsorted(self.LayerBandsAssignment_full) # Gains + Offsets h9 = re.findall(r"<re:radiometricScaleFactor>([^<]*)</re:radiometricScaleFactor>", mxml_, re.I) self.Gains = 
dict(zip(LBA_full_sorted, [float(gain) for gain in h9])) self.Offsets = dict(zip(LBA_full_sorted, [0, 0, 0, 0, 0])) # Solar irradiance, central wavelengths , full width half maximum per band self.wvlUnit = 'Nanometers' # derive number of bands from number of given gains/offsets in header file or from raster data # noinspection PyBroadException try: self.nBands = (np.mean([len(self.Gains), len(self.Offsets)]) if np.std([len(self.Gains), len(self.Offsets)]) == 0 and len(self.Gains) != 0 else GEOP.GEOPROCESSING(self.Dataname, self.logger).bands) except Exception: self.logger.warning('Unable to get number of bands for the dataset %s Provider values are used for ' 'solar irradiation, CWL and FWHM!.' % self.Dataname) self.LayerBandsAssignment = get_LayerBandsAssignment(self.GMS_identifier, self.nBands) # Exact values calculated based in SRFs self.SolIrradiance, self.CWL, self.FWHM = self.calc_solar_irradiance_CWL_FWHM_per_band() # Provider values if not self.SolIrradiance: # Preconfigured Irradiation values self.SolIrradiance = dict(zip(LBA_full_sorted, [1997.8, 1863.5, 1560.4, 1395.0, 1124.4])) # Preconfigured CWLs self.CWL = dict(zip(LBA_full_sorted, [475., 555., 657., 710., 805.])) self.orbitParams = get_orbit_params(self.GMS_identifier) self.filter_layerdependent_metadata() self.spec_vals = get_special_values(self.GMS_identifier)
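
    # --- Editor's note (illustrative sketch, not part of the original module): for RapidEye, Read_RE_metaxml() stores
    # the radiometricScaleFactor of each band as gain and sets all offsets to zero, so at-sensor radiance is simply
    # DN * scale factor. A sketch with hypothetical values:
    #
    #   >>> scale_factor, offset, dn = 0.01, 0.0, 5000
    #   >>> round(dn * scale_factor + offset, 6)
    #   50.0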
[docs] def Read_ASTER_hdffile(self, subsystem): """#----METHOD_5------------------------------------------------------------ read metadata from ASTER hdf input: hdffile: subsystem: output: :param subsystem: """ # self.default_attr() dat_ = open(self.FolderOrArchive, "r").read() if sys.version_info[0] < 3 else \ open(self.FolderOrArchive, "rb").read().decode('latin-1') # Split meta from raster data meta = re.search(r"GROUP[\s]*=[\s]" r"ASTERGENERICMETADATA[\s\S]*?" r"END_GROUP[\s]*=[\s]" r"INVENTORYMETADATA", dat_, re.I) meta_ = meta.group(0) if meta else '' genericmeta = re.search(r"GROUP[\s]*=[\s]" r"ASTERGENERICMETADATA[\s\S]*?" r"END_GROUP[\s]*=[\s]" r"ASTERGENERICMETADATA", meta_, re.I) inventorymeta = re.search(r"GROUP[\s]*=[\s]" r"INVENTORYMETADATA[\s\S]*?" r"END_GROUP[\s]*=[\s]" r"INVENTORYMETADATA", meta_, re.I) gcsgenericmeta = re.search(r"GROUP[\s]*=[\s]" r"GDSGENERICMETADATA[\s\S]*?" r"END_GROUP[\s]*=[\s]" r"GDSGENERICMETADATA", meta_, re.I) vnirmeta = re.search(r"GROUP[\s]*=[\s]" r"PRODUCTSPECIFICMETADATAVNIR[\s\S]*?" r"END_GROUP[\s]*=[\s]" r"PRODUCTSPECIFICMETADATAVNIR", meta_, re.I) swirmeta = re.search(r"GROUP[\s]*=[\s]" r"PRODUCTSPECIFICMETADATASWIR[\s\S]*?" r"END_GROUP[\s]*=[\s]" r"PRODUCTSPECIFICMETADATASWIR", meta_, re.I) tirmeta = re.search(r"GROUP[\s]*=[\s]" r"PRODUCTSPECIFICMETADATATIR[\s\S]*?" r"END_GROUP[\s]*=[\s]" r"PRODUCTSPECIFICMETADATATIR", meta_, re.I) genericmeta_ = genericmeta.group(0) if genericmeta else '' inventorymeta_ = inventorymeta.group(0) if inventorymeta else '' gcsgenericmeta_ = gcsgenericmeta.group(0) if gcsgenericmeta else '' vnirmeta_ = vnirmeta.group(0) if vnirmeta else '' swirmeta_ = swirmeta.group(0) if swirmeta else '' tirmeta_ = tirmeta.group(0) if tirmeta else '' h_ = '\n\n\n'.join([genericmeta_, inventorymeta_, gcsgenericmeta_, vnirmeta_, swirmeta_, tirmeta_]) # with open("./testing/out/ASTER_HDFmeta__h_.txt", "w") as allMetaOut: # allMetaOut.write(h_) # EntityID h1 = re.search(r"OBJECT[\s]*=[\s]*" r"IDOFASTERGDSDATAGRANULE[\s\S]*" r"END_OBJECT[\s]*=[\s]*" r"IDOFASTERGDSDATAGRANULE", h_, re.I) h11 = re.search(r'VALUE[\s]*=[\s]*"([\s\S]*)"', h1.group(0), re.I) self.EntityID = [h11.group(1), re.search(r"\"(ASTL1A[\s0-9]*)\"", h_).group(1)] # Viewing Angle h2 = re.search(r"GROUP[\s]*=[\s]*" r"POINTINGANGLES[\s\S]*" r"END_GROUP[\s]*=[\s]*" r"POINTINGANGLES", h_, re.I) h21 = re.findall(r"VALUE[\s]*=[\s]*([-]*[0-9.0-9]+)", h2.group(0), re.I) self.additional.append(["ViewingAngles", "VNIR", float(h21[0]), "SWIR", float(h21[1]), "TIR", float(h21[2])]) if max(float(h21[0]), float(h21[1]), float(h21[2])) - min(float(h21[0]), float(h21[1]), float(h21[2])) < 1: self.ViewingAngle = float(h21[0]) else: self.ViewingAngle = -99.0 # additional GainMode h3 = re.search(r"GROUP[\s]*=[\s]*" r"GAININFORMATION[\s\S]*" r"END_GROUP[\s]*=[\s]*" r"GAININFORMATION", genericmeta_, re.I) h31 = re.findall(r'VALUE[\s]*=[\s]*[\S]?\"[0-9A-Z]*\", \"([A-Z]*)\"', h3.group(0), re.I) gains = {'HGH': [170.8, 179.0, 106.8, 27.5, 8.8, 7.9, 7.55, 5.27, 4.02, 'N/A', 'N/A', 'N/A', 'N/A', 'N/A'], 'NOR': [427.0, 358.0, 218.0, 55.0, 17.6, 15.8, 15.1, 10.55, 8.04, 28.17, 27.75, 26.97, 23.30, 21.38], 'LOW': [569.0, 477.0, 290.0, 73.3, 23.4, 21.0, 20.1, 14.06, 10.72, 'N/A', 'N/A', 'N/A', 'N/A', 'N/A'], 'LO1': [569.0, 477.0, 290.0, 73.3, 23.4, 21.0, 20.1, 14.06, 10.72, 'N/A', 'N/A', 'N/A', 'N/A', 'N/A'], 'LO2': ['N/A', 'N/A', 'N/A', 73.3, 103.5, 98.7, 83.8, 62.0, 67.0, 'N/A', 'N/A', 'N/A', 'N/A', 'N/A'], 'OFF': "OFF"} self.additional.append([["GainMode:"], ["Max_radiance:"]]) for x, i in 
enumerate(h31[:15]): self.additional[-1][-2].append(i) self.additional[-1][-1].append(gains[i][x]) # Units self.ScaleFactor = 1 self.PhysUnit = "DN" # Satellite self.Satellite = 'Terra' # Sensor h10 = re.search(r"OBJECT[\s]*=[\s]*" r"INSTRUMENTSHORTNAME[\s\S]*" r"END_OBJECT[\s]*=[\s]*" r"INSTRUMENTSHORTNAME", h_, re.I) self.Sensor = re.search(r"VALUE[\s]*=[\s]*[\"]([A-Za-z]*)\"", h10.group(0), re.I).group(1) # Subsystem h5 = re.search(r"GROUP[\s]*=[\s]*" r"OBSERVATIONMODE[\s\S]*" r"END_GROUP[\s]*=[\s]*" r"OBSERVATIONMODE", h_, re.I) avail_subsystems = re.findall(r'VALUE[\s]*=[\s]*[(]\"([a-zA-Z0-9]*)\", \"([ONF]*)\"', h5.group(0), re.I) if subsystem not in ['VNIR1', 'VNIR2', 'SWIR', 'TIR']: raise ValueError('Unexpected subsystem >%s<. Unable to specify the correct ASTER subsystem to be ' 'processed.' % subsystem) else: if subsystem == 'VNIR1' and avail_subsystems[0][1] == 'ON': self.nBands = 3 if subsystem == 'VNIR2' and avail_subsystems[1][1] == 'ON': self.nBands = 1 if subsystem == 'SWIR' and avail_subsystems[2][1] == 'ON': self.nBands = 6 if subsystem == 'TIR' and avail_subsystems[3][1] == 'ON': self.nBands = 5 self.Subsystem = subsystem # Field of view (requires Satellite, Sensor, Subsystem) self.FOV = get_FieldOfView(self.GMS_identifier) # Ground resolution re_res_GSD = re.findall(r'OBJECT[\s]*=[\s]*' r'SPATIALRESOLUTION[\s\S]*' r'VALUE[\s]*=[\s]*[\S]{1}([1-9][0-9]), ([1-9][0-9]), ([1-9][0-9])[\s\S]*' r'END_OBJECT[\s]*=[\s]*' r'SPATIALRESOLUTION', h_, re.I) idx_subsystem = \ 0 if subsystem[:4] == 'VNIR' else \ 1 if subsystem[:4] == 'SWIR' else \ 2 if subsystem[:4] == 'TIR' else None self.gResolution = float(re_res_GSD[0][idx_subsystem]) \ if re_res_GSD and idx_subsystem is not None else self.gResolution # Flight direction h6 = re.search(r"OBJECT[\s]*=[\s]*" r"FLYINGDIRECTION[\s\S]*\"([ASDE]*?)\"", h_, re.I) Fdir = {'AS': "Ascending", 'DE': "Descending"} self.additional.append(["Flight Direction", Fdir[h6.group(1)]]) # SunAzimuth SunElevation h7 = re.search(r"OBJECT[\s]*=[\s]*" r"SOLARDIRECTION[\s\S]*" r"END_OBJECT[\s]*=[\s]*" r"SOLARDIRECTION", h_, re.I) h71 = re.search(r"VALUE[\s]*=[\s]*[\S]{1}([0-9.]*), ([0-9.]*)", h7.group(0), re.I) self.SunAzimuth = float(h71.group(1)) self.SunElevation = float(h71.group(2)) # data Quality h8 = re.findall(r"GROUP[\s]*=[\s]*" r"DATAQUALITY[1-9][0-4BN]?[^.]*" r"END_GROUP[\s]*=[\s]*" r"DATAQUALITY[1-9][0-4BN]?", h_, re.I) h81 = re.findall(r'VALUE[\s]*=[\s]*([^\n]*)', "\n".join(h8), re.I) self.Quality.append(["NoOfBadPixel:"]) for i in h81: self.Quality[-1].append(i) # ProdLevel h9 = re.search(r"OBJECT[\s]*=[\s]*" r"SHORTNAME[\s\S]*" r"END_OBJECT[\s]*=[\s]*" r"SHORTNAME", h_, re.I) self.ProcLCode = re.search(r"VALUE[\s]*=[\s]*[\"]([0-9A-Za-z]*)\"", h9.group(0), re.I).group(1) # AcqTime h11 = re.search(r"OBJECT[\s]*=[\s]*" r"TIMEOFDAY[\s\S]*" r"END_OBJECT[\s]*=[\s]*" r"TIMEOFDAY", h_, re.I) h111 = re.search(r"VALUE[\s]*=[\s]*[\"]([\d][\d][\d][\d][\d][\d])[\S]*\"", h11.group(0), re.I) if type(h111).__name__ != 'NoneType': AcqTime = "%s:%s:%s" % (h111.group(1)[:2], h111.group(1)[2:4], h111.group(1)[4:6]) else: h112 = re.search(r"VALUE[\s]*=[\s]*[\"]([\d:]*)[\S]*\"", h11.group(0), re.I) AcqTime = h112.group(1) # AcqDate h12 = re.search(r"OBJECT[\s]*=[\s]*" r"CALENDARDATE[\s\S]*" r"END_OBJECT[\s]*=[\s]*" r"CALENDARDATE", h_, re.I) h121 = re.search(r"VALUE[\s]*=[\s]*[\"]([\d]*)\"", h12.group(0), re.I) if type(h121).__name__ != 'NoneType': AcqDate = "%s-%s-%s" % (h121.group(1)[:4], h121.group(1)[4:6], h121.group(1)[6:8]) else: h121 = 
re.search(r"VALUE[\s]*=[\s]*[\"]([\d-]*)\"", h12.group(0), re.I) AcqDate = h121.group(1) # set self.AcqDateTime as well as self.AcqDate and self.AcqTime self.AcqDateTime = datetime.datetime.strptime( '%s %s%s' % (AcqDate, AcqTime, '.000000+0000'), '%Y-%m-%d %H:%M:%S.%f%z') # Earth sun distance self.EarthSunDist = self.get_EarthSunDistance(self.AcqDate) # data Quality h13 = re.search(r"GROUP[\s]*=[\s]*" r"QASTATS[\s\S]*" r"END_GROUP[\s]*=[\s]*" r"QASTATS", h_, re.I) h131 = re.findall(r'VALUE[\s]*=[\s]*([0-9.0-9]*)', h13.group(0), re.I) self.Quality.append(["PercMissingData:", h131[0]]) # The percentage of missing data of the scene [%] self.Quality.append(["PercOutOfBoundsData:", h131[1]]) # The percentage of out of bounds data of the scene [%] self.Quality.append(["PercInterpolatedData:", h131[2]]) # The percentage of interpolated data of the scene [%] # Coordinate Reference System (L1B+) topgroupname = 'PRODUCTSPECIFICMETADATA' + subsystem[:4] # read only the information from first band in current ASTER subsystem # (the others should have the same parameters) re_res_procParm = re.search(r'GROUP[\s]*=[\s]*%s[\s\S]*' r'GROUP[\s]*=[\s]*(PROCESSINGPARAMETERS%s[\s\S]*' r'END_GROUP[\s]*=[\s]*PROCESSINGPARAMETERS%s)[\s\S]*' r'END_GROUP[\s]*=[\s]*%s' % (topgroupname, self.LayerBandsAssignment[0], self.LayerBandsAssignment[0], topgroupname), h_, re.I) if type(re_res_procParm).__name__ != 'NoneType': re_res_procParm = re_res_procParm.group(1) re_CS_TYPE = re.search(r'OBJECT[\s]*=[\s]*' r'MPMETHOD[\s\S]*' r'VALUE[\s]*=[\s]*"([\s\S]*)"[\s\S]*' r'END_OBJECT[\s]*=[\s]*' r'MPMETHOD', re_res_procParm, re.I) re_CS_UTM_ZONE = re.search(r'OBJECT[\s]*=[\s]*' r'UTMZONECODE[\s\S]*' r'VALUE[\s]*=[\s]*([0-9][0-9]?)[\s\S]*' r'END_OBJECT[\s]*=[\s]*' r'UTMZONECODE', re_res_procParm, re.I) self.CS_TYPE = 'LonLat' if re_CS_TYPE and not re.search(r'UTM', re_CS_TYPE.group(1)) else 'UTM' \ if re_CS_TYPE and re.search(r'UTM', re_CS_TYPE.group(1)) else self.CS_TYPE self.CS_DATUM = 'WGS84' if self.CS_TYPE == 'UTM' else self.CS_DATUM self.CS_UTM_ZONE = int(re_CS_UTM_ZONE.group(1)) if re_CS_UTM_ZONE else self.CS_UTM_ZONE # Corner Coordinates ## Lon/Lat for all given image corners UL,UR,LL,LR (tuples) re_res_sceneInfo = re.search(r'GROUP[\s]*=[\s]*' r'SCENEINFORMATION[\s\S]*' r'END_GROUP[\s]*=[\s]*' r'SCENEINFORMATION', h_, re.I).group(0) re_res_UL = re.findall(r'OBJECT[\s]*=[\s]*' r'UPPERLEFT[\s\S]*' r'VALUE[\s]*=[\s]*[\S]{1}([0-9.]*), ([0-9.]*)[\s\S]*' r'END_OBJECT[\s]*=[\s]*' r'UPPERLEFT', re_res_sceneInfo, re.I) re_res_UR = re.findall(r'OBJECT[\s]*=[\s]*' r'UPPERRIGHT[\s\S]*' r'VALUE[\s]*=[\s]*[\S]{1}([0-9.]*), ([0-9.]*)[\s\S]*' r'END_OBJECT[\s]*=[\s]*' r'UPPERRIGHT', re_res_sceneInfo, re.I) re_res_LL = re.findall(r'OBJECT[\s]*=[\s]*' r'LOWERLEFT[\s\S]*' r'VALUE[\s]*=[\s]*[\S]{1}([0-9.]*), ([0-9.]*)[\s\S]*' r'END_OBJECT[\s]*=[\s]*' r'LOWERLEFT', re_res_sceneInfo, re.I) re_res_LR = re.findall(r'OBJECT[\s]*=[\s]*' r'LOWERRIGHT[\s\S]*' r'VALUE[\s]*=[\s]*[\S]{1}([0-9.]*), ([0-9.]*)[\s\S]*' r'END_OBJECT[\s]*=[\s]*' r'LOWERRIGHT', re_res_sceneInfo, re.I) if re_res_UL: # Set Corner Tie Point Coordinates (order = UL,UR,LL,LR) self.CornerTieP_LonLat.append(tuple([float(re_res_UL[0][1]), float(re_res_UL[0][0])])) self.CornerTieP_LonLat.append(tuple([float(re_res_UR[0][1]), float(re_res_UR[0][0])])) self.CornerTieP_LonLat.append(tuple([float(re_res_LL[0][1]), float(re_res_LL[0][0])])) self.CornerTieP_LonLat.append(tuple([float(re_res_LR[0][1]), float(re_res_LR[0][0])])) ########################## # band specific metadata # 
########################## LBA_full_sorted = natsorted(self.LayerBandsAssignment_full) # Gains/Offsets h4 = re.findall(r"GROUP[\s]*=[\s]*" r"UNITCONVERSIONCOEFF[1-9][0-4BN]?[^(]*" r"END_GROUP[\s]*=[\s]*" r"UNITCONVERSIONCOEFF[1-9][0-4BN]?", h_, re.I) h41 = re.findall(r'VALUE[\s]*=[\s]*([0-9.0-9]+)', "\n".join(h4), re.I) h42 = re.findall(r'VALUE[\s]*=[\s]*([-]+[0-9.0-9]+)', "\n".join(h4), re.I) self.Gains = dict(zip(LBA_full_sorted, [float(i) for i in h41])) self.Offsets = dict(zip(LBA_full_sorted, [float(i) for i in h42])) # Solar irradiance, central wavelengths , full width half maximum per band self.wvlUnit = 'Nanometers' self.LayerBandsAssignment = get_LayerBandsAssignment(self.GMS_identifier, self.nBands) # Exact values calculated based in SRFs self.SolIrradiance, self.CWL, self.FWHM = self.calc_solar_irradiance_CWL_FWHM_per_band() # Provider values if not self.SolIrradiance: # Preconfigured Irradiation values self.SolIrradiance = dict(zip(LBA_full_sorted, [1845.99, 1555.74, 1119.47] + [None] * 12)) # Preconfigured CWLs self.CWL = dict(zip(LBA_full_sorted, [556., 659., 807., 804., 1656., 2167., 2208., 2266., 2336., 2400., 8291., 8634., 9075., 10657., 11318])) # Thermal Constants (K1 = [W*m-2*um-1]; K1 = [K]) # Preconfigured values; resource: http://www.pancroma.com/ASTER%20Surface%20Temperature%20Analysis.html self.ThermalConstK1 = dict(zip(LBA_full_sorted, [0.] * 10 + [3040.136402, 2482.375199, 1935.060183, 866.468575, 641.326517])) self.ThermalConstK2 = dict(zip(LBA_full_sorted, [0.] * 10 + [1735.337945, 1666.398761, 1585.420044, 1350.069147, 1271.221673])) self.orbitParams = get_orbit_params(self.GMS_identifier) self.filter_layerdependent_metadata() self.spec_vals = get_special_values(self.GMS_identifier)
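
    # --- Editor's note (illustrative sketch, not part of the original module): the K1/K2 constants collected above
    # are the usual inputs for converting thermal at-sensor radiance L to brightness temperature via
    # T = K2 / ln(K1 / L + 1). A sketch using the ASTER band 13 constants listed above and a hypothetical radiance:
    #
    #   >>> import math
    #   >>> K1, K2, L = 866.468575, 1350.069147, 9.0
    #   >>> K2 / math.log(K1 / L + 1)   # -> roughly 295 K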
[docs] def Read_ALOS_summary(self): """----METHOD_6------------------------------------------------------------ read metadata from ALOS summary.txt """ # self.default_attr() if os.path.isdir(self.FolderOrArchive): glob_res = glob.glob(os.path.join(self.FolderOrArchive, '*/*data*/summary.txt')) assert len(glob_res) > 0, 'No summary.txt file can be found in %s/*/*data*/!' % self.FolderOrArchive self.Metafile = glob_res[0] sum_ = open(self.Metafile, "r").read() else: # archive sum_, self.Metafile = open_specific_file_within_archive(self.FolderOrArchive, '*/*data*/summary.txt') # EntityID h1 = re.search(r'Scs_SceneID="([a-zA-Z0-9]*)"', sum_, re.I) self.EntityID = h1.group(1) # Satellite, Sensor, ProcLevel, h5 = re.search(r'Lbi_Satellite="([\S]*)"[\s]*' r'Lbi_Sensor="([\S-]*)"[\s]*' r'Lbi_ProcessLevel="([\S-]*)"', sum_, re.I) self.Satellite = h5.group(1) self.Sensor = h5.group(2) self.ProcLCode = h5.group(3) # Ground resolution re_res_GSD = re.search(r'Pds_PixelSpacing="([\d]*)"', sum_, re.I) self.gResolution = float(re_res_GSD.group(1)) if re_res_GSD else self.gResolution # Additional map info + resampling try: # no mapinfo or resampling for Level1A hx = re.search(r'Ps_(ResamplingMethod="[\S]*")[\s]*' r'Pds_(UTM_ZoneNo="[\S]*")[\s]*' r'Pds_(MapDirection="[\S]*")[\s]*' r'Pds_(PixelSpacing="[\d]*")', sum_, re.I) self.additional.append(["Map_Info: " + ', '.join(hx.groups())]) except AttributeError: pass # SunElevation, SunAzimuth, ViewingAngle =(Img_Pointing_Angle), # IncidenceAngle =(Img_SceneCenterAngle), Field of View h2 = re.search(r'Img_SunAngleElevation="([\d.]*)"[\s]*' r'Img_SunAngleAzimuth="([\d.]*)"[\s]*' r'Img_PointingAngle="([\d.]*)"[\s]*' r'Img_SceneCenterAngle="([\S]*)"', sum_, re.I) self.SunElevation = float(h2.group(1)) self.SunAzimuth = float(h2.group(2)) self.ViewingAngle = float(h2.group(3)) incAngle = h2.group(4) if incAngle.lower().startswith("l"): # L means incidence angle to the left. Set to negativ values self.IncidenceAngle = float("-" + incAngle[1:]) elif incAngle.lower().startswith("r"): # R means incidence angle to the right. Set to positiv values self.IncidenceAngle = float(incAngle[1:]) else: # Sign ("L" or "R") will not be added in case of zero. 
self.IncidenceAngle = float(incAngle) self.FOV = get_FieldOfView(self.GMS_identifier) self.ScaleFactor = 1 self.PhysUnit = "DN" # Additional: Exposure coefficients, Saturation coefficients of each band h4 = re.search(r'Img_ExposureOfBand1="([\d.-]*)"[\s]*' r'Img_ExposureOfBand2="([\d.-]*)"[\s]*' r'Img_ExposureOfBand3="([\d.-]*)"[\s]*' r'Img_ExposureOfBand4="([\d.-]*)"[\s]*' r'Img_SaturationLevelOfBand1="([\d.-]*)"[\s]*' r'Img_SaturationLevelOfBand2="([\d.-]*)"[\s]*' r'Img_SaturationLevelOfBand3="([\d.-]*)"[\s]*' r'Img_SaturationLevelOfBand4="([\d.-]*)"', sum_, re.I) self.additional.append(["Exposure Coeffients: B1:'%s',B2:'%s',B3:'%s',B4:'%s'" % (h4.group(1), h4.group(2), h4.group(3), h4.group(4))]) self.additional.append(["Saturation coeffients: B1:'%s',B2:'%s',B3:'%s',B4:'%s'" % (h4.group(5), h4.group(6), h4.group(7), h4.group(8))]) # AcqDate + AcqTime h6 = re.search(r'Lbi_ObservationDate="([\d]*)"', sum_, re.I) AcqDate = "%s-%s-%s" % (h6.group(1)[:4], h6.group(1)[4:6], h6.group(1)[6:8]) re_AcqTime = re.search(r'Img_SceneCenterDateTime="[0-9]* ([0-9][0-9]:[0-9][0-9]:.*)"', sum_, re.I) AcqTime_dec = re_AcqTime.group(1) if re_AcqTime else self.AcqTime AcqTime = AcqTime_dec.split('.')[0] # set self.AcqDateTime as well as self.AcqDate and self.AcqTime if AcqTime_dec: self.AcqDateTime = datetime.datetime.strptime( '%s %s%s' % (AcqDate, AcqTime_dec, '+0000'), '%Y-%m-%d %H:%M:%S.%f%z') else: self.AcqDateTime = datetime.datetime.strptime('%s%s' % (AcqDate, '+0000'), '%Y-%m-%d%z') self.AcqTime = AcqTime # Earth sun distance self.EarthSunDist = self.get_EarthSunDistance(self.AcqDate) # Quality (clouds) # cloudsperc = {'0': "0 to 2%", '1': "3 to 10%", '1': "11 to 20%", '1': "21 to 30%", '1': "31 to 40%", # '1': "41 to 50%", # '1': "51 to 60%", '1': "61 to 70%", '1': "71 to 80%", '1': "81 to 90%", '1': "91 to 100%", # '99': "No assessment"} FIXME F601 dictionary key '1' repeated with different values # h7 = re.search(r'Img_CloudQuantityOfAllImage="([\d]*)"', sum_, re.I) # self.Quality.append(["CloudCoverage: " + cloudsperc[h7.group(1)]]) # Corner Coordinates ## Lon/Lat for all given image corners UL,UR,LL,LR (tuples) h10_UL = re.findall(r'Img_[\s\S]*SceneLeftTopLatitude="(.*)"[\s\S]' r'Img_[\s\S]*SceneLeftTopLongitude="(.*)"', sum_, re.I) h10_UR = re.findall(r'Img_[\s\S]*SceneRightTopLatitude="(.*)"[\s\S]' r'Img_[\s\S]*SceneRightTopLongitude="(.*)"', sum_, re.I) h10_LL = re.findall(r'Img_[\s\S]*SceneLeftBottomLatitude="(.*)"[\s\S]' r'Img_[\s\S]*SceneLeftBottomLongitude="(.*)"', sum_, re.I) h10_LR = re.findall( r'Img_[\s\S]*SceneRightBottomLatitude="(.*)"[\s\S]' r'Img_[\s\S]*SceneRightBottomLongitude="(.*)"', sum_, re.I) if h10_UL: # Set Corner Tie Point Coordinates (order = UL,UR,LL,LR) self.CornerTieP_LonLat.append(tuple([float(h10_UL[0][1]), float(h10_UL[0][0])])) self.CornerTieP_LonLat.append(tuple([float(h10_UR[0][1]), float(h10_UR[0][0])])) self.CornerTieP_LonLat.append(tuple([float(h10_LL[0][1]), float(h10_LL[0][0])])) self.CornerTieP_LonLat.append(tuple([float(h10_LR[0][1]), float(h10_LR[0][0])])) # Coordinate Reference System re_res_CS_UTM_ZONE = re.search(r'Pds_UTM_ZoneNo="([0-9][0-9]?)"', sum_, re.I) self.CS_UTM_ZONE = int(re_res_CS_UTM_ZONE.group(1)) if re_res_CS_UTM_ZONE else self.CS_UTM_ZONE if self.CS_UTM_ZONE != -99.: self.CS_TYPE = 'UTM' self.CS_EPSG = self.CS_EPSG if self.CornerTieP_LonLat == [] else int('326' + str(self.CS_UTM_ZONE)) \ if self.CornerTieP_LonLat[0][1] > 0.0 else int('327' + str(self.CS_UTM_ZONE)) self.CS_DATUM = 'WGS84' else: self.CS_TYPE = 'LonLat' if 
self.CornerTieP_LonLat != [] and -180. <= self.CornerTieP_LonLat[0][0] <= 180. \ and -90. <= self.CornerTieP_LonLat[0][1] <= 90. else self.CS_TYPE self.CS_EPSG = 4326 if self.CS_TYPE == 'LonLat' else self.CS_TYPE self.CS_DATUM = 'WGS84' if self.CS_TYPE == 'LonLat' else self.CS_DATUM ########################## # band specific metadata # ########################## LBA_full_sorted = natsorted(self.LayerBandsAssignment_full) # GainMode with corresponding coefficients + Offsets gains_AVNIR = {'1': ['N/A', 'N/A', 'N/A', 'N/A'], '2': [0.5880, 0.5730, 0.5020, 0.5570], '3': [0.9410, 0.9140, 1.2040, 0.835], '4': [1.412, 1.373, 'N/A', 0.8350]} # gains_PRISM = {'1': ['N/A', 'N/A', 'N/A', 'N/A'], '2': [0.501, 'N/A', 'N/A', 'N/A'], # '3': ['N/A', 'N/A', 'N/A', 'N/A'], '4': ['N/A', 'N/A', 'N/A', 'N/A']} h3 = re.search(r'Img_GainModeBand1="([1-4])"[\s]*' r'Img_GainModeBand2="([1-4])"[\s]*' r'Img_GainModeBand3="([1-4])"' r'[\s]*Img_GainModeBand4="([1-4])"', sum_, re.I) self.additional.append( ["GainMode: B1:'%s',B2:'%s',B3:'%s',B4:'%s'" % (h3.group(1), h3.group(2), h3.group(3), h3.group(4))]) self.Gains = dict(zip(LBA_full_sorted, [gains_AVNIR[h3.group(1)][0], gains_AVNIR[h3.group(2)][1], gains_AVNIR[h3.group(3)][2], gains_AVNIR[h3.group(4)][3]])) self.Offsets = dict(zip(LBA_full_sorted, [0, 0, 0, 0])) # only gains are required for DN2radiance calculation # Solar irradiance, central wavelengths, full width half maximum per band self.wvlUnit = 'Nanometers' # derive number of bands from number of given gains/offsets in header file or from raster data # noinspection PyBroadException try: self.nBands = (np.mean([len(self.Gains), len(self.Offsets)]) if np.std([len(self.Gains), len(self.Offsets)]) == 0 and len(self.Gains) != 0 else GEOP.GEOPROCESSING(self.Dataname, self.logger).bands) except Exception: self.logger.warning('Unable to get number of bands for the dataset %s Provider values are used for ' 'solar irradiation, CWL and FWHM!.' % self.Dataname) self.LayerBandsAssignment = get_LayerBandsAssignment(self.GMS_identifier, self.nBands) # Exact values calculated based in SRFs self.SolIrradiance, self.CWL, self.FWHM = self.calc_solar_irradiance_CWL_FWHM_per_band() # Provider values if not self.SolIrradiance: # Preconfigured Irradiation values # = Thullier. source: https://earth.esa.int/c/document_library/get_file?folderId=19584&name=DLFE-262.pdf self.SolIrradiance = dict(zip(LBA_full_sorted, [1943.3, 1813.7, 1562.3, 1076.5])) # Preconfigured CWLs self.CWL = dict(zip(LBA_full_sorted, [460., 560., 650., 825.])) # http://www.isprs-ann-photogramm-remote-sens-spatial-inf-sci.net/I-7/291/2012/isprsannals-I-7-291-2012.pdf self.FWHM = dict(zip(LBA_full_sorted, [80., 80., 80., 130.])) self.filter_layerdependent_metadata() self.orbitParams = get_orbit_params(self.GMS_identifier) self.spec_vals = get_special_values(self.GMS_identifier)
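# Illustrative sketch (not part of this module): the EPSG code above is built from the UTM zone
# number plus the hemisphere of the first corner tie point (326xx = WGS84/UTM north, 327xx = south).
# A hypothetical standalone helper showing the same rule; note that official EPSG codes zero-pad
# single-digit zone numbers (e.g. EPSG:32607 for UTM zone 7N).
def utm_zone_to_epsg(zone, lat):
    # type: (int, float) -> int
    """Return the EPSG code of the WGS84/UTM CRS for a given zone number and latitude (sketch)."""
    return int('326%02d' % zone) if lat >= 0.0 else int('327%02d' % zone)

# utm_zone_to_epsg(33, 52.4)   # -> 32633 (UTM zone 33N)
# utm_zone_to_epsg(33, -33.9)  # -> 32733 (UTM zone 33S)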
[docs] def Read_ALOS_LEADER(self): """Read metadata from ALOS leader file. binary. For exact information content see: file:///misc/ro2/behling/Satelliten/ALOS/doc/ALOS Product Format description.pdf """ # self.default_attr() if os.path.isdir(self.FolderOrArchive): glob_res = glob.glob(os.path.join(self.FolderOrArchive, '*/*data*/LED-*')) assert len(glob_res) > 0, 'No LED* file can be found in %s/*/*data*/!' % self.FolderOrArchive self.Metafile = glob_res[0] lead_ = open(self.Metafile, "rb").read() else: # archive lead_, self.Metafile = \ open_specific_file_within_archive(self.FolderOrArchive, '*/*data*/LED-*', read_mode='rb') # Gains & offsets LBA_full_sorted = natsorted(self.LayerBandsAssignment_full) self.Gains = dict(zip(LBA_full_sorted, [float(lead_[4680 * 3 + 2703:4680 * 3 + 2710]), float(lead_[4680 * 3 + 2719:4680 * 3 + 2726]), float(lead_[4680 * 3 + 2735:4680 * 3 + 2742]), float(lead_[4680 * 3 + 2751:4680 * 3 + 2758])])) self.Offsets = dict(zip(LBA_full_sorted, [float(lead_[4680 * 3 + 2711:4680 * 3 + 2718]), float(lead_[4680 * 3 + 2727:4680 * 3 + 2734]), float(lead_[4680 * 3 + 2743:4680 * 3 + 2750]), float(lead_[4680 * 3 + 2759:4680 * 3 + 2766])]))
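# Illustrative sketch (not part of this module): the per-band gains and offsets read above are the
# coefficients of the usual linear DN-to-radiance model (radiance in W * m-2 * sr-1 * micrometer-1,
# cf. add_rasObj_dims_projection_physUnit below). The gain value 0.588 is taken from the AVNIR-2
# gain-mode table in Read_ALOS_summary; the DN values are hypothetical.
import numpy as np

def dn_to_radiance(dn, gain, offset):
    """Convert digital numbers to at-sensor radiance using a linear model (sketch)."""
    return gain * np.asarray(dn, dtype=float) + offset

# dn_to_radiance([0, 100, 255], gain=0.588, offset=0.0)  ->  array([  0.  ,  58.8 , 149.94])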
[docs] def Read_Sentinel2_xmls(self): """Read metadata from Sentinel-2 generic xml and granule xml""" # query database to get entityid assert self.SceneID and self.SceneID != -9999, "Read_Sentinel2_xmls(): Missing scene ID. " res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'], {'id': self.SceneID}) assert len(res) != 0, \ "Invalid SceneID given - no corresponding scene with the ID=%s found in database.\n" % self.SceneID assert len(res) == 1, "Error in database. The sceneid %s exists more than once. \n" % self.SceneID S2AB = 'S2A' if re.search(r"Sentinel-2A", self.Satellite, re.I) else 'S2B' S2ABgranuleID = res[0][0] ################# # get XML roots # ################# if os.path.isdir(self.FolderOrArchive): # metadata has to be read from folder ##################################### # get xml_Scene_root (contains scene metadata) glob_res = glob.glob(os.path.join(self.FolderOrArchive, '%s*.xml' % S2AB)) if not glob_res: # new style packaging glob_res = glob.glob(os.path.join(self.FolderOrArchive, 'MTD*.xml')) assert len(glob_res) > 0, 'No %s*.xml or MTD*.xml file can be found in %s/!' % (S2AB, self.FolderOrArchive) self.Metafile = glob_res[0] xml_Scene_root = ET.parse(glob_res[0]).getroot() # xml_parser from file # get xml_GR_root (contains granule metadata) glob_res = glob.glob(os.path.join(self.FolderOrArchive, 'GRANULE/' + S2ABgranuleID + '/%s*.xml' % S2AB)) if not glob_res: # new style packaging glob_res = glob.glob(os.path.join(self.FolderOrArchive, 'GRANULE/' + S2ABgranuleID + '/MTD*.xml')) assert len(glob_res) > 0, \ 'No /GRANULE/<%sgranuleID>/%s*.xml or MTD*.xml file can be found in %s/!' \ % (S2AB, S2AB, self.FolderOrArchive) self.Metafile = self.Metafile + ", " + glob_res[0] xml_GR_root = ET.parse(glob_res).getroot() # xml_parser from file else: # metadata has to be read from within archive ############################################# # get xml_Scene_root and xml_GR_root (contain scene and granule metadata) try: # old style packaging xml_SC_str_, self.Metafile = \ open_specific_file_within_archive(self.FolderOrArchive, '*.SAFE/%s*.xml' % S2AB) xml_GR_str_, Metafile_ = \ open_specific_file_within_archive(self.FolderOrArchive, '*.SAFE/GRANULE/' + S2ABgranuleID + '/%s*.xml' % S2AB) except RuntimeError: # wrong matching expression # new style packaging xml_SC_str_, self.Metafile = open_specific_file_within_archive(self.FolderOrArchive, '*.SAFE/MTD*.xml') xml_GR_str_, Metafile_ = open_specific_file_within_archive(self.FolderOrArchive, '*.SAFE/GRANULE/' + S2ABgranuleID + '/MTD*.xml') xml_Scene_root = ET.fromstring(xml_SC_str_) xml_GR_root = ET.fromstring(xml_GR_str_) self.Metafile = self.Metafile + ", " + Metafile_ ################################### # EXTRACT METADATA FROM XML ROOTS # ################################### # define Sentinel 2 metadata (hard coded) self.Sensor = "MSI" # extract metadata from xml_Scene_root # ######################################## # get the namespace within the xml_Scene_root m = re.match(r'{(.*)\}', xml_Scene_root.tag) # extract namespace from "{https://....}Level-1C_Tile_ID" assert m, 'XML Namespace could not be identified within Sentinel-2 metadata XML file.' namespace = m.group(1) self.EntityID = \ xml_Scene_root.find(".//Datatake").attrib['datatakeIdentifier'] # FIXME tileID (Granule) oder scene ID??? 
self.Satellite = xml_Scene_root.find(".//SPACECRAFT_NAME").text # AcqDate, AcqTime # NOTE ths corresponds to the while data take, not to the granule, BUT has th same value as the one from granule # DateTime = xml_Scene_root.find(".//PRODUCT_START_TIME").text # ''' PRODUCT_START_TIME = Sensing Time of the first line of the first scene in the product. # Alternative: 'DATATAKE_SENSING_START': Sensing start time of the Datatake''' # self.AcqDate = DateTime[:10] # self.AcqTime = DateTime[11:19] self.ScaleFactor = int(xml_Scene_root.find(".//QUANTIFICATION_VALUE").text) self.PhysUnit = "TOA_Reflectance in [0-%d]" % self.ScaleFactor # Flight direction Fdir = {'ASCENDING': "Ascending", 'DESCENDING': "Descending"} self.additional.append(["Flight Direction", Fdir[xml_Scene_root.find(".//SENSING_ORBIT_DIRECTION").text]]) self.ProcLCode = xml_Scene_root.find(".//PROCESSING_LEVEL").text # extract metadata from xml_GR_root # ##################################### # get the namespace within the xml_GR_root m = re.match(r'{(.*)\}', xml_GR_root.tag) # extract namespace from "{https://....}Level-1C_Tile_ID" assert m, 'XML Namespace could not be identified within metadata XML file.' namespace = m.group(1) # e.g., "https://psd-12.sentinel2.eo.esa.int/PSD/S2_PDI_Level-1C_Tile_Metadata.xsd" # set self.AcqDateTime as well as self.AcqDate and self.AcqTime self.AcqDateTime = iso8601.parse_date(xml_GR_root.find(".//SENSING_TIME").text) # SunAngles self.SunElevation = 90 - float(xml_GR_root.find(".//Mean_Sun_Angle/ZENITH_ANGLE").text) # mean angle of granule self.SunAzimuth = float(xml_GR_root.find(".//Mean_Sun_Angle/AZIMUTH_ANGLE").text) # mean angle of granule # coordinate system geo_codings = HLP_F.find_in_xml_root(namespace, xml_GR_root, 'Geometric_Info', "Tile_Geocoding") self.CS_EPSG = int(geo_codings.find(".//HORIZONTAL_CS_CODE").text.split(":")[1]) CooSys = geo_codings.find(".//HORIZONTAL_CS_NAME").text self.CS_DATUM = "WGS84" if re.search(r"wgs84", CooSys, re.I).group(0) else self.CS_DATUM self.CS_TYPE = "UTM" if re.search(r"utm", CooSys, re.I).group(0) else self.CS_TYPE if self.CS_TYPE == "UTM": tmp = CooSys.split(" ")[-1] self.CS_UTM_ZONE = \ int(tmp[:-1]) if tmp[-1] == 'N' else \ -int(tmp[:-1]) if tmp[-1] == 'S' else self.CS_UTM_ZONE # corner coords subsytem_Res_dic = {"%s10" % S2AB: 10, "%s20" % S2AB: 20, "%s60" % S2AB: 60} if self.CS_TYPE == 'UTM': spatial_samplings = {float(size.get("resolution")): {key: int(size.find(key).text) for key in ["NROWS", "NCOLS"]} for size in geo_codings.findall("Size")} for geo in geo_codings.findall("Geoposition"): spatial_samplings[float(geo.get("resolution"))].update( {key: float(geo.find(key).text) for key in ["ULX", "ULY", "XDIM", "YDIM"]}) ss_sub = spatial_samplings[subsytem_Res_dic[self.Subsystem]] LRX = ss_sub['ULX'] + ss_sub['NCOLS'] * ss_sub['XDIM'] LRY = ss_sub['ULY'] + ss_sub['NROWS'] * ss_sub['YDIM'] self.CornerTieP_UTM = [(ss_sub['ULX'], ss_sub['ULY']), (LRX, ss_sub['ULY']), (ss_sub['ULX'], LRY), (LRX, LRY)] # (x,y) for UL,UR,LL,LR # geometricResolution self.gResolution = subsytem_Res_dic[self.Subsystem] # determine metadata from extracted metadata values self.EarthSunDist = self.get_EarthSunDistance(self.AcqDate) # Quality flags # FIXME does not work (at least with old data) Quality_temp = (xml_Scene_root.find(".//Technical_Quality_Assessment")) self.Quality.append(["DEGRADED_ANC_DATA_PERCENTAGE", Quality_temp.find("./DEGRADED_ANC_DATA_PERCENTAGE").text]) self.Quality.append(["DEGRADED_MSI_DATA_PERCENTAGE", 
Quality_temp.find("./DEGRADED_MSI_DATA_PERCENTAGE").text]) Quality_temp2 = xml_Scene_root.find(".//Quality_Inspections") quality_flags = ["SENSOR_QUALITY_FLAG", "GEOMETRIC_QUALITY_FLAG", "GENERAL_QUALITY_FLAG", "FORMAT_CORRECTNESS_FLAG", "RADIOMETRIC_QUALITY_FLAG"] try: # layout example: <SENSOR_QUALITY_FLAG>PASSED</SENSOR_QUALITY_FLAG> for ql in quality_flags: self.Quality.append([ql, Quality_temp2.find("./" + ql).text]) except AttributeError: # since ~11/2017 the quality checks layout in the XML has changed: # layout example: <quality_check checkType="SENSOR_QUALITY">PASSED</quality_check> elements = Quality_temp2.findall('quality_check') checkTypeValDict = {ele.attrib['checkType']: ele.text for ele in elements} for ql in quality_flags: self.Quality.append([ql, checkTypeValDict[ql.split('_FLAG')[0]]]) ########################## # band specific metadata # ########################## LBA_full_sorted = natsorted(self.LayerBandsAssignment_full) # ATTENTION Gains are only provided for 12 bands! I don't know why? Gains = [float(ele.text) for ele in xml_Scene_root.findall(".//PHYSICAL_GAINS")] Gains = Gains if len(Gains) == 13 else [1] + Gains self.Gains = dict(zip(LBA_full_sorted, Gains)) # FIXME assuming that the first band at 443nm has been left out here IS POSSIBLY WRONG # FIXME (could also be band 8A oder band 9 (water vapour)) # Solar irradiance, central wavelengths, full width half maximum per band self.wvlUnit = 'Nanometers' self.LayerBandsAssignment = get_LayerBandsAssignment(self.GMS_identifier) # Exact values calculated based in SRFs self.SolIrradiance, self.CWL, self.FWHM = self.calc_solar_irradiance_CWL_FWHM_per_band() # Provider values if not self.SolIrradiance: # get from xml file self.SolIrradiance = \ dict(zip(LBA_full_sorted, [float(ele.text) for ele in xml_Scene_root.find(".//Solar_Irradiance_List").findall("SOLAR_IRRADIANCE")])) # Preconfigured CWLs self.CWL = dict(zip(LBA_full_sorted, [float(ele.text) for ele in xml_Scene_root.find(".//Spectral_Information_List").findall(".//CENTRAL")])) # SensorAngles meta_temp = {} branch = HLP_F.find_in_xml_root(namespace, xml_GR_root, *("Geometric_Info", "Tile_Angles")) meta_temp["bandId2bandName"] = {int(ele.get("bandId")): ele.text.split("_")[-2] for ele in xml_GR_root.findall(".//MASK_FILENAME") if ele.get("bandId") is not None} meta_temp["bandName2bandId"] = {bandName: bandId for bandId, bandName in meta_temp["bandId2bandName"].items()} meta_temp["bandIds"] = sorted(list(meta_temp["bandId2bandName"].keys())) meta_temp["viewing_zenith_detectors"] = \ {bandId: {bf.get("detectorId"): HLP_F.get_values_from_xml(HLP_F.find_in_xml(bf, *("Zenith", "Values_List"))) for bf in branch.findall("Viewing_Incidence_Angles_Grids[@bandId='%i']" % bandId)} for bandId in meta_temp["bandIds"]} meta_temp["viewing_zenith"] = HLP_F.stack_detectors(meta_temp["viewing_zenith_detectors"]) meta_temp["viewing_azimuth_detectors"] = {bandId: {bf.get("detectorId"): HLP_F.get_values_from_xml( HLP_F.find_in_xml(bf, *("Azimuth", "Values_List"))) for bf in branch.findall( "Viewing_Incidence_Angles_Grids[@bandId='%i']" % bandId)} for bandId in meta_temp["bandIds"]} meta_temp["viewing_azimuth"] = HLP_F.stack_detectors(meta_temp["viewing_azimuth_detectors"]) # mean values of all mean band values # FIXME self.ViewingAngle = np.mean([np.nanmean(i) for i in meta_temp['viewing_zenith'].values()]) self.IncidenceAngle = np.mean([np.nanmean(i) for i in meta_temp['viewing_azimuth'].values()]) # 5000m step arrays per band. 
dict: key = bandindex, value = stacked_array for all detectors self.ViewingAngle_arrProv = meta_temp['viewing_zenith'] self.IncidenceAngle_arrProv = meta_temp['viewing_azimuth'] LBA2Id_dic = {'1': 0, '2': 1, '3': 2, '4': 3, '5': 4, '6': 5, '7': 6, '8': 7, '8A': 8, '9': 9, '10': 10, '11': 11, '12': 12} def filter_dic(AngleArr): return {LBAn: AngleArr[LBA2Id_dic[LBAn]] for LBAn in self.LayerBandsAssignment} self.ViewingAngle_arrProv = filter_dic(self.ViewingAngle_arrProv) self.IncidenceAngle_arrProv = filter_dic(self.IncidenceAngle_arrProv) self.FOV = get_FieldOfView(self.GMS_identifier) self.orbitParams = get_orbit_params(self.GMS_identifier) self.filter_layerdependent_metadata() self.spec_vals = get_special_values(self.GMS_identifier)
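# Illustrative sketch (not part of this module): the UTM corner tie points above follow from the
# granule geocoding, i.e. the upper-left coordinate plus raster size times pixel dimension (YDIM is
# negative, so the lower-right Y lies south of ULY). ULX/ULY below are hypothetical; 10980 x 10980
# is the raster size of a 10 m Sentinel-2 granule.
def granule_corners_utm(ulx, uly, ncols, nrows, xdim, ydim):
    """Return (UL, UR, LL, LR) corner coordinates of a granule in map units (sketch)."""
    lrx = ulx + ncols * xdim
    lry = uly + nrows * ydim  # ydim is negative -> LR lies south of UL
    return (ulx, uly), (lrx, uly), (ulx, lry), (lrx, lry)

# granule_corners_utm(600000, 5800020, 10980, 10980, 10, -10)
# -> ((600000, 5800020), (709800, 5800020), (600000, 5690220), (709800, 5690220))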
[docs] def add_rasObj_dims_projection_physUnit(self, rasObj, dict_LayerOptTherm, temp_logger=None): self.rows = rasObj.rows self.cols = rasObj.cols self.bands = rasObj.bands if rasObj.projection != '': self.map_info = geotransform2mapinfo(rasObj.geotransform, rasObj.projection) self.projection = rasObj.projection self.gResolution = abs(rasObj.geotransform[1]) self.CS_EPSG = WKT2EPSG(rasObj.projection) dict_conv_physUnit = {'Rad': "W * m-2 * sr-1 * micrometer-1", 'TOA_Ref': 'TOA_Reflectance in [0-%d]' % CFG.scale_factor_TOARef, 'BOA_Ref': 'BOA_Reflectance in [0-%d]' % CFG.scale_factor_BOARef, 'Temp': 'Degrees Celsius with scale factor = 100'} if list(set(dict_LayerOptTherm.values())) == ['optical']: self.PhysUnit = dict_conv_physUnit[CFG.target_radunit_optical] elif list(set(dict_LayerOptTherm.values())) == ['thermal']: self.PhysUnit = dict_conv_physUnit[CFG.target_radunit_thermal] elif sorted(list(set(dict_LayerOptTherm.values()))) == ['optical', 'thermal']: self.PhysUnit = ['Optical bands: %s' % dict_conv_physUnit[CFG.target_radunit_optical], 'Thermal bands: %s' % dict_conv_physUnit[CFG.target_radunit_thermal]] else: logger = self.logger if hasattr(self, 'logger') else temp_logger assert logger, "ERROR: Physical unit could not be determined due to unexpected 'dict_LayerOptTherm'. " \ "Got %s." % dict_LayerOptTherm logger.error("Physical unit could not be determined due to unexpected 'dict_LayerOptTherm'. Got %s." % dict_LayerOptTherm)
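# Illustrative sketch (not part of this module): the ground resolution above is taken from the
# second element of a GDAL-style geotransform, a 6-tuple (ULX, x_res, x_rot, ULY, y_rot, y_res)
# with y_res usually negative. The values below are hypothetical.
gt = (600000.0, 10.0, 0.0, 5800020.0, 0.0, -10.0)
gResolution = abs(gt[1])  # -> 10.0, as used above
# pixel (col, row) maps to:  x = gt[0] + col * gt[1] + row * gt[2]
#                            y = gt[3] + col * gt[4] + row * gt[5]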
[docs] def to_odict(self): # type: () -> collections.OrderedDict """Creates an OrderedDict containing selected attribute of the METADATA object that will later be included in ENVI file headers in the same order. """ # FIXME orbit params are missing # descr_dic = dict( # FillZeroSaturated von HLP_F ausgeben lassen # ALOS_Rad= # "(1) GEOCODED Level1B2 Product; '" + self.Dataname + "'\n " # "(2) Int16 RadianceData in [W * m-2 * sr-1 * micrometer-1]*10; " # "radiance scale factor: 10 (fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # ALOS_Ref= # "(1) Orthorectified JAXA or GFZ; '" + self.Dataname + "'\n " # "(2) Int16 TOA_Reflectance in [0-10000]; " # "radiance scale factor: 10 (fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # Terra_Rad= # "(1) Orthorectified JAXA or GFZ; '" + self.Dataname + "'\n " # "(2) Int16 RadianceData in [W * m-2 * sr-1 * micrometer-1]*10; " # "radiance scale factor: 10 (fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # Terra_Ref= # "(1) Orthorectified JAXA or GFZ; '" + self.Dataname + "'\n " # "(2) Int16 TOA_Reflectance in [0-10000]; " # "radiance scale factor: 10 (fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # Landsat_Rad= # "(1) Landsat DN: "+self.ProcLCode+ " Product; '"+self.Dataname+"'\n " # "(2) Int16 RadianceData in [W * m-2 * sr-1 * micrometer-1]*10; " # "radiance scale factor: 10 (fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # Landsat_Ref= # "(1) Landsat DN: "+self.ProcLCode + " Product; '" + self.Dataname + "'\n " # "(2) Int16 TOA_Reflectance in [0-10000] " # "(fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # RapidEye_Rad= # "(1) Ortho Level3A01 Product; '"+self.Dataname+"'\n " # "(2) Int16 RadianceData in [W * m-2 * sr-1 * micrometer-1]*10; " # "radiance scale factor: 10 (fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # RapidEye_Ref= # "(1) Ortho Level3A01 Product; '" + self.Dataname + "'\n " # "(2) Int16 TOA_Reflectance in [0-10000] " # "(fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # SPOT_Rad= # "(1) Ortho Level3A01 Product; '" + self.Dataname + "'\n " # "(2) Int16 RadianceData in [W * m-2 * sr-1 * micrometer-1]*10; " # "radiance scale factor: 10 (fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'", # SPOT_Ref= # "(1) Ortho Level3A01 Product; '" + self.Dataname + "'\n " # "(2) Int16 TOA_Reflectance in [0-10000] " # "(fillPixels: -99, zeroPixels:0, saturatedPixels: 32767 (Max of Int16))'") # copy directly compatible keys Meta = collections.OrderedDict() # Meta['description'] = descr_dic[self.Satellite + '_' + CFG.target_radunit_optical] for odictKey in enviHdr_keyOrder: if odictKey in map_odictKeys_objAttrnames: attrVal = getattr(self, map_odictKeys_objAttrnames[odictKey]) if attrVal and map_odictKeys_objAttrnames[odictKey] in layerdependent_metadata: # convert band specific metadata dicts to lists in the order of LayerBandsAssignment Meta.update({odictKey: [attrVal[band] for band in self.LayerBandsAssignment if band in attrVal]}) else: Meta.update({odictKey: attrVal}) elif odictKey == 'map info' and self.map_info: Meta['map info'] = self.map_info elif odictKey == 'coordinate system string' and self.projection: Meta['coordinate system string'] = self.projection elif odictKey == 'data ignore value': Meta['data ignore value'] = self.spec_vals['fill'] if 'fill' in self.spec_vals else None elif odictKey == 'corner coordinates 
lonlat': Meta['corner coordinates lonlat'] = str(self.CornerTieP_LonLat).replace('(', '[').replace(')', ']') elif odictKey == 'wavelength units': Meta['wavelength units'] = "Nanometers" elif odictKey == 'band names': Meta['band names'] = self.bandnames # add keys that will not be included into ENVI header if self.ViewingAngle_arrProv is not None: Meta['ViewingAngle_arrProv'] = {k: v.tolist() for k, v in self.ViewingAngle_arrProv.items()} Meta['IncidenceAngle'] = self.IncidenceAngle if self.IncidenceAngle_arrProv is not None: Meta['IncidenceAngle_arrProv'] = {k: v.tolist() for k, v in self.IncidenceAngle_arrProv.items()} return Meta
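# Illustrative sketch (not part of this module): to_odict() flattens each band-keyed metadata dict
# into a list ordered by LayerBandsAssignment, so that ENVI header entries line up with the band
# order of the raster file. The CWL values are the preconfigured AVNIR-2 values from above.
LayerBandsAssignment = ['1', '2', '3', '4']          # hypothetical band order of the raster file
CWL = {'2': 560., '1': 460., '4': 825., '3': 650.}   # band-keyed metadata (arbitrary key order)

wavelength = [CWL[band] for band in LayerBandsAssignment if band in CWL]
# -> [460.0, 560.0, 650.0, 825.0], i.e. the order of the bands in the raster file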
[docs] def from_odict(self, odict): # type: (collections.OrderedDict) -> METADATA # copy directly compatible keys # [setattr(self, attrN, odict[odictKey]) for odictKey, attrN in map_odictKeys_objAttrnames.items()] for odictKey, attrN in map_odictKeys_objAttrnames.items(): if attrN not in layerdependent_metadata: setattr(self, attrN, odict[odictKey]) else: # convert band specific metadata to dicts setattr(self, attrN, dict(zip(odict['LayerBandsAssignment'], odict[odictKey]))) # set the remaining attributes if 'map info' in odict: self.map_info = odict['map info'] if 'coordinate system string' in odict: self.projection = odict['coordinate system string'] if 'data ignore value' in odict: self.spec_vals['fill'] = odict['data ignore value'] if 'ViewingAngle_arrProv' in odict: self.ViewingAngle_arrProv = {bN: np.array(odict['ViewingAngle_arrProv'][bN]) for bN in self.LayerBandsAssignment if bN in odict['ViewingAngle_arrProv']} if 'IncidenceAngle_arrProv' in odict: self.IncidenceAngle_arrProv = {bN: np.array(odict['IncidenceAngle_arrProv'][bN]) for bN in self.LayerBandsAssignment if bN in odict['IncidenceAngle_arrProv']} return self
[docs] def calc_solar_irradiance_CWL_FWHM_per_band(self): # type: () -> (dict, dict, dict) sensorcode = get_GMS_sensorcode(self.GMS_identifier) # ms_pan = ('multi' if self.nBands > 1 else 'pan') irr_bands, cwl_bands, fwhm_bands = {}, {}, {} if not sensorcode: self.logger.warning('GMS-sensorcode missing. Provider values are used for solar irradiation, CWL and FWHM.') else: self.logger.info('Calculating solar irradiance, central wavelengths and full width half maxima...') sol_irr = Solar_Irradiance_reader(wvl_min_nm=350, wvl_max_nm=2500) srf_dict = SRF_Reader(self.GMS_identifier, no_pan=False, no_thermal=False) for band in srf_dict.keys(): if srf_dict[band] is None: irr_bands[band] = None cwl_bands[band] = None fwhm_bands[band] = None else: WVL_band = (srf_dict[band][:, 0] if 300 < np.max(srf_dict[band][:, 0]) < 15000 else srf_dict[band][:, 0] * 1000) # reads wavelengths given in nm and µm RSP_band = srf_dict[band][:, 1] # sol_irr_at_WVL = \ # scipy.interpolate.interp1d(sol_irr[:, 0], sol_irr[:, 1], kind='linear')(WVL_band) # ASTER # band 8: ValueError: A value in x_new is above the interpolation range. sol_irr_at_WVL = np.interp(WVL_band, sol_irr[:, 0], sol_irr[:, 1], left=0, right=0) irr_bands[band] = round(np.sum(sol_irr_at_WVL * RSP_band) / np.sum(RSP_band), 2) cwl_bands[band] = round(np.sum(WVL_band * RSP_band) / np.sum(RSP_band), 2) fwhm_bands[band] = round(np.max(WVL_band[RSP_band >= (np.max(RSP_band) / 2.)]) - np.min(WVL_band[RSP_band >= (np.max(RSP_band) / 2.)]), 2) return irr_bands, cwl_bands, fwhm_bands
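# Illustrative sketch (not part of this module): the method above computes, per band, the
# SRF-weighted solar irradiance and center wavelength plus the FWHM from all wavelengths whose
# response exceeds half of the maximum. Same formulas with a synthetic Gaussian response and a
# spectrally flat (hypothetical) solar irradiance:
import numpy as np

wvl = np.arange(400., 501., 1.)                  # [nm], synthetic wavelength grid
rsp = np.exp(-0.5 * ((wvl - 450.) / 15.) ** 2)   # synthetic band response centred at 450 nm
sol = np.full_like(wvl, 2000.)                   # hypothetical, spectrally flat solar irradiance

irr = np.sum(sol * rsp) / np.sum(rsp)            # SRF-weighted solar irradiance  -> 2000.0
cwl = np.sum(wvl * rsp) / np.sum(rsp)            # SRF-weighted center wavelength -> 450.0 nm
fwhm = (np.max(wvl[rsp >= rsp.max() / 2.]) -
        np.min(wvl[rsp >= rsp.max() / 2.]))      # full width at half maximum -> 34.0 nm on this grid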
[docs]    def get_EarthSunDistance(self, acqDate):
        """Get the earth-sun distance (requires a file of pre-calculated earth-sun distances per day).

        :param acqDate:  acquisition date as 'YYYY-MM-DD'
        """
        if not os.path.exists(CFG.path_earthSunDist):
            self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be 1.0 because "
                                "no database can be found at %s." % CFG.path_earthSunDist)
            return 1.0
        if not acqDate:
            self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be 1.0 because "
                                "acquisition date could not be read from metadata.")
            return 1.0

        with open(CFG.path_earthSunDist, "r") as EA_dist_f:
            EA_dist_dict = {}
            for line in EA_dist_f:
                date, EA = [item.strip() for item in line.split(",")]
                EA_dist_dict[date] = EA

        return float(EA_dist_dict[acqDate])
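# Illustrative sketch (not part of this module): get_EarthSunDistance() looks the distance up in a
# pre-calculated per-day table ('YYYY-MM-DD,distance' per line) and falls back to 1.0 AU otherwise.
# For reference only, a commonly used day-of-year approximation (this module does NOT use it):
import math
import datetime

def approx_earth_sun_distance(acq_date):
    """Approximate earth-sun distance [AU] from the day of year (common approximation, sketch)."""
    doy = datetime.datetime.strptime(acq_date, '%Y-%m-%d').timetuple().tm_yday
    return 1.0 - 0.01672 * math.cos(math.radians(0.9856 * (doy - 4)))

# approx_earth_sun_distance('2015-07-04')  -> ~1.017 (near aphelion)
# approx_earth_sun_distance('2015-01-03')  -> ~0.983 (near perihelion)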
[docs]    def calc_center_acquisition_time(self, fullSceneCornerLonLat, logger):
        """Calculate a missing center acquisition time using the acquisition date, the full scene corner
        coordinates and the solar azimuth.

        :param fullSceneCornerLonLat:  list of (lon, lat) tuples of the full scene corners (UL, UR, LL, LR)
        :param logger:                 logger instance to be used for logging
        """
        assert is_dataset_provided_as_fullScene(self.GMS_identifier) and len(fullSceneCornerLonLat) == 4, \
            'Center acquisition time can only be computed for datasets provided as full scenes, not for tiles.'

        ul, lr = fullSceneCornerLonLat[0], fullSceneCornerLonLat[3]
        center_coord = [np.mean([ul[0], lr[0]]), np.mean([ul[1], lr[1]])]
        time0_ord = mdates.date2num(
            datetime.datetime.strptime('%s %s' % (self.AcqDate, '00:00:00'), '%Y-%m-%d %H:%M:%S'))
        time1_ord = mdates.date2num(
            datetime.datetime.strptime('%s %s' % (self.AcqDate, '23:59:59'), '%Y-%m-%d %H:%M:%S'))
        time_stamps_ord = np.linspace(time0_ord, time1_ord, 12 * 60 * 60)
        time_stamps_with_tzinfo = mdates.num2date(time_stamps_ord)
        time_stamps = np.array([time_stamps_with_tzinfo[i].replace(tzinfo=None)
                                for i in range(len(time_stamps_with_tzinfo))])
        sols_az_rad = astronomy.get_alt_az(time_stamps,
                                           [center_coord[0]] * time_stamps.size,
                                           [center_coord[1]] * time_stamps.size)[1]
        sol_azs = 180 * sols_az_rad / math.pi
        diff_az = np.abs(float(self.SunAzimuth) - sol_azs)
        acq_datetime = time_stamps[np.where(diff_az == np.min(diff_az))][0]
        AcqTime = acq_datetime.strftime(format='%H:%M:%S')

        logger.info('Center acquisition time has been calculated: %s' % AcqTime)

        # update self.AcqDateTime (also updates self.AcqDate and self.AcqTime)
        self.AcqDateTime = datetime.datetime.strptime(
            '%s %s%s' % (self.AcqDate, AcqTime, '.000000+0000'), '%Y-%m-%d %H:%M:%S.%f%z')

        return self.AcqTime
[docs] def get_overpassDuration_SceneLength(self, fullSceneCornerLonLat, fullSceneCornerPos, shape_fullArr, logger): """Calculates duration of image acquisition in seconds. :param fullSceneCornerLonLat: :param fullSceneCornerPos: :param shape_fullArr: :param logger: """ assert is_dataset_provided_as_fullScene(self.GMS_identifier) and len(fullSceneCornerLonLat) == 4, \ 'Overpass duration and scene length can only be computed for datasets provided as full scenes, not for ' \ 'tiles.' # check if current scene is a subset assert fullSceneCornerPos != list(([0, 0], [0, shape_fullArr[1] - 1], [shape_fullArr[0] - 1, 0], [shape_fullArr[0] - 1, shape_fullArr[1] - 1])),\ 'Overpass duration and scene length cannot be calculated because the given data represents a subset of ' \ 'the original scene.' # compute scene length orbitAltitudeKm, orbitPeriodMin = self.orbitParams[0], self.orbitParams[2] UL, UR, LL, LR = fullSceneCornerLonLat geod = pyproj.Geod(ellps='WGS84') scene_length = np.mean([geod.inv(UL[0], UL[1], LL[0], LL[1])[2], geod.inv(UR[0], UR[1], LR[0], LR[1])[2]]) / 1000 # along-track distance [km] logger.info('Calculation of scene length...: %s km' % round(float(scene_length), 2)) # compute overpass duration orbitPeriodLength = 2 * math.pi * (6371 + orbitAltitudeKm) overpassDurationSec = (scene_length / orbitPeriodLength) * orbitPeriodMin * 60. logger.info('Calculation of overpass duration...: %s sec' % round(overpassDurationSec, 2)) return overpassDurationSec, scene_length
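# Illustrative sketch (not part of this module): the overpass duration above is the ratio of the
# along-track scene length to the orbit circumference, scaled by the orbital period. Worked example
# with the Landsat values from get_orbit_params() below (705 km altitude, 98.9 min period) and a
# hypothetical 185 km scene:
import math

orbitAltitudeKm, orbitPeriodMin = 705., 98.9   # Landsat values from get_orbit_params() below
scene_length = 185.                            # hypothetical along-track scene length [km]

orbit_circumference = 2 * math.pi * (6371 + orbitAltitudeKm)                      # ~44460 km
overpassDurationSec = scene_length / orbit_circumference * orbitPeriodMin * 60.   # ~24.7 sec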
[docs]    def filter_layerdependent_metadata(self):
        """Reduce all layer-dependent metadata attributes to the bands included in self.LayerBandsAssignment."""
        for attrname in layerdependent_metadata:
            attrVal = getattr(self, attrname)

            if not attrVal:
                continue

            if isinstance(attrVal, dict):
                setattr(self, attrname, {bN: attrVal[bN] for bN in self.LayerBandsAssignment})
            elif isinstance(attrVal, collections.OrderedDict):
                setattr(self, attrname,
                        collections.OrderedDict((bN, attrVal[bN]) for bN in self.LayerBandsAssignment))
            else:
                raise ValueError

            if attrVal:
                assert len(getattr(self, attrname)) == len(self.LayerBandsAssignment)
layerdependent_metadata = ['SolIrradiance', 'CWL', 'FWHM', 'Offsets', 'OffsetsRef', 'Gains', 'GainsRef', 'ThermalConstK1', 'ThermalConstK2', 'ViewingAngle_arrProv', 'IncidenceAngle_arrProv'] map_odictKeys_objAttrnames = { 'samples': 'cols', 'lines': 'rows', 'bands': 'bands', 'version_gms_preprocessing': 'version_gms_preprocessing', 'versionalias_gms_preprocessing': 'versionalias_gms_preprocessing', 'CS_EPSG': 'CS_EPSG', 'CS_TYPE': 'CS_TYPE', 'CS_DATUM': 'CS_DATUM', 'CS_UTM_ZONE': 'CS_UTM_ZONE', 'scene length': 'scene_length', 'wavelength': 'CWL', 'bandwidths': 'FWHM', 'LayerBandsAssignment': 'LayerBandsAssignment', 'data gain values': 'Gains', 'data offset values': 'Offsets', 'reflectance gain values': 'GainsRef', 'reflectance offset values': 'OffsetsRef', 'Metafile': 'Metafile', 'Satellite': 'Satellite', 'Sensor': 'Sensor', 'Subsystem': 'Subsystem', 'EntityID': 'EntityID', 'SceneID': 'SceneID', 'gResolution': 'gResolution', 'AcqDate': 'AcqDate', 'AcqTime': 'AcqTime', 'overpass duraction sec': 'overpassDurationSec', 'ProcLCode': 'ProcLCode', 'SunElevation': 'SunElevation', 'SunAzimuth': 'SunAzimuth', 'SolIrradiance': 'SolIrradiance', 'ThermalConstK1': 'ThermalConstK1', 'ThermalConstK2': 'ThermalConstK2', 'EarthSunDist': 'EarthSunDist', 'ViewingAngle': 'ViewingAngle', 'IncidenceAngle': 'IncidenceAngle', 'FieldOfView': 'FOV', 'PhysUnit': 'PhysUnit', 'ScaleFactor': 'ScaleFactor', 'Quality': 'Quality', 'Additional': 'additional' }
[docs]def get_LayerBandsAssignment(GMS_id, nBands=None, sort_by_cwl=None, no_thermal=None, no_pan=None, return_fullLBA=False, proc_level=''): # type: (GMS_identifier, int, bool, bool, bool, bool, str) -> list """Returns LayerBandsAssignment corresponding to given satellite, sensor and subsystem and with respect to CFG.sort_bands_by_cwl, CFG.skip_thermal and CFG.skip_pan. :param GMS_id: <dict>, derived from self.get_GMS_identifier() NOTE: only if there is an additional key 'proc_level', the processing level will be respected. This is needed to get the correct LBA after atm. correction :param nBands: should be specified if number of bands differs from standard (e.g. SPOT data containing no PAN) :param sort_by_cwl: whether to sort the returned bands list by central wavelength position (default: CFG.sort_bands_by_cwl) :param no_thermal: whether to exclude thermal bands from the returned bands list (default: CFG.skip_thermal) :param no_pan: whether to exclude panchromatic bands from the returned bands list (default: CFG.skip_pan) :param return_fullLBA: in case there is a subsystem: whether to return LayerBandsAssignment for all bands or for the current subsystem :param proc_level: processing level for which the LayerBandsAssignment is returned (overrides the proc_level given with GMS_id) """ # set defaults # NOTE: these values cannot be set in function signature because CFG is not present at library import time sort_by_cwl = sort_by_cwl if sort_by_cwl is not None else CFG.sort_bands_by_cwl no_thermal = no_thermal if no_thermal is not None else CFG.skip_thermal no_pan = no_pan if no_pan is not None else CFG.skip_pan proc_level = proc_level or GMS_id.proc_level if GMS_id.image_type == 'RSD': GMS_sensorcode = get_GMS_sensorcode(GMS_id) assert GMS_sensorcode, 'Unable to get Layer Bands Assignment. No valid sensorcode privided (got >None<). 
' if return_fullLBA: GMS_sensorcode = \ 'AST_full' if GMS_sensorcode.startswith('AST') else \ 'S2A_full' if GMS_sensorcode.startswith('S2A') else \ 'S2B_full' if GMS_sensorcode.startswith('S2B') else GMS_sensorcode dict_LayerBandsAssignment = { 'AVNIR-2': ['1', '2', '3', '4'], 'AST_full': ['1', '2', '3N', '3B', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14'], 'AST_V1': ['1', '2', '3N'], 'AST_V2': ['3B'], 'AST_S': ['4', '5', '6', '7', '8', '9'], 'AST_T': ['10', '11', '12', '13', '14'], 'TM4': ['1', '2', '3', '4', '5', '6', '7'], 'TM5': ['1', '2', '3', '4', '5', '6', '7'], 'TM7': ['1', '2', '3', '4', '5', '6L', '6H', '7', '8'], 'LDCM': ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11'], 'SPOT1a': ['1', '2', '3', '4'], 'SPOT2a': ['1', '2', '3', '4'], 'SPOT3a': ['1', '2', '3', '4'], 'SPOT4a': ['1', '2', '3', '4', '5'], 'SPOT5a': ['1', '2', '3', '4', '5'], 'SPOT1b': ['1', '2', '3', '4'], 'SPOT2b': ['1', '2', '3', '4'], 'SPOT3b': ['1', '2', '3', '4'], 'SPOT4b': ['1', '2', '3', '4', '5'], 'SPOT5b': ['1', '2', '3', '4', '5'], 'RE5': ['1', '2', '3', '4', '5'], 'S2A_full': ['1', '2', '3', '4', '5', '6', '7', '8', '8A', '9', '10', '11', '12'], 'S2B_full': ['1', '2', '3', '4', '5', '6', '7', '8', '8A', '9', '10', '11', '12'], 'S2A10': ['2', '3', '4', '8'], 'S2A20': ['5', '6', '7', '8A', '11', '12'], 'S2A60': ['1', '9', '10'], 'S2B10': ['2', '3', '4', '8'], 'S2B20': ['5', '6', '7', '8A', '11', '12'], 'S2B60': ['1', '9', '10'], } dict_cwlSorted_LayerBandsAssignment = { 'TM4': ['1', '2', '3', '4', '5', '7', '6'], 'TM5': ['1', '2', '3', '4', '5', '7', '6'], 'TM7': ['1', '2', '3', '8', '4', '5', '7', '6L', '6H'], 'LDCM': ['1', '2', '3', '8', '4', '5', '9', '6', '7', '10', '11'], 'SPOT1a': ['1', '4', '2', '3'], 'SPOT2a': ['1', '4', '2', '3'], 'SPOT3a': ['1', '4', '2', '3'], 'SPOT4a': ['1', '5', '2', '3', '4'], 'SPOT5a': ['1', '5', '2', '3', '4'], 'SPOT1b': ['1', '4', '2', '3'], 'SPOT2b': ['1', '4', '2', '3'], 'SPOT3b': ['1', '4', '2', '3'], 'SPOT4b': ['1', '5', '2', '3', '4'], 'SPOT5b': ['1', '5', '2', '3', '4'], } if nBands is None or nBands == len(dict_LayerBandsAssignment[GMS_sensorcode]): assert GMS_sensorcode in dict_LayerBandsAssignment, \ 'Unable to get Layer Bands Assignment. No valid sensorcode privided (got >%s<).' % GMS_sensorcode LayerBandsAssignment = dict_LayerBandsAssignment[GMS_sensorcode] if sort_by_cwl and GMS_sensorcode in ['TM4', 'TM5', 'TM7', 'LDCM']: LayerBandsAssignment = dict_cwlSorted_LayerBandsAssignment[GMS_sensorcode] else: # special case SPOT MSI containing no PAN or SPOT PAN containing only PAN assert re.match(r'SPOT', GMS_id.satellite, re.I) and \ nBands in [len(dict_LayerBandsAssignment[GMS_sensorcode]) - 1, 1], \ "Unable to get Layer Bands Assignment. Provided number of bands doesn´t match known layer band " \ "assignments." 
LayerBandsAssignment = [dict_LayerBandsAssignment[GMS_sensorcode][-1]] if nBands == 1 \ else dict_LayerBandsAssignment[GMS_sensorcode][:-1] if no_thermal: LayerBandsAssignment = [i for i in LayerBandsAssignment if not isTHERMAL(GMS_id, i)] if no_pan: LayerBandsAssignment = [i for i in LayerBandsAssignment if not isPAN(GMS_id, i)] else: LayerBandsAssignment = ['1'] # remove those bands that are excluded by atmospheric corrections if proc_level >= L1C if proc_level not in [None, 'L1A', 'L1B']: # TODO replace with enum procL if CFG.target_radunit_optical == 'BOA_Ref': # return LBA after AC try: bands_after_ac = get_bands_after_AC(GMS_id) LayerBandsAssignment = [i for i in LayerBandsAssignment if i in bands_after_ac] except ACNotSupportedError: # atmospheric correction is not yet supported -> LBA will be the same after L1C pass if proc_level in ['L2B', 'L2C']: # handle different number of bands after spectral homogenization to target sensor if GMS_id.dataset_ID == CFG.datasetid_spectral_ref: pass # return normal LBA from above elif CFG.datasetid_spectral_ref is not None: # the target sensor is NOT a custom sensor but has the spectral characteristics of a known sensor # => use the LBA of the target sensor after AC as the new LBA for the requested sensor # find out how the spectral characteristics of this known target sensor look like after AC from ..model.gms_object import GMS_identifier # noqa F811 # redefinition of unused 'GMS_identifier' tgt_sat, tgt_sen = datasetid_to_sat_sen(CFG.datasetid_spectral_ref) tgt_GMSid = GMS_identifier(image_type='RSD', satellite=tgt_sat, sensor=tgt_sen, subsystem='', proc_level='L2A', dataset_ID=-9999, logger=None) try: tgt_sen_LBA = get_bands_after_AC(tgt_GMSid) except ACNotSupportedError: # use the target sensor LBA before AC (because target sensor could not be atmospherically corrected) tgt_GMSid.proc_level = 'L1B' tgt_sen_LBA = get_LayerBandsAssignment(tgt_GMSid) LayerBandsAssignment = tgt_sen_LBA else: # fallback: return a LBA matching the number of bands after spectral homogenization LayerBandsAssignment = [str(i + 1) for i in range(len(CFG.target_CWL))] return LayerBandsAssignment
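# Illustrative sketch (not part of this module): the effect of the no_thermal / no_pan filters can
# be followed with the dictionaries above, e.g. for Landsat-7 ETM+ ('TM7'), whose thermal ('6L',
# '6H') and panchromatic ('8') bands are defined in isTHERMAL() and isPAN() below:
LBA_TM7 = ['1', '2', '3', '4', '5', '6L', '6H', '7', '8']   # dict_LayerBandsAssignment['TM7']
thermal = ['6L', '6H']                                      # dict_isTHERMAL['TM7'] (see below)
pan = ['8']                                                 # dict_isPAN['TM7'] (see below)

lba = [b for b in LBA_TM7 if b not in thermal]   # skip_thermal -> ['1', '2', '3', '4', '5', '7', '8']
lba = [b for b in lba if b not in pan]           # skip_pan     -> ['1', '2', '3', '4', '5', '7']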
[docs]def get_bands_after_AC(GMS_id):
    # type: (GMS_identifier) -> List[str]
    """Returns a list of bands that are not removed by atmospheric correction.

    :param GMS_id:  <dict>, derived from self.get_GMS_identifier()
    :return:        e.g. ['1', '2', '3', '4', '5', '6', '7', '9'] for Landsat-8
    """
    path_ac_options = get_path_ac_options(GMS_id)

    if not path_ac_options or not os.path.exists(path_ac_options):
        raise ACNotSupportedError('Atmospheric correction is not yet supported for %s %s.'
                                  % (GMS_id.satellite, GMS_id.sensor))

    # FIXME this does not work for L7
    # NOTE: don't validate because options contain paths that do not exist on another server
    ac_bandNs = get_ac_options(path_ac_options, validation=False)['AC']['bands']
    ac_out_bands = [bN.split('B0')[1] if bN.startswith('B0') else bN.split('B')[1] for bN in ac_bandNs]  # sorted

    return ac_out_bands
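# Illustrative sketch (not part of this module): the list comprehension above strips the 'B'/'B0'
# prefix from the band names found in the AC options file. Hypothetical input:
ac_bandNs = ['B01', 'B02', 'B03', 'B8A', 'B11']
[bN.split('B0')[1] if bN.startswith('B0') else bN.split('B')[1] for bN in ac_bandNs]
# -> ['1', '2', '3', '8A', '11']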
[docs]def get_center_wavelengths_by_LBA(satellite, sensor, LBA, subsystem=None): # type: (str, str, list, str) -> List[float] """Returns a list of center wavelengths of spectral bands for the given satellite/sensor/LayerBandsAss. combination. :param satellite: target satellite (e.g., 'Sentinel-2A') :param sensor: target sensor (e.g., 'MSI') :param LBA: LayerBandsAssignment :param subsystem: target sensor subsystem (e.g., 'VNIR') """ srf = RSR(satellite=satellite, sensor=sensor, subsystem=subsystem, LayerBandsAssignment=LBA) return list(srf.wvl)
[docs]def get_dict_LayerOptTherm(GMS_id, LayerBandsAssignment):
    """Return an OrderedDict mapping each layer of LayerBandsAssignment to either 'optical' or 'thermal'."""
    dict_out = collections.OrderedDict()

    for lr in LayerBandsAssignment:
        dict_out[lr] = 'thermal' if isTHERMAL(GMS_id, lr) else 'optical'

    return dict_out
[docs]def isPAN(GMS_id, LayerNr): GMS_sensorcode = get_GMS_sensorcode(GMS_id) dict_isPAN = {'TM7': ['8'], 'LDCM': ['8'], 'SPOT1a': ['4'], 'SPOT2a': ['4'], 'SPOT3a': ['4'], 'SPOT4a': ['5'], 'SPOT5a': ['5'], 'SPOT1b': ['4'], 'SPOT2b': ['4'], 'SPOT3b': ['4'], 'SPOT4b': ['5'], 'SPOT5b': ['5']} return True if GMS_sensorcode in dict_isPAN and LayerNr in dict_isPAN[GMS_sensorcode] else False
[docs]def isTHERMAL(GMS_id, LayerNr): GMS_sensorcode = get_GMS_sensorcode(GMS_id) dict_isTHERMAL = {'TM4': ['6'], 'TM5': ['6'], 'TM7': ['6L', '6H'], 'LDCM': ['10', '11'], 'AST_T': ['10', '11', '12', '13', '14']} return True if GMS_sensorcode in dict_isTHERMAL and LayerNr in dict_isTHERMAL[GMS_sensorcode] else False
[docs]def get_FieldOfView(GMS_id): GMS_sensorcode = get_GMS_sensorcode(GMS_id) dict_FOV = {'AVNIR-2': 5.79, 'AST_V1': 6.09, 'AST_V2': 5.19, 'AST_S': 4.9, 'AST_T': 4.9, # http://eospso.gsfc.nasa.gov/sites/default/files/mission_handbooks/Terra.pdf 'TM4': 14.92, 'TM5': 14.92, 'TM7': 14.92, 'LDCM': 14.92, 'SPOT1a': 4.18, 'SPOT2a': 4.18, 'SPOT3a': 4.18, 'SPOT4a': 4.18, 'SPOT5a': 4.18, 'SPOT1b': 4.18, 'SPOT2b': 4.18, 'SPOT3b': 4.18, 'SPOT4b': 4.18, 'SPOT5b': 4.18, 'RE5': 6.99, 'S2A10': 20.6, 'S2A20': 20.6, 'S2A60': 20.6, 'S2B10': 20.6, 'S2B20': 20.6, 'S2B60': 20.6} return dict_FOV[GMS_sensorcode]
[docs]def get_orbit_params(GMS_id): GMS_sensorcode = get_GMS_sensorcode(GMS_id) # sensor altitude above ground [kilometers] dict_altitude = {'AVNIR-2': 691.65, 'AST_V1': 705, 'AST_V2': 705, 'AST_S': 705, 'AST_T': 705, 'TM4': 705, 'TM5': 705, 'TM7': 705, 'LDCM': 705, 'SPOT1a': 822, 'SPOT2a': 822, 'SPOT3a': 822, 'SPOT4a': 822, 'SPOT5a': 822, 'SPOT1b': 822, 'SPOT2b': 822, 'SPOT3b': 822, 'SPOT4b': 822, 'SPOT5b': 822, 'RE5': 630, 'S2A10': 786, 'S2A20': 786, 'S2A60': 786, 'S2B10': 786, 'S2B20': 786, 'S2B60': 786} # sensor inclination [degrees] dict_inclination = {'AVNIR-2': 98.16, 'AST_V1': 98.3, 'AST_V2': 98.3, 'AST_S': 98.3, 'AST_T': 98.3, 'TM4': 98.2, 'TM5': 98.2, 'TM7': 98.2, 'LDCM': 98.2, 'SPOT1a': 98.7, 'SPOT2a': 98.7, 'SPOT3a': 98.7, 'SPOT4a': 98.7, 'SPOT5a': 98.7, 'SPOT1b': 98.7, 'SPOT2b': 98.7, 'SPOT3b': 98.7, 'SPOT4b': 98.7, 'SPOT5b': 98.7, 'RE5': 98, 'S2A10': 98.62, 'S2A20': 98.62, 'S2A60': 98.62, 'S2B10': 98.62, 'S2B20': 98.62, 'S2B60': 98.62} # time needed for one complete earth revolution [minutes] dict_period = {'AVNIR-2': 98.7, 'AST_V1': 98.88, 'AST_V2': 98.88, 'AST_S': 98.88, 'AST_T': 98.88, 'TM4': 98.9, 'TM5': 98.9, 'TM7': 98.9, 'LDCM': 98.9, 'SPOT1a': 101.4, 'SPOT2a': 101.4, 'SPOT3a': 101.4, 'SPOT4a': 101.4, 'SPOT5a': 101.4, 'SPOT1b': 101.4, 'SPOT2b': 101.4, 'SPOT3b': 101.4, 'SPOT4b': 101.4, 'SPOT5b': 101.4, 'RE5': 96.7, 'S2A10': 100.6, 'S2A20': 100.6, 'S2A60': 100.6, 'S2B10': 100.6, 'S2B20': 100.6, 'S2B60': 100.6} return [dict_altitude[GMS_sensorcode], dict_inclination[GMS_sensorcode], dict_period[GMS_sensorcode]]
[docs]def get_special_values(GMS_id): GMS_sensorcode = get_GMS_sensorcode(GMS_id) # type: str dict_fill = {'AVNIR-2': 0, 'AST_V1': 0, 'AST_V2': 0, 'AST_S': 0, 'AST_T': 0, 'TM4': 0, 'TM5': 0, 'TM7': 0, 'LDCM': 0, 'SPOT1a': 0, 'SPOT2a': 0, 'SPOT3a': 0, 'SPOT4a': 0, 'SPOT5a': 0, 'SPOT1b': 0, 'SPOT2b': 0, 'SPOT3b': 0, 'SPOT4b': 0, 'SPOT5b': 0, 'RE5': 0, 'S2A10': 0, 'S2A20': 0, 'S2A60': 0, 'S2B10': 0, 'S2B20': 0, 'S2B60': 0, } dict_zero = {'AVNIR-2': None, 'AST_V1': 1, 'AST_V2': 1, 'AST_S': 1, 'AST_T': 1, 'TM4': None, 'TM5': None, 'TM7': None, 'LDCM': None, 'SPOT1a': None, 'SPOT2a': None, 'SPOT3a': None, 'SPOT4a': None, 'SPOT5a': None, 'SPOT1b': None, 'SPOT2b': None, 'SPOT3b': None, 'SPOT4b': None, 'SPOT5b': None, 'RE5': None, 'S2A10': None, 'S2A20': None, 'S2A60': None, 'S2B10': None, 'S2B20': None, 'S2B60': None, } dict_saturated = {'AVNIR-2': None, 'AST_V1': 255, 'AST_V2': 255, 'AST_S': 255, 'AST_T': 65535, 'TM4': None, 'TM5': None, 'TM7': None, 'LDCM': 65535, 'SPOT1a': 255, 'SPOT2a': 255, 'SPOT3a': 255, 'SPOT4a': 255, 'SPOT5a': 255, 'SPOT1b': 255, 'SPOT2b': 255, 'SPOT3b': 255, 'SPOT4b': 255, 'SPOT5b': 255, 'RE5': None, 'S2A10': 65535, 'S2A20': 65535, 'S2A60': 65535, 'S2B10': 65535, 'S2B20': 65535, 'S2B60': 65535 } return {'fill': dict_fill[GMS_sensorcode], 'zero': dict_zero[GMS_sensorcode], 'saturated': dict_saturated[GMS_sensorcode]}
[docs]def metaDict_to_metaODict(metaDict, logger=None): """Converts a GMS metadata dictionary to an ordered dictionary according to the sorting given in Output_writer.enviHdr_keyOrder. :param metaDict: <dict> GMS metadata dictionary :param logger: <logging.logger> if given, warnings will be logged. Otherwise they are raised. """ from ..io.output_writer import enviHdr_keyOrder expected_keys = [k for k in enviHdr_keyOrder if k in metaDict] only_gmsFile_keys = ['ViewingAngle_arrProv', 'IncidenceAngle_arrProv', 'projection'] unexpected_keys = [k for k in metaDict.keys() if k not in expected_keys and k not in only_gmsFile_keys] if unexpected_keys: msg = 'Got unexpected keys in metadata dictionary: %s. Adding them at the end of output header files.' \ % ', '.join(unexpected_keys) if logger: logger.warning(msg) else: warnings.warn(msg) meta_vals = [metaDict[k] for k in expected_keys] + [metaDict[k] for k in unexpected_keys] return collections.OrderedDict(zip(expected_keys + unexpected_keys, meta_vals))
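# Illustrative sketch (not part of this module): metaDict_to_metaODict() reorders the metadata
# according to the ENVI header key order and appends unexpected keys at the end. With a hypothetical
# key-order excerpt and metadata dictionary:
import collections

keyOrder_excerpt = ['samples', 'lines', 'bands', 'map info']                 # hypothetical excerpt
metaDict = {'bands': 4, 'samples': 1000, 'lines': 900, 'SceneID': 123456}    # hypothetical metadata

expected = [k for k in keyOrder_excerpt if k in metaDict]                    # ['samples', 'lines', 'bands']
unexpected = [k for k in metaDict if k not in expected]                      # ['SceneID'], appended at the end
odict = collections.OrderedDict((k, metaDict[k]) for k in expected + unexpected)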
[docs]def LandsatID2dataset(ID_list):
    dataset_list = []

    for ID in ID_list:
        dataset = dict(image_type='RSD', satellite=None, sensor=None, subsystem=None, acquisition_date=None,
                       entity_ID=ID)
        dataset['satellite'] = 'Landsat-5' if ID[:3] == 'LT5' else 'Landsat-7' if ID[:3] == 'LE7' \
            else 'Landsat-8' if ID[:3] == 'LC8' else dataset['satellite']
        dataset['sensor'] = 'TM' if ID[:3] == 'LT5' else 'ETM+' if ID[:3] == 'LE7' \
            else 'OLI_TIRS' if ID[:3] == 'LC8' else dataset['sensor']  # fallback keeps the default (None)
        dataset['subsystem'] = None
        dataset['acquisition_date'] = \
            (datetime.datetime(int(ID[9:13]), 1, 1) + datetime.timedelta(int(ID[13:16]) - 1)).strftime('%Y-%m-%d')

        dataset_list.append(dataset)

    return dataset_list
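# Illustrative sketch (not part of this module): the acquisition date is decoded from the year
# (characters 9-13) and the day of year (characters 13-16) of the entity ID. Usage example with a
# hypothetical pre-collection Landsat-8 entity ID (LC8 / path 193 / row 024 / year 2015 / DOY 276):
datasets = LandsatID2dataset(['LC81930242015276LGN00'])
# -> [{'image_type': 'RSD', 'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS', 'subsystem': None,
#      'acquisition_date': '2015-10-03', 'entity_ID': 'LC81930242015276LGN00'}]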
[docs]def get_sensormode(dataset):
    if re.search(r'SPOT', dataset['satellite']):
        path_archive = path_generator(dataset).get_local_archive_path_baseN()
        dim_ = open_specific_file_within_archive(path_archive, '*/scene01/metadata.dim')[0]
        SPOT_mode = re.search(r"<SENSOR_CODE>([a-zA-Z0-9]*)</SENSOR_CODE>", dim_, re.I).group(1)
        assert SPOT_mode in ['J', 'X', 'XS', 'A', 'P', 'M'], 'Unknown SPOT sensor mode: %s' % SPOT_mode
        return 'M' if SPOT_mode in ['J', 'X', 'XS'] else 'P'
    else:
        return 'M'