# (removed HTML code-viewer hot-key artifact lines — not part of the source file)
# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
import collections
import functools
import shutil
import sys
import traceback
import warnings
from logging import getLogger
from typing import Union, List, TYPE_CHECKING  # noqa F401  # flake8 issue

from ..options.config import GMS_config as CFG
from ..misc import database_tools as DB_T
from ..misc.helper_functions import is_proc_level_lower
from .definition_dicts import db_jobs_statistics_def, proc_chain

if TYPE_CHECKING:
    from ..model.gms_object import GMS_object  # noqa F401  # flake8 issue
    from ..model.gms_object import failed_GMS_object

__author__ = 'Daniel Scheffler'
def trace_unhandled_exceptions(func):
    """Decorator that prints the traceback of any exception raised by *func*.

    Intended for functions executed in worker processes where uncaught exceptions
    would otherwise be swallowed silently.

    :param func: the function to wrap
    :return:     the wrapped function, returning *func*'s result or None if an
                 exception was raised (the traceback is printed to stderr)
    """
    @functools.wraps(func)
    def traced(*args, **kwargs):
        # noinspection PyBroadException
        try:
            return func(*args, **kwargs)
        except Exception:
            print('Exception in ' + func.__name__)
            traceback.print_exc()
            return None

    return traced
class ExceptionHandler(object):
    """Handle unexpected exceptions raised within GMS mapper functions.

    An instance wraps a GMS mapper function via :meth:`log_uncaught_exceptions`.
    If the mapper raises, the traceback is logged to the logfile of the respective
    GMS object, the scene ID is added to the 'failed_sceneids' column of the jobs
    table in the postgreSQL database and the job statistics column is updated.
    """

    def __init__(self, logger=None):
        """
        :param logger: optional logger instance; if not given, one is created lazily on first access
        """
        # input(s)/output(s) of the currently wrapped GMS mapper call
        self.GMS_objs = None  # type: Union[List[GMS_object], GMS_object, collections.OrderedDict, failed_GMS_object]
        self.GMS_mapper_name = ''
        self.GMS_mapper_failed = False
        # cached (exception type, exception value, formatted traceback) of the current exception
        self._exc_details = None
        self._logger = logger

    @property
    def logger(self):
        # create the logger lazily so freshly constructed instances stay lightweight/picklable
        if not self._logger:
            self._logger = getLogger('ExceptionHandler')
            self._logger.setLevel(CFG.log_level)
        return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    def __call__(self, GMS_mapper):
        # FIXED: the wrapped mapper was previously not returned, so using an
        # ExceptionHandler instance directly as a decorator yielded None
        return self.log_uncaught_exceptions(GMS_mapper)

    def log_uncaught_exceptions(self, GMS_mapper):
        """Decorator function for handling unexpected exceptions that occur within GMS mapper functions. Traceback is
        sent to logfile of the respective GMS object and the scene ID is added to the 'failed_sceneids' column
        within the jobs table of the postgreSQL database.

        :param GMS_mapper: A GMS mapper function that takes a GMS object, does some processing and returns it back.
        """
        @functools.wraps(GMS_mapper)  # needed to avoid pickling errors
        def wrapped_GMS_mapper(GMS_objs, **kwargs):
            # type: (Union[List[GMS_object], GMS_object, collections.OrderedDict, failed_GMS_object], dict) -> Union[GMS_object, List[GMS_object], failed_GMS_object]  # noqa
            """
            :param GMS_objs: one OR multiple instances of GMS_object or one instance of failed_object
            :param kwargs:   keyword arguments passed through to the GMS mapper
            :return:         the mapper output, or a failed_GMS_object in case of an exception
            """
            self.GMS_mapper_name = GMS_mapper.__name__
            self.GMS_objs = GMS_objs
            # FIXED: reset the cached exception details for each call, otherwise a second
            # failure through the same wrapped mapper would reuse stale exception info
            self._exc_details = None

            if not GMS_objs:
                return GMS_objs

            # noinspection PyBroadException
            try:
                # GMS_mapper inputs CONTAIN NO failed_GMS_objects -> run the mapper normally
                if not self.is_failed(self.GMS_objs):
                    self.update_progress_started()

                    # run the mapper function and store its results
                    self.GMS_objs = GMS_mapper(GMS_objs, **kwargs)

                    self.increment_progress()

                # GMS_mapper inputs CONTAIN failed_GMS_objects -> log and return mapper inputs as received
                else:
                    GMS_obj = self.get_sample_GMS_obj(self.GMS_objs)  # failed_GMS_object
                    # FIXME in case self.GMS_objs is a list and the failed object is not at first position
                    # FIXME GMS_obj.failedMapper will not work
                    print("Scene %s (entity ID %s) skipped %s due to an unexpected exception in %s."
                          % (GMS_obj.scene_ID, GMS_obj.entity_ID, self.GMS_mapper_name,
                             GMS_obj.failedMapper))  # TODO should be logged by PC.logger

                return self.GMS_objs  # Union[GMS_object, List[GMS_object]]

            except OSError:
                _, exc_val, _ = self.exc_details

                if exc_val.strerror == 'Input/output error':
                    # check free disk space
                    usageNamedTuple = shutil.disk_usage(CFG.path_fileserver)
                    percent_free = usageNamedTuple.free / usageNamedTuple.total
                    gigabytes_free = usageNamedTuple.free / (1024 ** 3)
                    if percent_free < 0.025:
                        self.logger.warning('\nCatched an unexpected IO error and FREE DISK SPACE IS ONLY %.2f percent '
                                            '(~%.1f GB)!' % (percent_free * 100, gigabytes_free))
                    # NOTE(review): this branch neither raises nor calls handle_failed(), so the
                    # wrapped mapper implicitly returns None here — confirm this is intended

                elif CFG.disable_exception_handler:
                    raise
                else:
                    return self.handle_failed()  # failed_GMS_object

            except Exception:
                if CFG.disable_exception_handler:
                    raise
                else:
                    return self.handle_failed()  # failed_GMS_object

        return wrapped_GMS_mapper

    @property
    def exc_details(self):
        """Return (exception type, exception value, formatted traceback) of the current exception (cached)."""
        if not self._exc_details:
            type_, value_ = sys.exc_info()[:2]
            traceback_ = traceback.format_exc()
            self._exc_details = type_, value_, traceback_
        return self._exc_details

    @staticmethod
    def is_failed(GMS_objs):
        """Return True if the given object (or the first item of a given list) is a failed_GMS_object."""
        from ..model.gms_object import failed_GMS_object
        return isinstance(GMS_objs, failed_GMS_object) or \
            (isinstance(GMS_objs, list) and isinstance(GMS_objs[0], failed_GMS_object))

    @staticmethod
    def get_sample_GMS_obj(GMS_objs):
        # type: (Union[list, tuple, collections.OrderedDict, failed_GMS_object]) -> Union[GMS_object, failed_GMS_object]
        """Return a single sample object from the given input (the first item in case of a list or tuple)."""
        return \
            GMS_objs if isinstance(GMS_objs, collections.OrderedDict) else \
            GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs

    def update_progress_started(self):
        """in case of just initialized objects:
        update statistics column in jobs table of postgreSQL database to 'started'"""
        if isinstance(self.GMS_objs, collections.OrderedDict) and self.GMS_objs['proc_level'] is None:

            if not self.GMS_objs['subsystem'] or self.GMS_objs['subsystem'] in ['VNIR1', 'S2A10', 'S2B10']:
                self.logger.debug("Setting job statistics array to 'STARTED'.")

                # update statistics column ONLY in case of full cube or first subsystem
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def['started'],
                    timeout=30000)

    def increment_progress(self):
        """Update statistics column in jobs table of postgreSQL database.

        NOTE: This function ONLY receives those GMS_objects that have been successfully processed by the GMS_mapper.
        """
        # get a GMS object from which we get the new proc_level
        GMS_obj = self.get_sample_GMS_obj(self.GMS_objs)

        # validate proc_level
        if GMS_obj.proc_level is None:
            raise ValueError('Received GMS_object for %s %s without processing level after being processed by %s.'
                             % (GMS_obj.entity_ID, GMS_obj.subsystem, self.GMS_mapper_name))

        # NOTE: in case GMS_obj represents a subsystem and another one has already been marked as FAILED the
        # failed_sceneids column and the statistics column is NOT updated once more
        # check if another subsystem of the same scene ID already failed - don't increment the stats anymore
        if not GMS_obj.subsystem or GMS_obj.subsystem in ['VNIR1', 'S2A10', 'S2B10']:
            another_ss_failed = False

            if GMS_obj.subsystem:
                # check if another subsystem of the same scene ID has been marked as failed before
                res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'],
                                                      {'id': CFG.ID})
                assert res, "Query delivered no result."

                if res[0][0] is not None and GMS_obj.scene_ID in res[0][0]:
                    # FIXED: the scene ID argument for the '%s' placeholder was missing
                    self.logger.debug("Found another failed subsystem of scene %s in the database."
                                      % GMS_obj.scene_ID)
                    another_ss_failed = True

            # update statistics column ONLY in case of full cube or first subsystem and if no other subsystem failed
            if not another_ss_failed:
                self.logger.debug("Decrementing job statistics array for %s objects."
                                  % proc_chain[proc_chain.index(GMS_obj.proc_level) - 1])
                self.logger.debug("Incrementing job statistics array for %s objects." % GMS_obj.proc_level)

                # NOTE(review): assumes the indices in db_jobs_statistics_def are consecutive,
                # so that 'index - 1' addresses the previous processing level — confirm
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
                    idx_val2decrement=db_jobs_statistics_def[GMS_obj.proc_level] - 1,
                    idx_val2increment=db_jobs_statistics_def[GMS_obj.proc_level],
                    timeout=30000)

    @staticmethod
    def update_progress_failed(failed_Obj, procL_failed=None):
        """Update statistics column in jobs table of postgreSQL database.

        :param failed_Obj:   instance of gms_object failed_GMS_object
        :param procL_failed: processing level to be decremented. If not given, the one from failed_Obj is used.
        :return:
        """
        DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
            CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
            idx_val2decrement=db_jobs_statistics_def[procL_failed or failed_Obj.proc_level],
            idx_val2increment=db_jobs_statistics_def['FAILED'],
            timeout=30000)

    def handle_failed(self):
        """Create a failed_GMS_object from the current exception, log it and update the jobs table of the database.

        :return: failed_GMS_object summarizing the failed GMS object and the exception details
        """
        from ..model.gms_object import failed_GMS_object

        try:
            _, exc_val, exc_tb = self.exc_details

            # collect some information about the failed GMS object and summarize it in failed_GMS_object
            failed_Obj = failed_GMS_object(self.get_sample_GMS_obj(self.GMS_objs),
                                           self.GMS_mapper_name, *self.exc_details)

            # log the exception and raise warning
            failed_Obj.logger.error('\n' + exc_tb, exc_info=False)
            self.logger.warning("\nLogged an uncaught exception within %s during processing of scene ID %s "
                                "(entity ID %s):\n '%s'\n"
                                % (self.GMS_mapper_name, failed_Obj.scene_ID, failed_Obj.entity_ID, exc_val))

            # add the scene ID to failed_sceneids column in jobs table of DB and update statistics column
            # NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED the
            # failed_sceneids column and the statistics column is NOT updated once more

            another_ss_failed = False
            another_ss_succeeded = False
            higher_procL = None

            if failed_Obj.subsystem:
                # check if another subsystem of the same scene ID has been marked as failed before
                res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'], {'id': CFG.ID})
                assert res, "Query delivered no result."

                if res[0][0] is not None and failed_Obj.scene_ID in res[0][0]:
                    # FIXED: the scene ID argument for the '%s' placeholder was missing
                    self.logger.debug("Found another failed subsystem of scene %s in the database."
                                      % failed_Obj.scene_ID)
                    another_ss_failed = True

                # check if another subsystem already reached a higher processing level
                # NOTE: this fixes issue #50
                # NOTE: This works not only for GMS_object instances but also for L1A inputs (OrderedDicts) because
                #       failed_GMS_object inherits from GMS_object and GMS_object.proc_status_all_GMS_objs has already
                #       been updated by the first subsystem (that earlier reached L1A)
                # FIXME proc_status_all_GMSobjs is not available if other subsystems are processed by another
                # FIXME multiprocessing worker or on another machine (cluster node)
                from ..model.gms_object import GMS_object

                procstatus_other_ss = {k: v for k, v in GMS_object.proc_status_all_GMSobjs[failed_Obj.scene_ID].items()
                                       if k != failed_Obj.subsystem}
                for ss, statusentry in procstatus_other_ss.items():
                    for procL in statusentry.keys():
                        if is_proc_level_lower(failed_Obj.proc_level, procL) and statusentry[procL] == 'finished':
                            higher_procL = procL
                            self.logger.debug("Found another subsystem that already reached a higher processing level.")
                            another_ss_succeeded = True
                            break

            if not another_ss_failed:  # applies also to full cubes
                DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.conn_database, 'jobs',
                                                             {'failed_sceneids': failed_Obj.scene_ID}, {'id': CFG.ID})

                if not another_ss_succeeded:
                    self.update_progress_failed(failed_Obj)
                else:
                    self.update_progress_failed(failed_Obj, procL_failed=higher_procL)

            return failed_Obj

        except Exception:
            # raise exceptions that occur during self.handle_failed() -> must be ProgrammingErrors
            raise
def log_uncaught_exceptions(GMS_mapper, logger=None):
    """Wrap *GMS_mapper* with the GMS exception handler.

    :param GMS_mapper: the GMS mapper function to wrap
    :param logger:     optional logger instance to be used by the exception handler
    :return:           the wrapped GMS mapper function
    """
    return ExceptionHandler(logger=logger).log_uncaught_exceptions(GMS_mapper)
def ignore_warning(warning_type):
    """Create a decorator that suppresses warnings of *warning_type* while the decorated function runs.

    :param warning_type: the warning class to ignore
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # record=True captures matching warnings instead of letting them propagate
            with warnings.catch_warnings(record=True):
                warnings.simplefilter('always', warning_type)
                return func(*args, **kwargs)

        return wrapper

    return decorator