Commit c71e9c5e authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Renamed option conversion_type_optical to target_radunit_optical and...

Renamed option conversion_type_optical to target_radunit_optical and conversion_type_thermal to target_radunit_thermal. Keys in provided JSON file are now optional. Bugfixes.
Former-commit-id: 981de81d
Former-commit-id: 68ff63b7
parent da504a90
......@@ -271,7 +271,7 @@ class L1A_object(GMS_object):
def calc_TOARadRefTemp(self, subset=None):
"""Convert DN, Rad or TOA_Ref data to TOA Reflectance, to Radiance or to Surface Temperature
(depending on CFG.conversion_type_optical and conversion_type_thermal).
(depending on CFG.target_radunit_optical and target_radunit_thermal).
The function can be executed by a L1A_object representing a full scene or a tile. To process a file from disk
in tiles, provide an item of self.tile_pos as the 'subset' argument."""
......@@ -305,7 +305,7 @@ class L1A_object(GMS_object):
for optical_thermal in ['optical', 'thermal']:
if optical_thermal not in self.dict_LayerOptTherm.values():
continue
conv = getattr(CFG, 'conversion_type_%s' % optical_thermal)
conv = getattr(CFG, 'target_radunit_%s' % optical_thermal)
conv = conv if conv != 'BOA_Ref' else 'TOA_Ref'
assert conv in ['Rad', 'TOA_Ref', 'Temp'], 'Unsupported conversion type: %s' % conv
arr_desc = self.arr_desc.split('/')[0] if optical_thermal == 'optical' else self.arr_desc.split('/')[-1]
......@@ -390,8 +390,8 @@ class L1A_object(GMS_object):
self.update_spec_vals_according_to_dtype('int16')
tiles_desc = '_'.join([desc for op_th, desc in zip(['optical', 'thermal'],
[CFG.conversion_type_optical,
CFG.conversion_type_thermal])
[CFG.target_radunit_optical,
CFG.target_radunit_thermal])
if desc in self.dict_LayerOptTherm.values()])
self.arr = dataOut
......
......@@ -52,8 +52,8 @@ class GEOPROCESSING(object):
def __init__(self, geodata, logger, workspace=None, subset=None, v=None):
self.logger = logger
self.subset = subset
self.conversion_type_optical = ''
self.conversion_type_thermal = ''
self.target_radunit_optical = ''
self.target_radunit_thermal = ''
self.outpath_conv = None
# check gdal environment
if v is not None:
......@@ -587,7 +587,7 @@ class GEOPROCESSING(object):
extent = [self.cols, self.rows]
if self.subset is not None and self.subset[0] == 'custom' and \
self.conversion_type_optical == '' and self.conversion_type_thermal == '':
self.target_radunit_optical == '' and self.target_radunit_thermal == '':
# conversion to Rad or Ref overwrites self.inDs
# => custom bandsList contains bands that are NOT in range(self.bands)
bands2process = self.bandsList
......
......@@ -855,8 +855,8 @@ class GMS_object(Dataset):
self.logger.info("Writing tiles '%s' temporarily to disk..." % tiles[0]['desc'])
outpath = os.path.join(self.ExtractedFolder, '%s__%s.%s' % (self.baseN, tiles[0]['desc'], self.outInterleave))
if CFG.conversion_type_optical in tiles[0]['desc'] or \
CFG.conversion_type_thermal in tiles[0]['desc']:
if CFG.target_radunit_optical in tiles[0]['desc'] or \
CFG.target_radunit_thermal in tiles[0]['desc']:
self.meta_odict = self.MetaObj.to_odict() # important in order to keep geotransform/projection
self.arr_desc = tiles[0]['desc']
self.arr = outpath
......
......@@ -1414,12 +1414,12 @@ class METADATA(object):
'BOA_Ref': 'BOA_Reflectance in [0-%d]' % CFG.scale_factor_BOARef,
'Temp': 'Degrees Celsius with scale factor = 100'}
if list(set(dict_LayerOptTherm.values())) == ['optical']:
self.PhysUnit = dict_conv_physUnit[CFG.conversion_type_optical]
self.PhysUnit = dict_conv_physUnit[CFG.target_radunit_optical]
elif list(set(dict_LayerOptTherm.values())) == ['thermal']:
self.PhysUnit = dict_conv_physUnit[CFG.conversion_type_thermal]
self.PhysUnit = dict_conv_physUnit[CFG.target_radunit_thermal]
elif sorted(list(set(dict_LayerOptTherm.values()))) == ['optical', 'thermal']:
self.PhysUnit = ['Optical bands: %s' % dict_conv_physUnit[CFG.conversion_type_optical],
'Thermal bands: %s' % dict_conv_physUnit[CFG.conversion_type_thermal]]
self.PhysUnit = ['Optical bands: %s' % dict_conv_physUnit[CFG.target_radunit_optical],
'Thermal bands: %s' % dict_conv_physUnit[CFG.target_radunit_thermal]]
else:
logger = self.logger if hasattr(self, 'logger') else temp_logger
assert logger, "ERROR: Physical unit could not be determined due to unexpected 'dict_LayerOptTherm'. " \
......@@ -1478,7 +1478,7 @@ class METADATA(object):
# copy directly compatible keys
Meta = collections.OrderedDict()
# Meta['description'] = descr_dic[self.Satellite + '_' + CFG.conversion_type_optical]
# Meta['description'] = descr_dic[self.Satellite + '_' + CFG.target_radunit_optical]
for odictKey in enviHdr_keyOrder:
if odictKey in map_odictKeys_objAttrnames:
......@@ -1807,7 +1807,7 @@ def get_LayerBandsAssignment(GMS_identifier, nBands=None, ignore_usecase=False,
# remove those bands that are excluded by atmospheric corrections if proc_level >= L1C
if GMS_identifier['proc_level'] not in [None, 'L1A', 'L1B']: # TODO replace with enum procL
if CFG.conversion_type_optical == 'BOA_Ref':
if CFG.target_radunit_optical == 'BOA_Ref':
path_ac_options = get_path_ac_options(GMS_identifier)
if path_ac_options and os.path.exists(path_ac_options):
# FIXME this does not work for L7
......
......@@ -20,7 +20,7 @@ import pkgutil
from pprint import pformat
from typing import TYPE_CHECKING
from .options_schema import gms_schema
from .options_schema import gms_schema_input, gms_schema_config_output
if TYPE_CHECKING:
from gms_preprocessing.misc.database_tools import GMS_JOB # noqa F401 # flake8 issue
......@@ -40,14 +40,15 @@ class GMS_configuration(object):
raise EnvironmentError("Config has not been set already on this machine. Run 'set_config()' first!'")
GMS_config = GMS_configuration()
GMS_config = GMS_configuration() # type: JobConfig
path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')
def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, json_config='', exec_L1AP=None,
def set_config(job_ID, json_config='', exec_mode='Python', parallelization_level='scenes', db_host='localhost',
reset_status=False, delete_old_output=False, exec_L1AP=None,
exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
......@@ -55,11 +56,15 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, jso
"""Set up a configuration for a new gms_preprocessing job!
:param job_ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param json_config: path to JSON file containing configuration parameters or a string in JSON format
:param exec_mode: 'Python': writes intermediate results to disk in order to save memory
'Flink': keeps intermediate results in memory in order to save IO time
:param parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
'tiles' - parallelization on tile-level
:param db_host: host name of the server that runs the postgreSQL database
:param reset: whether to reset the job status or not (default=False)
:param json_config path to JSON file containing configuration parameters or a string in JSON format
:param reset_status: whether to reset the job status or not (default=False)
:param delete_old_output: <bool> whether to delete previously created output of the given job ID
before running the job (default = False)
:param exec_L1AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
......@@ -86,7 +91,7 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, jso
output path to store processed MGRS tiles
:param path_archive: input path where downloaded data are stored
"""
if not hasattr(builtins, 'GMS_JobConfig') or reset:
if not hasattr(builtins, 'GMS_JobConfig') or reset_status:
kwargs = dict([x for x in locals().items() if x[0] != "self" and not x[0].startswith('__')])
builtins.GMS_JobConfig = JobConfig(job_ID, **kwargs)
......@@ -94,7 +99,7 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, jso
class JobConfig(object):
def __init__(self, ID, db_host='localhost', **user_opts):
def __init__(self, ID, **user_opts):
"""Create a job configuration
Workflow:
......@@ -104,8 +109,8 @@ class JobConfig(object):
# => zuerst JobConfig auf Basis von JSON erstellen
# 2. dann überschreiben mit user-defined parametern (entweder init-parameter oder db-settings per webapp)
:param ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param db_host: host name of the server that runs the postgreSQL database
:param ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param user_opts: keyword arguments to be passed to gms_preprocessing.set_config()
"""
# privates
self._DB_job_record = None # type: GMS_JOB
......@@ -127,10 +132,10 @@ class JobConfig(object):
# args
self.ID = ID
self.db_host = db_host
self.kwargs = user_opts
# database connection
self.db_host = user_opts['db_host']
self.conn_database = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" \
% self.db_host
......@@ -147,8 +152,12 @@ class JobConfig(object):
self.exec_mode = \
gp('exec_mode', json_globts['exec_mode'])
self.parallelization_level = \
gp('parallelization_level', json_globts['parallelization_level'])
self.CPUs = \
gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
self.delete_old_output = \
gp('delete_old_output', json_globts['delete_old_output'])
self.allow_subMultiprocessing = \
gp('allow_subMultiprocessing', json_globts['allow_subMultiprocessing'])
self.disable_exception_handler = \
......@@ -234,10 +243,10 @@ class JobConfig(object):
gp('skip_pan', json_processors['general_opts']['skip_pan'])
self.sort_bands_by_cwl = \
gp('sort_bands_by_cwl', json_processors['general_opts']['sort_bands_by_cwl'])
self.conversion_type_optical = \
gp('conversion_type_optical', json_processors['general_opts']['conversion_type_optical'])
self.conversion_type_thermal = \
gp('conversion_type_thermal', json_processors['general_opts']['conversion_type_thermal'])
self.target_radunit_optical = \
gp('target_radunit_optical', json_processors['general_opts']['target_radunit_optical'])
self.target_radunit_thermal = \
gp('target_radunit_thermal', json_processors['general_opts']['target_radunit_thermal'])
self.scale_factor_TOARef = \
gp('scale_factor_TOARef', json_processors['general_opts']['scale_factor_TOARef'])
self.scale_factor_BOARef = \
......@@ -335,6 +344,7 @@ class JobConfig(object):
############
self.validate_exec_configs()
GMSValidator(allow_unknown=True, schema=gms_schema_config_output).validate(self.to_dict())
@property
def kwargs_defaults(self):
......@@ -368,7 +378,7 @@ class JobConfig(object):
return getattr(self.DB_job_record, attr_db_job_record)
# 3. JSON parameters: parameters that have been defined via JSON Input (command line or advanced UI params)
if val_json and val_json is not None:
if val_json or val_json is False:
return val_json
# fallback: if nothing has been returned until here
......@@ -471,7 +481,7 @@ class JobConfig(object):
default_options.update(params_dict)
if validate:
GMSValidator(allow_unknown=True, schema=gms_schema).validate(default_options)
GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(default_options)
json_options = default_options
return json_options
......@@ -628,7 +638,7 @@ class JobConfig(object):
def is_GMSConfig_available():
try:
if GMS_config.job is not None:
if GMS_config is not None:
return True
except (EnvironmentError, OSError):
return False
......@@ -717,7 +727,7 @@ def get_options(target, validation=True):
options = json_to_python(json.loads(jsmin(fl.read())))
if validation is True:
GMSValidator(allow_unknown=True, schema=gms_schema).validate(options)
GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(options)
return options
else:
......
{
"global_opts": {
"exec_mode": "Python", /*"Python" or "Flink"*/
"parallelization_level": "scenes", /*"scenes" or "tiles"*/
"db_host": "localhost",
"CPUs": "None", /*number of CPU cores to be used for processing (default: "None" -> use all available)*/
"delete_old_output": false, /*whether to delete previously created output of the given job ID*/
"allow_subMultiprocessing": true, /*allow multiprocessing within multiprocessing workers*/
"disable_exception_handler": false, /*enable/disable automatic handling of unexpected exceptions*/
"log_level": "INFO", /*the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';*/
......@@ -40,8 +42,8 @@
"skip_thermal": true,
"skip_pan": true,
"sort_bands_by_cwl": true,
"conversion_type_optical": "BOA_Ref", /*'Rad' / 'TOA_Ref' / 'BOA_Ref'*/
"conversion_type_thermal": "Rad", /*'Rad' / 'Temp'*/
"target_radunit_optical": "BOA_Ref", /*'Rad' / 'TOA_Ref' / 'BOA_Ref'*/
"target_radunit_thermal": "Rad", /*'Rad' / 'Temp'*/
"scale_factor_TOARef": 10000,
"scale_factor_BOARef": 10000
},
......
"""Definition of gms options schema (as used by cerberus library)."""
gms_schema = dict(
gms_schema_input = dict(
global_opts=dict(
type='dict', required=True,
type='dict', required=False,
schema=dict(
exec_mode=dict(type='string', required=True, allowed=['Python', 'Flink']),
db_host=dict(type='string', required=True),
CPUs=dict(type='integer', required=True, nullable=True),
allow_subMultiprocessing=dict(type='boolean', required=True),
disable_exception_handler=dict(type='boolean', required=True),
log_level=dict(type='string', required=True, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
tiling_block_size_XY=dict(type='list', required=True, schema=dict(type="integer"), minlength=2,
exec_mode=dict(type='string', required=False, allowed=['Python', 'Flink']),
parallelization_level=dict(type='string', required=False, allowed=['scenes', 'tiles']),
db_host=dict(type='string', required=False),
CPUs=dict(type='integer', required=False, nullable=True),
delete_old_output=dict(type='boolean', required=False),
allow_subMultiprocessing=dict(type='boolean', required=False),
disable_exception_handler=dict(type='boolean', required=False),
log_level=dict(type='string', required=False, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
tiling_block_size_XY=dict(type='list', required=False, schema=dict(type="integer"), minlength=2,
maxlength=2),
is_test=dict(type='boolean', required=True),
profiling=dict(type='boolean', required=True),
benchmark_global=dict(type='boolean', required=True),
is_test=dict(type='boolean', required=False),
profiling=dict(type='boolean', required=False),
benchmark_global=dict(type='boolean', required=False),
)),
paths=dict(
type='dict', required=True,
type='dict', required=False,
schema=dict(
path_fileserver=dict(type='string', required=True),
path_archive=dict(type='string', required=True),
path_procdata_scenes=dict(type='string', required=True),
path_procdata_MGRS=dict(type='string', required=True),
path_tempdir=dict(type='string', required=True),
path_benchmarks=dict(type='string', required=True),
path_job_logs=dict(type='string', required=True),
path_spatIdxSrv=dict(type='string', required=True),
path_ac_tables=dict(type='string', required=True),
path_SNR_models=dict(type='string', required=True),
path_SRFs=dict(type='string', required=True),
path_dem_proc_srtm_90m=dict(type='string', required=True),
path_earthSunDist=dict(type='string', required=True),
path_solar_irr=dict(type='string', required=True),
path_cloud_classif=dict(type='string', required=True),
path_ECMWF_db=dict(type='string', required=True),
path_fileserver=dict(type='string', required=False),
path_archive=dict(type='string', required=False),
path_procdata_scenes=dict(type='string', required=False),
path_procdata_MGRS=dict(type='string', required=False),
path_tempdir=dict(type='string', required=False),
path_benchmarks=dict(type='string', required=False),
path_job_logs=dict(type='string', required=False),
path_spatIdxSrv=dict(type='string', required=False),
path_ac_tables=dict(type='string', required=False),
path_SNR_models=dict(type='string', required=False),
path_SRFs=dict(type='string', required=False),
path_dem_proc_srtm_90m=dict(type='string', required=False),
path_earthSunDist=dict(type='string', required=False),
path_solar_irr=dict(type='string', required=False),
path_cloud_classif=dict(type='string', required=False),
path_ECMWF_db=dict(type='string', required=False),
)),
processors=dict(
type='dict', required=True,
type='dict', required=False,
schema=dict(
general_opts=dict(type='dict', required=True, schema=dict(
skip_thermal=dict(type='boolean', required=True),
skip_pan=dict(type='boolean', required=True),
sort_bands_by_cwl=dict(type='boolean', required=True),
conversion_type_optical=dict(type='string', required=True, allowed=['Rad', 'TOA_Ref', 'BOA_Ref']),
conversion_type_thermal=dict(type='string', required=True, allowed=['Rad', 'Temp']),
scale_factor_TOARef=dict(type='integer', required=True),
scale_factor_BOARef=dict(type='integer', required=True),
general_opts=dict(type='dict', required=False, schema=dict(
skip_thermal=dict(type='boolean', required=False),
skip_pan=dict(type='boolean', required=False),
sort_bands_by_cwl=dict(type='boolean', required=False),
target_radunit_optical=dict(type='string', required=False, allowed=['Rad', 'TOA_Ref', 'BOA_Ref']),
target_radunit_thermal=dict(type='string', required=False, allowed=['Rad', 'Temp']),
scale_factor_TOARef=dict(type='integer', required=False),
scale_factor_BOARef=dict(type='integer', required=False),
)),
L1A=dict(type='dict', required=True, schema=dict(
run_processor=dict(type='boolean', required=True),
write_output=dict(type='boolean', required=True),
delete_output=dict(type='boolean', required=True),
L1A=dict(type='dict', required=False, schema=dict(
run_processor=dict(type='boolean', required=False),
write_output=dict(type='boolean', required=False),
delete_output=dict(type='boolean', required=False),
SZA_SAA_calculation_accurracy=dict(type='string', required=False, allowed=['coarse', 'fine']),
export_VZA_SZA_SAA_RAA_stats=dict(type='boolean', required=True),
export_VZA_SZA_SAA_RAA_stats=dict(type='boolean', required=False),
)),
L1B=dict(type='dict', required=True, schema=dict(
run_processor=dict(type='boolean', required=True),
write_output=dict(type='boolean', required=True),
delete_output=dict(type='boolean', required=True),
skip_coreg=dict(type='boolean', required=True),
L1B=dict(type='dict', required=False, schema=dict(
run_processor=dict(type='boolean', required=False),
write_output=dict(type='boolean', required=False),
delete_output=dict(type='boolean', required=False),
skip_coreg=dict(type='boolean', required=False),
)),
L1C=dict(type='dict', required=True, schema=dict(
run_processor=dict(type='boolean', required=True),
write_output=dict(type='boolean', required=True),
delete_output=dict(type='boolean', required=True),
L1C=dict(type='dict', required=False, schema=dict(
run_processor=dict(type='boolean', required=False),
write_output=dict(type='boolean', required=False),
delete_output=dict(type='boolean', required=False),
cloud_masking_algorithm=dict(type='dict', required=False, schema={
'Landsat-4': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Landsat-5': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Landsat-7': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Landsat-8': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Sentinel-2A': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Sentinel-2B': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Landsat-4': dict(type='string', required=False, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Landsat-5': dict(type='string', required=False, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Landsat-7': dict(type='string', required=False, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Landsat-8': dict(type='string', required=False, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
'Sentinel-2A': dict(type='string', required=False, allowed=['FMASK', 'Classical Bayesian',
'SICOR']),
'Sentinel-2B': dict(type='string', required=False, allowed=['FMASK', 'Classical Bayesian',
'SICOR']),
}),
export_L1C_obj_dumps=dict(type='boolean', required=True),
scale_factor_errors_ac=dict(type='integer', required=True),
auto_download_ecmwf=dict(type='boolean', required=True),
export_L1C_obj_dumps=dict(type='boolean', required=False),
scale_factor_errors_ac=dict(type='integer', required=False),
auto_download_ecmwf=dict(type='boolean', required=False),
)),
L2A=dict(type='dict', required=True, schema=dict(
run_processor=dict(type='boolean', required=True),
write_output=dict(type='boolean', required=True),
delete_output=dict(type='boolean', required=True),
align_coord_grids=dict(type='boolean', required=True),
match_gsd=dict(type='boolean', required=True),
L2A=dict(type='dict', required=False, schema=dict(
run_processor=dict(type='boolean', required=False),
write_output=dict(type='boolean', required=False),
delete_output=dict(type='boolean', required=False),
align_coord_grids=dict(type='boolean', required=False),
match_gsd=dict(type='boolean', required=False),
)),
L2B=dict(type='dict', required=True, schema=dict(
run_processor=dict(type='boolean', required=True),
write_output=dict(type='boolean', required=True),
delete_output=dict(type='boolean', required=True),
L2B=dict(type='dict', required=False, schema=dict(
run_processor=dict(type='boolean', required=False),
write_output=dict(type='boolean', required=False),
delete_output=dict(type='boolean', required=False),
)),
L2C=dict(type='dict', required=True, schema=dict(
run_processor=dict(type='boolean', required=True),
write_output=dict(type='boolean', required=True),
delete_output=dict(type='boolean', required=True),
L2C=dict(type='dict', required=False, schema=dict(
run_processor=dict(type='boolean', required=False),
write_output=dict(type='boolean', required=False),
delete_output=dict(type='boolean', required=False),
)),
)),
usecase=dict(
type='dict', required=True, schema=dict(
virtual_sensor_id=dict(type='integer', required=True), # TODO add possible values
datasetid_spatial_ref=dict(type='integer', required=True, nullable=True),
datasetid_spectral_ref=dict(type='integer', required=True, nullable=True),
type='dict', required=False, schema=dict(
virtual_sensor_id=dict(type='integer', required=False), # TODO add possible values
datasetid_spatial_ref=dict(type='integer', required=False, nullable=True),
datasetid_spectral_ref=dict(type='integer', required=False, nullable=True),
target_CWL=dict(type='list', required=False, schema=dict(type='float')),
target_FWHM=dict(type='list', required=False, schema=dict(type='float')),
target_gsd=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
target_epsg_code=dict(type='integer', required=True, nullable=True),
target_epsg_code=dict(type='integer', required=False, nullable=True),
spatial_ref_gridx=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
spatial_ref_gridy=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
)),
)
def get_updated_schema(source_schema, key2update, new_value):
    """Return a deep copy of a nested schema dict with all occurrences of a key replaced.

    Used to derive the output-config schema from the input schema, e.g. by setting
    every 'required' key to True.

    :param source_schema:   nested dict (cerberus schema) to be copied and updated
    :param key2update:      dict key whose values shall be replaced at any nesting level,
                            e.g. 'required'
    :param new_value:       value to be assigned to each occurrence of key2update, e.g. True
    :return:                updated deep copy of source_schema (the input dict is not modified)
    """
    from copy import deepcopy

    def deep_update(schema, key2upd, new_val):
        """Recursively set all values of *key2upd* within *schema* (in place) and return it."""
        for key in schema:
            if key == key2upd:
                schema[key] = new_val
            elif isinstance(schema[key], dict):
                deep_update(schema[key], key2upd, new_val)
        return schema

    # deep-copy first so that the caller's schema stays untouched
    tgt_schema = deepcopy(source_schema)
    return deep_update(tgt_schema, key2update, new_value)
gms_schema_config_output = get_updated_schema(gms_schema_input, key2update='required', new_value=True)
......@@ -110,7 +110,7 @@ def L1C_map(L1B_objs):
L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]
# check in config if atmospheric correction is desired
if CFG.conversion_type_optical == 'BOA_Ref':
if CFG.target_radunit_optical == 'BOA_Ref':
# atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
if L1C_objs[0].ac_options:
# perform atmospheric correction
......@@ -120,7 +120,7 @@ def L1C_map(L1B_objs):
% (L1C_obj.satellite, L1C_obj.sensor)) for L1C_obj in L1C_objs]
else:
[L1C_obj.logger.warning('Atmospheric correction skipped because optical conversion type is set to %s.'
% CFG.conversion_type_optical) for L1C_obj in L1C_objs]
% CFG.target_radunit_optical) for L1C_obj in L1C_objs]
# write outputs and delete temporary data
for i, L1C_obj in enumerate(L1C_objs):
......
......@@ -38,31 +38,24 @@ __author__ = 'Daniel Scheffler'
class process_controller(object):
def __init__(self, job_ID, exec_mode='Python', db_host='localhost',
parallelization_level='scenes', delete_old_output=False, job_config_kwargs=None):
# type: (int, str, str, str, bool, dict) -> None
def __init__(self, job_ID, **config_kwargs):
"""gms_preprocessing process controller
:param job_ID: <int> a job ID belonging to a valid database record within table 'jobs'
:param exec_mode: <str> choices: 'Python' - writes all intermediate data to disk
'Flink' - keeps all intermediate data in memory
:param db_host: <str> hostname of the host where database is hosted
:param parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
'tiles' - parallelisation on tile-level
:param delete_old_output: <bool> whether to delete previously created output of the given job ID
before running the job (default = False)
:param job_ID: job ID belonging to a valid database record within table 'jobs'
:param config_kwargs: keyword arguments to be passed to gms_preprocessing.set_config()
"""
# assertions
if not isinstance(job_ID, int):
raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))
if exec_mode not in ['Python', 'Flink']:
raise ValueError("Unexpected exec_mode '%s'!" % exec_mode)
if parallelization_level not in ['scenes', 'tiles']:
raise ValueError("Unexpected parallelization_level '%s'!" % parallelization_level)
self.parallLev = parallelization_level
# set GMS configuration
config_kwargs.update(dict(reset_status=True))
set_config(job_ID, **config_kwargs)
self.config = GMS_config # type: GMS_config
# defaults
self._logger = None
self._DB_job_record = None
self.profiler = None
......@@ -78,11 +71,6 @@ class process_controller(object):
self.summary_detailed = None
self.summary_quick = None
# set GMS configuration
set_config(job_ID=job_ID, exec_mode=exec_mode, db_host=db_host, reset=True,
**job_config_kwargs if job_config_kwargs else {})
self.config = GMS_config
# check environment
self.GMSEnv = ENV.GMSEnvironment(self.logger)
self.GMSEnv.check_dependencies()
......@@ -98,7 +86,7 @@ class process_controller(object):
self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
% (self.config.ID, self.DB_job_record.comment))
if delete_old_output:
if self.config.delete_old_output:
self.logger.info('Deleting previously processed data...')