Commit fa769ced authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Simplified config. Bugfix options_schema. Bugfix test_gms_preprocessing.

Updated version info (v0.14.6).
parent 868b2cad
......@@ -23,7 +23,7 @@ import psutil
from pprint import pformat
from typing import TYPE_CHECKING
from .options_schema import gms_schema_input, gms_schema_config_output, parameter_mapping
from .options_schema import gms_schema_input, gms_schema_config_output, parameter_mapping, get_param_from_json_config
from ..version import __version__, __versionalias__
if TYPE_CHECKING:
......@@ -196,7 +196,7 @@ class JobConfig(object):
self.conn_database = get_conn_database(hostname=self.db_host)
# get validated options dict from JSON-options
json_opts = self.get_json_opts(validate=True)
self.json_opts_fused_valid = self.get_json_opts(validate=True)
gp = self.get_parameter
......@@ -204,60 +204,36 @@ class JobConfig(object):
# global options #
##################
json_globts = json_opts['global_opts'] # type: dict
self.inmem_serialization = \
gp('inmem_serialization', json_globts['inmem_serialization'])
self.parallelization_level = \
gp('parallelization_level', json_globts['parallelization_level'])
self.spatial_index_server_host = \
gp('spatial_index_server_host', json_globts['spatial_index_server_host'])
self.spatial_index_server_port = \
gp('spatial_index_server_port', json_globts['spatial_index_server_port'])
self.CPUs = \
gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
self.CPUs_all_jobs = \
gp('CPUs_all_jobs', json_globts['CPUs_all_jobs'])
self.max_mem_usage = \
gp('max_mem_usage', json_globts['max_mem_usage'])
self.critical_mem_usage = \
gp('critical_mem_usage', json_globts['critical_mem_usage'])
self.max_parallel_reads_writes = \
gp('max_parallel_reads_writes', json_globts['max_parallel_reads_writes'])
self.allow_subMultiprocessing = \
gp('allow_subMultiprocessing', json_globts['allow_subMultiprocessing'])
self.delete_old_output = \
gp('delete_old_output', json_globts['delete_old_output'])
self.disable_exception_handler = \
gp('disable_exception_handler', json_globts['disable_exception_handler'])
self.disable_IO_locks = \
gp('disable_IO_locks', json_globts['disable_IO_locks'])
self.disable_CPU_locks = \
gp('disable_CPU_locks', json_globts['disable_CPU_locks'])
self.disable_memory_locks = \
gp('disable_memory_locks', json_globts['disable_memory_locks'])
self.min_version_mem_usage_stats = \
gp('min_version_mem_usage_stats', json_globts['min_version_mem_usage_stats'])
self.log_level = \
gp('log_level', json_globts['log_level'])
self.tiling_block_size_XY = \
gp('tiling_block_size_XY', json_globts['tiling_block_size_XY'])
self.is_test = \
gp('is_test', json_globts['is_test'])
self.profiling = \
gp('profiling', json_globts['profiling'])
self.benchmark_global = \
gp('benchmark_global', json_globts['benchmark_global'])
self.inmem_serialization = gp('inmem_serialization')
self.parallelization_level = gp('parallelization_level')
self.spatial_index_server_host = gp('spatial_index_server_host')
self.spatial_index_server_port = gp('spatial_index_server_port')
self.CPUs = gp('CPUs', fallback=multiprocessing.cpu_count())
self.CPUs_all_jobs = gp('CPUs_all_jobs')
self.max_mem_usage = gp('max_mem_usage')
self.critical_mem_usage = gp('critical_mem_usage')
self.max_parallel_reads_writes = gp('max_parallel_reads_writes')
self.allow_subMultiprocessing = gp('allow_subMultiprocessing')
self.delete_old_output = gp('delete_old_output')
self.disable_exception_handler = gp('disable_exception_handler')
self.disable_IO_locks = gp('disable_IO_locks')
self.disable_CPU_locks = gp('disable_CPU_locks')
self.disable_memory_locks = gp('disable_memory_locks')
self.min_version_mem_usage_stats = gp('min_version_mem_usage_stats')
self.log_level = gp('log_level')
self.tiling_block_size_XY = gp('tiling_block_size_XY')
self.is_test = gp('is_test')
self.profiling = gp('profiling')
self.benchmark_global = gp('benchmark_global')
#########
# paths #
#########
json_paths = json_opts['paths'] # type: dict
# external
self.path_spatIdxSrv = self.DB_config_table['path_spatial_index_mediator_server']
self.path_tempdir = self.absP(self.DB_config_table['path_tempdir'])
self.path_custom_sicor_options = gp('path_custom_sicor_options', json_paths['path_custom_sicor_options'])
self.path_custom_sicor_options = gp('path_custom_sicor_options')
self.path_dem_proc_srtm_90m = self.absP(self.DB_config_table['path_dem_proc_srtm_90m'])
# internal (included in gms_preprocessing repository)
......@@ -271,30 +247,17 @@ class JobConfig(object):
'SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')
if not self.is_test:
def get_dbpath(dbkey):
    # Resolve a folder name stored under `dbkey` in the DB config table to an
    # absolute path below the file-server root (closure over the enclosing
    # method's `self`; only used in normal mode, i.e. when not self.is_test).
    return self.joinP(self.path_fileserver, self.DB_config_table[dbkey])
# normal mode
self.path_fileserver = self.DB_config_table['path_data_root']
self.path_archive = \
gp('path_archive', json_paths['path_archive'],
fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_download']))
self.path_procdata_scenes = \
gp('path_procdata_scenes', json_paths['path_procdata_scenes'],
fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_procdata_scenes']))
self.path_procdata_MGRS = \
gp('path_procdata_MGRS', json_paths['path_procdata_MGRS'],
fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_procdata_MGRS']))
self.path_archive = gp('path_archive', fallback=get_dbpath('foldername_download'))
self.path_procdata_scenes = gp('path_procdata_scenes', fallback=get_dbpath('foldername_procdata_scenes'))
self.path_procdata_MGRS = gp('path_procdata_MGRS', fallback=get_dbpath('foldername_procdata_MGRS'))
self.path_ECMWF_db = self.absP(self.DB_config_table['path_ECMWF_db'])
self.path_benchmarks = \
gp('path_benchmarks', json_paths['path_benchmarks'],
fallback=self.absP(self.DB_config_table['path_benchmarks']))
self.path_job_logs = \
gp('path_job_logs', json_paths['path_job_logs'],
fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_job_logs']))
self.path_benchmarks = gp('path_benchmarks', fallback=self.absP(self.DB_config_table['path_benchmarks']))
self.path_job_logs = gp('path_job_logs', fallback=get_dbpath('foldername_job_logs'))
else:
# software test mode, the repository should be self-contained -> use only relative paths
......@@ -312,117 +275,65 @@ class JobConfig(object):
# processor configuration #
###########################
json_processors = json_opts['processors'] # type: dict
# general_opts
self.skip_thermal = \
gp('skip_thermal', json_processors['general_opts']['skip_thermal'])
self.skip_pan = \
gp('skip_pan', json_processors['general_opts']['skip_pan'])
self.sort_bands_by_cwl = \
gp('sort_bands_by_cwl', json_processors['general_opts']['sort_bands_by_cwl'])
self.target_radunit_optical = \
gp('target_radunit_optical', json_processors['general_opts']['target_radunit_optical'])
self.target_radunit_thermal = \
gp('target_radunit_thermal', json_processors['general_opts']['target_radunit_thermal'])
self.scale_factor_TOARef = \
gp('scale_factor_TOARef', json_processors['general_opts']['scale_factor_TOARef'])
self.scale_factor_BOARef = \
gp('scale_factor_BOARef', json_processors['general_opts']['scale_factor_BOARef'])
self.mgrs_pixel_buffer = \
gp('mgrs_pixel_buffer', json_processors['general_opts']['mgrs_pixel_buffer'])
self.output_data_compression = \
gp('output_data_compression', json_processors['general_opts']['output_data_compression'])
self.write_ENVIclassif_cloudmask = \
gp('write_ENVIclassif_cloudmask', json_processors['general_opts']['write_ENVIclassif_cloudmask'])
self.skip_thermal = gp('skip_thermal')
self.skip_pan = gp('skip_pan')
self.sort_bands_by_cwl = gp('sort_bands_by_cwl')
self.target_radunit_optical = gp('target_radunit_optical')
self.target_radunit_thermal = gp('target_radunit_thermal')
self.scale_factor_TOARef = gp('scale_factor_TOARef')
self.scale_factor_BOARef = gp('scale_factor_BOARef')
self.mgrs_pixel_buffer = gp('mgrs_pixel_buffer')
self.output_data_compression = gp('output_data_compression')
self.write_ENVIclassif_cloudmask = gp('write_ENVIclassif_cloudmask')
# processor specific opts
# L1A
self.exec_L1AP = gp('exec_L1AP', [
json_processors['L1A']['run_processor'],
json_processors['L1A']['write_output'],
json_processors['L1A']['delete_output']])
self.SZA_SAA_calculation_accurracy = \
gp('SZA_SAA_calculation_accurracy', json_processors['L1A']['SZA_SAA_calculation_accurracy'])
self.export_VZA_SZA_SAA_RAA_stats = \
gp('export_VZA_SZA_SAA_RAA_stats', json_processors['L1A']['export_VZA_SZA_SAA_RAA_stats'])
self.exec_L1AP = gp('exec_L1AP')
self.SZA_SAA_calculation_accurracy = gp('SZA_SAA_calculation_accurracy')
self.export_VZA_SZA_SAA_RAA_stats = gp('export_VZA_SZA_SAA_RAA_stats')
# L1B
self.exec_L1BP = gp('exec_L1BP', [
json_processors['L1B']['run_processor'],
json_processors['L1B']['write_output'],
json_processors['L1B']['delete_output']])
self.skip_coreg = gp('skip_coreg', json_processors['L1B']['skip_coreg'])
self.spatial_ref_min_overlap = \
gp('spatial_ref_min_overlap', json_processors['L1B']['spatial_ref_min_overlap'])
self.spatial_ref_min_cloudcov = \
gp('spatial_ref_min_cloudcov', json_processors['L1B']['spatial_ref_min_cloudcov'])
self.spatial_ref_max_cloudcov = \
gp('spatial_ref_max_cloudcov', json_processors['L1B']['spatial_ref_max_cloudcov'])
self.spatial_ref_plusminus_days = \
gp('spatial_ref_plusminus_days', json_processors['L1B']['spatial_ref_plusminus_days'])
self.spatial_ref_plusminus_years = \
gp('spatial_ref_plusminus_years', json_processors['L1B']['spatial_ref_plusminus_years'])
self.coreg_band_wavelength_for_matching = \
gp('coreg_band_wavelength_for_matching', json_processors['L1B']['coreg_band_wavelength_for_matching'])
self.coreg_max_shift_allowed = \
gp('coreg_max_shift_allowed', json_processors['L1B']['coreg_max_shift_allowed'])
self.coreg_window_size = \
gp('coreg_window_size', json_processors['L1B']['coreg_window_size'])
self.exec_L1BP = gp('exec_L1BP')
self.skip_coreg = gp('skip_coreg')
self.spatial_ref_min_overlap = gp('spatial_ref_min_overlap')
self.spatial_ref_min_cloudcov = gp('spatial_ref_min_cloudcov')
self.spatial_ref_max_cloudcov = gp('spatial_ref_max_cloudcov')
self.spatial_ref_plusminus_days = gp('spatial_ref_plusminus_days')
self.spatial_ref_plusminus_years = gp('spatial_ref_plusminus_years')
self.coreg_band_wavelength_for_matching = gp('coreg_band_wavelength_for_matching')
self.coreg_max_shift_allowed = gp('coreg_max_shift_allowed')
self.coreg_window_size = gp('coreg_window_size')
# L1C
self.exec_L1CP = gp('exec_L1CP', [
json_processors['L1C']['run_processor'],
json_processors['L1C']['write_output'],
json_processors['L1C']['delete_output']])
self.cloud_masking_algorithm = \
gp('cloud_masking_algorithm', json_processors['L1C']['cloud_masking_algorithm'])
self.export_L1C_obj_dumps = \
gp('export_L1C_obj_dumps', json_processors['L1C']['export_L1C_obj_dumps'])
self.auto_download_ecmwf = \
gp('auto_download_ecmwf', json_processors['L1C']['auto_download_ecmwf'])
self.ac_fillnonclear_areas = \
gp('ac_fillnonclear_areas', json_processors['L1C']['ac_fillnonclear_areas'])
self.ac_clear_area_labels = \
gp('ac_clear_area_labels', json_processors['L1C']['ac_clear_area_labels'])
self.ac_scale_factor_errors = \
gp('ac_scale_factor_errors', json_processors['L1C']['ac_scale_factor_errors'])
self.ac_max_ram_gb = \
gp('ac_max_ram_gb', json_processors['L1C']['ac_max_ram_gb'])
self.ac_estimate_accuracy = \
gp('ac_estimate_accuracy', json_processors['L1C']['ac_estimate_accuracy'])
self.ac_bandwise_accuracy = \
gp('ac_bandwise_accuracy', json_processors['L1C']['ac_bandwise_accuracy'])
self.exec_L1CP = gp('exec_L1CP')
self.cloud_masking_algorithm = gp('cloud_masking_algorithm')
self.export_L1C_obj_dumps = gp('export_L1C_obj_dumps')
self.auto_download_ecmwf = gp('auto_download_ecmwf')
self.ac_fillnonclear_areas = gp('ac_fillnonclear_areas')
self.ac_clear_area_labels = gp('ac_clear_area_labels')
self.ac_scale_factor_errors = gp('ac_scale_factor_errors')
self.ac_max_ram_gb = gp('ac_max_ram_gb')
self.ac_estimate_accuracy = gp('ac_estimate_accuracy')
self.ac_bandwise_accuracy = gp('ac_bandwise_accuracy')
# L2A
self.exec_L2AP = gp('exec_L2AP', [
json_processors['L2A']['run_processor'],
json_processors['L2A']['write_output'],
json_processors['L2A']['delete_output']])
self.align_coord_grids = gp('align_coord_grids', json_processors['L2A']['align_coord_grids'])
self.match_gsd = gp('match_gsd', json_processors['L2A']['match_gsd'])
self.spatial_resamp_alg = gp('spatial_resamp_alg', json_processors['L2A']['spatial_resamp_alg'])
self.clip_to_extent = gp('clip_to_extent', json_processors['L2A']['clip_to_extent'])
self.spathomo_estimate_accuracy = \
gp('spathomo_estimate_accuracy', json_processors['L2A']['spathomo_estimate_accuracy'])
self.exec_L2AP = gp('exec_L2AP')
self.align_coord_grids = gp('align_coord_grids')
self.match_gsd = gp('match_gsd')
self.spatial_resamp_alg = gp('spatial_resamp_alg')
self.clip_to_extent = gp('clip_to_extent')
self.spathomo_estimate_accuracy = gp('spathomo_estimate_accuracy')
# L2B
self.exec_L2BP = gp('exec_L2BP', [
json_processors['L2B']['run_processor'],
json_processors['L2B']['write_output'],
json_processors['L2B']['delete_output']])
self.spechomo_method = gp('spechomo_method', json_processors['L2B']['spechomo_method'])
self.spechomo_estimate_accuracy = \
gp('spechomo_estimate_accuracy', json_processors['L2B']['spechomo_estimate_accuracy'])
self.spechomo_bandwise_accuracy = \
gp('spechomo_bandwise_accuracy', json_processors['L2B']['spechomo_bandwise_accuracy'])
self.exec_L2BP = gp('exec_L2BP')
self.spechomo_method = gp('spechomo_method')
self.spechomo_estimate_accuracy = gp('spechomo_estimate_accuracy')
self.spechomo_bandwise_accuracy = gp('spechomo_bandwise_accuracy')
# L2C
self.exec_L2CP = gp('exec_L2CP', [
json_processors['L2C']['run_processor'],
json_processors['L2C']['write_output'],
json_processors['L2C']['delete_output']])
self.exec_L2CP = gp('exec_L2CP')
################################
# target sensor specifications #
......@@ -483,7 +394,7 @@ class JobConfig(object):
return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}
def get_parameter(self, key_user_opts, val_json=None, attr_db_job_record='', fallback=None):
def get_parameter(self, key_user_opts, attr_db_job_record='', fallback=None):
# 1. JobConfig parameters: parameters that are directly passed to JobConfig
if key_user_opts in self.kwargs:
return self.kwargs[key_user_opts]
......@@ -493,6 +404,7 @@ class JobConfig(object):
return getattr(self.DB_job_record, attr_db_job_record)
# 3. JSON parameters: parameters that have been defined via JSON Input (command line or advanced UI params)
val_json = get_param_from_json_config(key_user_opts, self.json_opts_fused_valid)
if val_json or val_json is False:
return val_json
......
......@@ -233,6 +233,7 @@ parameter_mapping = dict(
exec_L1BP=('processors', 'L1B', ['run_processor', 'write_output', 'delete_output']),
skip_coreg=('processors', 'L1B', 'skip_coreg'),
spatial_ref_min_overlap=('processors', 'L1B', 'spatial_ref_min_overlap'),
spatial_ref_min_cloudcov=('processors', 'L1B', 'spatial_ref_min_cloudcov'),
spatial_ref_max_cloudcov=('processors', 'L1B', 'spatial_ref_max_cloudcov'),
spatial_ref_plusminus_days=('processors', 'L1B', 'spatial_ref_plusminus_days'),
spatial_ref_plusminus_years=('processors', 'L1B', 'spatial_ref_plusminus_years'),
......
__version__ = '0.14.5'
__version__ = '0.14.6'
__versionalias__ = '20180319.01'
......@@ -65,7 +65,7 @@ gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')
# Defining the configurations needed to start a job containing the different dataset scenes.
# TODO Change the job-configurations for selected datasets.
job_config_kwargs = dict(parallelization_level='scenes', db_host=db_host, spatial_index_server_host=index_host,
delete_old_output=True, is_test=True,
delete_old_output=True, is_test=True, reset_status=True,
inmem_serialization=False,
exec_L1AP=[True, True, True], exec_L1BP=[True, True, True], exec_L1CP=[True, True, True],
exec_L2AP=[True, True, True], exec_L2BP=[True, True, False], exec_L2CP=[True, True, False])
......@@ -421,7 +421,6 @@ class Test_ProcessContinuing_CompletePipeline(unittest.TestCase):
def setUp(self):
self.cfg_kw = job_config_kwargs.copy() # copy, because job_config_kwargs is modified otherwise
self.cfg_kw.update(dict(
reset_status=True,
exec_L1BP=[False, False, False],
exec_L1CP=[False, False, False],
exec_L2AP=[False, False, False],
......@@ -687,6 +686,6 @@ if __name__ == '__main__':
logger.info("END.") # OUTPUT, END.
# Delete the handlers added to the "log_Test"-logger to ensure that no message is outputted twice in a row, when
# Delete the handlers added to the "log_Test"-logger to ensure that no message is output twice in a row, when
# the logger is used again.
logger.handlers = []
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment