# Source code for gms_preprocessing.processing.process_controller

# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import (division, print_function, unicode_literals, absolute_import)

import numpy as np
from pandas import DataFrame
import datetime
import os
import time
from itertools import chain
import signal
import re
from typing import TYPE_CHECKING
import shutil
import sys
from natsort import natsorted

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object, GMS_object, GMS_identifier
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..options.config import set_config
from .multiproc import MAP, imap_unordered
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
from ..misc.locks import release_unclosed_locks
from ..version import __version__, __versionalias__

from py_tools_ds.numeric.array import get_array_tilebounds

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue
    from typing import List  # noqa F401  # flake8 issue
    from ..options.config import GMS_config  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'


class ProcessController(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller.

        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
        :raises ValueError:     if 'job_ID' is not an integer
        """
        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        self.config = set_config(job_ID, **config_kwargs)  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #     pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        # create job log (remove any stale logfile of a previous run of the same job ID first)
        self._path_job_logfile = os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID)
        if os.path.exists(self._path_job_logfile):
            HLP_F.silentremove(self._path_job_logfile)

        self.logger.info("Executing gms_preprocessing, version: %s (%s)" % (__version__, __versionalias__))
        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))
        self.logger.info('Job logfile: %s' % self._path_job_logfile)

        # save config
        self._path_job_optionsfile = os.path.join(self.config.path_job_logs, '%s_options.json' % self.config.ID)
        self.config.save(self._path_job_optionsfile)
        self.logger.info('Job options file: %s' % self._path_job_optionsfile)

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
        # reuse the existing logger only if it still has open handlers (it may have been closed/deleted
        # to release the job logfile for worker processes)
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('ProcessController__%s' % self.config.ID, fmt_suffix='ProcessController',
                                      path_logfile=self._path_job_logfile, log_level=self.config.log_level,
                                      append=True)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self._logger.close()
            self._logger = None

    @property
    def DB_job_record(self):
        # lazily fetch the database record of the current job (table 'jobs')
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
        # scene IDs of all objects that failed during processing so far
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
        """Check on disk and in the database which processing level is already available for the given dataset
        and set dataset['proc_level'] accordingly (None means the dataset has to be reprocessed).

        :param dataset:  one OrderedDict per subsystem as generated by CFG.data_list
        """
        # TODO revise this function
        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfile
        path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
        path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Returns all processing level that have been successfully written according to logfile."""
            if not os.path.exists(path_log):
                if path_log == path_logfile:  # path_logfile_merged_ss has already been searched
                    self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                     % (os.path.basename(path_log), dataset['entity_ID'],
                                        os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
                # FIX: read the logfile via a context manager so the file handle is always closed
                # (the previous implementation leaked the descriptor via open(path_log, 'r').read())
                with open(path_log, 'r') as inF:
                    logfile = inF.read()
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
                if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss:
                    # AllWrittenProcL_dueLog = []
                    self.logger.info('%s: According to logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = natsorted(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog

        # check if there are not multiple database records for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':
            # get all processing level that have been successfully written
            # NOTE: first check for merged subsystem datasets because they have higher processing levels
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
            if not AllWrittenProcL:
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            else:
                # A L2A+ dataset with merged subsystems has been found. Use that logfile.
                path_logfile = path_logfile_merged_ss

            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS_file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(
                            GMS_identifier(
                                image_type=dataset['image_type'],
                                satellite=dataset['satellite'],
                                sensor=dataset['sensor'],
                                subsystem=dataset['subsystem'] if path_logfile != path_logfile_merged_ss else '',
                                proc_level=ProcL,  # must be respected because LBA changes after atm. Corr.
                                dataset_ID=dataset['dataset_ID']),
                            nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:
                        # update the database record if the dataset could not be found in database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but'
                                             ' according to logfile %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)
                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching %s dataset for %s but with a different '
                                         'LayerBandsAssignment (desired: %s; found %s). Dataset has to be '
                                         'reprocessed.'
                                         % (ProcL, dataset['entity_ID'],
                                            target_LayerBandsAssignment, GMS_file_dict['LayerBandsAssignment']))
                else:
                    self.logger.info('%s for dataset %s has been written due to logfile but no corresponding '
                                     'dataset has been found.' % (ProcL, dataset['entity_ID']) +
                                     ' Searching for lower processing level...'
                                     if AllWrittenProcL.index(ProcL) != 0 else '')

        elif len(DB_match) > 1:
            self.logger.info('According to database there are multiple matches for the dataset %s. Dataset has to '
                             'be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset
[docs] def add_local_availability(self, datasets): # type: (List[OrderedDict]) -> List[OrderedDict] """Check availability of all subsets per scene and processing level. NOTE: The processing level of those scenes, where not all subsystems are available in the same processing level is reset. :param datasets: List of one OrderedDict per subsystem as generated by CFG.data_list """ datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets] ###################################################################################################### # validate that all subsystems of the same sceneid are at the same processing level; otherwise reset # ###################################################################################################### datasets_validated = [] datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID') for ds_group in datasets_grouped: proc_lvls = [ds['proc_level'] for ds in ds_group] if not len(list(set(proc_lvls))) == 1: # reset processing level of those scenes where not all subsystems are available self.logger.info('%s: Found already processed subsystems at different processing levels %s. ' 'Dataset has to be reprocessed to avoid errors.' % (ds_group[0]['entity_ID'], proc_lvls)) for ds in ds_group: ds['proc_level'] = None datasets_validated.append(ds) else: datasets_validated.extend(ds_group) return datasets_validated
@staticmethod def _is_inMEM(GMS_objects, dataset): # type: (list, OrderedDict) -> bool """Checks whether a dataset within a dataset list has been processed in the previous processing level. :param GMS_objects: <list> a list of GMS objects that has been recently processed :param dataset: <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID() """ # check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects] def _get_processor_data_list(self, procLvl, prevLvl_objects=None): """Returns a list of datasets that have to be read from disk and then processed by a specific processor. :param procLvl: :param prevLvl_objects: :return: """ def is_procL_lower(dataset): return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl) if prevLvl_objects is None: return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)] # TODO generator? else: return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]
[docs] def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None): """ Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor. :param procLvl: <str> processing level oof the current processor :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor :param parallLev: <str> parallelization level ('scenes' or 'tiles') -> defines if full cubes or blocks are to be returned :param blocksize: <tuple> block size in case blocks are to be returned, e.g. (2000,2000) :return: """ # TODO get prevLvl_objects automatically from self if procLvl == 'L1A': return [] else: # handle input parameters parallLev = parallLev or self.config.parallelization_level blocksize = blocksize or self.config.tiling_block_size_XY prevLvl = proc_chain[proc_chain.index(procLvl) - 1] # TODO replace by enum # get GMSfile list dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects) GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl) # create GMS objects from disk with respect to parallelization level and block size if parallLev == 'scenes': # get input parameters for creating GMS objects as full cubes work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB] else: # define tile positions and size def get_tilepos_list(GMSfile): return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'], tile_shape=blocksize) # get input parameters for creating GMS objects as blocks work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB for tp in get_tilepos_list(GMSfile)] # create GMS objects for the found files on disk # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work) # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory # -> subsystem attribute will be overwritten each time from ..misc.helper_functions import get_parentObjDict 
parentObjDict = get_parentObjDict() DB_objs = [parentObjDict[prevLvl].from_disk(tuple_GMS_subset=w) for w in work] if DB_objs: DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs) return DB_objs
[docs] def run_all_processors(self, custom_data_list=None, serialize_after_each_mapper=False): """ Run all processors at once. """ # enable clean shutdown possibility # NOTE: a signal.SIGKILL (kill -9 ...) forces to kill the process and cannot be catched or handled signal.signal(signal.SIGINT, self.stop) # catches a KeyboardInterrupt signal.signal(signal.SIGTERM, self.stop) # catches a 'kill' or 'pkill' # noinspection PyBroadException try: if self.config.profiling: from pyinstrument import Profiler self.profiler = Profiler() # or Profiler(use_signal=False), see below self.profiler.start() self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...' % self.config.ID) self.DB_job_record.reset_job_progress() # updates attributes of DB_job_record and related DB entry self.config.status = 'running' GMS_object.proc_status_all_GMSobjs.clear() # reset self.update_DB_job_record() # TODO implement that into config.status.setter self.failed_objects = [] # get list of datasets to be processed if custom_data_list: self.config.data_list = custom_data_list # add local availability self.config.data_list = self.add_local_availability(self.config.data_list) self.update_DB_job_statistics(self.config.data_list) if not serialize_after_each_mapper: # group dataset dicts by sceneid dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID') # close logger to release FileHandler of job log (workers will log into job logfile) del self.logger # RUN PREPROCESSING from .pipeline import run_complete_preprocessing GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True) # separate results into successful and failed objects def assign_attr(tgt_procL): return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL] self.L1A_newObjects = assign_attr('L1A') self.L1B_newObjects = assign_attr('L1B') self.L1C_newObjects = assign_attr('L1C') self.L2A_newObjects = 
assign_attr('L2A') self.L2B_newObjects = assign_attr('L2B') self.L2C_newObjects = assign_attr('L2C') self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)] else: self.L1A_processing() self.L1B_processing() self.L1C_processing() self.L2A_processing() self.L2B_processing() self.L2C_processing() # create summary self.create_job_summary() self.logger.info('Execution finished.') self.logger.info('The job logfile, options file and the summary files have been saved here: \n' '%s.*' % os.path.splitext(self.logger.path_logfile)[0]) # TODO implement failed_with_warnings: self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors' self.config.end_time = datetime.datetime.now() self.config.computation_time = self.config.end_time - self.config.start_time self.logger.info('Time for execution: %s' % self.config.computation_time) except Exception: # noqa E722 # bare except self.config.status = 'failed' if not self.config.disable_exception_handler: self.logger.error('Execution failed with an error:', exc_info=True) else: self.logger.error('Execution failed with an error:') raise finally: # update database entry of current job self.update_DB_job_record() if self.config.profiling: self.profiler.stop() print(self.profiler.output_text(unicode=True, color=True)) self.shutdown()
[docs] def stop(self, signum, frame): """Interrupt the running process controller gracefully.""" self.logger.info('Process controller stopped via %s.' % ('KeyboardInterrupt' if signum == 2 else 'SIGTERM command')) self.config.status = 'canceled' self.update_DB_job_record() self.shutdown() if signum == 2: raise KeyboardInterrupt('Received a KeyboardInterrupt.') # terminate execution and show traceback elif signum == 15: sys.exit(0)
# raise SystemExit()
[docs] def shutdown(self): """Shutdown the process controller instance (loggers, remove temporary directories, ...).""" self.logger.info('Shutting down gracefully...') # release unclosed locks release_unclosed_locks() # clear any temporary files tempdir = os.path.join(self.config.path_tempdir) self.logger.info('Deleting temporary directory %s.' % tempdir) if os.path.exists(tempdir): shutil.rmtree(tempdir, ignore_errors=True) del self.logger shutdown_loggers()
[docs] def benchmark(self): """ Run a benchmark. """ data_list_bench = self.config.data_list for count_datasets in range(len(data_list_bench)): t_processing_all_runs, t_IO_all_runs = [], [] for count_run in range(10): current_data_list = data_list_bench[0:count_datasets + 1] if os.path.exists(self.config.path_database): os.remove(self.config.path_database) t_start = time.time() self.run_all_processors(current_data_list) t_processing_all_runs.append(time.time() - t_start) t_IO_all_runs.append(globals()['time_IO']) assert current_data_list, 'Empty data list.' OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)
[docs] def L1A_processing(self): """ Run Level 1A processing: Data import and metadata homogenization """ if self.config.exec_L1AP[0]: self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n') datalist_L1A_P = self._get_processor_data_list('L1A') if self.config.parallelization_level == 'scenes': # map L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12) else: # tiles all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P, flatten_output=True) # map_1 # merge results to new list of splits L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1) # map_2 grouped_L1A_Tiles = HLP_F.group_objects_by_attributes( L1A_obj_tiles, 'scene_ID', 'subsystem') # group results L1A_objects = MAP(L1A_P.L1A_object.from_tiles, grouped_L1A_Tiles) # reduce L1A_resObjects = MAP(L1A_map_3, L1A_objects) # map_3 self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)] self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and obj.scene_ID not in self.sceneids_failed] return self.L1A_newObjects
[docs] def L1B_processing(self): """ Run Level 1B processing: calculation of geometric shifts """ # TODO implement check for running spatial index mediator server # run on full cubes if self.config.exec_L1BP[0]: self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n') L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes') L1A_Instances = self.L1A_newObjects + L1A_DBObjects # combine newly and earlier processed L1A data L1B_resObjects = MAP(L1B_map, L1A_Instances) self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)] self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and obj.scene_ID not in self.sceneids_failed] return self.L1B_newObjects
[docs] def L1C_processing(self): """ Run Level 1C processing: atmospheric correction """ if self.config.exec_L1CP[0]: self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n') if self.config.parallelization_level == 'scenes': L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects) L1B_Instances = self.L1B_newObjects + L1B_DBObjects # combine newly and earlier processed L1B data # group by scene ID (all subsystems belonging to the same scene ID must be processed together) grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID') L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True, CPUs=15) # FIXME CPUs set to 15 for testing else: # tiles raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use " "parallelization level 'scenes' instead!") # blocksize = (5000, 5000) # """if newly processed L1A objects are present: cut them into tiles""" # L1B_newTiles = [] # if self.L1B_newObjects: # tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects] # L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True) # # """combine newly and earlier processed L1B data""" # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize) # L1B_tiles = L1B_newTiles + L1B_newDBTiles # # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction # L1C_tiles = MAP(L1C_map, L1B_tiles) # grouped_L1C_Tiles = \ # HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem') # group results # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles] # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles) # reduce self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)] self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and 
obj.scene_ID not in self.sceneids_failed] return self.L1C_newObjects
[docs] def L2A_processing(self): """ Run Level 2A processing: geometric homogenization """ if self.config.exec_L2AP[0]: self.logger.info( '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n') """combine newly and earlier processed L1C data""" L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes') L1C_Instances = self.L1C_newObjects + L1C_DBObjects # combine newly and earlier processed L1C data # group by scene ID (all subsystems belonging to the same scene ID must be processed together) grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID') L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True) self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)] self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and obj.scene_ID not in self.sceneids_failed] return self.L2A_tiles
[docs] def L2B_processing(self): """ Run Level 2B processing: spectral homogenization """ if self.config.exec_L2BP[0]: self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n') if self.config.parallelization_level == 'scenes': # don't know if scenes makes sense in L2B processing because full objects are very big! """if newly processed L2A objects are present: merge them to scenes""" grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID') # group results # reduce # will be too slow because it has to pickle back really large L2A_newObjects # L2A_newObjects = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles) L2A_newObjects = [L2A_P.L2A_object.from_tiles(tileList) for tileList in grouped_L2A_Tiles] """combine newly and earlier processed L2A data""" L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles) L2A_Instances = L2A_newObjects + L2A_DBObjects # combine newly and earlier processed L2A data L2B_resObjects = MAP(L2B_map, L2A_Instances) else: # tiles L2A_newTiles = self.L2A_tiles # tiles have the block size specified in L2A_map_2 """combine newly and earlier processed L2A data""" blocksize = (2048, 2048) # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2 L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize) L2A_tiles = L2A_newTiles + L2A_newDBTiles L2B_tiles = MAP(L2B_map, L2A_tiles) # group results # FIXME nötig an dieser Stelle? grouped_L2B_Tiles = HLP_F.group_objects_by_attributes(L2B_tiles, 'scene_ID') [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles] L2B_resObjects = [L2B_P.L2B_object.from_tiles(tileList) for tileList in grouped_L2B_Tiles] self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)] self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and obj.scene_ID not in self.sceneids_failed] return self.L2B_newObjects
[docs] def L2C_processing(self): """ Run Level 2C processing: accurracy assessment and MGRS tiling """ # FIXME only parallelization_level == 'scenes' implemented if self.config.exec_L2CP[0]: self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n') """combine newly and earlier processed L2A data""" L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes') L2B_Instances = self.L2B_newObjects + L2B_DBObjects # combine newly and earlier processed L2A data L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8) # FIXME 8 workers due to heavy IO # FIXME in case of inmem_serialization mode results are too big to be back-pickled self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)] self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and obj.scene_ID not in self.sceneids_failed] return self.L2C_newObjects
[docs] def update_DB_job_record(self): """ Update the database records of the current job (table 'jobs'). """ # TODO move this method to config.Job # update 'failed_sceneids' column of job record within jobs table sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects])) DB_T.update_records_in_postgreSQLdb( self.config.conn_database, 'jobs', {'failed_sceneids': sceneids_failed, # update 'failed_sceneids' column 'finishtime': self.config.end_time, # add job finish timestamp 'status': self.config.status}, # update 'job_status' column {'id': self.config.ID}, timeout=30000)
[docs] def update_DB_job_statistics(self, usecase_datalist): """ Update job statistics of the running job in the database. """ # TODO move this method to config.Job already_updated_IDs = [] for ds in usecase_datalist: if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs: # update statistics column of jobs table DB_T.increment_decrement_arrayCol_in_postgreSQLdb( self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID}, idx_val2decrement=db_jobs_statistics_def['pending'], idx_val2increment=db_jobs_statistics_def[ds['proc_level']], timeout=30000) # avoid double updating in case of subsystems belonging to the same scene ID already_updated_IDs.append(ds['scene_ID'])
[docs] def create_job_summary(self): """ Create job success summary """ # get objects with highest requested processing level highest_procL_Objs = [] for pL in reversed(proc_chain): if getattr(self.config, 'exec_%sP' % pL)[0]: highest_procL_Objs = \ getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else (self.L2A_tiles or self.L2A_newObjects) break gms_objects2summarize = highest_procL_Objs + self.failed_objects if gms_objects2summarize: # create summaries detailed_JS, quick_JS = get_job_summary(gms_objects2summarize) detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID)) detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t') self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string()) self.summary_detailed = detailed_JS self.summary_quick = quick_JS else: # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X) # TODO otherwise it is possible that get_job_summary receives an empty list self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")
[docs] def clear_lists_procObj(self): self.failed_objects = [] self.L1A_newObjects = [] self.L1B_newObjects = [] self.L1C_newObjects = [] self.L2A_tiles = [] self.L2B_newObjects = [] self.L2C_newObjects = []
def get_job_summary(list_GMS_objects):
    """Build a detailed and a quick job summary for the given GMS objects.

    :param list_GMS_objects:  successful and/or failed GMS objects to summarize
    :return:  (detailed summary DataFrame with one row per object,
               quick summary DataFrame aggregated per satellite/sensor)
    """
    # detailed job summary: one row per GMS object, one column per attribute of interest
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type',
                'proc_level', 'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue',
                'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    # fill each column from the corresponding object attribute (None where the attribute is missing)
    for col in DJS_cols[1:]:
        def get_val(obj, attr=col):  # bind attr as default to avoid late-binding surprises
            return getattr(obj, attr) if hasattr(obj, attr) else None
        DJS[col] = list(DJS['GMS_object'].map(get_val))

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # quick job summary: per satellite/sensor counts of processed and failed scenes
    QJS = DataFrame(columns=['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite'] = all_sat
    QJS['sensor'] = all_sen

    # count objects with the same satellite/sensor/sceneid combination
    QJS['count'] = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                    for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen) &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed'] = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])

    return DJS, QJS