# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public License (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.


import collections
import functools
import shutil
import sys
import traceback
import warnings
from logging import getLogger
from typing import Union, List, TYPE_CHECKING  # noqa F401  # flake8 issue

from ..options.config import GMS_config as CFG
from ..misc import database_tools as DB_T
from ..misc.helper_functions import is_proc_level_lower
from .definition_dicts import db_jobs_statistics_def, proc_chain

if TYPE_CHECKING:
    from ..model.gms_object import GMS_object  # noqa F401  # flake8 issue
    from ..model.gms_object import failed_GMS_object

__author__ = 'Daniel Scheffler'


def trace_unhandled_exceptions(func):
    """Decorator that prints the traceback of any unhandled exception raised by the decorated function.

    The exception is not re-raised; the wrapped function returns None in that case.
    """
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        result = None

        # noinspection PyBroadException
        try:
            result = func(*args, **kwargs)
        except Exception:
            print('Exception in ' + func.__name__)
            traceback.print_exc()
        return result

    return wrapped_func

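# Example usage of trace_unhandled_exceptions (a minimal sketch; 'demo_task' is a hypothetical function, not part
# of gms_preprocessing):
#
#     @trace_unhandled_exceptions
#     def demo_task(x):
#         return 1 / x
#
#     demo_task(0)  # prints 'Exception in demo_task' plus the full traceback and returns None
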

class ExceptionHandler(object):
    def __init__(self, logger=None):
        self.GMS_objs = None  # type: Union[List[GMS_object], GMS_object, collections.OrderedDict, failed_GMS_object]
        self.GMS_mapper_name = ''
        self.GMS_mapper_failed = False
        self._exc_details = None
        self._logger = logger

    @property
    def logger(self):
        if not self._logger:
            self._logger = getLogger('ExceptionHandler')
            self._logger.setLevel(CFG.log_level)
        return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    def __call__(self, GMS_mapper):
        return self.log_uncaught_exceptions(GMS_mapper)

    def log_uncaught_exceptions(self, GMS_mapper):
        """Decorator function for handling unexpected exceptions that occur within GMS mapper functions.

        The traceback is sent to the logfile of the respective GMS object and the scene ID is added to the
        'failed_sceneids' column within the jobs table of the PostgreSQL database.

        :param GMS_mapper: a GMS mapper function that takes a GMS object, does some processing and returns it back
        """

        @functools.wraps(GMS_mapper)  # needed to avoid pickling errors
        def wrapped_GMS_mapper(GMS_objs, **kwargs):
            # type: (Union[List[GMS_object], GMS_object, collections.OrderedDict, failed_GMS_object], dict) -> Union[GMS_object, List[GMS_object], failed_GMS_object]  # noqa
            """Run the GMS mapper and handle any unexpected exception raised within it.

            :param GMS_objs: one OR multiple instances of GMS_object or one instance of failed_GMS_object
            :param kwargs:   keyword arguments to be passed through to the GMS mapper
            :return:         the output of the GMS mapper or a failed_GMS_object in case an exception was handled
            """
            self.GMS_mapper_name = GMS_mapper.__name__
            self.GMS_objs = GMS_objs

            if not GMS_objs:
                return GMS_objs

            # noinspection PyBroadException
            try:
                # GMS_mapper inputs CONTAIN NO failed_GMS_objects -> run the mapper normally
                if not self.is_failed(self.GMS_objs):
                    self.update_progress_started()

                    # run the mapper function and store its results
                    self.GMS_objs = GMS_mapper(GMS_objs, **kwargs)

                    self.increment_progress()

                # GMS_mapper inputs CONTAIN failed_GMS_objects -> log and return mapper inputs as received
                else:
                    GMS_obj = self.get_sample_GMS_obj(self.GMS_objs)  # failed_GMS_object
                    # FIXME in case self.GMS_objs is a list and the failed object is not at first position
                    # FIXME GMS_obj.failedMapper will not work
                    print("Scene %s (entity ID %s) skipped %s due to an unexpected exception in %s."
                          % (GMS_obj.scene_ID, GMS_obj.entity_ID, self.GMS_mapper_name,
                             GMS_obj.failedMapper))  # TODO should be logged by PC.logger

                return self.GMS_objs  # Union[GMS_object, List[GMS_object]]

            except OSError:
                _, exc_val, _ = self.exc_details

                if exc_val.strerror == 'Input/output error':
                    # check free disk space
                    usageNamedTuple = shutil.disk_usage(CFG.path_fileserver)
                    percent_free = usageNamedTuple.free / usageNamedTuple.total
                    gigabytes_free = usageNamedTuple.free / (1024 ** 3)

                    if percent_free < 0.025:
                        self.logger.warning('\nCaught an unexpected I/O error and FREE DISK SPACE IS ONLY %.2f '
                                            'percent (~%.1f GB)!' % (percent_free * 100, gigabytes_free))

                if CFG.disable_exception_handler:
                    raise
                else:
                    return self.handle_failed()  # failed_GMS_object

            except Exception:
                if CFG.disable_exception_handler:
                    raise
                else:
                    return self.handle_failed()  # failed_GMS_object

        return wrapped_GMS_mapper

    @property
    def exc_details(self):
        """Return type, value and formatted traceback of the exception currently being handled.

        NOTE: The details are cached at first access so that later calls (e.g., from within handle_failed())
              still refer to the original exception.
        """
        if not self._exc_details:
            type_, value_ = sys.exc_info()[:2]
            traceback_ = traceback.format_exc()
            self._exc_details = type_, value_, traceback_
        return self._exc_details

    @staticmethod
    def is_failed(GMS_objs):
        from ..model.gms_object import failed_GMS_object
        return isinstance(GMS_objs, failed_GMS_object) or \
            (isinstance(GMS_objs, list) and isinstance(GMS_objs[0], failed_GMS_object))

    @staticmethod
    def get_sample_GMS_obj(GMS_objs):
        # type: (Union[list, tuple, collections.OrderedDict, failed_GMS_object]) -> Union[GMS_object, failed_GMS_object]
        return \
            GMS_objs if isinstance(GMS_objs, collections.OrderedDict) else \
            GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs
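    # For example (illustrative only; obj_a and obj_b stand for arbitrary GMS_object instances):
    #     get_sample_GMS_obj(OrderedDict(...))  -> the OrderedDict itself (a just initialized L1A input)
    #     get_sample_GMS_obj([obj_a, obj_b])    -> obj_a
    #     get_sample_GMS_obj(obj_a)             -> obj_a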

    def update_progress_started(self):
        """Update the statistics column of the jobs table in the PostgreSQL database to 'started'.

        This only applies to just initialized objects, i.e., objects that have no processing level yet.
        """
        if isinstance(self.GMS_objs, collections.OrderedDict) and self.GMS_objs['proc_level'] is None:

            if not self.GMS_objs['subsystem'] or self.GMS_objs['subsystem'] in ['VNIR1', 'S2A10', 'S2B10']:
                self.logger.debug("Setting job statistics array to 'STARTED'.")

                # update the statistics column ONLY in case of a full cube or the first subsystem
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def['started'],
                    timeout=30000)

    def increment_progress(self):
        """Update the statistics column of the jobs table in the PostgreSQL database.

        NOTE: This function ONLY receives those GMS objects that have been successfully processed by the GMS mapper.
        """
        # get a GMS object from which we get the new proc_level
        GMS_obj = self.get_sample_GMS_obj(self.GMS_objs)

        # validate proc_level
        if GMS_obj.proc_level is None:
            raise ValueError('Received GMS_object for %s %s without processing level after being processed by %s.'
                             % (GMS_obj.entity_ID, GMS_obj.subsystem, self.GMS_mapper_name))

        # NOTE: in case GMS_obj represents a subsystem and another one has already been marked as FAILED, the
        #       failed_sceneids column and the statistics column are NOT updated once more
        # check if another subsystem of the same scene ID already failed - don't increment the stats anymore
        if not GMS_obj.subsystem or GMS_obj.subsystem in ['VNIR1', 'S2A10', 'S2B10']:
            another_ss_failed = False

            if GMS_obj.subsystem:
                # check if another subsystem of the same scene ID has been marked as failed before
                res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'],
                                                      {'id': CFG.ID})
                assert res, "Query delivered no result."

                if res[0][0] is not None and GMS_obj.scene_ID in res[0][0]:
                    self.logger.debug("Found another failed subsystem of scene %s in the database."
                                      % GMS_obj.scene_ID)
                    another_ss_failed = True

            # update the statistics column ONLY in case of a full cube or the first subsystem and only if no other
            # subsystem failed
            if not another_ss_failed:
                self.logger.debug("Decrementing job statistics array for %s objects."
                                  % proc_chain[proc_chain.index(GMS_obj.proc_level) - 1])
                self.logger.debug("Incrementing job statistics array for %s objects." % GMS_obj.proc_level)

                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
                    idx_val2decrement=db_jobs_statistics_def[GMS_obj.proc_level] - 1,
                    idx_val2increment=db_jobs_statistics_def[GMS_obj.proc_level],
                    timeout=30000)

    @staticmethod
    def update_progress_failed(failed_Obj, procL_failed=None):
        """Update the statistics column of the jobs table in the PostgreSQL database after a failure.

        :param failed_Obj:   instance of failed_GMS_object
        :param procL_failed: processing level to be decremented; if not given, the one of failed_Obj is used
        """
        DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
            CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
            idx_val2decrement=db_jobs_statistics_def[procL_failed or failed_Obj.proc_level],
            idx_val2increment=db_jobs_statistics_def['FAILED'],
            timeout=30000)
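    # The update_progress_* methods above all treat the jobs.statistics column as a counter array with one slot per
    # job status / processing level (the actual indices come from db_jobs_statistics_def): every status transition
    # decrements the slot a scene is leaving and increments the slot it is entering, so the total count stays
    # constant. A minimal sketch of that invariant with a plain Python list standing in for the DB array (the slot
    # layout here is hypothetical):
    #
    #     statistics = [10, 0, 0, 0]    # e.g. slots: pending, started, L1A, FAILED
    #     PENDING, STARTED, L1A, FAILED = range(4)
    #     statistics[PENDING] -= 1      # update_progress_started(): pending -> started
    #     statistics[STARTED] += 1
    #     statistics[STARTED] -= 1      # increment_progress(): previous level -> reached level
    #     statistics[L1A] += 1
    #     statistics[L1A] -= 1          # update_progress_failed(): reached level -> FAILED
    #     statistics[FAILED] += 1
    #     assert sum(statistics) == 10  # the total number of scenes is preserved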

    def handle_failed(self):
        from ..model.gms_object import failed_GMS_object

        try:
            _, exc_val, exc_tb = self.exc_details

            # collect some information about the failed GMS object and summarize it in a failed_GMS_object
            failed_Obj = failed_GMS_object(self.get_sample_GMS_obj(self.GMS_objs),
                                           self.GMS_mapper_name, *self.exc_details)

            # log the exception and raise a warning
            failed_Obj.logger.error('\n' + exc_tb, exc_info=False)
            self.logger.warning("\nLogged an uncaught exception within %s during processing of scene ID %s "
                                "(entity ID %s):\n '%s'\n"
                                % (self.GMS_mapper_name, failed_Obj.scene_ID, failed_Obj.entity_ID, exc_val))

            # add the scene ID to the failed_sceneids column in the jobs table of the DB and update the statistics
            # column
            # NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED, the
            #       failed_sceneids column and the statistics column are NOT updated once more

            another_ss_failed = False
            another_ss_succeeded = False
            higher_procL = None

            if failed_Obj.subsystem:
                # check if another subsystem of the same scene ID has been marked as failed before
                res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'], {'id': CFG.ID})
                assert res, "Query delivered no result."

                if res[0][0] is not None and failed_Obj.scene_ID in res[0][0]:
                    self.logger.debug("Found another failed subsystem of scene %s in the database."
                                      % failed_Obj.scene_ID)
                    another_ss_failed = True

                # check if another subsystem already reached a higher processing level
                # NOTE: this fixes issue #50
                # NOTE: This works not only for GMS_object instances but also for L1A inputs (OrderedDicts) because
                #       failed_GMS_object inherits from GMS_object and GMS_object.proc_status_all_GMSobjs has already
                #       been updated by the first subsystem (that earlier reached L1A)
                # FIXME proc_status_all_GMSobjs is not available if other subsystems are processed by another
                # FIXME multiprocessing worker or on another machine (cluster node)
                from ..model.gms_object import GMS_object

                procstatus_other_ss = {k: v for k, v in GMS_object.proc_status_all_GMSobjs[failed_Obj.scene_ID].items()
                                       if k != failed_Obj.subsystem}
                for ss, statusentry in procstatus_other_ss.items():
                    for procL in statusentry.keys():
                        if is_proc_level_lower(failed_Obj.proc_level, procL) and statusentry[procL] == 'finished':
                            higher_procL = procL
                            self.logger.debug("Found another subsystem that already reached a higher processing "
                                              "level.")
                            another_ss_succeeded = True
                            break

            if not another_ss_failed:  # applies also to full cubes
                DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.conn_database, 'jobs',
                                                             {'failed_sceneids': failed_Obj.scene_ID}, {'id': CFG.ID})

                if not another_ss_succeeded:
                    self.update_progress_failed(failed_Obj)
                else:
                    self.update_progress_failed(failed_Obj, procL_failed=higher_procL)

            return failed_Obj

        except Exception:
            # re-raise exceptions that occur during self.handle_failed() -> these must be programming errors
            raise


def log_uncaught_exceptions(GMS_mapper, logger=None):
    """Decorate a GMS mapper function so that any unexpected exception raised within it is logged and handled.

    :param GMS_mapper: the GMS mapper function to be wrapped
    :param logger:     the logger instance to be used (optional)
    """
    exc_handler = ExceptionHandler(logger=logger)
    return exc_handler.log_uncaught_exceptions(GMS_mapper)

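# Example usage of log_uncaught_exceptions (a minimal sketch; 'my_mapper' stands for any GMS mapper function and is
# only assumed here for illustration):
#
#     @log_uncaught_exceptions
#     def my_mapper(GMS_obj):
#         ...  # do some processing
#         return GMS_obj
#
# On an unexpected exception, the wrapper logs the traceback, flags the scene as failed in the jobs table and
# returns a failed_GMS_object instead of raising (unless CFG.disable_exception_handler is set).
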

def ignore_warning(warning_type):
    """A decorator to ignore a specific type of warning when executing a function.

    :param warning_type: the type of the warning to ignore
    """
    def _ignore_warning(func):
        @functools.wraps(func)
        def __ignore_warning(*args, **kwargs):
            with warnings.catch_warnings(record=True):
                # catch (record) all warnings of this type instead of displaying them
                warnings.simplefilter('always', warning_type)
                # execute the function
                result = func(*args, **kwargs)

            return result

        return __ignore_warning

    return _ignore_warning
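
# Example usage of ignore_warning (a minimal sketch; 'read_legacy_file' is a hypothetical function, not part of
# gms_preprocessing):
#
#     @ignore_warning(DeprecationWarning)
#     def read_legacy_file(path):
#         warnings.warn('%s uses a deprecated format.' % path, DeprecationWarning)
#         ...  # read the file
#
#     read_legacy_file('/tmp/scene.bsq')  # the DeprecationWarning is recorded and discarded, not shown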