Commit f71f4d01 authored by Daniel Scheffler

AC is now skipped if an exception is raised during atmospheric correction. Implemented job status updates in the process controller.

algorithms.L1C.AtmCorr:
- run_atmospheric_correction(): AC is now skipped if an exception is raised during correction. NOTE: in that case the output will be TOA reflectance instead of BOA reflectance

processing.process_controller:
- run_all_processors():
     - added error handling for unexpected exceptions within the process controller
     - added job.status updates that are also written to the database in order to allow status visualization while a job is running
- stop(): added a job.status update
- update_DB_job_record(): the 'status' column of the jobs database table is now also updated

config.Job:
- added attribute 'status'

- updated __version__
parent 0497218d
@@ -15,7 +15,7 @@ from . import config
 from .processing.process_controller import process_controller

-__version__ = '20170320.01'
+__version__ = '20170327.01'
 __author__ = 'Daniel Scheffler'
 __all__ = ['algorithms',
            'io',
@@ -748,7 +748,7 @@ class AtmCorr(object):
         try:
             rs_image.logger = self.logger
             self.results = AC_GMS(rs_image, self.options, logger=self.logger, script=script)
-        except:
+        except Exception as e:
             # serialize AC input
             if dump_ac_input:
                 path_dump = self.inObjs[0].pathGen.get_path_ac_input_dump()
@@ -762,7 +762,13 @@ class AtmCorr(object):
             for inObj in self.inObjs:
                 inObj.delete_ac_input_arrays()
-            raise
+            self.logger.error('\nAn error occurred during atmospheric correction. BE AWARE THAT THE SCENE %s '
+                              '(ENTITY ID %s) HAS NOT BEEN ATMOSPHERICALLY CORRECTED! Error message was: \n%s\n'
+                              % (self.inObjs[0].scene_ID, self.inObjs[0].entity_ID, repr(e)))
+            # TODO include that in the job summary!
+
+            return list(self.inObjs)

         # get processing infos
         self.proc_info = self.ac_input['options']['processing']  # FIXME this is not appended to GMS objects
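The pattern above degrades gracefully instead of aborting the whole job: if AC_GMS raises, the input objects are returned unchanged and downstream processing continues on TOA data. A minimal, self-contained sketch of this skip-on-error pattern (illustrative names only, not the GeoMultiSens API):

    import logging

    logger = logging.getLogger('AtmCorr')

    def correct_or_passthrough(in_objs, run_ac):
        """Attempt atmospheric correction; on any error, log it and
        return the uncorrected (TOA) inputs instead of raising."""
        try:
            return run_ac(in_objs)   # BOA reflectance on success
        except Exception as e:
            logger.error('AC failed; scene NOT atmospherically corrected: %s', repr(e))
            return list(in_objs)     # TOA reflectance fallback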
@@ -68,6 +68,7 @@ class Job:
         assert exec_mode in ['Flink', 'Python']
         assert isinstance(db_host, str), "'db_host' must be a string! Got %s." % type(db_host)

+        self.status = 'initialized'  # possible values: 'initialized', 'running', 'canceled', 'failed', 'finished_with_warnings', 'finished_with_errors', 'finished'
         self.start_time = datetime.datetime.now()
         self.end_time = None
         self.computation_time = None
@@ -75,7 +76,7 @@ class Job:
         self.CPUs = multiprocessing.cpu_count()
         #self.CPUs = 1
         self.allow_subMultiprocessing = True  # allows multiprocessing within workers
-        self.disable_exception_handler = True  # disables automatic handling of unexpected exceptions
+        self.disable_exception_handler = False  # disables automatic handling of unexpected exceptions
         self.profiling = False
         # TODO add log level
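The TODO in the process controller hunk below ("implement that into job.status.setter") suggests persisting every status change automatically on assignment. A hedged sketch of how that could look, with the database write injected as a callable (the persist_status argument and this wiring are assumptions, not the committed code):

    class Job:
        STATUSES = ('initialized', 'running', 'canceled', 'failed',
                    'finished_with_warnings', 'finished_with_errors', 'finished')

        def __init__(self, persist_status):
            self._persist_status = persist_status  # e.g. writes the 'status' column of the jobs table
            self._status = 'initialized'

        @property
        def status(self):
            return self._status

        @status.setter
        def status(self, value):
            assert value in self.STATUSES, value
            self._status = value
            self._persist_status(value)  # would replace the manual update_DB_job_record() calls

    # usage:
    job = Job(lambda s: print('DB update: status ->', s))
    job.status = 'running'  # prints: DB update: status -> running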
@@ -341,53 +341,76 @@ class process_controller(object):
         """
         signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

-        # TODO handle errors
-        if self.job.profiling:
-            from pyinstrument import Profiler
-            self.profiler = Profiler()  # or Profiler(use_signal=False), see below
-            self.profiler.start()
-
-        self.logger.info('Execution of entire GeoMultiSens pre-processing chain started...')
-        self.failed_objects = []
-
-        # get list of datasets to be processed
-        if custom_data_list:
-            self.usecase.data_list = custom_data_list
-
-        # add local availability
-        self.usecase.data_list = MAP(self.add_local_availability, self.usecase.data_list)
-
-        self.L1A_processing()
-        self.L1B_processing()
-        self.L1C_processing()
-        self.L2A_processing()
-        self.L2B_processing()
-        self.L2C_processing()
-
-        # create summary
-        self.create_job_summary()
-
-        self.logger.info('Execution finished.')
-        self.job.end_time = datetime.datetime.now()
-        self.job.computation_time = self.job.end_time - self.job.start_time
-        self.logger.info('Time for execution: %s' % self.job.computation_time)
-
-        # update database entry of current job
-        self.update_DB_job_record()
-
-        if self.job.profiling:
-            self.profiler.stop()
-            print(self.profiler.output_text(unicode=True, color=True))
-
-        shutdown_loggers()
+        try:
+            if self.job.profiling:
+                from pyinstrument import Profiler
+                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
+                self.profiler.start()
+
+            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started...')
+            self.job.status = 'running'
+            self.update_DB_job_record()  # TODO implement that into job.status.setter
+
+            self.failed_objects = []
+
+            # get list of datasets to be processed
+            if custom_data_list:
+                self.usecase.data_list = custom_data_list
+
+            # add local availability
+            self.usecase.data_list = MAP(self.add_local_availability, self.usecase.data_list)
+
+            self.L1A_processing()
+            self.L1B_processing()
+            self.L1C_processing()
+            self.L2A_processing()
+            self.L2B_processing()
+            self.L2C_processing()
+
+            # create summary
+            self.create_job_summary()
+
+            self.logger.info('Execution finished.')
+            self.job.status = 'finished' if not self.failed_objects else 'finished_with_errors'  # TODO implement failed_with_warnings
+            self.job.end_time = datetime.datetime.now()
+            self.job.computation_time = self.job.end_time - self.job.start_time
+            self.logger.info('Time for execution: %s' % self.job.computation_time)
+
+            # update database entry of current job
+            self.update_DB_job_record()
+
+            if self.job.profiling:
+                self.profiler.stop()
+                print(self.profiler.output_text(unicode=True, color=True))
+
+            shutdown_loggers()
+
+        except Exception as e:
+            if self.job.profiling:
+                self.profiler.stop()
+                print(self.profiler.output_text(unicode=True, color=True))
+
+            self.job.status = 'failed'
+            self.update_DB_job_record()
+            self.logger.error('Execution failed with an error:')
+
+            if not self.job.disable_exception_handler:
+                self.logger.error(e)
+                shutdown_loggers()
+            else:
+                shutdown_loggers()
+                raise

     def stop(self, signum, frame):
         """Interrupt the running process controller gracefully."""
+        # update database entry of current job
+        self.job.status = 'canceled'
+        self.update_DB_job_record()
+
         self.logger.warning('Process controller stopped by user.')
         del self.logger
         shutdown_loggers()
         print('Process controller stopped by user.')
         raise KeyboardInterrupt
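run_all_processors() registers stop() as the SIGINT handler, so Ctrl-C marks the job 'canceled' before the process unwinds via KeyboardInterrupt. A self-contained sketch of that signal pattern (illustrative class, not the project's API):

    import signal
    import time

    class MiniController:
        def __init__(self):
            self.status = 'initialized'

        def stop(self, signum, frame):
            self.status = 'canceled'   # the real controller also writes this to the jobs table
            raise KeyboardInterrupt

        def run(self):
            signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown
            self.status = 'running'
            try:
                while True:            # stands in for the L1A..L2C processing chain
                    time.sleep(1)
            except KeyboardInterrupt:
                print('stopped, status =', self.status)

    if __name__ == '__main__':
        MiniController().run()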
@@ -601,11 +624,12 @@ class process_controller(object):
         """
         # update 'failed_sceneids' column of job record within jobs table
         sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
-        DB_T.update_records_in_postgreSQLdb(self.job.conn_database, 'jobs',
-                                            {'failed_sceneids': sceneids_failed}, {'id': self.job.ID})
-
-        # add job finish timestamp
-        DB_T.update_records_in_postgreSQLdb(self.job.conn_database, 'jobs', {'finishtime': self.job.end_time},
-                                            {'id': self.job.ID})
+        DB_T.update_records_in_postgreSQLdb(
+            self.job.conn_database, 'jobs',
+            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
+             'finishtime': self.job.end_time,     # add job finish timestamp
+             'status': self.job.status},          # update 'status' column
+            {'id': self.job.ID})

     def create_job_summary(self):
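Merging the three column updates into a single call saves two round trips to PostgreSQL. DB_T.update_records_in_postgreSQLdb itself is not part of this diff; a hedged sketch of a helper with the same call shape, built on psycopg2 with parameterized values (table and column names must come from trusted code, since SQL identifiers cannot be bound as parameters):

    import psycopg2

    def update_records_in_postgresqldb(dsn, tablename, vals2update, condition):
        """UPDATE <tablename> SET col1=%s, ... WHERE cond1=%s AND ..."""
        set_cols = list(vals2update)
        cond_cols = list(condition)
        sql = 'UPDATE {} SET {} WHERE {}'.format(
            tablename,
            ', '.join('{} = %s'.format(c) for c in set_cols),
            ' AND '.join('{} = %s'.format(c) for c in cond_cols))
        params = [vals2update[c] for c in set_cols] + [condition[c] for c in cond_cols]
        with psycopg2.connect(dsn) as connection:   # commits on clean exit
            with connection.cursor() as cursor:
                cursor.execute(sql, params)

    # usage (hypothetical connection string and job id):
    # update_records_in_postgresqldb('dbname=geomultisens', 'jobs',
    #                                {'status': 'running'}, {'id': 123})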