Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2 

3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data 

4# 

5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de) 

6# 

7# This software was developed within the context of the GeoMultiSens project funded 

8# by the German Federal Ministry of Education and Research 

9# (project grant code: 01 IS 14 010 A-C). 

10# 

11# This program is free software: you can redistribute it and/or modify it under 

12# the terms of the GNU General Public License as published by the Free Software 

13# Foundation, either version 3 of the License, or (at your option) any later version. 

14# Please note the following exception: `gms_preprocessing` depends on tqdm, which 

15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files 

16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore". 

17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE. 

18# 

19# This program is distributed in the hope that it will be useful, but WITHOUT 

20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

22# details. 

23# 

24# You should have received a copy of the GNU Lesser General Public License along 

25# with this program. If not, see <http://www.gnu.org/licenses/>. 

26 

27from typing import List, Tuple, Generator, Iterable, Union # noqa F401 # flake8 issue 

28from psutil import virtual_memory 

29 

30from ..options.config import GMS_config as CFG 

31from ..misc import exception_handler as EXC_H 

32from ..misc.path_generator import path_generator 

33from ..misc.logging import GMS_logger 

34from ..misc.locks import ProcessLock, MemoryReserver, redis_conn 

35from ..algorithms import L1A_P 

36from ..algorithms import L1B_P 

37from ..algorithms import L1C_P 

38from ..algorithms import L2A_P 

39from ..algorithms import L2B_P 

40from ..algorithms import L2C_P 

41from ..model.gms_object import \ 

42 failed_GMS_object, update_proc_status, return_proc_reports_only, estimate_mem_usage 

43from ..model.gms_object import GMS_object # noqa F401 # flake8 issue 

44from ..algorithms.geoprocessing import get_common_extent 

45 

46__author__ = 'Daniel Scheffler' 

47 

48 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map(dataset_dict):  # map (scene-wise parallelization)
    # type: (dict) -> L1A_P.L1A_object
    """Run the complete Level-1A processing chain for a single scene.

    Imports raster data and metadata, converts the data to TOA
    radiance/reflectance/temperature and computes scene geometry attributes.

    :param dataset_dict:  keyword arguments used to initialize the L1A_object
    :return:              fully processed L1A_object
    """
    L1A_obj = L1A_P.L1A_object(**dataset_dict)
    # wait until overall system memory usage falls below the critical threshold
    L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L1A_obj.import_rasterdata()
    L1A_obj.import_metadata()
    L1A_obj.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    L1A_obj.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated
    L1A_obj.add_rasterInfo_to_MetaObj()
    L1A_obj.reference_data('UTM')
    L1A_obj.calc_TOARadRefTemp()
    L1A_obj.calc_corner_positions()  # requires mask_nodata
    L1A_obj.calc_center_AcqTime()  # (if necessary); requires corner positions
    L1A_obj.calc_mean_VAA()
    L1A_obj.calc_orbit_overpassParams()  # requires corner positions
    L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    # write output only if L1A outputs are enabled in the job config
    if CFG.exec_L1AP[1]:
        L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    L1A_obj.record_mem_usage()
    L1A_obj.delete_tempFiles()

    return L1A_obj

75 

76 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None):  # map (scene-wise parallelization)
    # type: (dict, tuple) -> List[L1A_P.L1A_object]
    """First part of the L1A chain: import raster data/metadata and split the scene into tiles.

    :param dataset_dict:  keyword arguments used to initialize the L1A_object
    :param block_size:    X/Y tile size in pixels; falls back to CFG.tiling_block_size_XY
    :return:              list of non-empty L1A tile objects
    """
    scene = L1A_P.L1A_object(**dataset_dict)
    scene.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    scene.import_rasterdata()
    scene.import_metadata()
    scene.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    scene.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated
    scene.add_rasterInfo_to_MetaObj()
    scene.reference_data('UTM')

    # cut the scene into blocks (block-wise parallelization);
    # to_tiles() yields None for tiles that contain no data at all
    blocksize = block_size if block_size else CFG.tiling_block_size_XY
    tiles = []
    for tile in scene.to_tiles(blocksize=blocksize):
        if tile is not None:
            tiles.append(tile)

    return tiles

95 

96 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_2(L1A_tile):  # map (block-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Second part of the L1A chain: convert one tile to TOA radiance/reflectance/temperature.

    :param L1A_tile:  a single L1A tile object
    :return:          the processed tile
    """
    L1A_tile.calc_TOARadRefTemp()

    # with in-memory serialization there is nothing to write to disk
    if CFG.inmem_serialization:
        return L1A_tile

    # otherwise the intermediate tile result must be dumped as a temporary ENVI file
    L1A_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask, is_tempfile=True)
    return L1A_tile

105 

106 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_3(L1A_obj):  # map (scene-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Third part of the L1A chain: compute scene geometry attributes and write outputs.

    :param L1A_obj:  L1A_object of the full scene (merged from tiles)
    :return:         finalized L1A_object
    """
    L1A_obj.calc_corner_positions()  # requires mask_nodata
    L1A_obj.calc_center_AcqTime()  # (if necessary); requires corner positions
    L1A_obj.calc_mean_VAA()
    L1A_obj.calc_orbit_overpassParams()  # requires corner positions
    L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    # write output only if L1A outputs are enabled in the job config;
    # temp files are removed in either case (deduplicated from the former if/else)
    if CFG.exec_L1AP[1]:
        L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    L1A_obj.delete_tempFiles()

    L1A_obj.record_mem_usage()
    return L1A_obj

123 

124 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1B_map(L1A_obj):
    # type: (L1A_P.L1A_object) -> L1B_P.L1B_object
    """Compute global spatial shifts (co-registration) and create an L1B object.

    NOTE: In the Python (as opposed to the inmem_serialization) implementation,
    L1A_obj contains NO ARRAY DATA - only the metadata valid for the whole scene.
    """
    # wait until overall system memory usage falls below the critical threshold
    L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    L1B_obj = L1B_P.L1B_object(L1A_obj)
    L1B_obj.compute_global_shifts()

    # write output only if L1B outputs are enabled in the job config
    if CFG.exec_L1BP[1]:
        L1B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    L1B_obj.record_mem_usage()
    L1B_obj.delete_tempFiles()
    return L1B_obj

142 

143 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1C_map(L1B_objs):
    # type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
    """Atmospheric correction.

    NOTE: all subsystems (containing all spatial samplings) belonging to the same scene ID are needed

    :param L1B_objs: list containing one or multiple L1B objects belonging to the same scene ID.
    :return:         list of L1C objects
    """
    # materialize the iterable exactly ONCE - the previous `list(L1B_objs)[0]` consumed
    # a generator argument here, so the comprehension below would then see it empty
    L1B_objs = list(L1B_objs)
    L1B_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # initialize L1C objects
    L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]

    # check in config if atmospheric correction is desired
    if CFG.target_radunit_optical == 'BOA_Ref':
        # atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
        if L1C_objs[0].ac_options:
            # perform atmospheric correction
            L1C_objs = L1C_P.AtmCorr(*L1C_objs).run_atmospheric_correction(dump_ac_input=False)
        else:
            for L1C_obj in L1C_objs:
                L1C_obj.logger.warning('Atmospheric correction is not yet supported for %s %s and has been skipped.'
                                       % (L1C_obj.satellite, L1C_obj.sensor))
    else:
        for L1C_obj in L1C_objs:
            L1C_obj.logger.warning('Atmospheric correction skipped because optical conversion type is set to %s.'
                                   % CFG.target_radunit_optical)

    # write outputs and delete temporary data
    for L1C_obj in L1C_objs:
        if CFG.exec_L1CP[1]:
            L1C_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
        if L1C_obj.arr_shape == 'cube':
            L1C_obj.delete_tempFiles()
        L1C_obj.delete_ac_input_arrays()

    for L1C_obj in L1C_objs:
        L1C_obj.record_mem_usage()
    return L1C_objs

182 

183 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs, block_size=None, return_tiles=True):
    # type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]], tuple, bool) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object]  # noqa
    """Geometric homogenization.

    Performs correction of geometric displacements, resampling to target grid of the usecase and merges multiple
    GMS objects belonging to the same scene ID (subsystems) to a single L2A object.
    NOTE: Output L2A_object must be cut into tiles because L2A_obj size in memory exceeds maximum serialization size.

    :param L1C_objs:     list containing one or multiple L1C objects belonging to the same scene ID.
    :param block_size:   X/Y size of output tiles in pixels, e.g. (1024,1024)
    :param return_tiles: return computed L2A object in tiles
    :return:             list of L2A_object tiles (or the full L2A_object if return_tiles=False)
    """
    L1C_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # initialize L2A objects
    L2A_objs = [L2A_P.L2A_object(L1C_obj) for L1C_obj in L1C_objs]

    # get common extent (NOTE: using lon/lat coordinates here would cause black borders around the UTM image
    # because get_common_extent() just uses the envelope without regard to the projection)
    clip_extent = \
        get_common_extent([obj.trueDataCornerUTM for obj in L2A_objs]) \
        if len(L2A_objs) > 1 else L2A_objs[0].trueDataCornerUTM

    # correct geometric displacements and resample to target grid
    for subsystem_obj in L2A_objs:
        subsystem_obj.correct_spatial_shifts(cliptoextent=CFG.clip_to_extent,
                                             clipextent=clip_extent, clipextent_prj=subsystem_obj.arr.prj)

    # merge multiple subsystems belonging to the same scene ID to a single GMS object
    if len(L2A_objs) > 1:
        L2A_obj = L2A_P.L2A_object.from_sensor_subsystems(L2A_objs)
    else:
        L2A_obj = L2A_objs[0]

    # update attributes
    L2A_obj.calc_mask_nodata(overwrite=True)
    L2A_obj.calc_corner_positions()

    # write output
    if CFG.exec_L2AP[1]:
        L2A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    # delete tempfiles of the separate subsystem GMS objects
    # (NOTE: the loop variable must NOT be named 'L2A_obj' - unlike the former list
    #  comprehension, a for-loop would clobber the merged object bound above)
    for subsystem_obj in L2A_objs:
        subsystem_obj.delete_tempFiles()

    if return_tiles:
        L2A_tiles = list(L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY))
        L2A_tiles = [i for i in L2A_tiles if i is not None]  # None is returned in case the tile contains no data at all
        for L2A_tile in L2A_tiles:
            L2A_tile.record_mem_usage()
        return L2A_tiles
    else:
        L2A_obj.record_mem_usage()
        return L2A_obj

240 

241 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2B_map(L2A_obj):
    # type: (L2A_P.L2A_object) -> L2B_P.L2B_object
    """Spectral homogenization: create an L2B object and homogenize its spectral characteristics."""
    # wait until overall system memory usage falls below the critical threshold
    L2A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L2B_obj = L2B_P.L2B_object(L2A_obj)
    L2B_obj.spectral_homogenization()
    # write output only if L2B outputs are enabled in the job config
    if CFG.exec_L2BP[1]:
        L2B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    # temp files are only removed when processing a full cube (not a single tile)
    if L2B_obj.arr_shape == 'cube':
        L2B_obj.delete_tempFiles()
    L2B_obj.record_mem_usage()
    return L2B_obj

255 

256 

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj):
    # type: (L2B_P.L2B_object) -> L2C_P.L2C_object
    """Create the L2C object and write it to MGRS-tiled ENVI output.

    :param L2B_obj:  the L2B object to be finalized
    :return:         L2C object (contains no array data in Python mode)
    """
    L2B_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L2C_obj = L2C_P.L2C_object(L2B_obj)

    # write MGRS-tiled output only if L2C outputs are enabled in the job config
    if CFG.exec_L2CP[1]:
        L2C_MGRS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)  # renamed (was 'L2C_MRGS_tiles')
        for MGRS_tile in L2C_MGRS_tiles:
            MGRS_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask,
                              compression=CFG.output_data_compression)

    L2C_obj.record_mem_usage()
    L2C_obj.delete_tempFiles()
    return L2C_obj  # contains no array data in Python mode

270 

271 

@return_proc_reports_only
# @return_GMS_objs_without_arrays
def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise parallelization)
    # type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
    """Run the complete preprocessing chain (L1A..L2C) for all subsystem datasets of one scene.

    Each processing level is only executed if enabled in the job configuration; earlier
    processed results are loaded from disk where the input processing level allows it.

    NOTE: Exceptions raised in this function must be caught by the calling function
          (process controller).

    :param list_dataset_dicts_per_scene:  dataset dictionaries of all subsystems belonging
                                          to one scene; all must share the same 'proc_level'
    :return:  objects of the highest processing level reached (post-processed by the
              return_proc_reports_only decorator)
    """
    with GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
                    log_level=CFG.log_level, append=True) as pipeline_logger:

        # set CPU and memory limits
        cpulimit = CFG.CPUs_all_jobs
        mem2reserve = 15  # default memory reservation (GB) if no usage statistics are available

        if redis_conn:
            # reserve the amount of memory that earlier jobs with the same dataset/satellite needed
            mem_estim = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
                                           list_dataset_dicts_per_scene[0]['satellite'])
            if mem_estim:
                mem2reserve = mem_estim
            else:
                # no statistics yet -> derive a conservative process limit from free system memory
                cpulimit = int((virtual_memory().total * .8 - virtual_memory().used) / 1024 ** 3 / mem2reserve)
                pipeline_logger.info('No memory usage statistics from earlier jobs found for the current '
                                     'configuration. Limiting processes to %s in order to collect memory statistics '
                                     'first.' % cpulimit)

        # start processing
        with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage),\
                ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):

            # all subsystem datasets of a scene must share the same processing level
            if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
                raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
                                 'Received %s.' % list_dataset_dicts_per_scene)

            input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']

            ##################
            # L1A processing #
            ##################

            L1A_objects = []
            if CFG.exec_L1AP[0] and input_proc_level is None:
                L1A_objects = \
                    [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]

            # abort early if any subsystem failed
            if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]):
                return L1A_objects

            ##################
            # L1B processing #
            ##################

            # select subsystem with optimal band for co-registration
            # L1B_obj_coreg = L1B_map(L1A_objects[0])
            # copy coreg information to remaining subsets

            L1B_objects = L1A_objects
            if CFG.exec_L1BP[0]:
                # add earlier processed L1A data
                if input_proc_level == 'L1A':
                    for ds in list_dataset_dicts_per_scene:
                        GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
                        L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))

                L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]

            del L1A_objects  # free memory of the previous level as soon as possible

            if any([isinstance(obj, failed_GMS_object) for obj in L1B_objects]):
                return L1B_objects

            ##################
            # L1C processing #
            ##################

            L1C_objects = L1B_objects
            if CFG.exec_L1CP[0]:
                # add earlier processed L1B data
                if input_proc_level == 'L1B':
                    for ds in list_dataset_dicts_per_scene:
                        GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
                        L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))

                L1C_objects = L1C_map(L1B_objects)

            del L1B_objects

            if any([isinstance(obj, failed_GMS_object) for obj in L1C_objects]):
                return L1C_objects

            if not CFG.exec_L2AP[0]:
                return L1C_objects

            ##################
            # L2A processing #
            ##################

            # add earlier processed L1C data
            if input_proc_level == 'L1C':
                for ds in list_dataset_dicts_per_scene:
                    GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
                    L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))

            # subsystems are merged here, so from L2A on there is a single object per scene
            L2A_obj = L2A_map(L1C_objects, return_tiles=False)

            del L1C_objects

            if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
                return L2A_obj

            ##################
            # L2B processing #
            ##################

            # add earlier processed L2A data
            if input_proc_level == 'L2A':
                assert len(list_dataset_dicts_per_scene) == 1, \
                    'Expected only a single L2A dataset since subsystems are merged.'
                GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
                L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])

            L2B_obj = L2B_map(L2A_obj)

            del L2A_obj

            if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
                return L2B_obj

            ##################
            # L2C processing #
            ##################

            # add earlier processed L2B data
            if input_proc_level == 'L2B':
                assert len(list_dataset_dicts_per_scene) == 1, \
                    'Expected only a single L2B dataset since subsystems are merged.'
                GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
                L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])

            L2C_obj = L2C_map(L2B_obj)  # type: Union[GMS_object, failed_GMS_object, List]

            del L2B_obj

            return L2C_obj