
# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.

import collections
import glob
import os
import re
import tempfile
import warnings
import uuid
from logging import Logger
from typing import Union, TYPE_CHECKING  # noqa F401  # flake8 issue

from ..options.config import GMS_config as CFG
from .definition_dicts import get_GMS_sensorcode

if TYPE_CHECKING:
    from ..model.gms_object import GMS_identifier  # noqa F401  # flake8 issue

# get_scene_and_dataset_infos_from_postgreSQLdb  # imported inline in order to avoid circular dependencies

__author__ = 'Daniel Scheffler'


class path_generator(object):
    """Methods return absolute paths corresponding to the input object.

    To be instantiated with the attribute dict of an L1A/L1B/... object or with a list of the attributes below.
    If 'scene_ID' (integer) is passed as a keyword argument, any given positional args are ignored and the
    attributes are retrieved from the postgreSQL database instead."""
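    # Usage sketch (hypothetical values):
    #   pgen = path_generator(GMS_obj.__dict__)     # instantiate from the attribute dict of a GMS object
    #   pgen = path_generator(scene_ID=26186261)    # or let the attributes be queried from the postgreSQL DB
    #   path_logfile = pgen.get_path_logfile()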

    def __init__(self, *args, **kwargs):
        if 'scene_ID' in kwargs:
            from .database_tools import get_scene_and_dataset_infos_from_postgreSQLdb
            args = [get_scene_and_dataset_infos_from_postgreSQLdb(kwargs['scene_ID'])]  # returns [dict]
        assert len(args) in [1, 8, 9], "Received invalid length of 'args' argument."

        isdict = len(args) == 1 and isinstance(args[0], (dict, collections.OrderedDict))
        if not isdict and len(args) == 8:
            args += (None,)  # set logger to None if not given in tuple
        elif isdict and 'logger' not in args[0]:
            args[0].update({'logger': None})  # set logger to None if not given in dict

        argsdict = args[0] if isdict else dict(zip(['proc_level', 'image_type', 'satellite', 'sensor', 'subsystem',
                                                    'acq_datetime', 'entity_ID', 'filename', 'logger'], list(args)))
        self.proc_level = kwargs.get('proc_level', argsdict['proc_level'])
        self.image_type = argsdict['image_type']
        self.satellite = argsdict['satellite']
        # sensor: the database distinguishes SLC_OFF and SLC_ON but the file structure does not
        self.sensor = argsdict['sensor'] if argsdict['sensor'] not in ['ETM+_SLC_OFF', 'ETM+_SLC_ON'] else 'ETM+'
        self.subsystem = argsdict['subsystem']
        self.AcqDate = argsdict['acq_datetime']
        self.entity_ID = argsdict['entity_ID']
        self.filename = argsdict['filename']
        self.logger = argsdict['logger']
        self.MGRS_info = kwargs.get('MGRS_info', None)

    def __getstate__(self):
        """Defines how the attributes of path_generator are pickled."""

        if self.logger not in [None, 'not set']:
            self.logger.close()
            self.logger = None
        return self.__dict__

    def __setstate__(self, ObjDict):
        """Defines how the attributes of path_generator are unpickled."""

        self.__dict__ = ObjDict

    def get_path_rawdata(self):
        """Returns the folder of all downloaded data for the current scene."""
        if self.sensor and re.search('SRTM', self.sensor, re.I):
            return os.path.join(CFG.path_archive, self.satellite, self.sensor,
                                self.subsystem)  # FIXME downloader should store data into sensor folder
        else:
            return os.path.join(CFG.path_archive, self.satellite, self.sensor)

    def get_path_procdata(self):
        """Returns the target folder of all processed data for the current scene."""
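        # Resulting folder (derived from the tuple below):
        #   with MGRS_info:  <CFG.path_procdata_MGRS>/virtual_sensor_id_<id>/<grid1mil>/<grid100k>/<entity_ID>
        #   otherwise:       <CFG.path_procdata_scenes>/<satellite>/<sensor>/<entity_ID>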
        pOrd = (CFG.path_procdata_MGRS, 'virtual_sensor_id_%s' % CFG.virtual_sensor_id,
                self.MGRS_info['grid1mil'], self.MGRS_info['grid100k'], self.entity_ID) if self.MGRS_info else \
            (CFG.path_procdata_scenes, self.satellite, self.sensor, self.entity_ID)
        return os.path.join(*pOrd)

    def get_baseN(self, merged_subsystems=False):
        """Returns the basename belonging to the given scene.

        :param merged_subsystems:  if True, the subsystem is not included in the returned basename
                                    (useful for merged subsystems in L2A+)
        """
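        # Resulting basename: <satellite>__<sensor>[__<subsystem>]__<entity_ID>[__<MGRS tile_ID>]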
        if self.subsystem and not merged_subsystems:
            items2include = (self.satellite, self.sensor, self.subsystem, self.entity_ID)
        else:
            items2include = (self.satellite, self.sensor, self.entity_ID)

        if self.MGRS_info:
            items2include += (self.MGRS_info['tile_ID'],)
        return '__'.join(list(items2include))

    def get_path_logfile(self, merged_subsystems=False):
        """Returns the path of the logfile belonging to the given scene, e.g. '/path/to/file/file.log'.

        :param merged_subsystems:  if True, the subsystem is not included in the returned logfile path
                                    (useful for merged subsystems in L2A+)
        """
        return os.path.join(self.get_path_procdata(), self.get_baseN(merged_subsystems=merged_subsystems) + '.log')

    def get_local_archive_path_baseN(self):
        """Returns the path of the downloaded raw data archive, e.g. '/path/to/file/file.tar.gz'."""

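        # Lookup order: exact file name -> file name plus a known archive extension (.tar.gz/.zip/.hdf)
        # -> '<name>.zip' fallback for '*.SAFE' file names stored in the database.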
        folder_rawdata = self.get_path_rawdata()
        self.filename = self.filename if self.filename else self.entity_ID
        if os.path.exists(os.path.join(folder_rawdata, self.filename)):
            outP = os.path.join(folder_rawdata, self.filename)
        else:
            extensions_found = [ext for ext in ['.tar.gz', '.zip', '.hdf']
                                if os.path.exists(os.path.join(folder_rawdata, '%s%s' % (self.filename, ext)))]
            if extensions_found:
                assert len(extensions_found) == 1, \
                    "The folder %s contains multiple files identified as raw data to be processed." % folder_rawdata
                outP = os.path.join(folder_rawdata, '%s%s' % (self.filename, extensions_found[0]))
            else:
                if self.filename.endswith('.SAFE') and \
                        os.path.exists(os.path.join(folder_rawdata, os.path.splitext(self.filename)[0]) + '.zip'):
                    outP = os.path.join(folder_rawdata,
                                        os.path.splitext(self.filename)[0]) + '.zip'  # FIXME bug in the database
                else:
                    raise FileNotFoundError('The dataset %s.* cannot be found at %s'
                                            % (self.filename, folder_rawdata))  # TODO DOWNLOAD COMMAND
        return outP

    def get_path_gmsfile(self):
        """Returns the path of the .gms file belonging to the given processing level, e.g. '/path/to/file/file.gms'."""
        return os.path.join(self.get_path_procdata(), '%s_%s.gms' % (self.get_baseN(), self.proc_level))

    def get_path_imagedata(self):
        """Returns the path of the .bsq file belonging to the given processing level, e.g. '/path/to/file/file.bsq'."""
        return os.path.join(self.get_path_procdata(), '%s_image_data_%s.bsq' % (self.get_baseN(), self.proc_level))

    def get_path_maskdata(self):
        """Returns the path of the *_masks_*.bsq file belonging to the given processing level,
        e.g. '/path/to/file/file_masks_L1A.bsq'."""
        return os.path.join(self.get_path_procdata(), '%s_masks_%s.bsq' % (self.get_baseN(), self.proc_level))

    def get_path_cloudmaskdata(self):
        """Returns the path of the *_mask_clouds_*.bsq file belonging to the given processing level,
        e.g. '/path/to/file/file_mask_clouds_L1A.bsq'."""
        return os.path.join(self.get_path_procdata(), '%s_mask_clouds_%s.bsq' % (self.get_baseN(), self.proc_level))

    def get_path_accuracylayers(self):
        """Returns the path of the *_accuracy_layers_*.bsq file, e.g., '/path/to/file/file_accuracy_layers_L2C.bsq'.

        NOTE: Accuracy layers are only present in L2C.
        """
        if self.proc_level == 'L2C':
            return os.path.join(self.get_path_procdata(), '%s_accuracy_layers_%s.bsq'
                                % (self.get_baseN(), self.proc_level))

    def get_path_tempdir(self):
        """Returns a unique temporary directory path for the current scene, built from the raw data archive basename,
        a random UUID hex suffix and the sensor/subsystem, e.g. '<CFG.path_tempdir>/<basename>__<hex>/<sensor>'."""
        path_archive = self.get_local_archive_path_baseN()
        RootName = os.path.splitext(os.path.basename(path_archive))[0]
        RootName = os.path.splitext(RootName)[0] if os.path.splitext(RootName)[1] else RootName
        RootName += '__' + uuid.uuid4().hex  # add a hex code in order to get uniqueness
        return os.path.join(CFG.path_tempdir, RootName, self.sensor, self.subsystem) \
            if self.subsystem else os.path.join(CFG.path_tempdir, RootName, self.sensor)

    def get_outPath_hdr(self, attrName2write):
        # type: (str) -> str
        """Returns the output path for the given attribute to be written.

        :param attrName2write:  <str> name of the GMS object attribute to be written"""
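        # Resulting file name: '<baseN>_<suffix>_<proc_level>.hdr', where the 'arr' attribute maps to the
        # 'image_data' suffix.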
        outNameSuffix = 'image_data' if attrName2write == 'arr' else attrName2write
        outNameHdr = '%s_%s_%s.hdr' % (self.get_baseN(), outNameSuffix, self.proc_level) if outNameSuffix else \
            '%s_%s.hdr' % (self.get_baseN(), self.proc_level)
        return os.path.join(self.get_path_procdata(), outNameHdr)

    def get_path_ac_input_dump(self):
        """Returns the path of the .dill file containing a dump of the atmospheric correction inputs,
        e.g. '/path/to/file/file.dill'."""
        return os.path.join(self.get_path_procdata(), '%s_ac_input_%s.dill' % (self.get_baseN(), self.proc_level))

    def get_pathes_all_procdata(self):  # TODO
        """Returns a list of paths of all processed data files for the current scene (not yet complete, see warning)."""
        image = self.get_path_imagedata()
        mask = self.get_path_maskdata()
        mask_clouds = self.get_path_cloudmaskdata()
        accuracylayers = self.get_path_accuracylayers()
        gms_file = self.get_path_gmsfile()
        log_file = self.get_path_logfile()

        all_pathes = [image, mask, mask_clouds, accuracylayers, gms_file, log_file]

        warnings.warn(
            'get_pathes_all_procdata() is not yet completely implemented and will not return a complete path list!')
        return all_pathes


def get_tempfile(ext=None, prefix=None, tgt_dir=None):
    """Returns the path to a tempfile.mkstemp() file that can be passed to any function that expects a physical path.

    The tempfile has to be deleted manually.

    :param ext:     file extension (the file gets no extension if None)
    :param prefix:  optional file prefix (defaults to 'GeoMultiSens__')
    :param tgt_dir: target directory (CFG.path_tempdir is used if None)
    """
    if tgt_dir is None:
        tgt_dir = CFG.path_tempdir
    prefix = 'GeoMultiSens__' if prefix is None else prefix
    fd, path = tempfile.mkstemp(prefix=prefix, suffix=ext, dir=tgt_dir)
    os.close(fd)
    return path
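

# Usage sketch for get_tempfile() (hypothetical values):
#   tmp_csv = get_tempfile(ext='.csv', prefix='GMS_test__')  # the caller must delete the file manually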


def get_path_cloud_class_obj(GMS_id, get_all=False):
    """Returns the absolute path of the training data used by the cloud classifier.

    :param GMS_id:  <GMS_identifier> identifier of the current scene
    :param get_all: if True, a list of all available classifier object paths is returned instead of a single path
    """

    GMS_sensorcode = get_GMS_sensorcode(GMS_id)
    satellite, sensor, logger = GMS_id.satellite, GMS_id.sensor, GMS_id.logger
    path_cloud_classifier_objects = CFG.path_cloud_classif

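    # Maps GMS sensor codes to the corresponding cloud classifier training data file;
    # None means that cloud masking is not yet implemented for that sensor.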
    obj_name_dic = {
        'AVNIR-2': None,
        'TM4': None,
        'TM5': None,
        'TM7': None,
        'LDCM': None,
        'SPOT1a': None,
        'SPOT1b': None,
        'SPOT2a': None,
        'SPOT2b': None,
        'SPOT3a': None,
        'SPOT3b': None,
        'SPOT4a': None,
        'SPOT4b': None,
        'SPOT5a': None,
        'SPOT5b': None,
        'RE5': None,
        'AST_V1': None,
        'AST_V2': None,
        'AST_S': None,
        'AST_T': None,
        'S2A10': 'S2__cld_mask_20160321_s2_8l0.200_11l0.060_12l0.040_v20161124_16:41:04.h5',
        'S2A20': 'S2__cld_mask_20160321_s2_8l0.200_11l0.060_12l0.040_v20161124_16:41:04.h5',
        'S2A60': 'S2__cld_mask_20160321_s2_8l0.200_11l0.060_12l0.040_v20161124_16:41:04.h5',
        'S2B10': 'S2__cld_mask_20160321_s2_8l0.200_11l0.060_12l0.040_v20161124_16:41:04.h5',
        'S2B20': 'S2__cld_mask_20160321_s2_8l0.200_11l0.060_12l0.040_v20161124_16:41:04.h5',
        'S2B60': 'S2__cld_mask_20160321_s2_8l0.200_11l0.060_12l0.040_v20161124_16:41:04.h5'}

    if get_all:  # returns a list of all available classifier file paths
        # NOTE: glob.glob() already returns full paths, so no additional join is needed here
        classifier_path = glob.glob(os.path.join(path_cloud_classifier_objects, '*.dill'))
        if not classifier_path or not os.path.isfile(classifier_path[0]):
            logger.warning('Cloud masking is not yet implemented for %s %s.' % (satellite, sensor))
            classifier_path = None
    else:
        try:
            classif_objName = obj_name_dic[GMS_sensorcode]
            if classif_objName:
                classifier_path = os.path.join(path_cloud_classifier_objects, classif_objName)
                if not os.path.isfile(classifier_path):
                    warnings.warn("Path generator expects a specific cloud mask object (%s) at %s but it does not "
                                  "exist. Are you sure that 'path_cloud_classif' has been correctly set in the config "
                                  "table of the postgreSQL database and that the file is included in the repository? "
                                  "By default the classifier object should be available at "
                                  "<GMS root dir>/database/cloud_classifier/"
                                  % (classif_objName, path_cloud_classifier_objects))
                    logger.warning(
                        'Cloud masking not possible for %s %s due to an environment error.'  # TODO move to environment
                        % (satellite, sensor))
                    classifier_path = None
            else:
                logger.warning('Cloud masking is not yet implemented for %s %s.' % (satellite, sensor))
                classifier_path = None

        except KeyError:
            logger.warning("Sensorcode '%s' is not included in the sensorcode dictionary and cannot be converted "
                           "into a GMS sensorcode." % GMS_sensorcode)
            classifier_path = None
    return classifier_path


def get_path_snr_model(GMS_id):
    # type: (GMS_identifier) -> str
    """Returns the absolute path of the SNR model for the given sensor.

    :param GMS_id:  <GMS_identifier> identifier of the current scene
    """

    satellite, sensor = (GMS_id.satellite, GMS_id.sensor)
    satellite = 'RapidEye' if re.match(r'RapidEye', satellite, re.I) else satellite
    sensor = sensor[:-1] if re.match(r'SPOT', satellite, re.I) and sensor[-1] not in ['1', '2'] else sensor
    return os.path.join(CFG.path_SNR_models, satellite, sensor, 'SNR_model.csv')


def get_path_ac_options(GMS_id):
    # type: (GMS_identifier) -> Union[str, None]
    """Returns the path of the options JSON file needed for atmospheric correction."""

    GMSid_ac = GMS_id
    GMSid_ac.subsystem = ''  # NOTE: no copy is made here, so this also clears the subsystem of the passed GMS_id
    sensorcode = get_GMS_sensorcode(GMSid_ac)

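    # Maps subsystem-independent GMS sensor codes (hence the '_full' codes) to the options files shipped with SICOR;
    # None means that atmospheric correction is not available for that sensor.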
    ac_options_file_dic = {
        'AVNIR-2': None,
        'TM4': 'l8_options.json',
        'TM5': 'l8_options.json',
        'TM7': 'l8_options.json',  # AC uses the Landsat-8 options for L7 but reads only a subset of the options
        'LDCM': 'l8_options.json',
        'SPOT1a': None,
        'SPOT1b': None,
        'SPOT2a': None,
        'SPOT2b': None,
        'SPOT3a': None,
        'SPOT3b': None,
        'SPOT4a': None,
        'SPOT4b': None,
        'SPOT5a': None,
        'SPOT5b': None,
        'RE5': None,
        'AST_full': None,
        'S2A_full': 's2_options.json',
        'S2B_full': 's2_options.json',
    }

    try:
        fName_optFile = ac_options_file_dic[sensorcode]
    except KeyError:
        GMS_id.logger.warning(
            "Sensorcode '%s' is not included in the ac_options dictionary. "
            "Thus atmospheric correction is not available for the current scene." % sensorcode)
        fName_optFile = None

    if fName_optFile:
        from sicor import options
        path_ac = os.path.join(os.path.dirname(options.__file__), fName_optFile)

        # validate
        logger = GMS_id.logger or Logger(__name__)
        if not os.path.exists(path_ac):
            logger.warning('Could not locate options file for atmospheric correction at %s.' % path_ac)

        return path_ac
    else:
        return None