# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
27import copy
28import datetime
29import functools
30import glob
31import json
32import os
33import re
34import shutil
35import sys
36import warnings
37import logging
38from collections import OrderedDict
39from itertools import chain
40from typing import Iterable, List, Union, Generator, TYPE_CHECKING # noqa F401 # flake8 issue
41import psutil
42import time
43from pkg_resources import parse_version
45import numpy as np
46import spectral
47from spectral.io import envi
48from pandas import DataFrame, read_csv
49from nested_dict import nested_dict
51try:
52 from osgeo import gdalnumeric
53except ImportError:
54 import gdalnumeric
56from geoarray import GeoArray, NoDataMask, CloudMask
57from py_tools_ds.geo.coord_grid import is_coord_grid_equal
58from py_tools_ds.geo.projection import EPSG2WKT, WKT2EPSG, get_UTMzone, isProjectedOrGeographic
59from py_tools_ds.geo.map_info import geotransform2mapinfo, mapinfo2geotransform
60from py_tools_ds.geo.coord_calc import calc_FullDataset_corner_positions
61from py_tools_ds.geo.coord_trafo import pixelToLatLon, pixelToMapYX, imXY2mapXY, imYX2mapYX
62from py_tools_ds.numeric.array import get_array_tilebounds
63from sicor.options import get_options as get_ac_options
65from ..misc.logging import GMS_logger as DatasetLogger
66from ..misc.logging import close_logger
67from ..misc import database_tools as DB_T
68from ..misc import path_generator as PG
69from ..model.mgrs_tile import MGRS_tile
70from ..model.metadata import \
71 METADATA, get_dict_LayerOptTherm, metaDict_to_metaODict, get_LayerBandsAssignment, layerdependent_metadata
72from ..options.config import GMS_config as CFG
73from ..algorithms import geoprocessing as GEOP
74from ..io import input_reader as INP_R
75from ..io import output_writer as OUT_W
76from ..misc import helper_functions as HLP_F
77from ..misc import definition_dicts as DEF_D
78from ..misc.locks import IOLock
80if TYPE_CHECKING:
81 from ..algorithms.L1C_P import L1C_object # noqa F401 # flake8 issue
83__author__ = 'Daniel Scheffler'
86class GMS_object(object):
87 # class attributes
88 # NOTE: these attributes can be modified and seen by ALL GMS_object instances
89 proc_status_all_GMSobjs = nested_dict()
    def __init__(self, pathImage=''):
        # type: (str) -> None
        """Create a GMS_object instance with default attribute values.

        :param pathImage:  if given, the path is passed to the 'arr' property setter
                           which creates a GeoArray instance from it
        """
        # add private attributes
        self._logger = None
        self._loggers_disabled = False
        self._log = ''
        self._MetaObj = None
        self._LayerBandsAssignment = None
        self._coreg_needed = None
        self._resamp_needed = None
        self._arr = None
        self._mask_nodata = None
        self._mask_clouds = None
        self._mask_clouds_confidence = None
        self._masks = None
        self._dem = None
        self._pathGen = None
        self._ac_options = {}
        self._ac_errors = None
        self._spat_homo_errors = None
        self._spec_homo_errors = None
        self._accuracy_layers = None
        self._dict_LayerOptTherm = None
        self._cloud_masking_algorithm = None
        self._coreg_info = None

        # defaults
        self.proc_level = 'init'
        self.image_type = ''
        self.satellite = ''
        self.sensor = ''
        self.subsystem = ''
        self.sensormode = ''
        self.acq_datetime = None  # also includes time, set by from_disk() and within L1A_P
        self.entity_ID = ''
        self.scene_ID = -9999
        self.filename = ''
        self.dataset_ID = -9999
        self.outInterleave = 'bsq'
        self.VAA_mean = None  # set by self.calc_mean_VAA()
        self.corner_lonlat = None
        self.trueDataCornerPos = []  # set by self.calc_corner_positions()
        self.trueDataCornerLonLat = []  # set by self.calc_corner_positions()
        self.fullSceneCornerPos = []  # set by self.calc_corner_positions()
        self.fullSceneCornerLonLat = []  # set by self.calc_corner_positions()
        # rows,cols,bands of the full scene (not of the subset as possibly represented by self.arr.shape)
        self.shape_fullArr = [None, None, None]
        self.arr_shape = 'cube'
        self.arr_desc = ''  # description of data units for self.arr
        self.arr_pos = None  # <tuple> in the form ((row_start,row_end),(col_start,col_end))
        self.tile_pos = None  # <list>, filled by self.get_tilepos()
        self.GeoTransProj_ok = True  # set by self.validate_GeoTransProj_GeoAlign()
        self.GeoAlign_ok = True  # set by self.validate_GeoTransProj_GeoAlign()
        self.masks_meta = {}  # set by self.build_L1A_masks()
        # self.CLD_obj = CLD_P.GmsCloudClassifier(classifier=self.path_cloud_class_obj)

        # set pathes
        self.path_archive = ''
        self.path_procdata = ''
        self.ExtractedFolder = ''
        self.baseN = ''
        self.path_logfile = ''
        self.path_archive_valid = False
        self.path_InFilePreprocessor = None
        self.path_MetaPreprocessor = None

        self.job_ID = CFG.ID
        # NOTE(review): self.scene_ID was set to -9999 a few lines above, so within this
        # initializer the database branch is never taken and dataset_ID stays -9999
        self.dataset_ID = -9999 if self.scene_ID == -9999 else \
            int(DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['datasetid'],
                                                {'id': self.scene_ID})[0][0])
        self.scenes_proc_ID = None  # set by Output writer after creation/update of db record in table scenes_proc
        self.mgrs_tiles_proc_ID = None  # set by Output writer after creation/update of db rec in table mgrs_tiles_proc
        self.MGRS_info = None
        self.lonlat_arr = None  # set by self.write_tiles_to_ENVIfile
        self.trueDataCornerUTM = None  # set by self.from_tiles

        self.mem_usage = {}

        # set pathes
        self.path_cloud_class_obj = ''

        # handle initialization arguments
        if pathImage:
            self.arr = pathImage  # run the setter for 'arr' which creates an Instance of GeoArray
175 def __getstate__(self):
176 """Defines how the attributes of GMS object are pickled."""
178 self.close_loggers()
179 del self.pathGen # path generator can only be used for the current processing level
181 # delete arrays if their in-mem size is to big to be pickled
182 # => (avoids MaybeEncodingError: Error sending result: '[<gms_preprocessing.algorithms.L2C_P.L2C_object
183 # object at 0x7fc44f6399e8>]'. Reason: 'error("'i' format requires -2147483648 <= number <= 2147483647",)')
184 if self.proc_level == 'L2C' and CFG.inmem_serialization:
185 self.flush_array_data()
187 return self.__dict__
189 def __setstate__(self, ObjDict):
190 """Defines how the attributes of GMS object are unpickled."""
192 self.__dict__ = ObjDict
193 # TODO unpickle meta to MetaObj
195 def __deepcopy__(self, memodict={}):
196 """Returns a deepcopy of the object excluding loggers because loggers are not serializable."""
198 cls = self.__class__
199 result = cls.__new__(cls)
200 self.close_loggers()
201 del self.pathGen # has a logger
202 memodict[id(self)] = result
204 for k, v in self.__dict__.items():
205 setattr(result, k, copy.deepcopy(v, memodict))
206 return result
208 def set_pathes(self):
209 self.baseN = self.pathGen.get_baseN()
210 self.path_procdata = self.pathGen.get_path_procdata()
211 self.ExtractedFolder = self.pathGen.get_path_tempdir()
212 self.path_logfile = self.pathGen.get_path_logfile()
213 self.pathGen = PG.path_generator(self.__dict__) # passes a logger in addition to previous attributes
214 self.path_archive = self.pathGen.get_local_archive_path_baseN()
216 if not CFG.inmem_serialization:
217 self.path_InFilePreprocessor = os.path.join(self.ExtractedFolder, '%s%s_DN.bsq'
218 % (self.entity_ID,
219 ('_%s' % self.subsystem if self.subsystem else '')))
220 else: # keep data in memory
221 self.path_InFilePreprocessor = None # None: keeps all produced data in memory (numpy array attributes)
223 self.path_MetaPreprocessor = self.path_archive
225 def validate_pathes(self):
226 if not os.path.isfile(self.path_archive) and not os.path.isdir(self.path_archive):
227 self.logger.info("The %s dataset '%s' has not been processed earlier and no corresponding raw data archive"
228 "has been found at %s." % (self.sensor, self.entity_ID, self.path_archive))
229 self.logger.info('Trying to download the dataset...')
230 self.path_archive_valid = self._data_downloader(self.sensor, self.entity_ID)
231 else:
232 self.path_archive_valid = True
234 if not CFG.inmem_serialization and self.ExtractedFolder and not os.path.isdir(self.ExtractedFolder):
235 os.makedirs(self.ExtractedFolder)
237 assert os.path.exists(self.path_archive), 'Invalid path to RAW data. File %s does not exist at %s.' \
238 % (os.path.basename(self.path_archive),
239 os.path.dirname(self.path_archive))
240 assert isinstance(self.path_archive, str), 'Invalid path to RAW data. Got %s instead of string or unicode.' \
241 % type(self.path_archive)
242 if not CFG.inmem_serialization and self.ExtractedFolder:
243 assert os.path.exists(self.path_archive), \
244 'Invalid path for temporary files. Directory %s does not exist.' % self.ExtractedFolder
246 @property
247 def logger(self):
248 if self._loggers_disabled:
249 return None
250 if self._logger and self._logger.handlers[:]:
251 return self._logger
252 else:
253 self._logger = DatasetLogger('log__' + self.baseN, fmt_suffix=self.scene_ID, path_logfile=self.path_logfile,
254 log_level=CFG.log_level, append=True)
255 return self._logger
257 @logger.setter
258 def logger(self, logger):
259 assert isinstance(logger, logging.Logger) or logger in ['not set', None], \
260 "GMS_obj.logger can not be set to %s." % logger
262 # save prior logs
263 # if logger is None and self._logger is not None:
264 # self.log += self.logger.captured_stream
265 self._logger = logger
267 @property # FIXME does not work yet
268 def log(self):
269 """Returns a string of all logged messages until now."""
271 return self._log
273 @log.setter
274 def log(self, string):
275 assert isinstance(string, str), "'log' can only be set to a string. Got %s." % type(string)
276 self._log = string
278 @property
279 def proc_status(self):
280 # type: () -> str
281 """
282 Get the processing status of the current GMS_object (subclass) instance for the current processing level.
284 Possible values: 'initialized', 'running', 'finished', 'failed'
285 """
286 # NOTE: self.proc_status_all_GMSobjs is a class attribute (visible and modifyable from any other subsystem)
287 return self.proc_status_all_GMSobjs[self.scene_ID][self.subsystem][self.proc_level]
289 @proc_status.setter
290 def proc_status(self, new_status):
291 # type: (str) -> None
292 self.proc_status_all_GMSobjs[self.scene_ID][self.subsystem][self.proc_level] = new_status
294 @property
295 def GMS_identifier(self):
296 return GMS_identifier(self.image_type, self.satellite, self.sensor, self.subsystem, self.proc_level,
297 self.dataset_ID, self.logger)
299 @property
300 def MetaObj(self):
301 # TODO if there is no MetaObj -> create MetaObj by reading metadata from disk
302 # reading from disk should use L1A_P.L1A_object.import_metadata -> so just return None
304 return self._MetaObj
306 @MetaObj.setter
307 def MetaObj(self, MetaObj):
308 assert isinstance(MetaObj, METADATA), "'MetaObj' can only be set to an instance of METADATA class. " \
309 "Got %s." % type(MetaObj)
310 self._MetaObj = MetaObj
312 @MetaObj.deleter
313 def MetaObj(self):
314 if self._MetaObj and self._MetaObj.logger not in [None, 'not set']:
315 self._MetaObj.logger.close()
316 self._MetaObj.logger = None
318 self._MetaObj = None
320 @property
321 def pathGen(self):
322 # type: () -> PG.path_generator
323 """
324 Returns the path generator object for generating file pathes belonging to the GMS object.
325 """
327 if self._pathGen and self._pathGen.proc_level == self.proc_level:
328 return self._pathGen
329 else:
330 self._pathGen = PG.path_generator(self.__dict__.copy(), MGRS_info=self.MGRS_info)
332 return self._pathGen
334 @pathGen.setter
335 def pathGen(self, pathGen):
336 assert isinstance(pathGen, PG.path_generator), 'GMS_object.pathGen can only be set to an instance of ' \
337 'path_generator. Got %s.' % type(pathGen)
338 self._pathGen = pathGen
340 @pathGen.deleter
341 def pathGen(self):
342 self._pathGen = None
344 @property
345 def subset(self):
346 return [self.arr_shape, self.arr_pos]
348 @property
349 def LayerBandsAssignment(self):
350 # FIXME merge that with self.MetaObj.LayerBandsAssignment
351 # FIXME -> otherwise a change of LBA in MetaObj is not recognized here
352 if self._LayerBandsAssignment:
353 return self._LayerBandsAssignment
354 elif self.image_type == 'RSD':
355 self._LayerBandsAssignment = get_LayerBandsAssignment(self.GMS_identifier) \
356 if self.sensormode != 'P' else get_LayerBandsAssignment(self.GMS_identifier, nBands=1)
357 return self._LayerBandsAssignment
358 else:
359 return ''
361 @LayerBandsAssignment.setter
362 def LayerBandsAssignment(self, LBA_list):
363 self._LayerBandsAssignment = LBA_list
364 self.MetaObj.LayerBandsAssignment = LBA_list
366 @property
367 def dict_LayerOptTherm(self):
368 if self._dict_LayerOptTherm:
369 return self._dict_LayerOptTherm
370 elif self.LayerBandsAssignment:
371 self._dict_LayerOptTherm = get_dict_LayerOptTherm(self.GMS_identifier, self.LayerBandsAssignment)
372 return self._dict_LayerOptTherm
373 else:
374 return None
376 @property
377 def georef(self):
378 """Returns True if the current dataset can serve as spatial reference."""
380 return True if self.image_type == 'RSD' and re.search(r'OLI', self.sensor, re.I) else False
382 @property
383 def coreg_needed(self):
384 if self._coreg_needed is None:
385 self._coreg_needed = not (self.dataset_ID == CFG.datasetid_spatial_ref)
386 return self._coreg_needed
388 @coreg_needed.setter
389 def coreg_needed(self, value):
390 self._coreg_needed = value
    @property
    def coreg_info(self):
        # type: () -> dict
        """Return a dictionary of co-registration settings and results.

        If it has not been set before, a default dictionary (zero shifts, no reference
        scene, success=False) is created from the current metadata and job configuration.
        """
        if not self._coreg_info:
            self._coreg_info = {
                'corrected_shifts_px': {'x': 0, 'y': 0},
                'corrected_shifts_map': {'x': 0, 'y': 0},
                'original map info': self.MetaObj.map_info,
                'updated map info': None,
                'shift_reliability': None,
                'reference scene ID': None,
                'reference entity ID': None,
                'reference geotransform': None,
                # reference projection must be the own projection in order to avoid overwriting with a wrong EPSG
                'reference projection': self.MetaObj.projection,
                'reference extent': {'rows': None, 'cols': None},
                'reference grid': [list(CFG.spatial_ref_gridx),
                                   list(CFG.spatial_ref_gridy)],
                'success': False
            }

        return self._coreg_info

    @coreg_info.setter
    def coreg_info(self, val):
        self._coreg_info = val
418 @property
419 def resamp_needed(self):
420 if self._resamp_needed is None:
421 gt = mapinfo2geotransform(self.MetaObj.map_info)
422 self._resamp_needed = not is_coord_grid_equal(gt, CFG.spatial_ref_gridx,
423 CFG.spatial_ref_gridy)
424 return self._resamp_needed
426 @resamp_needed.setter
427 def resamp_needed(self, value):
428 self._resamp_needed = value
    @property
    def arr(self):
        # type: () -> GeoArray
        """Return the main image array as a GeoArray instance."""
        # TODO this must return a subset if self.subset is not None
        return self._arr

    @arr.setter
    def arr(self, *geoArr_initArgs):
        # TODO this must be able to handle subset inputs in tiled processing
        self._arr = GeoArray(*geoArr_initArgs)

        # set nodata value and geoinfos
        # NOTE: MetaObj is NOT gettable before import_metadata has been executed!
        if hasattr(self, 'MetaObj') and self.MetaObj:
            self._arr.nodata = self.MetaObj.spec_vals['fill']
            # fall back to a dummy geotransform if the metadata contain no map info
            self._arr.gt = mapinfo2geotransform(self.MetaObj.map_info) if self.MetaObj.map_info else [0, 1, 0, 0, 0, -1]
            self._arr.prj = self.MetaObj.projection
        else:
            self._arr.nodata = DEF_D.get_outFillZeroSaturated(self._arr.dtype)[0]

        # set bandnames like this: [B01, .., B8A,]
        if self.LayerBandsAssignment:
            if len(self.LayerBandsAssignment) == self._arr.bands:
                self._arr.bandnames = self.LBA2bandnames(self.LayerBandsAssignment)
            else:
                warnings.warn("Cannot set 'bandnames' attribute of GMS_object.arr because LayerBandsAssignment has %s "
                              "bands and GMS_object.arr has %s bands."
                              % (len(self.LayerBandsAssignment), self._arr.bands))

    @arr.deleter
    def arr(self):
        self._arr = None
463 @property
464 def arr_meta(self):
465 return self.MetaObj.to_odict()
    @property
    def mask_nodata(self):
        # type: () -> NoDataMask
        """Return the nodata mask, lazily derived from self.masks or self.arr if not set directly."""
        if self._mask_nodata is not None:
            if not self._mask_nodata.is_inmem and self._mask_nodata.bands > 1:
                # NoDataMask object self._mask_nodata points to multi-band image file (bands mask_nodata/mask_clouds)
                # -> read processes of not needed bands need to be avoided
                self._mask_nodata = NoDataMask(self._mask_nodata[:, :, 0],
                                               geotransform=self._mask_nodata.gt,
                                               projection=self._mask_nodata.prj)
            return self._mask_nodata

        elif self._masks:
            # return nodata mask from self.masks
            self._mask_nodata = NoDataMask(self._masks[:, :, 0],  # TODO use band names
                                           geotransform=self._masks.gt,
                                           projection=self._masks.prj)
            return self._mask_nodata

        elif isinstance(self.arr, GeoArray):
            self.logger.info('Calculating nodata mask...')
            self._mask_nodata = self.arr.mask_nodata  # calculates mask nodata if not already present
            return self._mask_nodata
        else:
            return None

    @mask_nodata.setter
    def mask_nodata(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            self._mask_nodata = NoDataMask(*geoArr_initArgs)
            # nodata pixels are marked False; geo-reference is inherited from self.arr
            self._mask_nodata.nodata = False
            self._mask_nodata.gt = self.arr.gt
            self._mask_nodata.prj = self.arr.prj
        else:
            del self.mask_nodata

    @mask_nodata.deleter
    def mask_nodata(self):
        self._mask_nodata = None
    @property
    def mask_clouds(self):
        # type: () -> CloudMask
        """Return the cloud mask, lazily extracted from self.masks if not set directly."""
        if self._mask_clouds is not None:
            if not self._mask_clouds.is_inmem and self._mask_clouds.bands > 1:
                # CloudMask object self._mask_clouds points to multi-band image file on disk
                # (bands mask_nodata/mask_clouds) -> read processes of not needed bands need to be avoided
                self._mask_clouds = CloudMask(self._mask_clouds[:, :, 1],
                                              geotransform=self._mask_clouds.gt,
                                              projection=self._mask_clouds.prj)  # TODO add legend
            return self._mask_clouds

        elif self._masks and self._masks.bands > 1:  # FIXME this will not be secure if there are more than 2 bands
            # return cloud mask from self.masks
            self._mask_clouds = CloudMask(self._masks[:, :, 1],  # TODO use band names
                                          geotransform=self._masks.gt,
                                          projection=self._masks.prj)
            return self._mask_clouds
        else:
            return None  # TODO don't calculate cloud mask?

    @mask_clouds.setter
    def mask_clouds(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:  # FIXME shape validation?
            self._mask_clouds = CloudMask(*geoArr_initArgs)
            # 0 marks nodata; geo-reference is inherited from self.arr
            self._mask_clouds.nodata = 0
            self._mask_clouds.gt = self.arr.gt
            self._mask_clouds.prj = self.arr.prj
        else:
            del self.mask_clouds

    @mask_clouds.deleter
    def mask_clouds(self):
        self._mask_clouds = None
540 @property
541 def dem(self):
542 """
543 Returns an SRTM DEM in the exact dimension an pixel grid of self.arr as an instance of GeoArray.
544 """
546 if self._dem is None:
547 self.logger.info('Generating DEM...')
548 DC_args = (self.arr.box.boxMapXY, self.arr.prj, self.arr.xgsd, self.arr.ygsd)
549 try:
550 DC = INP_R.DEM_Creator(dem_sensor='SRTM', logger=self.logger)
551 self._dem = DC.from_extent(*DC_args)
552 except RuntimeError:
553 self.logger.warning('SRTM DEM generation failed. Falling back to ASTER...')
554 DC = INP_R.DEM_Creator(dem_sensor='ASTER', logger=self.logger)
555 self._dem = DC.from_extent(*DC_args)
557 self._dem.nodata = DEF_D.get_outFillZeroSaturated(self._dem.dtype)[0]
558 return self._dem
560 @dem.setter
561 def dem(self, *geoArr_initArgs):
562 if geoArr_initArgs[0] is not None:
563 geoArr = GeoArray(*geoArr_initArgs)
564 assert self._dem.shape[:2] == self.arr.shape[:2]
566 self._dem = geoArr
567 self._dem.nodata = DEF_D.get_outFillZeroSaturated(self._dem.dtype)[0]
568 self._dem.gt = self.arr.gt
569 self._dem.prj = self.arr.prj
570 else:
571 del self.dem
573 @dem.deleter
574 def dem(self):
575 self._dem = None
577 @property
578 def masks(self):
579 # if self.mask_nodata is not None and self.mask_clouds is not None and \
580 # self._masks is not None and self._masks.bands==1:
582 # self.build_combined_masks_array()
584 return self._masks
586 @masks.setter
587 def masks(self, *geoArr_initArgs):
588 """
589 NOTE: This does not automatically update mask_nodata and mask_clouds BUT if mask_nodata and mask_clouds are
590 None their getters will automatically synchronize!
591 """
593 if geoArr_initArgs[0] is not None:
594 self._masks = GeoArray(*geoArr_initArgs)
595 self._masks.nodata = 0
596 self._masks.gt = self.arr.gt
597 self._masks.prj = self.arr.prj
598 else:
599 del self.masks
601 @masks.deleter
602 def masks(self):
603 self._masks = None
605 @property
606 def mask_clouds_confidence(self):
607 return self._mask_clouds_confidence
609 @mask_clouds_confidence.setter
610 def mask_clouds_confidence(self, *geoArr_initArgs):
611 if geoArr_initArgs[0] is not None:
612 cnfArr = GeoArray(*geoArr_initArgs)
614 assert cnfArr.shape == self.arr.shape[:2], \
615 "The 'mask_clouds_confidence' GeoArray can only be instanced with an array of the same dimensions " \
616 "like GMS_obj.arr. Got %s." % str(cnfArr.shape)
618 # noinspection PyProtectedMember
619 if cnfArr._nodata is None:
620 cnfArr.nodata = DEF_D.get_outFillZeroSaturated(cnfArr.dtype)[0]
621 cnfArr.gt = self.arr.gt
622 cnfArr.prj = self.arr.prj
623 cnfArr.bandnames = ['confidence']
625 self._mask_clouds_confidence = cnfArr
626 else:
627 del self.mask_clouds_confidence
629 @mask_clouds_confidence.deleter
630 def mask_clouds_confidence(self):
631 self._mask_clouds_confidence = None
    @property
    def ac_errors(self):
        """Returns an instance of GeoArray containing error information calculated by the atmospheric correction.

        :return:  GeoArray or None
        """
        return self._ac_errors

    @ac_errors.setter
    def ac_errors(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            errArr = GeoArray(*geoArr_initArgs)

            # in bandwise mode each spectral band has its own error layer -> the full 3D shape must match
            if CFG.ac_bandwise_accuracy:
                assert errArr.shape == self.arr.shape, \
                    "The 'ac_errors' GeoArray can only be instanced with an array of the same dimensions like " \
                    "GMS_obj.arr. Got %s." % str(errArr.shape)
            else:
                assert errArr.shape[:2] == self.arr.shape[:2], \
                    "The 'ac_errors' GeoArray can only be instanced with an array of the same X/Y dimensions like " \
                    "GMS_obj.arr. Got %s." % str(errArr.shape)

            # noinspection PyProtectedMember
            if errArr._nodata is None:
                errArr.nodata = DEF_D.get_outFillZeroSaturated(errArr.dtype)[0]
            # inherit the geo-reference from the image array
            errArr.gt = self.arr.gt
            errArr.prj = self.arr.prj
            errArr.bandnames = self.LBA2bandnames(self.LayerBandsAssignment) if errArr.ndim == 3 else ['median']

            self._ac_errors = errArr
        else:
            del self.ac_errors

    @ac_errors.deleter
    def ac_errors(self):
        self._ac_errors = None
    @property
    def spat_homo_errors(self):
        """Return a GeoArray holding the shift reliability of spatial homogenization (lazily computed).

        Returns None as long as self.coreg_info contains no 'shift_reliability' value.
        """
        if not self._spat_homo_errors and self.coreg_info['shift_reliability'] is not None:
            # the shift reliability is a scalar -> broadcast it over the full image extent
            errArr = GeoArray(np.full(self.arr.shape[:2], self.coreg_info['shift_reliability'], dtype=np.uint8),
                              geotransform=self.arr.geotransform,
                              projection=self.arr.projection,
                              bandnames=['shift_reliability'],
                              nodata=DEF_D.get_outFillZeroSaturated(np.uint8)[0])
            # blank out the nodata areas of the image array
            errArr[self.arr.mask_nodata.astype(np.int8) == 0] = errArr.nodata

            self._spat_homo_errors = errArr

        return self._spat_homo_errors

    @spat_homo_errors.deleter
    def spat_homo_errors(self):
        self._spat_homo_errors = None
    @property
    def spec_homo_errors(self):
        """Returns an instance of GeoArray containing error information calculated during spectral homogenization.

        :return:  GeoArray or None
        """
        return self._spec_homo_errors

    @spec_homo_errors.setter
    def spec_homo_errors(self, *geoArr_initArgs):
        if geoArr_initArgs[0] is not None:
            errArr = GeoArray(*geoArr_initArgs)

            # in bandwise mode each spectral band has its own error layer -> the full 3D shape must match
            if CFG.spechomo_bandwise_accuracy:
                assert errArr.shape == self.arr.shape, \
                    "The 'spec_homo_errors' GeoArray can only be instanced with an array of the same dimensions like " \
                    "GMS_obj.arr. Got %s." % str(errArr.shape)
            else:
                assert errArr.shape[:2] == self.arr.shape[:2], \
                    "The 'spec_homo_errors' GeoArray can only be instanced with an array of the same X/Y dimensions " \
                    "like GMS_obj.arr. Got %s." % str(errArr.shape)

            # noinspection PyProtectedMember
            if errArr._nodata is None:
                errArr.nodata = DEF_D.get_outFillZeroSaturated(errArr.dtype)[0]
            # inherit the geo-reference from the image array
            errArr.gt = self.arr.gt
            errArr.prj = self.arr.prj
            errArr.bandnames = self.LBA2bandnames(self.LayerBandsAssignment) if errArr.ndim == 3 else ['median']

            self._spec_homo_errors = errArr
        else:
            del self.spec_homo_errors

    @spec_homo_errors.deleter
    def spec_homo_errors(self):
        self._spec_homo_errors = None
727 @property
728 def accuracy_layers(self):
729 return self._accuracy_layers
731 @accuracy_layers.setter
732 def accuracy_layers(self, *geoArr_initArgs):
733 if geoArr_initArgs[0] is not None:
734 acc_lay = GeoArray(*geoArr_initArgs)
735 assert acc_lay.shape[:2] == self.arr.shape[:2],\
736 "The 'accuracy_layers' GeoArray can only be instanced with an array of the same dimensions like " \
737 "GMS_obj.arr. Got %s." % str(acc_lay.shape)
739 # noinspection PyProtectedMember
740 if acc_lay._nodata is None:
741 acc_lay.nodata = DEF_D.get_outFillZeroSaturated(acc_lay.dtype)[0]
742 acc_lay.gt = self.arr.gt
743 acc_lay.prj = self.arr.prj
745 if not acc_lay.bandnames:
746 raise ValueError
748 self._accuracy_layers = acc_lay
749 else:
750 del self._accuracy_layers
752 @accuracy_layers.deleter
753 def accuracy_layers(self):
754 self._accuracy_layers = None
756 @property
757 def accuracy_layers_meta(self):
758 if self._accuracy_layers is not None:
759 return {'map info': geotransform2mapinfo(self._accuracy_layers.gt, self._accuracy_layers.projection),
760 'coordinate system string': self._accuracy_layers.projection,
761 'bands': self._accuracy_layers.bands,
762 'band names': list(self._accuracy_layers.bandnames),
763 'data ignore value': self._accuracy_layers.nodata}
764 else:
765 return None
    def generate_accuracy_layers(self):
        """Generate the combined accuracy layers (AccuracyCube) for this object.

        Accuracy layers are a Level 2 feature only; for lower processing levels a warning
        is logged and nothing happens.
        """
        if not self.proc_level.startswith('L2'):
            self.logger.warning('Attempt to get %s accuracy layers failed - they are a Level 2 feature only.'
                                % self.proc_level)
            return None

        try:
            from ..algorithms.L2C_P import AccuracyCube
            self._accuracy_layers = AccuracyCube(self)  # don't use setter because it sets it to a GeoArray instance

        except ValueError as e:
            # NOTE: the exception message is matched literally here -> must stay in sync with AccuracyCube
            if str(e) == 'The given GMS_object contains no accuracy layers for combination.':
                if any([CFG.ac_estimate_accuracy, CFG.spathomo_estimate_accuracy, CFG.spechomo_estimate_accuracy]):
                    self.logger.warning('The given GMS_object contains no accuracy layers although computation '
                                        'of accurracy layers was enabled in job configuration.')
                else:
                    pass  # self._accuracy_layers keeps None
            else:
                raise

        except Exception as e:
            raise RuntimeError('Failed to generate AccuracyCube!', e)
790 @property
791 def cloud_masking_algorithm(self):
792 if not self._cloud_masking_algorithm:
793 self._cloud_masking_algorithm = CFG.cloud_masking_algorithm[self.satellite]
794 return self._cloud_masking_algorithm
    @property
    def ac_options(self):
        # type: () -> dict
        """
        Returns the options dictionary needed as input for atmospheric correction. If an empty dictionary is returned,
        atmospheric correction is not yet available for the current sensor and will later be skipped.
        """
        if not self._ac_options:
            path_ac_options = CFG.path_custom_sicor_options or PG.get_path_ac_options(self.GMS_identifier)

            if path_ac_options and os.path.exists(path_ac_options):
                # don't validate because options contain pathes that do not exist on another server:
                opt_dict = get_ac_options(path_ac_options, validation=False)

                # update some file paths depending on the current environment
                opt_dict['DEM']['fn'] = CFG.path_dem_proc_srtm_90m
                opt_dict['ECMWF']['path_db'] = CFG.path_ECMWF_db
                opt_dict['S2Image'][
                    'S2_MSI_granule_path'] = None  # only a placeholder -> will always be None for GMS usage
                opt_dict['output'] = []  # outputs are not needed for GMS -> so
                opt_dict['report']['report_path'] = os.path.join(self.pathGen.get_path_procdata(), '[TYPE]')
                if 'uncertainties' in opt_dict:
                    if CFG.ac_estimate_accuracy:
                        opt_dict['uncertainties']['snr_model'] = PG.get_path_snr_model(self.GMS_identifier)
                    else:
                        del opt_dict['uncertainties']  # SICOR will not compute uncertainties if that key is missing

                # apply custom configuration from the job config
                opt_dict["logger"]['level'] = CFG.log_level
                opt_dict["ram"]['upper_limit'] = CFG.ac_max_ram_gb
                opt_dict["ram"]['unit'] = 'GB'
                opt_dict["AC"]['fill_nonclear_areas'] = CFG.ac_fillnonclear_areas
                opt_dict["AC"]['clear_area_labels'] = CFG.ac_clear_area_labels
                # opt_dict['AC']['n_cores'] = CFG.CPUs if CFG.allow_subMultiprocessing else 1

                self._ac_options = opt_dict
            else:
                self.logger.warning('There is no options file available for atmospheric correction. '
                                    'Atmospheric correction must be skipped.')

        return self._ac_options
839 def get_copied_dict_and_props(self, remove_privates=False):
840 # type: (bool) -> dict
841 """Returns a copy of the current object dictionary including the current values of all object properties."""
843 # loggers must be closed
844 self.close_loggers()
845 # this disables automatic recreation of loggers (otherwise loggers are created by using getattr()):
846 self._loggers_disabled = True
848 out_dict = self.__dict__.copy()
850 # add properties
851 property_names = [p for p in dir(self.__class__) if isinstance(getattr(self.__class__, p), property)]
852 [out_dict.update({propK: copy.copy(getattr(self, propK))}) for propK in property_names]
854 # remove private attributes
855 if remove_privates:
856 out_dict = {k: v for k, v in out_dict.items() if not k.startswith('_')}
858 self._loggers_disabled = False # enables automatic recreation of loggers
860 return out_dict
862 def attributes2dict(self, remove_privates=False):
863 # type: (bool) -> dict
864 """Returns a copy of the current object dictionary including the current values of all object properties."""
866 # loggers must be closed
867 self.close_loggers()
868 # this disables automatic recreation of loggers (otherwise loggers are created by using getattr()):
869 self._loggers_disabled = True
871 out_dict = self.__dict__.copy()
873 # add some selected property values
874 for i in ['GMS_identifier', 'LayerBandsAssignment', 'coreg_needed', 'coreg_info', 'resamp_needed',
875 'dict_LayerOptTherm', 'georef', 'MetaObj']:
876 if i == 'MetaObj':
877 out_dict['meta_odict'] = self.MetaObj.to_odict()
878 else:
879 out_dict[i] = getattr(self, i)
881 # remove private attributes
882 if remove_privates:
883 out_dict = {k: v for k, v in out_dict.items() if not k.startswith('_')}
885 self._loggers_disabled = False # enables automatic recreation of loggers
886 return out_dict
    def _data_downloader(self, sensor, entity_ID):
        # type: (str, str) -> bool
        """Download the satellite data archive belonging to the given sensor / entity ID.

        NOTE(review): This is currently a stub - 'success' is never set to True, so the method always logs a
        critical error and raises RuntimeError.

        :param sensor:     sensor name (only used in the log/error message)
        :param entity_ID:  entity ID of the dataset to be downloaded (only used in the log/error message)
        :return:           True if the download succeeded (currently unreachable)
        :raises RuntimeError:  if the archive download failed
        """
        self.logger.info('Data downloader started.')
        success = False
        " > download source code for Landsat here < "  # no-op placeholder for the actual download implementation
        if not success:
            self.logger.critical(
                "Download for %s dataset '%s' failed. No further processing possible." % (sensor, entity_ID))
            raise RuntimeError('Archive download failed.')
        return success
    @classmethod
    def from_disk(cls, tuple_GMS_subset):
        """Fills an already instanced GMS object with data from disk. Excludes array attributes in Python mode.

        :param tuple_GMS_subset: <tuple> e.g. ('/path/gms_file.gms', ['cube', None])
        :return:                 the GMS object filled with all attributes read from the given GMS file
        """
        GMS_obj = cls()

        path_GMS_file = tuple_GMS_subset[0]
        GMSfileDict = INP_R.GMSfile2dict(path_GMS_file)

        # copy all attributes from GMS file (private attributes are not touched since they are not included in GMS file)
        # set MetaObj first in order to make some getters and setters work
        GMS_id = GMS_identifier(image_type=GMSfileDict['image_type'], satellite=GMSfileDict['satellite'],
                                sensor=GMSfileDict['sensor'], subsystem=GMSfileDict['subsystem'],
                                proc_level=GMSfileDict['proc_level'], dataset_ID=GMSfileDict['dataset_ID'], logger=None)
        GMS_obj.MetaObj = METADATA(GMS_id).from_odict(GMSfileDict['meta_odict'])

        for key, value in GMSfileDict.items():
            if key in ['GMS_identifier', 'georef', 'dict_LayerOptTherm']:
                continue  # properties that should better be created on the fly
            elif key == 'acq_datetime':
                # the GMS file stores the acquisition time as a formatted string -> parse it back to datetime
                GMS_obj.acq_datetime = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f%z')
            else:
                try:
                    setattr(GMS_obj, key, value)
                except Exception:
                    raise AttributeError("Can't set attribute %s." % key)

        GMS_obj.arr_shape, GMS_obj.arr_pos = tuple_GMS_subset[1]

        # assign the image/mask file paths to the array attributes
        # (presumably the setters accept file paths and load lazily - see the corresponding property definitions)
        GMS_obj.arr = GMS_obj.pathGen.get_path_imagedata()
        # self.mask_nodata and self.mask_clouds are auto-synchronized via self.masks (see their getters):
        GMS_obj.masks = GMS_obj.pathGen.get_path_maskdata()
        GMS_obj.accuracy_layers = GMS_obj.pathGen.get_path_accuracylayers()

        return GMS_obj
    @classmethod
    def from_sensor_subsystems(cls, list_GMS_objs):
        # type: (List[GMS_object]) -> GMS_object
        """Merge separate GMS objects belonging to the same scene-ID into ONE GMS object.

        :param list_GMS_objs: <list> of GMS objects covering the same geographic area but representing different
                              sensor subsystems (e.g. 3 GMS_objects for Sentinel-2 10m/20m/60m bands)
        :return: the merged GMS object
        """
        GMS_obj_merged = cls()

        # assertions
        assert len(list_GMS_objs) > 1, "'GMS_object.from_sensor_subsystems()' expects multiple input GMS objects. " \
                                       "Got %d." % len(list_GMS_objs)
        assert all([is_coord_grid_equal(list_GMS_objs[0].arr.gt, *obj.arr.xygrid_specs) for obj in list_GMS_objs[1:]]),\
            "The input GMS objects must have the same pixel grid. Received: %s" \
            % np.array([obj.arr.xygrid_specs for obj in list_GMS_objs])
        assert len(list(set([GMS_obj.proc_level for GMS_obj in list_GMS_objs]))) == 1, \
            "The input GMS objects for GMS_object.from_sensor_subsystems() must have the same processing level."
        subsystems = [GMS_obj.subsystem for GMS_obj in list_GMS_objs]
        assert len(subsystems) == len(list(set(subsystems))), \
            "The input 'list_GMS_objs' contains duplicates: %s" % subsystems

        ##################
        # merge logfiles #
        ##################

        # read all logs into DataFrame, sort it by the first column
        [GMS_obj.close_loggers() for GMS_obj in list_GMS_objs]  # close the loggers of the input objects
        paths_inLogs = [GMS_obj.pathGen.get_path_logfile() for GMS_obj in list_GMS_objs]
        allLogs_df = DataFrame()
        for log in paths_inLogs:
            df = read_csv(log, sep='\n', delimiter=None, header=None,  # no delimiter needed
                          engine='python')  # engine suppresses a pandas warning
            # NOTE(review): DataFrame.append is removed in pandas >= 2.0 -> consider pandas.concat when upgrading
            allLogs_df = allLogs_df.append(df)

        allLogs_df = allLogs_df.sort_values(0)  # sorting uses timestamps that appear on first position in logs
        allLogs_df = allLogs_df.drop_duplicates()  # otherwise, e.g., logs from AC would appear 3 times for S2A

        # set common metadata, needed for logfile
        GMS_obj_merged.baseN = list_GMS_objs[0].pathGen.get_baseN(merged_subsystems=True)
        GMS_obj_merged.path_logfile = list_GMS_objs[0].pathGen.get_path_logfile(merged_subsystems=True)
        GMS_obj_merged.scene_ID = list_GMS_objs[0].scene_ID

        # write the merged logfile and flush previous logger
        np.savetxt(GMS_obj_merged.path_logfile, np.array(allLogs_df), delimiter=None, fmt="%s")
        GMS_obj_merged.close_loggers()

        # log
        GMS_obj_merged.logger.info('Merging the subsystems %s to a single GMS object...'
                                   % ', '.join([GMS_obj.subsystem for GMS_obj in list_GMS_objs]))

        ##################
        # MERGE METADATA #
        ##################

        # copy all attributes from the first input GMS file (private attributes are not touched)
        for key, value in list_GMS_objs[0].__dict__.copy().items():
            if key in ['GMS_identifier', 'georef', 'dict_LayerOptTherm']:
                continue  # properties that should better be created on the fly
            elif key in ['baseN', 'path_logfile', 'scene_ID', 'subsystem']:
                continue  # either previously set with common values or not needed for merged GMS_object
            try:
                setattr(GMS_obj_merged, key, value)
            except Exception:
                raise AttributeError("Can't set attribute %s." % key)

        # update LayerBandsAssignment and get full list of output bandnames
        from .metadata import get_LayerBandsAssignment
        # use identifier of first input GMS object for getting LBA (respects current proc_level):
        gms_idf = list_GMS_objs[0].GMS_identifier
        GMS_obj_merged.LayerBandsAssignment = get_LayerBandsAssignment(gms_idf, return_fullLBA=True)
        # convert band numbers to zero-padded band names, e.g. '1' -> 'B01', '8A' -> 'B8A'
        bandnames = ['B%s' % i if len(i) == 2 else 'B0%s' % i for i in GMS_obj_merged.LayerBandsAssignment]

        # update layer-dependent metadata with respect to remaining input GMS objects
        GMS_obj_merged.MetaObj.LayerBandsAssignment = GMS_obj_merged.LayerBandsAssignment
        GMS_obj_merged.MetaObj.Subsystem = ''

        GMS_obj_merged.subsystem = ''
        del GMS_obj_merged.pathGen  # must be refreshed because subsystem is now ''
        GMS_obj_merged.close_loggers()  # must also be refreshed because it depends on pathGen

        for attrN in layerdependent_metadata:
            # combine values from separate subsystems to a single value
            attrDic_fullLBA = {}
            for GMS_obj in list_GMS_objs:
                attr_val = getattr(GMS_obj.MetaObj, attrN)
                if isinstance(attr_val, list):
                    attrDic_fullLBA.update(dict(zip(GMS_obj.LayerBandsAssignment, attr_val)))
                elif isinstance(attr_val, (dict, OrderedDict)):
                    attrDic_fullLBA.update(attr_val)
                else:
                    raise ValueError(attrN)

            # update the attribute in self.MetaObj
            if attrDic_fullLBA:
                val2set = [attrDic_fullLBA[bN] for bN in GMS_obj_merged.LayerBandsAssignment] \
                    if isinstance(getattr(list_GMS_objs[0].MetaObj, attrN), list) else attrDic_fullLBA
                setattr(GMS_obj_merged.MetaObj, attrN, val2set)

        ####################
        # MERGE ARRAY DATA #
        ####################

        # find the common extent. NOTE: boundsMap is expected in the order [xmin,xmax,ymin,ymax]
        geoExtents = np.array([GMS_obj.arr.box.boundsMap for GMS_obj in list_GMS_objs])
        common_extent = (min(geoExtents[:, 0]), max(geoExtents[:, 1]), min(geoExtents[:, 2]), max(geoExtents[:, 3]))

        # overwrite array data with merged arrays, clipped to common_extent and reordered according to FullLayerBandsAss
        for attrname in ['arr', 'ac_errors', 'dem', 'mask_nodata', 'mask_clouds', 'mask_clouds_confidence', 'masks']:

            # get current attribute of each subsystem without running property getters
            all_arrays = [getattr(GMS_obj, '_%s' % attrname) for GMS_obj in list_GMS_objs]

            # get the same geographical extent for each input GMS object
            if len(set(tuple(ext) for ext in geoExtents.tolist())) > 1:
                # in case of different extents
                geoArrs_same_extent = []

                for geoArr in all_arrays:

                    if geoArr is not None:
                        # FIXME mask_clouds_confidence is no GeoArray until here
                        # FIXME -> has no nodata value -> calculation throughs warning
                        geoArr_same_extent = \
                            GeoArray(*geoArr.get_mapPos(
                                mapBounds=np.array(common_extent)[[0, 2, 1, 3]],  # pass (xmin, ymin, xmax, ymax)
                                mapBounds_prj=geoArr.prj),
                                bandnames=list(geoArr.bandnames.keys()),
                                nodata=geoArr.nodata)
                        geoArrs_same_extent.append(geoArr_same_extent)
                    else:
                        # e.g. in case of cloud mask that is only appended to the GMS object with the same
                        # spatial resolution)
                        geoArrs_same_extent.append(None)

            else:
                # skip get_mapPos() if all input GMS objects have the same extent
                geoArrs_same_extent = all_arrays

            # validate output GeoArrays #
            #############################

            if len([gA for gA in geoArrs_same_extent if gA is not None]) > 1:
                equal_bounds = all([geoArrs_same_extent[0].box.boundsMap == gA.box.boundsMap
                                    for gA in geoArrs_same_extent[1:]])
                equal_epsg = all([geoArrs_same_extent[0].epsg == gA.epsg for gA in geoArrs_same_extent[1:]])
                equal_xydims = all([geoArrs_same_extent[0].shape[:2] == gA.shape[:2] for gA in geoArrs_same_extent[1:]])
                if not all([equal_bounds, equal_epsg, equal_xydims]):
                    raise RuntimeError('Something went wrong during getting the same geographical extent for all the '
                                       'input GMS objects. The extents, projections or pixel dimensions of the '
                                       'calculated input GMS objects are not equal.')

            # set output arrays #
            #####################

            # handle those arrays where bands have to be reordered according to FullLayerBandsAssignment
            if attrname in ['arr', 'ac_errors'] and list(set(geoArrs_same_extent)) != [None]:
                # check that each desired band name for the current attribute is provided by one of the input
                # GMS objects
                available_bandNs = list(chain.from_iterable([list(gA.bandnames) for gA in geoArrs_same_extent]))
                for bN in bandnames:
                    if bN not in available_bandNs:
                        raise ValueError("The given input GMS objects (subsystems) do not provide a bandname '%s' for "
                                         "the attribute '%s'. Available band names amongst all input GMS objects are: "
                                         "%s" % (bN, attrname, str(available_bandNs)))

                # merge arrays
                def get_band(bandN):
                    # return the band from the first subsystem that provides it
                    return [gA[bandN] for gA in geoArrs_same_extent if gA and bandN in gA.bandnames][0]

                full_geoArr = GeoArray(np.dstack([get_band(bandN) for bandN in bandnames]),
                                       geoArrs_same_extent[0].gt, geoArrs_same_extent[0].prj,
                                       bandnames=bandnames,
                                       nodata=geoArrs_same_extent[0].nodata)

                # handle CFG.ac_bandwise_accuracy (average ac_errors if needed)
                # NOTE: full_geoArr will only have 3 dims in case of multiple subsystems
                #       -> median cannot directly computed during AC due to different GSDs of the subsystems
                #       -> first perform spatial homogenization, directly after that compute median
                #       -> this is not done later (e.g., in L2C) to avoid memory overflows in L2B or L2C
                if attrname == 'ac_errors' and not CFG.ac_bandwise_accuracy and full_geoArr.bands > 1:
                    full_geoArr = np.median(full_geoArr, axis=2).astype(full_geoArr.dtype)

                setattr(GMS_obj_merged, attrname, full_geoArr)

            # handle the remaining arrays
            else:
                # masks, dem, mask_nodata, mask_clouds, mask_clouds_confidence
                if attrname == 'dem':
                    # use the DEM of the first input object
                    # (if the grid is the same, the DEMs should be the same anyway)
                    GMS_obj_merged.dem = geoArrs_same_extent[0]

                elif attrname == 'mask_nodata':
                    # must not be merged -> self.arr is already merged, so just recalculate it (np.all)
                    GMS_obj_merged.mask_nodata = GMS_obj_merged.calc_mask_nodata(overwrite=True)

                elif attrname == 'mask_clouds':
                    # possibly only present in ONE subsystem (set by atm. Corr.)
                    mask_clouds = [msk for msk in geoArrs_same_extent if msk is not None]
                    if len(mask_clouds) > 1:
                        raise ValueError('Expected mask clouds in only one subsystem. Got %s.' % len(mask_clouds))
                    GMS_obj_merged.mask_clouds = mask_clouds[0] if mask_clouds else None

                elif attrname == 'mask_clouds_confidence':
                    # possibly only present in ONE subsystem (set by atm. Corr.)
                    mask_clouds_conf = [msk for msk in geoArrs_same_extent if msk is not None]
                    if len(mask_clouds_conf) > 1:
                        raise ValueError(
                            'Expected mask_clouds_conf in only one subsystem. Got %s.' % len(mask_clouds_conf))
                    GMS_obj_merged.mask_clouds_confidence = mask_clouds_conf[0] if mask_clouds_conf else None

                elif attrname == 'masks':
                    # self.mask_nodata and self.mask_clouds will already be set here -> so just recreate it from there
                    GMS_obj_merged.masks = None

        # avoid unequal nodata edges between individual layers (resampling artifacts etc.) #
        ####################################################################################

        # get pixel values of areas that have not been atmospherically corrected (non-clear)
        nonclear_labels = [lbl for lbl in ["Clear", "Snow", "Water", "Shadow", "Cirrus", "Cloud"]
                           if lbl not in CFG.ac_clear_area_labels]
        cloud_mask_legend = DEF_D.get_mask_classdefinition('mask_clouds', GMS_obj_merged.satellite)
        nonclear_pixVals = [cloud_mask_legend[lbl] for lbl in nonclear_labels]

        # apply cloud mask to image data and all products derived from image data
        # (only if image data represents BOA-Ref and cloud areas are not to be filled with TOA-Ref)
        if re.search(r'BOA_Reflectance', GMS_obj_merged.MetaObj.PhysUnit, re.I) and not CFG.ac_fillnonclear_areas:
            # fill non-clear areas with no data values (for all bands)
            for pixVal in nonclear_pixVals:
                mask_nonclear = GMS_obj_merged.mask_clouds[:] == pixVal
                GMS_obj_merged.arr[mask_nonclear] = DEF_D.get_outFillZeroSaturated(GMS_obj_merged.arr.dtype)[0]

                if GMS_obj_merged.ac_errors:
                    GMS_obj_merged.ac_errors[mask_nonclear] = \
                        DEF_D.get_outFillZeroSaturated(GMS_obj_merged.ac_errors.dtype)[0]

        # update no data mask (adds all pixels to no data where at least one band has no data)
        GMS_obj_merged.calc_mask_nodata(overwrite=True)

        # apply updated nodata mask to array data (only to no data area, not to non-clear area)
        # => this sets all pixels to no data where at least one band has no data
        # NOTE: This may cause the cloud mask not exactly match the holes in self.arr so that there may be small
        #       gaps between the cloud edge and where the image data begins. This is due to resampling, e.g. from
        #       Sentinel-2 60m grid to Landsat 30 m grid. Nodata mask marks those areas as no data although there is
        #       no cloud at exactly that pixel. This is ok!
        area2replace = np.all([GMS_obj_merged.mask_nodata[:] == 0] +
                              [GMS_obj_merged.mask_clouds[:] != pixVal for pixVal in nonclear_pixVals], axis=0) \
            if GMS_obj_merged.mask_clouds is not None else GMS_obj_merged.mask_nodata[:] == 0

        for attrname in ['arr', 'ac_errors']:
            attr_val = getattr(GMS_obj_merged, attrname)

            if attr_val is not None:
                attr_val[area2replace] = DEF_D.get_outFillZeroSaturated(attr_val.dtype)[0]
                setattr(GMS_obj_merged, attrname, attr_val)

        # recreate self.masks
        GMS_obj_merged.build_combined_masks_array()

        # update array-dependent metadata
        GMS_obj_merged.MetaObj.cols = GMS_obj_merged.arr.cols
        GMS_obj_merged.MetaObj.rows = GMS_obj_merged.arr.rows
        GMS_obj_merged.MetaObj.bands = GMS_obj_merged.arr.bands
        GMS_obj_merged.MetaObj.map_info = geotransform2mapinfo(GMS_obj_merged.arr.gt, GMS_obj_merged.arr.prj)
        GMS_obj_merged.MetaObj.projection = GMS_obj_merged.arr.prj

        # update corner coordinates  # (calc_corner_positions is a method of L1A_object)
        GMS_obj_merged.calc_corner_positions()

        # set shape of full array
        GMS_obj_merged.shape_fullArr = GMS_obj_merged.arr.shape

        return GMS_obj_merged
1212 @classmethod
1213 def from_tiles(cls, list_GMS_tiles):
1214 # type: (list) -> GMS_object
1215 """Merge separate GMS objects with different spatial coverage but belonging to one scene-ID to ONE GMS object.
1217 :param list_GMS_tiles: <list> of GMS objects that have been created by cut_GMS_obj_into_blocks()
1218 """
1219 GMS_obj = cls()
1221 if 'IMapUnorderedIterator' in str(type(list_GMS_tiles)):
1222 list_GMS_tiles = list(list_GMS_tiles)
1224 # copy all attributes except of array attributes
1225 tile1 = list_GMS_tiles[0]
1226 [setattr(GMS_obj, i, getattr(tile1, i)) for i in tile1.__dict__
1227 if not callable(getattr(tile1, i)) and not isinstance(getattr(tile1, i), (np.ndarray, GeoArray))]
1229 # MERGE ARRAY-ATTRIBUTES
1230 list_arraynames = [i for i in tile1.__dict__ if not callable(getattr(tile1, i)) and
1231 isinstance(getattr(tile1, i), (np.ndarray, GeoArray))]
1232 list_arraynames = ['_arr'] + [i for i in list_arraynames if
1233 i != '_arr'] # list must start with _arr, otherwise setters will not work
1235 for arrname in list_arraynames:
1236 samplearray = getattr(tile1, arrname)
1237 assert isinstance(samplearray, (np.ndarray, GeoArray)), \
1238 'Received a %s object for attribute %s. Expected a numpy array or an instance of GeoArray.' \
1239 % (type(samplearray), arrname)
1240 is_3d = samplearray.ndim == 3
1241 bands = (samplearray.shape[2],) if is_3d else () # dynamic -> works for arr, cld_arr,...
1242 target_shape = tuple(GMS_obj.shape_fullArr[:2]) + bands
1243 target_dtype = samplearray.dtype
1244 merged_array = GMS_obj._merge_arrays(list_GMS_tiles, arrname, target_shape, target_dtype)
1246 setattr(GMS_obj, arrname if not arrname.startswith('_') else arrname[1:],
1247 merged_array) # use setters if possible
1248 # NOTE: this asserts that each attribute starting with '_' has also a property with a setter!
1250 # UPDATE ARRAY-DEPENDENT ATTRIBUTES
1251 GMS_obj.arr_shape = 'cube'
1252 GMS_obj.arr_pos = None
1254 # update MetaObj attributes
1255 GMS_obj.MetaObj.cols = GMS_obj.arr.cols
1256 GMS_obj.MetaObj.rows = GMS_obj.arr.rows
1257 GMS_obj.MetaObj.bands = GMS_obj.arr.bands
1258 GMS_obj.MetaObj.map_info = geotransform2mapinfo(GMS_obj.arr.gt, GMS_obj.arr.prj)
1259 GMS_obj.MetaObj.projection = GMS_obj.arr.prj
1261 # calculate data_corners_imXY (mask_nodata is always an array here because get_mapPos always returns an array)
1262 corners_imYX = calc_FullDataset_corner_positions(
1263 GMS_obj.mask_nodata, assert_four_corners=False, algorithm='shapely')
1264 GMS_obj.trueDataCornerPos = [(YX[1], YX[0]) for YX in corners_imYX] # [UL, UR, LL, LR]
1266 # calculate trueDataCornerLonLat
1267 data_corners_LatLon = pixelToLatLon(GMS_obj.trueDataCornerPos,
1268 geotransform=GMS_obj.arr.gt, projection=GMS_obj.arr.prj)
1269 GMS_obj.trueDataCornerLonLat = [(YX[1], YX[0]) for YX in data_corners_LatLon]
1271 # calculate trueDataCornerUTM
1272 data_corners_utmYX = pixelToMapYX(GMS_obj.trueDataCornerPos, geotransform=GMS_obj.arr.gt,
1273 projection=GMS_obj.arr.prj) # FIXME asserts gt in UTM coordinates
1274 GMS_obj.trueDataCornerUTM = [(YX[1], YX[0]) for YX in data_corners_utmYX]
1276 return GMS_obj
    def get_subset_obj(self, imBounds=None, mapBounds=None, mapBounds_prj=None, out_prj=None, logmsg=None,
                       progress=False, v=False):
        # type: (tuple, tuple, str, str, str, bool, bool) -> Union[GMS_object, None]
        """Return a subset of the given GMS object, based on the given bounds coordinates.

        Array attributes are clipped and relevant metadata keys are updated according to new extent. In case the subset
        does not contain any data but only no-data values, None is returned.

        :param imBounds:      <tuple> tuple of image coordinates in the form (xmin,xmax,ymin,ymax)
        :param mapBounds:     <tuple> tuple of map coordinates in the form (xmin,xmax,ymin,ymax)
        :param mapBounds_prj: <str> a WKT string containing projection of the given map bounds
                              (can be different to projection of the GMS object; ignored if map bounds not given)
        :param out_prj:       <str> a WKT string containing output projection.
                              If not given, the projection of self.arr is used.
        :param logmsg:        <str> a message to be logged when this method is called
        :param progress:      <bool> whether to show progress bar (default: False)
        :param v:             <bool> verbose mode (default: False)
        :return:              <GMS_object> the GMS object subset
        """

        if logmsg:
            self.logger.info(logmsg)

        # copy object
        from ..misc.helper_functions import get_parentObjDict
        parentObjDict = get_parentObjDict()
        sub_GMS_obj = parentObjDict[self.proc_level](*HLP_F.initArgsDict[self.proc_level])  # init
        # copy all non-array attributes (array attributes are subsetted below)
        sub_GMS_obj.__dict__.update(
            {k: getattr(self, k) for k in self.__dict__.keys()
             if not isinstance(getattr(self, k), (GeoArray, np.ndarray))}.copy())

        # deepcopy decouples the subset object from the parent object's mutable (non-array) attributes
        sub_GMS_obj = copy.deepcopy(sub_GMS_obj)

        # clip all array attributes using the given bounds
        # list_arraynames = [i for i in self.__dict__ if not callable(getattr(self, i)) and \
        #                    isinstance(getattr(self, i), np.ndarray)]
        list_arraynames = [i for i in ['arr', 'masks', 'ac_errors', 'mask_clouds_confidence', 'spec_homo_errors',
                                       'accuracy_layers']
                           if hasattr(self, i) and getattr(self, i) is not None]  # FIXME hardcoded
        assert list_arraynames
        assert imBounds or mapBounds, "Either 'imBounds' or 'mapBounds' must be given. Got nothing."

        # calculate mapBounds if not already given
        if not mapBounds:
            rS, rE, cS, cE = imBounds
            (xmin, ymax), (xmax, ymin) = [imXY2mapXY((imX, imY), self.arr.gt) for imX, imY in
                                          [(cS, rS), (cE + 1, rE + 1)]]
            mapBounds_prj = self.arr.projection
        else:
            xmin, xmax, ymin, ymax = mapBounds

        # avoid disk IO if requested area is within the input array  # TODO

        ####################################################################
        # subset all array attributes and update directly related metadata #
        ####################################################################

        for arrname in list_arraynames:
            # get input data for array subsetting
            geoArr = getattr(self, arrname)

            # get subsetted and (possibly) reprojected array
            # (nearest neighbour resampling for masks to keep class values intact, cubic for continuous data)
            subArr = GeoArray(*geoArr.get_mapPos((xmin, ymin, xmax, ymax), mapBounds_prj,
                                                 out_prj=out_prj or geoArr.prj,
                                                 rspAlg='near' if arrname == 'masks' else 'cubic',
                                                 progress=progress),
                              bandnames=list(geoArr.bandnames), nodata=geoArr.nodata)

            # show result
            if v:
                subArr.show(figsize=(10, 10))

            # update array-related attributes of sub_GMS_obj
            if arrname == 'arr':
                # return None in case the subset object contains only nodata
                if subArr.min() == subArr.max() and \
                   np.std(subArr) == 0 and \
                   np.unique(subArr) == subArr.nodata:
                    return None

                sub_GMS_obj.MetaObj.map_info = geotransform2mapinfo(subArr.gt, subArr.prj)
                sub_GMS_obj.MetaObj.projection = subArr.prj
                sub_GMS_obj.MetaObj.rows, sub_GMS_obj.MetaObj.cols = subArr.arr.shape[:2]
                sub_GMS_obj.MetaObj.CS_UTM_ZONE = get_UTMzone(prj=subArr.prj)  # FIXME only works for UTM
                sub_GMS_obj.MetaObj.CS_EPSG = WKT2EPSG(subArr.prj)
            elif arrname == 'masks':
                sub_GMS_obj.masks_meta['map info'] = geotransform2mapinfo(subArr.gt, subArr.prj)
                sub_GMS_obj.masks_meta['coordinate system string'] = subArr.prj
                sub_GMS_obj.masks_meta['lines'], sub_GMS_obj.masks_meta['samples'] = subArr.arr.shape[:2]
                sub_GMS_obj.masks_meta['CS_UTM_ZONE'] = get_UTMzone(prj=subArr.prj)  # FIXME only works for UTM
                sub_GMS_obj.masks_meta['CS_EPSG'] = WKT2EPSG(subArr.prj)
            else:
                # other layers are either not written (i.e. have no separate meta)
                # or their meta is directly derived from the corresponding array
                pass

            delattr(sub_GMS_obj,
                    arrname)  # connected filePath must be disconnected because projection change will be rejected
            setattr(sub_GMS_obj, arrname, subArr)

        # copy subset mask data to mask_nodata and mask_clouds
        sub_GMS_obj.mask_nodata = sub_GMS_obj.masks[:, :, 0]  # ==> getters will automatically return it from self.masks
        # FIXME not dynamic:
        sub_GMS_obj.mask_clouds = sub_GMS_obj.masks[:, :, 1] if sub_GMS_obj.masks.bands > 1 else None

        ###################
        # update metadata #
        ###################

        # update arr_pos
        sub_GMS_obj.arr_shape = 'block'
        if imBounds is not None:
            rS, rE, cS, cE = imBounds
            sub_GMS_obj.arr_pos = ((rS, rE), (cS, cE))
        else:
            pass  # FIXME how to set arr_pos in that case?

        # calculate new attributes 'corner_utm' and 'corner_lonlat'
        rows, cols, bands = sub_GMS_obj.arr.shape
        ULxy, URxy, LLxy, LRxy = [[0, 0], [cols - 1, 0], [0, rows - 1], [cols - 1, rows - 1]]
        # FIXME asserts gt in UTM coordinates:
        utm_coord_YX = pixelToMapYX([ULxy, URxy, LLxy, LRxy], geotransform=subArr.gt, projection=subArr.prj)
        # ULyx,URyx,LLyx,LRyx:
        lonlat_coord = pixelToLatLon([ULxy, URxy, LLxy, LRxy], geotransform=subArr.gt, projection=subArr.prj)
        sub_GMS_obj.corner_utm = [[YX[1], YX[0]] for YX in utm_coord_YX]  # ULxy,URxy,LLxy,LRxy
        sub_GMS_obj.corner_lonlat = [[YX[1], YX[0]] for YX in lonlat_coord]  # ULxy,URxy,LLxy,LRxy

        # calculate 'bounds_LonLat' and 'bounds_utm'
        sub_GMS_obj.bounds_LonLat = HLP_F.corner_coord_to_minmax(sub_GMS_obj.corner_lonlat)
        sub_GMS_obj.bounds_utm = HLP_F.corner_coord_to_minmax(sub_GMS_obj.corner_utm)

        # calculate data_corners_imXY (mask_nodata is always an array here because get_mapPos always returns an array)
        corners_imYX = calc_FullDataset_corner_positions(
            sub_GMS_obj.mask_nodata, assert_four_corners=False, algorithm='shapely')
        sub_GMS_obj.trueDataCornerPos = [(YX[1], YX[0]) for YX in corners_imYX]

        # calculate trueDataCornerLonLat
        data_corners_LatLon = pixelToLatLon(sub_GMS_obj.trueDataCornerPos,
                                            geotransform=subArr.gt, projection=subArr.prj)
        sub_GMS_obj.trueDataCornerLonLat = [(YX[1], YX[0]) for YX in data_corners_LatLon]

        # calculate trueDataCornerUTM
        if isProjectedOrGeographic(subArr.prj) == 'projected':
            data_corners_utmYX = [imYX2mapYX(imYX, gt=subArr.gt) for imYX in corners_imYX]
            sub_GMS_obj.trueDataCornerUTM = [(YX[1], YX[0]) for YX in data_corners_utmYX]
        else:
            self.logger.error("Error while setting 'trueDataCornerUTM' due to unexpected projection of subArr: %s."
                              % subArr.prj)

        return sub_GMS_obj
1429 @staticmethod
1430 def _merge_arrays(list_GMS_tiles, arrname2merge, target_shape, target_dtype):
1431 # type: (list, str, tuple, np.dtype) -> np.ndarray
1432 """Merge multiple arrays into a single one.
1434 :param list_GMS_tiles:
1435 :param arrname2merge:
1436 :param target_shape:
1437 :param target_dtype:
1438 :return:
1439 """
1440 out_arr = np.empty(target_shape, dtype=target_dtype)
1441 for tile in list_GMS_tiles:
1442 (rowStart, rowEnd), (colStart, colEnd) = tile.arr_pos
1443 out_arr[rowStart:rowEnd + 1, colStart:colEnd + 1] = getattr(tile, arrname2merge)
1444 return out_arr
1446 def log_for_fullArr_or_firstTile(self, log_msg, subset=None):
1447 """Send a message to the logger only if full array or the first tile is currently processed.
1448 This function can be called when processing any tile but log message will only be sent from first tile.
1450 :param log_msg: the log message to be logged
1451 :param subset: subset argument as sent to e.g. DN2TOARadRefTemp that indicates which tile is to be processed.
1452 Not needed if self.arr_pos is not None.
1453 """
1455 if subset is None and \
1456 (self.arr_shape == 'cube' or self.arr_pos is None or [self.arr_pos[0][0], self.arr_pos[1][0]] == [0, 0]) or\
1457 subset == ['cube', None] or (subset and [subset[1][0][0], subset[1][1][0]] == [0, 0]) or \
1458 hasattr(self, 'logAtThisTile') and getattr(self, 'logAtThisTile'): # cube or 1st tile
1459 self.logger.info(log_msg)
1460 else:
1461 pass
1463 @staticmethod
1464 def LBA2bandnames(LayerBandsAssignment):
1465 """Convert LayerbandsAssignment from format ['1','2',...] to bandnames like this: [B01, .., B8A,]."""
1466 return ['B%s' % i if len(i) == 2 else 'B0%s' % i for i in LayerBandsAssignment]
    def get_tilepos(self, target_tileshape, target_tilesize):
        """Compute tile bounds for the full array and store them in self.tile_pos as [tileshape, tilebounds] pairs."""
        # NOTE(review): 'target_tileshape' is stored alongside each tile while 'target_tilesize' is passed as
        # 'tile_shape' to get_array_tilebounds() - the two parameters look swapped; confirm intended semantics.
        self.tile_pos = [[target_tileshape, tb]
                         for tb in get_array_tilebounds(array_shape=self.shape_fullArr, tile_shape=target_tilesize)]
1472 @staticmethod
1473 def rescale_array(inArray, outScaleFactor, inScaleFactor=1):
1474 """Adjust the scaling factor of an array to match the given output scale factor."""
1476 assert isinstance(inArray, np.ndarray)
1477 return inArray.astype(np.int16) * (outScaleFactor / inScaleFactor)
1479 def calc_mask_nodata(self, fromBand=None, overwrite=False):
1480 """Calculates a no data mask with (values: 0=nodata; 1=data)
1482 :param fromBand: <int> index of the band to be used (if None, all bands are used)
1483 :param overwrite: <bool> whether to overwrite existing nodata mask that has already been calculated
1484 :return:
1485 """
1487 self.logger.info('Calculating nodata mask...')
1489 if self._mask_nodata is None or overwrite:
1490 self.arr.calc_mask_nodata(fromBand=fromBand, overwrite=overwrite)
1491 self.mask_nodata = self.arr.mask_nodata
1492 return self.mask_nodata
    def apply_nodata_mask_to_ObjAttr(self, attrname, out_nodata_val=None):
        # type: (str,int) -> None
        """Applies self.mask_nodata to the specified array attribute by setting all values where mask_nodata is 0 to the
        given nodata value.

        :param attrname:        The attribute to apply the nodata mask to. Must be an array attribute or
                                a string path to a previously saved ENVI-file.
        :param out_nodata_val:  set the values of the given attribute to this value.
        """

        assert hasattr(self, attrname)

        if getattr(self, attrname) is not None:

            if isinstance(getattr(self, attrname), str):
                # the attribute holds a file path -> patch the saved ENVI file on disk instead of an in-memory array
                update_spec_vals = True if attrname == 'arr' else False
                self.apply_nodata_mask_to_saved_ENVIfile(getattr(self, attrname), out_nodata_val, update_spec_vals)
            else:
                assert isinstance(getattr(self, attrname), (np.ndarray, GeoArray)), \
                    'L1A_obj.%s must be a numpy array or an instance of GeoArray. Got type %s.' \
                    % (attrname, type(getattr(self, attrname)))
                assert hasattr(self, 'mask_nodata') and self.mask_nodata is not None

                self.log_for_fullArr_or_firstTile('Applying nodata mask to L1A_object.%s...' % attrname)

                # fall back to the standard fill value for the attribute's data type if no explicit value is given
                nodata_val = out_nodata_val if out_nodata_val else \
                    DEF_D.get_outFillZeroSaturated(getattr(self, attrname).dtype)[0]
                getattr(self, attrname)[self.mask_nodata.astype(np.int8) == 0] = nodata_val

                if attrname == 'arr':
                    # keep the metadata fill value in sync with the actually applied nodata value
                    self.MetaObj.spec_vals['fill'] = nodata_val
1526 def build_combined_masks_array(self):
1527 # type: () -> dict
1528 """Generates self.masks attribute (unsigned integer 8bit) from by concatenating all masks included in GMS obj.
1529 The corresponding metadata is assigned to L1A_obj.masks_meta. Empty mask attributes are skipped."""
1531 arrays2combine = [aN for aN in ['mask_nodata', 'mask_clouds']
1532 if hasattr(self, aN) and isinstance(getattr(self, aN), (GeoArray, np.ndarray))]
1533 if arrays2combine:
1534 self.log_for_fullArr_or_firstTile('Combining masks...')
1536 def get_data(arrName): return getattr(self, arrName).astype(np.uint8)[:, :, None]
1538 for aN in arrays2combine:
1539 if False in np.equal(getattr(self, aN), getattr(self, aN).astype(np.uint8)):
1540 warnings.warn('Some pixel values of attribute %s changed during data type '
1541 'conversion within build_combined_masks_array().')
1543 # set self.masks
1544 self.masks = get_data(arrays2combine[0]) if len(arrays2combine) == 1 else \
1545 np.concatenate([get_data(aN) for aN in arrays2combine], axis=2)
1546 self.masks.bandnames = arrays2combine # set band names of GeoArray (allows later indexing by band name)
1548 # set self.masks_meta
1549 nodataVal = DEF_D.get_outFillZeroSaturated(self.masks.dtype)[0]
1550 self.masks_meta = {'map info': self.MetaObj.map_info,
1551 'coordinate system string': self.MetaObj.projection,
1552 'bands': len(arrays2combine),
1553 'band names': arrays2combine,
1554 'data ignore value': nodataVal}
1556 return {'desc': 'masks', 'row_start': 0, 'row_end': self.shape_fullArr[0],
1557 'col_start': 0, 'col_end': self.shape_fullArr[1], 'data': self.masks} # usually not needed
1559 def apply_nodata_mask_to_saved_ENVIfile(self, path_saved_ENVIhdr, custom_nodata_val=None, update_spec_vals=False):
1560 # type: (str,int,bool) -> None
1561 """Applies self.mask_nodata to a saved ENVI file with the same X/Y dimensions like self.mask_nodata by setting
1562 all values where mask_nodata is 0 to the given nodata value.
1564 :param path_saved_ENVIhdr: <str> The path of the ENVI file to apply the nodata mask to.
1565 :param custom_nodata_val: <int> set the values of the given attribute to this value.
1566 :param update_spec_vals: <bool> whether to update self.MetaObj.spec_vals['fill']
1567 """
1569 self.log_for_fullArr_or_firstTile('Applying nodata mask to saved ENVI file...')
1570 assert os.path.isfile(path_saved_ENVIhdr)
1571 assert hasattr(self, 'mask_nodata') and self.mask_nodata is not None
1572 if not path_saved_ENVIhdr.endswith('.hdr') and os.path.isfile(os.path.splitext(path_saved_ENVIhdr)[0] + '.hdr'):
1573 path_saved_ENVIhdr = os.path.splitext(path_saved_ENVIhdr)[0] + '.hdr'
1574 if custom_nodata_val is None:
1575 dtype_IDL = int(INP_R.read_ENVIhdr_to_dict(path_saved_ENVIhdr)['data type'])
1576 nodata_val = DEF_D.get_outFillZeroSaturated(DEF_D.dtype_lib_IDL_Python[dtype_IDL])[0]
1577 else:
1578 nodata_val = custom_nodata_val
1579 FileObj = spectral.open_image(path_saved_ENVIhdr)
1580 File_memmap = FileObj.open_memmap(writable=True)
1581 File_memmap[self.mask_nodata == 0] = nodata_val
1582 if update_spec_vals:
1583 self.MetaObj.spec_vals['fill'] = nodata_val
1585 def combine_tiles_to_ObjAttr(self, tiles, target_attr):
1586 # type: (list,str) -> None
1587 """Combines tiles, e.g. produced by L1A_P.L1A_object.DN2TOARadRefTemp() to a single attribute.
1588 If CFG.inmem_serialization is False, the produced attribute is additionally written to disk.
1590 :param tiles: <list> a list of dictionaries with the keys 'desc', 'data', 'row_start','row_end',
1591 'col_start' and 'col_end'
1592 :param target_attr: <str> the name of the attribute to be produced
1593 """
1595 warnings.warn("'combine_tiles_to_ObjAttr' is deprecated.", DeprecationWarning)
1596 assert tiles[0] and isinstance(tiles, list) and isinstance(tiles[0], dict), \
1597 "The 'tiles' argument has to be list of dictionaries with the keys 'desc', 'data', 'row_start'," \
1598 "'row_end', 'col_start' and 'col_end'."
1599 self.logger.info("Building L1A attribute '%s' by combining given tiles..." % target_attr)
1600 tiles = [tiles] if not isinstance(tiles, list) else tiles
1601 sampleTile = dict(tiles[0])
1602 target_shape = self.shape_fullArr if len(sampleTile['data'].shape) == 3 else self.shape_fullArr[:2]
1603 setattr(self, target_attr, np.empty(target_shape, dtype=sampleTile['data'].dtype))
1604 for tile in tiles: # type: dict
1605 rS, rE, cS, cE = tile['row_start'], tile['row_end'], tile['col_start'], tile['col_end']
1606 if len(target_shape) == 3:
1607 getattr(self, target_attr)[rS:rE + 1, cS:cE + 1, :] = tile['data']
1608 else:
1609 getattr(self, target_attr)[rS:rE + 1, cS:cE + 1] = tile['data']
1610 if target_attr == 'arr':
1611 self.arr_desc = sampleTile['desc']
1612 self.arr_shape = 'cube' if len(self.arr.shape) == 3 else 'band' if len(self.arr.shape) == 2 else 'unknown'
1614 if not CFG.inmem_serialization:
1615 path_radref_file = os.path.join(self.ExtractedFolder, self.baseN + '__' + self.arr_desc)
1616 # path_radref_file = os.path.abspath('./testing/out/%s_TOA_Ref' % self.baseN)
1617 while not os.path.isdir(os.path.dirname(path_radref_file)):
1618 try:
1619 os.makedirs(os.path.dirname(path_radref_file))
1620 except OSError as e: # maybe not neccessary anymore in python 3
1621 if not e.errno == 17:
1622 raise
1623 GEOP.ndarray2gdal(self.arr, path_radref_file, importFile=self.MetaObj.Dataname, direction=3)
1624 self.MetaObj.Dataname = path_radref_file
    def write_tiles_to_ENVIfile(self, tiles, overwrite=True):
        # type: (list,bool) -> None
        """Writes tiles, e.g. produced by L1A_P.L1A_object.DN2TOARadRefTemp() to a single output ENVI file.

        :param tiles:       <list> a list of dictionaries with the keys 'desc', 'data', 'row_start','row_end',
                            'col_start' and 'col_end'
        :param overwrite:   whether to overwrite files that have been produced earlier
        """
        self.logger.info("Writing tiles '%s' temporarily to disk..." % tiles[0]['desc'])
        outpath = os.path.join(self.ExtractedFolder, '%s__%s.%s' % (self.baseN, tiles[0]['desc'], self.outInterleave))
        # the tile descriptor decides which object attribute receives the output path
        if CFG.target_radunit_optical in tiles[0]['desc'] or \
                CFG.target_radunit_thermal in tiles[0]['desc']:
            self.arr_desc = tiles[0]['desc']
            self.arr = outpath
            # self.arr = os.path.abspath('./testing/out/%s_TOA_Ref.bsq' % self.baseN)
            self.MetaObj.Dataname = self.arr
            self.arr_shape = \
                'cube' if len(tiles[0]['data'].shape) == 3 else 'band' if len(
                    tiles[0]['data'].shape) == 2 else 'unknown'
        elif tiles[0]['desc'] == 'masks':
            self.masks = outpath
        elif tiles[0]['desc'] == 'lonlat_arr':
            # outpath = os.path.join(os.path.abspath('./testing/out/'),'%s__%s.%s'
            #                        %(self.baseN, tiles[0]['desc'], self.outInterleave))
            self.lonlat_arr = outpath
        # write via the ENVI header path; Tiles_Writer places each tile at its subset position within out_shape
        outpath = os.path.splitext(outpath)[0] + '.hdr' if not outpath.endswith('.hdr') else outpath
        out_shape = self.shape_fullArr[:2] + ([tiles[0]['data'].shape[2]] if len(tiles[0]['data'].shape) == 3 else [1])
        OUT_W.Tiles_Writer(tiles, outpath, out_shape, tiles[0]['data'].dtype, self.outInterleave,
                           self.MetaObj.to_odict(), overwrite=overwrite)
1657 def to_tiles(self, blocksize=(2048, 2048)):
1658 # type: (tuple) -> Generator[GMS_object]
1659 """Returns a generator object where items represent tiles of the given block size for the GMS object.
1661 # NOTE: it's better to call get_subset_obj (also takes care of tile map infos)
1663 :param blocksize: target dimensions of the generated block tile (rows, columns)
1664 :return: <list> of GMS_object tiles
1665 """
1667 assert type(blocksize) in [list, tuple] and len(blocksize) == 2, \
1668 "The argument 'blocksize_RowsCols' must represent a tuple of size 2."
1670 tilepos = get_array_tilebounds(array_shape=self.shape_fullArr, tile_shape=blocksize)
1672 for tp in tilepos:
1673 (xmin, xmax), (ymin, ymax) = tp # e.g. [(0, 1999), (0, 999)] at a blocksize of 2000*1000 (rowsxcols)
1674 tileObj = self.get_subset_obj(imBounds=(xmin, xmax, ymin, ymax)) # xmax+1/ymax+1?
1675 yield tileObj
1677 def to_MGRS_tiles(self, pixbuffer=10, v=False):
1678 # type: (int, bool) -> Generator[GMS_object]
1679 """Returns a generator object where items represent the MGRS tiles for the GMS object.
1681 :param pixbuffer: <int> a buffer in pixel values used to generate an overlap between the returned MGRS tiles
1682 :param v: <bool> verbose mode
1683 :return: <list> of MGRS_tile objects
1684 """
1686 assert self.arr_shape == 'cube', "Only 'cube' objects can be cut into MGRS tiles. Got %s." % self.arr_shape
1687 self.logger.info('Cutting scene %s (entity ID %s) into MGRS tiles...' % (self.scene_ID, self.entity_ID))
1689 # get GeoDataFrame containing all overlapping MGRS tiles
1690 # (MGRS geometries completely within nodata area are excluded)
1691 GDF_MGRS_tiles = DB_T.get_overlapping_MGRS_tiles(CFG.conn_database,
1692 tgt_corners_lonlat=self.trueDataCornerLonLat)
1694 if GDF_MGRS_tiles.empty:
1695 raise RuntimeError('Could not find an overlapping MGRS tile in the database for the current dataset.')
1697 # calculate image coordinate bounds of the full GMS object for each MGRS tile within the GeoDataFrame
1698 gt = mapinfo2geotransform(self.MetaObj.map_info)
1700 def get_arrBounds(MGRStileObj):
1701 return list(np.array(MGRStileObj.poly_utm.buffer(pixbuffer * gt[1]).bounds)[[0, 2, 1, 3]])
1703 GDF_MGRS_tiles['MGRStileObj'] = list(GDF_MGRS_tiles['granuleid'].map(lambda mgrsTileID: MGRS_tile(mgrsTileID)))
1704 GDF_MGRS_tiles['map_bounds_MGRS'] = list(GDF_MGRS_tiles['MGRStileObj'].map(get_arrBounds)) # xmin,xmax,ymin,yma
1706 # find first tile to log and assign 'logAtThisTile' later
1707 dictIDxminymin = {(b[0] + b[2]): ID for ID, b in
1708 zip(GDF_MGRS_tiles['granuleid'], GDF_MGRS_tiles['map_bounds_MGRS'])}
1709 firstTile_ID = dictIDxminymin[min(dictIDxminymin.keys())]
1711 # ensure self.masks exists (does not exist in case of inmem_serialization mode
1712 # because in that case self.fill_from_disk() is skipped)
1713 if not hasattr(self, 'masks') or self.masks is None:
1714 self.build_combined_masks_array() # creates self.masks and self.masks_meta
1716 # read whole dataset into RAM in order to fasten subsetting
1717 self.arr.to_mem()
1718 self.masks.to_mem() # to_mem ensures that the whole dataset is present in memory
1720 # produce data for each MGRS tile in loop
1721 for GDF_idx, GDF_row in GDF_MGRS_tiles.iterrows():
1722 tileObj = self.get_subset_obj(mapBounds=GDF_row.map_bounds_MGRS,
1723 mapBounds_prj=EPSG2WKT(GDF_row['MGRStileObj'].EPSG),
1724 out_prj=EPSG2WKT(GDF_row['MGRStileObj'].EPSG),
1725 logmsg='Producing MGRS tile %s from scene %s (entity ID %s).'
1726 % (GDF_row.granuleid, self.scene_ID, self.entity_ID),
1727 progress=CFG.log_level in ['DEBUG', 10],
1728 v=v)
1730 MGRS_tileID = GDF_row['granuleid']
1732 # validate that the MGRS tile truly contains data
1733 # -> this may not be the case if get_overlapping_MGRS_tiles() yielded invalid tiles due to inaccurate
1734 # self.trueDataCornerLonLat
1735 if tileObj is None or \
1736 True not in list(np.unique(tileObj.arr.mask_nodata)):
1737 self.logger.info("MGRS tile '%s' has not been skipped because it contains only no data values."
1738 % MGRS_tileID)
1739 continue
1741 # set MGRS info
1742 tileObj.arr_shape = 'MGRS_tile'
1743 tileObj.MGRS_info = {'tile_ID': MGRS_tileID, 'grid1mil': MGRS_tileID[:3], 'grid100k': MGRS_tileID[3:]}
1745 # set logAtThisTile
1746 tileObj.logAtThisTile = MGRS_tileID == firstTile_ID
1748 # close logger of tileObj and of self in order to avoid logging permission errors
1749 tileObj.close_loggers()
1750 self.close_loggers()
1752 yield tileObj
1754 # set array attributes back to file path if they had been a filePath before
1755 if self.arr.filePath:
1756 self.arr.to_disk()
1757 if self.masks.filePath:
1758 self.masks.to_disk()
    def to_GMS_file(self, path_gms_file=None):
        """Serialize this GMS object to a JSON *.gms file, dropping everything that is not JSON-serializable.

        :param path_gms_file:   output path of the GMS file; defaults to self.pathGen.get_path_gmsfile()
        """
        self.close_loggers()

        dict2write = self.attributes2dict(remove_privates=True)  # includes MetaObj as OrderedDict
        dict2write['arr_shape'], dict2write['arr_pos'] = ['cube', None]
        path_gms_file = path_gms_file if path_gms_file else self.pathGen.get_path_gmsfile()

        # iterate over a list copy because entries may be deleted while looping
        for k, v in list(dict2write.items()):
            # if isinstance(v,np.ndarray) or isinstance(v,dict) or hasattr(v,'__dict__'):
            # like this if meta-dicts should not go into the gms file; but it may not be bad
            # -> allows reading the metadata directly from the gms file

            if isinstance(v, datetime.datetime):
                dict2write[k] = v.strftime('%Y-%m-%d %H:%M:%S.%f%z')

            elif isinstance(v, (DatasetLogger, logging.Logger)):
                # loggers must be closed and replaced by a placeholder string before serialization
                if hasattr(v, 'handlers') and v.handlers[:]:
                    warnings.warn('Not properly closed logger at GMS_obj.logger pointing to %s.' % v.path_logfile)

                close_logger(dict2write[k])
                dict2write[k] = 'not set'

            elif isinstance(v, GMS_identifier):
                dict2write[k] = v.to_odict(include_logger=False)

            elif isinstance(v, OrderedDict) or isinstance(v, dict):
                # copy before mutating so the original object attribute is not modified
                dict2write[k] = dict2write[k].copy()
                if 'logger' in v:
                    if hasattr(dict2write[k]['logger'], 'handlers') and dict2write[k]['logger'].handlers[:]:
                        warnings.warn("Not properly closed logger at %s['logger'] pointing to %s."
                                      % (k, dict2write[k]['logger'].path_logfile))
                    close_logger(dict2write[k]['logger'])
                    dict2write[k]['logger'] = 'not set'

            elif isinstance(v, np.ndarray):
                # keep only small 2D arrays (sum of both dimensions <= 200) as lists;
                # larger or 3D arrays are dropped because they are not JSON-serializable
                if len(v.shape) == 2 and sum(v.shape) <= 200:
                    dict2write[k] = v.tolist()  # numpy arrays are not jsonable
                else:
                    del dict2write[k]

            elif hasattr(v, '__dict__'):
                # deletes instances of objects and arrays, but no OrderedDicts
                if hasattr(v, 'logger'):
                    if hasattr(dict2write[k].logger, 'handlers') and dict2write[k].logger.handlers[:]:
                        warnings.warn("Not properly closed logger at %s.logger pointing to %s."
                                      % (k, dict2write[k].logger.path_logfile))
                    close_logger(dict2write[k].logger)
                    dict2write[k].logger = 'not set'
                del dict2write[k]

        # class customJSONEncoder(json.JSONEncoder):
        #     def default(self, obj):
        #         if isinstance(obj, np.ndarray):
        #             return '> numpy array <'
        #         if isinstance(obj, dict): # does not work
        #             return '> python dictionary <'
        #         if hasattr(obj,'__dict__'):
        #             return '> python object <'
        #         # Let the base class default method raise the TypeError
        #         return json.JSONEncoder.default(self, obj)
        # json.dump(In, open(path_out_baseN,'w'), skipkeys=True,
        #           sort_keys=True, cls=customJSONEncoder, separators=(',', ': '), indent =4)
        with open(path_gms_file, 'w') as outF:
            json.dump(dict2write, outF, skipkeys=True, sort_keys=True, separators=(',', ': '), indent=4)
    def to_ENVI(self, write_masks_as_ENVI_classification=True, is_tempfile=False, compression=False):
        # type: (bool, bool, bool) -> None
        """Write GMS object to disk. Supports full cubes AND 'block' tiles.

        :param write_masks_as_ENVI_classification:  <bool> whether to write masks as ENVI classification file
        :param is_tempfile:     <bool> whether output represents a temporary file
                                -> suppresses logging and database updating
                                - ATTENTION! This keyword asserts that the actual output file that
                                is written later contains the final version of the array. The array
                                is not overwritten or written once more later, but only renamed.
        :param compression:     <bool> enable or disable compression
        """
        # monkey patch writer function in order to silence output stream
        envi._write_image = OUT_W.silent_envi_write_image

        assert self.arr_shape in ['cube', 'MGRS_tile', 'block'], \
            "GMS_object.to_ENVI supports only array shapes 'cube', 'MGRS_tile' and 'block'. Got %s." % self.arr_shape

        # set MGRS info in case of MGRS tile
        MGRS_info = None
        if self.arr_shape == 'MGRS_tile':
            assert hasattr(self, 'MGRS_info'), \
                "Tried to write an GMS object in the shape 'MGRS_tile' without without the attribute 'MGRS_info'."
            MGRS_info = self.MGRS_info

        # set self.arr from L1B path to L1A path in order to make to_ENVI copy L1A array (if .arr is not an array)
        if self.proc_level == 'L1B' and not self.arr.is_inmem and os.path.isfile(self.arr.filePath):
            # FIXME this could leed to check the wrong folder if CFG.inmem_serialization is False:
            self.arr = PG.path_generator('L1A', self.image_type, self.satellite, self.sensor, self.subsystem,
                                         self.acq_datetime, self.entity_ID, self.logger,
                                         MGRS_info=MGRS_info).get_path_imagedata()

        # set dicts (map each writable attribute to image type code, metadata attribute and output dtype)
        image_type_dict = dict(arr=self.image_type, masks='MAS', mask_clouds='MAC', accuracy_layers='AL')
        metaAttr_dict = dict(arr='arr_meta', masks='masks_meta', mask_clouds='masks_meta',
                             accuracy_layers='accuracy_layers_meta')
        out_dtype_dict = dict(arr='int16', masks='uint8', mask_clouds='uint8', accuracy_layers='int16')
        print_dict = dict(
            RSD_L1A='satellite data', MAS_L1A='masks', MAC_L1A='cloud mask',
            RSD_L1B='satellite data', MAS_L1B='masks', MAC_L1B='cloud mask',
            RSD_L1C='atm. corrected reflectance data', MAS_L1C='masks', MAC_L1C='cloud mask',
            RSD_L2A='geometrically homogenized data', MAS_L2A='masks', MAC_L2A='cloud mask',
            RSD_L2B='spectrally homogenized data', MAS_L2B='masks', MAC_L2B='cloud mask',
            RSD_L2C='MGRS tiled data', MAS_L2C='masks', MAC_L2C='cloud mask', AL_L2C='accuracy layers',)

        # ensure self.masks exists
        if not hasattr(self, 'masks') or self.masks is None:
            self.build_combined_masks_array()  # creates InObj.masks and InObj.masks_meta

        # generate list of attributes to write
        attributes2write = ['arr', 'masks']

        if self.proc_level not in ['L1A', 'L1B'] and write_masks_as_ENVI_classification:
            attributes2write.append('mask_clouds')

        if self.proc_level == 'L2C':
            if CFG.ac_estimate_accuracy or CFG.spathomo_estimate_accuracy or CFG.spechomo_estimate_accuracy:
                attributes2write.append('accuracy_layers')

                # generate accuracy layers to be written
                self.generate_accuracy_layers()
            else:
                self.logger.info('Accuracy layers are not written because their generation has been disabled in the '
                                 'job configuration.')

        ###########################################################
        # loop through all attributes to write and execute writer #
        ###########################################################

        # IOLock limits the number of parallel disk accesses
        with IOLock(allowed_slots=CFG.max_parallel_reads_writes, logger=self.logger):
            for arrayname in attributes2write:
                descriptor = '%s_%s' % (image_type_dict[arrayname], self.proc_level)

                if hasattr(self, arrayname) and getattr(self, arrayname) is not None:
                    arrayval = getattr(self, arrayname)  # can be a GeoArray (in mem / not in mem) or a numpy.ndarray

                    # initial assertions
                    assert arrayname in metaAttr_dict, "GMS_object.to_ENVI cannot yet write %s attribute." % arrayname
                    assert isinstance(arrayval, (GeoArray, np.ndarray)), "Expected a GeoArray instance or a numpy " \
                                                                         "array for object attribute %s. Got %s." \
                                                                         % (arrayname, type(arrayval))

                    outpath_hdr = self.pathGen.get_outPath_hdr(arrayname)
                    outpath_hdr = os.path.splitext(outpath_hdr)[0] + '__TEMPFILE.hdr' if is_tempfile else outpath_hdr
                    if not os.path.exists(os.path.dirname(outpath_hdr)):
                        os.makedirs(os.path.dirname(outpath_hdr))
                    out_dtype = out_dtype_dict[arrayname]
                    meta_attr = metaAttr_dict[arrayname]

                    if not is_tempfile:
                        self.log_for_fullArr_or_firstTile('Writing %s %s.' % (self.proc_level, print_dict[descriptor]))

                    #########################
                    # GeoArray in disk mode #
                    #########################

                    if isinstance(arrayval, GeoArray) and not arrayval.is_inmem:
                        # object attribute contains GeoArray in disk mode. This is usually the case if the attribute has
                        # been read in Python exec mode from previous processing level and has NOT been modified during
                        # processing.

                        assert os.path.isfile(arrayval.filePath), "The object attribute '%s' contains a not existing " \
                                                                  "file path: %s" % (arrayname, arrayval.filePath)
                        path_to_array = arrayval.filePath

                        #############
                        # full cube #
                        #############

                        if self.arr_shape == 'cube':
                            # image data can just be copied
                            outpath_arr = os.path.splitext(outpath_hdr)[0] + (
                                os.path.splitext(path_to_array)[1]
                                if os.path.splitext(path_to_array)[1] else '.%s' % self.outInterleave)
                            hdr2readMeta = os.path.splitext(path_to_array)[0] + '.hdr'
                            meta2write = INP_R.read_ENVIhdr_to_dict(hdr2readMeta, self.logger) \
                                if arrayname in ['mask_clouds', ] else getattr(self, meta_attr)
                            meta2write.update({'interleave': self.outInterleave,
                                               'byte order': 0,
                                               'header offset': 0,
                                               'data type': DEF_D.dtype_lib_Python_IDL[out_dtype],
                                               'lines': self.shape_fullArr[0],
                                               'samples': self.shape_fullArr[1]})
                            meta2write = metaDict_to_metaODict(meta2write, self.logger)

                            if '__TEMPFILE' in path_to_array:
                                # the tempfile already contains the final data -> just rename it (see docstring)
                                os.rename(path_to_array, outpath_arr)
                                envi.write_envi_header(outpath_hdr, meta2write)
                                HLP_F.silentremove(path_to_array)
                                HLP_F.silentremove(os.path.splitext(path_to_array)[0] + '.hdr')

                            else:
                                try:
                                    shutil.copy(path_to_array, outpath_arr)  # copies file + permissions
                                except PermissionError:
                                    # prevents permission error if outfile already exists and is owned by another user
                                    HLP_F.silentremove(outpath_arr)
                                    shutil.copy(path_to_array, outpath_arr)

                                envi.write_envi_header(outpath_hdr, meta2write)

                            assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr
                            setattr(self, arrayname, outpath_arr)  # refresh arr/masks/mask_clouds attributes
                            if arrayname == 'masks':
                                setattr(self, 'mask_nodata', outpath_arr)

                        #########################
                        # 'block' or 'MGRS_tile #
                        #########################

                        else:
                            # data have to be read in subset and then be written
                            if self.arr_pos:
                                (rS, rE), (cS, cE) = self.arr_pos
                                cols, rows = cE - cS + 1, rE - rS + 1
                            else:
                                cS, rS, cols, rows = [None] * 4

                            if '__TEMPFILE' in path_to_array:
                                raise NotImplementedError

                            if arrayname not in ['mask_clouds', 'mask_nodata']:
                                # read image data in subset
                                tempArr = gdalnumeric.LoadFile(path_to_array, cS, rS, cols,
                                                               rows)  # bands, rows, columns OR rows, columns
                                # GDAL delivers band-first order -> move bands to the last axis
                                arr2write = tempArr if len(tempArr.shape) == 2 else \
                                    np.swapaxes(np.swapaxes(tempArr, 0, 2), 0, 1)  # rows, columns, (bands)

                            else:
                                # read mask data in subset
                                previous_procL = DEF_D.proc_chain[DEF_D.proc_chain.index(self.proc_level) - 1]
                                PG_obj = PG.path_generator(self.__dict__, proc_level=previous_procL)
                                path_masks = PG_obj.get_path_maskdata()
                                arr2write = INP_R.read_mask_subset(path_masks, arrayname, self.logger,
                                                                   (self.arr_shape, self.arr_pos))

                            setattr(self, arrayname, arr2write)

                    arrayval = getattr(self, arrayname)  # can be a GeoArray (in mem / not in mem) or a numpy.ndarray

                    ####################################
                    # np.ndarray or GeoArray in memory #
                    ####################################

                    # NOTE: 'and' binds tighter than 'or' here, i.e. plain ndarrays always enter this branch
                    if isinstance(arrayval, np.ndarray) or isinstance(arrayval, GeoArray) and arrayval.is_inmem:
                        # must be an if-condition because arrayval can change attribute type from not-inmem-GeoArray
                        # to np.ndarray
                        '''object attribute contains array'''

                        # convert array and metadata of mask clouds to envi classification file ready data
                        arr2write, meta2write = OUT_W.mask_to_ENVI_Classification(self, arrayname) \
                            if arrayname in ['mask_clouds', ] else (arrayval, getattr(self, meta_attr))
                        arr2write = arr2write.arr if isinstance(arr2write, GeoArray) else arr2write
                        assert isinstance(arr2write, np.ndarray), 'Expected a numpy ndarray. Got %s.' % type(arr2write)

                        ##########################
                        # full cube or MGRS_tile #
                        ##########################

                        if self.arr_shape in ['cube', 'MGRS_tile']:
                            # TODO write a function that implements the algorithm from Tiles_Writer for writing cubes
                            # TODO -> no need for Spectral Python

                            # write cube-like attributes
                            meta2write = metaDict_to_metaODict(meta2write, self.logger)
                            success = 1

                            if arrayname not in ['mask_clouds', ]:
                                if compression:
                                    success = OUT_W.write_ENVI_compressed(outpath_hdr, arr2write, meta2write)
                                    if not success:
                                        warnings.warn('Written compressed ENVI file is not GDAL readable! '
                                                      'Writing uncompressed file.')

                                if not compression or not success:
                                    envi.save_image(outpath_hdr, arr2write, metadata=meta2write, dtype=out_dtype,
                                                    interleave=self.outInterleave, ext=self.outInterleave, force=True)

                            else:
                                if compression:
                                    success = OUT_W.write_ENVI_compressed(outpath_hdr, arr2write, meta2write)
                                    if not success:
                                        self.logger.warning('Written compressed ENVI file is not GDAL readable! '
                                                            'Writing uncompressed file.')

                                if not compression or not success:
                                    class_names = meta2write['class names']
                                    class_colors = meta2write['class lookup']
                                    envi.save_classification(outpath_hdr, arr2write, metadata=meta2write,
                                                             dtype=out_dtype, interleave=self.outInterleave,
                                                             ext=self.outInterleave, force=True,
                                                             class_names=class_names, class_colors=class_colors)
                            if os.path.exists(outpath_hdr):
                                OUT_W.reorder_ENVI_header(outpath_hdr, OUT_W.enviHdr_keyOrder)

                        #########################
                        # block-like attributes #
                        #########################

                        else:
                            if compression:  # FIXME
                                warnings.warn(
                                    'Compression is not yet supported for GMS object tiles. Writing uncompressed data.')
                            # write block-like attributes
                            bands = arr2write.shape[2] if len(arr2write.shape) == 3 else 1
                            out_shape = tuple(self.shape_fullArr[:2]) + (bands,)

                            OUT_W.Tiles_Writer(arr2write, outpath_hdr, out_shape, out_dtype, self.outInterleave,
                                               out_meta=meta2write, arr_pos=self.arr_pos, overwrite=False)
                            assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr

                        outpath_arr = os.path.splitext(outpath_hdr)[0] + '.%s' % self.outInterleave
                        if not CFG.inmem_serialization:
                            setattr(self, arrayname, outpath_arr)  # replace array by output path

                            if arrayname == 'masks':
                                setattr(self, 'mask_nodata', outpath_arr)

                            # if compression:
                            #     raise NotImplementedError  # FIXME implement working compression
                            #     HLP_F.ENVIfile_to_ENVIcompressed(outpath_hdr)

                else:
                    if not is_tempfile:
                        self.logger.warning("The %s can not be written, because there is no corresponding attribute."
                                            % print_dict[descriptor])

        ######################################
        # write GMS-file and update database #
        ######################################

        # IMPORTANT: DO NOT pass the complete object but only a copy of the dictionary in order to prevent ASCII_writer
        # and data_DB_updater from modifying the attributes of the object!!
        if self.arr_shape in ['cube', 'MGRS_tile'] or [self.arr_pos[0][0], self.arr_pos[1][0]] == [0, 0]:
            # cube or 1st tile

            # WRITE GMS FILE
            self.to_GMS_file()

            # CREATE/UPDATE DATABASE ENTRY
            if not is_tempfile:
                DB_T.data_DB_updater(self.attributes2dict())

                # get id of updated record (needed for cross-refs in later db entrys
                if hasattr(self, 'MGRS_info') and self.MGRS_info:
                    res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'mgrs_tiles_proc', ['id'],
                                                          {'sceneid': self.scene_ID,
                                                           'virtual_sensor_id': CFG.virtual_sensor_id,
                                                           'mgrs_code': self.MGRS_info['tile_ID']})
                    assert len(res) == 1, 'Found multiple database records for the last updated record. sceneid: %s;' \
                                          'mgrs_code: %s; virtual_sensor_id: %s' \
                                          % (self.scene_ID, self.MGRS_info['tile_ID'], CFG.virtual_sensor_id)
                    self.mgrs_tiles_proc_ID = res[0][0]
                else:
                    res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes_proc', ['id'],
                                                          {'sceneid': self.scene_ID})
                    assert len(res) == 1, \
                        'Found multiple database records for the last updated record. sceneid: %s' % self.scene_ID
                    self.scenes_proc_ID = res[0][0]

                # COPY LOGFILE TO MGRS OUTPUT DIRECTORY
                if self.arr_shape == 'MGRS_tile':
                    shutil.copy(self.path_logfile, os.path.join(self.pathGen.get_path_procdata(),
                                                                os.path.basename(self.path_logfile)))

        if not is_tempfile:
            self.log_for_fullArr_or_firstTile('%s data successfully saved.' % self.proc_level)

        # close logger
        self.close_loggers()
2138 def record_mem_usage(self):
2139 """Record memory usage of current process ID for the current processing level (megabytes)."""
2140 # get memory usage of current process ID
2141 mem_usage = int(psutil.Process(os.getpid()).memory_info().rss / 1024**2)
2143 # update the value (only if there is not a larger value already because we want to record maximum memory needed)
2144 if self.proc_level not in self.mem_usage or self.mem_usage[self.proc_level] > mem_usage:
2145 self.mem_usage[self.proc_level] = mem_usage
2147 # push recorded memory stats to database
2148 if self.proc_level == 'L2C':
2149 stats_pushed = DB_T.record_stats_memusage(CFG.conn_database, self)
2151 if stats_pushed:
2152 self.logger.info('Recorded memory usage statistics.')
2154 def block_at_system_overload(self, max_usage=95, timeout=15*60):
2155 logged = False
2156 t_start = time.time()
2158 while psutil.virtual_memory().percent >= max_usage:
2159 if not logged:
2160 self.logger.warning('Memory usage is %.1f percent. Waiting until memory usage is below %s percent. '
2161 'Timeout: %s minutes.' % (psutil.virtual_memory().percent, max_usage, timeout / 60))
2162 logged = True
2164 if time.time() - t_start > timeout:
2165 self.logger.exception('Processing could not be continued due to memory overload after waiting '
2166 '%s minutes.' % (timeout / 60))
2168 time.sleep(5)
2170 def close_loggers(self):
2171 # self.GMS_identifier and self.logger are getters - since self.GMS_identifier gets its logger from self.logger,
2172 # self.logger has to be closed AFTER closing self.GMS_identifier.logger
2173 if self.GMS_identifier and self.GMS_identifier.logger not in [None, 'not set']:
2174 self.GMS_identifier.logger.close()
2175 self.GMS_identifier.logger = None
2177 if self.MetaObj and self.MetaObj.logger not in [None, 'not set']:
2178 self.MetaObj.logger.close()
2179 self.MetaObj.logger = None
2181 if self._logger not in [None, 'not set']:
2182 self._logger.close() # this runs misc.logging.GMS_logger.close()
2183 self._logger = None # also adds current captured stream to self.log
    def delete_previous_proc_level_results(self):
        """Deletes results of the previous processing level if the respective flag CFG.exec_L**P[2] is set to True.
        The function is skipped if the results of the current processing level have not yet been written.
        """
        tgt_proc_level = DEF_D.proc_chain[DEF_D.proc_chain.index(self.proc_level) - 1]

        if getattr(CFG, 'exec_%sP' % tgt_proc_level)[2]:

            pathGenPrev = PG.path_generator(self.__dict__.copy(), proc_level=tgt_proc_level)

            # files of the PREVIOUS processing level that are candidates for deletion
            files2delete = [pathGenPrev.get_path_imagedata(),
                            pathGenPrev.get_path_gmsfile(),
                            pathGenPrev.get_path_maskdata(),
                            pathGenPrev.get_path_cloudmaskdata()]

            # ensure that the results of the current processing level have been written
            if self.proc_level != 'L2A':
                pathGenCurr = self.pathGen
            else:
                # after geometric homogenization and subsystem merging (L2A) a path generator without subsystem
                # is needed
                dict4pathGen = self.__dict__.copy()
                dict4pathGen['subsystem'] = ''
                pathGenCurr = PG.path_generator(dict4pathGen)

            # files of the CURRENT processing level used as existence check counterparts
            filesCurrProcL = [pathGenCurr.get_path_imagedata(),
                              pathGenCurr.get_path_gmsfile(),
                              pathGenCurr.get_path_maskdata(),
                              pathGenCurr.get_path_cloudmaskdata()]

            if self.proc_level == 'L2A' and self.subsystem:
                # in case subsystems have been merged in L2A -> also delete logfiles of L1C
                files2delete.append(pathGenPrev.get_path_logfile())
                filesCurrProcL.append(pathGenCurr.get_path_logfile())

            # delete a previous-level file (and its .hdr) only if the current-level counterpart exists on disk
            for fPath, fPathCP in zip(files2delete, filesCurrProcL):
                hdr = '%s.hdr' % os.path.splitext(fPath)[0]
                if os.path.exists(fPath) and os.path.exists(fPathCP):
                    HLP_F.silentremove(fPath)
                    if os.path.exists(hdr):
                        HLP_F.silentremove(hdr)
2228 def delete_tempFiles(self):
2229 """Delete all temporary files that have been written during GMS object processing.
2230 """
2232 # TODO ensure that all GDAL dataset are closed before. Otherwise the system creates a .fuse_hiddenXXX...
2233 # TODO right in the moment when we delete the data that are opened in GDAL
2235 self.logger.info('Deleting temporary data...')
2237 if sys.platform.startswith('linux'):
2238 # delete temporary extraction folder
2239 if os.path.isdir(self.ExtractedFolder):
2240 shutil.rmtree(self.ExtractedFolder)
2242 if os.path.isdir(self.ExtractedFolder):
2243 self.logger.warning('Could not delete temporary extraction folder: %s' % self.ExtractedFolder)
2245 # delete empty folders: subsystem > sensor > Rootname
2246 pardir = None
2247 for i in range(2): # FIXME range(3)?
2248 deldir = self.ExtractedFolder if i == 0 else pardir
2249 pardir = os.path.abspath(os.path.join(deldir, os.path.pardir))
2250 if not glob.glob('%s/*' % pardir) and os.path.isdir(pardir):
2251 os.rmdir(pardir)
2252 else:
2253 break
2255 # delete all files containing __TEMPFILE
2256 path_procdata = self.pathGen.get_path_procdata()
2257 tempfiles = glob.glob(os.path.join(path_procdata, '*__TEMPFILE*'))
2258 [HLP_F.silentremove(tf) for tf in tempfiles]
2259 else:
2260 raise NotImplementedError
2262 # delete tempfiles created by HLP_F.get_tempfile()
2263 files2delete = glob.glob(os.path.join(self.ExtractedFolder, '*')) # all files within ExtractedFolder
2264 files2delete += glob.glob(os.path.join(CFG.path_tempdir + 'GeoMultiSens_*'))
2265 for f in files2delete:
2266 HLP_F.silentremove(f)
2268 # delete previous proc_level results on demand (according to CFG.exec_L**P[2])
2269 self.delete_previous_proc_level_results()
2271 self.logger.close()
2272 self.logger = None
2274 def flush_array_data(self):
2275 del self.arr
2276 del self.mask_nodata
2277 del self.mask_clouds
2278 del self.masks
2279 del self.dem
2280 del self.mask_clouds_confidence
2281 del self.ac_errors
2282 del self.spat_homo_errors
2283 del self.spec_homo_errors
2284 del self.accuracy_layers
2286 if self.MetaObj:
2287 self.MetaObj.ViewingAngle_arrProv = {}
2288 self.MetaObj.IncidenceAngle_arrProv = {}
class GMS_identifier(object):
    """Lightweight container for the attributes that uniquely identify a GMS dataset."""

    def __init__(self, image_type, satellite, sensor, subsystem, proc_level, dataset_ID, logger=None):
        # type: (str, str, str, Union[str, None], str, int, Union[DatasetLogger, None]) -> None
        self.image_type = image_type
        self.satellite = satellite
        self.sensor = sensor
        self.subsystem = subsystem
        self.proc_level = proc_level
        self.dataset_ID = dataset_ID
        self.logger = logger

    def to_odict(self, include_logger=True):
        """Return the identifier attributes as an OrderedDict.

        :param include_logger: whether to include the logger instance under the key 'logger'
        """
        # NOTE: the mixed-case keys ('Satellite', 'Sensor', 'Subsystem') are kept as-is because
        # downstream consumers look them up by exactly these names
        odict = OrderedDict(zip(
            ['image_type', 'Satellite', 'Sensor', 'Subsystem', 'proc_level', 'dataset_ID'],
            [self.image_type, self.satellite, self.sensor, self.subsystem, self.proc_level, self.dataset_ID]))

        if include_logger:
            odict['logger'] = self.logger

        return odict

    def __repr__(self):
        return repr(self.to_odict())

    def __getstate__(self):
        """Defines how the attributes of GMS_identifier instances are pickled."""
        # FIX: docstring previously referred to 'MetaObj instances' (copy-paste error)
        close_logger(self.logger)  # the logger holds open file handles and must not be pickled
        self.logger = None

        return self.__dict__
class failed_GMS_object(GMS_object):
    """GMS_object subclass representing a dataset whose processing failed (used for error reporting)."""

    def delete_tempFiles(self):
        # a failed object holds no temporary files -> nothing to clean up
        pass

    def __init__(self, GMS_object_or_OrdDict, failedMapper, exc_type, exc_val, exc_tb):
        # type: (Union[GMS_object, OrderedDict], str, type(Exception), any, str) -> None
        """
        :param GMS_object_or_OrdDict: the GMS object (or its OrderedDict representation) that failed
        :param failedMapper:          name of the GMS mapper function where the exception occurred
        :param exc_type:              exception class
        :param exc_val:               exception value
        :param exc_tb:                formatted exception traceback
        """
        super(failed_GMS_object, self).__init__()
        needed_attr = ['proc_level', 'image_type', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem',
                       'arr_shape', 'arr_pos']

        if isinstance(GMS_object_or_OrdDict, OrderedDict):  # in case of unhandled exception within L1A_map
            OrdDic = GMS_object_or_OrdDict
            # the dict may lack 'arr_shape'/'arr_pos' (last two entries) -> copy the rest, then default them
            for k in needed_attr[:-2]:
                setattr(self, k, OrdDic[k])
            self.arr_shape = 'cube' if 'arr_shape' not in OrdDic else OrdDic['arr_shape']
            self.arr_pos = None if 'arr_pos' not in OrdDic else OrdDic['arr_pos']

        else:  # in case of any other GMS mapper
            for k in needed_attr:
                setattr(self, k, getattr(GMS_object_or_OrdDict, k))

        self.failedMapper = failedMapper
        self.ExceptionType = exc_type.__name__
        self.ExceptionValue = repr(exc_val)
        self.ExceptionTraceback = exc_tb
        self.proc_status = 'failed'

    @property
    def pandasRecord(self):
        """Return a single-row DataFrame summarizing this failed object."""
        columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                   'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
        # FIX: the values must be wrapped in an outer list so DataFrame builds ONE row with
        # len(columns) columns - a flat list of scalars raises a shape mismatch against the labels
        return DataFrame([[getattr(self, k) for k in columns]], columns=columns)
class finished_GMS_object(GMS_object):
    """Lightweight GMS_object subclass carrying only the identification attributes of a
    successfully processed GMS object (used to report processing results back to the master
    process without serializing any array data).
    """

    def __init__(self, GMS_obj):
        # type: (GMS_object) -> None
        """
        :param GMS_obj: the fully processed GMS object to create the report object from
        """
        super(finished_GMS_object, self).__init__()
        self.proc_level = GMS_obj.proc_level
        self.image_type = GMS_obj.image_type
        self.scene_ID = GMS_obj.scene_ID
        self.entity_ID = GMS_obj.entity_ID
        self.satellite = GMS_obj.satellite
        self.sensor = GMS_obj.sensor
        self.subsystem = GMS_obj.subsystem
        self.arr_shape = GMS_obj.arr_shape
        self.proc_status = GMS_obj.proc_status

    @property
    def pandasRecord(self):
        """Return a single-row DataFrame summarizing this finished object."""
        columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                   'arr_shape', 'arr_pos']
        # FIX: the values must be wrapped in an outer list so DataFrame builds ONE row with
        # len(columns) columns - a flat list of scalars raises a shape mismatch against the labels
        return DataFrame([[getattr(self, k) for k in columns]], columns=columns)
def update_proc_status(GMS_mapper):
    """Decorator function for updating the processing status of each GMS_object (subclass) instance.

    Sets 'proc_status' to 'running' before the mapper is executed, to 'finished' afterwards
    and to 'failed' (before re-raising) if the mapper raises. Already failed objects are
    returned unchanged without running the mapper.

    :param GMS_mapper: A GMS mapper function that takes a GMS object, does some processing and returns it back.
    """

    @functools.wraps(GMS_mapper)  # needed to avoid pickling errors
    def wrapped_GMS_mapper(GMS_objs, **kwargs):
        # type: (Union[List[GMS_object], GMS_object, OrderedDict, failed_GMS_object], dict) -> Union[GMS_object, List[GMS_object]] # noqa

        # noinspection PyBroadException
        try:
            # set processing status to 'running'
            if isinstance(GMS_objs, OrderedDict):
                GMS_objs['proc_status'] = 'running'
            elif isinstance(GMS_objs, failed_GMS_object):
                # FIX: this check must come BEFORE the GMS_object check - failed_GMS_object is a
                # GMS_object subclass, so the previous order made this branch unreachable and
                # already failed objects were wrongly flagged as 'running' and passed to the mapper
                assert GMS_objs.proc_status == 'failed'
                return GMS_objs  # skip the mapper for an already failed object
            elif isinstance(GMS_objs, GMS_object):
                GMS_objs.proc_status = 'running'
            elif isinstance(GMS_objs, Iterable):
                for GMS_obj in GMS_objs:
                    GMS_obj.proc_status = 'running'
            else:
                raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs))

            # RUN the GMS_mapper
            GMS_objs = GMS_mapper(GMS_objs, **kwargs)

            # set processing status to 'finished'
            if isinstance(GMS_objs, GMS_object):
                GMS_objs.proc_status = 'finished'
            elif isinstance(GMS_objs, Iterable):
                for GMS_obj in GMS_objs:
                    GMS_obj.proc_status = 'finished'
            else:
                raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs))
        except Exception:
            # set processing status to 'failed' (FIX: comment previously said 'running')
            if isinstance(GMS_objs, OrderedDict):
                GMS_objs['proc_status'] = 'failed'
            elif isinstance(GMS_objs, GMS_object):
                GMS_objs.proc_status = 'failed'
            elif isinstance(GMS_objs, Iterable):
                for GMS_obj in GMS_objs:
                    GMS_obj.proc_status = 'failed'
            else:
                raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs))

            raise  # forward the original exception to the caller

        return GMS_objs  # Union[GMS_object, List[GMS_object]]

    return wrapped_GMS_mapper
def return_GMS_objs_without_arrays(GMS_pipeline):
    """Decorator function for flushing any array attributes within the return value of a GMS pipeline function.

    :param GMS_pipeline: A GMS pipeline function that takes a GMS object, does some processing and returns it back.
    """

    @functools.wraps(GMS_pipeline)  # needed to avoid pickling errors
    def wrapped_GMS_pipeline(*args, **kwargs):
        def flush_arrays(GMS_obj):
            # type: (Union[GMS_object, L1C_object]) -> GMS_object
            GMS_obj.flush_array_data()
            try:
                GMS_obj.delete_ac_input_arrays()
            except AttributeError:
                # not every GMS object type provides AC input arrays
                pass

            return GMS_obj

        ###################################################################
        # prepare results to be back-serialized to multiprocessing master #
        ###################################################################

        # NOTE: Exceptions within GMS pipeline will be forwarded to calling function.
        # NOTE: Exceptions within GMS mappers are caught by the exception handler (if enabled)
        returnVal = GMS_pipeline(*args, **kwargs)

        # flush array data because they are too big to be kept in memory for many GMS_objects
        if isinstance(returnVal, (GMS_object, failed_GMS_object)):
            return flush_arrays(returnVal)
        if isinstance(returnVal, Iterable):
            return [flush_arrays(obj) for obj in returnVal]

        # otherwise an OrderedDict (dataset) -> contains no arrays, pass it through untouched
        return returnVal

    return wrapped_GMS_pipeline
def return_proc_reports_only(GMS_pipeline):
    """Decorator function for reducing the return value of a GMS pipeline function to status reports only.

    Successfully processed GMS objects are replaced by lightweight finished_GMS_object
    instances; failed_GMS_object instances are passed through unchanged.

    :param GMS_pipeline: A GMS pipeline function that takes a GMS object, does some processing and returns it back.
    """

    @functools.wraps(GMS_pipeline)  # needed to avoid pickling errors
    def wrapped_GMS_pipeline(*args, **kwargs):
        ###################################################################
        # prepare results to be back-serialized to multiprocessing master #
        ###################################################################

        # NOTE: Exceptions within GMS pipeline will be forwarded to calling function.
        # NOTE: Exceptions within GMS mappers are caught by the exception handler (if enabled)
        returnVal = GMS_pipeline(*args, **kwargs)

        # replace heavy GMS objects by small report objects so only status info travels back
        # (failed_GMS_object must be checked first - it is a GMS_object subclass)
        if isinstance(returnVal, failed_GMS_object):
            pass
        elif isinstance(returnVal, GMS_object):
            returnVal = finished_GMS_object(returnVal)
        elif isinstance(returnVal, Iterable):
            returnVal = [obj if isinstance(obj, failed_GMS_object) else finished_GMS_object(obj) for obj in returnVal]
        else:  # OrderedDict (dataset)
            # the OrderedDict will not contain any arrays
            pass

        return returnVal

    return wrapped_GMS_pipeline
def GMS_object_2_dataset_dict(GMS_obj):
    # type: (GMS_object) -> OrderedDict
    """Convert a GMS object into its OrderedDict dataset representation.

    :param GMS_obj: the GMS object to read the identification attributes from
    :return:  OrderedDict with the dataset identification attributes
    """
    keys = ('proc_level', 'scene_ID', 'dataset_ID', 'image_type', 'satellite', 'sensor',
            'subsystem', 'sensormode', 'acq_datetime', 'entity_ID', 'filename')

    return OrderedDict((key, getattr(GMS_obj, key)) for key in keys)
def estimate_mem_usage(dataset_ID, satellite):
    """Estimate the memory usage of processing the given dataset based on recorded job statistics.

    Queries the 'stats_mem_usage_homo' database table for records matching the current job
    configuration, restricts them to software versions >= CFG.min_version_mem_usage_stats and
    returns the ceiled mean of the per-record maximum memory values plus a 10 % buffer.

    :param dataset_ID:  database ID of the dataset to estimate the memory usage for
    :param satellite:   satellite name (only used by the commented-out cloud_masking_algorithm
                        condition below; kept for interface stability)
    :return:  estimated memory usage in gigabytes (int)
              NOTE(review): appears to return None implicitly when no matching statistics
              exist (df is empty) - confirm against callers
    """
    # memory statistics are stored per processing level
    memcols = ['used_mem_l1a', 'used_mem_l1b', 'used_mem_l1c',
               'used_mem_l2a', 'used_mem_l2b', 'used_mem_l2c']

    # fetch all records whose job configuration matches the current one
    df = DataFrame(DB_T.get_info_from_postgreSQLdb(
        CFG.conn_database, 'stats_mem_usage_homo',
        vals2return=['software_version'] + memcols,
        cond_dict=dict(
            datasetid=dataset_ID,
            virtual_sensor_id=CFG.virtual_sensor_id,
            target_gsd=CFG.target_gsd[0],  # respects only xgsd
            target_nbands=len(CFG.target_CWL),
            inmem_serialization=CFG.inmem_serialization,
            target_radunit_optical=CFG.target_radunit_optical,
            # skip_coreg=CFG.skip_coreg,
            ac_estimate_accuracy=CFG.ac_estimate_accuracy,
            ac_bandwise_accuracy=CFG.ac_bandwise_accuracy,
            spathomo_estimate_accuracy=CFG.spathomo_estimate_accuracy,
            spechomo_estimate_accuracy=CFG.spechomo_estimate_accuracy,
            spechomo_bandwise_accuracy=CFG.spechomo_bandwise_accuracy,
            # parallelization_level=CFG.parallelization_level,
            # skip_thermal=CFG.skip_thermal,
            # skip_pan=CFG.skip_pan,
            # mgrs_pixel_buffer=CFG.mgrs_pixel_buffer,
            # cloud_masking_algorithm=CFG.cloud_masking_algorithm[satellite],
            is_test=CFG.is_test
        )),
        columns=['software_version'] + memcols
    )

    if not df.empty:
        # per record: the maximum memory used over all processing levels
        df['used_mem_max'] = df[memcols].max(axis=1)

        # get records from gms_preprocessing versions higher than CFG.min_version_mem_usage_stats
        vers = list(df.software_version)
        vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(CFG.min_version_mem_usage_stats)]

        df_sub = df.loc[df.software_version.isin(vers_usable)]

        mem_estim_mb = np.mean(list(df_sub.used_mem_max))  # megabytes
        mem_estim_gb = mem_estim_mb / 1024  # gigabytes

        # add a 10 % safety buffer and round up to full gigabytes
        return int(np.ceil(mem_estim_gb + .1 * mem_estim_gb))