# -*- coding: utf-8 -*-
# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import glob
import os
import re
import tempfile
import warnings
import uuid
from logging import Logger
from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue
from ..options.config import GMS_config as CFG
from .definition_dicts import get_GMS_sensorcode
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
# get_scene_and_dataset_infos_from_postgreSQLdb # inline in order to avoid circular dependencies
__author__ = 'Daniel Scheffler'
class path_generator(object):
    """Methods return absolute paths corresponding to the input object.

    To be instanced with the dict of a L1A/L1B/... object or with positional arguments in the order
    (proc_level, image_type, satellite, sensor, subsystem, acq_datetime, entity_ID, filename[, logger]).
    If 'scene_ID' (integer) is passed to kwargs, all eventually given args are ignored.
    Instead they are retrieved from postgreSQLdb.

    Recognized keyword arguments:

    :keyword scene_ID:   scene ID used to query the needed attributes from the postgreSQL database
    :keyword proc_level: overrides the processing level given via args
    :keyword MGRS_info:  dict with the keys 'grid1mil', 'grid100k' and 'tile_ID'; if given, paths and basenames
                         refer to the MGRS tile instead of the whole scene
    """

    # order of the attributes if they are passed as positional arguments
    _ARG_NAMES = ('proc_level', 'image_type', 'satellite', 'sensor', 'subsystem',
                  'acq_datetime', 'entity_ID', 'filename', 'logger')

    def __init__(self, *args, **kwargs):
        if 'scene_ID' in kwargs:
            # imported inline in order to avoid circular dependencies
            from .database_tools import get_scene_and_dataset_infos_from_postgreSQLdb
            args = [get_scene_and_dataset_infos_from_postgreSQLdb(kwargs['scene_ID'])]  # return [dict]

        assert len(args) in [1, 8, 9], "Received invalid length of 'args' argument."

        if len(args) == 1:
            if not isinstance(args[0], dict):  # also covers collections.OrderedDict
                raise TypeError("A single positional argument must be a dictionary, got '%s'."
                                % type(args[0]).__name__)

            # work on a shallow copy so that the dictionary of the caller is not modified
            argsdict = dict(args[0])
            argsdict.setdefault('logger', None)  # set logger to None if not given in dict
        else:
            values = tuple(args)
            if len(values) == 8:
                values += (None,)  # set logger to None if not given in tuple
            argsdict = dict(zip(self._ARG_NAMES, values))

        self.proc_level = kwargs.get('proc_level', argsdict['proc_level'])
        self.image_type = argsdict['image_type']
        self.satellite = argsdict['satellite']
        # sensor: database distinguishes SLC_OFF and SLC_ON but file structure not
        self.sensor = 'ETM+' if argsdict['sensor'] in ['ETM+_SLC_OFF', 'ETM+_SLC_ON'] else argsdict['sensor']
        self.subsystem = argsdict['subsystem']
        self.AcqDate = argsdict['acq_datetime']
        self.entity_ID = argsdict['entity_ID']
        self.filename = argsdict['filename']
        self.logger = argsdict['logger']
        self.MGRS_info = kwargs.get('MGRS_info', None)

    def __getstate__(self):
        """Defines how the attributes of path_generator are pickled.

        NOTE: The logger is closed and set to None, i.e., the instance has no logger anymore afterwards.
        """
        if self.logger not in [None, 'not set']:
            self.logger.close()
            self.logger = None
        return self.__dict__

    def __setstate__(self, ObjDict):
        """Defines how the attributes of path_generator are unpickled."""
        self.__dict__ = ObjDict

    def get_path_rawdata(self):
        """Returns the folder of all downloaded data for the current scene."""
        # pattern first, string second: only SRTM sensors get the subsystem sub-folder
        # (with swapped arguments the sensor name was used as regex, so that e.g. 'TM' matched within 'SRTM')
        if self.sensor and re.search('SRTM', self.sensor, re.I):
            return os.path.join(CFG.path_archive, self.satellite, self.sensor,
                                self.subsystem)  # FIXME downloader should store data into sensor folder
        else:
            return os.path.join(CFG.path_archive, self.satellite, self.sensor)

    def get_path_procdata(self):
        """Returns the target folder of all processed data for the current scene."""
        if self.MGRS_info:
            pOrd = (CFG.path_procdata_MGRS, 'virtual_sensor_id_%s' % CFG.virtual_sensor_id,
                    self.MGRS_info['grid1mil'], self.MGRS_info['grid100k'], self.entity_ID)
        else:
            pOrd = (CFG.path_procdata_scenes, self.satellite, self.sensor, self.entity_ID)

        return os.path.join(*pOrd)

    def get_baseN(self, merged_subsystems=False):
        """Returns the basename belonging to the given scene.

        :param merged_subsystems:   if True, a subsystem is not included in the returned basename
                                    (useful for merged subsystems in L2A+)
        """
        if self.subsystem and not merged_subsystems:
            items2include = (self.satellite, self.sensor, self.subsystem, self.entity_ID)
        else:
            items2include = (self.satellite, self.sensor, self.entity_ID)

        if self.MGRS_info:
            items2include += (self.MGRS_info['tile_ID'],)

        return '__'.join(items2include)

    def get_path_logfile(self, merged_subsystems=False):
        """Returns the path of the logfile belonging to the given scene, e.g. '/path/to/file/file.log'.

        :param merged_subsystems:   if True, a subsystem is not included in the returned logfile path
                                    (useful for merged subsystems in L2A+)
        """
        return os.path.join(self.get_path_procdata(), self.get_baseN(merged_subsystems=merged_subsystems) + '.log')

    def get_local_archive_path_baseN(self):
        """Returns the path of the downloaded raw data archive, e.g. '/path/to/file/file.tar.gz'.

        NOTE: If self.filename is empty, it is set to self.entity_ID.

        :raises FileNotFoundError:  if no matching dataset exists in the raw data folder
        :raises AssertionError:     if more than one of the known archive extensions matches
        """
        folder_rawdata = self.get_path_rawdata()
        self.filename = self.filename if self.filename else self.entity_ID

        # 1st try: the filename as given
        path_exact = os.path.join(folder_rawdata, self.filename)
        if os.path.exists(path_exact):
            return path_exact

        # 2nd try: the filename plus one of the known archive extensions
        extensions_found = [ext for ext in ['.tar.gz', '.zip', '.hdf']
                            if os.path.exists(os.path.join(folder_rawdata, '%s%s' % (self.filename, ext)))]
        if extensions_found:
            # NOTE(review): the message announces that the first match is chosen but the assertion aborts instead
            assert len(extensions_found) == 1, "The folder %s contains multiple files identified as raw data " \
                                               "to be processed. Choosing first one.." % folder_rawdata
            return os.path.join(folder_rawdata, '%s%s' % (self.filename, extensions_found[0]))

        # 3rd try: '*.SAFE' filenames that are stored as '*.zip'
        if self.filename.endswith('.SAFE'):
            path_zip = os.path.join(folder_rawdata, os.path.splitext(self.filename)[0]) + '.zip'
            if os.path.exists(path_zip):
                return path_zip  # FIXME bug in database

        raise FileNotFoundError('The dataset %s.* cannot be found at %s'
                                % (self.filename, folder_rawdata))  # TODO DOWNLOAD COMMAND

    def get_path_gmsfile(self):
        """Returns the path of the .gms file belonging to the given processing level, e.g. '/path/to/file/file.gms'."""
        return os.path.join(self.get_path_procdata(), '%s_%s.gms' % (self.get_baseN(), self.proc_level))

    def get_path_imagedata(self):
        """Returns the path of the .bsq file belonging to the given processing level, e.g. '/path/to/file/file.bsq'."""
        return os.path.join(self.get_path_procdata(), '%s_image_data_%s.bsq' % (self.get_baseN(), self.proc_level))

    def get_path_maskdata(self):
        """Returns the path of the *_masks_*.bsq file belonging to the given processing level,
        e.g. '/path/to/file/file_masks_L1A.bsq'."""
        return os.path.join(self.get_path_procdata(), '%s_masks_%s.bsq' % (self.get_baseN(), self.proc_level))

    def get_path_cloudmaskdata(self):
        """Returns the path of the *_mask_clouds_*.bsq file belonging to the given processing level,
        e.g. '/path/to/file/file_mask_clouds_L1A.bsq'."""
        return os.path.join(self.get_path_procdata(), '%s_mask_clouds_%s.bsq' % (self.get_baseN(), self.proc_level))

    def get_path_accuracylayers(self):
        """Returns the path of the *_accuracy_layers_*.bsq file, e.g., '/path/to/file/file_accuracy_layers_L2C.bsq'.

        NOTE: Accuracy layers are only present in L2C. None is returned for all other processing levels.
        """
        if self.proc_level == 'L2C':
            return os.path.join(self.get_path_procdata(), '%s_accuracy_layers_%s.bsq'
                                % (self.get_baseN(), self.proc_level))
        return None

    def get_path_tempdir(self):
        """Returns the path of a temporary directory for the current scene below CFG.path_tempdir.

        The directory name is made unique by a random hex code, i.e., each call returns a different path.
        The directory itself is not created here.
        """
        path_archive = self.get_local_archive_path_baseN()

        # strip up to two extensions (e.g., '.tar.gz') from the archive name
        RootName = os.path.splitext(os.path.basename(path_archive))[0]
        RootName = os.path.splitext(RootName)[0] if os.path.splitext(RootName)[1] else RootName
        RootName += '__' + uuid.uuid4().hex  # add a hex code in order to get uniqueness

        if self.subsystem:
            return os.path.join(CFG.path_tempdir, RootName, self.sensor, self.subsystem)
        return os.path.join(CFG.path_tempdir, RootName, self.sensor)

    def get_outPath_hdr(self, attrName2write):
        # type: (str) -> str
        """Returns the output path for the given attribute to be written.

        :param attrName2write:  <str> name of the GMS object attribute to be written
        """
        outNameSuffix = 'image_data' if attrName2write == 'arr' else attrName2write

        if outNameSuffix:
            outNameHdr = '%s_%s_%s.hdr' % (self.get_baseN(), outNameSuffix, self.proc_level)
        else:
            outNameHdr = '%s_%s.hdr' % (self.get_baseN(), self.proc_level)

        return os.path.join(self.get_path_procdata(), outNameHdr)

    def get_pathes_all_procdata(self):  # TODO
        """Returns a list with the paths of image data, masks, cloud mask, accuracy layers, GMS file and logfile.

        NOTE: The list is incomplete (a warning is raised) and the accuracy layers entry is None if the processing
              level is not L2C.
        """
        image = self.get_path_imagedata()
        mask = self.get_path_maskdata()
        mask_clouds = self.get_path_cloudmaskdata()
        accuracylayers = self.get_path_accuracylayers()
        gms_file = self.get_path_gmsfile()
        log_file = self.get_path_logfile()

        all_pathes = [image, mask, mask_clouds, accuracylayers, gms_file, log_file]

        warnings.warn(
            'get_pathes_all_procdata() is not yet completely implemented and will not return complete path list!')

        return all_pathes
def get_tempfile(ext=None, prefix=None, tgt_dir=None):
    """Returns the path to a tempfile.mkstemp() file that can be passed to any function that expects a physical path.

    The file is created on disk (empty) and its file descriptor is closed before returning.
    The tempfile has to be deleted manually.

    :param ext:     file extension (no extension if None)
    :param prefix:  optional file prefix ('GeoMultiSens__' if None)
    :param tgt_dir: target directory (CFG.path_tempdir if None)
    :return:        path of the created temporary file
    """
    if tgt_dir is None:
        tgt_dir = CFG.path_tempdir

    if prefix is None:
        prefix = 'GeoMultiSens__'

    fd, path = tempfile.mkstemp(prefix=prefix, suffix=ext, dir=tgt_dir)
    os.close(fd)  # only the path is needed; don't leak the open file descriptor

    return path
def get_path_cloud_class_obj(GMS_id, get_all=False):
    """Returns the absolute path of the training data used by cloud classifier.

    :param GMS_id:  GMS identifier of the scene (its attributes 'satellite', 'sensor' and 'logger' are read and
                    it is passed to get_GMS_sensorcode())
    :param get_all: if True, a list with the paths of all '*.dill' files within CFG.path_cloud_classif is returned
                    instead of the classifier belonging to the sensor of the given GMS identifier
    :return:        path of the classifier object (list of paths if get_all is True) or None if no classifier
                    is available
    """
    GMS_sensorcode = get_GMS_sensorcode(GMS_id)
    satellite, sensor = GMS_id.satellite, GMS_id.sensor
    logger = GMS_id.logger or Logger(__name__)  # GMS_id.logger may be None
    path_cloud_classifier_objects = CFG.path_cloud_classif

    # all Sentinel-2 sensor codes share the same classifier object
    fName_clf_S2 = 'S2__cld_mask_20160321_s2_8l0.200_11l0.060_12l0.040_v20161124_16:41:04.h5'

    obj_name_dic = {
        'AVNIR-2': None,
        'TM4': None,
        'TM5': None,
        'TM7': None,
        'LDCM': None,
        'SPOT1a': None,
        'SPOT1b': None,
        'SPOT2a': None,
        'SPOT2b': None,
        'SPOT3a': None,
        'SPOT3b': None,
        'SPOT4a': None,
        'SPOT4b': None,
        'SPOT5a': None,
        'SPOT5b': None,
        'RE5': None,
        'AST_V1': None,
        'AST_V2': None,
        'AST_S': None,
        'AST_T': None,
        'S2A10': fName_clf_S2,
        'S2A20': fName_clf_S2,
        'S2A60': fName_clf_S2,
        'S2B10': fName_clf_S2,
        'S2B20': fName_clf_S2,
        'S2B60': fName_clf_S2,
    }

    if get_all:  # returns a list
        # glob already returns the paths including the classifier directory
        classifier_paths = sorted(p for p in glob.glob(os.path.join(path_cloud_classifier_objects, '*.dill'))
                                  if os.path.isfile(p))

        if not classifier_paths:
            logger.warning('Cloud masking is not yet implemented for %s %s.' % (satellite, sensor))
            return None

        return classifier_paths

    try:
        classif_objName = obj_name_dic[GMS_sensorcode]
    except KeyError:
        logger.warning("Sensorcode '%s' is not included in sensorcode dictionary and can not be converted into GMS "
                       "sensorcode." % GMS_sensorcode)
        return None

    if not classif_objName:
        logger.warning('Cloud masking is not yet implemented for %s %s.' % (satellite, sensor))
        return None

    classifier_path = os.path.join(path_cloud_classifier_objects, classif_objName)

    if not os.path.isfile(classifier_path):
        warnings.warn("Path generator expects a specific cloud mask object (%s) at %s but it does not "
                      "exist. Are you sure that 'path_cloud_classif' has been correctly set in the config "
                      "table of postgreSQL database and that the file is included in the repository? By "
                      "default the classifier object should be available at "
                      "<GMS root dir>/database/cloud_classifier/"
                      % (classif_objName, path_cloud_classifier_objects))
        logger.warning(
            'Cloud masking not possible for %s %s due to environment error.'  # TODO move to environment
            % (satellite, sensor))
        return None

    return classifier_path
def get_path_snr_model(GMS_id):
    # type: (GMS_identifier) -> str
    """Returns the absolute path of the SNR model for the given sensor.

    The path is '<CFG.path_SNR_models>/<satellite>/<sensor>/SNR_model.csv'. The existence of the file is not
    checked.

    :param GMS_id:  GMS identifier of the scene (its attributes 'satellite' and 'sensor' are read)
    """
    satellite, sensor = GMS_id.satellite, GMS_id.sensor

    # all satellite names starting with 'RapidEye' share one model folder
    if re.match(r'RapidEye', satellite, re.I):
        satellite = 'RapidEye'

    # SPOT: strip the last character of the sensor name unless it is '1' or '2'
    # (str.endswith also handles an empty sensor name, which sensor[-1] does not)
    if re.match(r'SPOT', satellite, re.I) and not sensor.endswith(('1', '2')):
        sensor = sensor[:-1]

    return os.path.join(CFG.path_SNR_models, satellite, sensor, 'SNR_model.csv')
def get_path_ac_options(GMS_id):
    # type: (GMS_identifier)->Union[str, None]
    """Returns the path of the options json file needed for atmospheric correction.

    :param GMS_id:  GMS identifier of the scene (passed to get_GMS_sensorcode() with an empty subsystem;
                    its 'logger' attribute is used for warnings)
    :return:        path of the options file within the 'sicor' package or None if atmospheric correction is not
                    available for the sensor. NOTE: The path is also returned if the file does not exist
                    (a warning is logged in that case).
    """
    logger = GMS_id.logger or Logger(__name__)  # GMS_id.logger may be None

    # The sensor code is needed without subsystem. Clear the subsystem only temporarily so that the
    # GMS identifier of the caller is left unchanged.
    subsystem_orig = GMS_id.subsystem
    GMS_id.subsystem = ''
    try:
        sensorcode = get_GMS_sensorcode(GMS_id)
    finally:
        GMS_id.subsystem = subsystem_orig

    ac_options_file_dic = {
        'AVNIR-2': None,
        'TM4': 'l8_options.json',
        'TM5': 'l8_options.json',
        'TM7': 'l8_options.json',  # AC uses Landsat-8 options for L7 but reads only a subset of the options
        'LDCM': 'l8_options.json',
        'SPOT1a': None,
        'SPOT1b': None,
        'SPOT2a': None,
        'SPOT2b': None,
        'SPOT3a': None,
        'SPOT3b': None,
        'SPOT4a': None,
        'SPOT4b': None,
        'SPOT5a': None,
        'SPOT5b': None,
        'RE5': None,
        'AST_full': None,
        'S2A_full': 's2_options.json',
        'S2B_full': 's2_options.json',
    }

    try:
        fName_optFile = ac_options_file_dic[sensorcode]
    except KeyError:
        logger.warning(
            "Sensorcode '%s' is not included in ac_options dictionary. "
            "Thus atmospheric correction is not available for the current scene." % sensorcode)
        fName_optFile = None

    if not fName_optFile:
        return None

    # imported inline so that 'sicor' is only needed if an options file is actually requested
    from sicor import options

    path_ac = os.path.join(os.path.dirname(options.__file__), fName_optFile)

    # validate
    if not os.path.exists(path_ac):
        logger.warning('Could not locate options file for atmospheric correction at %s.' % path_ac)

    return path_ac