Commit 85efd845 authored by Daniel Scheffler

Revised exception handler: log_uncaught_exceptions() now calls new class ExceptionHandler.


Former-commit-id: c1650715
parent 2f8d2d4d
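For context, this is how a decorated GMS mapper is typically used; a minimal sketch, assuming the decorator's module path and the mapper name L1A_map, neither of which is shown in this commit:

from gms.misc.exception_handler import log_uncaught_exceptions  # module path assumed

@log_uncaught_exceptions
def L1A_map(GMS_obj):
    """Hypothetical GMS mapper: takes a GMS object, processes it, returns it."""
    return GMS_obj  # placeholder for the real processing steps

# On an uncaught exception inside L1A_map, the wrapper logs the traceback to the
# object's logfile, flags the scene ID in the jobs table and returns a
# failed_GMS_object instead of raising (unless CFG.job.disable_exception_handler is set).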
@@ -324,7 +324,8 @@ class Usecase:
self.scale_factor_TOARef = 10000
self.scale_factor_BOARef = 10000
self.scale_factor_errors_ac = 255
self.virtual_sensor_id = 10 # Sentinel-2A 10m
# self.virtual_sensor_id = 10 # Sentinel-2A 10m
self.virtual_sensor_id = 1 # Landsat-8
self.datasetid_spectral_ref = 249 # Sentinel-2A
self.target_CWL = []
self.target_FWHM = []
@@ -339,16 +340,17 @@ class Usecase:
self.sort_bands_by_cwl = int(query_cfg('sort_bands_by_cwl'))
self.conversion_type_optical = query_cfg('conversion_type_optical')
self.conversion_type_thermal = query_cfg('conversion_type_thermal')
self.datasetid_spatial_ref = query_job('datasetid_spatial_ref')
self.virtual_sensor_id = query_job('virtualsensorid')
self.virtual_sensor_id = self.virtual_sensor_id if self.virtual_sensor_id != -1 else 10 # Sentinel-2A 10m
self.datasetid_spatial_ref = query_job('datasetid_spatial_ref')
# self.datasetid_spatial_ref = 104
self.virtual_sensor_name = query_vir('name', self.virtual_sensor_id)
self.datasetid_spectral_ref = query_vir('spectral_characteristics_datasetid', self.virtual_sensor_id)
# FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
self.target_CWL = query_vir('wavelengths_pos', self.virtual_sensor_id)
# FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
self.target_FWHM = query_vir('band_width', self.virtual_sensor_id)
# FIXME target GSD setting is a duplicate of datasetid_spatial_ref!
self.target_gsd = query_vir('spatial_resolution',
self.virtual_sensor_id) # table features only 1 value for X/Y-dims
self.target_gsd = xgsd, ygsd = \
......
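The virtual_sensor_id fallback introduced above treats -1 in the jobs table as "not configured" and falls back to the Sentinel-2A 10m sensor (ID 10); a self-contained sketch of that logic (the helper name is made up for illustration):

def resolve_virtual_sensor_id(job_value, default=10):
    # -1 in the 'virtualsensorid' column means no virtual sensor was configured,
    # so fall back to the default (Sentinel-2A 10m -> ID 10)
    return job_value if job_value != -1 else default

assert resolve_virtual_sensor_id(1) == 1     # Landsat-8 stays Landsat-8
assert resolve_virtual_sensor_id(-1) == 10   # unset -> Sentinel-2A 10m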
@@ -6,11 +6,17 @@ import shutil
import sys
import traceback
import warnings
from logging import Logger
from typing import TYPE_CHECKING
from ..model.gms_object import failed_GMS_object
from ..config import GMS_config as CFG
from ..misc import database_tools as DB_T
from .definition_dicts import db_jobs_statistics_def
from .definition_dicts import db_jobs_statistics_def, proc_chain
if TYPE_CHECKING:
from ..model.gms_object import GMS_object
from typing import Union, List
__author__ = 'Daniel Scheffler'
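The TYPE_CHECKING guard added above is the standard idiom for imports that are needed only by type annotations; it avoids a circular import between this module and gms_object at runtime. A minimal self-contained illustration of the pattern (names here are illustrative, not from the commit):

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # only evaluated by static type checkers, never at runtime,
    # so importing here cannot create a circular import
    from collections import OrderedDict

def first_value(objs):
    # type: (OrderedDict) -> object
    """Comment-style annotations may reference the guarded import by name."""
    return next(iter(objs.values()))

import collections
assert first_value(collections.OrderedDict(a=1)) == 1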
@@ -29,109 +35,176 @@ def trace_unhandled_exceptions(func):
return wrapped_func
def log_uncaught_exceptions(GMS_mapper):
"""Decorator function for handling unexpected exceptions that occurr within GMS mapper functions. Traceback is
sent to logfile of the respective GMS object and the scene ID is added to the 'failed_sceneids' column within
the jobs table of the postgreSQL database.
class ExceptionHandler(object):
def __init__(self, logger=None):
self.GMS_objs = None # type: Union[list, dict]
self.GMS_mapper_name = ''
self.GMS_mapper_failed = False
self._exc_details = None
self._logger = logger
:param GMS_mapper: A GMS mapper function that takes a GMS object, does some processing and returns it back.
"""
@property
def logger(self):
if not self._logger:
self._logger = Logger('ExceptionHandler', level=CFG.job.log_level)
return self._logger
@functools.wraps(GMS_mapper) # needed to avoid pickling errors
def wrapped_GMS_mapper(GMS_objs, **kwargs):
"""
@logger.setter
def logger(self, logger):
self._logger = logger
:param GMS_objs: one OR multiple instances of GMS_object or one instance of failed_object
:param kwargs:
:return:
def __call__(self, GMS_mapper):
return self.log_uncaught_exceptions(GMS_mapper)  # return the wrapped mapper so the instance works as a decorator
def log_uncaught_exceptions(self, GMS_mapper):
"""Decorator function for handling unexpected exceptions that occurr within GMS mapper functions. Traceback is
sent to logfile of the respective GMS object and the scene ID is added to the 'failed_sceneids' column
within the jobs table of the postgreSQL database.
:param GMS_mapper: A GMS mapper function that takes a GMS object, does some processing and returns it back.
"""
try:
# handle input objects of a GMS mapper that failed in a previous mapper
if isinstance(GMS_objs, failed_GMS_object) or \
(isinstance(GMS_objs, list) and isinstance(GMS_objs[0], failed_GMS_object)):
# get a GMS object from which we get the new proc_level
GMS_obj = GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs
print("Scene %s (entity ID %s) skipped %s due to an unexpected exception in %s."
% (GMS_obj.scene_ID, GMS_obj.entity_ID, GMS_mapper.__name__,
GMS_obj.failedMapper)) # TODO should be logged by PC.logger
return GMS_objs
# in case of just initialized objects:
# update statistics column in jobs table of postgreSQL database to 'started'
if isinstance(GMS_objs, collections.OrderedDict) and GMS_objs['proc_level'] is None:
if not GMS_objs['subsystem'] in ['VNIR2', 'SWIR', 'TIR', 'S2A20', 'S2A60']:
# update statistics column ONLY in case of full cube or first subsystem
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def['started'] - 1,
idx_val2increment=db_jobs_statistics_def['started'])
# run the mapper function and store its results
GMS_objs = GMS_mapper(GMS_objs, **kwargs)
# update statistics column in jobs table of postgreSQL database
# get a GMS object from which we get the new proc_level
GMS_obj = GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs
# NOTE: if GMS_obj represents a subsystem and another one has already been marked as FAILED, the
# failed_sceneids column and the statistics column are NOT updated once more
# check if another subsystem of the same scene ID already failed - don't increment the stats anymore
if GMS_obj.subsystem not in ['VNIR2', 'SWIR', 'TIR', 'S2A20', 'S2A60']:
another_ss_failed = False
if GMS_obj.subsystem:
res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'jobs', ['failed_sceneids'],
{'id': CFG.job.ID})
assert res, "Query delivered no result."
if res[0][0] is not None and GMS_obj.scene_ID in res[0][0]:
another_ss_failed = True
# update statistics column ONLY in case of full cube or first subsystem and if no other subsystem failed
if not another_ss_failed:
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def[GMS_obj.proc_level] - 1,
idx_val2increment=db_jobs_statistics_def[GMS_obj.proc_level])
return GMS_objs
except OSError:
# get Exception details
type_, value_ = sys.exc_info()[:2]
traceback_ = traceback.format_exc()
@functools.wraps(GMS_mapper) # needed to avoid pickling errors
def wrapped_GMS_mapper(GMS_objs, **kwargs):
"""
if value_.strerror == 'Input/output error':
# check free disk space
usageNamedTuple = shutil.disk_usage(CFG.job.path_fileserver)
percent_free = usageNamedTuple.free / usageNamedTuple.total
gigabytes_free = usageNamedTuple.free / (1024 ** 3)
if usageNamedTuple.free / usageNamedTuple.total < 0.025:
warnings.warn('\nCaught an unexpected IO error and FREE DISK SPACE IS ONLY %.2f percent '
'(~%.1f GB)!' % (percent_free * 100, gigabytes_free))
:param GMS_objs: one OR multiple instances of GMS_object or one instance of failed_object
:param kwargs:
:return:
"""
elif CFG.job.disable_exception_handler:
# turn off exception handling and raise the error
raise
self.GMS_mapper_name = GMS_mapper.__name__
self.GMS_objs = GMS_objs
except Exception:
# get Exception details
try:
self.handle_previously_failed()
self.update_progress_started()
# run the mapper function and store its results
self.GMS_objs = GMS_mapper(GMS_objs, **kwargs)
self.increment_progress()
return self.GMS_objs # type: Union[GMS_object, List[GMS_object]]
except OSError:
_, exc_val, _ = self.exc_details
if exc_val.strerror == 'Input/output error':
# check free disk space
usageNamedTuple = shutil.disk_usage(CFG.job.path_fileserver)
percent_free = usageNamedTuple.free / usageNamedTuple.total
gigabytes_free = usageNamedTuple.free / (1024 ** 3)
if usageNamedTuple.free / usageNamedTuple.total < 0.025:
self.logger.warning('\nCaught an unexpected IO error and FREE DISK SPACE IS ONLY %.2f percent '
'(~%.1f GB)!' % (percent_free * 100, gigabytes_free))
elif CFG.job.disable_exception_handler:
raise
else:
return self.handle_failed() # type: failed_GMS_object
except Exception:
if CFG.job.disable_exception_handler:
raise
else:
return self.handle_failed() # type: failed_GMS_object
return wrapped_GMS_mapper
@property
def exc_details(self):
if not self._exc_details:
type_, value_ = sys.exc_info()[:2]
traceback_ = traceback.format_exc()
self._exc_details = type_, value_, traceback_
return self._exc_details
@staticmethod
def is_failed(GMS_objs):
return isinstance(GMS_objs, failed_GMS_object) or \
(isinstance(GMS_objs, list) and isinstance(GMS_objs[0], failed_GMS_object))
@staticmethod
def get_sample_GMS_obj(GMS_objs):
# type: (Union[list, tuple, collections.OrderedDict]) -> GMS_object
return \
GMS_objs if isinstance(GMS_objs, collections.OrderedDict) else \
GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs
def handle_previously_failed(self):
if self.is_failed(self.GMS_objs):
GMS_obj = self.get_sample_GMS_obj(self.GMS_objs) # type: failed_GMS_object
print("Scene %s (entity ID %s) skipped %s due to an unexpected exception in %s."
% (GMS_obj.scene_ID, GMS_obj.entity_ID, self.GMS_mapper_name,
GMS_obj.failedMapper)) # TODO should be logged by PC.logger
return self.GMS_objs
def update_progress_started(self):
"""in case of just initialized objects:
update statistics column in jobs table of postgreSQL database to 'started'"""
if isinstance(self.GMS_objs, collections.OrderedDict) and self.GMS_objs['proc_level'] is None:
if not self.GMS_objs['subsystem'] in ['VNIR2', 'SWIR', 'TIR', 'S2A20', 'S2A60']:
# update statistics column ONLY in case of full cube or first subsystem
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def['started'] - 1,
idx_val2increment=db_jobs_statistics_def['started'])
def increment_progress(self):
"""update statistics column in jobs table of postgreSQL database"""
# get a GMS object from which we get the new proc_level
GMS_obj = self.get_sample_GMS_obj(self.GMS_objs)
# NOTE: if GMS_obj represents a subsystem and another one has already been marked as FAILED, the
# failed_sceneids column and the statistics column are NOT updated once more
# check if another subsystem of the same scene ID already failed - don't increment the stats anymore
if not GMS_obj.subsystem or GMS_obj.subsystem in ['VNIR1', 'S2A10']:
another_ss_failed = False
if GMS_obj.subsystem:
# check if another subsystem of the same scene ID has been marked as failed before
res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'jobs', ['failed_sceneids'],
{'id': CFG.job.ID})
assert res, "Query delivered no result."
if res[0][0] is not None and GMS_obj.scene_ID in res[0][0]:
self.logger.debug("Found another failed subsystem of scene %s in the database.")
another_ss_failed = True
# update statistics column ONLY in case of full cube or first subsystem and if no other subsystem failed
if not another_ss_failed:
self.logger.debug("Decrementing job statistics array for %s objects."
% proc_chain[proc_chain.index(GMS_obj.proc_level) - 1])
self.logger.debug("Incrementing job statistics array for %s objects." % GMS_obj.proc_level)
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def[GMS_obj.proc_level] - 1,
idx_val2increment=db_jobs_statistics_def[GMS_obj.proc_level])
@staticmethod
def update_progress_failed(failed_Obj):
"""Update statistics column in jobs table of postgreSQL database."""
if not failed_Obj.subsystem or failed_Obj.subsystem in ['VNIR1', 'S2A10']:
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def[failed_Obj.proc_level],
idx_val2increment=db_jobs_statistics_def['FAILED'])
if CFG.job.disable_exception_handler:
# turn off exception handling and raise the error
raise
def handle_failed(self):
_, exc_val, exc_tb = self.exc_details
# collect some information about the failed GMS object and summarize it in a failed_GMS_object
failed_Obj = \
failed_GMS_object(GMS_objs if isinstance(GMS_objs, collections.OrderedDict) else
GMS_objs[0] if isinstance(GMS_objs, list) else GMS_objs, GMS_mapper.__name__, type_,
value_, traceback_)
failed_Obj = failed_GMS_object(self.get_sample_GMS_obj(self.GMS_objs),
self.GMS_mapper_name, *self.exc_details)
# log the exception and raise warning
failed_Obj.logger.error('\n' + traceback_, exc_info=False)
warnings.warn("\nLogged an uncaught exception within %s during processing of scene ID %s (entity "
"ID %s):\n '%s'\n" % (GMS_mapper.__name__, failed_Obj.scene_ID, failed_Obj.entity_ID, value_))
failed_Obj.logger.error('\n' + exc_tb, exc_info=False)
self.logger.warning("\nLogged an uncaught exception within %s during processing of scene ID %s "
"(entity ID %s):\n '%s'\n"
% (self.GMS_mapper_name, failed_Obj.scene_ID, failed_Obj.entity_ID, exc_val))
# add the scene ID to failed_sceneids column in jobs table of DB and update statistics column
# NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED the
@@ -145,15 +218,14 @@ def log_uncaught_exceptions(GMS_mapper):
DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
{'failed_sceneids': failed_Obj.scene_ID}, {'id': CFG.job.ID})
# update statistics column in jobs table of postgreSQL database
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def[failed_Obj.proc_level],
idx_val2increment=db_jobs_statistics_def['FAILED'])
self.update_progress_failed(failed_Obj)
return failed_Obj
return wrapped_GMS_mapper
def log_uncaught_exceptions(GMS_mapper):
exc_handler = ExceptionHandler()
return exc_handler.log_uncaught_exceptions(GMS_mapper)
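Because ExceptionHandler exposes a logger property with a setter, a caller could presumably inject its own logger before wrapping a mapper; a hedged sketch (the mapper name is hypothetical):

import logging

exc_handler = ExceptionHandler()
exc_handler.logger = logging.Logger('MyPipeline', level=logging.DEBUG)  # custom logger via the setter

@exc_handler.log_uncaught_exceptions
def my_mapper(GMS_obj):  # hypothetical mapper used for illustration only
    return GMS_obj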
def ignore_warning(warning_type):
......
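The statistics handling in increment_progress() and update_progress_failed() boils down to moving one scene between positions of a per-job integer array, indexed via db_jobs_statistics_def. A pure-Python illustration of that bookkeeping; the index mapping below is assumed for the example and not copied from definition_dicts:

# assumed index mapping; the real values live in definition_dicts.db_jobs_statistics_def
db_jobs_statistics_def = {'started': 1, 'L1A': 2, 'L1B': 3, 'FAILED': 8}

def move_scene(statistics, idx_val2decrement, idx_val2increment):
    """Mimics DB_T.increment_decrement_arrayCol_in_postgreSQLdb: one scene
    leaves its previous processing stage and enters the next one."""
    statistics[idx_val2decrement] -= 1
    statistics[idx_val2increment] += 1

stats = [5, 3, 2, 1, 0, 0, 0, 0, 0]  # one counter per processing stage

# a scene reaches L1A: decrement the previous stage, increment 'L1A' (cf. increment_progress)
move_scene(stats, db_jobs_statistics_def['L1A'] - 1, db_jobs_statistics_def['L1A'])

# a scene fails during L1B: move it from 'L1B' to 'FAILED' (cf. update_progress_failed)
move_scene(stats, db_jobs_statistics_def['L1B'], db_jobs_statistics_def['FAILED'])

print(stats)  # -> [5, 2, 3, 0, 0, 0, 0, 0, 1]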