# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import List, Tuple, Generator, Iterable, Union  # noqa F401  # flake8 issue

from psutil import virtual_memory

from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger
from ..misc.locks import ProcessLock, MemoryReserver, redis_conn
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
from ..algorithms import L2A_P
from ..algorithms import L2B_P
from ..algorithms import L2C_P
from ..model.gms_object import \
    failed_GMS_object, update_proc_status, return_proc_reports_only, estimate_mem_usage
from ..model.gms_object import GMS_object  # noqa F401  # flake8 issue
from ..algorithms.geoprocessing import get_common_extent

__author__ = 'Daniel Scheffler'
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map(dataset_dict):  # map (scene-wise parallelization)
    # type: (dict) -> L1A_P.L1A_object
    """Run the complete Level-1A processing chain for a single (sub-)dataset of a scene.

    :param dataset_dict: keyword arguments used to instantiate the L1A object
    :return:             the fully processed L1A object
    """
    L1A_obj = L1A_P.L1A_object(**dataset_dict)
    L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L1A_obj.import_rasterdata()
    L1A_obj.import_metadata()
    L1A_obj.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    L1A_obj.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated
    L1A_obj.add_rasterInfo_to_MetaObj()
    L1A_obj.reference_data('UTM')
    L1A_obj.calc_TOARadRefTemp()
    L1A_obj.calc_corner_positions()  # requires mask_nodata
    L1A_obj.calc_center_AcqTime()  # (if necessary); requires corner positions
    L1A_obj.calc_mean_VAA()
    L1A_obj.calc_orbit_overpassParams()  # requires corner positions
    L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    if CFG.exec_L1AP[1]:
        L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    L1A_obj.record_mem_usage()
    L1A_obj.delete_tempFiles()

    return L1A_obj
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None):  # map (scene-wise parallelization)
    # type: (dict, tuple) -> List[L1A_P.L1A_object]
    """Run the first part of the Level-1A processing chain and cut the scene into tiles.

    :param dataset_dict: keyword arguments used to instantiate the L1A object
    :param block_size:   X/Y size of the output tiles in pixels (defaults to CFG.tiling_block_size_XY)
    :return:             list of L1A object tiles (tiles containing no data are dropped)
    """
    L1A_obj = L1A_P.L1A_object(**dataset_dict)
    L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L1A_obj.import_rasterdata()
    L1A_obj.import_metadata()
    L1A_obj.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    L1A_obj.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated
    L1A_obj.add_rasterInfo_to_MetaObj()
    L1A_obj.reference_data('UTM')
    tiles = [tile for tile in
             # cut (block-wise parallelization)
             L1A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY)
             if tile is not None]  # None is returned in case the tile contains no data at all
    return tiles
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_2(L1A_tile):  # map (block-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Compute TOA radiance / reflectance / temperature for a single L1A tile.

    :param L1A_tile: one tile of an L1A object (as produced by L1A_map_1)
    :return:         the processed L1A tile
    """
    L1A_tile.calc_TOARadRefTemp()
    if not CFG.inmem_serialization:
        L1A_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask, is_tempfile=True)
    return L1A_tile
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_3(L1A_obj):  # map (scene-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Finish the Level-1A processing chain for a scene that was previously processed in tiles.

    :param L1A_obj: the L1A object merged from the tiles processed by L1A_map_2
    :return:        the fully processed L1A object
    """
    L1A_obj.calc_corner_positions()  # requires mask_nodata
    L1A_obj.calc_center_AcqTime()  # (if necessary); requires corner positions
    L1A_obj.calc_mean_VAA()
    L1A_obj.calc_orbit_overpassParams()  # requires corner positions
    L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)
    if CFG.exec_L1AP[1]:
        L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    # NOTE: temp files are deleted regardless of the write-output flag (both branches did this before)
    L1A_obj.delete_tempFiles()
    L1A_obj.record_mem_usage()
    return L1A_obj
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1B_map(L1A_obj):
    # type: (L1A_P.L1A_object) -> L1B_P.L1B_object
    """Compute global spatial shifts (co-registration) and create the corresponding L1B object.

    NOTE: In the Python implementation (in contrast to the inmem_serialization one), L1A_obj contains
          NO ARRAY DATA - only the metadata that is valid for the whole scene.

    :param L1A_obj: the L1A object to be co-registered
    :return:        the resulting L1B object
    """
    L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    L1B_obj = L1B_P.L1B_object(L1A_obj)
    L1B_obj.compute_global_shifts()

    if CFG.exec_L1BP[1]:
        L1B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    L1B_obj.record_mem_usage()
    L1B_obj.delete_tempFiles()
    return L1B_obj
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1C_map(L1B_objs):
    # type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
    """Atmospheric correction.

    NOTE: all subsystems (containing all spatial samplings) belonging to the same scene ID are needed

    :param L1B_objs: list containing one or multiple L1B objects belonging to the same scene ID.
    """
    # materialize once so a generator input is not exhausted by the first element access below
    L1B_objs = list(L1B_objs)
    L1B_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # initialize L1C objects
    L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]

    # check in config if atmospheric correction is desired
    if CFG.target_radunit_optical == 'BOA_Ref':
        # atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
        if L1C_objs[0].ac_options:
            # perform atmospheric correction
            L1C_objs = L1C_P.AtmCorr(*L1C_objs).run_atmospheric_correction(dump_ac_input=False)
        else:
            for L1C_obj in L1C_objs:
                L1C_obj.logger.warning('Atmospheric correction is not yet supported for %s %s and has been skipped.'
                                       % (L1C_obj.satellite, L1C_obj.sensor))
    else:
        for L1C_obj in L1C_objs:
            L1C_obj.logger.warning('Atmospheric correction skipped because optical conversion type is set to %s.'
                                   % CFG.target_radunit_optical)

    # write outputs and delete temporary data
    for L1C_obj in L1C_objs:
        if CFG.exec_L1CP[1]:
            L1C_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
        if L1C_obj.arr_shape == 'cube':
            L1C_obj.delete_tempFiles()
        L1C_obj.delete_ac_input_arrays()

    for L1C_obj in L1C_objs:
        L1C_obj.record_mem_usage()
    return L1C_objs
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs, block_size=None, return_tiles=True):
    # type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]], tuple, bool) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object] # noqa
    """Geometric homogenization.

    Performs correction of geometric displacements, resampling to target grid of the usecase and merges multiple
    GMS objects belonging to the same scene ID (subsystems) to a single L2A object.
    NOTE: Output L2A_object must be cut into tiles because L2A_obj size in memory exceeds maximum serialization size.

    :param L1C_objs:     list containing one or multiple L1C objects belonging to the same scene ID.
    :param block_size:   X/Y size of output tiles in pixels, e.g. (1024,1024)
    :param return_tiles: return computed L2A object in tiles
    :return:             list of L2A_object tiles
    """
    L1C_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # initialize L2A objects
    L2A_objs = [L2A_P.L2A_object(L1C_obj) for L1C_obj in L1C_objs]

    # get common extent (NOTE: using lon/lat coordinates here would cause black borders around the UTM image
    # because get_common_extent() just uses the envelop without regard to the projection
    clip_extent = \
        get_common_extent([obj.trueDataCornerUTM for obj in L2A_objs]) \
        if len(L2A_objs) > 1 else L2A_objs[0].trueDataCornerUTM

    # correct geometric displacements and resample to target grid
    for L2A_obj in L2A_objs:
        L2A_obj.correct_spatial_shifts(cliptoextent=CFG.clip_to_extent,
                                       clipextent=clip_extent, clipextent_prj=L2A_obj.arr.prj)

    # merge multiple subsystems belonging to the same scene ID to a single GMS object
    if len(L2A_objs) > 1:
        L2A_obj = L2A_P.L2A_object.from_sensor_subsystems(L2A_objs)
    else:
        L2A_obj = L2A_objs[0]

    # update attributes
    L2A_obj.calc_mask_nodata(overwrite=True)
    L2A_obj.calc_corner_positions()

    # write output
    if CFG.exec_L2AP[1]:
        L2A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    # delete tempfiles of separate subsystem GMS objects
    # (distinct loop variable so the merged L2A_obj above is not shadowed)
    for subsystem_obj in L2A_objs:
        subsystem_obj.delete_tempFiles()

    if return_tiles:
        L2A_tiles = [tile for tile in
                     L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY)
                     if tile is not None]  # None is returned in case the tile contains no data at all
        for L2A_tile in L2A_tiles:
            L2A_tile.record_mem_usage()
        return L2A_tiles
    else:
        L2A_obj.record_mem_usage()
        return L2A_obj
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2B_map(L2A_obj):
    # type: (L2A_P.L2A_object) -> L2B_P.L2B_object
    """Spectral homogenization.

    :param L2A_obj: the geometrically homogenized L2A object (scene or tile)
    :return:        the spectrally homogenized L2B object
    """
    L2A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L2B_obj = L2B_P.L2B_object(L2A_obj)
    L2B_obj.spectral_homogenization()
    if CFG.exec_L2BP[1]:
        L2B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    if L2B_obj.arr_shape == 'cube':
        L2B_obj.delete_tempFiles()
    L2B_obj.record_mem_usage()
    return L2B_obj
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj):
    # type: (L2B_P.L2B_object) -> L2C_P.L2C_object
    """Create the L2C object and optionally write it to disk as MGRS tiles.

    :param L2B_obj: the spectrally homogenized L2B object
    :return:        the resulting L2C object (contains no array data in Python mode)
    """
    L2B_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L2C_obj = L2C_P.L2C_object(L2B_obj)
    if CFG.exec_L2CP[1]:
        L2C_MRGS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
        for MGRS_tile in L2C_MRGS_tiles:
            MGRS_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask,
                              compression=CFG.output_data_compression)
    L2C_obj.record_mem_usage()
    L2C_obj.delete_tempFiles()
    return L2C_obj  # contains no array data in Python mode
@return_proc_reports_only
# @return_GMS_objs_without_arrays
def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise parallelization)
    # type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
    """Run the whole preprocessing chain (L1A up to L2C) for all subsystem datasets of a single scene.

    NOTE: Exceptions in this function must be caught by the calling function (process controller).

    :param list_dataset_dicts_per_scene: list of dataset dictionaries (one per subsystem) of a single scene ID
    :return: GMS object(s) of the highest processing level that was computed
    """
    with GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
                    log_level=CFG.log_level, append=True) as pipeline_logger:

        # set CPU and memory limits
        cpulimit = CFG.CPUs_all_jobs
        mem2reserve = 15  # GB reserved per process if no memory statistics are available yet

        if redis_conn:
            mem_estim = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
                                           list_dataset_dicts_per_scene[0]['satellite'])
            if mem_estim:
                mem2reserve = mem_estim
            else:
                # no statistics yet -> limit process count so statistics can be collected first
                cpulimit = int((virtual_memory().total * .8 - virtual_memory().used) / 1024 ** 3 / mem2reserve)
                pipeline_logger.info('No memory usage statistics from earlier jobs found for the current '
                                     'configuration. Limiting processes to %s in order to collect memory statistics '
                                     'first.' % cpulimit)

        # start processing
        with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage),\
                ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):

            if len({ds['proc_level'] for ds in list_dataset_dicts_per_scene}) != 1:
                raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
                                 'Received %s.' % list_dataset_dicts_per_scene)
            input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']

            ##################
            # L1A processing #
            ##################

            L1A_objects = []
            if CFG.exec_L1AP[0] and input_proc_level is None:
                L1A_objects = \
                    [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]

            if any(isinstance(obj, failed_GMS_object) for obj in L1A_objects):
                return L1A_objects

            ##################
            # L1B processing #
            ##################

            # select subsystem with optimal band for co-registration
            # L1B_obj_coreg = L1B_map(L1A_objects[0])
            # copy coreg information to remaining subsets

            L1B_objects = L1A_objects
            if CFG.exec_L1BP[0]:
                # add earlier processed L1A data
                if input_proc_level == 'L1A':
                    for ds in list_dataset_dicts_per_scene:
                        GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
                        L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))

                L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]

            del L1A_objects

            if any(isinstance(obj, failed_GMS_object) for obj in L1B_objects):
                return L1B_objects

            ##################
            # L1C processing #
            ##################

            L1C_objects = L1B_objects
            if CFG.exec_L1CP[0]:
                # add earlier processed L1B data
                if input_proc_level == 'L1B':
                    for ds in list_dataset_dicts_per_scene:
                        GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
                        L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))

                L1C_objects = L1C_map(L1B_objects)

            del L1B_objects

            if any(isinstance(obj, failed_GMS_object) for obj in L1C_objects):
                return L1C_objects

            if not CFG.exec_L2AP[0]:
                return L1C_objects

            ##################
            # L2A processing #
            ##################

            # add earlier processed L1C data
            if input_proc_level == 'L1C':
                for ds in list_dataset_dicts_per_scene:
                    GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
                    L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))

            L2A_obj = L2A_map(L1C_objects, return_tiles=False)

            del L1C_objects

            if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
                return L2A_obj

            ##################
            # L2B processing #
            ##################

            # add earlier processed L2A data
            if input_proc_level == 'L2A':
                assert len(list_dataset_dicts_per_scene) == 1, \
                    'Expected only a single L2A dataset since subsystems are merged.'
                GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
                L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])

            L2B_obj = L2B_map(L2A_obj)

            del L2A_obj

            if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
                return L2B_obj

            ##################
            # L2C processing #
            ##################

            # add earlier processed L2B data
            if input_proc_level == 'L2B':
                assert len(list_dataset_dicts_per_scene) == 1, \
                    'Expected only a single L2B dataset since subsystems are merged.'
                GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
                L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])

            L2C_obj = L2C_map(L2B_obj)  # type: Union[GMS_object, failed_GMS_object, List]

            del L2B_obj

            return L2C_obj