# -*- coding: utf-8 -*-
# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import List, Tuple, Generator, Iterable, Union # noqa F401 # flake8 issue
from psutil import virtual_memory
from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger
from ..misc.locks import ProcessLock, MemoryReserver, redis_conn
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
from ..algorithms import L2A_P
from ..algorithms import L2B_P
from ..algorithms import L2C_P
from ..model.gms_object import \
failed_GMS_object, update_proc_status, return_proc_reports_only, estimate_mem_usage
from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
from ..algorithms.geoprocessing import get_common_extent
__author__ = 'Daniel Scheffler'
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map(dataset_dict): # map (scene-wise parallelization)
# type: (dict) -> L1A_P.L1A_object
L1A_obj = L1A_P.L1A_object(**dataset_dict)
L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L1A_obj.import_rasterdata()
L1A_obj.import_metadata()
L1A_obj.validate_GeoTransProj_GeoAlign() # sets self.GeoTransProj_ok and self.GeoAlign_ok
L1A_obj.apply_nodata_mask_to_ObjAttr('arr') # nodata mask is automatically calculated
L1A_obj.add_rasterInfo_to_MetaObj()
L1A_obj.reference_data('UTM')
L1A_obj.calc_TOARadRefTemp()
L1A_obj.calc_corner_positions() # requires mask_nodata
L1A_obj.calc_center_AcqTime() # (if neccessary); requires corner positions
L1A_obj.calc_mean_VAA()
L1A_obj.calc_orbit_overpassParams() # requires corner positions
L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)
if CFG.exec_L1AP[1]:
L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
L1A_obj.record_mem_usage()
L1A_obj.delete_tempFiles()
return L1A_obj
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization)
# type: (dict, tuple) -> List[L1A_P.L1A_object]
L1A_obj = L1A_P.L1A_object(**dataset_dict)
L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L1A_obj.import_rasterdata()
L1A_obj.import_metadata()
L1A_obj.validate_GeoTransProj_GeoAlign() # sets self.GeoTransProj_ok and self.GeoAlign_ok
L1A_obj.apply_nodata_mask_to_ObjAttr('arr') # nodata mask is automatically calculated
L1A_obj.add_rasterInfo_to_MetaObj()
L1A_obj.reference_data('UTM')
tiles = [tile for tile in
# cut (block-wise parallelization)
L1A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY)
if tile is not None] # None is returned in case the tile contains no data at all
return tiles
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_2(L1A_tile): # map (block-wise parallelization)
# type: (L1A_P.L1A_object) -> L1A_P.L1A_object
L1A_tile.calc_TOARadRefTemp()
if not CFG.inmem_serialization:
L1A_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask, is_tempfile=True)
return L1A_tile
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_3(L1A_obj): # map (scene-wise parallelization)
# type: (L1A_P.L1A_object) -> L1A_P.L1A_object
L1A_obj.calc_corner_positions() # requires mask_nodata
L1A_obj.calc_center_AcqTime() # (if neccessary); requires corner positions
L1A_obj.calc_mean_VAA()
L1A_obj.calc_orbit_overpassParams() # requires corner positions
L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)
if CFG.exec_L1AP[1]:
L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
L1A_obj.delete_tempFiles()
else:
L1A_obj.delete_tempFiles()
L1A_obj.record_mem_usage()
return L1A_obj
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1B_map(L1A_obj):
# type: (L1A_P.L1A_object) -> L1B_P.L1B_object
"""L1A_obj enthält in Python- (im Gegensatz zur inmem_serialization-) Implementierung KEINE ARRAY-DATEN!,
nur die für die ganze Szene gültigen Metadaten"""
L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L1B_obj = L1B_P.L1B_object(L1A_obj)
L1B_obj.compute_global_shifts()
if CFG.exec_L1BP[1]:
L1B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
L1B_obj.record_mem_usage()
L1B_obj.delete_tempFiles()
return L1B_obj
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1C_map(L1B_objs):
# type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
"""Atmospheric correction.
NOTE: all subsystems (containing all spatial samplings) belonging to the same scene ID are needed
:param L1B_objs: list containing one or multiple L1B objects belonging to the same scene ID.
"""
list(L1B_objs)[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)
# initialize L1C objects
L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]
# check in config if atmospheric correction is desired
if CFG.target_radunit_optical == 'BOA_Ref':
# atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
if L1C_objs[0].ac_options:
# perform atmospheric correction
L1C_objs = L1C_P.AtmCorr(*L1C_objs).run_atmospheric_correction(dump_ac_input=False)
else:
[L1C_obj.logger.warning('Atmospheric correction is not yet supported for %s %s and has been skipped.'
% (L1C_obj.satellite, L1C_obj.sensor)) for L1C_obj in L1C_objs]
else:
[L1C_obj.logger.warning('Atmospheric correction skipped because optical conversion type is set to %s.'
% CFG.target_radunit_optical) for L1C_obj in L1C_objs]
# write outputs and delete temporary data
for L1C_obj in L1C_objs:
if CFG.exec_L1CP[1]:
L1C_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
if L1C_obj.arr_shape == 'cube':
L1C_obj.delete_tempFiles()
L1C_obj.delete_ac_input_arrays()
[L1C_obj.record_mem_usage() for L1C_obj in L1C_objs]
return L1C_objs
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs, block_size=None, return_tiles=True):
# type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]], tuple, bool) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object] # noqa
"""Geometric homogenization.
Performs correction of geometric displacements, resampling to target grid of the usecase and merges multiple
GMS objects belonging to the same scene ID (subsystems) to a single L2A object.
NOTE: Output L2A_object must be cut into tiles because L2A_obj size in memory exceeds maximum serialization size.
:param L1C_objs: list containing one or multiple L1C objects belonging to the same scene ID.
:param block_size: X/Y size of output tiles in pixels, e.g. (1024,1024)
:param return_tiles: return computed L2A object in tiles
:return: list of L2A_object tiles
"""
L1C_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)
# initialize L2A objects
L2A_objs = [L2A_P.L2A_object(L1C_obj) for L1C_obj in L1C_objs]
# get common extent (NOTE: using lon/lat coordinates here would cause black borders around the UTM image
# because get_common_extent() just uses the envelop without regard to the projection
clip_extent = \
get_common_extent([obj.trueDataCornerUTM for obj in L2A_objs]) \
if len(L2A_objs) > 1 else L2A_objs[0].trueDataCornerUTM
# correct geometric displacements and resample to target grid
for L2A_obj in L2A_objs:
L2A_obj.correct_spatial_shifts(cliptoextent=CFG.clip_to_extent,
clipextent=clip_extent, clipextent_prj=L2A_obj.arr.prj)
# merge multiple subsystems belonging to the same scene ID to a single GMS object
if len(L2A_objs) > 1:
L2A_obj = L2A_P.L2A_object.from_sensor_subsystems(L2A_objs)
else:
L2A_obj = L2A_objs[0]
# update attributes
L2A_obj.calc_mask_nodata(overwrite=True)
L2A_obj.calc_corner_positions()
# write output
if CFG.exec_L2AP[1]:
L2A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
# delete tempfiles of separate subsystem GMS objects
[L2A_obj.delete_tempFiles() for L2A_obj in L2A_objs]
if return_tiles:
L2A_tiles = list(L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY))
L2A_tiles = [i for i in L2A_tiles if i is not None] # None is returned in case the tile contains no data at all
[L2A_tile.record_mem_usage() for L2A_tile in L2A_tiles]
return L2A_tiles
else:
L2A_obj.record_mem_usage()
return L2A_obj
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2B_map(L2A_obj):
# type: (L2A_P.L2A_object) -> L2B_P.L2B_object
L2A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L2B_obj = L2B_P.L2B_object(L2A_obj)
L2B_obj.spectral_homogenization()
if CFG.exec_L2BP[1]:
L2B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
if L2B_obj.arr_shape == 'cube':
L2B_obj.delete_tempFiles()
L2B_obj.record_mem_usage()
return L2B_obj
[docs]@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj):
# type: (L2B_P.L2B_object) -> L2C_P.L2C_object
L2B_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L2C_obj = L2C_P.L2C_object(L2B_obj)
if CFG.exec_L2CP[1]:
L2C_MRGS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
[MGRS_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask,
compression=CFG.output_data_compression) for MGRS_tile in L2C_MRGS_tiles]
L2C_obj.record_mem_usage()
L2C_obj.delete_tempFiles()
return L2C_obj # contains no array data in Python mode
[docs]@return_proc_reports_only
# @return_GMS_objs_without_arrays
def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise parallelization)
# type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
"""
NOTE: Exceptions in this function are must be catched by calling function (process controller).
:param list_dataset_dicts_per_scene:
:return:
"""
with GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
log_level=CFG.log_level, append=True) as pipeline_logger:
# set CPU and memory limits
cpulimit = CFG.CPUs_all_jobs
mem2reserve = 15
if redis_conn:
mem_estim = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
list_dataset_dicts_per_scene[0]['satellite'])
if mem_estim:
mem2reserve = mem_estim
else:
cpulimit = int((virtual_memory().total * .8 - virtual_memory().used) / 1024 ** 3 / mem2reserve)
pipeline_logger.info('No memory usage statistics from earlier jobs found for the current '
'configuration. Limiting processes to %s in order to collect memory statistics '
'first.' % cpulimit)
# start processing
with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage),\
ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
'Received %s.' % list_dataset_dicts_per_scene)
input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']
##################
# L1A processing #
##################
L1A_objects = []
if CFG.exec_L1AP[0] and input_proc_level is None:
L1A_objects = \
[L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]):
return L1A_objects
##################
# L1B processing #
##################
# select subsystem with optimal band for co-registration
# L1B_obj_coreg = L1B_map(L1A_objects[0])
# copy coreg information to remaining subsets
L1B_objects = L1A_objects
if CFG.exec_L1BP[0]:
# add earlier processed L1A data
if input_proc_level == 'L1A':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
del L1A_objects
if any([isinstance(obj, failed_GMS_object) for obj in L1B_objects]):
return L1B_objects
##################
# L1C processing #
##################
L1C_objects = L1B_objects
if CFG.exec_L1CP[0]:
# add earlier processed L1B data
if input_proc_level == 'L1B':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))
L1C_objects = L1C_map(L1B_objects)
del L1B_objects
if any([isinstance(obj, failed_GMS_object) for obj in L1C_objects]):
return L1C_objects
if not CFG.exec_L2AP[0]:
return L1C_objects
##################
# L2A processing #
##################
# add earlier processed L1C data
if input_proc_level == 'L1C':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))
L2A_obj = L2A_map(L1C_objects, return_tiles=False)
del L1C_objects
if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
return L2A_obj
##################
# L2B processing #
##################
# add earlier processed L2A data
if input_proc_level == 'L2A':
assert len(list_dataset_dicts_per_scene) == 1, \
'Expected only a single L2A dataset since subsystems are merged.'
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])
L2B_obj = L2B_map(L2A_obj)
del L2A_obj
if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
return L2B_obj
##################
# L2C processing #
##################
# add earlier processed L2B data
if input_proc_level == 'L2B':
assert len(list_dataset_dicts_per_scene) == 1, \
'Expected only a single L2B dataset since subsystems are merged.'
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])
L2C_obj = L2C_map(L2B_obj) # type: Union[GMS_object, failed_GMS_object, List]
del L2B_obj
return L2C_obj