Commit 685a3793 authored by Daniel Scheffler

Fixed multiprocessing issue.

parent 4897c882
Pipeline #1839 failed in 5 minutes and 58 seconds
@@ -120,118 +120,122 @@ class process_controller(object):

    def sceneids_failed(self):
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
        # TODO revise this function
        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            GMS_config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfile
        path_logfile = path_generator(dataset).get_path_logfile()
        # FIXME this always returns the logfile for the subsystem.
        # FIXME -> merged logfiles (L2A+) are ignored
        # FIXME -> for subsystems the highest start procL is L2A

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by a database query + os.path.exists
            """Return all processing levels that have been successfully written according to the logfile."""
            if not os.path.exists(path_log):
                self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                 % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
                with open(path_log, 'r') as inF:  # close the file handle instead of leaking it
                    logfile = inF.read()
                # raw string, so that \S and \s are not treated as invalid escape sequences
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
                if not AllWrittenProcL_dueLog:
                    self.logger.info('%s: According to logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog
        # check that there is at most one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all found processing levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(dict(
                            image_type=dataset['image_type'],
                            Satellite=dataset['satellite'],
                            Sensor=dataset['sensor'],
                            Subsystem=dataset['subsystem'],
                            proc_level=ProcL,  # must be respected because the LBA changes after atm. correction
                            dataset_ID=dataset['dataset_ID'],
                            logger=None), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk
                    # equals the desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in the database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but, '
                                             'according to the logfile, %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)
                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in the database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching dataset for %s but with a different '
                                         'LayerBandsAssignment. Dataset has to be reprocessed.'
                                         % dataset['entity_ID'])
                else:
                    # the suffix is parenthesized so that the conditional only applies to the appended sentence
                    self.logger.info('%s for dataset %s has been written according to the logfile but no '
                                     'corresponding dataset has been found.' % (ProcL, dataset['entity_ID']) +
                                     (' Searching for lower processing level...'
                                      if AllWrittenProcL.index(ProcL) != 0 else ''))

        elif len(DB_match) > 1:
            self.logger.info('According to the database there are multiple matches for the dataset %s. Dataset has '
                             'to be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset
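Two asides for readers of this change. First, what the regex in get_AllWrittenProcL_dueLog actually extracts: a minimal, self-contained sketch, not part of the commit; the sample log lines are invented, not taken from a real GMS logfile:

import re

sample_log = ("14:21:05: L1A data successfully saved.\n"
              "14:58:40: L1B data successfully saved.\n"
              "15:30:12: L1C data successfully saved.\n")

# same pattern as in get_AllWrittenProcL_dueLog: captures the token preceding
# 'data successfully saved.' on each line, case-insensitively
levels = re.findall(r":*(\S*\s*) data successfully saved.", sample_log, re.I)
print(sorted(set(lvl.strip() for lvl in levels)))  # -> ['L1A', 'L1B', 'L1C']

Second, the refactoring itself: the commit message ('Fixed multiprocessing issue.') together with moving the nested check_single_dataset out into the method _add_local_availability_single_dataset suggests a pickling problem, since nested functions cannot be serialized and therefore cannot be dispatched to worker processes. A hypothetical sketch of that difference; the names and pool setup are assumptions, not repository code:

import multiprocessing

def square(x):  # module-level function: picklable, safe to pass to Pool.map
    return x * x

def run():
    def square_nested(x):  # nested function: NOT picklable
        return x * x

    with multiprocessing.Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))    # works: [1, 4, 9]
        # pool.map(square_nested, [1, 2, 3])  # would raise a pickling error,
        # because local objects cannot be serialized by the default pickler

if __name__ == '__main__':
    run()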
    def add_local_availability(self, datasets):
        # type: (List[OrderedDict]) -> List[OrderedDict]
        """Check the availability of all subsets per scene and processing level.

        NOTE: The processing level of scenes for which not all subsystems are available at the same processing
              level is reset.

        :param datasets:    list of one OrderedDict per subsystem as generated by CFG.data_list
        """
        datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets]

        datasets_validated = []
        datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')
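HLP_F.group_dicts_by_key is a project helper whose implementation is not shown in this diff; a minimal stand-in with the behavior the call above appears to rely on could look like this (a sketch under that assumption, not the project's actual code):

from collections import OrderedDict

def group_dicts_by_key(dict_list, key):
    """Group a list of dicts into sublists whose members share the same value for 'key'."""
    groups = OrderedDict()  # preserves the first-seen order of the key values
    for d in dict_list:
        groups.setdefault(d[key], []).append(d)
    return list(groups.values())

# usage: one sublist per scene_ID, each holding all subsystem records of that scene
datasets = [dict(scene_ID=1, subsystem='VNIR1'), dict(scene_ID=1, subsystem='SWIR'),
            dict(scene_ID=2, subsystem='VNIR1')]
print([len(g) for g in group_dicts_by_key(datasets, key='scene_ID')])  # -> [2, 1]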
@@ -241,7 +245,7 @@ class process_controller(object):
        if not len(list(set(proc_lvls))) == 1:
            # reset the processing level of those scenes where not all subsystems are available
            self.logger.info('%s: Found already processed subsystems at different processing levels %s. '
                             'Dataset has to be reprocessed to avoid errors.'
                             % (ds_group[0]['entity_ID'], proc_lvls))
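For clarity, a hypothetical, self-contained rendering of this consistency check; ds_group, the subsystem names, and the reset body are assumptions, since the surrounding lines are elided from the diff:

ds_group = [dict(entity_ID='SCENE_X', subsystem=s, proc_level=lvl)
            for s, lvl in [('VNIR1', 'L2A'), ('VNIR2', 'L2A'), ('SWIR', 'L1C')]]
proc_lvls = [ds['proc_level'] for ds in ds_group]

if not len(list(set(proc_lvls))) == 1:  # {'L2A', 'L1C'} -> inconsistent levels
    for ds in ds_group:                 # presumed reset: reprocess the whole scene
        ds['proc_level'] = None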
@@ -165,11 +165,6 @@ class BaseTestCases:
        cls.PC.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                           % cls.PC.config.ID)

        [cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]

    def test_run_all_processors(self):