# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.

import copy
import datetime
import functools
import glob
import json
import os
import re
import shutil
import sys
import warnings
import logging
from collections import OrderedDict
from itertools import chain
from typing import Iterable, List, Union, Generator, TYPE_CHECKING  # noqa F401  # flake8 issue
import psutil
import time
from pkg_resources import parse_version

import numpy as np
import spectral
from spectral.io import envi
from pandas import DataFrame, read_csv
from nested_dict import nested_dict

try:
    from osgeo import gdalnumeric
except ImportError:
    import gdalnumeric

from geoarray import GeoArray, NoDataMask, CloudMask
from py_tools_ds.geo.coord_grid import is_coord_grid_equal
from py_tools_ds.geo.projection import EPSG2WKT, WKT2EPSG, get_UTMzone, isProjectedOrGeographic
from py_tools_ds.geo.map_info import geotransform2mapinfo, mapinfo2geotransform
from py_tools_ds.geo.coord_calc import calc_FullDataset_corner_positions
from py_tools_ds.geo.coord_trafo import pixelToLatLon, pixelToMapYX, imXY2mapXY, imYX2mapYX
from py_tools_ds.numeric.array import get_array_tilebounds
from sicor.options import get_options as get_ac_options

from ..misc.logging import GMS_logger as DatasetLogger
from ..misc.logging import close_logger
from ..misc import database_tools as DB_T
from ..misc import path_generator as PG
from ..model.mgrs_tile import MGRS_tile
from ..model.metadata import \
    METADATA, get_dict_LayerOptTherm, metaDict_to_metaODict, get_LayerBandsAssignment, layerdependent_metadata
from ..options.config import GMS_config as CFG
from ..algorithms import geoprocessing as GEOP
from ..io import input_reader as INP_R
from ..io import output_writer as OUT_W
from ..misc import helper_functions as HLP_F
from ..misc import definition_dicts as DEF_D
from ..misc.locks import IOLock

if TYPE_CHECKING:
    from ..algorithms.L1C_P import L1C_object  # noqa F401  # flake8 issue

__author__ = 'Daniel Scheffler'


class GMS_object(object):
    # class attributes
    # NOTE: these attributes can be modified and seen by ALL GMS_object instances
    proc_status_all_GMSobjs = nested_dict()
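    # NOTE (hedged): nested_dict auto-creates intermediate keys, so the status registry
    # can be written without pre-initializing any levels, e.g. (hypothetical IDs):
    #   GMS_object.proc_status_all_GMSobjs[26184107]['S2A10']['L1A'] = 'finished'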

    def __init__(self, pathImage=''):
        # add private attributes
        self._logger = None
        self._loggers_disabled = False
        self._log = ''
        self._MetaObj = None
        self._LayerBandsAssignment = None
        self._coreg_needed = None
        self._resamp_needed = None
        self._arr = None
        self._mask_nodata = None
        self._mask_clouds = None
        self._mask_clouds_confidence = None
        self._masks = None
        self._dem = None
        self._pathGen = None
        self._ac_options = {}
        self._ac_errors = None
        self._spat_homo_errors = None
        self._spec_homo_errors = None
        self._accuracy_layers = None
        self._dict_LayerOptTherm = None
        self._cloud_masking_algorithm = None
        self._coreg_info = None

        # defaults
        self.proc_level = 'init'
        self.image_type = ''
        self.satellite = ''
        self.sensor = ''
        self.subsystem = ''
        self.sensormode = ''
        self.acq_datetime = None  # also includes time, set by from_disk() and within L1A_P
        self.entity_ID = ''
        self.scene_ID = -9999
        self.filename = ''
        self.dataset_ID = -9999
        self.outInterleave = 'bsq'
        self.VAA_mean = None  # set by self.calc_mean_VAA()
        self.corner_lonlat = None
        self.trueDataCornerPos = []  # set by self.calc_corner_positions()
        self.trueDataCornerLonLat = []  # set by self.calc_corner_positions()
        self.fullSceneCornerPos = []  # set by self.calc_corner_positions()
        self.fullSceneCornerLonLat = []  # set by self.calc_corner_positions()
        # rows, cols, bands of the full scene (not of the subset as possibly represented by self.arr.shape)
        self.shape_fullArr = [None, None, None]
        self.arr_shape = 'cube'
        self.arr_desc = ''  # description of data units for self.arr
        self.arr_pos = None  # <tuple> in the form ((row_start, row_end), (col_start, col_end))
        self.tile_pos = None  # <list>, filled by self.get_tilepos()
        self.GeoTransProj_ok = True  # set by self.validate_GeoTransProj_GeoAlign()
        self.GeoAlign_ok = True  # set by self.validate_GeoTransProj_GeoAlign()
        self.masks_meta = {}  # set by self.build_L1A_masks()
        # self.CLD_obj = CLD_P.GmsCloudClassifier(classifier=self.path_cloud_class_obj)

        # set paths
        self.path_archive = ''
        self.path_procdata = ''
        self.ExtractedFolder = ''
        self.baseN = ''
        self.path_logfile = ''
        self.path_archive_valid = False
        self.path_InFilePreprocessor = None
        self.path_MetaPreprocessor = None

        self.job_ID = CFG.ID
        self.dataset_ID = -9999 if self.scene_ID == -9999 else \
            int(DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['datasetid'],
                                                {'id': self.scene_ID})[0][0])
        self.scenes_proc_ID = None  # set by Output writer after creation/update of db record in table scenes_proc
        self.mgrs_tiles_proc_ID = None  # set by Output writer after creation/update of db rec in table mgrs_tiles_proc
        self.MGRS_info = None
        self.lonlat_arr = None  # set by self.write_tiles_to_ENVIfile
        self.trueDataCornerUTM = None  # set by self.from_tiles

        self.mem_usage = {}

        # set paths
        self.path_cloud_class_obj = ''

        # handle initialization arguments
        if pathImage:
            self.arr = pathImage  # run the setter for 'arr' which creates an instance of GeoArray

    def __getstate__(self):
        """Defines how the attributes of the GMS object are pickled."""

        self.close_loggers()
        del self.pathGen  # path generator can only be used for the current processing level

        # delete arrays if their in-memory size is too big to be pickled
        # => (avoids MaybeEncodingError: Error sending result: '[<gms_preprocessing.algorithms.L2C_P.L2C_object
        #    object at 0x7fc44f6399e8>]'. Reason: 'error("'i' format requires -2147483648 <= number <= 2147483647",)')
        if self.proc_level == 'L2C' and CFG.inmem_serialization:
            self.flush_array_data()

        return self.__dict__

    def __setstate__(self, ObjDict):
        """Defines how the attributes of the GMS object are unpickled."""

        self.__dict__ = ObjDict
        # TODO unpickle meta to MetaObj
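    # Pickle round-trip sketch (illustrative; names as defined above): pickle.dumps(obj)
    # triggers __getstate__ (loggers closed, pathGen dropped, arrays flushed at L2C with
    # in-memory serialization), pickle.loads(blob) triggers __setstate__ (restores __dict__
    # as-is); loggers and pathGen are lazily re-created by their property getters on next access.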

    def __deepcopy__(self, memodict={}):
        """Returns a deepcopy of the object excluding loggers because loggers are not serializable."""

        cls = self.__class__
        result = cls.__new__(cls)
        self.close_loggers()
        del self.pathGen  # has a logger
        memodict[id(self)] = result

        for k, v in self.__dict__.items():
            setattr(result, k, copy.deepcopy(v, memodict))
        return result

    def set_pathes(self):
        self.baseN = self.pathGen.get_baseN()
        self.path_procdata = self.pathGen.get_path_procdata()
        self.ExtractedFolder = self.pathGen.get_path_tempdir()
        self.path_logfile = self.pathGen.get_path_logfile()
        self.pathGen = PG.path_generator(self.__dict__)  # passes a logger in addition to previous attributes
        self.path_archive = self.pathGen.get_local_archive_path_baseN()

        if not CFG.inmem_serialization:
            self.path_InFilePreprocessor = os.path.join(self.ExtractedFolder, '%s%s_DN.bsq'
                                                        % (self.entity_ID,
                                                           ('_%s' % self.subsystem if self.subsystem else '')))
        else:  # keep data in memory
            self.path_InFilePreprocessor = None  # None: keeps all produced data in memory (numpy array attributes)

        self.path_MetaPreprocessor = self.path_archive

    def validate_pathes(self):
        if not os.path.isfile(self.path_archive) and not os.path.isdir(self.path_archive):
            self.logger.info("The %s dataset '%s' has not been processed earlier and no corresponding raw data "
                             "archive has been found at %s." % (self.sensor, self.entity_ID, self.path_archive))
            self.logger.info('Trying to download the dataset...')
            self.path_archive_valid = self._data_downloader(self.sensor, self.entity_ID)
        else:
            self.path_archive_valid = True

        if not CFG.inmem_serialization and self.ExtractedFolder and not os.path.isdir(self.ExtractedFolder):
            os.makedirs(self.ExtractedFolder)

        assert os.path.exists(self.path_archive), 'Invalid path to RAW data. File %s does not exist at %s.' \
                                                  % (os.path.basename(self.path_archive),
                                                     os.path.dirname(self.path_archive))
        assert isinstance(self.path_archive, str), 'Invalid path to RAW data. Got %s instead of string or unicode.' \
                                                   % type(self.path_archive)
        if not CFG.inmem_serialization and self.ExtractedFolder:
            assert os.path.exists(self.ExtractedFolder), \
                'Invalid path for temporary files. Directory %s does not exist.' % self.ExtractedFolder

    @property
    def logger(self):
        if self._loggers_disabled:
            return None
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = DatasetLogger('log__' + self.baseN, fmt_suffix=self.scene_ID,
                                         path_logfile=self.path_logfile,
                                         log_level=CFG.log_level, append=True)
            return self._logger

    @logger.setter
    def logger(self, logger):
        assert isinstance(logger, logging.Logger) or logger in ['not set', None], \
            "GMS_obj.logger cannot be set to %s." % logger

        # save prior logs
        # if logger is None and self._logger is not None:
        #     self.log += self.logger.captured_stream
        self._logger = logger

    @property  # FIXME does not work yet
    def log(self):
        """Returns a string of all messages logged until now."""

        return self._log

    @log.setter
    def log(self, string):
        assert isinstance(string, str), "'log' can only be set to a string. Got %s." % type(string)
        self._log = string

    @property
    def proc_status(self):
        # type: () -> str
        """
        Get the processing status of the current GMS_object (subclass) instance for the current processing level.

        Possible values: 'initialized', 'running', 'finished', 'failed'
        """
        # NOTE: self.proc_status_all_GMSobjs is a class attribute (visible and modifiable from any other subsystem)
        return self.proc_status_all_GMSobjs[self.scene_ID][self.subsystem][self.proc_level]

    @proc_status.setter
    def proc_status(self, new_status):
        # type: (str) -> None
        self.proc_status_all_GMSobjs[self.scene_ID][self.subsystem][self.proc_level] = new_status
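    # Usage sketch (hedged): a worker updates its own slot in the shared registry, e.g.
    #   gms_obj.proc_status = 'running'    # -> proc_status_all_GMSobjs[scene_ID][subsystem][proc_level]
    #   ...                                # process the current level
    #   gms_obj.proc_status = 'finished'   # immediately visible to all other GMS_object instances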

    @property
    def GMS_identifier(self):
        return GMS_identifier(self.image_type, self.satellite, self.sensor, self.subsystem, self.proc_level,
                              self.dataset_ID, self.logger)

    @property
    def MetaObj(self):
        # TODO if there is no MetaObj -> create MetaObj by reading metadata from disk
        #      reading from disk should use L1A_P.L1A_object.import_metadata -> so just return None

        return self._MetaObj

    @MetaObj.setter
    def MetaObj(self, MetaObj):
        assert isinstance(MetaObj, METADATA), "'MetaObj' can only be set to an instance of the METADATA class. " \
                                              "Got %s." % type(MetaObj)
        self._MetaObj = MetaObj

    @MetaObj.deleter
    def MetaObj(self):
        if self._MetaObj and self._MetaObj.logger not in [None, 'not set']:
            self._MetaObj.logger.close()
            self._MetaObj.logger = None

        self._MetaObj = None

    @property
    def pathGen(self):
        # type: () -> PG.path_generator
        """
        Returns the path generator object for generating file paths belonging to the GMS object.
        """

        if self._pathGen and self._pathGen.proc_level == self.proc_level:
            return self._pathGen
        else:
            self._pathGen = PG.path_generator(self.__dict__.copy(), MGRS_info=self.MGRS_info)

        return self._pathGen

    @pathGen.setter
    def pathGen(self, pathGen):
        assert isinstance(pathGen, PG.path_generator), 'GMS_object.pathGen can only be set to an instance of ' \
                                                       'path_generator. Got %s.' % type(pathGen)
        self._pathGen = pathGen

    @pathGen.deleter
    def pathGen(self):
        self._pathGen = None

    @property
    def subset(self):
        return [self.arr_shape, self.arr_pos]

    @property
    def LayerBandsAssignment(self):
        # FIXME merge that with self.MetaObj.LayerBandsAssignment
        # FIXME -> otherwise a change of LBA in MetaObj is not recognized here
        if self._LayerBandsAssignment:
            return self._LayerBandsAssignment
        elif self.image_type == 'RSD':
            self._LayerBandsAssignment = get_LayerBandsAssignment(self.GMS_identifier) \
                if self.sensormode != 'P' else get_LayerBandsAssignment(self.GMS_identifier, nBands=1)
            return self._LayerBandsAssignment
        else:
            return ''

    @LayerBandsAssignment.setter
    def LayerBandsAssignment(self, LBA_list):
        self._LayerBandsAssignment = LBA_list
        self.MetaObj.LayerBandsAssignment = LBA_list

    @property
    def dict_LayerOptTherm(self):
        if self._dict_LayerOptTherm:
            return self._dict_LayerOptTherm
        elif self.LayerBandsAssignment:
            self._dict_LayerOptTherm = get_dict_LayerOptTherm(self.GMS_identifier, self.LayerBandsAssignment)
            return self._dict_LayerOptTherm
        else:
            return None

    @property
    def georef(self):
        """Returns True if the current dataset can serve as spatial reference."""

        return True if self.image_type == 'RSD' and re.search(r'OLI', self.sensor, re.I) else False
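    # NOTE (hedged): due to the case-insensitive r'OLI' match, e.g. a Landsat-8 'OLI_TIRS'
    # dataset qualifies as spatial reference, while non-OLI sensors do not.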

    @property
    def coreg_needed(self):
        if self._coreg_needed is None:
            self._coreg_needed = not (self.dataset_ID == CFG.datasetid_spatial_ref)
        return self._coreg_needed

    @coreg_needed.setter
    def coreg_needed(self, value):
        self._coreg_needed = value

    @property
    def coreg_info(self):
        if not self._coreg_info:
            self._coreg_info = {
                'corrected_shifts_px': {'x': 0, 'y': 0},
                'corrected_shifts_map': {'x': 0, 'y': 0},
                'original map info': self.MetaObj.map_info,
                'updated map info': None,
                'shift_reliability': None,
                'reference scene ID': None,
                'reference entity ID': None,
                'reference geotransform': None,
                # reference projection must be the own projection in order to avoid overwriting with a wrong EPSG
                'reference projection': self.MetaObj.projection,
                'reference extent': {'rows': None, 'cols': None},
                'reference grid': [list(CFG.spatial_ref_gridx),
                                   list(CFG.spatial_ref_gridy)],
                'success': False
            }

        return self._coreg_info

    @coreg_info.setter
    def coreg_info(self, val):
        self._coreg_info = val

    @property
    def resamp_needed(self):
        if self._resamp_needed is None:
            gt = mapinfo2geotransform(self.MetaObj.map_info)
            self._resamp_needed = not is_coord_grid_equal(gt, CFG.spatial_ref_gridx,
                                                          CFG.spatial_ref_gridy)
        return self._resamp_needed

    @resamp_needed.setter
    def resamp_needed(self, value):
        self._resamp_needed = value

    @property
    def arr(self):
        # type: () -> GeoArray
        # TODO this must return a subset if self.subset is not None
        return self._arr

    @arr.setter
    def arr(self, *geoArr_initArgs):
        # TODO this must be able to handle subset inputs in tiled processing
        self._arr = GeoArray(*geoArr_initArgs)

        # set nodata value and geoinfos
        # NOTE: MetaObj is NOT gettable before import_metadata has been executed!
        if hasattr(self, 'MetaObj') and self.MetaObj:
            self._arr.nodata = self.MetaObj.spec_vals['fill']
            self._arr.gt = mapinfo2geotransform(self.MetaObj.map_info) if self.MetaObj.map_info else [0, 1, 0, 0, 0, -1]
            self._arr.prj = self.MetaObj.projection
        else:
            self._arr.nodata = DEF_D.get_outFillZeroSaturated(self._arr.dtype)[0]

        # set band names like this: ['B01', ..., 'B8A']
        if self.LayerBandsAssignment:
            if len(self.LayerBandsAssignment) == self._arr.bands:
                self._arr.bandnames = self.LBA2bandnames(self.LayerBandsAssignment)
            else:
                warnings.warn("Cannot set 'bandnames' attribute of GMS_object.arr because LayerBandsAssignment has "
                              "%s bands and GMS_object.arr has %s bands."
                              % (len(self.LayerBandsAssignment), self._arr.bands))

    @arr.deleter
    def arr(self):
        self._arr = None
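    # Assignment sketch (hypothetical path): `obj.arr = '/path/to/scene_DN.bsq'` routes all
    # arguments through GeoArray, so file paths, numpy arrays and existing GeoArray instances
    # are accepted alike; nodata, geotransform and projection are then pulled from MetaObj
    # if it is already available.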

    @property
    def arr_meta(self):
        return self.MetaObj.to_odict()

    @property
    def mask_nodata(self):
        if self._mask_nodata is not None:
            if not self._mask_nodata.is_inmem and self._mask_nodata.bands > 1:
                # NoDataMask object self._mask_nodata points to a multi-band image file (bands mask_nodata/mask_clouds)
                # -> read processes of unneeded bands need to be avoided
                self._mask_nodata = NoDataMask(self._mask_nodata[:, :, 0],
                                               geotransform=self._mask_nodata.gt,
                                               projection=self._mask_nodata.prj)
            return self._mask_nodata

        elif self._masks:
            # return the nodata mask from self.masks
            self._mask_nodata = NoDataMask(self._masks[:, :, 0],  # TODO use band names
                                           geotransform=self._masks.gt,
                                           projection=self._masks.prj)
            return self._mask_nodata

        elif isinstance(self.arr, GeoArray):
            self.logger.info('Calculating nodata mask...')
            self._mask_nodata = self.arr.mask_nodata  # calculates the nodata mask if not already present
            return self._mask_nodata
        else:
            return None

    @mask_nodata.setter
    def mask_nodata(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            self._mask_nodata = NoDataMask(*geoArr_initArgs)
            self._mask_nodata.nodata = False
            self._mask_nodata.gt = self.arr.gt
            self._mask_nodata.prj = self.arr.prj
        else:
            del self.mask_nodata

    @mask_nodata.deleter
    def mask_nodata(self):
        self._mask_nodata = None

    @property
    def mask_clouds(self):
        if self._mask_clouds is not None:
            if not self._mask_clouds.is_inmem and self._mask_clouds.bands > 1:
                # CloudMask object self._mask_clouds points to a multi-band image file on disk
                # (bands mask_nodata/mask_clouds) -> read processes of unneeded bands need to be avoided
                self._mask_clouds = CloudMask(self._mask_clouds[:, :, 1],
                                              geotransform=self._mask_clouds.gt,
                                              projection=self._mask_clouds.prj)  # TODO add legend
            return self._mask_clouds

        elif self._masks and self._masks.bands > 1:  # FIXME this will not be secure if there are more than 2 bands
            # return the cloud mask from self.masks
            self._mask_clouds = CloudMask(self._masks[:, :, 1],  # TODO use band names
                                          geotransform=self._masks.gt,
                                          projection=self._masks.prj)
            return self._mask_clouds
        else:
            return None  # TODO don't calculate the cloud mask?

    @mask_clouds.setter
    def mask_clouds(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:  # FIXME shape validation?
            self._mask_clouds = CloudMask(*geoArr_initArgs)
            self._mask_clouds.nodata = 0
            self._mask_clouds.gt = self.arr.gt
            self._mask_clouds.prj = self.arr.prj
        else:
            del self.mask_clouds

    @mask_clouds.deleter
    def mask_clouds(self):
        self._mask_clouds = None

    @property
    def dem(self):
        """
        Returns an SRTM DEM in the exact dimensions and pixel grid of self.arr as an instance of GeoArray.
        """

        if self._dem is None:
            self.logger.info('Generating DEM...')
            DC_args = (self.arr.box.boxMapXY, self.arr.prj, self.arr.xgsd, self.arr.ygsd)
            try:
                DC = INP_R.DEM_Creator(dem_sensor='SRTM', logger=self.logger)
                self._dem = DC.from_extent(*DC_args)
            except RuntimeError:
                self.logger.warning('SRTM DEM generation failed. Falling back to ASTER...')
                DC = INP_R.DEM_Creator(dem_sensor='ASTER', logger=self.logger)
                self._dem = DC.from_extent(*DC_args)

            self._dem.nodata = DEF_D.get_outFillZeroSaturated(self._dem.dtype)[0]
        return self._dem

    @dem.setter
    def dem(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            geoArr = GeoArray(*geoArr_initArgs)
            assert geoArr.shape[:2] == self.arr.shape[:2]

            self._dem = geoArr
            self._dem.nodata = DEF_D.get_outFillZeroSaturated(self._dem.dtype)[0]
            self._dem.gt = self.arr.gt
            self._dem.prj = self.arr.prj
        else:
            del self.dem

    @dem.deleter
    def dem(self):
        self._dem = None

    @property
    def masks(self):
        # if self.mask_nodata is not None and self.mask_clouds is not None and \
        #         self._masks is not None and self._masks.bands == 1:
        #     self.build_combined_masks_array()

        return self._masks

    @masks.setter
    def masks(self, *geoArr_initArgs):
        """
        NOTE: This does not automatically update mask_nodata and mask_clouds BUT if mask_nodata and mask_clouds are
        None, their getters will automatically synchronize!
        """

        if geoArr_initArgs[0] is not None:
            self._masks = GeoArray(*geoArr_initArgs)
            self._masks.nodata = 0
            self._masks.gt = self.arr.gt
            self._masks.prj = self.arr.prj
        else:
            del self.masks

    @masks.deleter
    def masks(self):
        self._masks = None

    @property
    def mask_clouds_confidence(self):
        return self._mask_clouds_confidence

    @mask_clouds_confidence.setter
    def mask_clouds_confidence(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            cnfArr = GeoArray(*geoArr_initArgs)

            assert cnfArr.shape == self.arr.shape[:2], \
                "The 'mask_clouds_confidence' GeoArray can only be instantiated with an array of the same " \
                "dimensions as GMS_obj.arr. Got %s." % str(cnfArr.shape)

            # noinspection PyProtectedMember
            if cnfArr._nodata is None:
                cnfArr.nodata = DEF_D.get_outFillZeroSaturated(cnfArr.dtype)[0]
            cnfArr.gt = self.arr.gt
            cnfArr.prj = self.arr.prj
            cnfArr.bandnames = ['confidence']

            self._mask_clouds_confidence = cnfArr
        else:
            del self.mask_clouds_confidence

    @mask_clouds_confidence.deleter
    def mask_clouds_confidence(self):
        self._mask_clouds_confidence = None

    @property
    def ac_errors(self):
        """Returns an instance of GeoArray containing error information calculated by the atmospheric correction."""

        return self._ac_errors

    @ac_errors.setter
    def ac_errors(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            errArr = GeoArray(*geoArr_initArgs)

            if CFG.ac_bandwise_accuracy:
                assert errArr.shape == self.arr.shape, \
                    "The 'ac_errors' GeoArray can only be instantiated with an array of the same dimensions as " \
                    "GMS_obj.arr. Got %s." % str(errArr.shape)
            else:
                assert errArr.shape[:2] == self.arr.shape[:2], \
                    "The 'ac_errors' GeoArray can only be instantiated with an array of the same X/Y dimensions as " \
                    "GMS_obj.arr. Got %s." % str(errArr.shape)

            # noinspection PyProtectedMember
            if errArr._nodata is None:
                errArr.nodata = DEF_D.get_outFillZeroSaturated(errArr.dtype)[0]
            errArr.gt = self.arr.gt
            errArr.prj = self.arr.prj
            errArr.bandnames = self.LBA2bandnames(self.LayerBandsAssignment) if errArr.ndim == 3 else ['median']

            self._ac_errors = errArr
        else:
            del self.ac_errors

    @ac_errors.deleter
    def ac_errors(self):
        self._ac_errors = None

    @property
    def spat_homo_errors(self):
        if not self._spat_homo_errors and self.coreg_info['shift_reliability'] is not None:
            errArr = GeoArray(np.full(self.arr.shape[:2], self.coreg_info['shift_reliability'], dtype=np.uint8),
                              geotransform=self.arr.geotransform,
                              projection=self.arr.projection,
                              bandnames=['shift_reliability'],
                              nodata=DEF_D.get_outFillZeroSaturated(np.uint8)[0])
            errArr[self.arr.mask_nodata.astype(np.int8) == 0] = errArr.nodata

            self._spat_homo_errors = errArr

        return self._spat_homo_errors

    @spat_homo_errors.deleter
    def spat_homo_errors(self):
        self._spat_homo_errors = None

    @property
    def spec_homo_errors(self):
        """Returns an instance of GeoArray containing error information calculated during spectral homogenization."""

        return self._spec_homo_errors

    @spec_homo_errors.setter
    def spec_homo_errors(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            errArr = GeoArray(*geoArr_initArgs)

            if CFG.spechomo_bandwise_accuracy:
                assert errArr.shape == self.arr.shape, \
                    "The 'spec_homo_errors' GeoArray can only be instantiated with an array of the same dimensions " \
                    "as GMS_obj.arr. Got %s." % str(errArr.shape)
            else:
                assert errArr.shape[:2] == self.arr.shape[:2], \
                    "The 'spec_homo_errors' GeoArray can only be instantiated with an array of the same X/Y " \
                    "dimensions as GMS_obj.arr. Got %s." % str(errArr.shape)

            # noinspection PyProtectedMember
            if errArr._nodata is None:
                errArr.nodata = DEF_D.get_outFillZeroSaturated(errArr.dtype)[0]
            errArr.gt = self.arr.gt
            errArr.prj = self.arr.prj
            errArr.bandnames = self.LBA2bandnames(self.LayerBandsAssignment) if errArr.ndim == 3 else ['median']

            self._spec_homo_errors = errArr
        else:
            del self.spec_homo_errors

    @spec_homo_errors.deleter
    def spec_homo_errors(self):
        self._spec_homo_errors = None

    @property
    def accuracy_layers(self):
        return self._accuracy_layers

    @accuracy_layers.setter
    def accuracy_layers(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            acc_lay = GeoArray(*geoArr_initArgs)
            assert acc_lay.shape[:2] == self.arr.shape[:2], \
                "The 'accuracy_layers' GeoArray can only be instantiated with an array of the same dimensions as " \
                "GMS_obj.arr. Got %s." % str(acc_lay.shape)

            # noinspection PyProtectedMember
            if acc_lay._nodata is None:
                acc_lay.nodata = DEF_D.get_outFillZeroSaturated(acc_lay.dtype)[0]
            acc_lay.gt = self.arr.gt
            acc_lay.prj = self.arr.prj

            if not acc_lay.bandnames:
                raise ValueError("The 'accuracy_layers' GeoArray must have band names.")

            self._accuracy_layers = acc_lay
        else:
            del self.accuracy_layers

    @accuracy_layers.deleter
    def accuracy_layers(self):
        self._accuracy_layers = None

    @property
    def accuracy_layers_meta(self):
        if self._accuracy_layers is not None:
            return {'map info': geotransform2mapinfo(self._accuracy_layers.gt, self._accuracy_layers.projection),
                    'coordinate system string': self._accuracy_layers.projection,
                    'bands': self._accuracy_layers.bands,
                    'band names': list(self._accuracy_layers.bandnames),
                    'data ignore value': self._accuracy_layers.nodata}
        else:
            return None

    def generate_accuracy_layers(self):
        if not self.proc_level.startswith('L2'):
            self.logger.warning('Attempt to get %s accuracy layers failed - they are a Level 2 feature only.'
                                % self.proc_level)
            return None

        try:
            from ..algorithms.L2C_P import AccuracyCube
            self._accuracy_layers = AccuracyCube(self)  # don't use the setter because it would convert it to a GeoArray

        except ValueError as e:
            if str(e) == 'The given GMS_object contains no accuracy layers for combination.':
                if any([CFG.ac_estimate_accuracy, CFG.spathomo_estimate_accuracy, CFG.spechomo_estimate_accuracy]):
                    self.logger.warning('The given GMS_object contains no accuracy layers although computation '
                                        'of accuracy layers was enabled in the job configuration.')
                else:
                    pass  # self._accuracy_layers keeps None
            else:
                raise

        except Exception as e:
            raise RuntimeError('Failed to generate AccuracyCube!', e)

    @property
    def cloud_masking_algorithm(self):
        if not self._cloud_masking_algorithm:
            self._cloud_masking_algorithm = CFG.cloud_masking_algorithm[self.satellite]
        return self._cloud_masking_algorithm

    @property
    def ac_options(self):
        # type: () -> dict
        """
        Returns the options dictionary needed as input for atmospheric correction. If an empty dictionary is returned,
        atmospheric correction is not yet available for the current sensor and will later be skipped.
        """

        if not self._ac_options:
            path_ac_options = CFG.path_custom_sicor_options or PG.get_path_ac_options(self.GMS_identifier)

            if path_ac_options and os.path.exists(path_ac_options):
                # don't validate because the options contain paths that do not exist on another server:
                opt_dict = get_ac_options(path_ac_options, validation=False)

                # update some file paths depending on the current environment
                opt_dict['DEM']['fn'] = CFG.path_dem_proc_srtm_90m
                opt_dict['ECMWF']['path_db'] = CFG.path_ECMWF_db
                opt_dict['S2Image'][
                    'S2_MSI_granule_path'] = None  # only a placeholder -> will always be None for GMS usage
                opt_dict['output'] = []  # outputs are not needed for GMS -> so output writing is skipped
                opt_dict['report']['report_path'] = os.path.join(self.pathGen.get_path_procdata(), '[TYPE]')
                if 'uncertainties' in opt_dict:
                    if CFG.ac_estimate_accuracy:
                        opt_dict['uncertainties']['snr_model'] = PG.get_path_snr_model(self.GMS_identifier)
                    else:
                        del opt_dict['uncertainties']  # SICOR will not compute uncertainties if that key is missing

                # apply custom configuration
                opt_dict["logger"]['level'] = CFG.log_level
                opt_dict["ram"]['upper_limit'] = CFG.ac_max_ram_gb
                opt_dict["ram"]['unit'] = 'GB'
                opt_dict["AC"]['fill_nonclear_areas'] = CFG.ac_fillnonclear_areas
                opt_dict["AC"]['clear_area_labels'] = CFG.ac_clear_area_labels
                # opt_dict['AC']['n_cores'] = CFG.CPUs if CFG.allow_subMultiprocessing else 1

                self._ac_options = opt_dict
            else:
                self.logger.warning('There is no options file available for atmospheric correction. '
                                    'Atmospheric correction must be skipped.')

        return self._ac_options

    def get_copied_dict_and_props(self, remove_privates=False):
        # type: (bool) -> dict
        """Returns a copy of the current object dictionary including the current values of all object properties."""

        # loggers must be closed
        self.close_loggers()
        # this disables automatic recreation of loggers (otherwise loggers are created by using getattr()):
        self._loggers_disabled = True

        out_dict = self.__dict__.copy()

        # add properties
        property_names = [p for p in dir(self.__class__) if isinstance(getattr(self.__class__, p), property)]
        for propK in property_names:
            out_dict.update({propK: copy.copy(getattr(self, propK))})

        # remove private attributes
        if remove_privates:
            out_dict = {k: v for k, v in out_dict.items() if not k.startswith('_')}

        self._loggers_disabled = False  # enables automatic recreation of loggers

        return out_dict

    def attributes2dict(self, remove_privates=False):
        # type: (bool) -> dict
        """Returns a copy of the current object dictionary including the current values of selected properties."""

        # loggers must be closed
        self.close_loggers()
        # this disables automatic recreation of loggers (otherwise loggers are created by using getattr()):
        self._loggers_disabled = True

        out_dict = self.__dict__.copy()

        # add some selected property values
        for i in ['GMS_identifier', 'LayerBandsAssignment', 'coreg_needed', 'coreg_info', 'resamp_needed',
                  'dict_LayerOptTherm', 'georef', 'MetaObj']:
            if i == 'MetaObj':
                out_dict['meta_odict'] = self.MetaObj.to_odict()
            else:
                out_dict[i] = getattr(self, i)

        # remove private attributes
        if remove_privates:
            out_dict = {k: v for k, v in out_dict.items() if not k.startswith('_')}

        self._loggers_disabled = False  # enables automatic recreation of loggers
        return out_dict

    def _data_downloader(self, sensor, entity_ID):
        self.logger.info('Data downloader started.')
        success = False
        # TODO: > download source code for Landsat here <
        if not success:
            self.logger.critical(
                "Download for %s dataset '%s' failed. No further processing possible." % (sensor, entity_ID))
            raise RuntimeError('Archive download failed.')
        return success

    @classmethod
    def from_disk(cls, tuple_GMS_subset):
        """Creates a GMS object and fills it with data from disk. Excludes array attributes in Python mode.

        :param tuple_GMS_subset: <tuple> e.g. ('/path/gms_file.gms', ['cube', None])
        """
        GMS_obj = cls()

        path_GMS_file = tuple_GMS_subset[0]
        GMSfileDict = INP_R.GMSfile2dict(path_GMS_file)

        # copy all attributes from the GMS file (private attributes are not touched since they are not in the GMS file)

        # set MetaObj first in order to make some getters and setters work
        GMS_id = GMS_identifier(image_type=GMSfileDict['image_type'], satellite=GMSfileDict['satellite'],
                                sensor=GMSfileDict['sensor'], subsystem=GMSfileDict['subsystem'],
                                proc_level=GMSfileDict['proc_level'], dataset_ID=GMSfileDict['dataset_ID'],
                                logger=None)
        GMS_obj.MetaObj = METADATA(GMS_id).from_odict(GMSfileDict['meta_odict'])

        for key, value in GMSfileDict.items():
            if key in ['GMS_identifier', 'georef', 'dict_LayerOptTherm']:
                continue  # properties that should better be created on the fly
            elif key == 'acq_datetime':
                GMS_obj.acq_datetime = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f%z')
            else:
                try:
                    setattr(GMS_obj, key, value)
                except Exception:
                    raise AttributeError("Can't set attribute %s." % key)

        GMS_obj.arr_shape, GMS_obj.arr_pos = tuple_GMS_subset[1]

        GMS_obj.arr = GMS_obj.pathGen.get_path_imagedata()
        # self.mask_nodata and self.mask_clouds are auto-synchronized via self.masks (see their getters):
        GMS_obj.masks = GMS_obj.pathGen.get_path_maskdata()
        GMS_obj.accuracy_layers = GMS_obj.pathGen.get_path_accuracylayers()

        return GMS_obj
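    # Usage sketch (the path is the hypothetical example from the docstring above):
    #   obj = GMS_object.from_disk(('/path/gms_file.gms', ['cube', None]))
    #   obj.arr          # image data, read via GeoArray from the path generator
    #   obj.mask_clouds  # auto-synchronized from obj.masks on first access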

    @classmethod
    def from_sensor_subsystems(cls, list_GMS_objs):
        # type: (List[GMS_object]) -> GMS_object
        """Merge separate GMS objects belonging to the same scene-ID into ONE GMS object.

        :param list_GMS_objs: <list> of GMS objects covering the same geographic area but representing different
                              sensor subsystems (e.g. 3 GMS_objects for Sentinel-2 10m/20m/60m bands)
        """
        GMS_obj_merged = cls()

        # assertions
        assert len(list_GMS_objs) > 1, "'GMS_object.from_sensor_subsystems()' expects multiple input GMS objects. " \
                                       "Got %d." % len(list_GMS_objs)
        assert all([is_coord_grid_equal(list_GMS_objs[0].arr.gt, *obj.arr.xygrid_specs) for obj in list_GMS_objs[1:]]), \
            "The input GMS objects must have the same pixel grid. Received: %s" \
            % np.array([obj.arr.xygrid_specs for obj in list_GMS_objs])
        assert len(list(set([GMS_obj.proc_level for GMS_obj in list_GMS_objs]))) == 1, \
            "The input GMS objects for GMS_object.from_sensor_subsystems() must have the same processing level."
        subsystems = [GMS_obj.subsystem for GMS_obj in list_GMS_objs]
        assert len(subsystems) == len(list(set(subsystems))), \
            "The input 'list_GMS_objs' contains duplicates: %s" % subsystems

        ##################
        # merge logfiles #
        ##################

        # read all logs into a DataFrame and sort it by the first column
        [GMS_obj.close_loggers() for GMS_obj in list_GMS_objs]  # close the loggers of the input objects
        paths_inLogs = [GMS_obj.pathGen.get_path_logfile() for GMS_obj in list_GMS_objs]
        allLogs_df = DataFrame()
        for log in paths_inLogs:
            df = read_csv(log, sep='\n', delimiter=None, header=None,  # no delimiter needed
                          engine='python')  # engine suppresses a pandas warning
            allLogs_df = allLogs_df.append(df)

        allLogs_df = allLogs_df.sort_values(0)  # sorting uses the timestamps at the first position in the logs
        allLogs_df = allLogs_df.drop_duplicates()  # otherwise, e.g., logs from AC would appear 3 times for S2A

        # set common metadata, needed for the logfile
        GMS_obj_merged.baseN = list_GMS_objs[0].pathGen.get_baseN(merged_subsystems=True)
        GMS_obj_merged.path_logfile = list_GMS_objs[0].pathGen.get_path_logfile(merged_subsystems=True)
        GMS_obj_merged.scene_ID = list_GMS_objs[0].scene_ID

        # write the merged logfile and flush the previous logger
        np.savetxt(GMS_obj_merged.path_logfile, np.array(allLogs_df), delimiter=None, fmt="%s")
        GMS_obj_merged.close_loggers()

        # log
        GMS_obj_merged.logger.info('Merging the subsystems %s to a single GMS object...'
                                   % ', '.join([GMS_obj.subsystem for GMS_obj in list_GMS_objs]))

        ##################
        # MERGE METADATA #
        ##################

        # copy all attributes from the first input GMS file (private attributes are not touched)
        for key, value in list_GMS_objs[0].__dict__.copy().items():
            if key in ['GMS_identifier', 'georef', 'dict_LayerOptTherm']:
                continue  # properties that should better be created on the fly
            elif key in ['baseN', 'path_logfile', 'scene_ID', 'subsystem']:
                continue  # either previously set with common values or not needed for the merged GMS_object
            try:
                setattr(GMS_obj_merged, key, value)
            except Exception:
                raise AttributeError("Can't set attribute %s." % key)

        # update LayerBandsAssignment and get the full list of output band names
        from .metadata import get_LayerBandsAssignment
        # use the identifier of the first input GMS object for getting the LBA (respects the current proc_level):
        gms_idf = list_GMS_objs[0].GMS_identifier
        GMS_obj_merged.LayerBandsAssignment = get_LayerBandsAssignment(gms_idf, return_fullLBA=True)
        bandnames = ['B%s' % i if len(i) == 2 else 'B0%s' % i for i in GMS_obj_merged.LayerBandsAssignment]
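        # e.g. (hedged example): a full LBA of ['1', '2', ..., '8A'] yields the band names
        # ['B01', 'B02', ..., 'B8A'] - two-character entries keep their name, single-character
        # entries are zero-padded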

        # update layer-dependent metadata with respect to the remaining input GMS objects
        GMS_obj_merged.MetaObj.LayerBandsAssignment = GMS_obj_merged.LayerBandsAssignment
        GMS_obj_merged.MetaObj.Subsystem = ''

        GMS_obj_merged.subsystem = ''
        del GMS_obj_merged.pathGen  # must be refreshed because the subsystem is now ''
        GMS_obj_merged.close_loggers()  # must also be refreshed because it depends on pathGen

        for attrN in layerdependent_metadata:
            # combine values from separate subsystems to a single value
            attrDic_fullLBA = {}
            for GMS_obj in list_GMS_objs:
                attr_val = getattr(GMS_obj.MetaObj, attrN)
                if isinstance(attr_val, list):
                    attrDic_fullLBA.update(dict(zip(GMS_obj.LayerBandsAssignment, attr_val)))
                elif isinstance(attr_val, (dict, OrderedDict)):
                    attrDic_fullLBA.update(attr_val)
                else:
                    raise ValueError(attrN)

            # update the attribute in self.MetaObj
            if attrDic_fullLBA:
                val2set = [attrDic_fullLBA[bN] for bN in GMS_obj_merged.LayerBandsAssignment] \
                    if isinstance(getattr(list_GMS_objs[0].MetaObj, attrN), list) else attrDic_fullLBA
                setattr(GMS_obj_merged.MetaObj, attrN, val2set)

        ####################
        # MERGE ARRAY DATA #
        ####################

        # find the common extent. NOTE: boundsMap is expected in the order [xmin, xmax, ymin, ymax]
        geoExtents = np.array([GMS_obj.arr.box.boundsMap for GMS_obj in list_GMS_objs])
        common_extent = (min(geoExtents[:, 0]), max(geoExtents[:, 1]), min(geoExtents[:, 2]), max(geoExtents[:, 3]))
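        # i.e. the union bounding box of all subsystems - e.g. (hedged example) the extents
        # [0, 100, 0, 100] and [10, 110, 10, 110] merge to (0, 110, 0, 110)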

        # overwrite array data with merged arrays, clipped to common_extent and reordered according to the full LBA
        for attrname in ['arr', 'ac_errors', 'dem', 'mask_nodata', 'mask_clouds', 'mask_clouds_confidence', 'masks']:

            # get the current attribute of each subsystem without running property getters
            all_arrays = [getattr(GMS_obj, '_%s' % attrname) for GMS_obj in list_GMS_objs]

            # get the same geographical extent for each input GMS object
            if len(set(tuple(ext) for ext in geoExtents.tolist())) > 1:
                # in case of different extents
                geoArrs_same_extent = []

                for geoArr in all_arrays:

                    if geoArr is not None:
                        # FIXME mask_clouds_confidence is no GeoArray until here
                        # FIXME -> has no nodata value -> the calculation throws a warning
                        geoArr_same_extent = \
                            GeoArray(*geoArr.get_mapPos(
                                mapBounds=np.array(common_extent)[[0, 2, 1, 3]],  # pass (xmin, ymin, xmax, ymax)
                                mapBounds_prj=geoArr.prj),
                                bandnames=list(geoArr.bandnames.keys()),
                                nodata=geoArr.nodata)
                        geoArrs_same_extent.append(geoArr_same_extent)
                    else:
                        # e.g. in case of a cloud mask that is only appended to the GMS object with the same
                        # spatial resolution
                        geoArrs_same_extent.append(None)

            else:
                # skip get_mapPos() if all input GMS objects have the same extent
                geoArrs_same_extent = all_arrays

            # validate output GeoArrays #
            #############################

            if len([gA for gA in geoArrs_same_extent if gA is not None]) > 1:
                equal_bounds = all([geoArrs_same_extent[0].box.boundsMap == gA.box.boundsMap
                                    for gA in geoArrs_same_extent[1:]])
                equal_epsg = all([geoArrs_same_extent[0].epsg == gA.epsg for gA in geoArrs_same_extent[1:]])
                equal_xydims = all([geoArrs_same_extent[0].shape[:2] == gA.shape[:2]
                                    for gA in geoArrs_same_extent[1:]])
                if not all([equal_bounds, equal_epsg, equal_xydims]):
                    raise RuntimeError('Something went wrong during getting the same geographical extent for all the '
                                       'input GMS objects. The extents, projections or pixel dimensions of the '
                                       'calculated input GMS objects are not equal.')

            # set output arrays #
            #####################

            # handle those arrays where the bands have to be reordered according to the full LayerBandsAssignment
            if attrname in ['arr', 'ac_errors'] and list(set(geoArrs_same_extent)) != [None]:
                # check that each desired band name for the current attribute is provided by one of the input
                # GMS objects
                available_bandNs = list(chain.from_iterable([list(gA.bandnames) for gA in geoArrs_same_extent]))
                for bN in bandnames:
                    if bN not in available_bandNs:
                        raise ValueError("The given input GMS objects (subsystems) do not provide a band name '%s' "
                                         "for the attribute '%s'. Available band names amongst all input GMS objects "
                                         "are: %s" % (bN, attrname, str(available_bandNs)))

                # merge arrays
                def get_band(bandN):
                    return [gA[bandN] for gA in geoArrs_same_extent if gA and bandN in gA.bandnames][0]

                full_geoArr = GeoArray(np.dstack([get_band(bandN) for bandN in bandnames]),
                                       geoArrs_same_extent[0].gt, geoArrs_same_extent[0].prj,
                                       bandnames=bandnames,
                                       nodata=geoArrs_same_extent[0].nodata)

                # handle CFG.ac_bandwise_accuracy (average ac_errors if needed)
                # NOTE: full_geoArr will only have 3 dims in case of multiple subsystems
                #       -> the median cannot be computed directly during AC due to the different GSDs of the subsystems
                #       -> first perform spatial homogenization, directly after that compute the median
                #       -> this is not done later (e.g., in L2C) to avoid memory overflows in L2B or L2C
                if attrname == 'ac_errors' and not CFG.ac_bandwise_accuracy and full_geoArr.bands > 1:
                    full_geoArr = np.median(full_geoArr, axis=2).astype(full_geoArr.dtype)

                setattr(GMS_obj_merged, attrname, full_geoArr)

            # handle the remaining arrays
            else:
                # masks, dem, mask_nodata, mask_clouds, mask_clouds_confidence
                if attrname == 'dem':
                    # use the DEM of the first input object
                    # (if the grid is the same, the DEMs should be the same anyway)
                    GMS_obj_merged.dem = geoArrs_same_extent[0]

                elif attrname == 'mask_nodata':
                    # must not be merged -> self.arr is already merged, so just recalculate it (np.all)
                    GMS_obj_merged.mask_nodata = GMS_obj_merged.calc_mask_nodata(overwrite=True)

                elif attrname == 'mask_clouds':
                    # possibly only present in ONE subsystem (set by atm. corr.)
                    mask_clouds = [msk for msk in geoArrs_same_extent if msk is not None]
                    if len(mask_clouds) > 1:
                        raise ValueError('Expected mask clouds in only one subsystem. Got %s.' % len(mask_clouds))
                    GMS_obj_merged.mask_clouds = mask_clouds[0] if mask_clouds else None

                elif attrname == 'mask_clouds_confidence':
                    # possibly only present in ONE subsystem (set by atm. corr.)
                    mask_clouds_conf = [msk for msk in geoArrs_same_extent if msk is not None]
                    if len(mask_clouds_conf) > 1:
                        raise ValueError(
                            'Expected mask_clouds_conf in only one subsystem. Got %s.' % len(mask_clouds_conf))
                    GMS_obj_merged.mask_clouds_confidence = mask_clouds_conf[0] if mask_clouds_conf else None

                elif attrname == 'masks':
                    # self.mask_nodata and self.mask_clouds will already be set here -> so just recreate it from there
                    GMS_obj_merged.masks = None

        # avoid unequal nodata edges between individual layers (resampling artifacts etc.) #
        ####################################################################################

        # get pixel values of areas that have not been atmospherically corrected (non-clear)
        nonclear_labels = [lbl for lbl in ["Clear", "Snow", "Water", "Shadow", "Cirrus", "Cloud"]
                           if lbl not in CFG.ac_clear_area_labels]
        cloud_mask_legend = DEF_D.get_mask_classdefinition('mask_clouds', GMS_obj_merged.satellite)
        nonclear_pixVals = [cloud_mask_legend[lbl] for lbl in nonclear_labels]

        # apply the cloud mask to the image data and all products derived from the image data
        # (only if the image data represents BOA reflectance and cloud areas are not to be filled with TOA reflectance)
        if re.search(r'BOA_Reflectance', GMS_obj_merged.MetaObj.PhysUnit, re.I) and not CFG.ac_fillnonclear_areas:
            # fill non-clear areas with no-data values (for all bands)
            for pixVal in nonclear_pixVals:
                mask_nonclear = GMS_obj_merged.mask_clouds[:] == pixVal
                GMS_obj_merged.arr[mask_nonclear] = DEF_D.get_outFillZeroSaturated(GMS_obj_merged.arr.dtype)[0]

                if GMS_obj_merged.ac_errors:
                    GMS_obj_merged.ac_errors[mask_nonclear] = \
                        DEF_D.get_outFillZeroSaturated(GMS_obj_merged.ac_errors.dtype)[0]

        # update the no-data mask (adds all pixels to no data where at least one band has no data)
        GMS_obj_merged.calc_mask_nodata(overwrite=True)

        # apply the updated nodata mask to the array data (only to the no-data area, not to the non-clear area)
        # => this sets all pixels to no data where at least one band has no data
        # NOTE: This may cause the cloud mask not to exactly match the holes in self.arr, so that there may be small
        #       gaps between the cloud edge and where the image data begins. This is due to resampling, e.g. from the
        #       Sentinel-2 60m grid to the Landsat 30m grid. The nodata mask marks those areas as no data although
        #       there is no cloud at exactly that pixel. This is ok!
        area2replace = np.all([GMS_obj_merged.mask_nodata[:] == 0] +
                              [GMS_obj_merged.mask_clouds[:] != pixVal for pixVal in nonclear_pixVals], axis=0) \
            if GMS_obj_merged.mask_clouds is not None else GMS_obj_merged.mask_nodata[:] == 0

        for attrname in ['arr', 'ac_errors']:
            attr_val = getattr(GMS_obj_merged, attrname)

            if attr_val is not None:
                attr_val[area2replace] = DEF_D.get_outFillZeroSaturated(attr_val.dtype)[0]
                setattr(GMS_obj_merged, attrname, attr_val)

        # recreate self.masks
        GMS_obj_merged.build_combined_masks_array()

        # update array-dependent metadata
        GMS_obj_merged.MetaObj.cols = GMS_obj_merged.arr.cols
        GMS_obj_merged.MetaObj.rows = GMS_obj_merged.arr.rows
        GMS_obj_merged.MetaObj.bands = GMS_obj_merged.arr.bands
        GMS_obj_merged.MetaObj.map_info = geotransform2mapinfo(GMS_obj_merged.arr.gt, GMS_obj_merged.arr.prj)
        GMS_obj_merged.MetaObj.projection = GMS_obj_merged.arr.prj

        # update corner coordinates  # (calc_corner_positions is a method of L1A_object)
        GMS_obj_merged.calc_corner_positions()

        # set the shape of the full array
        GMS_obj_merged.shape_fullArr = GMS_obj_merged.arr.shape

        return GMS_obj_merged

    @classmethod
    def from_tiles(cls, list_GMS_tiles):
        # type: (list) -> GMS_object
        """Merge separate GMS objects with different spatial coverage but belonging to one scene-ID into ONE GMS object.

        :param list_GMS_tiles: <list> of GMS objects that have been created by cut_GMS_obj_into_blocks()
        """
        GMS_obj = cls()

        if 'IMapUnorderedIterator' in str(type(list_GMS_tiles)):
            list_GMS_tiles = list(list_GMS_tiles)

        # copy all attributes except array attributes
        tile1 = list_GMS_tiles[0]
        for i in tile1.__dict__:
            if not callable(getattr(tile1, i)) and not isinstance(getattr(tile1, i), (np.ndarray, GeoArray)):
                setattr(GMS_obj, i, getattr(tile1, i))

        # MERGE ARRAY-ATTRIBUTES
        list_arraynames = [i for i in tile1.__dict__ if not callable(getattr(tile1, i)) and
                           isinstance(getattr(tile1, i), (np.ndarray, GeoArray))]
        # the list must start with '_arr', otherwise the setters will not work
        list_arraynames = ['_arr'] + [i for i in list_arraynames if i != '_arr']

        for arrname in list_arraynames:
            samplearray = getattr(tile1, arrname)
            assert isinstance(samplearray, (np.ndarray, GeoArray)), \
                'Received a %s object for attribute %s. Expected a numpy array or an instance of GeoArray.' \
                % (type(samplearray), arrname)
            is_3d = samplearray.ndim == 3
            bands = (samplearray.shape[2],) if is_3d else ()  # dynamic -> works for arr, cld_arr, ...
            target_shape = tuple(GMS_obj.shape_fullArr[:2]) + bands
            target_dtype = samplearray.dtype
            merged_array = GMS_obj._merge_arrays(list_GMS_tiles, arrname, target_shape, target_dtype)

            # use setters if possible
            setattr(GMS_obj, arrname if not arrname.startswith('_') else arrname[1:], merged_array)
            # NOTE: this asserts that each attribute starting with '_' also has a property with a setter!

        # UPDATE ARRAY-DEPENDENT ATTRIBUTES
        GMS_obj.arr_shape = 'cube'
        GMS_obj.arr_pos = None

        # update MetaObj attributes
        GMS_obj.MetaObj.cols = GMS_obj.arr.cols
        GMS_obj.MetaObj.rows = GMS_obj.arr.rows
        GMS_obj.MetaObj.bands = GMS_obj.arr.bands
        GMS_obj.MetaObj.map_info = geotransform2mapinfo(GMS_obj.arr.gt, GMS_obj.arr.prj)
        GMS_obj.MetaObj.projection = GMS_obj.arr.prj

        # calculate data_corners_imXY (mask_nodata is always an array here because get_mapPos always returns an array)
        corners_imYX = calc_FullDataset_corner_positions(
            GMS_obj.mask_nodata, assert_four_corners=False, algorithm='shapely')
        GMS_obj.trueDataCornerPos = [(YX[1], YX[0]) for YX in corners_imYX]  # [UL, UR, LL, LR]

        # calculate trueDataCornerLonLat
        data_corners_LatLon = pixelToLatLon(GMS_obj.trueDataCornerPos,
                                            geotransform=GMS_obj.arr.gt, projection=GMS_obj.arr.prj)
        GMS_obj.trueDataCornerLonLat = [(YX[1], YX[0]) for YX in data_corners_LatLon]

        # calculate trueDataCornerUTM
        data_corners_utmYX = pixelToMapYX(GMS_obj.trueDataCornerPos, geotransform=GMS_obj.arr.gt,
                                          projection=GMS_obj.arr.prj)  # FIXME asserts gt in UTM coordinates
        GMS_obj.trueDataCornerUTM = [(YX[1], YX[0]) for YX in data_corners_utmYX]

        return GMS_obj

1277 

1278 def get_subset_obj(self, imBounds=None, mapBounds=None, mapBounds_prj=None, out_prj=None, logmsg=None, 

1279 progress=False, v=False): 

1280 # type: (tuple, tuple, str, str, str, bool, bool) -> Union[GMS_object, None] 

1281 """Return a subset of the given GMS object, based on the given bounds coordinates. 

1282 

1283 Array attributes are clipped and relevant metadata keys are updated according to new extent. In case the subset 

1284 does not contain any data but only no-data values, None is returned. 

1285 

1286 :param imBounds: <tuple> tuple of image coordinates in the form (xmin,xmax,ymin,ymax) 

1287 :param mapBounds: <tuple> tuple of map coordinates in the form (xmin,xmax,ymin,ymax) 

1288 :param mapBounds_prj: <str> a WKT string containing projection of the given map bounds 

1289 (can be different to projection of the GMS object; ignored if map bounds not given) 

1290 :param out_prj: <str> a WKT string containing output projection. 

1291 If not given, the projection of self.arr is used. 

1292 :param logmsg: <str> a message to be logged when this method is called 

1293 :param progress: <bool> whether to show progress bar (default: False) 

1294 :param v: <bool> verbose mode (default: False) 

1295 :return: <GMS_object> the GMS object subset 

1296 """ 

1297 

1298 if logmsg: 

1299 self.logger.info(logmsg) 

1300 

1301 # copy object 

1302 from ..misc.helper_functions import get_parentObjDict 

1303 parentObjDict = get_parentObjDict() 

1304 sub_GMS_obj = parentObjDict[self.proc_level](*HLP_F.initArgsDict[self.proc_level]) # init 

1305 sub_GMS_obj.__dict__.update( 

1306 {k: getattr(self, k) for k in self.__dict__.keys() 

1307 if not isinstance(getattr(self, k), (GeoArray, np.ndarray))}.copy()) 

1308 

1309 sub_GMS_obj = copy.deepcopy(sub_GMS_obj) 

1310 

1311 # clip all array attributes using the given bounds 

1312 # list_arraynames = [i for i in self.__dict__ if not callable(getattr(self, i)) and \ 

1313 # isinstance(getattr(self, i), np.ndarray)] 

1314 list_arraynames = [i for i in ['arr', 'masks', 'ac_errors', 'mask_clouds_confidence', 'spec_homo_errors', 

1315 'accuracy_layers'] 

1316 if hasattr(self, i) and getattr(self, i) is not None] # FIXME hardcoded 

1317 assert list_arraynames 

1318 assert imBounds or mapBounds, "Either 'imBounds' or 'mapBounds' must be given. Got nothing." 

1319 

1320 # calculate mapBounds if not already given 

1321 if not mapBounds: 

1322 rS, rE, cS, cE = imBounds 

1323 (xmin, ymax), (xmax, ymin) = [imXY2mapXY((imX, imY), self.arr.gt) for imX, imY in 

1324 [(cS, rS), (cE + 1, rE + 1)]] 

1325 mapBounds_prj = self.arr.projection 

1326 else: 

1327 xmin, xmax, ymin, ymax = mapBounds 

1328 

1329 # avoid disk IO if requested area is within the input array # TODO 

1330 

1331 #################################################################### 

1332 # subset all array attributes and update directly related metadata # 

1333 #################################################################### 

1334 

1335 for arrname in list_arraynames: 

1336 # get input data for array subsetting 

1337 geoArr = getattr(self, arrname) 

1338 

1339 # get subsetted and (possibly) reprojected array 

1340 subArr = GeoArray(*geoArr.get_mapPos((xmin, ymin, xmax, ymax), mapBounds_prj, 

1341 out_prj=out_prj or geoArr.prj, 

1342 rspAlg='near' if arrname == 'masks' else 'cubic', 

1343 progress=progress), 

1344 bandnames=list(geoArr.bandnames), nodata=geoArr.nodata) 

1345 

1346 # show result 

1347 if v: 

1348 subArr.show(figsize=(10, 10)) 

1349 

1350 # update array-related attributes of sub_GMS_obj 

1351 if arrname == 'arr': 

1352 # return None in case the subset object contains only nodata 

1353 if subArr.min() == subArr.max() and \ 

1354 np.std(subArr) == 0 and \ 

1355 np.unique(subArr) == subArr.nodata: 

1356 return None 

1357 

1358 sub_GMS_obj.MetaObj.map_info = geotransform2mapinfo(subArr.gt, subArr.prj) 

1359 sub_GMS_obj.MetaObj.projection = subArr.prj 

1360 sub_GMS_obj.MetaObj.rows, sub_GMS_obj.MetaObj.cols = subArr.arr.shape[:2] 

1361 sub_GMS_obj.MetaObj.CS_UTM_ZONE = get_UTMzone(prj=subArr.prj) # FIXME only works for UTM 

1362 sub_GMS_obj.MetaObj.CS_EPSG = WKT2EPSG(subArr.prj) 

1363 elif arrname == 'masks': 

1364 sub_GMS_obj.masks_meta['map info'] = geotransform2mapinfo(subArr.gt, subArr.prj) 

1365 sub_GMS_obj.masks_meta['coordinate system string'] = subArr.prj 

1366 sub_GMS_obj.masks_meta['lines'], sub_GMS_obj.masks_meta['samples'] = subArr.arr.shape[:2] 

1367 sub_GMS_obj.masks_meta['CS_UTM_ZONE'] = get_UTMzone(prj=subArr.prj) # FIXME only works for UTM 

1368 sub_GMS_obj.masks_meta['CS_EPSG'] = WKT2EPSG(subArr.prj) 

1369 else: 

1370 # other layers are either not written (i.e. have no separate meta) 

1371 # or their meta is directly derived from the corresponding array 

1372 pass 

1373 

1374 delattr(sub_GMS_obj, 

1375 arrname) # the connected filePath must be disconnected because the projection change would otherwise be rejected 

1376 setattr(sub_GMS_obj, arrname, subArr) 

1377 

1378 # copy subset mask data to mask_nodata and mask_clouds 

1379 sub_GMS_obj.mask_nodata = sub_GMS_obj.masks[:, :, 0] # ==> getters will automatically return it from self.masks 

1380 # FIXME not dynamic: 

1381 sub_GMS_obj.mask_clouds = sub_GMS_obj.masks[:, :, 1] if sub_GMS_obj.masks.bands > 1 else None 

1382 

1383 ################### 

1384 # update metadata # 

1385 ################### 

1386 

1387 # update arr_pos 

1388 sub_GMS_obj.arr_shape = 'block' 

1389 if imBounds is not None: 

1390 rS, rE, cS, cE = imBounds 

1391 sub_GMS_obj.arr_pos = ((rS, rE), (cS, cE)) 

1392 else: 

1393 pass # FIXME how to set arr_pos in that case? 

1394 

1395 # calculate new attributes 'corner_utm' and 'corner_lonlat' 

1396 rows, cols, bands = sub_GMS_obj.arr.shape 

1397 ULxy, URxy, LLxy, LRxy = [[0, 0], [cols - 1, 0], [0, rows - 1], [cols - 1, rows - 1]] 

1398 # FIXME asserts gt in UTM coordinates: 

1399 utm_coord_YX = pixelToMapYX([ULxy, URxy, LLxy, LRxy], geotransform=subArr.gt, projection=subArr.prj) 

1400 # ULyx,URyx,LLyx,LRyx: 

1401 lonlat_coord = pixelToLatLon([ULxy, URxy, LLxy, LRxy], geotransform=subArr.gt, projection=subArr.prj) 

1402 sub_GMS_obj.corner_utm = [[YX[1], YX[0]] for YX in utm_coord_YX] # ULxy,URxy,LLxy,LRxy 

1403 sub_GMS_obj.corner_lonlat = [[YX[1], YX[0]] for YX in lonlat_coord] # ULxy,URxy,LLxy,LRxy 

1404 

1405 # calculate 'bounds_LonLat' and 'bounds_utm' 

1406 sub_GMS_obj.bounds_LonLat = HLP_F.corner_coord_to_minmax(sub_GMS_obj.corner_lonlat) 

1407 sub_GMS_obj.bounds_utm = HLP_F.corner_coord_to_minmax(sub_GMS_obj.corner_utm) 

1408 

1409 # calculate data_corners_imXY (mask_nodata is always an array here because get_mapPos always returns an array) 

1410 corners_imYX = calc_FullDataset_corner_positions( 

1411 sub_GMS_obj.mask_nodata, assert_four_corners=False, algorithm='shapely') 

1412 sub_GMS_obj.trueDataCornerPos = [(YX[1], YX[0]) for YX in corners_imYX] 

1413 

1414 # calculate trueDataCornerLonLat 

1415 data_corners_LatLon = pixelToLatLon(sub_GMS_obj.trueDataCornerPos, 

1416 geotransform=subArr.gt, projection=subArr.prj) 

1417 sub_GMS_obj.trueDataCornerLonLat = [(YX[1], YX[0]) for YX in data_corners_LatLon] 

1418 

1419 # calculate trueDataCornerUTM 

1420 if isProjectedOrGeographic(subArr.prj) == 'projected': 

1421 data_corners_utmYX = [imYX2mapYX(imYX, gt=subArr.gt) for imYX in corners_imYX] 

1422 sub_GMS_obj.trueDataCornerUTM = [(YX[1], YX[0]) for YX in data_corners_utmYX] 

1423 else: 

1424 self.logger.error("Error while setting 'trueDataCornerUTM' due to unexpected projection of subArr: %s." 

1425 % subArr.prj) 

1426 

1427 return sub_GMS_obj 

1428 
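# Usage sketch (hedged; not part of the original sources): clipping a GMS object to a
# map-coordinate window. `gms_obj` and the bounds/EPSG values below are hypothetical.
#
#     from py_tools_ds.geo.projection import EPSG2WKT
#     sub = gms_obj.get_subset_obj(mapBounds=(600000, 700000, 5600000, 5700000),  # xmin, xmax, ymin, ymax
#                                  mapBounds_prj=EPSG2WKT(32633),
#                                  logmsg='Clipping scene to the area of interest...')
#     if sub is not None:  # None means the requested window contains only nodata
#         sub.to_ENVI()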

1429 @staticmethod 

1430 def _merge_arrays(list_GMS_tiles, arrname2merge, target_shape, target_dtype): 

1431 # type: (list, str, tuple, np.dtype) -> np.ndarray 

1432 """Merge multiple arrays into a single one. 

1433 

1434 :param list_GMS_tiles: list of GMS object tiles whose arrays are to be merged 

1435 :param arrname2merge: name of the array attribute to merge, e.g., 'arr' 

1436 :param target_shape: shape of the merged output array (rows, columns[, bands]) 

1437 :param target_dtype: data type of the merged output array 

1438 :return: the merged numpy array 

1439 """ 

1440 out_arr = np.empty(target_shape, dtype=target_dtype) 

1441 for tile in list_GMS_tiles: 

1442 (rowStart, rowEnd), (colStart, colEnd) = tile.arr_pos 

1443 out_arr[rowStart:rowEnd + 1, colStart:colEnd + 1] = getattr(tile, arrname2merge) 

1444 return out_arr 

1445 
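# Usage sketch (hedged): merging the 'arr' attribute of a list of block tiles back into a
# full cube; `tiles` and the target shape/dtype below are hypothetical.
#
#     full_arr = GMS_object._merge_arrays(tiles, 'arr',
#                                         target_shape=(10980, 10980, 13),
#                                         target_dtype=np.int16)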

1446 def log_for_fullArr_or_firstTile(self, log_msg, subset=None): 

1447 """Send a message to the logger only if full array or the first tile is currently processed. 

1448 This function can be called when processing any tile but log message will only be sent from first tile. 

1449 

1450 :param log_msg: the log message to be logged 

1451 :param subset: subset argument as sent to e.g. DN2TOARadRefTemp that indicates which tile is to be processed. 

1452 Not needed if self.arr_pos is not None. 

1453 """ 

1454 

1455 if subset is None and \ 

1456 (self.arr_shape == 'cube' or self.arr_pos is None or [self.arr_pos[0][0], self.arr_pos[1][0]] == [0, 0]) or\ 

1457 subset == ['cube', None] or (subset and [subset[1][0][0], subset[1][1][0]] == [0, 0]) or \ 

1458 hasattr(self, 'logAtThisTile') and getattr(self, 'logAtThisTile'): # cube or 1st tile 

1459 self.logger.info(log_msg) 

1460 else: 

1461 pass 

1462 
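# Example (hedged): the 'subset' argument is expected in the same form as used by the tiled
# mappers in this package, i.e., ['cube', None] for the full array or
# ['block', ((rowStart, rowEnd), (colStart, colEnd))] for a tile; the bounds are hypothetical.
#
#     self.log_for_fullArr_or_firstTile('Converting DNs to TOA reflectance...',
#                                       subset=['block', ((0, 511), (0, 511))])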

1463 @staticmethod 

1464 def LBA2bandnames(LayerBandsAssignment): 

1465 """Convert LayerbandsAssignment from format ['1','2',...] to bandnames like this: [B01, .., B8A,].""" 

1466 return ['B%s' % i if len(i) == 2 else 'B0%s' % i for i in LayerBandsAssignment] 

1467 
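# Example (derived from the implementation above):
#
#     GMS_object.LBA2bandnames(['1', '2', '8A'])  # -> ['B01', 'B02', 'B8A']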

1468 def get_tilepos(self, target_tileshape, target_tilesize): 

1469 self.tile_pos = [[target_tileshape, tb] 

1470 for tb in get_array_tilebounds(array_shape=self.shape_fullArr, tile_shape=target_tilesize)] 

1471 

1472 @staticmethod 

1473 def rescale_array(inArray, outScaleFactor, inScaleFactor=1): 

1474 """Adjust the scaling factor of an array to match the given output scale factor.""" 

1475 

1476 assert isinstance(inArray, np.ndarray) 

1477 return inArray.astype(np.int16) * (outScaleFactor / inScaleFactor) 

1478 
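# Example (derived from the implementation above): converting an array stored with scale
# factor 10000 to scale factor 100 (values are hypothetical); note that the input is cast
# to int16 before scaling:
#
#     arr = np.array([[2500, 5000]], dtype=np.int16)
#     GMS_object.rescale_array(arr, outScaleFactor=100, inScaleFactor=10000)  # -> [[25., 50.]]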

1479 def calc_mask_nodata(self, fromBand=None, overwrite=False): 

1480 """Calculates a no data mask with (values: 0=nodata; 1=data) 

1481 

1482 :param fromBand: <int> index of the band to be used (if None, all bands are used) 

1483 :param overwrite: <bool> whether to overwrite existing nodata mask that has already been calculated 

1484 :return: 

1485 """ 

1486 

1487 self.logger.info('Calculating nodata mask...') 

1488 

1489 if self._mask_nodata is None or overwrite: 

1490 self.arr.calc_mask_nodata(fromBand=fromBand, overwrite=overwrite) 

1491 self.mask_nodata = self.arr.mask_nodata 

1492 return self.mask_nodata 

1493 

1494 def apply_nodata_mask_to_ObjAttr(self, attrname, out_nodata_val=None): 

1495 # type: (str,int) -> None 

1496 """Applies self.mask_nodata to the specified array attribute by setting all values where mask_nodata is 0 to the 

1497 given nodata value. 

1498 

1499 :param attrname: The attribute to apply the nodata mask to. Must be an array attribute or 

1500 a string path to a previously saved ENVI-file. 

1501 :param out_nodata_val: set the values of the given attribute to this value. 

1502 """ 

1503 

1504 assert hasattr(self, attrname) 

1505 

1506 if getattr(self, attrname) is not None: 

1507 

1508 if isinstance(getattr(self, attrname), str): 

1509 update_spec_vals = True if attrname == 'arr' else False 

1510 self.apply_nodata_mask_to_saved_ENVIfile(getattr(self, attrname), out_nodata_val, update_spec_vals) 

1511 else: 

1512 assert isinstance(getattr(self, attrname), (np.ndarray, GeoArray)), \ 

1513 'L1A_obj.%s must be a numpy array or an instance of GeoArray. Got type %s.' \ 

1514 % (attrname, type(getattr(self, attrname))) 

1515 assert hasattr(self, 'mask_nodata') and self.mask_nodata is not None 

1516 

1517 self.log_for_fullArr_or_firstTile('Applying nodata mask to L1A_object.%s...' % attrname) 

1518 

1519 nodata_val = out_nodata_val if out_nodata_val else \ 

1520 DEF_D.get_outFillZeroSaturated(getattr(self, attrname).dtype)[0] 

1521 getattr(self, attrname)[self.mask_nodata.astype(np.int8) == 0] = nodata_val 

1522 

1523 if attrname == 'arr': 

1524 self.MetaObj.spec_vals['fill'] = nodata_val 

1525 

1526 def build_combined_masks_array(self): 

1527 # type: () -> dict 

1528 """Generates self.masks attribute (unsigned integer 8bit) from by concatenating all masks included in GMS obj. 

1529 The corresponding metadata is assigned to L1A_obj.masks_meta. Empty mask attributes are skipped.""" 

1530 

1531 arrays2combine = [aN for aN in ['mask_nodata', 'mask_clouds'] 

1532 if hasattr(self, aN) and isinstance(getattr(self, aN), (GeoArray, np.ndarray))] 

1533 if arrays2combine: 

1534 self.log_for_fullArr_or_firstTile('Combining masks...') 

1535 

1536 def get_data(arrName): return getattr(self, arrName).astype(np.uint8)[:, :, None] 

1537 

1538 for aN in arrays2combine: 

1539 if False in np.equal(getattr(self, aN), getattr(self, aN).astype(np.uint8)): 

1540 warnings.warn('Some pixel values of attribute %s changed during data type ' 

1541 'conversion within build_combined_masks_array().' % aN) 

1542 

1543 # set self.masks 

1544 self.masks = get_data(arrays2combine[0]) if len(arrays2combine) == 1 else \ 

1545 np.concatenate([get_data(aN) for aN in arrays2combine], axis=2) 

1546 self.masks.bandnames = arrays2combine # set band names of GeoArray (allows later indexing by band name) 

1547 

1548 # set self.masks_meta 

1549 nodataVal = DEF_D.get_outFillZeroSaturated(self.masks.dtype)[0] 

1550 self.masks_meta = {'map info': self.MetaObj.map_info, 

1551 'coordinate system string': self.MetaObj.projection, 

1552 'bands': len(arrays2combine), 

1553 'band names': arrays2combine, 

1554 'data ignore value': nodataVal} 

1555 

1556 return {'desc': 'masks', 'row_start': 0, 'row_end': self.shape_fullArr[0], 

1557 'col_start': 0, 'col_end': self.shape_fullArr[1], 'data': self.masks} # usually not needed 

1558 

1559 def apply_nodata_mask_to_saved_ENVIfile(self, path_saved_ENVIhdr, custom_nodata_val=None, update_spec_vals=False): 

1560 # type: (str,int,bool) -> None 

1561 """Applies self.mask_nodata to a saved ENVI file with the same X/Y dimensions like self.mask_nodata by setting 

1562 all values where mask_nodata is 0 to the given nodata value. 

1563 

1564 :param path_saved_ENVIhdr: <str> The path of the ENVI file to apply the nodata mask to. 

1565 :param custom_nodata_val: <int> set the values of the given attribute to this value. 

1566 :param update_spec_vals: <bool> whether to update self.MetaObj.spec_vals['fill'] 

1567 """ 

1568 

1569 self.log_for_fullArr_or_firstTile('Applying nodata mask to saved ENVI file...') 

1570 assert os.path.isfile(path_saved_ENVIhdr) 

1571 assert hasattr(self, 'mask_nodata') and self.mask_nodata is not None 

1572 if not path_saved_ENVIhdr.endswith('.hdr') and os.path.isfile(os.path.splitext(path_saved_ENVIhdr)[0] + '.hdr'): 

1573 path_saved_ENVIhdr = os.path.splitext(path_saved_ENVIhdr)[0] + '.hdr' 

1574 if custom_nodata_val is None: 

1575 dtype_IDL = int(INP_R.read_ENVIhdr_to_dict(path_saved_ENVIhdr)['data type']) 

1576 nodata_val = DEF_D.get_outFillZeroSaturated(DEF_D.dtype_lib_IDL_Python[dtype_IDL])[0] 

1577 else: 

1578 nodata_val = custom_nodata_val 

1579 FileObj = spectral.open_image(path_saved_ENVIhdr) 

1580 File_memmap = FileObj.open_memmap(writable=True) 

1581 File_memmap[self.mask_nodata == 0] = nodata_val 

1582 if update_spec_vals: 

1583 self.MetaObj.spec_vals['fill'] = nodata_val 

1584 

1585 def combine_tiles_to_ObjAttr(self, tiles, target_attr): 

1586 # type: (list,str) -> None 

1587 """Combines tiles, e.g. produced by L1A_P.L1A_object.DN2TOARadRefTemp() to a single attribute. 

1588 If CFG.inmem_serialization is False, the produced attribute is additionally written to disk. 

1589 

1590 :param tiles: <list> a list of dictionaries with the keys 'desc', 'data', 'row_start','row_end', 

1591 'col_start' and 'col_end' 

1592 :param target_attr: <str> the name of the attribute to be produced 

1593 """ 

1594 

1595 warnings.warn("'combine_tiles_to_ObjAttr' is deprecated.", DeprecationWarning) 

1596 assert tiles[0] and isinstance(tiles, list) and isinstance(tiles[0], dict), \ 

1597 "The 'tiles' argument has to be list of dictionaries with the keys 'desc', 'data', 'row_start'," \ 

1598 "'row_end', 'col_start' and 'col_end'." 

1599 self.logger.info("Building L1A attribute '%s' by combining given tiles..." % target_attr) 

1600 tiles = [tiles] if not isinstance(tiles, list) else tiles 

1601 sampleTile = dict(tiles[0]) 

1602 target_shape = self.shape_fullArr if len(sampleTile['data'].shape) == 3 else self.shape_fullArr[:2] 

1603 setattr(self, target_attr, np.empty(target_shape, dtype=sampleTile['data'].dtype)) 

1604 for tile in tiles: # type: dict 

1605 rS, rE, cS, cE = tile['row_start'], tile['row_end'], tile['col_start'], tile['col_end'] 

1606 if len(target_shape) == 3: 

1607 getattr(self, target_attr)[rS:rE + 1, cS:cE + 1, :] = tile['data'] 

1608 else: 

1609 getattr(self, target_attr)[rS:rE + 1, cS:cE + 1] = tile['data'] 

1610 if target_attr == 'arr': 

1611 self.arr_desc = sampleTile['desc'] 

1612 self.arr_shape = 'cube' if len(self.arr.shape) == 3 else 'band' if len(self.arr.shape) == 2 else 'unknown' 

1613 

1614 if not CFG.inmem_serialization: 

1615 path_radref_file = os.path.join(self.ExtractedFolder, self.baseN + '__' + self.arr_desc) 

1616 # path_radref_file = os.path.abspath('./testing/out/%s_TOA_Ref' % self.baseN) 

1617 while not os.path.isdir(os.path.dirname(path_radref_file)): 

1618 try: 

1619 os.makedirs(os.path.dirname(path_radref_file)) 

1620 except OSError as e: # maybe not necessary anymore in Python 3 

1621 if not e.errno == 17: 

1622 raise 

1623 GEOP.ndarray2gdal(self.arr, path_radref_file, importFile=self.MetaObj.Dataname, direction=3) 

1624 self.MetaObj.Dataname = path_radref_file 

1625 

1626 def write_tiles_to_ENVIfile(self, tiles, overwrite=True): 

1627 # type: (list,bool) -> None 

1628 """Writes tiles, e.g. produced by L1A_P.L1A_object.DN2TOARadRefTemp() to a single output ENVI file. 

1629 

1630 :param tiles: <list> a list of dictionaries with the keys 'desc', 'data', 'row_start','row_end', 

1631 'col_start' and 'col_end' 

1632 :param overwrite: whether to overwrite files that have been produced earlier 

1633 """ 

1634 

1635 self.logger.info("Writing tiles '%s' temporarily to disk..." % tiles[0]['desc']) 

1636 outpath = os.path.join(self.ExtractedFolder, '%s__%s.%s' % (self.baseN, tiles[0]['desc'], self.outInterleave)) 

1637 if CFG.target_radunit_optical in tiles[0]['desc'] or \ 

1638 CFG.target_radunit_thermal in tiles[0]['desc']: 

1639 self.arr_desc = tiles[0]['desc'] 

1640 self.arr = outpath 

1641 # self.arr = os.path.abspath('./testing/out/%s_TOA_Ref.bsq' % self.baseN) 

1642 self.MetaObj.Dataname = self.arr 

1643 self.arr_shape = \ 

1644 'cube' if len(tiles[0]['data'].shape) == 3 else 'band' if len( 

1645 tiles[0]['data'].shape) == 2 else 'unknown' 

1646 elif tiles[0]['desc'] == 'masks': 

1647 self.masks = outpath 

1648 elif tiles[0]['desc'] == 'lonlat_arr': 

1649 # outpath = os.path.join(os.path.abspath('./testing/out/'),'%s__%s.%s' 

1650 # %(self.baseN, tiles[0]['desc'], self.outInterleave)) 

1651 self.lonlat_arr = outpath 

1652 outpath = os.path.splitext(outpath)[0] + '.hdr' if not outpath.endswith('.hdr') else outpath 

1653 out_shape = self.shape_fullArr[:2] + ([tiles[0]['data'].shape[2]] if len(tiles[0]['data'].shape) == 3 else [1]) 

1654 OUT_W.Tiles_Writer(tiles, outpath, out_shape, tiles[0]['data'].dtype, self.outInterleave, 

1655 self.MetaObj.to_odict(), overwrite=overwrite) 

1656 

1657 def to_tiles(self, blocksize=(2048, 2048)): 

1658 # type: (tuple) -> Generator[GMS_object] 

1659 """Returns a generator object where items represent tiles of the given block size for the GMS object. 

1660 

1661 # NOTE: it's better to call get_subset_obj (also takes care of tile map infos) 

1662 

1663 :param blocksize: target dimensions of the generated block tile (rows, columns) 

1664 :return: <generator> of GMS_object tiles 

1665 """ 

1666 

1667 assert type(blocksize) in [list, tuple] and len(blocksize) == 2, \ 

1668 "The argument 'blocksize_RowsCols' must represent a tuple of size 2." 

1669 

1670 tilepos = get_array_tilebounds(array_shape=self.shape_fullArr, tile_shape=blocksize) 

1671 

1672 for tp in tilepos: 

1673 (xmin, xmax), (ymin, ymax) = tp # e.g. [(0, 1999), (0, 999)] at a blocksize of 2000*1000 (rowsxcols) 

1674 tileObj = self.get_subset_obj(imBounds=(xmin, xmax, ymin, ymax)) # xmax+1/ymax+1? 

1675 yield tileObj 

1676 
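# Usage sketch (hedged): iterating over block tiles of a full cube; the block size is
# hypothetical. Tiles containing only nodata are yielded as None by get_subset_obj.
#
#     for tile in gms_obj.to_tiles(blocksize=(1024, 1024)):
#         if tile is not None:
#             tile.to_ENVI(is_tempfile=True)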

1677 def to_MGRS_tiles(self, pixbuffer=10, v=False): 

1678 # type: (int, bool) -> Generator[GMS_object] 

1679 """Returns a generator object where items represent the MGRS tiles for the GMS object. 

1680 

1681 :param pixbuffer: <int> a buffer in pixel values used to generate an overlap between the returned MGRS tiles 

1682 :param v: <bool> verbose mode 

1683 :return: <generator> of GMS_object instances representing MGRS tiles 

1684 """ 

1685 

1686 assert self.arr_shape == 'cube', "Only 'cube' objects can be cut into MGRS tiles. Got %s." % self.arr_shape 

1687 self.logger.info('Cutting scene %s (entity ID %s) into MGRS tiles...' % (self.scene_ID, self.entity_ID)) 

1688 

1689 # get GeoDataFrame containing all overlapping MGRS tiles 

1690 # (MGRS geometries completely within nodata area are excluded) 

1691 GDF_MGRS_tiles = DB_T.get_overlapping_MGRS_tiles(CFG.conn_database, 

1692 tgt_corners_lonlat=self.trueDataCornerLonLat) 

1693 

1694 if GDF_MGRS_tiles.empty: 

1695 raise RuntimeError('Could not find an overlapping MGRS tile in the database for the current dataset.') 

1696 

1697 # calculate image coordinate bounds of the full GMS object for each MGRS tile within the GeoDataFrame 

1698 gt = mapinfo2geotransform(self.MetaObj.map_info) 

1699 

1700 def get_arrBounds(MGRStileObj): 

1701 return list(np.array(MGRStileObj.poly_utm.buffer(pixbuffer * gt[1]).bounds)[[0, 2, 1, 3]]) 

1702 

1703 GDF_MGRS_tiles['MGRStileObj'] = list(GDF_MGRS_tiles['granuleid'].map(lambda mgrsTileID: MGRS_tile(mgrsTileID))) 

1704 GDF_MGRS_tiles['map_bounds_MGRS'] = list(GDF_MGRS_tiles['MGRStileObj'].map(get_arrBounds)) # xmin,xmax,ymin,ymax 

1705 

1706 # find first tile to log and assign 'logAtThisTile' later 

1707 dictIDxminymin = {(b[0] + b[2]): ID for ID, b in 

1708 zip(GDF_MGRS_tiles['granuleid'], GDF_MGRS_tiles['map_bounds_MGRS'])} 

1709 firstTile_ID = dictIDxminymin[min(dictIDxminymin.keys())] 

1710 

1711 # ensure self.masks exists (does not exist in case of inmem_serialization mode 

1712 # because in that case self.fill_from_disk() is skipped) 

1713 if not hasattr(self, 'masks') or self.masks is None: 

1714 self.build_combined_masks_array() # creates self.masks and self.masks_meta 

1715 

1716 # read the whole dataset into RAM in order to speed up subsetting 

1717 self.arr.to_mem() 

1718 self.masks.to_mem() # to_mem ensures that the whole dataset is present in memory 

1719 

1720 # produce data for each MGRS tile in loop 

1721 for GDF_idx, GDF_row in GDF_MGRS_tiles.iterrows(): 

1722 tileObj = self.get_subset_obj(mapBounds=GDF_row.map_bounds_MGRS, 

1723 mapBounds_prj=EPSG2WKT(GDF_row['MGRStileObj'].EPSG), 

1724 out_prj=EPSG2WKT(GDF_row['MGRStileObj'].EPSG), 

1725 logmsg='Producing MGRS tile %s from scene %s (entity ID %s).' 

1726 % (GDF_row.granuleid, self.scene_ID, self.entity_ID), 

1727 progress=CFG.log_level in ['DEBUG', 10], 

1728 v=v) 

1729 

1730 MGRS_tileID = GDF_row['granuleid'] 

1731 

1732 # validate that the MGRS tile truly contains data 

1733 # -> this may not be the case if get_overlapping_MGRS_tiles() yielded invalid tiles due to inaccurate 

1734 # self.trueDataCornerLonLat 

1735 if tileObj is None or \ 

1736 True not in list(np.unique(tileObj.arr.mask_nodata)): 

1737 self.logger.info("MGRS tile '%s' has not been skipped because it contains only no data values." 

1738 % MGRS_tileID) 

1739 continue 

1740 

1741 # set MGRS info 

1742 tileObj.arr_shape = 'MGRS_tile' 

1743 tileObj.MGRS_info = {'tile_ID': MGRS_tileID, 'grid1mil': MGRS_tileID[:3], 'grid100k': MGRS_tileID[3:]} 

1744 

1745 # set logAtThisTile 

1746 tileObj.logAtThisTile = MGRS_tileID == firstTile_ID 

1747 

1748 # close logger of tileObj and of self in order to avoid logging permission errors 

1749 tileObj.close_loggers() 

1750 self.close_loggers() 

1751 

1752 yield tileObj 

1753 

1754 # set array attributes back to file path if they had been a filePath before 

1755 if self.arr.filePath: 

1756 self.arr.to_disk() 

1757 if self.masks.filePath: 

1758 self.masks.to_disk() 

1759 
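# Usage sketch (hedged): cutting a full 'cube' object into MGRS tiles and writing each one
# to disk; `gms_obj` is hypothetical. Tiles containing only nodata are skipped internally.
#
#     for mgrs_tile in gms_obj.to_MGRS_tiles(pixbuffer=10):
#         mgrs_tile.to_ENVI()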

1760 def to_GMS_file(self, path_gms_file=None): 

1761 self.close_loggers() 

1762 

1763 dict2write = self.attributes2dict(remove_privates=True) # includes MetaObj as OrderedDict 

1764 dict2write['arr_shape'], dict2write['arr_pos'] = ['cube', None] 

1765 path_gms_file = path_gms_file if path_gms_file else self.pathGen.get_path_gmsfile() 

1766 

1767 for k, v in list(dict2write.items()): 

1768 # if isinstance(v,np.ndarray) or isinstance(v,dict) or hasattr(v,'__dict__'): 

1769 # like this if the meta dicts should not go into the GMS file; keeping them may not be bad, though 

1770 # -> allows reading the metadata directly from the GMS file 

1771 

1772 if isinstance(v, datetime.datetime): 

1773 dict2write[k] = v.strftime('%Y-%m-%d %H:%M:%S.%f%z') 

1774 

1775 elif isinstance(v, (DatasetLogger, logging.Logger)): 

1776 if hasattr(v, 'handlers') and v.handlers[:]: 

1777 warnings.warn('Not properly closed logger at GMS_obj.logger pointing to %s.' % v.path_logfile) 

1778 

1779 close_logger(dict2write[k]) 

1780 dict2write[k] = 'not set' 

1781 

1782 elif isinstance(v, GMS_identifier): 

1783 dict2write[k] = v.to_odict(include_logger=False) 

1784 

1785 elif isinstance(v, OrderedDict) or isinstance(v, dict): 

1786 dict2write[k] = dict2write[k].copy() 

1787 if 'logger' in v: 

1788 if hasattr(dict2write[k]['logger'], 'handlers') and dict2write[k]['logger'].handlers[:]: 

1789 warnings.warn("Not properly closed logger at %s['logger'] pointing to %s." 

1790 % (k, dict2write[k]['logger'].path_logfile)) 

1791 close_logger(dict2write[k]['logger']) 

1792 dict2write[k]['logger'] = 'not set' 

1793 

1794 elif isinstance(v, np.ndarray): 

1795 # keep only small 2D arrays (dimensions summing to <= 200); delete any other array 

1796 if len(v.shape) == 2 and sum(v.shape) <= 200: 

1797 dict2write[k] = v.tolist() # numpy arrays are not jsonable 

1798 else: 

1799 del dict2write[k] 

1800 

1801 elif hasattr(v, '__dict__'): 

1802 # deletes instances of objects and arrays, but no OrderedDicts 

1803 if hasattr(v, 'logger'): 

1804 if hasattr(dict2write[k].logger, 'handlers') and dict2write[k].logger.handlers[:]: 

1805 warnings.warn("Not properly closed logger at %s.logger pointing to %s." 

1806 % (k, dict2write[k].logger.path_logfile)) 

1807 close_logger(dict2write[k].logger) 

1808 dict2write[k].logger = 'not set' 

1809 del dict2write[k] 

1810 

1811 # class customJSONEncoder(json.JSONEncoder): 

1812 # def default(self, obj): 

1813 # if isinstance(obj, np.ndarray): 

1814 # return '> numpy array <' 

1815 # if isinstance(obj, dict): # funktioniert nicht 

1816 # return '> python dictionary <' 

1817 # if hasattr(obj,'__dict__'): 

1818 # return '> python object <' 

1819 # # Let the base class default method raise the TypeError 

1820 # return json.JSONEncoder.default(self, obj) 

1821 # json.dump(In, open(path_out_baseN,'w'), skipkeys=True, 

1822 # sort_keys=True, cls=customJSONEncoder, separators=(',', ': '), indent =4) 

1823 with open(path_gms_file, 'w') as outF: 

1824 json.dump(dict2write, outF, skipkeys=True, sort_keys=True, separators=(',', ': '), indent=4) 

1825 
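# Usage sketch (hedged): the written GMS file is plain JSON, so the serialized metadata can
# be read back directly; `path_gms` is hypothetical.
#
#     with open(path_gms) as inF:
#         gms_dict = json.load(inF)
#     print(gms_dict['proc_level'], gms_dict['entity_ID'])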

1826 def to_ENVI(self, write_masks_as_ENVI_classification=True, is_tempfile=False, compression=False): 

1827 # type: (bool, bool, bool) -> None 

1828 """Write GMS object to disk. Supports full cubes AND 'block' tiles. 

1829 

1830 :param self: <object> GMS object, e.g. L1A_P.L1A_object 

1831 :param write_masks_as_ENVI_classification: <bool> whether to write masks as ENVI classification file 

1832 :param is_tempfile: <bool> whether output represents a temporary file 

1833 -> suppresses logging and database updating 

1834 - ATTENTION! This keyword asserts that the actual output file that 

1835 is written later contains the final version of the array. The array 

1836 is not overwritten or written once more later, but only renamed. 

1837 :param compression: <bool> enable or disable compression 

1838 """ 

1839 # monkey patch writer function in order to silence output stream 

1840 envi._write_image = OUT_W.silent_envi_write_image 

1841 

1842 assert self.arr_shape in ['cube', 'MGRS_tile', 'block'], \ 

1843 "GMS_object.to_ENVI supports only array shapes 'cube', 'MGRS_tile' and 'block'. Got %s." % self.arr_shape 

1844 

1845 # set MGRS info in case of MGRS tile 

1846 MGRS_info = None 

1847 if self.arr_shape == 'MGRS_tile': 

1848 assert hasattr(self, 'MGRS_info'), \ 

1849 "Tried to write an GMS object in the shape 'MGRS_tile' without without the attribute 'MGRS_info'." 

1850 MGRS_info = self.MGRS_info 

1851 

1852 # set self.arr from L1B path to L1A path in order to make to_ENVI copy L1A array (if .arr is not an array) 

1853 if self.proc_level == 'L1B' and not self.arr.is_inmem and os.path.isfile(self.arr.filePath): 

1854 # FIXME this could lead to checking the wrong folder if CFG.inmem_serialization is False: 

1855 self.arr = PG.path_generator('L1A', self.image_type, self.satellite, self.sensor, self.subsystem, 

1856 self.acq_datetime, self.entity_ID, self.logger, 

1857 MGRS_info=MGRS_info).get_path_imagedata() 

1858 

1859 # set dicts 

1860 image_type_dict = dict(arr=self.image_type, masks='MAS', mask_clouds='MAC', accuracy_layers='AL') 

1861 metaAttr_dict = dict(arr='arr_meta', masks='masks_meta', mask_clouds='masks_meta', 

1862 accuracy_layers='accuracy_layers_meta') 

1863 out_dtype_dict = dict(arr='int16', masks='uint8', mask_clouds='uint8', accuracy_layers='int16') 

1864 print_dict = dict( 

1865 RSD_L1A='satellite data', MAS_L1A='masks', MAC_L1A='cloud mask', 

1866 RSD_L1B='satellite data', MAS_L1B='masks', MAC_L1B='cloud mask', 

1867 RSD_L1C='atm. corrected reflectance data', MAS_L1C='masks', MAC_L1C='cloud mask', 

1868 RSD_L2A='geometrically homogenized data', MAS_L2A='masks', MAC_L2A='cloud mask', 

1869 RSD_L2B='spectrally homogenized data', MAS_L2B='masks', MAC_L2B='cloud mask', 

1870 RSD_L2C='MGRS tiled data', MAS_L2C='masks', MAC_L2C='cloud mask', AL_L2C='accuracy layers',) 

1871 

1872 # ensure self.masks exists 

1873 if not hasattr(self, 'masks') or self.masks is None: 

1874 self.build_combined_masks_array() # creates InObj.masks and InObj.masks_meta 

1875 

1876 # generate list of attributes to write 

1877 attributes2write = ['arr', 'masks'] 

1878 

1879 if self.proc_level not in ['L1A', 'L1B'] and write_masks_as_ENVI_classification: 

1880 attributes2write.append('mask_clouds') 

1881 

1882 if self.proc_level == 'L2C': 

1883 if CFG.ac_estimate_accuracy or CFG.spathomo_estimate_accuracy or CFG.spechomo_estimate_accuracy: 

1884 attributes2write.append('accuracy_layers') 

1885 

1886 # generate accuracy layers to be written 

1887 self.generate_accuracy_layers() 

1888 else: 

1889 self.logger.info('Accuracy layers are not written because their generation has been disabled in the ' 

1890 'job configuration.') 

1891 

1892 ########################################################### 

1893 # loop through all attributes to write and execute writer # 

1894 ########################################################### 

1895 

1896 with IOLock(allowed_slots=CFG.max_parallel_reads_writes, logger=self.logger): 

1897 for arrayname in attributes2write: 

1898 descriptor = '%s_%s' % (image_type_dict[arrayname], self.proc_level) 

1899 

1900 if hasattr(self, arrayname) and getattr(self, arrayname) is not None: 

1901 arrayval = getattr(self, arrayname) # can be a GeoArray (in mem / not in mem) or a numpy.ndarray 

1902 

1903 # initial assertions 

1904 assert arrayname in metaAttr_dict, "GMS_object.to_ENVI cannot yet write %s attribute." % arrayname 

1905 assert isinstance(arrayval, (GeoArray, np.ndarray)), "Expected a GeoArray instance or a numpy " \ 

1906 "array for object attribute %s. Got %s." \ 

1907 % (arrayname, type(arrayval)) 

1908 

1909 outpath_hdr = self.pathGen.get_outPath_hdr(arrayname) 

1910 outpath_hdr = os.path.splitext(outpath_hdr)[0] + '__TEMPFILE.hdr' if is_tempfile else outpath_hdr 

1911 if not os.path.exists(os.path.dirname(outpath_hdr)): 

1912 os.makedirs(os.path.dirname(outpath_hdr)) 

1913 out_dtype = out_dtype_dict[arrayname] 

1914 meta_attr = metaAttr_dict[arrayname] 

1915 

1916 if not is_tempfile: 

1917 self.log_for_fullArr_or_firstTile('Writing %s %s.' % (self.proc_level, print_dict[descriptor])) 

1918 

1919 ######################### 

1920 # GeoArray in disk mode # 

1921 ######################### 

1922 

1923 if isinstance(arrayval, GeoArray) and not arrayval.is_inmem: 

1924 # object attribute contains GeoArray in disk mode. This is usually the case if the attribute has 

1925 # been read in Python exec mode from previous processing level and has NOT been modified during 

1926 # processing. 

1927 

1928 assert os.path.isfile(arrayval.filePath), "The object attribute '%s' contains a non-existent " \ 

1929 "file path: %s" % (arrayname, arrayval.filePath) 

1930 path_to_array = arrayval.filePath 

1931 

1932 ############# 

1933 # full cube # 

1934 ############# 

1935 

1936 if self.arr_shape == 'cube': 

1937 # image data can just be copied 

1938 outpath_arr = os.path.splitext(outpath_hdr)[0] + (os.path.splitext(path_to_array)[1] 

1939 if os.path.splitext(path_to_array)[ 

1940 1] else '.%s' % self.outInterleave) 

1941 hdr2readMeta = os.path.splitext(path_to_array)[0] + '.hdr' 

1942 meta2write = INP_R.read_ENVIhdr_to_dict(hdr2readMeta, self.logger) \ 

1943 if arrayname in ['mask_clouds', ] else getattr(self, meta_attr) 

1944 meta2write.update({'interleave': self.outInterleave, 

1945 'byte order': 0, 

1946 'header offset': 0, 

1947 'data type': DEF_D.dtype_lib_Python_IDL[out_dtype], 

1948 'lines': self.shape_fullArr[0], 

1949 'samples': self.shape_fullArr[1]}) 

1950 meta2write = metaDict_to_metaODict(meta2write, self.logger) 

1951 

1952 if '__TEMPFILE' in path_to_array: 

1953 os.rename(path_to_array, outpath_arr) 

1954 envi.write_envi_header(outpath_hdr, meta2write) 

1955 HLP_F.silentremove(path_to_array) 

1956 HLP_F.silentremove(os.path.splitext(path_to_array)[0] + '.hdr') 

1957 

1958 else: 

1959 try: 

1960 shutil.copy(path_to_array, outpath_arr) # copies file + permissions 

1961 except PermissionError: 

1962 # prevents permission error if outfile already exists and is owned by another user 

1963 HLP_F.silentremove(outpath_arr) 

1964 shutil.copy(path_to_array, outpath_arr) 

1965 

1966 envi.write_envi_header(outpath_hdr, meta2write) 

1967 

1968 assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr 

1969 setattr(self, arrayname, outpath_arr) # refresh arr/masks/mask_clouds attributes 

1970 if arrayname == 'masks': 

1971 setattr(self, 'mask_nodata', outpath_arr) 

1972 

1973 ######################### 

1974 # 'block' or 'MGRS_tile' # 

1975 ######################### 

1976 

1977 else: 

1978 # data have to be read in subset and then be written 

1979 if self.arr_pos: 

1980 (rS, rE), (cS, cE) = self.arr_pos 

1981 cols, rows = cE - cS + 1, rE - rS + 1 

1982 else: 

1983 cS, rS, cols, rows = [None] * 4 

1984 

1985 if '__TEMPFILE' in path_to_array: 

1986 raise NotImplementedError 

1987 

1988 if arrayname not in ['mask_clouds', 'mask_nodata']: 

1989 # read image data in subset 

1990 tempArr = gdalnumeric.LoadFile(path_to_array, cS, rS, cols, 

1991 rows) # bands, rows, columns OR rows, columns 

1992 arr2write = tempArr if len(tempArr.shape) == 2 else \ 

1993 np.swapaxes(np.swapaxes(tempArr, 0, 2), 0, 1) # rows, columns, (bands) 

1994 

1995 else: 

1996 # read mask data in subset 

1997 previous_procL = DEF_D.proc_chain[DEF_D.proc_chain.index(self.proc_level) - 1] 

1998 PG_obj = PG.path_generator(self.__dict__, proc_level=previous_procL) 

1999 path_masks = PG_obj.get_path_maskdata() 

2000 arr2write = INP_R.read_mask_subset(path_masks, arrayname, self.logger, 

2001 (self.arr_shape, self.arr_pos)) 

2002 

2003 setattr(self, arrayname, arr2write) 

2004 

2005 arrayval = getattr(self, arrayname) # can be a GeoArray (in mem / not in mem) or a numpy.ndarray 

2006 

2007 #################################### 

2008 # np.ndarray or GeoArray in memory # 

2009 #################################### 

2010 

2011 if isinstance(arrayval, np.ndarray) or isinstance(arrayval, GeoArray) and arrayval.is_inmem: 

2012 # must be an if-condition because arrayval can change attribute type from not-inmem-GeoArray 

2013 # to np.ndarray 

2014 '''object attribute contains array''' 

2015 

2016 # convert array and metadata of mask clouds to envi classification file ready data 

2017 arr2write, meta2write = OUT_W.mask_to_ENVI_Classification(self, arrayname) \ 

2018 if arrayname in ['mask_clouds', ] else (arrayval, getattr(self, meta_attr)) 

2019 arr2write = arr2write.arr if isinstance(arr2write, GeoArray) else arr2write 

2020 assert isinstance(arr2write, np.ndarray), 'Expected a numpy ndarray. Got %s.' % type(arr2write) 

2021 

2022 ########################## 

2023 # full cube or MGRS_tile # 

2024 ########################## 

2025 

2026 if self.arr_shape in ['cube', 'MGRS_tile']: 

2027 # TODO write a function that implements the algorithm from Tiles_Writer for writing cubes 

2028 # TODO -> no need for Spectral Python 

2029 

2030 # write cube-like attributes 

2031 meta2write = metaDict_to_metaODict(meta2write, self.logger) 

2032 success = 1 

2033 

2034 if arrayname not in ['mask_clouds', ]: 

2035 if compression: 

2036 success = OUT_W.write_ENVI_compressed(outpath_hdr, arr2write, meta2write) 

2037 if not success: 

2038 warnings.warn('Written compressed ENVI file is not GDAL readable! ' 

2039 'Writing uncompressed file.') 

2040 

2041 if not compression or not success: 

2042 envi.save_image(outpath_hdr, arr2write, metadata=meta2write, dtype=out_dtype, 

2043 interleave=self.outInterleave, ext=self.outInterleave, force=True) 

2044 

2045 else: 

2046 if compression: 

2047 success = OUT_W.write_ENVI_compressed(outpath_hdr, arr2write, meta2write) 

2048 if not success: 

2049 self.logger.warning('Written compressed ENVI file is not GDAL readable! ' 

2050 'Writing uncompressed file.') 

2051 

2052 if not compression or not success: 

2053 class_names = meta2write['class names'] 

2054 class_colors = meta2write['class lookup'] 

2055 envi.save_classification(outpath_hdr, arr2write, metadata=meta2write, 

2056 dtype=out_dtype, interleave=self.outInterleave, 

2057 ext=self.outInterleave, force=True, 

2058 class_names=class_names, class_colors=class_colors) 

2059 if os.path.exists(outpath_hdr): 

2060 OUT_W.reorder_ENVI_header(outpath_hdr, OUT_W.enviHdr_keyOrder) 

2061 

2062 ######################### 

2063 # block-like attributes # 

2064 ######################### 

2065 

2066 else: 

2067 if compression: # FIXME 

2068 warnings.warn( 

2069 'Compression is not yet supported for GMS object tiles. Writing uncompressed data.') 

2070 # write block-like attributes 

2071 bands = arr2write.shape[2] if len(arr2write.shape) == 3 else 1 

2072 out_shape = tuple(self.shape_fullArr[:2]) + (bands,) 

2073 

2074 OUT_W.Tiles_Writer(arr2write, outpath_hdr, out_shape, out_dtype, self.outInterleave, 

2075 out_meta=meta2write, arr_pos=self.arr_pos, overwrite=False) 

2076 assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr 

2077 

2078 outpath_arr = os.path.splitext(outpath_hdr)[0] + '.%s' % self.outInterleave 

2079 if not CFG.inmem_serialization: 

2080 setattr(self, arrayname, outpath_arr) # replace array by output path 

2081 

2082 if arrayname == 'masks': 

2083 setattr(self, 'mask_nodata', outpath_arr) 

2084 

2085 # if compression: 

2086 # raise NotImplementedError # FIXME implement working compression 

2087 # HLP_F.ENVIfile_to_ENVIcompressed(outpath_hdr) 

2088 

2089 else: 

2090 if not is_tempfile: 

2091 self.logger.warning("The %s can not be written, because there is no corresponding attribute." 

2092 % print_dict[descriptor]) 

2093 

2094 ###################################### 

2095 # write GMS-file and update database # 

2096 ###################################### 

2097 

2098 # IMPORTANT: DO NOT pass the complete object but only a copy of the dictionary in order to prevent ASCII_writer 

2099 # and data_DB_updater from modifying the attributes of the object!! 

2100 if self.arr_shape in ['cube', 'MGRS_tile'] or [self.arr_pos[0][0], self.arr_pos[1][0]] == [0, 0]: 

2101 # cube or 1st tile 

2102 

2103 # WRITE GMS FILE 

2104 self.to_GMS_file() 

2105 

2106 # CREATE/UPDATE DATABASE ENTRY 

2107 if not is_tempfile: 

2108 DB_T.data_DB_updater(self.attributes2dict()) 

2109 

2110 # get the ID of the updated record (needed for cross-references in later DB entries) 

2111 if hasattr(self, 'MGRS_info') and self.MGRS_info: 

2112 res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'mgrs_tiles_proc', ['id'], 

2113 {'sceneid': self.scene_ID, 

2114 'virtual_sensor_id': CFG.virtual_sensor_id, 

2115 'mgrs_code': self.MGRS_info['tile_ID']}) 

2116 assert len(res) == 1, 'Found multiple database records for the last updated record. sceneid: %s; ' \ 

2117 'mgrs_code: %s; virtual_sensor_id: %s' \ 

2118 % (self.scene_ID, self.MGRS_info['tile_ID'], CFG.virtual_sensor_id) 

2119 self.mgrs_tiles_proc_ID = res[0][0] 

2120 else: 

2121 res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes_proc', ['id'], 

2122 {'sceneid': self.scene_ID}) 

2123 assert len(res) == 1, \ 

2124 'Found multiple database records for the last updated record. sceneid: %s' % self.scene_ID 

2125 self.scenes_proc_ID = res[0][0] 

2126 

2127 # COPY LOGFILE TO MGRS OUTPUT DIRECTORY 

2128 if self.arr_shape == 'MGRS_tile': 

2129 shutil.copy(self.path_logfile, os.path.join(self.pathGen.get_path_procdata(), 

2130 os.path.basename(self.path_logfile))) 

2131 

2132 if not is_tempfile: 

2133 self.log_for_fullArr_or_firstTile('%s data successfully saved.' % self.proc_level) 

2134 

2135 # close logger 

2136 self.close_loggers() 

2137 

2138 def record_mem_usage(self): 

2139 """Record memory usage of current process ID for the current processing level (megabytes).""" 

2140 # get memory usage of current process ID 

2141 mem_usage = int(psutil.Process(os.getpid()).memory_info().rss / 1024**2) 

2142 

2143 # update the value (only if there is not a larger value already because we want to record maximum memory needed) 

2144 if self.proc_level not in self.mem_usage or self.mem_usage[self.proc_level] < mem_usage: 

2145 self.mem_usage[self.proc_level] = mem_usage 

2146 

2147 # push recorded memory stats to database 

2148 if self.proc_level == 'L2C': 

2149 stats_pushed = DB_T.record_stats_memusage(CFG.conn_database, self) 

2150 

2151 if stats_pushed: 

2152 self.logger.info('Recorded memory usage statistics.') 

2153 

2154 def block_at_system_overload(self, max_usage=95, timeout=15*60): 

2155 logged = False 

2156 t_start = time.time() 

2157 

2158 while psutil.virtual_memory().percent >= max_usage: 

2159 if not logged: 

2160 self.logger.warning('Memory usage is %.1f percent. Waiting until memory usage is below %s percent. ' 

2161 'Timeout: %s minutes.' % (psutil.virtual_memory().percent, max_usage, timeout / 60)) 

2162 logged = True 

2163 

2164 if time.time() - t_start > timeout: 

2165 self.logger.exception('Processing could not be continued due to memory overload after waiting ' 

2166 '%s minutes.' % (timeout / 60)) 

2167 

2168 time.sleep(5) 

2169 

2170 def close_loggers(self): 

2171 # self.GMS_identifier and self.logger are getters - since self.GMS_identifier gets its logger from self.logger, 

2172 # self.logger has to be closed AFTER closing self.GMS_identifier.logger 

2173 if self.GMS_identifier and self.GMS_identifier.logger not in [None, 'not set']: 

2174 self.GMS_identifier.logger.close() 

2175 self.GMS_identifier.logger = None 

2176 

2177 if self.MetaObj and self.MetaObj.logger not in [None, 'not set']: 

2178 self.MetaObj.logger.close() 

2179 self.MetaObj.logger = None 

2180 

2181 if self._logger not in [None, 'not set']: 

2182 self._logger.close() # this runs misc.logging.GMS_logger.close() 

2183 self._logger = None # also adds current captured stream to self.log 

2184 

2185 def delete_previous_proc_level_results(self): 

2186 """Deletes results of the previous processing level if the respective flag CFG.exec_L**P[2]) is set to True. 

2187 The function is skipped if the results of the current processing level have not yet been written. 

2188 """ 

2189 

2190 tgt_proc_level = DEF_D.proc_chain[DEF_D.proc_chain.index(self.proc_level) - 1] 

2191 

2192 if getattr(CFG, 'exec_%sP' % tgt_proc_level)[2]: 

2193 

2194 pathGenPrev = PG.path_generator(self.__dict__.copy(), proc_level=tgt_proc_level) 

2195 

2196 files2delete = [pathGenPrev.get_path_imagedata(), 

2197 pathGenPrev.get_path_gmsfile(), 

2198 pathGenPrev.get_path_maskdata(), 

2199 pathGenPrev.get_path_cloudmaskdata()] 

2200 

2201 # ensure that the results of the current processing level have been written 

2202 if self.proc_level != 'L2A': 

2203 pathGenCurr = self.pathGen 

2204 else: 

2205 # after geometric homogenization and subsystem merging (L2A) a path generator without subsystem 

2206 # is needed 

2207 dict4pathGen = self.__dict__.copy() 

2208 dict4pathGen['subsystem'] = '' 

2209 pathGenCurr = PG.path_generator(dict4pathGen) 

2210 

2211 filesCurrProcL = [pathGenCurr.get_path_imagedata(), 

2212 pathGenCurr.get_path_gmsfile(), 

2213 pathGenCurr.get_path_maskdata(), 

2214 pathGenCurr.get_path_cloudmaskdata()] 

2215 

2216 if self.proc_level == 'L2A' and self.subsystem: 

2217 # in case subsystems have been merged in L2A -> also delete logfiles of L1C 

2218 files2delete.append(pathGenPrev.get_path_logfile()) 

2219 filesCurrProcL.append(pathGenCurr.get_path_logfile()) 

2220 

2221 for fPath, fPathCP in zip(files2delete, filesCurrProcL): 

2222 hdr = '%s.hdr' % os.path.splitext(fPath)[0] 

2223 if os.path.exists(fPath) and os.path.exists(fPathCP): 

2224 HLP_F.silentremove(fPath) 

2225 if os.path.exists(hdr): 

2226 HLP_F.silentremove(hdr) 

2227 

2228 def delete_tempFiles(self): 

2229 """Delete all temporary files that have been written during GMS object processing. 

2230 """ 

2231 

2232 # TODO ensure that all GDAL datasets are closed beforehand. Otherwise the system creates a .fuse_hiddenXXX... 

2233 # TODO file at the very moment we delete data that are still opened in GDAL 

2234 

2235 self.logger.info('Deleting temporary data...') 

2236 

2237 if sys.platform.startswith('linux'): 

2238 # delete temporary extraction folder 

2239 if os.path.isdir(self.ExtractedFolder): 

2240 shutil.rmtree(self.ExtractedFolder) 

2241 

2242 if os.path.isdir(self.ExtractedFolder): 

2243 self.logger.warning('Could not delete temporary extraction folder: %s' % self.ExtractedFolder) 

2244 

2245 # delete empty folders: subsystem > sensor > Rootname 

2246 pardir = None 

2247 for i in range(2): # FIXME range(3)? 

2248 deldir = self.ExtractedFolder if i == 0 else pardir 

2249 pardir = os.path.abspath(os.path.join(deldir, os.path.pardir)) 

2250 if not glob.glob('%s/*' % pardir) and os.path.isdir(pardir): 

2251 os.rmdir(pardir) 

2252 else: 

2253 break 

2254 

2255 # delete all files containing __TEMPFILE 

2256 path_procdata = self.pathGen.get_path_procdata() 

2257 tempfiles = glob.glob(os.path.join(path_procdata, '*__TEMPFILE*')) 

2258 [HLP_F.silentremove(tf) for tf in tempfiles] 

2259 else: 

2260 raise NotImplementedError 

2261 

2262 # delete tempfiles created by HLP_F.get_tempfile() 

2263 files2delete = glob.glob(os.path.join(self.ExtractedFolder, '*')) # all files within ExtractedFolder 

2264 files2delete += glob.glob(os.path.join(CFG.path_tempdir, 'GeoMultiSens_*')) 

2265 for f in files2delete: 

2266 HLP_F.silentremove(f) 

2267 

2268 # delete previous proc_level results on demand (according to CFG.exec_L**P[2]) 

2269 self.delete_previous_proc_level_results() 

2270 

2271 self.logger.close() 

2272 self.logger = None 

2273 

2274 def flush_array_data(self): 

2275 del self.arr 

2276 del self.mask_nodata 

2277 del self.mask_clouds 

2278 del self.masks 

2279 del self.dem 

2280 del self.mask_clouds_confidence 

2281 del self.ac_errors 

2282 del self.spat_homo_errors 

2283 del self.spec_homo_errors 

2284 del self.accuracy_layers 

2285 

2286 if self.MetaObj: 

2287 self.MetaObj.ViewingAngle_arrProv = {} 

2288 self.MetaObj.IncidenceAngle_arrProv = {} 

2289 

2290 

2291class GMS_identifier(object): 

2292 def __init__(self, image_type, satellite, sensor, subsystem, proc_level, dataset_ID, logger=None): 

2293 # type: (str, str, str, Union[str, None], str, int, Union[DatasetLogger, None]) -> None 

2294 self.image_type = image_type 

2295 self.satellite = satellite 

2296 self.sensor = sensor 

2297 self.subsystem = subsystem 

2298 self.proc_level = proc_level 

2299 self.dataset_ID = dataset_ID 

2300 self.logger = logger 

2301 

2302 def to_odict(self, include_logger=True): 

2303 odict = OrderedDict(zip( 

2304 ['image_type', 'Satellite', 'Sensor', 'Subsystem', 'proc_level', 'dataset_ID'], 

2305 [self.image_type, self.satellite, self.sensor, self.subsystem, self.proc_level, self.dataset_ID])) 

2306 

2307 if include_logger: 

2308 odict['logger'] = self.logger 

2309 

2310 return odict 

2311 

2312 def __repr__(self): 

2313 return repr(self.to_odict()) 

2314 

2315 def __getstate__(self): 

2316 """Defines how the attributes of MetaObj instances are pickled.""" 

2317 close_logger(self.logger) 

2318 self.logger = None 

2319 

2320 return self.__dict__ 

2321 

2322 

2323class failed_GMS_object(GMS_object): 

2324 def delete_tempFiles(self): 

2325 pass 

2326 

2327 def __init__(self, GMS_object_or_OrdDict, failedMapper, exc_type, exc_val, exc_tb): 

2328 # type: (Union[GMS_object, OrderedDict], str, type(Exception), any, str) -> None 

2329 super(failed_GMS_object, self).__init__() 

2330 needed_attr = ['proc_level', 'image_type', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 

2331 'arr_shape', 'arr_pos'] 

2332 

2333 if isinstance(GMS_object_or_OrdDict, OrderedDict): # in case of unhandled exception within L1A_map 

2334 OrdDic = GMS_object_or_OrdDict 

2335 [setattr(self, k, OrdDic[k]) for k in needed_attr[:-2]] 

2336 self.arr_shape = 'cube' if 'arr_shape' not in OrdDic else OrdDic['arr_shape'] 

2337 self.arr_pos = None if 'arr_pos' not in OrdDic else OrdDic['arr_pos'] 

2338 

2339 else: # in case of any other GMS mapper 

2340 [setattr(self, k, getattr(GMS_object_or_OrdDict, k)) for k in needed_attr] 

2341 

2342 self.failedMapper = failedMapper 

2343 self.ExceptionType = exc_type.__name__ 

2344 self.ExceptionValue = repr(exc_val) 

2345 self.ExceptionTraceback = exc_tb 

2346 self.proc_status = 'failed' 

2347 

2348 @property 

2349 def pandasRecord(self): 

2350 columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level', 

2351 'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback'] 

2352 return DataFrame([[getattr(self, k) for k in columns]], columns=columns) 

2353 

2354 

2355class finished_GMS_object(GMS_object): 

2356 def __init__(self, GMS_obj): 

2357 # type: (GMS_object) -> None 

2358 super(finished_GMS_object, self).__init__() 

2359 self.proc_level = GMS_obj.proc_level 

2360 self.image_type = GMS_obj.image_type 

2361 self.scene_ID = GMS_obj.scene_ID 

2362 self.entity_ID = GMS_obj.entity_ID 

2363 self.satellite = GMS_obj.satellite 

2364 self.sensor = GMS_obj.sensor 

2365 self.subsystem = GMS_obj.subsystem 

2366 self.arr_shape = GMS_obj.arr_shape 

2367 self.proc_status = GMS_obj.proc_status 

2368 

2369 @property 

2370 def pandasRecord(self): 

2371 columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level', 

2372 'arr_shape', 'arr_pos'] 

2373 return DataFrame([[getattr(self, k) for k in columns]], columns=columns) 

2374 

2375 

2376def update_proc_status(GMS_mapper): 

2377 """Decorator function for updating the processing status of each GMS_object (subclass) instance. 

2378 

2379 :param GMS_mapper: A GMS mapper function that takes a GMS object, does some processing and returns it back. 

2380 """ 

2381 

2382 @functools.wraps(GMS_mapper) # needed to avoid pickling errors 

2383 def wrapped_GMS_mapper(GMS_objs, **kwargs): 

2384 # type: (Union[List[GMS_object], GMS_object, OrderedDict, failed_GMS_object], dict) -> Union[GMS_object, List[GMS_object]] # noqa 

2385 

2386 # noinspection PyBroadException 

2387 try: 

2388 # set processing status to 'running' 

2389 if isinstance(GMS_objs, OrderedDict): 

2390 GMS_objs['proc_status'] = 'running' 

2391 elif isinstance(GMS_objs, GMS_object): 

2392 GMS_objs.proc_status = 'running' 

2393 elif isinstance(GMS_objs, Iterable): 

2394 for GMS_obj in GMS_objs: 

2395 GMS_obj.proc_status = 'running' 

2396 elif isinstance(GMS_objs, failed_GMS_object): 

2397 assert GMS_objs.proc_status == 'failed' 

2398 return GMS_objs 

2399 else: 

2400 raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs)) 

2401 

2402 # RUN the GMS_mapper 

2403 GMS_objs = GMS_mapper(GMS_objs, **kwargs) 

2404 

2405 # set processing status to 'finished' 

2406 if isinstance(GMS_objs, GMS_object): 

2407 GMS_objs.proc_status = 'finished' 

2408 elif isinstance(GMS_objs, Iterable): 

2409 for GMS_obj in GMS_objs: 

2410 GMS_obj.proc_status = 'finished' 

2411 else: 

2412 raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs)) 

2413 except Exception: 

2414 # set processing status to 'failed' 

2415 if isinstance(GMS_objs, OrderedDict): 

2416 GMS_objs['proc_status'] = 'failed' 

2417 elif isinstance(GMS_objs, GMS_object): 

2418 GMS_objs.proc_status = 'failed' 

2419 elif isinstance(GMS_objs, Iterable): 

2420 for GMS_obj in GMS_objs: 

2421 GMS_obj.proc_status = 'failed' 

2422 else: 

2423 raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs)) 

2424 

2425 raise 

2426 

2427 return GMS_objs # Union[GMS_object, List[GMS_object]] 

2428 

2429 return wrapped_GMS_mapper 

2430 
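# Usage sketch (hedged): decorating a GMS mapper so that the proc_status of all processed
# objects is tracked automatically; 'my_L1A_map' is a hypothetical mapper.
#
#     @update_proc_status
#     def my_L1A_map(GMS_obj):
#         GMS_obj.calc_mask_nodata()
#         return GMS_obj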

2431 

2432def return_GMS_objs_without_arrays(GMS_pipeline): 

2433 """Decorator function for flushing any array attributes within the return value of a GMS pipeline function. 

2434 

2435 :param GMS_pipeline: A GMS mapper function that takes a GMS object, does some processing and returns it back. 

2436 """ 

2437 

2438 @functools.wraps(GMS_pipeline) # needed to avoid pickling errors 

2439 def wrapped_GMS_pipeline(*args, **kwargs): 

2440 def flush_arrays(GMS_obj): 

2441 # type: (Union[GMS_object, L1C_object]) -> GMS_object 

2442 GMS_obj.flush_array_data() 

2443 try: 

2444 GMS_obj.delete_ac_input_arrays() 

2445 except AttributeError: 

2446 pass 

2447 

2448 return GMS_obj 

2449 

2450 ################################################################### 

2451 # prepare results to be back-serialized to multiprocessing master # 

2452 ################################################################### 

2453 

2454 # NOTE: Exceptions within GMS pipeline will be forwarded to calling function. 

2455 # NOTE: Exceptions within GMS mappers are caught by the exception handler (if enabled) 

2456 returnVal = GMS_pipeline(*args, **kwargs) 

2457 

2458 # flush array data because they are too big to be kept in memory for many GMS_objects 

2459 if isinstance(returnVal, (GMS_object, failed_GMS_object)): 

2460 returnVal = flush_arrays(returnVal) 

2461 elif isinstance(returnVal, Iterable): 

2462 returnVal = [flush_arrays(obj) for obj in returnVal] 

2463 else: # OrderedDict (dataset) 

2464 # the OrderedDict will not contain any arrays 

2465 pass 

2466 

2467 return returnVal 

2468 

2469 return wrapped_GMS_pipeline 

2470 

2471 

2472def return_proc_reports_only(GMS_pipeline): 

2473 """Decorator function for flushing any array attributes within the return value of a GMS pipeline function. 

2474 

2475 :param GMS_pipeline: A GMS mapper function that takes a GMS object, does some processing and returns it back. 

2476 """ 

2477 

2478 @functools.wraps(GMS_pipeline) # needed to avoid pickling errors 

2479 def wrapped_GMS_pipeline(*args, **kwargs): 

2480 ################################################################### 

2481 # prepare results to be back-serialized to multiprocessing master # 

2482 ################################################################### 

2483 

2484 # NOTE: Exceptions within GMS pipeline will be forwarded to calling function. 

2485 # NOTE: Exceptions within GMS mappers are caught by the exception handler (if enabled) 

2486 returnVal = GMS_pipeline(*args, **kwargs) 

2487 

2488 # reduce full GMS objects to lightweight reports because they are too big to be kept in memory for many scenes 

2489 if isinstance(returnVal, failed_GMS_object): 

2490 pass 

2491 elif isinstance(returnVal, GMS_object): 

2492 returnVal = finished_GMS_object(returnVal) 

2493 elif isinstance(returnVal, Iterable): 

2494 returnVal = [obj if isinstance(obj, failed_GMS_object) else finished_GMS_object(obj) for obj in returnVal] 

2495 else: # OrderedDict (dataset) 

2496 # the OrderedDict will not contain any arrays 

2497 pass 

2498 

2499 return returnVal 

2500 

2501 return wrapped_GMS_pipeline 

2502 

2503 

2504def GMS_object_2_dataset_dict(GMS_obj): 

2505 # type: (GMS_object) -> OrderedDict 

2506 return OrderedDict([ 

2507 ('proc_level', GMS_obj.proc_level), 

2508 ('scene_ID', GMS_obj.scene_ID), 

2509 ('dataset_ID', GMS_obj.dataset_ID), 

2510 ('image_type', GMS_obj.image_type), 

2511 ('satellite', GMS_obj.satellite), 

2512 ('sensor', GMS_obj.sensor), 

2513 ('subsystem', GMS_obj.subsystem), 

2514 ('sensormode', GMS_obj.sensormode), 

2515 ('acq_datetime', GMS_obj.acq_datetime), 

2516 ('entity_ID', GMS_obj.entity_ID), 

2517 ('filename', GMS_obj.filename) 

2518 ]) 

2519 

2520 

2521def estimate_mem_usage(dataset_ID, satellite): 

2522 memcols = ['used_mem_l1a', 'used_mem_l1b', 'used_mem_l1c', 

2523 'used_mem_l2a', 'used_mem_l2b', 'used_mem_l2c'] 

2524 

2525 df = DataFrame(DB_T.get_info_from_postgreSQLdb( 

2526 CFG.conn_database, 'stats_mem_usage_homo', 

2527 vals2return=['software_version'] + memcols, 

2528 cond_dict=dict( 

2529 datasetid=dataset_ID, 

2530 virtual_sensor_id=CFG.virtual_sensor_id, 

2531 target_gsd=CFG.target_gsd[0], # respects only xgsd 

2532 target_nbands=len(CFG.target_CWL), 

2533 inmem_serialization=CFG.inmem_serialization, 

2534 target_radunit_optical=CFG.target_radunit_optical, 

2535 # skip_coreg=CFG.skip_coreg, 

2536 ac_estimate_accuracy=CFG.ac_estimate_accuracy, 

2537 ac_bandwise_accuracy=CFG.ac_bandwise_accuracy, 

2538 spathomo_estimate_accuracy=CFG.spathomo_estimate_accuracy, 

2539 spechomo_estimate_accuracy=CFG.spechomo_estimate_accuracy, 

2540 spechomo_bandwise_accuracy=CFG.spechomo_bandwise_accuracy, 

2541 # parallelization_level=CFG.parallelization_level, 

2542 # skip_thermal=CFG.skip_thermal, 

2543 # skip_pan=CFG.skip_pan, 

2544 # mgrs_pixel_buffer=CFG.mgrs_pixel_buffer, 

2545 # cloud_masking_algorithm=CFG.cloud_masking_algorithm[satellite], 

2546 is_test=CFG.is_test 

2547 )), 

2548 columns=['software_version'] + memcols 

2549 ) 

2550 

2551 if not df.empty: 

2552 df['used_mem_max'] = df[memcols].max(axis=1) 

2553 

2554 # get records from gms_preprocessing versions not older than CFG.min_version_mem_usage_stats 

2555 vers = list(df.software_version) 

2556 vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(CFG.min_version_mem_usage_stats)] 

2557 

2558 df_sub = df.loc[df.software_version.isin(vers_usable)] 

2559 

2560 mem_estim_mb = np.mean(list(df_sub.used_mem_max)) # megabytes 

2561 mem_estim_gb = mem_estim_mb / 1024 # gigabytes 

2562 

2563 return int(np.ceil(mem_estim_gb + .1 * mem_estim_gb))
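# Worked example (hedged): if the usable records have a mean maximum memory usage of
# 4096 MB, then mem_estim_gb is 4.0 and the function returns ceil(4.0 + 0.4) = 5 gigabytes,
# i.e., the mean plus a 10 % safety margin, rounded up.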