Commit f5404bf9 authored by Daniel Scheffler
Browse files

revised progress updater during running job

algorithms.gms_object:
- revised failed_GMS_object: now accepts only instances of gms_object and OrderedDicts; removed DB query

misc.exception_handler:
- trace_unhandled_exceptions.wrapped_GMS_mapper:
    - bugfix
    - enhanced documentation
    - revised call of failed_GMS_object

processing.process_controller:
- run_all_processors(): bugfix
- update_DB_job_statistics(): fix for double statistics update in case of subsystems

updated __version__


Former-commit-id: d7f01df7
parent 78d0c2d4
......@@ -15,7 +15,7 @@ from . import config
from .processing.process_controller import process_controller
__version__ = '20170406.02'
__version__ = '20170407.01'
__author__ = 'Daniel Scheffler'
__all__ = ['algorithms',
'io',
......
......@@ -19,7 +19,7 @@ class L2C_object(L2B_object):
if L2B_obj:
# populate attributes
[setattr(self, key, value) for key,value in L2B_obj.__dict__.items()]
[setattr(self, key, value) for key, value in L2B_obj.__dict__.items()]
self.proc_level = 'L2C'
......
......@@ -1817,28 +1817,28 @@ class GMS_object(object):
class failed_GMS_object(GMS_object):
    """Summarize a GMS object whose processing raised an unhandled exception.

    Accepts either an existing GMS_object instance or a collections.OrderedDict
    of scene/dataset infos (the input of L1A_map, before any GMS_object exists)
    and copies the attributes needed to identify the scene, together with
    details about the raised exception.
    """
    # """@DynamicAttrs"""

    def __init__(self, GMS_object_or_OrdDict, failedMapper, exc_type, exc_val, exc_tb):
        """
        :param GMS_object_or_OrdDict: GMS_object instance OR collections.OrderedDict of scene infos
        :param failedMapper:          name of the GMS mapper function in which the exception occurred
        :param exc_type:              exception class of the raised exception
        :param exc_val:               exception instance
        :param exc_tb:                traceback (stored as-is in 'ExceptionTraceback')
        """
        super().__init__()
        needed_attr = ['proc_level', 'image_type', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem',
                       'arr_shape', 'arr_pos']

        if isinstance(GMS_object_or_OrdDict, collections.OrderedDict):
            # unhandled exception within L1A_map: no GMS_object exists yet, only an OrderedDict of scene infos
            OrdDic = GMS_object_or_OrdDict
            for k in needed_attr[:-2]:  # 'arr_shape'/'arr_pos' may be missing from the dict -> handled below
                setattr(self, k, OrdDic[k])
            self.arr_shape = OrdDic.get('arr_shape', 'cube')
            self.arr_pos = OrdDic.get('arr_pos', None)
        else:
            # any other GMS mapper: copy the needed attributes directly from the GMS object
            for k in needed_attr:
                setattr(self, k, getattr(GMS_object_or_OrdDict, k))

        self.failedMapper = failedMapper
        self.ExceptionType = exc_type.__name__
        self.ExceptionValue = repr(exc_val)
        self.ExceptionTraceback = exc_tb

    @property
    def pandasRecord(self):
        """Return a single-row pandas DataFrame summarizing this failed GMS object."""
        columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                   'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
        # NOTE: the values must be wrapped in an outer list to produce ONE row;
        # a flat list of 13 scalars would not match the 13 column labels.
        return DataFrame([[getattr(self, k) for k in columns]], columns=columns)
......@@ -14,7 +14,7 @@ dtype_lib_IDL_Python = {0:np.bool_, 1:np.uint8, 2:np.int16, 3:np.int32, 4:np.flo
dtype_lib_GDAL_Python= {"uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "uint32": 4, "int32": 5, "float32": 6,
"float64": 7, "complex64": 10, "complex128": 11}
proc_chain = ['L1A','L1B','L1C','L2A','L2B','L2C']
db_jobs_statistics_def = {'downloaded':1, 'started':2, 'L1A':3, 'L1B':4, 'L1C':5, 'L2A':6, 'L2B':7, 'L2C':8, 'FAILED':9}
db_jobs_statistics_def = {'downloaded':1, 'started':2, None:2, 'L1A':3, 'L1B':4, 'L1C':5, 'L2A':6, 'L2B':7, 'L2C':8, 'FAILED':9}
def get_GMS_sensorcode(GMS_identifier):
......
......@@ -7,7 +7,6 @@ import shutil
import sys
import traceback
import warnings
import psycopg2
from ..algorithms.gms_object import failed_GMS_object
from ..config import GMS_config as CFG
......@@ -47,14 +46,15 @@ def log_uncaught_exceptions(GMS_mapper):
"""
try:
# handle input objects of a GMS mapper that failed in a previous mapper
if isinstance(GMS_objs, failed_GMS_object) or \
(isinstance(GMS_objs,list) and len(GMS_objs)==1 and isinstance(GMS_objs[0],failed_GMS_object)):
# handle input objects of a GMS mapper that failed in a previous mapper
(isinstance(GMS_objs,list) and isinstance(GMS_objs[0],failed_GMS_object)):
GMS_obj = GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs # get a GMS object from which we get the new proc_level
print("Scene %s (entity ID %s) skipped %s due to an unexpected exception in %s."
% (GMS_objs.scene_ID, GMS_objs.entity_ID, GMS_mapper.__name__, GMS_objs.failedMapper))
% (GMS_obj.scene_ID, GMS_obj.entity_ID, GMS_mapper.__name__, GMS_obj.failedMapper)) # TODO should be logged by PC.logger
return GMS_objs
# update statistics column in jobs table of postgreSQL database
# in case of just initialized objects: update statistics column in jobs table of postgreSQL database to 'started'
if isinstance(GMS_objs, collections.OrderedDict) and GMS_objs['proc_level'] is None:
if not GMS_objs['subsystem'] in ['VNIR2', 'SWIR', 'TIR', 'S2A20', 'S2A60']:
# update statistics column ONLY in case of full cube or first subsystem
......@@ -66,10 +66,10 @@ def log_uncaught_exceptions(GMS_mapper):
# run the mapper function and store its results
GMS_objs = GMS_mapper(GMS_objs, **kwargs)
# update statistics column in jobs table of postgreSQL database
## update statistics column in jobs table of postgreSQL database
GMS_obj = GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs # get a GMS object from which we get the new proc_level
# NOTE: in case failed_Obj represents a subsystem and another another one has already been marked as FAILED the
# NOTE: in case GMS_obj represents a subsystem and another one has already been marked as FAILED the
# failed_sceneids column and the statistics column is NOT updated once more
# check if another subsystem of the same scene ID already failed - don't increment the stats anymore
if not GMS_obj.subsystem in ['VNIR2', 'SWIR', 'TIR', 'S2A20', 'S2A60']:
......@@ -118,9 +118,10 @@ def log_uncaught_exceptions(GMS_mapper):
raise
# collect some informations about failed GMS object and summarize them in failed_GMS_object
arg0 = GMS_objs['scene_ID'] if isinstance(GMS_objs, collections.OrderedDict) else \
GMS_objs[0].scene_ID if isinstance(GMS_objs, list) else GMS_objs
failed_Obj = failed_GMS_object(arg0, GMS_mapper.__name__, type_, value_, traceback_)
failed_Obj = failed_GMS_object(
GMS_objs if isinstance(GMS_objs, collections.OrderedDict) else \
GMS_objs[0] if isinstance(GMS_objs, list) else GMS_objs,
GMS_mapper.__name__, type_, value_, traceback_)
# log the exception and raise warning
failed_Obj.logger.error('\n'+traceback_, exc_info=False)
......@@ -133,6 +134,7 @@ def log_uncaught_exceptions(GMS_mapper):
res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'jobs', ['failed_sceneids'],
{'id': CFG.job.ID})
assert res, "Query delivered no result."
if res[0][0] is None or failed_Obj.scene_ID not in res[0][0]:
# if column is empty or scene ID is not in there
DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
......
......@@ -398,7 +398,7 @@ class process_controller(object):
self.job.status = 'failed'
self.update_DB_job_record()
self.logger.error('Execution failed with an error:')
self.logger.error('Execution failed with an error:', e)
if not self.job.disable_exception_handler:
self.logger.error(e)
......@@ -644,13 +644,18 @@ class process_controller(object):
Update job statistics of the running job in the database.
"""
# TODO move this method to config.Job
already_updated_IDs = []
for ds in usecase_datalist:
if ds['proc_level'] is not None:
if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
# update statistics column of jobs table
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
self.job.conn_database, 'jobs', 'statistics', cond_dict={'id': self.job.ID},
idx_val2decrement=db_jobs_statistics_def['downloaded'],
idx_val2increment=db_jobs_statistics_def[ds['proc_level']])
# avoid double updating in case of subsystems belonging to the same scene ID
already_updated_IDs.append(ds['scene_ID'])
def create_job_summary(self):
"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment