Commit 69484df0 authored by Daniel Scheffler

Fixed for L2B processing.

Former-commit-id: 6680c7b2
Former-commit-id: d6e81881
parent 926e3d1e
......@@ -115,11 +115,14 @@ def sorted_nicely(iterable):
return sorted(iterable, key=alphanum_key)
def proc_level_already_present(current_lvl, target_lvl):
if current_lvl is None or proc_chain.index(current_lvl) < proc_chain.index(target_lvl):
return False
else: # current_lvl >= target_lvl
return True
def is_proc_level_lower(current_lvl, target_lvl):
# type: (str, str) -> bool
"""Return True if current_lvl is lower than target_lvl.
:param current_lvl: current processing level (to be tested)
:param target_lvl:  target processing level (reference)
"""
return current_lvl is None or proc_chain.index(current_lvl) < proc_chain.index(target_lvl)
def convert_absPathArchive_to_GDALvsiPath(path_archive):
......
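The new is_proc_level_lower() helper condenses the old proc_level_already_present() check into a single expression. A minimal illustration (the proc_chain ordering shown in the comment is an assumption for this sketch, not taken from the diff):

    # assuming proc_chain == ['L1A', 'L1B', 'L1C', 'L2A', 'L2B', 'L2C']
    assert is_proc_level_lower(None, 'L1A')        # unprocessed data is always lower
    assert is_proc_level_lower('L1B', 'L2A')       # index 1 < index 3
    assert not is_proc_level_lower('L2A', 'L1C')   # index 3 >= index 2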
# -*- coding: utf-8 -*-
from typing import List, Tuple, Union # noqa F401 # flake8 issue
from typing import List, Tuple, Generator, Iterable, Union # noqa F401 # flake8 issue
from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
......@@ -98,7 +99,7 @@ def L1B_map(L1A_obj):
@EXC_H.log_uncaught_exceptions
def L1C_map(L1B_objs):
# type: (List[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
# type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
"""Atmospheric correction.
NOTE: all subsystems (containing all spatial samplings) belonging to the same scene ID are needed
......@@ -123,7 +124,7 @@ def L1C_map(L1B_objs):
% CFG.target_radunit_optical) for L1C_obj in L1C_objs]
# write outputs and delete temporary data
for i, L1C_obj in enumerate(L1C_objs):
for L1C_obj in L1C_objs:
if CFG.exec_L1CP[1]:
L1C_obj.to_ENVI()
if L1C_obj.arr_shape == 'cube':
......@@ -133,17 +134,18 @@ def L1C_map(L1B_objs):
@EXC_H.log_uncaught_exceptions
def L2A_map(L1C_objs, block_size=None):
# type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]]) -> List[L2A_P.L2A_object]
def L2A_map(L1C_objs, block_size=None, return_tiles=True):
# type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]], tuple, bool) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object]
"""Geometric homogenization.
Performs correction of geometric displacements, resampling to target grid of the usecase and merges multiple
GMS objects belonging to the same scene ID (subsystems) to a single L2A object.
NOTE: Output L2A_object must be cut into tiles because L2A_obj size in memory exceeds maximum serialization size.
:param L1C_objs: list containing one or multiple L1C objects belonging to the same scene ID.
:param block_size: X/Y size of output tiles in pixels, e.g. (1024,1024)
:return: list of L2A_object tiles
:param L1C_objs: list containing one or multiple L1C objects belonging to the same scene ID.
:param block_size: X/Y size of output tiles in pixels, e.g. (1024,1024)
:param return_tiles: return computed L2A object in tiles
:return: list of L2A_object tiles
"""
# initialize L2A objects
......@@ -168,9 +170,11 @@ def L2A_map(L1C_objs, block_size=None):
# delete tempfiles of separate subsystem GMS objects
[L2A_obj.delete_tempFiles() for L2A_obj in L2A_objs]
L2A_tiles = L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY)
return list(L2A_tiles)
if return_tiles:
L2A_tiles = L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY)
return list(L2A_tiles)
else:
return L2A_obj
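The new return_tiles flag lets callers keep the merged scene instead of serializable tiles. A hedged usage sketch of both modes:

    # default: list of L2A_object tiles, sized for serialization
    L2A_tiles = L2A_map(L1C_objs, block_size=(1024, 1024))
    # scene-wise pipeline: one merged L2A_object, as used by run_complete_preprocessing() below
    L2A_obj = L2A_map(L1C_objs, return_tiles=False)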
@EXC_H.log_uncaught_exceptions
......@@ -194,3 +198,109 @@ def L2C_map(L2B_obj):
[MGRS_tile.to_ENVI(compression=CFG.output_data_compression) for MGRS_tile in L2C_MRGS_tiles]
L2C_obj.delete_tempFiles()
return L2C_obj # contains no array data in Python mode
@EXC_H.log_uncaught_exceptions
def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise parallelization)
# type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
from ..model.gms_object import failed_GMS_object
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
'Received %s.' % list_dataset_dicts_per_scene)
input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']
##################
# L1A processing #
##################
L1A_objects = []
if CFG.exec_L1AP[0] and input_proc_level is None:
L1A_objects = [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
if isinstance(L1A_objects, failed_GMS_object): # only a single failed_GMS_object is returned in case of subsets
return L1A_objects
##################
# L1B processing #
##################
# select subsystem with optimal band for co-registration
# L1B_obj_coreg = L1B_map(L1A_objects[0])
# copy coreg information to remaining subsets
L1B_objects = L1A_objects
if CFG.exec_L1BP[0]:
# add earlier processed L1A data
if input_proc_level == 'L1A':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
L1A_objects += L1A_P.L1A_object().from_disk([GMSfile, ['cube', None]])
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
if isinstance(L1B_objects, failed_GMS_object): # only a single failed_GMS_object is returned in case of subsets
return L1B_objects
##################
# L1C processing #
##################
L1C_objects = L1B_objects
if CFG.exec_L1CP[0]:
# add earlier processed L1B data
if input_proc_level == 'L1B':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
L1B_objects += L1B_P.L1B_object().from_disk([GMSfile, ['cube', None]])
L1C_objects = L1C_map(L1B_objects)
if isinstance(L1C_objects, failed_GMS_object) or not CFG.exec_L2AP[0]:
# only a single failed_GMS_object is returned in case of subsets
return L1C_objects
##################
# L2A processing #
##################
# add earlier processed L1C data
if input_proc_level == 'L1C':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
L1C_objects += L1C_P.L1C_object().from_disk([GMSfile, ['cube', None]])
L2A_obj = L2A_map(L1C_objects, return_tiles=False)
if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
return L2A_obj
##################
# L2B processing #
##################
# add earlier processed L2A data
if input_proc_level == 'L2A':
assert len(list_dataset_dicts_per_scene) == 1, 'Expected only a single L2A dataset since subsystems are merged.'
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
L2A_obj = L2A_P.L2A_object().from_disk([GMSfile, ['cube', None]])
L2B_obj = L2B_map(L2A_obj)
if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
return L2B_obj
##################
# L2C processing #
##################
# add earlier processed L2B data
if input_proc_level == 'L2B':
assert len(list_dataset_dicts_per_scene) == 1, 'Expected only a single L2B dataset since subsystems are merged.'
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
L2B_obj = L2B_P.L2B_object().from_disk([GMSfile, ['cube', None]])
L2C_obj = L2C_map(L2B_obj)
return L2C_obj
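run_complete_preprocessing() is intended to be mapped over groups of subsystem dataset dicts sharing one scene ID, resuming at the stage after the common input proc_level. A minimal sketch mirroring the call in run_all_processors() further down (HLP_F, MAP and config as used there):

    dataset_groups = HLP_F.group_dicts_by_key(config.data_list, key='scene_ID')
    GMS_objs = MAP(run_complete_preprocessing, dataset_groups)  # one scene per worker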
......@@ -120,10 +120,34 @@ class process_controller(object):
def sceneids_failed(self):
return [obj.scene_ID for obj in self.failed_objects]
@staticmethod
def add_local_availability(dataset):
def add_local_availability(self, datasets):
# TODO revise this function
# TODO this does not respect that all subsystems of the same scene ID must be available at the same proc level!
if isinstance(datasets, (list, tuple)):
datasets = MAP(self.add_local_availability, datasets)
# check availability of all subsets per scene and proc_level
datasets_validated = []
datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')
for ds_group in datasets_grouped:
proc_lvls = [ds['proc_level'] for ds in ds_group]
if not len(list(set(proc_lvls))) == 1:
# reset processing level of those scenes where not all subsystems are available
self.logger.info('%s: Found already processed subsystems %s. Remaining subsystems are missing. '
'Therefore, the dataset has to be reprocessed.'
% (ds_group[0]['entity_ID'], proc_lvls))
for ds in ds_group:
ds['proc_level'] = None
datasets_validated.append(ds)
else:
datasets_validated.extend(ds_group)
return datasets_validated
else:
dataset = datasets
# query the database and get the last written processing level and LayerBandsAssignment
DB_match = DB_T.get_info_from_postgreSQLdb(
......@@ -141,15 +165,15 @@ class process_controller(object):
"""Returns all processing level that have been successfully written according to logfile."""
if not os.path.exists(path_log):
print("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
% (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
% (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
AllWrittenProcL_dueLog = []
else:
logfile = open(path_log, 'r').read()
AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
if not AllWrittenProcL_dueLog: # AllWrittenProcL_dueLog = []
print('%s: According to logfile no completely processed data exist at any processing level. '
'Dataset has to be reprocessed.' % dataset['entity_ID'])
self.logger.info('%s: According to logfile no completely processed data exist at any processing '
'level. Dataset has to be reprocessed.' % dataset['entity_ID'])
else:
AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
return AllWrittenProcL_dueLog
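The pattern above pulls the processing levels out of '<ProcL> data successfully saved.' log lines; a small illustration with assumed log content:

    import re
    logfile = 'L1A data successfully saved.\nL1B data successfully saved.'
    re.findall(r':*(\S*\s*) data successfully saved.', logfile, re.I)  # -> ['L1A', 'L1B']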
......@@ -186,9 +210,9 @@ class process_controller(object):
# update the database record if the dataset could not be found in database
if DB_match == [] or DB_match == 'database connection fault':
print('The dataset %s is not included in the database of processed data but according to '
'logfile %s has been written successfully. Recreating missing database entry.'
% (dataset['entity_ID'], ProcL))
self.logger.info('The dataset %s is not included in the database of processed data but'
' according to logfile %s has been written successfully. Recreating '
'missing database entry.' % (dataset['entity_ID'], ProcL))
DB_T.data_DB_updater(GMS_file_dict)
dataset['proc_level'] = ProcL
......@@ -196,11 +220,12 @@ class process_controller(object):
# if the dataset could be found in database
elif len(DB_match) == 1:
try:
print('Found a matching %s dataset for %s. Processing skipped until %s.'
% (ProcL, dataset['entity_ID'], proc_chain[proc_chain.index(ProcL) + 1]))
self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
% (ProcL, dataset['entity_ID'],
proc_chain[proc_chain.index(ProcL) + 1]))
except IndexError:
print('Found a matching %s dataset for %s. Processing already done.'
% (ProcL, dataset['entity_ID']))
self.logger.info('Found a matching %s dataset for %s. Processing already done.'
% (ProcL, dataset['entity_ID']))
if DB_match[0][0] == ProcL:
dataset['proc_level'] = DB_match[0][0]
......@@ -208,16 +233,17 @@ class process_controller(object):
dataset['proc_level'] = ProcL
else:
print('Found a matching dataset for %s but with a different LayerBandsAssignment. '
'Dataset has to be reprocessed.' % dataset['entity_ID'])
self.logger.info('Found a matching dataset for %s but with a different LayerBandsAssignment. '
'Dataset has to be reprocessed.' % dataset['entity_ID'])
else:
print('%s for dataset %s has been written due to logfile but no corresponding dataset has been '
'found.' % (ProcL, dataset['entity_ID']) + ' Searching for lower processing level...'
if AllWrittenProcL.index(ProcL) != 0 else '')
self.logger.info('%s for dataset %s has been written due to logfile but no corresponding dataset '
'has been found.' % (ProcL, dataset['entity_ID']) +
' Searching for lower processing level...'
if AllWrittenProcL.index(ProcL) != 0 else '')
elif len(DB_match) > 1:
print('According to database there are multiple matches for the dataset %s. Dataset has to be reprocessed.'
% dataset['entity_ID'])
self.logger.info('According to database there are multiple matches for the dataset %s. Dataset has to be '
'reprocessed.' % dataset['entity_ID'])
dataset['proc_level'] = None
else:
......@@ -235,16 +261,6 @@ class process_controller(object):
# check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]
@staticmethod
def _is_already_present(dataset, procLvl):
"""Checks if the given dataset is already available on disk.
:param dataset: <GMS object>
:param procLvl: <str> processing level to be checked
:return: <bool>
"""
return HLP_F.proc_level_already_present(dataset['proc_level'], procLvl)
def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
"""Returns a list of datasets that have to be read from disk and then processed by a specific processor.
......@@ -252,13 +268,13 @@ class process_controller(object):
:param prevLvl_objects:
:return:
"""
def is_already_present(dataset):
return HLP_F.proc_level_already_present(dataset['proc_level'], target_lvl=procLvl)
def is_procL_lower(dataset):
return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)
if prevLvl_objects is None:
return [dataset for dataset in self.config.data_list if not is_already_present(dataset)] # TODO generator?
return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)] # TODO generator?
else:
return [dataset for dataset in self.config.data_list if not is_already_present(dataset) and
return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]
def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
......@@ -277,8 +293,8 @@ class process_controller(object):
return []
else:
# handle input parameters
parallLev = parallLev if parallLev else self.config.parallelization_level
blocksize = blocksize if blocksize else self.config.tiling_block_size_XY
parallLev = parallLev or self.config.parallelization_level
blocksize = blocksize or self.config.tiling_block_size_XY
prevLvl = proc_chain[proc_chain.index(procLvl) - 1] # TODO replace by enum
# get GMSfile list
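Note on the shortened defaults above: "x or default" falls back for any falsy value (None, 0, an empty tuple), not only for None; the assumption here is that callers never pass a legitimate falsy value.

    None or (2048, 2048)        # -> (2048, 2048)
    () or (2048, 2048)          # -> (2048, 2048)  (empty tuple is falsy, also triggers the default)
    (512, 512) or (2048, 2048)  # -> (512, 512)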
......@@ -311,7 +327,7 @@ class process_controller(object):
return DB_objs
def run_all_processors(self, custom_data_list=None):
def run_all_processors_OLD(self, custom_data_list=None):
"""
Run all processors at once.
"""
......@@ -338,7 +354,7 @@ class process_controller(object):
self.config.data_list = custom_data_list
# add local availability
self.config.data_list = MAP(self.add_local_availability, self.config.data_list)
self.config.data_list = self.add_local_availability(self.config.data_list)
self.update_DB_job_statistics(self.config.data_list)
self.L1A_processing()
......@@ -383,6 +399,77 @@ class process_controller(object):
shutdown_loggers()
raise
def run_all_processors(self, custom_data_list=None):
signal.signal(signal.SIGINT, self.stop) # enable clean shutdown possibility
# noinspection PyBroadException
try:
if self.config.profiling:
from pyinstrument import Profiler
self.profiler = Profiler() # or Profiler(use_signal=False), see below
self.profiler.start()
self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
% self.config.ID)
self.DB_job_record.reset_job_progress() # updates attributes of DB_job_record and related DB entry
self.config.status = 'running'
self.update_DB_job_record() # TODO implement that into config.status.setter
self.failed_objects = []
# get list of datasets to be processed
if custom_data_list:
self.config.data_list = custom_data_list
# add local availability
self.config.data_list = self.add_local_availability(self.config.data_list)
self.update_DB_job_statistics(self.config.data_list)
# group dataset dicts by sceneid
dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')
from .pipeline import run_complete_preprocessing
GMS_objs = MAP(run_complete_preprocessing, dataset_groups)
# separate results into successful and failed objects
self.L2C_newObjects = [obj for obj in GMS_objs if isinstance(obj, L2C_P.L2C_object)]
self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]
# create summary
self.create_job_summary()
self.logger.info('Execution finished.')
# TODO implement failed_with_warnings:
self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
self.config.end_time = datetime.datetime.now()
self.config.computation_time = self.config.end_time - self.config.start_time
self.logger.info('Time for execution: %s' % self.config.computation_time)
# update database entry of current job
self.update_DB_job_record()
if self.config.profiling:
self.profiler.stop()
print(self.profiler.output_text(unicode=True, color=True))
shutdown_loggers()
except Exception: # noqa E722 # bare except
if self.config.profiling:
self.profiler.stop()
print(self.profiler.output_text(unicode=True, color=True))
self.config.status = 'failed'
self.update_DB_job_record()
if not self.config.disable_exception_handler:
self.logger.error('Execution failed with an error:', exc_info=True)
shutdown_loggers()
else:
self.logger.error('Execution failed with an error:')
shutdown_loggers()
raise
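A hedged usage sketch of the reworked entry point (job ID and constructor signature taken from the test setup further down):

    PC = process_controller(26186268, **job_config_kwargs)
    PC.run_all_processors()
    assert isinstance(PC.L2C_newObjects, list)  # successfully processed scenes
    assert isinstance(PC.failed_objects, list)  # failed_GMS_object instances, if any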
def stop(self, signum, frame):
"""Interrupt the running process controller gracefully."""
......
......@@ -145,6 +145,36 @@ class BaseTestCases:
# FIXME after updating the job.status-attribute for the level-processes, delete the code that is commented
# FIXME out.
class TestCompletePipeline(unittest.TestCase):
PC = None # default
@classmethod
def tearDownClass(cls):
cls.PC.config.DB_job_record.delete_procdata_of_entire_job(force=True)
@classmethod
def validate_db_entry(cls, filename):
sceneID_res = get_info_from_postgreSQLdb(cls.PC.config.conn_database, 'scenes', ['id'],
{'filename': filename})
assert sceneID_res and isinstance(sceneID_res[0][0], int), 'Invalid database entry.'
@classmethod
def create_job(cls, jobID, config):
cls.PC = process_controller(jobID, **config)
cls.PC.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
% cls.PC.config.ID)
# update attributes of DB_job_record and related DB entry
cls.PC.config.DB_job_record.reset_job_progress()
[cls.PC.add_local_availability(ds) for ds in cls.PC.config.data_list]
[cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]
def test_run_all_processors2(self):
self.PC.run_all_processors2()
self.assertIsInstance(self.PC.L2C_newObjects, list)
###################################################################################
# Test cases 1-9: Test_<Satellite-Dataset>_<PreCollection or Collection>Data
......@@ -231,6 +261,17 @@ class Test_Sentinel2A_SingleGranuleFormat(BaseTestCases.TestAll):
cls.create_job(26186268, job_config_kwargs)
class Test_Sentinel2A_SingleGranuleFormat_CompletePipeline(BaseTestCases.TestCompletePipeline):
"""
Parametrized testclass. Tests the level-processes on a Sentinel-2A MSI scene (1 granule in archive: > 2017).
More information on the dataset will be output after the tests-classes are executed.
"""
@classmethod
def setUpClass(cls):
# job_config_kwargs['CPUs'] = 1
cls.create_job(26186268, job_config_kwargs)
class Test_Sentinel2A_MultiGranuleFormat(BaseTestCases.TestAll):
"""
Parametrized testclass. Tests the level-processes on a Sentinel-2A MSI scene (multiple granules in archive: < 2017).
......@@ -261,6 +302,16 @@ class Test_MultipleDatasetsInOneJob(BaseTestCases.TestAll):
cls.create_job(26186273, job_config_kwargs)
class Test_MultipleDatasetsInOneJob_CompletePipeline(BaseTestCases.TestCompletePipeline):
"""
Parametrized testclass. Tests the level-processes on a job containing a Landsat-5 (pre-collection data),
Landsat-7 SLC_off (pre-collection data) and a Sentinel-2A (collection data) scene.
"""
@classmethod
def setUpClass(cls):
cls.create_job(26186273, job_config_kwargs)
###################################################################################
# Summarizing the information regarding the test datasets.
......