# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.

import datetime
import socket
import os
import warnings
import numpy as np
import builtins
import re
import psycopg2
import psycopg2.extras
from collections import OrderedDict
from collections.abc import Mapping
import multiprocessing
from inspect import getargvalues, stack, signature, _empty
import json
from json import JSONDecodeError
from jsmin import jsmin
from cerberus import Validator
import pkgutil
import logging
import time
import psutil
from pprint import pformat
from typing import TYPE_CHECKING

from .options_schema import gms_schema_input, gms_schema_config_output, parameter_mapping, get_param_from_json_config
from ..version import __version__, __versionalias__

if TYPE_CHECKING:
    from gms_preprocessing.misc.database_tools import GMS_JOB  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'


class GMS_configuration(object):
    def __getattr__(self, attr):
        if hasattr(builtins, 'GMS_JobConfig'):
            if attr in ['job', 'usecase']:
                # This is only to keep compatibility with HU-INF codes
                return getattr(builtins, 'GMS_JobConfig')
            return getattr(builtins.GMS_JobConfig, attr)
        else:
            raise EnvironmentError("Config has not been set yet on this machine. Run 'set_config()' first!")

    def __repr__(self):
        return getattr(builtins, 'GMS_JobConfig').__repr__()


GMS_config = GMS_configuration()  # type: JobConfig
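
# Usage sketch: once set_config() has been called anywhere in the process, any module can read
# the active job configuration through this proxy, which delegates attribute access to
# builtins.GMS_JobConfig (the module path in the import below is an assumption):
#
#     from gms_preprocessing.options.config import GMS_config
#     n_cpus = GMS_config.CPUs  # -> builtins.GMS_JobConfig.CPUs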


path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')


def set_config(job_ID, json_config='', reset_status=False, **kwargs):
    """Set up a configuration for a new gms_preprocessing job!

    :param job_ID:       job ID of the job to be executed, e.g. 123456 (must be present in database)
    :param json_config:  path to JSON file containing configuration parameters or a string in JSON format
    :param reset_status: whether to reset the job status or not (default=False)
    :param kwargs:       keyword arguments to be passed to JobConfig
                         NOTE: All keyword arguments given here WILL OVERRIDE configurations that have been
                         previously set via WebUI or via the json_config parameter!

    :Keyword Arguments:
        - inmem_serialization:   False: write intermediate results to disk in order to save memory
                                 True: keep intermediate results in memory in order to save IO time
        - parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
                                                'tiles' - parallelization on tile-level
        - db_host: host name of the server that runs the postgreSQL database
        - spatial_index_server_host: host name of the server that runs the SpatialIndexMediator
        - spatial_index_server_port: port used for connecting to SpatialIndexMediator
        - delete_old_output: <bool> whether to delete previously created output of the given job ID
                             before running the job (default=False)
        - exec_L1AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L1BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L1CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - CPUs: number of CPU cores to be used for processing (default: None -> use all available)
        - allow_subMultiprocessing: allow multiprocessing within workers
        - disable_exception_handler: enable/disable automatic handling of unexpected exceptions
                                     (default: True -> enabled)
        - log_level: the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';
                     default: 'INFO')
        - tiling_block_size_XY: X/Y block size to be used for any tiling process (default: (2048, 2048))
        - is_test: whether the current job represents a software test job (run by a test runner) or not
                   (default=False)
        - profiling: enable/disable code profiling (default: False)
        - benchmark_global: enable/disable benchmark of the whole processing pipeline
        - path_procdata_scenes: output path to store processed scenes
        - path_procdata_MGRS: output path to store processed MGRS tiles
        - path_archive: input path where downloaded data are stored
        - virtual_sensor_id: 1: Landsat-8, 10: Sentinel-2A 10m
        - datasetid_spatial_ref: 249: Sentinel-2A
    :rtype: JobConfig
    """
    # validate kwargs
    for k in kwargs:
        if k not in parameter_mapping and k != 'db_host':
            raise ValueError("'%s' is not a valid parameter. Valid parameters are: \n%s"
                             % (k, list(parameter_mapping.keys())))

    #################################
    # set GMS_JobConfig in builtins #
    #################################
    # FIXME virtual_sensor_id and datasetid_spatial_ref are not respected by JobConfig.
    if not hasattr(builtins, 'GMS_JobConfig') or reset_status:
        builtins.GMS_JobConfig = JobConfig(job_ID, json_config=json_config, **kwargs)

    #####################
    # check environment #
    #####################
    if not hasattr(builtins, 'GMS_EnvOK') or not getattr(builtins, 'GMS_EnvOK'):
        # check environment
        from ..misc import environment as ENV
        logger = logging.getLogger('GMSEnvironmentChecker')
        logger.setLevel(logging.DEBUG)
        GMSEnv = ENV.GMSEnvironment()
        GMSEnv.check_dependencies()
        GMSEnv.check_read_write_permissions()
        GMSEnv.ensure_properly_activated_GDAL()
        GMSEnv.check_ecmwf_api_creds()

        # close unclosed locks from previous runs
        from ..misc.locks import release_unclosed_locks
        release_unclosed_locks()

        builtins.GMS_EnvOK = True

    return getattr(builtins, 'GMS_JobConfig')
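
# A minimal usage sketch (the job ID 123456 and the parameter values are hypothetical;
# the job ID must exist in the jobs table of the database):
#
#     config = set_config(123456, db_host='localhost', CPUs=8, log_level='DEBUG')
#     print(config.CPUs)  # -> 8: kwargs override WebUI and json_config settings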


def get_conn_database(hostname='localhost', timeout=3):
    # type: (str, int) -> str
    """Return database connection string.

    :param hostname: the host that runs the GMS postgreSQL database
    :param timeout:  connection timeout in seconds
    :return:         connection string suitable for psycopg2.connect()
    """
    return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=%d" \
           % (hostname, timeout)
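
# For example (sketch), get_conn_database('gms-server', timeout=5) returns the string
#     "dbname='geomultisens' user='gmsdb' password='gmsdb' host='gms-server' connect_timeout=5"
# ('gms-server' being a hypothetical host name), which is later passed to psycopg2.connect().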


class JobConfig(object):
    def __init__(self, ID, json_config='', **user_opts):
        """Create a job configuration.

        Workflow:
        # 0. Environment
        # 1. There are two ways the JSON options can come in: via console command or from the database.
        #    - in case of a console command: GMS_JOB.from_... must write the default options to the database
        #    => first create the JobConfig based on the JSON options
        # 2. Then override them with user-defined parameters (either init parameters or DB settings set via
        #    the web app).

        :param ID:          job ID of the job to be executed, e.g. 123456 (must be present in database)
        :param json_config: path to JSON file containing configuration parameters or a string in JSON format
        :param user_opts:   keyword arguments as passed by gms_preprocessing.set_config()
        """
        # privates
        self._DB_job_record = None  # type: GMS_JOB
        self._DB_config_table = None  # type: dict
        self._kwargs_defaults = None

        # fixed attributes
        # possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings',
        #                  'finished_with_errors', 'finished'
        self.status = 'pending'
        self.start_time = datetime.datetime.now()
        self.end_time = None
        self.computation_time = None
        self.hostname = socket.gethostname()
        self.version = __version__
        self.versionalias = __versionalias__

        #######################
        # POPULATE PARAMETERS #
        #######################

        # args
        self.ID = ID
        self.json_config = json_config
        self.kwargs = user_opts

        # database connection
        self.db_host = user_opts['db_host']
        self.conn_database = get_conn_database(hostname=self.db_host)

        # get validated options dict from JSON-options
        self.json_opts_fused_valid = self.get_json_opts(validate=True)

        gp = self.get_parameter

        ##################
        # global options #
        ##################

        self.inmem_serialization = gp('inmem_serialization')
        self.parallelization_level = gp('parallelization_level')
        self.spatial_index_server_host = gp('spatial_index_server_host')
        self.spatial_index_server_port = gp('spatial_index_server_port')
        self.CPUs = gp('CPUs', fallback=multiprocessing.cpu_count())
        self.CPUs_all_jobs = gp('CPUs_all_jobs')
        self.max_mem_usage = gp('max_mem_usage')
        self.critical_mem_usage = gp('critical_mem_usage')
        self.max_parallel_reads_writes = gp('max_parallel_reads_writes')
        self.allow_subMultiprocessing = gp('allow_subMultiprocessing')
        self.delete_old_output = gp('delete_old_output')
        self.disable_exception_handler = gp('disable_exception_handler')
        self.disable_IO_locks = gp('disable_IO_locks')
        self.disable_CPU_locks = gp('disable_CPU_locks')
        self.disable_DB_locks = gp('disable_DB_locks')
        self.disable_memory_locks = gp('disable_memory_locks')
        self.min_version_mem_usage_stats = gp('min_version_mem_usage_stats')
        self.log_level = gp('log_level')
        self.tiling_block_size_XY = gp('tiling_block_size_XY')
        self.is_test = gp('is_test')
        self.profiling = gp('profiling')
        self.benchmark_global = gp('benchmark_global')

        #########
        # paths #
        #########

        # external
        self.path_spatIdxSrv = self.DB_config_table['path_spatial_index_mediator_server']
        self.path_tempdir = self.absP(self.DB_config_table['path_tempdir'])
        self.path_custom_sicor_options = gp('path_custom_sicor_options')
        self.path_dem_proc_srtm_90m = self.absP(self.DB_config_table['path_dem_proc_srtm_90m'])
        self.path_spechomo_classif = gp('path_spechomo_classif')

        # internal (included in gms_preprocessing repository)
        self.path_earthSunDist = self.joinP(path_gmslib, 'database', 'earth_sun_distance',
                                            'Earth_Sun_distances_per_day_edited.csv')
        self.path_SNR_models = self.joinP(path_gmslib, 'database', 'snr')
        self.path_cloud_classif = self.joinP(path_gmslib, 'database', 'cloud_classifier')
        self.path_solar_irr = self.joinP(path_gmslib, 'database', 'solar_irradiance',
                                         'SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')

        if not self.is_test:
            def get_dbpath(dbkey):
                return self.joinP(self.path_fileserver, self.DB_config_table[dbkey])

            # normal mode
            self.path_fileserver = self.DB_config_table['path_data_root']
            self.path_archive = gp('path_archive', fallback=get_dbpath('foldername_download'))
            self.path_procdata_scenes = gp('path_procdata_scenes', fallback=get_dbpath('foldername_procdata_scenes'))
            self.path_procdata_MGRS = gp('path_procdata_MGRS', fallback=get_dbpath('foldername_procdata_MGRS'))
            self.path_ECMWF_db = gp('path_ECMWF_db', fallback=self.absP(self.DB_config_table['path_ECMWF_db']))
            self.path_benchmarks = gp('path_benchmarks', fallback=self.absP(self.DB_config_table['path_benchmarks']))
            self.path_job_logs = gp('path_job_logs', fallback=get_dbpath('foldername_job_logs'))

        else:
            # software test mode, the repository should be self-contained -> use only relative paths
            self.path_fileserver = self.joinP(path_gmslib, '..', 'tests', 'data')
            self.path_archive = self.joinP(path_gmslib, '..', 'tests', 'data', 'archive_data')
            self.path_procdata_scenes = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_scenes')
            # self.path_procdata_scenes = '/data1/gms/mount/db/data/processed_scenes/test_loggingerrors/'  # FIXME
            self.path_procdata_MGRS = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_mgrs_tiles')
            self.path_ECMWF_db = self.joinP(path_gmslib, '..', 'tests', 'data', 'processed_ECMWF')
            self.path_benchmarks = self.joinP(path_gmslib, '..', 'tests', 'data', 'benchmarks')
            self.path_job_logs = self.joinP(path_gmslib, '..', 'tests', 'data', 'job_logs')
            # self.path_job_logs = '/data1/gms/mount/db/data/job_logs/'  # FIXME

        ###########################
        # processor configuration #
        ###########################

        # general_opts
        self.skip_thermal = gp('skip_thermal')
        self.skip_pan = gp('skip_pan')
        self.sort_bands_by_cwl = gp('sort_bands_by_cwl')
        self.target_radunit_optical = gp('target_radunit_optical')
        self.target_radunit_thermal = gp('target_radunit_thermal')
        self.scale_factor_TOARef = gp('scale_factor_TOARef')
        self.scale_factor_BOARef = gp('scale_factor_BOARef')
        self.mgrs_pixel_buffer = gp('mgrs_pixel_buffer')
        self.output_data_compression = gp('output_data_compression')
        self.write_ENVIclassif_cloudmask = gp('write_ENVIclassif_cloudmask')

        # processor specific opts

        # L1A
        self.exec_L1AP = gp('exec_L1AP')
        self.SZA_SAA_calculation_accurracy = gp('SZA_SAA_calculation_accurracy')
        self.export_VZA_SZA_SAA_RAA_stats = gp('export_VZA_SZA_SAA_RAA_stats')

        # L1B
        self.exec_L1BP = gp('exec_L1BP')
        self.skip_coreg = gp('skip_coreg')
        self.spatial_ref_min_overlap = gp('spatial_ref_min_overlap')
        self.spatial_ref_min_cloudcov = gp('spatial_ref_min_cloudcov')
        self.spatial_ref_max_cloudcov = gp('spatial_ref_max_cloudcov')
        self.spatial_ref_plusminus_days = gp('spatial_ref_plusminus_days')
        self.spatial_ref_plusminus_years = gp('spatial_ref_plusminus_years')
        self.coreg_band_wavelength_for_matching = gp('coreg_band_wavelength_for_matching')
        self.coreg_max_shift_allowed = gp('coreg_max_shift_allowed')
        self.coreg_window_size = gp('coreg_window_size')

        # L1C
        self.exec_L1CP = gp('exec_L1CP')
        self.cloud_masking_algorithm = gp('cloud_masking_algorithm')
        self.export_L1C_obj_dumps = gp('export_L1C_obj_dumps')
        self.auto_download_ecmwf = gp('auto_download_ecmwf')
        self.ac_fillnonclear_areas = gp('ac_fillnonclear_areas')
        self.ac_clear_area_labels = gp('ac_clear_area_labels')
        self.ac_scale_factor_errors = gp('ac_scale_factor_errors')
        self.ac_max_ram_gb = gp('ac_max_ram_gb')
        self.ac_estimate_accuracy = gp('ac_estimate_accuracy')
        self.ac_bandwise_accuracy = gp('ac_bandwise_accuracy')

        # L2A
        self.exec_L2AP = gp('exec_L2AP')
        self.align_coord_grids = gp('align_coord_grids')
        self.match_gsd = gp('match_gsd')
        self.spatial_resamp_alg = gp('spatial_resamp_alg')
        self.clip_to_extent = gp('clip_to_extent')
        self.spathomo_estimate_accuracy = gp('spathomo_estimate_accuracy')

        # L2B
        self.exec_L2BP = gp('exec_L2BP')
        self.spechomo_method = gp('spechomo_method')
        self.spechomo_n_clusters = gp('spechomo_n_clusters')
        self.spechomo_rfr_n_trees = 50  # this is a static config value, not a user option
        self.spechomo_rfr_tree_depth = 10  # this is a static config value, not a user option
        self.spechomo_classif_alg = gp('spechomo_classif_alg')
        self.spechomo_kNN_n_neighbors = gp('spechomo_kNN_n_neighbors')
        self.spechomo_estimate_accuracy = gp('spechomo_estimate_accuracy')
        self.spechomo_bandwise_accuracy = gp('spechomo_bandwise_accuracy')

        if self.spechomo_method == 'RFR':
            raise NotImplementedError("The spectral harmonization method 'RFR' is currently not completely "
                                      "implemented. Please use another one.")
            # FIXME RFR classifiers are missing (cannot be added to the repository due to file size > 1 GB)

        # L2C
        self.exec_L2CP = gp('exec_L2CP')

        ################################
        # target sensor specifications #
        ################################

        self.virtual_sensor_id = gp('virtual_sensor_id', attr_db_job_record='virtualsensorid')
        # FIXME Why is datasetid_spatial_ref missing in virtual_sensors table?
        self.datasetid_spatial_ref = gp('datasetid_spatial_ref', attr_db_job_record='datasetid_spatial_ref')

        VSSpecs = self.get_virtual_sensor_specs()
        self.virtual_sensor_name = VSSpecs['name']

        # spectral specifications
        self.datasetid_spectral_ref = VSSpecs['spectral_characteristics_datasetid']  # None in case of custom sensor
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
        self.target_CWL = VSSpecs['wavelengths_pos']
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
        self.target_FWHM = VSSpecs['band_width']

        # spatial specifications
        target_gsd_tmp = VSSpecs['spatial_resolution']  # table features only 1 value for X/Y dims  FIXME user inputs?
        # FIXME target GSD setting is a duplicate to datasetid_spatial_ref!
        self.target_gsd = xgsd, ygsd = \
            [target_gsd_tmp] * 2 if isinstance(target_gsd_tmp, (int, float)) else target_gsd_tmp
        self.target_epsg_code = VSSpecs['projection_epsg']
        # FIXME values in case user defines only Landsat?
        self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd).tolist()  # e.g. [15, 45]
        self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd).tolist()

        #############
        # data list #
        #############

        self.data_list = self.get_data_list_of_current_jobID()

        ############
        # validate #
        ############
        self.validate_exec_configs()

        GMSValidator(allow_unknown=True, schema=gms_schema_config_output).validate(self.to_dict())

        # check if parallel read/write processes have been limited on a storage mountpoint shared between
        # multiple hosts
        if self.max_parallel_reads_writes != 0:
            self.check_no_read_write_limit_on_xtfs_mountpoint()

    def get_init_argskwargs(self, ignore=("logger",)):
        """Return a dictionary of the calling function's arguments, separated into positional
        arguments ('args') and keyword arguments ('kwargs').
        """
        posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
        argskwargs.update(argskwargs.pop(kwname, []))
        argskwargs = {k: v for k, v in argskwargs.items()
                      if k not in ignore and k != 'self' and not k.startswith('__')}
        sig = signature(self.__init__)
        argsnames = [k for k in sig.parameters if sig.parameters[k].default == _empty]
        return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
                'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}

    def get_parameter(self, key_user_opts, attr_db_job_record='', fallback=None):
        # 1. JobConfig parameters: parameters that are directly passed to JobConfig
        if key_user_opts in self.kwargs:
            return self.kwargs[key_user_opts]

        # 2. WebUI parameters: parameters that have been defined via WebUI
        if attr_db_job_record:
            return getattr(self.DB_job_record, attr_db_job_record)

        # 3. JSON parameters: parameters that have been defined via JSON input (command line or advanced UI params)
        val_json = get_param_from_json_config(key_user_opts, self.json_opts_fused_valid)
        if val_json or val_json is False:
            return val_json

        # fallback: if nothing has been returned until here
        return fallback
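
    # Resolution-order sketch: e.g. gp('CPUs', fallback=multiprocessing.cpu_count()) returns
    # (1) self.kwargs['CPUs'] if it was passed to set_config()/JobConfig, else
    # (2) the WebUI value in case attr_db_job_record is given (not the case for 'CPUs'), else
    # (3) the value from the fused JSON options, else (4) the given fallback.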

    @property
    def DB_job_record(self):
        # type: () -> GMS_JOB
        if not self._DB_job_record:
            # check if job ID exists in database
            from ..misc.database_tools import GMS_JOB  # noqa F811  # redefinition of unused 'GMS_JOB' from line 22
            try:
                self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)
            except ValueError:
                raise

        return self._DB_job_record

    @property
    def DB_config_table(self):
        # type: () -> dict
        """Return the content of the config table of the postgreSQL database as dictionary."""
        if not self._DB_config_table:
            from ..misc.database_tools import get_info_from_postgreSQLdb
            db_cfg = dict(get_info_from_postgreSQLdb(self.conn_database, 'config', ['key', 'value']))

            # convert relative to absolute paths
            self._DB_config_table = {k: self.absP(v) if k.startswith('path_') and v.startswith('./') else v
                                     for k, v in db_cfg.items()}

        return self._DB_config_table

    def get_virtual_sensor_specs(self):
        # type: () -> dict
        """Return the content of the virtual_sensors table of the postgreSQL database as dictionary."""
        from ..misc.database_tools import get_info_from_postgreSQLdb

        # column spectral_characteristics_datasetid is not used later because it is given by jobs.datasetid_spatial_ref
        cols2read = ['name', 'projection_epsg', 'spatial_resolution', 'spectral_characteristics_datasetid',
                     'wavelengths_pos', 'band_width']

        res = get_info_from_postgreSQLdb(self.conn_database, 'virtual_sensors',
                                         cols2read, {'id': self.virtual_sensor_id})[0]

        VSSpecs = dict()
        for i, col in enumerate(cols2read):
            val = res[i]
            if col == 'spectral_characteristics_datasetid' and val == -1:  # nodata value
                val = None

            VSSpecs[col] = val

        return VSSpecs

    def get_json_opts(self, validate=True):
        """Get a dictionary of GMS config parameters according to the jobs table of the database.

        NOTE: Reads the default options from options_default.json and updates the values with those from database.
        """
        def update_dict(d, u):
            for k, v in u.items():
                if isinstance(v, Mapping):
                    d[k] = update_dict(d.get(k, {}), v)
                else:
                    d[k] = v
            return d

        # read options_default.json
        default_options = get_options(path_options_default, validation=validate)

        #############################################
        # update default options with those from DB #
        #############################################

        if self.DB_job_record.analysis_parameter:
            try:
                params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
            except JSONDecodeError:
                warnings.warn('The advanced options given in the WebUI could not be decoded. '
                              'JSON decoder failed with the following error:')
                raise

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            update_dict(default_options, params_dict)

        ###############################################################################################################
        # if a json config is provided (via python bindings or CLI parser) -> override all options with that config   #
        ###############################################################################################################

        if self.json_config:
            if self.json_config.startswith("{"):
                try:
                    params_dict = json.loads(jsmin(self.json_config))
                except JSONDecodeError:
                    warnings.warn('The given JSON options string could not be decoded. '
                                  'JSON decoder failed with the following error:')
                    raise
            elif os.path.isfile(self.json_config):
                try:
                    with open(self.json_config, 'r') as inF:
                        params_dict = json.loads(jsmin(inF.read()))
                except JSONDecodeError:
                    warnings.warn('The given JSON options file %s could not be decoded. '
                                  'JSON decoder failed with the following error:' % self.json_config)
                    raise
            else:
                raise ValueError("The parameter 'json_config' must be a JSON formatted string or a JSON file on disk.")

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            update_dict(default_options, params_dict)

        if validate:
            GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(default_options)

        return default_options
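
    # update_dict() inside get_json_opts() performs a recursive (deep) merge, e.g. (sketch):
    #     update_dict({'a': {'x': 1, 'y': 2}}, {'a': {'y': 3}})  ->  {'a': {'x': 1, 'y': 3}}
    # whereas dict.update() would replace the whole nested dict stored under 'a'.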

    @staticmethod
    def absP(relP):
        return os.path.abspath(os.path.join(os.path.dirname(__file__), relP))

    @staticmethod
    def joinP(*items):
        return os.path.join(*items)

    def get_data_list_of_current_jobID(self):
        """Get a list of datasets to be processed from the database and return it together with some metadata.

        :return: <list> of OrderedDicts, e.g. [OrderedDict([('proc_level', None), ('scene_ID', 5895940),
                 ('datasetid', 104), ('image_type', 'RSD'), ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'),
                 ('subsystem', ''), ('acquisition_date', datetime.datetime(2015, 2, 5, 10, 2, 52)),
                 ('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
                 ('sensormode', 'M'), ('logger', None)]), ...]
        """
        from ..model.metadata import get_sensormode
        data_list = []
        with psycopg2.connect(self.conn_database) as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute(r"""
                    WITH jobs_unnested AS (
                        SELECT id, unnest(sceneids) AS sceneid FROM jobs
                    )
                    SELECT jobs_unnested.sceneid,
                           scenes.datasetid,
                           scenes.acquisitiondate,
                           scenes.entityid,
                           scenes.filename,
                           COALESCE(scenes_proc.proc_level::text, 'L1A') AS proc_level,
                           datasets.image_type,
                           satellites.name AS satellite,
                           sensors.name AS sensor,
                           subsystems.name AS subsystem
                    FROM jobs_unnested
                    LEFT OUTER JOIN scenes ON scenes.id = jobs_unnested.sceneid
                    LEFT OUTER JOIN scenes_proc ON scenes_proc.sceneid = jobs_unnested.sceneid
                    LEFT OUTER JOIN datasets ON datasets.id = datasetid
                    LEFT OUTER JOIN satellites ON satellites.id = satelliteid
                    LEFT OUTER JOIN sensors ON sensors.id = sensorid
                    LEFT OUTER JOIN subsystems ON subsystems.id = subsystemid
                    WHERE jobs_unnested.id = %s
                    """, (self.ID,))

                for row in cur.fetchall():
                    ds = OrderedDict()
                    ds["proc_level"] = row["proc_level"] if not self.is_test else None
                    ds["scene_ID"] = row["sceneid"]
                    ds["dataset_ID"] = row["datasetid"]
                    ds["image_type"] = row["image_type"]
                    ds["satellite"] = row["satellite"]
                    ds["sensor"] = row["sensor"]
                    ds["subsystem"] = row["subsystem"]
                    ds["acq_datetime"] = row["acquisitiondate"]
                    ds["entity_ID"] = row["entityid"]
                    ds["filename"] = row["filename"]

                    # normalize any sensor name containing 'ETM' to 'ETM+'
                    ds['sensor'] = 'ETM+' if re.search(r'ETM', ds['sensor']) else ds['sensor']
                    if self.skip_thermal and ds['subsystem'] == 'TIR':
                        continue  # removes ASTER TIR in case of skip_thermal
                    ds['subsystem'] = '' if ds['subsystem'] is None else ds['subsystem']
                    ds['sensormode'] = get_sensormode(ds)
                    if self.skip_pan and ds['sensormode'] == 'P':
                        continue  # removes e.g. SPOT PAN in case of skip_pan

                    if re.search(r"Sentinel-2A", ds['satellite'], re.I):
                        for subsystem in ['S2A10', 'S2A20', 'S2A60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search(r"Sentinel-2B", ds['satellite'], re.I):
                        for subsystem in ['S2B10', 'S2B20', 'S2B60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search(r"Terra", ds['satellite'], re.I):
                        for subsystem in ['VNIR1', 'VNIR2', 'SWIR', 'TIR']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    else:
                        data_list.append(ds)

        self.data_list = data_list
        return self.data_list

    def validate_exec_configs(self):
        for i in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
            exec_lvl = getattr(self, 'exec_%s' % i)

            if exec_lvl is None:
                continue

            # check input format: a list of 3 boolean-like values
            if all([len(exec_lvl) == 3, (np.array(exec_lvl) == np.array(np.array(exec_lvl, bool), int)).all()]):
                execute, write, delete = exec_lvl

                # written output cannot be turned off in execution mode 'Python'
                if not self.inmem_serialization and execute and not write:
                    warnings.warn("If CFG.inmem_serialization is False, the output writer for %s has to be enabled "
                                  "because any operations on GMS_obj.arr read the intermediate results from disk. "
                                  "Turning it on.." % i)
                    write = True
                    setattr(self, 'exec_%s' % i, [execute, write, delete])

            else:
                raise ValueError('Execution mode must be provided as list of 3 elements containing only boolean '
                                 'values. Got %s for %s.' % (exec_lvl, i))
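
    # Format sketch: e.g. exec_L1AP = [1, 1, 0] (or [True, True, False]) means: run the L1A processor,
    # write its output, and do not delete the output afterwards. With inmem_serialization=False,
    # [1, 0, 1] is coerced to [1, 1, 1] by the check above.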

    def check_no_read_write_limit_on_xtfs_mountpoint(self):
        intensive_IO_paths = [self.path_fileserver, self.path_archive, self.path_benchmarks,
                              self.path_dem_proc_srtm_90m, self.path_ECMWF_db, self.path_procdata_MGRS,
                              self.path_procdata_scenes]

        mount_points = {el.mountpoint: el for el in psutil.disk_partitions(all=True)}

        for path in intensive_IO_paths:
            for mp, mp_object in mount_points.items():
                if path.startswith(mp) and mp_object.device.startswith('xtreemfs'):
                    warnings.warn("Path %s appears to be on an XtreemFS mountpoint. It is highly recommended to set "
                                  "the configuration parameter 'max_parallel_reads_writes' to 0 in that case! "
                                  "Otherwise read/write processes might be slowed down! Continuing in 20 seconds.."
                                  % path)
                    time.sleep(20)
                    break

    def to_dict(self):
        """Generate a dictionary in the same structure as the one in options_default.json from the current config."""
        opts_default = get_options(path_options_default)

        # add all keys included in options_default.json
        outdict = dict()
        for key in opts_default.keys():
            if not isinstance(opts_default[key], (dict, OrderedDict)):
                outdict[key] = getattr(self, key)
            else:
                group = key
                if group not in outdict:
                    outdict[group] = dict()

                for group_key in opts_default[group]:
                    if not isinstance(opts_default[group][group_key], (dict, OrderedDict)):
                        outdict[group][group_key] = getattr(self, group_key)
                    else:
                        subgroup = group_key
                        if subgroup not in outdict[group]:
                            outdict[group][subgroup] = dict()

                        for subgroup_key in opts_default[group][subgroup]:
                            try:
                                outdict[group][subgroup][subgroup_key] = getattr(self, subgroup_key)
                            except AttributeError:
                                procexec_keys = ['run_processor', 'write_output', 'delete_output']
                                if subgroup_key in procexec_keys:
                                    proc_code = subgroup
                                    outdict[group][subgroup][subgroup_key] = \
                                        getattr(self, 'exec_%sP' % proc_code)[procexec_keys.index(subgroup_key)]
                                else:
                                    raise

        # add job metadata and the data list
        outdict.update(dict(
            job_meta={k: getattr(self, k) for k in ['ID', 'start_time', 'end_time', 'computation_time', 'hostname']},
            data_list={'dataset_%s' % i: ds for i, ds in enumerate(self.data_list)}))

        return outdict

    def to_jsonable_dict(self):
        # type: (JobConfig) -> dict
        return python_to_json(self.to_dict())

    def save(self, path_outfile):
        """Save the JobConfig instance to a JSON file in the same structure as the one in options_default.json.

        :param path_outfile: path of the output JSON file
        """
        with open(path_outfile, 'w') as outF:
            json.dump(self.to_jsonable_dict(), outF, skipkeys=False, indent=4)
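
    # e.g. (sketch): config.save('/tmp/gms_job_123456.json') writes the full configuration in the
    # structure of options_default.json; non-JSON values such as None, booleans and datetimes are
    # stringified beforehand via python_to_json() (defined below), so the output is plain JSON.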

    def __repr__(self):
        return pformat(self.to_dict())


def is_GMSConfig_available():
    try:
        # attribute access raises an EnvironmentError if set_config() has not been called yet
        return GMS_config.status is not None
    except (EnvironmentError, OSError):
        return False


def json_to_python(value):
    def is_number(s):
        try:
            float(s)
            return True
        except ValueError:
            return False

    if type(value) is dict:
        return {json_to_python(k): json_to_python(v) for k, v in value.items()}
    elif type(value) is list:
        return [json_to_python(v) for v in value]
    else:
        if value == "None":
            return None
        if value == "slice(None, None, None)":
            return slice(None)
        if value in [True, "true"]:
            return True
        if value in [False, "false"]:
            return False
        if is_number(value):
            try:
                if str(int(value)) != str(float(value)):
                    return int(value)
                else:
                    return float(value)
            except ValueError:
                return float(value)
        else:
            return value


def python_to_json(value):
    if type(value) in [dict, OrderedDict]:
        return {python_to_json(k): python_to_json(v) for k, v in value.items()}
    elif type(value) is list:
        return [python_to_json(v) for v in value]
    elif type(value) is np.ndarray:
        return [python_to_json(v) for v in value.tolist()]
    else:
        if value is None:
            return "None"
        if value == slice(None):  # 'is' would never match here because each slice(None) is a new object
            return "slice(None, None, None)"
        if value is True:
            return "true"
        if value is False:
            return "false"
        if type(value) is datetime.datetime:
            return datetime.datetime.strftime(value, '%Y-%m-%d %H:%M:%S.%f%z')
        else:
            return value
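
# Round-trip sketch: python_to_json() and json_to_python() are roughly inverse to each other, e.g.:
#     python_to_json({'a': None, 'b': True})      ->  {'a': 'None', 'b': 'true'}
#     json_to_python({'a': 'None', 'b': 'true'})  ->  {'a': None, 'b': True}
# Note that json_to_python() also converts numeric strings, e.g. '2' -> 2 and '2.5' -> 2.5.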


class GMSValidator(Validator):
    def __init__(self, *args, **kwargs):
        """Initialize GMSValidator.

        :param args:   arguments to be passed to cerberus.Validator
        :param kwargs: keyword arguments to be passed to cerberus.Validator
        """
        super(GMSValidator, self).__init__(*args, **kwargs)

    def validate(self, document2validate, **kwargs):
        if super(GMSValidator, self).validate(document=document2validate, **kwargs) is False:
            raise ValueError("Options are malformed: %s" % str(self.errors))
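
# Note: unlike cerberus.Validator.validate(), which returns False on failure,
# GMSValidator.validate() raises a ValueError carrying the validation errors (self.errors),
# so an invalid configuration fails loudly instead of being silently ignored.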


def get_options(target, validation=True):
    """Return a dictionary with all options.

    :param target:     if path to a file, then json is used to load it, otherwise the default template is used
    :param validation: True / False, whether to validate the options read from file or not
    :return:           dictionary with options
    """
    if os.path.isfile(target):
        with open(target, "r") as fl:
            options = json_to_python(json.loads(jsmin(fl.read())))

        if validation is True:
            GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(options)

        return options
    else:
        raise FileNotFoundError("Options file not found at file path %s." % target)
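
# Usage sketch: load and validate the shipped default options, e.g.:
#
#     opts = get_options(path_options_default)                        # validated against gms_schema_input
#     opts_raw = get_options(path_options_default, validation=False)  # skip schema validation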