Commit f65899ca authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

statistics column of jobs table in pgSQL database is now properly updated during processing

misc.database_tools.GMS_JOB:
- added attributes 'status' and 'statistics'
- db_entry: added docstring
- _populate_jobAttrs_from_sceneInfoGDF(): now also sets self.statistics
- create(): now also sets self.statistics
misc.definition_dicts:
- added dictionary 'db_jobs_statistics_def' mapping each processing status to its position within the statistics column of the jobs table in the database
misc.exception_handler:
- trace_unhandled_exceptions.wrapped_GMS_mapper(): now de- and increments the per-level scene counts in the statistics column of the jobs table after each mapper run and on failure
- updated __version__
Former-commit-id: d5593e78
Former-commit-id: cd1e7a26
parent 4146c388
...@@ -15,7 +15,7 @@ from . import config ...@@ -15,7 +15,7 @@ from . import config
from .processing.process_controller import process_controller from .processing.process_controller import process_controller
__version__ = '20170327.01' __version__ = '20170327.02'
__author__ = 'Daniel Scheffler' __author__ = 'Daniel Scheffler'
__all__ = ['algorithms', __all__ = ['algorithms',
'io', 'io',
......
...@@ -562,10 +562,13 @@ class GMS_JOB(object): ...@@ -562,10 +562,13 @@ class GMS_JOB(object):
self.jobs_table_columns = ['id','creationtime','finishtime','sceneids','timerange_start', self.jobs_table_columns = ['id','creationtime','finishtime','sceneids','timerange_start',
'timerange_end','bounds','distribution_index','progress','feedback', 'timerange_end','bounds','distribution_index','progress','feedback',
'failed_sceneids','datasetid_spatial_ref', 'failed_sceneids','datasetid_spatial_ref',
'virtualsensorid','ref_job_id','datacube_mgrs_tiles_proc','comment'] 'virtualsensorid','ref_job_id','datacube_mgrs_tiles_proc','comment',
'status', 'statistics']
self.virtualsensorid = None # set by self._set_target_sensor_specs() self.virtualsensorid = None # set by self._set_target_sensor_specs()
self.datasetid_spatial_ref = 249 # FIXME notnull but not getable via virtual sensor id and not needed anymore in DB (gsd is given) self.datasetid_spatial_ref = 249 # FIXME notnull but not getable via virtual sensor id and not needed anymore in DB (gsd is given)
self.datasetname_spatial_ref = 'SENTINEL-2A' self.datasetname_spatial_ref = 'SENTINEL-2A'
self.status = None
self.statistics = []
self.comment = None self.comment = None
self.epsg = None # set by self._set_target_sensor_specs() self.epsg = None # set by self._set_target_sensor_specs()
self.ground_spatial_sampling = None # set by self._set_target_sensor_specs() self.ground_spatial_sampling = None # set by self._set_target_sensor_specs()
...@@ -595,6 +598,8 @@ class GMS_JOB(object): ...@@ -595,6 +598,8 @@ class GMS_JOB(object):
@property @property
def db_entry(self): def db_entry(self):
"""Returns an OrderedDict containing keys and values of the database entry.
"""
db_entry = collections.OrderedDict() db_entry = collections.OrderedDict()
for i in self.jobs_table_columns: for i in self.jobs_table_columns:
db_entry[i] = getattr(self,i) db_entry[i] = getattr(self,i)
...@@ -783,6 +788,7 @@ class GMS_JOB(object): ...@@ -783,6 +788,7 @@ class GMS_JOB(object):
if not sceneInfoGDF.empty: if not sceneInfoGDF.empty:
self.dataframe = sceneInfoGDF self.dataframe = sceneInfoGDF
self.sceneids = list(self.dataframe['sceneid']) self.sceneids = list(self.dataframe['sceneid'])
self.statistics = [len(self.sceneids)] + [0] * 7
self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds) self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
self.timerange_start = self.dataframe.acquisitiondate.min().to_datetime() self.timerange_start = self.dataframe.acquisitiondate.min().to_datetime()
self.timerange_end = self.dataframe.acquisitiondate.max().to_datetime() self.timerange_end = self.dataframe.acquisitiondate.max().to_datetime()
...@@ -864,6 +870,8 @@ class GMS_JOB(object): ...@@ -864,6 +870,8 @@ class GMS_JOB(object):
(self.dataframe['sensor'] ==sen)]['sceneid'].count() (self.dataframe['sensor'] ==sen)]['sceneid'].count()
for sat,sen in zip(all_sat,all_sen)] for sat,sen in zip(all_sat,all_sen)]
self.scene_counts = {'%s %s'%(sat,sen):cnt for sat,sen,cnt in zip(all_sat,all_sen,counts)} self.scene_counts = {'%s %s'%(sat,sen):cnt for sat,sen,cnt in zip(all_sat,all_sen,counts)}
self.statistics = [len(self.sceneids)] + [0] * 7
db_entry = self.db_entry db_entry = self.db_entry
del db_entry['id'] del db_entry['id']
......
...@@ -14,6 +14,7 @@ dtype_lib_IDL_Python = {0:np.bool_, 1:np.uint8, 2:np.int16, 3:np.int32, 4:np.flo ...@@ -14,6 +14,7 @@ dtype_lib_IDL_Python = {0:np.bool_, 1:np.uint8, 2:np.int16, 3:np.int32, 4:np.flo
dtype_lib_GDAL_Python= {"uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "uint32": 4, "int32": 5, "float32": 6, dtype_lib_GDAL_Python= {"uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "uint32": 4, "int32": 5, "float32": 6,
"float64": 7, "complex64": 10, "complex128": 11} "float64": 7, "complex64": 10, "complex128": 11}
proc_chain = ['L1A','L1B','L1C','L2A','L2B','L2C'] proc_chain = ['L1A','L1B','L1C','L2A','L2B','L2C']
# Maps a scene's processing status to an index into the 'statistics' array column
# of the jobs table in the postgreSQL database; used to de-/increment the per-level
# scene counts during processing. NOTE(review): exact index base (0- vs 1-based
# within the pgSQL array) should be confirmed against the UPDATE statements that
# consume this dict.
db_jobs_statistics_def = {'downloaded':1, 'L1A':2, 'L1B':3, 'L1C':4, 'L2A':5, 'L2B':6, 'L2C':7, 'FAILED':8}
def get_GMS_sensorcode(GMS_identifier): def get_GMS_sensorcode(GMS_identifier):
......
...@@ -7,10 +7,12 @@ import shutil ...@@ -7,10 +7,12 @@ import shutil
import sys import sys
import traceback import traceback
import warnings import warnings
import psycopg2
from ..algorithms.gms_object import failed_GMS_object from ..algorithms.gms_object import failed_GMS_object
from ..config import GMS_config as CFG from ..config import GMS_config as CFG
from ..misc import database_tools as DB_T from ..misc import database_tools as DB_T
from .definition_dicts import db_jobs_statistics_def
...@@ -52,7 +54,24 @@ def log_uncaught_exceptions(GMS_mapper): ...@@ -52,7 +54,24 @@ def log_uncaught_exceptions(GMS_mapper):
% (GMS_objs.scene_ID, GMS_objs.entity_ID, GMS_mapper.__name__, GMS_objs.failedMapper)) % (GMS_objs.scene_ID, GMS_objs.entity_ID, GMS_mapper.__name__, GMS_objs.failedMapper))
return GMS_objs return GMS_objs
return GMS_mapper(GMS_objs, **kwargs) # run the mapper function and store its results
GMS_objs = GMS_mapper(GMS_objs, **kwargs)
# get a GMS object from which we get the new proc_level
GMS_obj = GMS_objs[0] if isinstance(GMS_objs, (list, tuple)) else GMS_objs
# update statistics column in jobs table of postgreSQL database
idx_val2decrement = db_jobs_statistics_def[GMS_obj.proc_level]-1
idx_val2increment = db_jobs_statistics_def[GMS_obj.proc_level]
with psycopg2.connect(CFG.job.conn_database) as conn:
with conn.cursor() as cursor:
DB_T.execute_pgSQL_query(cursor,
"UPDATE jobs SET "
"statistics[%s] = statistics[%s]-1,"
"statistics[%s] = statistics[%s]+1"
"WHERE id=%s" % (idx_val2decrement, idx_val2decrement,
idx_val2increment, idx_val2increment, CFG.job.ID))
return GMS_objs
except OSError: except OSError:
# get Exception details # get Exception details
...@@ -101,6 +120,18 @@ def log_uncaught_exceptions(GMS_mapper): ...@@ -101,6 +120,18 @@ def log_uncaught_exceptions(GMS_mapper):
DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs', DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
{'failed_sceneids':failed_Obj.scene_ID}, {'id':CFG.job.ID}) {'failed_sceneids':failed_Obj.scene_ID}, {'id':CFG.job.ID})
# Update the statistics column in the jobs table of the postgreSQL database:
# move the scene from its last reached processing level counter to FAILED.
idx_val2decrement = db_jobs_statistics_def[failed_Obj.proc_level]
idx_val2increment = db_jobs_statistics_def['FAILED']
with psycopg2.connect(CFG.job.conn_database) as conn:
    with conn.cursor() as cursor:
        # NOTE: indices come from db_jobs_statistics_def (trusted ints), so
        # %-formatting is acceptable here. The trailing spaces inside the
        # concatenated literals are required — without them the SQL collapses
        # to '...+1WHERE id=...', which is invalid.
        DB_T.execute_pgSQL_query(cursor,
                                 "UPDATE jobs SET "
                                 "statistics[%s] = statistics[%s]-1, "
                                 "statistics[%s] = statistics[%s]+1 "
                                 "WHERE id=%s" % (idx_val2decrement, idx_val2decrement,
                                                  idx_val2increment, idx_val2increment,
                                                  CFG.job.ID))
return failed_Obj return failed_Obj
return wrapped_GMS_mapper return wrapped_GMS_mapper
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment