Commit 05a7271e authored by Daniel Scheffler

Bugfix: add_local_availability() now accepts and returns a list of datasets and validates the processing level per scene across all subsystems.

Former-commit-id: 8fcb7057
Former-commit-id: 25537e3f
parent 69484df0
@@ -121,135 +121,133 @@ class process_controller(object):
        return [obj.scene_ID for obj in self.failed_objects]

    def add_local_availability(self, datasets):
        # type: (list) -> list

        # TODO revise this function

        if isinstance(datasets, (list, tuple)):
            datasets = MAP(self.add_local_availability, datasets)

            ##############################################################
            # check availability of all subsets per scene and proc_level #
            ##############################################################

            datasets_validated = []
            datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

            for ds_group in datasets_grouped:
                proc_lvls = [ds['proc_level'] for ds in ds_group]

                if not len(list(set(proc_lvls))) == 1:
                    # reset the processing level of those scenes where not all subsystems are available
                    self.logger.info('%s: Found already processed subsystems %s. Remaining subsystems are missing. '
                                     'Therefore, the dataset has to be reprocessed.'
                                     % (ds_group[0]['entity_ID'], proc_lvls))

                    for ds in ds_group:
                        ds['proc_level'] = None
                        datasets_validated.append(ds)  # append the dataset itself ('+= ds' would add its keys)
                else:
                    datasets_validated.extend(ds_group)

            return datasets_validated

        else:
            dataset = datasets

            # query the database and get the last written processing level and LayerBandsAssignment
            DB_match = DB_T.get_info_from_postgreSQLdb(
                GMS_config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
                dict(sceneid=dataset['scene_ID']))

            # get the corresponding logfile
            path_logfile = path_generator(
                dataset).get_path_logfile()  # FIXME this always returns the logfile for the subsystem.
            # FIXME -> merged logfiles (L2A+) are ignored
            # FIXME -> for subsystems the highest start procL is L2A

            def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by a database query + os.path.exists
                """Return all processing levels that have been successfully written according to the logfile."""
                if not os.path.exists(path_log):
                    self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                     % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                    AllWrittenProcL_dueLog = []
                else:
                    logfile = open(path_log, 'r').read()
                    AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)

                    if not AllWrittenProcL_dueLog:  # no processing level was ever written successfully
                        self.logger.info('%s: According to logfile no completely processed data exist at any '
                                         'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                    else:
                        AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))

                return AllWrittenProcL_dueLog

            # check that there are not multiple database records for this dataset
            if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':
                # get all processing levels that have been successfully written
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
                dataset['proc_level'] = None  # default (dataset has to be reprocessed)

                # loop through all the found proc. levels and find the one that fulfills all requirements
                for ProcL in reversed(AllWrittenProcL):
                    if dataset['proc_level']:
                        break  # proc_level found; no further searching for lower proc_levels

                    assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                    # check if there is also a corresponding GMS file on disk
                    if os.path.isfile(assumed_path_GMS_file):
                        GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                        target_LayerBandsAssignment = \
                            get_LayerBandsAssignment(dict(
                                image_type=dataset['image_type'],
                                Satellite=dataset['satellite'],
                                Sensor=dataset['sensor'],
                                Subsystem=dataset['subsystem'],
                                proc_level=ProcL,  # must be respected because the LBA changes after atm. correction
                                dataset_ID=dataset['dataset_ID'],
                                logger=None), nBands=(1 if dataset['sensormode'] == 'P' else None))

                        # check if the LayerBandsAssignment of the written dataset on disk equals the
                        # desired LayerBandsAssignment
                        if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                            # update the database record if the dataset could not be found in the database
                            if DB_match == [] or DB_match == 'database connection fault':
                                self.logger.info('The dataset %s is not included in the database of processed data '
                                                 'but according to logfile %s has been written successfully. '
                                                 'Recreating missing database entry.' % (dataset['entity_ID'], ProcL))
                                DB_T.data_DB_updater(GMS_file_dict)

                                dataset['proc_level'] = ProcL

                            # if the dataset could be found in the database
                            elif len(DB_match) == 1:
                                try:
                                    self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                     % (ProcL, dataset['entity_ID'],
                                                        proc_chain[proc_chain.index(ProcL) + 1]))
                                except IndexError:
                                    self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                     % (ProcL, dataset['entity_ID']))

                                if DB_match[0][0] == ProcL:
                                    dataset['proc_level'] = DB_match[0][0]
                                else:
                                    dataset['proc_level'] = ProcL

                        else:
                            self.logger.info('Found a matching dataset for %s but with a different '
                                             'LayerBandsAssignment. Dataset has to be reprocessed.'
                                             % dataset['entity_ID'])
                    else:
                        self.logger.info('%s for dataset %s has been written according to logfile but no '
                                         'corresponding dataset has been found.' % (ProcL, dataset['entity_ID']) +
                                         (' Searching for lower processing level...'
                                          if AllWrittenProcL.index(ProcL) != 0 else ''))

            elif len(DB_match) > 1:
                self.logger.info('According to the database there are multiple matches for the dataset %s. '
                                 'Dataset has to be reprocessed.' % dataset['entity_ID'])
                dataset['proc_level'] = None

            else:
                dataset['proc_level'] = None

            return dataset
    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
......
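The list branch above is the core of this fix: add_local_availability() first maps itself over all datasets and then groups the results by scene_ID, resetting proc_level wherever the subsystems of a scene disagree. A minimal sketch of that grouping semantics, with HLP_F.group_dicts_by_key re-implemented here purely for illustration (the real helper lives in the GMS helper module; the sample data is invented):

    from itertools import groupby

    def group_dicts_by_key(dicts, key):
        """Bucket dicts into sublists sharing the same value for `key`."""
        dicts_sorted = sorted(dicts, key=lambda d: d[key])
        return [list(grp) for _, grp in groupby(dicts_sorted, key=lambda d: d[key])]

    # two subsystems of the same scene; only one has already been processed
    datasets = [dict(scene_ID=1, entity_ID='scene_A', proc_level='L1A'),
                dict(scene_ID=1, entity_ID='scene_A', proc_level=None)]

    for ds_group in group_dicts_by_key(datasets, key='scene_ID'):
        proc_lvls = [ds['proc_level'] for ds in ds_group]
        if len(set(proc_lvls)) != 1:  # subsystems disagree -> reprocess the whole scene
            for ds in ds_group:
                ds['proc_level'] = None

    assert all(ds['proc_level'] is None for ds in datasets)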
@@ -100,7 +100,7 @@ class BaseTestCases:
            # update attributes of DB_job_record and related DB entry
            cls.PC.config.DB_job_record.reset_job_progress()

-           [cls.PC.add_local_availability(ds) for ds in cls.PC.config.data_list]
+           cls.PC.config.data_list = cls.PC.add_local_availability(cls.PC.config.data_list)

            [cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]
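This call-site change is the actual bugfix: add_local_availability() now expects the complete data_list and returns a validated list, so the return value has to be assigned back instead of calling the method once per dataset, which previously bypassed the per-scene subsystem check. A short sketch of the intended contract, with PC standing for a process_controller instance as in the test above:

    # old pattern (return values discarded, cross-subsystem validation never ran):
    #   [PC.add_local_availability(ds) for ds in PC.config.data_list]

    # new pattern (assign the validated list back):
    PC.config.data_list = PC.add_local_availability(PC.config.data_list)
    assert all('proc_level' in ds for ds in PC.config.data_list)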
@@ -168,12 +168,12 @@ class BaseTestCases:
            # update attributes of DB_job_record and related DB entry
            cls.PC.config.DB_job_record.reset_job_progress()

-           [cls.PC.add_local_availability(ds) for ds in cls.PC.config.data_list]
+           cls.PC.config.data_list = cls.PC.add_local_availability(cls.PC.config.data_list)

            [cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]

        def test_run_all_processors2(self):
            self.PC.run_all_processors2()

        def test_run_all_processors(self):
            self.PC.run_all_processors()
            self.assertIsInstance(self.PC.L2C_newObjects, list)
###################################################################################
......
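For reference, the processing levels that get_AllWrittenProcL_dueLog() recovers from a scene logfile come from a plain regex scrape of the success messages; a standalone sketch of that step (the log lines are invented for illustration):

    import re

    sample_log = ('14:21:03:   L1A data successfully saved.\n'
                  '14:25:40:   L1B data successfully saved.\n')

    # same pattern as used in add_local_availability()
    levels = re.findall(r":*(\S*\s*) data successfully saved.", sample_log, re.I)
    print(levels)  # -> ['L1A', 'L1B']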