Commit 90a33c41 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Clarified some job statistic related calls.


Former-commit-id: 9276f698
parent 81b03365
......@@ -17,7 +17,7 @@ dtype_lib_IDL_Python = {0: np.bool_, 1: np.uint8, 2: np.int16, 3: np.int32, 4: n
dtype_lib_GDAL_Python = {"uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "uint32": 4, "int32": 5, "float32": 6,
"float64": 7, "complex64": 10, "complex128": 11}
proc_chain = ['L1A', 'L1B', 'L1C', 'L2A', 'L2B', 'L2C']
db_jobs_statistics_def = {'downloaded': 1, 'started': 2, None: 2, 'L1A': 3, 'L1B': 4, 'L1C': 5, 'L2A': 6, 'L2B': 7,
db_jobs_statistics_def = {'pending': 1, 'started': 2, None: 2, 'L1A': 3, 'L1B': 4, 'L1C': 5, 'L2A': 6, 'L2B': 7,
'L2C': 8, 'FAILED': 9} # NOTE: OrderedDicts passed to L1A_map have proc_level=None
......
......@@ -6,7 +6,7 @@ import shutil
import sys
import traceback
import warnings
from logging import Logger
from logging import getLogger
from typing import Union, List # noqa F401 # flake8 issue
from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
......@@ -46,7 +46,8 @@ class ExceptionHandler(object):
@property
def logger(self):
if not self._logger:
self._logger = Logger('ExceptionHandler', level=CFG.log_level)
self._logger = getLogger('ExceptionHandler')
self._logger.setLevel(CFG.log_level)
return self._logger
@logger.setter
......@@ -158,7 +159,7 @@ class ExceptionHandler(object):
# update statistics column ONLY in case of full cube or first subsystem
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
idx_val2decrement=db_jobs_statistics_def['started'] - 1, # FIXME BUG?
idx_val2decrement=db_jobs_statistics_def['pending'],
idx_val2increment=db_jobs_statistics_def['started'])
def increment_progress(self):
......@@ -168,7 +169,6 @@ class ExceptionHandler(object):
"""
# get a GMS object from which we get the new proc_level
GMS_obj = self.get_sample_GMS_obj(self.GMS_objs)
# TODO if another subsystem already suceeded -> decrement the higher proc level and increment failed
# validate proc_level
if GMS_obj.proc_level is None:
......@@ -247,6 +247,9 @@ class ExceptionHandler(object):
# check if another subsystem already reached a higher processing level
# NOTE: this fixes issue #50
# NOTE: This works not only for GMS_object instances but also for L1A inputs (OrderedDicts) because
# failed_GMS_object inherits from GMS_object and GMS_object.proc_status_all_GMS_objs has already
# been updated by the first subsystem (that earlier reached L1A)
# FIXME proc_status_all_GMSobjs is not available if other subsystems are processed by another
# FIXME multiprocessing worker or on another machine (cluster node)
procstatus_other_ss = {k: v for k, v in GMS_object.proc_status_all_GMSobjs[failed_Obj.scene_ID].items()
......
......@@ -1402,6 +1402,7 @@ class failed_GMS_object(GMS_object):
pass
def __init__(self, GMS_object_or_OrdDict, failedMapper, exc_type, exc_val, exc_tb):
# type: (Union[GMS_object, OrderedDict], str, type(Exception), any, str) -> None
super(failed_GMS_object, self).__init__()
needed_attr = ['proc_level', 'image_type', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem',
'arr_shape', 'arr_pos']
......
......@@ -725,7 +725,7 @@ class process_controller(object):
# update statistics column of jobs table
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
idx_val2decrement=db_jobs_statistics_def['downloaded'],
idx_val2decrement=db_jobs_statistics_def['pending'],
idx_val2increment=db_jobs_statistics_def[ds['proc_level']])
# avoid double updating in case of subsystems belonging to the same scene ID
......
......@@ -154,13 +154,13 @@ class Test_ExceptionHandler_Subsystems(BaseTest_ExceptionHandler.Test_ExceptionH
def test_gms_mapper_fail_secondSS_progress_stats(self):
"""Check correctness of progress stats if a first subsystem succeeds and then another one fails.
NOTE: This happens quite rarely because if a mapper fails for a subsystem, it usualy fails for the first
NOTE: This happens quite rarely because if a mapper fails for a subsystem, it usually fails for the first
subsystem it receives."""
for i, subDs in enumerate(self.PC.config.data_list):
if subDs['subsystem'] == 'S2A10':
self.dummy_L1A_mapper_success(subDs)
# progress stats must be incremented [0, 1, 0, 0, 0, 0, 0, 0, 0] (downloaded) to L1A
# progress stats must be incremented [0, 1, 0, 0, 0, 0, 0, 0, 0] (started) to L1A
self.assertEqual(self.get_current_progress_stats(), [0, 0, 1, 0, 0, 0, 0, 0, 0]) # stats at L1A
else:
outObj = self.dummy_gms_mapper_fail(subDs)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment