# NOTE(review): the lines below were hot-key help text from an HTML code viewer,
# accidentally pasted into the source; commented out so the module parses.
# Hot-keys on this page: r m x p toggle line displays; j k next/prev highlighted
# chunk; 0 (zero) top of page; 1 (one) first highlighted chunk
# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
import builtins
import datetime
import json
import logging
import multiprocessing
import os
import pkgutil
import re
import socket
import time
import warnings
from collections import OrderedDict
from collections.abc import Mapping
from inspect import getargvalues, stack, signature, _empty
from json import JSONDecodeError
from pprint import pformat
from typing import TYPE_CHECKING

import numpy as np
import psutil
import psycopg2
import psycopg2.extras
from cerberus import Validator
from jsmin import jsmin

from .options_schema import gms_schema_input, gms_schema_config_output, parameter_mapping, get_param_from_json_config
from ..version import __version__, __versionalias__

if TYPE_CHECKING:
    from gms_preprocessing.misc.database_tools import GMS_JOB  # noqa F401  # flake8 issue
# module author metadata
__author__ = 'Daniel Scheffler'
class GMS_configuration(object):
    """Proxy that forwards attribute access to the job configuration stored in builtins.

    set_config() stores the active JobConfig instance as ``builtins.GMS_JobConfig``;
    this proxy makes it accessible from anywhere via the module-level GMS_config object.
    Raises EnvironmentError on attribute access if no configuration has been set yet.
    """

    def __getattr__(self, attr):
        if hasattr(builtins, 'GMS_JobConfig'):
            if attr in ['job', 'usecase']:
                # This is only to keep compatibility with HU-INF codes
                return getattr(builtins, 'GMS_JobConfig')
            return getattr(builtins.GMS_JobConfig, attr)
        else:
            # FIX: the original message ended with a stray quote ("first!'")
            raise EnvironmentError("Config has not been set yet on this machine. Run 'set_config()' first!")

    def __repr__(self):
        return getattr(builtins, 'GMS_JobConfig').__repr__()
# Global singleton through which the currently active job configuration can be accessed;
# attribute access is proxied to builtins.GMS_JobConfig (see GMS_configuration above).
GMS_config = GMS_configuration()  # type: JobConfig


# Package root directory and path of the default options file shipped with the package.
# NOTE(review): pkgutil.get_loader() returns None if 'gms_preprocessing' is not importable,
# which would raise an AttributeError here — presumably intended as a hard failure.
path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')
def set_config(job_ID, json_config='', reset_status=False, **kwargs):
    """Set up a configuration for a new gms_preprocessing job!

    :param job_ID:       job ID of the job to be executed, e.g. 123456 (must be present in database)
    :param json_config:  path to JSON file containing configuration parameters or a string in JSON format
    :param reset_status: whether to reset the job status or not (default=False)
    :param kwargs:       keyword arguments to be passed to JobConfig
                         NOTE: All keyword arguments given here WILL OVERRIDE configurations that have been
                         previously set via WebUI or via the json_config parameter!

    :Keyword Arguments:
        - inmem_serialization:   False: write intermediate results to disk in order to save memory
                                 True: keep intermediate results in memory in order to save IO time
        - parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
                                 'tiles' - parallelization on tile-level
        - db_host:               host name of the server that runs the postgreSQL database
        - spatial_index_server_host: host name of the server that runs the SpatialIndexMediator
        - spatial_index_server_port: port used for connecting to SpatialIndexMediator
        - delete_old_output:     <bool> whether to delete previously created output of the given job ID
                                 before running the job (default = False)
        - exec_L1AP:             list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L1BP:             list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L1CP:             list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2AP:             list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2BP:             list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2CP:             list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - CPUs:                  number of CPU cores to be used for processing (default: None -> use all available)
        - allow_subMultiprocessing: allow multiprocessing within workers
        - disable_exception_handler: enable/disable automatic handling of unexpected exceptions
                                 (default: True -> enabled)
        - log_level:             the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR',
                                 'CRITICAL'; default: 'INFO')
        - tiling_block_size_XY:  X/Y block size to be used for any tiling process (default: (2048,2048)
        - is_test:               whether the current job represents a software test job (run by a test runner)
                                 or not (default=False)
        - profiling:             enable/disable code profiling (default: False)
        - benchmark_global:      enable/disable benchmark of the whole processing pipeline
        - path_procdata_scenes:  output path to store processed scenes
        - path_procdata_MGRS:    output path to store processed MGRS tiles
        - path_archive:          input path where downloaded data are stored
        - virtual_sensor_id:     1: Landsat-8, 10: Sentinel-2A 10m
        - datasetid_spatial_ref: 249 Sentinel-2A
    :rtype: JobConfig
    """
    # validate kwargs
    # 'db_host' is accepted in addition to the mapped parameters because it configures
    # the database connection itself rather than a job option.
    for k in kwargs:
        if k not in parameter_mapping and k != 'db_host':
            raise ValueError("'%s' is not a valid parameter. Valid parameters are: \n%s"
                             % (k, list(parameter_mapping.keys())))

    #################################
    # set GMS_JobConfig in builtins #
    #################################
    # FIXME virtual_sensor_id and datasetid_spatial_ref are not respected by JobConfig.
    if not hasattr(builtins, 'GMS_JobConfig') or reset_status:
        builtins.GMS_JobConfig = JobConfig(job_ID, json_config=json_config, **kwargs)

    #####################
    # check environment #
    #####################
    # The environment check runs only once per interpreter session (flagged via builtins.GMS_EnvOK).
    if not hasattr(builtins, 'GMS_EnvOK') or not getattr(builtins, 'GMS_EnvOK'):
        # check environment
        from ..misc import environment as ENV
        logger = logging.getLogger('GMSEnvironmentChecker')
        logger.setLevel(logging.DEBUG)
        GMSEnv = ENV.GMSEnvironment()
        GMSEnv.check_dependencies()
        GMSEnv.check_read_write_permissions()
        GMSEnv.ensure_properly_activated_GDAL()
        GMSEnv.check_ecmwf_api_creds()

        # close unclosed locks from previous runs
        from ..misc.locks import release_unclosed_locks
        release_unclosed_locks()

        builtins.GMS_EnvOK = True

    return getattr(builtins, 'GMS_JobConfig')
def get_conn_database(hostname='localhost', timeout=3):
    # type: (str, int) -> str
    """Return a libpq-style connection string for the GMS database.

    :param hostname: the host that runs the GMS postgreSQL database
    :param timeout:  connection timeout in seconds
    :return:         connection string usable with psycopg2.connect()
    """
    return ("dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=%d"
            % (hostname, timeout))
class JobConfig(object):
    """Container for all configuration parameters of a single gms_preprocessing job.

    Parameters are collected from multiple sources with the following precedence:
    keyword arguments passed to __init__  >  WebUI settings (jobs table of the DB)
    >  JSON configuration (string or file)  >  options_default.json.
    """

    def __init__(self, ID, json_config='', **user_opts):
        """Create a job configuration.

        Workflow:
        # 0. Environment
        # 1. The JSON options can come from two sources: console command or database
        #    - console command: GMS_JOB.from_... has to write default options to the database
        #    => first create the JobConfig on the basis of the JSON options
        # 2. then override them with the user-defined parameters
        #    (either init parameters or database settings set via the webapp)

        :param ID:          job ID of the job to be executed, e.g. 123456 (must be present in database)
        :param json_config: path to JSON file containing configuration parameters or a string in JSON format
        :param user_opts:   keyword arguments as passed by gms_preprocessing.set_config()
        """
        # privates
        self._DB_job_record = None  # type: GMS_JOB
        self._DB_config_table = None  # type: dict
        self._kwargs_defaults = None

        # fixed attributes
        # possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings',
        # 'finished_with_errors', 'finished'
        self.status = 'pending'
        self.start_time = datetime.datetime.now()
        self.end_time = None
        self.computation_time = None
        self.hostname = socket.gethostname()
        self.version = __version__
        self.versionalias = __versionalias__

        #######################
        # POPULATE PARAMETERS #
        #######################

        # args
        self.ID = ID
        self.json_config = json_config
        self.kwargs = user_opts

        # database connection
        # FIX: 'db_host' is documented as optional in set_config(), so fall back to 'localhost'
        # (consistent with get_conn_database()) instead of raising a KeyError when it is missing.
        self.db_host = user_opts.get('db_host', 'localhost')
        self.conn_database = get_conn_database(hostname=self.db_host)

        # get validated options dict from JSON-options
        self.json_opts_fused_valid = self.get_json_opts(validate=True)

        gp = self.get_parameter  # shortcut used for all parameter lookups below

        ##################
        # global options #
        ##################

        self.inmem_serialization = gp('inmem_serialization')
        self.parallelization_level = gp('parallelization_level')
        self.spatial_index_server_host = gp('spatial_index_server_host')
        self.spatial_index_server_port = gp('spatial_index_server_port')
        self.CPUs = gp('CPUs', fallback=multiprocessing.cpu_count())
        self.CPUs_all_jobs = gp('CPUs_all_jobs')
        self.max_mem_usage = gp('max_mem_usage')
        self.critical_mem_usage = gp('critical_mem_usage')
        self.max_parallel_reads_writes = gp('max_parallel_reads_writes')
        self.allow_subMultiprocessing = gp('allow_subMultiprocessing')
        self.delete_old_output = gp('delete_old_output')
        self.disable_exception_handler = gp('disable_exception_handler')
        self.disable_IO_locks = gp('disable_IO_locks')
        self.disable_CPU_locks = gp('disable_CPU_locks')
        self.disable_DB_locks = gp('disable_DB_locks')
        self.disable_memory_locks = gp('disable_memory_locks')
        self.min_version_mem_usage_stats = gp('min_version_mem_usage_stats')
        self.log_level = gp('log_level')
        self.tiling_block_size_XY = gp('tiling_block_size_XY')
        self.is_test = gp('is_test')
        self.profiling = gp('profiling')
        self.benchmark_global = gp('benchmark_global')

        #########
        # paths #
        #########

        # external
        self.path_spatIdxSrv = self.DB_config_table['path_spatial_index_mediator_server']
        self.path_tempdir = self.absP(self.DB_config_table['path_tempdir'])
        self.path_custom_sicor_options = gp('path_custom_sicor_options')
        self.path_dem_proc_srtm_90m = self.absP(self.DB_config_table['path_dem_proc_srtm_90m'])
        self.path_spechomo_classif = gp('path_spechomo_classif')

        # internal (included in gms_preprocessing repository)
        self.path_earthSunDist = self.joinP(path_gmslib, 'database', 'earth_sun_distance',
                                            'Earth_Sun_distances_per_day_edited.csv')
        self.path_SNR_models = self.joinP(path_gmslib, 'database', 'snr')
        self.path_cloud_classif = self.joinP(path_gmslib, 'database', 'cloud_classifier')
        self.path_solar_irr = self.joinP(path_gmslib, 'database', 'solar_irradiance',
                                         'SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')

        if not self.is_test:
            # NOTE: the closure is evaluated lazily below, after self.path_fileserver has been set
            def get_dbpath(dbkey):
                return self.joinP(self.path_fileserver, self.DB_config_table[dbkey])

            # normal mode
            self.path_fileserver = self.DB_config_table['path_data_root']
            self.path_archive = gp('path_archive', fallback=get_dbpath('foldername_download'))
            self.path_procdata_scenes = gp('path_procdata_scenes', fallback=get_dbpath('foldername_procdata_scenes'))
            self.path_procdata_MGRS = gp('path_procdata_MGRS', fallback=get_dbpath('foldername_procdata_MGRS'))
            self.path_ECMWF_db = gp('path_ECMWF_db', fallback=self.absP(self.DB_config_table['path_ECMWF_db']))
            self.path_benchmarks = gp('path_benchmarks', fallback=self.absP(self.DB_config_table['path_benchmarks']))
            self.path_job_logs = gp('path_job_logs', fallback=get_dbpath('foldername_job_logs'))

        else:
            # software test mode, the repository should be self-contained -> use only relative paths
            self.path_fileserver = self.joinP(path_gmslib, '..', 'tests', 'data')
            self.path_archive = self.joinP(path_gmslib, '..', 'tests', 'data', 'archive_data')
            self.path_procdata_scenes = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_scenes')
            self.path_procdata_MGRS = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_mgrs_tiles')
            self.path_ECMWF_db = self.joinP(path_gmslib, '..', 'tests', 'data', 'processed_ECMWF')
            self.path_benchmarks = self.joinP(path_gmslib, '..', 'tests', 'data', 'benchmarks')
            self.path_job_logs = self.joinP(path_gmslib, '..', 'tests', 'data', 'job_logs')

        ###########################
        # processor configuration #
        ###########################

        # general_opts
        self.skip_thermal = gp('skip_thermal')
        self.skip_pan = gp('skip_pan')
        self.sort_bands_by_cwl = gp('sort_bands_by_cwl')
        self.target_radunit_optical = gp('target_radunit_optical')
        self.target_radunit_thermal = gp('target_radunit_thermal')
        self.scale_factor_TOARef = gp('scale_factor_TOARef')
        self.scale_factor_BOARef = gp('scale_factor_BOARef')
        self.mgrs_pixel_buffer = gp('mgrs_pixel_buffer')
        self.output_data_compression = gp('output_data_compression')
        self.write_ENVIclassif_cloudmask = gp('write_ENVIclassif_cloudmask')

        # processor specific opts

        # L1A
        self.exec_L1AP = gp('exec_L1AP')
        self.SZA_SAA_calculation_accurracy = gp('SZA_SAA_calculation_accurracy')
        self.export_VZA_SZA_SAA_RAA_stats = gp('export_VZA_SZA_SAA_RAA_stats')

        # L1B
        self.exec_L1BP = gp('exec_L1BP')
        self.skip_coreg = gp('skip_coreg')
        self.spatial_ref_min_overlap = gp('spatial_ref_min_overlap')
        self.spatial_ref_min_cloudcov = gp('spatial_ref_min_cloudcov')
        self.spatial_ref_max_cloudcov = gp('spatial_ref_max_cloudcov')
        self.spatial_ref_plusminus_days = gp('spatial_ref_plusminus_days')
        self.spatial_ref_plusminus_years = gp('spatial_ref_plusminus_years')
        self.coreg_band_wavelength_for_matching = gp('coreg_band_wavelength_for_matching')
        self.coreg_max_shift_allowed = gp('coreg_max_shift_allowed')
        self.coreg_window_size = gp('coreg_window_size')

        # L1C
        self.exec_L1CP = gp('exec_L1CP')
        self.cloud_masking_algorithm = gp('cloud_masking_algorithm')
        self.export_L1C_obj_dumps = gp('export_L1C_obj_dumps')
        self.auto_download_ecmwf = gp('auto_download_ecmwf')
        self.ac_fillnonclear_areas = gp('ac_fillnonclear_areas')
        self.ac_clear_area_labels = gp('ac_clear_area_labels')
        self.ac_scale_factor_errors = gp('ac_scale_factor_errors')
        self.ac_max_ram_gb = gp('ac_max_ram_gb')
        self.ac_estimate_accuracy = gp('ac_estimate_accuracy')
        self.ac_bandwise_accuracy = gp('ac_bandwise_accuracy')

        # L2A
        self.exec_L2AP = gp('exec_L2AP')
        self.align_coord_grids = gp('align_coord_grids')
        self.match_gsd = gp('match_gsd')
        self.spatial_resamp_alg = gp('spatial_resamp_alg')
        self.clip_to_extent = gp('clip_to_extent')
        self.spathomo_estimate_accuracy = gp('spathomo_estimate_accuracy')

        # L2B
        self.exec_L2BP = gp('exec_L2BP')
        self.spechomo_method = gp('spechomo_method')
        self.spechomo_n_clusters = gp('spechomo_n_clusters')
        self.spechomo_rfr_n_trees = 50  # this is a static config value, not a user option
        self.spechomo_rfr_tree_depth = 10  # this is a static config value, not a user option
        self.spechomo_classif_alg = gp('spechomo_classif_alg')
        self.spechomo_kNN_n_neighbors = gp('spechomo_kNN_n_neighbors')
        self.spechomo_estimate_accuracy = gp('spechomo_estimate_accuracy')
        self.spechomo_bandwise_accuracy = gp('spechomo_bandwise_accuracy')

        if self.spechomo_method == 'RFR':
            # FIX: the original message lacked a space between sentences and read "Please us another one."
            raise NotImplementedError("The spectral harmonization method 'RFR' is currently not completely "
                                      "implemented. Please use another one.")
            # FIXME RFR classifiers are missing (cannot be added to the repository due to file size > 1 GB)

        # L2C
        self.exec_L2CP = gp('exec_L2CP')

        ################################
        # target sensor specifications #
        ################################

        self.virtual_sensor_id = gp('virtual_sensor_id', attr_db_job_record='virtualsensorid')
        # FIXME Why is datasetid_spatial_ref missing in virtual_sensors table
        self.datasetid_spatial_ref = gp('datasetid_spatial_ref', attr_db_job_record='datasetid_spatial_ref')

        VSSpecs = self.get_virtual_sensor_specs()
        self.virtual_sensor_name = VSSpecs['name']

        # spectral specifications
        self.datasetid_spectral_ref = VSSpecs['spectral_characteristics_datasetid']  # None in case of custom sensor
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
        self.target_CWL = VSSpecs['wavelengths_pos']
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
        self.target_FWHM = VSSpecs['band_width']

        # spatial specifications
        target_gsd_tmp = VSSpecs['spatial_resolution']  # table features only 1 value for X/Y-dims FIXME user inputs?
        # FIXME target GSD setting is a duplicate to datasetid_spatial_ref!
        self.target_gsd = xgsd, ygsd = \
            [target_gsd_tmp] * 2 if isinstance(target_gsd_tmp, (int, float)) else target_gsd_tmp
        self.target_epsg_code = VSSpecs['projection_epsg']
        # FIXME values in case user defines only Landsat?
        self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd).tolist()  # e.g. [15, 45]
        self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd).tolist()

        #############
        # data list #
        #############

        self.data_list = self.get_data_list_of_current_jobID()

        ############
        # validate #
        ############
        self.validate_exec_configs()

        GMSValidator(allow_unknown=True, schema=gms_schema_config_output).validate(self.to_dict())

        # check if parallel read/write processes have been limited on a storage mountpoint shared between hosts
        if self.max_parallel_reads_writes != 0:
            self.check_no_read_write_limit_on_xtfs_mountpoint()

    def get_init_argskwargs(self, ignore=("logger",)):
        """Return a dict with the calling function's named arguments and keyword arguments.

        :param ignore: argument names to be excluded from the result
        :return: {'args': {<name>: <value>, ...}, 'kwargs': {<name>: <value>, ...}}
        """
        posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
        argskwargs.update(argskwargs.pop(kwname, []))
        argskwargs = {k: v for k, v in argskwargs.items() if k not in ignore and k != 'self' and not k.startswith('__')}
        sig = signature(self.__init__)
        # arguments without a default value are positional args, the rest are kwargs
        argsnames = [k for k in sig.parameters if sig.parameters[k].default == _empty]
        return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
                'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}

    def get_parameter(self, key_user_opts, attr_db_job_record='', fallback=None):
        """Return the value of the given parameter according to the source precedence.

        :param key_user_opts:      parameter name as used in set_config() / parameter_mapping
        :param attr_db_job_record: if given, the corresponding attribute of the database job record is used
                                   in case the parameter was not passed directly to JobConfig
        :param fallback:           value returned if no source provides the parameter
        """
        # 1. JobConfig parameters: parameters that are directly passed to JobConfig
        if key_user_opts in self.kwargs:
            return self.kwargs[key_user_opts]

        # 2. WebUI parameters: parameters that have been defined via WebUI
        if attr_db_job_record:
            return getattr(self.DB_job_record, attr_db_job_record)

        # 3. JSON parameters: parameters that have been defined via JSON Input (command line or advanced UI params)
        # NOTE(review): falsy JSON values other than False (0, '', []) are treated as 'not set' here and
        # silently replaced by the fallback — confirm whether that is intended.
        val_json = get_param_from_json_config(key_user_opts, self.json_opts_fused_valid)
        if val_json or val_json is False:
            return val_json

        # fallback: if nothing has been returned until here
        return fallback

    @property
    def DB_job_record(self):
        # type: () -> GMS_JOB
        """Return the database record of the current job (lazily fetched and cached)."""
        if not self._DB_job_record:
            # check if job ID exists in database
            from ..misc.database_tools import GMS_JOB  # noqa F811  # redefinition of unused 'GMS_JOB'
            try:
                self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)
            except ValueError:
                raise

        return self._DB_job_record

    @property
    def DB_config_table(self):
        # type: () -> dict
        """Return the content of the config table of the postgreSQL database as dictionary (cached)."""
        if not self._DB_config_table:
            from ..misc.database_tools import get_info_from_postgreSQLdb
            db_cfg = dict(get_info_from_postgreSQLdb(self.conn_database, 'config', ['key', 'value']))

            # convert relative to absolute paths
            self._DB_config_table = {k: self.absP(v) if k.startswith('path_') and v.startswith('./') else v
                                     for k, v in db_cfg.items()}

        return self._DB_config_table

    def get_virtual_sensor_specs(self):
        # type: () -> dict
        """Return the content of the virtual_sensors table of the postgreSQL database as dictionary."""
        from ..misc.database_tools import get_info_from_postgreSQLdb

        # column spectral_characteristics_datasetid is not used later because its given by jobs.datasetid_spatial_ref
        cols2read = ['name', 'projection_epsg', 'spatial_resolution', 'spectral_characteristics_datasetid',
                     'wavelengths_pos', 'band_width']

        res = get_info_from_postgreSQLdb(self.conn_database, 'virtual_sensors',
                                         cols2read, {'id': self.virtual_sensor_id})[0]

        VSSpecs = dict()
        for i, col in enumerate(cols2read):
            val = res[i]
            if col == 'spectral_characteristics_datasetid' and val == -1:  # nodata value
                val = None

            VSSpecs[col] = val

        return VSSpecs

    def get_json_opts(self, validate=True):
        """Get a dictionary of GMS config parameters according to the jobs table of the database.

        NOTE: Reads the default options from options_default.json and updates the values with those from database.

        :param validate: whether to validate the resulting options against gms_schema_input
        """
        def update_dict(d, u):
            # recursively merge mapping u into mapping d (in place)
            for k, v in u.items():
                if isinstance(v, Mapping):
                    d[k] = update_dict(d.get(k, {}), v)
                else:
                    d[k] = v
            return d

        # read options_default.json
        default_options = get_options(path_options_default, validation=validate)

        #############################################
        # update default options with those from DB #
        #############################################

        if self.DB_job_record.analysis_parameter:
            try:
                params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
            except JSONDecodeError:
                warnings.warn('The advanced options given in the WebUI could not be decoded. '
                              'JSON decoder failed with the following error:')
                raise

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            update_dict(default_options, params_dict)

        ##############################################################################################################
        # if json config is provided (via python bindings or CLI parser) -> override all options with that json config
        ##############################################################################################################

        if self.json_config:
            if self.json_config.startswith("{"):
                try:
                    params_dict = json.loads(jsmin(self.json_config))
                except JSONDecodeError:
                    warnings.warn('The given JSON options string could not be decoded. '
                                  'JSON decoder failed with the following error:')
                    raise
            elif os.path.isfile(self.json_config):
                try:
                    with open(self.json_config, 'r') as inF:
                        params_dict = json.loads(jsmin(inF.read()))
                except JSONDecodeError:
                    warnings.warn('The given JSON options file %s could not be decoded. '
                                  'JSON decoder failed with the following error:' % self.json_config)
                    raise

            else:
                raise ValueError("The parameter 'json_config' must be a JSON formatted string or a JSON file on disk.")

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            update_dict(default_options, params_dict)

        if validate:
            GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(default_options)

        json_options = default_options
        return json_options

    @staticmethod
    def absP(relP):
        """Convert a path relative to this module's directory into an absolute path."""
        return os.path.abspath(os.path.join(os.path.dirname(__file__), relP))

    @staticmethod
    def joinP(*items):
        """Join the given path components (thin wrapper around os.path.join)."""
        return os.path.join(*items)

    def get_data_list_of_current_jobID(self):
        """Get a list of datasets to be processed from database and return it together with some metadata.

        :return: <list> of OrderedDicts, e.g. [OrderedDict([('proc_level', None), ('scene_ID', 5895940),
            ('datasetid', 104), ('image_type', 'RSD'), ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'),
            ('subsystem', ''), ('acquisition_date', datetime.datetime(2015, 2, 5, 10, 2, 52)),
            ('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
            ('sensormode', 'M'), ('logger', None)]), ...]
        """
        from ..model.metadata import get_sensormode
        data_list = []
        with psycopg2.connect(self.conn_database) as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute(r"""
                    WITH jobs_unnested AS (
                        SELECT id, unnest(sceneids) AS sceneid FROM jobs
                    )
                    SELECT jobs_unnested.sceneid,
                           scenes.datasetid,
                           scenes.acquisitiondate,
                           scenes.entityid,
                           scenes.filename,
                           COALESCE(scenes_proc.proc_level::text, 'L1A') AS proc_level,
                           datasets.image_type,
                           satellites.name AS satellite,
                           sensors.name AS sensor,
                           subsystems.name AS subsystem
                    FROM jobs_unnested
                    LEFT OUTER JOIN scenes ON scenes.id = jobs_unnested.sceneid
                    LEFT OUTER JOIN scenes_proc ON scenes_proc.sceneid = jobs_unnested.sceneid
                    LEFT OUTER JOIN datasets ON datasets.id = datasetid
                    LEFT OUTER JOIN satellites ON satellites.id = satelliteid
                    LEFT OUTER JOIN sensors ON sensors.id = sensorid
                    LEFT OUTER JOIN subsystems ON subsystems.id = subsystemid
                    WHERE jobs_unnested.id = %s
                    """,
                            (self.ID,))

                for row in cur.fetchall():
                    ds = OrderedDict()
                    ds["proc_level"] = row["proc_level"] if not self.is_test else None
                    ds["scene_ID"] = row["sceneid"]
                    ds["dataset_ID"] = row["datasetid"]
                    ds["image_type"] = row["image_type"]
                    ds["satellite"] = row["satellite"]
                    ds["sensor"] = row["sensor"]
                    ds["subsystem"] = row["subsystem"]
                    ds["acq_datetime"] = row["acquisitiondate"]
                    ds["entity_ID"] = row["entityid"]
                    ds["filename"] = row["filename"]

                    # FIX: the '+' must be escaped, otherwise the pattern means 'one or more M'
                    ds['sensor'] = 'ETM+' if re.search(r'ETM\+', ds['sensor']) else ds['sensor']
                    if self.skip_thermal and ds['subsystem'] == 'TIR':
                        continue  # removes ASTER TIR in case of skip_thermal
                    ds['subsystem'] = '' if ds['subsystem'] is None else ds['subsystem']
                    ds['sensormode'] = get_sensormode(ds)
                    if self.skip_pan and ds['sensormode'] == 'P':
                        continue  # removes e.g. SPOT PAN in case of skip_pan

                    # Sentinel-2 and ASTER scenes are split into their subsystems
                    if re.search(r"Sentinel-2A", ds['satellite'], re.I):
                        for subsystem in ['S2A10', 'S2A20', 'S2A60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search(r"Sentinel-2B", ds['satellite'], re.I):
                        for subsystem in ['S2B10', 'S2B20', 'S2B60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search(r"Terra", ds['satellite'], re.I):
                        for subsystem in ['VNIR1', 'VNIR2', 'SWIR', 'TIR']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    else:
                        data_list.append(ds)

        self.data_list = data_list
        return self.data_list

    def validate_exec_configs(self):
        """Validate and normalize the exec_L1AP...exec_L2CP configurations.

        Each value must be a list of 3 boolean-like elements [run processor, write output, delete output].
        :raises ValueError: if a configuration has the wrong format
        """
        for i in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
            exec_lvl = getattr(self, 'exec_%s' % i)

            if exec_lvl is None:
                continue

            # check input format (all elements must equal 0/1 after bool->int round trip)
            # FIX: np.bool/np.int were removed in NumPy >= 1.24; use the builtin types instead
            if all([len(exec_lvl) == 3,
                    (np.array(exec_lvl) == np.array(np.array(exec_lvl, bool), int)).all()]):
                execute, write, delete = exec_lvl

                # written output cannot be turned off in execution mode 'Python'
                if not self.inmem_serialization and execute and not write:
                    warnings.warn("If CFG.inmem_serialization is False the output writer for %s has to be enabled "
                                  "because any operations on GMS_obj.arr read the intermediate results from disk. "
                                  "Turning it on.." % i)
                    write = True
                setattr(self, 'exec_%s' % i, [execute, write, delete])

            else:
                raise ValueError('Execution mode must be provided as list of 3 elements containing only boolean '
                                 'values. Got %s for %s.' % (exec_lvl, i))

    def check_no_read_write_limit_on_xtfs_mountpoint(self):
        """Warn if an IO-intensive path is on an XtreemFS mountpoint while read/write limits are enabled."""
        intensive_IO_paths = [self.path_fileserver, self.path_archive, self.path_benchmarks,
                              self.path_dem_proc_srtm_90m, self.path_ECMWF_db, self.path_procdata_MGRS,
                              self.path_procdata_scenes]

        mount_points = {el.mountpoint: el for el in psutil.disk_partitions(all=True)}

        for path in intensive_IO_paths:
            for mp, mp_object in mount_points.items():
                if path.startswith(mp) and mp_object.device.startswith('xtreemfs'):
                    warnings.warn("Path %s appears to be on an XtreemFS mountpoint. It is highly recommended to set "
                                  "the configuration parameter 'max_parallel_reads_writes' to 0 in that case! "
                                  "Otherwise read/write processes might be slowed down! Continuing in 20 seconds.."
                                  % path)
                    time.sleep(20)
                    break

    def to_dict(self):
        """Generate a dictionary in the same structure like the one in options_default.json from the current config."""
        opts_default = get_options(path_options_default)

        # add all keys included in options_default.json
        outdict = dict()
        for key in opts_default.keys():
            if not isinstance(opts_default[key], (dict, OrderedDict)):
                outdict[key] = getattr(self, key)
            else:
                group = key
                if group not in outdict:
                    outdict[group] = dict()

                for group_key in opts_default[group]:
                    if not isinstance(opts_default[group][group_key], (dict, OrderedDict)):
                        outdict[group][group_key] = getattr(self, group_key)
                    else:
                        subgroup = group_key
                        if subgroup not in outdict[group]:
                            outdict[group][subgroup] = dict()

                        for subgroup_key in opts_default[group][subgroup]:
                            try:
                                outdict[group][subgroup][subgroup_key] = getattr(self, subgroup_key)
                            except AttributeError:
                                # the run/write/delete flags are stored as exec_<proc>P lists on this instance
                                procexec_keys = ['run_processor', 'write_output', 'delete_output']
                                if subgroup_key in procexec_keys:
                                    proc_code = subgroup
                                    outdict[group][subgroup][subgroup_key] = \
                                        getattr(self, 'exec_%sP' % proc_code)[procexec_keys.index(subgroup_key)]
                                else:
                                    raise

        # add job metadata
        outdict.update(dict(
            job_meta={k: getattr(self, k) for k in ['ID', 'start_time', 'end_time', 'computation_time', 'hostname']},
            data_list={'dataset_%s' % i: ds for i, ds in enumerate(self.data_list)}))

        return outdict

    def to_jsonable_dict(self):
        # type: (JobConfig) -> dict
        """Return a dict representation of this config with all values converted to JSON-serializable types."""
        return python_to_json(self.to_dict())

    def save(self, path_outfile):
        """Save the JobConfig instance to a JSON file in the same structure like the one in options_default.json.

        :param path_outfile: path of the output JSON file
        """
        with open(path_outfile, 'w') as outF:
            json.dump(self.to_jsonable_dict(), outF, skipkeys=False, indent=4)

    def __repr__(self):
        return pformat(self.to_dict())
def is_GMSConfig_available():
    """Return True if a GMS job configuration has already been set, otherwise False.

    FIX: the previous implementation tested ``GMS_config is not None``, which never triggers
    GMS_configuration.__getattr__ (the proxy object itself always exists), so the function
    always returned True. An attribute must be accessed to provoke the EnvironmentError
    that __getattr__ raises when no configuration has been set.
    """
    try:
        # 'job' is the compatibility alias that returns the whole config object
        return GMS_config.job is not None
    except (EnvironmentError, OSError):
        return False
def json_to_python(value):
    """Recursively convert a JSON-parsed value into useful Python data types.

    Strings like "None", "true", "false", "slice(None, None, None)" and numeric strings
    are converted to their Python equivalents; dicts and lists are converted recursively.

    :param value: any JSON-parsed value (dict, list, str, number, bool, None)
    """
    def is_number(s):
        try:
            float(s)
            return True
        # FIX: float(None) (and other non-convertible types) raises TypeError, not ValueError
        except (TypeError, ValueError):
            return False

    if type(value) is dict:
        return {json_to_python(k): json_to_python(v) for k, v in value.items()}
    elif type(value) is list:
        return [json_to_python(v) for v in value]
    else:
        if value == "None":
            return None
        if value == "slice(None, None, None)":
            return slice(None)
        # FIX: use identity checks for the booleans — 'value in [True, "true"]' silently
        # converted the integers 1/0 to booleans because 1 == True in Python
        if value is True or value == "true":
            return True
        if value is False or value == "false":
            return False
        if is_number(value):
            try:
                # return an int for integral values, a float otherwise
                if str(int(value)) != str(float(value)):
                    return int(value)
                else:
                    return float(value)
            except ValueError:
                return float(value)
        else:
            return value
def python_to_json(value):
    """Recursively convert Python data types into a JSON-serializable representation.

    Inverse of json_to_python(): None, slices and booleans become their string
    representations, numpy arrays become lists and datetimes become formatted strings.

    :param value: any Python value
    """
    if type(value) in [dict, OrderedDict]:
        return {python_to_json(k): python_to_json(v) for k, v in value.items()}
    elif type(value) is list:
        return [python_to_json(v) for v in value]
    elif type(value) is np.ndarray:
        return [python_to_json(v) for v in value.tolist()]
    else:
        if value is None:
            return "None"
        # FIX: 'value is slice(None)' compared identity against a freshly created slice
        # object and therefore never matched; slices must be compared with '=='
        if value == slice(None):
            return "slice(None, None, None)"
        if value is True:
            return "true"
        if value is False:
            return "false"
        if type(value) is datetime.datetime:
            return datetime.datetime.strftime(value, '%Y-%m-%d %H:%M:%S.%f%z')
        else:
            return value
class GMSValidator(Validator):
    """cerberus.Validator subclass that raises instead of returning False on invalid options."""

    def __init__(self, *args, **kwargs):
        """Initialize the validator.

        :param args:   Arguments to be passed to cerberus.Validator
        :param kwargs: Keyword arguments to be passed to cerberus.Validator
        """
        super(GMSValidator, self).__init__(*args, **kwargs)

    def validate(self, document2validate, **kwargs):
        """Validate the given document and raise a ValueError if it is malformed.

        :param document2validate: the options dictionary to be validated
        :raises ValueError: if the document does not conform to the schema
        """
        if super(GMSValidator, self).validate(document=document2validate, **kwargs) is False:
            raise ValueError("Options is malformed: %s" % str(self.errors))
def get_options(target, validation=True):
    """Return a dictionary with all options read from the given JSON file.

    :param target:     path to the JSON options file (JavaScript-style comments are allowed)
    :param validation: True / False, whether to validate options read from files or not
    :return:           dictionary with options
    :raises FileNotFoundError: if the given path does not point to an existing file
    """
    # guard clause: fail early if the options file does not exist
    if not os.path.isfile(target):
        raise FileNotFoundError("Options file not found at file path %s." % target)

    with open(target, "r") as fl:
        # strip JS-style comments before parsing, then convert values to Python types
        options = json_to_python(json.loads(jsmin(fl.read())))

    if validation is True:
        GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(options)

    return options