Commit 2b66ea9d authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

revised job statistics updater

 misc.database_tools:
 - added increment_decrement_arrayCol_in_postgreSQLdb()
 - GMS_JOB:
     - _populate_jobAttrs_from_sceneInfoGDF() / create(): updated statistics
     - added reset_job_progress()
     - implemented update_db_entry()
 misc.definition_dicts:
 - updated db_jobs_statistics_def
 misc.exception_handler:
 - log_uncaught_exceptions.wrapped_GMS_mapper(): revised statistics updating scheme (now also properly working for scenes with subsystems)
 processing.process_controller.process_controller:
 - run_all_processors(): added reset of job progress in database
- updated __version__
Former-commit-id: 811477d3
Former-commit-id: 0b237688
parent cc39f7e8
......@@ -15,7 +15,7 @@ from . import config
from .processing.process_controller import process_controller
__version__ = '20170327.03'
__version__ = '20170328.01'
__author__ = 'Daniel Scheffler'
__all__ = ['algorithms',
'io',
......
......@@ -276,6 +276,47 @@ def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remov
if 'connection' in locals(): connection.close()
def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None,
                                                 idx_val2increment=None, cond_dict=None, timeout=15000):
    # type: (str, str, str, int, int, dict, int) -> None
    """Updates an array column of a specific postgreSQL table in the form that it increments or decrements the
    elements at a given position. HINT: The column must have values like that: [0,1,0,3,1,0]

    :param conn_params:        <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:          <str> name of the table within the database to be updated
    :param col2update:         <str> column name of the column to be updated
    :param idx_val2decrement:  <int> the index of the array element to be decremented (starts with 1)
    :param idx_val2increment:  <int> the index of the array element to be incremented (starts with 1)
    :param cond_dict:          <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                               HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:            <int> allows to set a custom statement timeout (milliseconds)
    :return:
    """
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""

    dec_str = '' if idx_val2decrement is None else \
        "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement)
    inc_str = '' if idx_val2increment is None else \
        "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment)

    # FIX: only join the NON-EMPTY fragments. The previous ','.join([dec_str, inc_str]) produced a
    # leading or trailing comma (e.g. "UPDATE ... SET ,col[2] = col[2]+1") - i.e. invalid SQL -
    # whenever only one of idx_val2decrement / idx_val2increment was given.
    set_fragments = [frag for frag in (dec_str, inc_str) if frag]
    if set_fragments:
        # NOTE(review): table/column identifiers and indices are interpolated directly into the SQL
        # string (not parameterized), consistent with the sibling helpers in this module - callers
        # must only pass trusted identifiers.
        execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, ', '.join(set_fragments), condition))

    if 'connection' in locals():
        connection.commit()
    if 'connection' in locals():
        connection.close()
def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000):
# type: (str, str, dict, dict, int) -> int
"""Creates a single new record in a postgreSQL database and pupulates its columns with the given values.
......@@ -788,7 +829,7 @@ class GMS_JOB(object):
if not sceneInfoGDF.empty:
self.dataframe = sceneInfoGDF
self.sceneids = list(self.dataframe['sceneid'])
self.statistics = [len(self.sceneids)] + [0] * 7
self.statistics = [len(self.sceneids)] + [0] * 8
self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
self.timerange_start = self.dataframe.acquisitiondate.min().to_datetime()
self.timerange_end = self.dataframe.acquisitiondate.max().to_datetime()
......@@ -828,6 +869,18 @@ class GMS_JOB(object):
return self
def reset_job_progress(self):
    """Reset everything in the database entry that has been written during the last run of the job.

    Clears finish time, failed-scene list, progress and status, re-initializes the statistics
    counters (first element = total scene count, all per-level counters zeroed) and writes the
    cleared state back to the database via update_db_entry().
    """
    for attr in ('finishtime', 'progress', 'status'):
        setattr(self, attr, None)
    self.failed_sceneids = []
    self.statistics = [len(self.sceneids)] + [0] * 8

    self.update_db_entry()
def _get_dataframe(self,datadict): # FIXME deprecated
gdf = GeoDataFrame(datadict, columns=['satellite','sensor','filenames'])
gdf.columns = ['satellite','sensor','filename']
......@@ -870,7 +923,7 @@ class GMS_JOB(object):
(self.dataframe['sensor'] ==sen)]['sceneid'].count()
for sat,sen in zip(all_sat,all_sen)]
self.scene_counts = {'%s %s'%(sat,sen):cnt for sat,sen,cnt in zip(all_sat,all_sen,counts)}
self.statistics = [len(self.sceneids)] + [0] * 7
self.statistics = [len(self.sceneids)] + [0] * 8
db_entry = self.db_entry
......@@ -892,8 +945,14 @@ class GMS_JOB(object):
def update_db_entry(self):
    """Updates all values of the current database entry belonging to the respective job ID.

    New values are taken from the attributes of the GMS_JOB instance. The 'id' column is excluded
    from the update because the primary key of the record cannot be overwritten.

    FIX: removed the leftover 'raise NotImplementedError' (diff residue of the previous stub) that
    made the actual implementation below it unreachable.
    """
    assert self.exists_in_db

    db_entry = self.db_entry
    del db_entry['id']  # primary key of the record cannot be overwritten

    update_records_in_postgreSQLdb(self.conn, 'jobs', db_entry, {'id': self.id})
def delete_procdata_of_failed_sceneIDs(self, proc_level='all', force=False):
......
......@@ -14,7 +14,7 @@ dtype_lib_IDL_Python = {0:np.bool_, 1:np.uint8, 2:np.int16, 3:np.int32, 4:np.flo
dtype_lib_GDAL_Python= {"uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "uint32": 4, "int32": 5, "float32": 6,
"float64": 7, "complex64": 10, "complex128": 11}
# Ordered processing levels of the GMS pre-processing chain.
proc_chain = ['L1A', 'L1B', 'L1C', 'L2A', 'L2B', 'L2C']

# Maps a job state to its 1-based position inside the 'statistics' array column of the 'jobs'
# database table. 'started' was inserted at position 2, shifting all subsequent states by one.
# NOTE: the diff residue carried the old 8-element mapping on the line above the new one; the
# old (dead) assignment has been dropped.
db_jobs_statistics_def = {'downloaded': 1, 'started': 2, 'L1A': 3, 'L1B': 4, 'L1C': 5,
                          'L2A': 6, 'L2B': 7, 'L2C': 8, 'FAILED': 9}
def get_GMS_sensorcode(GMS_identifier):
......
......@@ -54,23 +54,40 @@ def log_uncaught_exceptions(GMS_mapper):
% (GMS_objs.scene_ID, GMS_objs.entity_ID, GMS_mapper.__name__, GMS_objs.failedMapper))
return GMS_objs
# update statistics column in jobs table of postgreSQL database
if isinstance(GMS_objs, collections.OrderedDict) and GMS_objs['proc_level'] is None:
if not GMS_objs['subsystem'] in ['VNIR2', 'SWIR', 'TIR', 'S2A20', 'S2A60']:
# update statistics column ONLY in case of full cube or first subsystem
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def['started']-1,
idx_val2increment=db_jobs_statistics_def['started'])
# run the mapper function and store its results
GMS_objs = GMS_mapper(GMS_objs, **kwargs)
# get a GMS object from which we get the new proc_level
GMS_obj = GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs
# update statistics column in jobs table of postgreSQL database
idx_val2decrement = db_jobs_statistics_def[GMS_obj.proc_level]-1
idx_val2increment = db_jobs_statistics_def[GMS_obj.proc_level]
with psycopg2.connect(CFG.job.conn_database) as conn:
with conn.cursor() as cursor:
DB_T.execute_pgSQL_query(cursor,
"UPDATE jobs SET "
"statistics[%s] = statistics[%s]-1,"
"statistics[%s] = statistics[%s]+1"
"WHERE id=%s" % (idx_val2decrement, idx_val2decrement,
idx_val2increment, idx_val2increment, CFG.job.ID))
GMS_obj = GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs # get a GMS object from which we get the new proc_level
# NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED the
# failed_sceneids column and the statistics column is NOT updated once more
# check if another subsystem of the same scene ID already failed - don't increment the stats anymore
if not GMS_obj.subsystem in ['VNIR2', 'SWIR', 'TIR', 'S2A20', 'S2A60']:
another_ss_failed = False
if GMS_obj.subsystem:
res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'jobs', ['failed_sceneids'],
{'id': CFG.job.ID})
assert res, "Query delivered no result."
if res[0][0] is not None and GMS_obj.scene_ID in res[0][0]:
another_ss_failed = True
# update statistics column ONLY in case of full cube or first subsystem and if no other subsystem failed
if not another_ss_failed:
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def[GMS_obj.proc_level]-1,
idx_val2increment=db_jobs_statistics_def[GMS_obj.proc_level])
return GMS_objs
except OSError:
......@@ -100,7 +117,6 @@ def log_uncaught_exceptions(GMS_mapper):
# turn off exception handling and raise the error
raise
# collect some informations about failed GMS object and summarize them in failed_GMS_object
arg0 = GMS_objs['scene_ID'] if isinstance(GMS_objs, collections.OrderedDict) else \
GMS_objs[0].scene_ID if isinstance(GMS_objs, list) else GMS_objs
......@@ -111,7 +127,9 @@ def log_uncaught_exceptions(GMS_mapper):
warnings.warn("\nLogged an uncaught exception within %s during processing of scene ID %s (entity "
"ID %s):\n '%s'\n" % (GMS_mapper.__name__, failed_Obj.scene_ID, failed_Obj.entity_ID, value_))
# add the scene ID to failed_sceneids column in jobs table of postgreSQL database
## add the scene ID to failed_sceneids column in jobs table of DB and update statistics column
# NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED the
# failed_sceneids column and the statistics column is NOT updated once more
res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'jobs', ['failed_sceneids'],
{'id': CFG.job.ID})
assert res, "Query delivered no result."
......@@ -120,17 +138,11 @@ def log_uncaught_exceptions(GMS_mapper):
DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
{'failed_sceneids':failed_Obj.scene_ID}, {'id':CFG.job.ID})
# update statistics column in jobs table of postgreSQL database
idx_val2decrement = db_jobs_statistics_def[failed_Obj.proc_level]
idx_val2increment = db_jobs_statistics_def['FAILED']
with psycopg2.connect(CFG.job.conn_database) as conn:
with conn.cursor() as cursor:
DB_T.execute_pgSQL_query(cursor,
"UPDATE jobs SET "
"statistics[%s] = statistics[%s]-1,"
"statistics[%s] = statistics[%s]+1"
"WHERE id=%s" %(idx_val2decrement, idx_val2decrement,
idx_val2increment, idx_val2increment, CFG.job.ID))
# update statistics column in jobs table of postgreSQL database
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.job.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.job.ID},
idx_val2decrement=db_jobs_statistics_def[failed_Obj.proc_level],
idx_val2increment=db_jobs_statistics_def['FAILED'])
return failed_Obj
......
......@@ -350,6 +350,7 @@ class process_controller(object):
self.logger.info('Execution of entire GeoMultiSens pre-processing chain started...')
self.job.status = 'running'
self.update_DB_job_record() # TODO implement that into job.status.setter
self.DB_job_record.reset_job_progress()
self.failed_objects = []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment