Commit eb8781b1 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fixed incorrect parsing of configuration parameters given by WebUI or CLI parser.

Added additional tests to test_cli.py and test_config.py.
parent 11935e0e
......@@ -14,14 +14,12 @@ from gms_preprocessing.misc.database_tools import GMS_JOB # noqa: E402
from gms_preprocessing.options.config import get_conn_database # noqa: E402
from gms_preprocessing.options.config import path_options_default # noqa: E402
from gms_preprocessing.options.config import get_options # noqa: E402
from gms_preprocessing.options.config import get_config_kwargs_default # noqa: E402
options_default = get_options(path_options_default, validation=True) # type: dict
config_kwargs_default = get_config_kwargs_default() # type: dict
def get_config_kwargs_from_cli_args(cli_args):
    """Extract from the parsed CLI arguments only those entries that are valid configuration keyword arguments.

    :param cli_args:  argparse namespace as returned by the CLI parser
    :return:          dict containing only the keys that set_config()/JobConfig accept
    """
    accepted = config_kwargs_default.keys()
    kwargs = {}
    for name, value in vars(cli_args).items():
        if name in accepted:
            kwargs[name] = value
    return kwargs
def parsedArgs_to_user_opts(cli_args):
    """Convert a parsed argparse namespace into a dict of user-provided options.

    Entries whose name starts with an underscore (private attributes) and the
    argparse dispatch entry 'func' are dropped; all other attributes are
    passed through unchanged.

    :param cli_args:  argparse namespace as returned by the CLI parser
    :return:          dict of user options
    """
    user_opts = {}
    for name, value in cli_args.__dict__.items():
        if name.startswith('_') or name == 'func':
            continue
        user_opts[name] = value
    return user_opts
def run_from_jobid(args):
......@@ -31,7 +29,7 @@ def run_from_jobid(args):
# TODO download: run only the downloader
# set up process controller instance
kwargs = get_config_kwargs_from_cli_args(args)
kwargs = parsedArgs_to_user_opts(args)
if 'GMS_IS_TEST' in os.environ and os.environ['GMS_IS_TEST'] == 'True':
kwargs['is_test'] = True
......@@ -57,7 +55,7 @@ def run_from_sceneids(args):
virtual_sensor_id=args.virtual_sensor_id,
datasetid_spatial_ref=args.datasetid_spatial_ref,
comment=args.comment)
_run_job(dbJob, **get_config_kwargs_from_cli_args(args))
_run_job(dbJob, **parsedArgs_to_user_opts(args))
def run_from_entityids(args):
......@@ -71,7 +69,7 @@ def run_from_entityids(args):
virtual_sensor_id=args.virtual_sensor_id,
datasetid_spatial_ref=args.datasetid_spatial_ref,
comment=args.comment)
_run_job(dbJob, **get_config_kwargs_from_cli_args(args))
_run_job(dbJob, **parsedArgs_to_user_opts(args))
def run_from_filenames(args):
......@@ -85,7 +83,7 @@ def run_from_filenames(args):
virtual_sensor_id=args.virtual_sensor_id,
datasetid_spatial_ref=args.datasetid_spatial_ref,
comment=args.comment)
_run_job(dbJob, **get_config_kwargs_from_cli_args(args))
_run_job(dbJob, **parsedArgs_to_user_opts(args))
def run_from_constraints(args):
......
......@@ -9,7 +9,7 @@ import builtins
import re
import psycopg2
import psycopg2.extras
from collections import OrderedDict
from collections import OrderedDict, Mapping
import multiprocessing
from inspect import getargvalues, stack, getfullargspec, signature, _empty
import json
......@@ -54,55 +54,51 @@ path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')
def set_config(job_ID, json_config='', inmem_serialization=False, parallelization_level='scenes', db_host='localhost',
spatial_index_server_host='localhost', spatial_index_server_port=8654,
reset_status=False, delete_old_output=False, exec_L1AP=None,
exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
path_procdata_scenes=None, path_procdata_MGRS=None, path_archive=None, virtual_sensor_id=10,
datasetid_spatial_ref=249):
def set_config(job_ID, json_config='', reset_status=False, **kwargs):
"""Set up a configuration for a new gms_preprocessing job!
:param job_ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param json_config path to JSON file containing configuration parameters or a string in JSON format
:param inmem_serialization: False: write intermediate results to disk in order to save memory
True: keep intermediate results in memory in order to save IO time
:param parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
'tiles' - parallelization on tile-level
:param db_host: host name of the server that runs the postgreSQL database
:param spatial_index_server_host: host name of the server that runs the SpatialIndexMediator
:param spatial_index_server_port: port used for connecting to SpatialIndexMediator
:param json_config: path to JSON file containing configuration parameters or a string in JSON format
:param reset_status: whether to reset the job status or not (default=False)
:param delete_old_output: <bool> whether to delete previously created output of the given job ID
before running the job (default = False)
:param exec_L1AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L2AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L2BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L2CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param CPUs: number of CPU cores to be used for processing (default: None -> use all available)
:param allow_subMultiprocessing:
allow multiprocessing within workers
:param disable_exception_handler:
enable/disable automatic handling of unexpected exceptions (default: True -> enabled)
:param log_level: the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';
default: 'INFO')
:param tiling_block_size_XY:
X/Y block size to be used for any tiling process (default: (2048,2048)
:param is_test: whether the current job represents a software test job (run by a test runner) or not
(default=False)
:param profiling: enable/disable code profiling (default: False)
:param benchmark_global:
enable/disable benchmark of the whole processing pipeline
:param path_procdata_scenes:
output path to store processed scenes
:param path_procdata_MGRS:
output path to store processed MGRS tiles
:param path_archive: input path where downloaded data are stored
:param virtual_sensor_id: 1: Landsat-8, 10: Sentinel-2A 10m
:param datasetid_spatial_ref: 249 Sentinel-2A
:param kwargs: keyword arguments to be passed to JobConfig
:Keyword Arguments:
- inmem_serialization: False: write intermediate results to disk in order to save memory
True: keep intermediate results in memory in order to save IO time
- parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
'tiles' - parallelization on tile-level
- db_host: host name of the server that runs the postgreSQL database
- spatial_index_server_host: host name of the server that runs the SpatialIndexMediator
- spatial_index_server_port: port used for connecting to SpatialIndexMediator
- delete_old_output: <bool> whether to delete previously created output of the given job ID
before running the job (default = False)
- exec_L1AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
- exec_L1BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
- exec_L1CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
- exec_L2AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
- exec_L2BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
- exec_L2CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
- CPUs: number of CPU cores to be used for processing (default: None -> use all available)
- allow_subMultiprocessing:
allow multiprocessing within workers
- disable_exception_handler:
enable/disable automatic handling of unexpected exceptions (default: True -> enabled)
- log_level: the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';
default: 'INFO')
- tiling_block_size_XY:
X/Y block size to be used for any tiling process (default: (2048,2048)
- is_test: whether the current job represents a software test job (run by a test runner) or not
(default=False)
- profiling: enable/disable code profiling (default: False)
- benchmark_global:
enable/disable benchmark of the whole processing pipeline
- path_procdata_scenes:
output path to store processed scenes
- path_procdata_MGRS:
output path to store processed MGRS tiles
- path_archive: input path where downloaded data are stored
- virtual_sensor_id: 1: Landsat-8, 10: Sentinel-2A 10m
- datasetid_spatial_ref: 249 Sentinel-2A
:rtype: JobConfig
"""
#################################
......@@ -110,8 +106,7 @@ def set_config(job_ID, json_config='', inmem_serialization=False, parallelizatio
#################################
# FIXME virtual_sensor_id and datasetid_spatial_ref are not respected by JobConfig.
if not hasattr(builtins, 'GMS_JobConfig') or reset_status:
kwargs = dict([x for x in locals().items() if x[0] != "self" and not x[0].startswith('__')])
builtins.GMS_JobConfig = JobConfig(job_ID, **kwargs)
builtins.GMS_JobConfig = JobConfig(job_ID, json_config=json_config, **kwargs)
#####################
# check environment #
......@@ -154,7 +149,7 @@ def get_config_kwargs_default():
class JobConfig(object):
def __init__(self, ID, **user_opts):
def __init__(self, ID, json_config='', **user_opts):
"""Create a job configuration
Workflow:
......@@ -165,7 +160,8 @@ class JobConfig(object):
# 2. dann überschreiben mit user-defined parametern (entweder init-parameter oder db-settings per webapp)
:param ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param user_opts keyword arguments to be passed to gms_preprocessing.set_config()
:param json_config path to JSON file containing configuration parameters or a string in JSON format
:param user_opts keyword arguments as passed by gms_preprocessing.set_config()
"""
# privates
self._DB_job_record = None # type: GMS_JOB
......@@ -189,6 +185,7 @@ class JobConfig(object):
# args
self.ID = ID
self.json_config = json_config
self.kwargs = user_opts
# database connection
......@@ -469,13 +466,6 @@ class JobConfig(object):
if self.max_parallel_reads_writes != 0:
self.check_no_read_write_limit_on_xtfs_mountpoint()
@property
def kwargs_defaults(self):
    """Default keyword arguments of set_config(), computed lazily and cached on the instance.

    NOTE: any falsy cached value (e.g., an empty dict) is treated as
    "not yet computed" and triggers recomputation.
    """
    cached = self._kwargs_defaults
    if not cached:
        cached = get_config_kwargs_default()
        self._kwargs_defaults = cached
    return cached
def get_init_argskwargs(self, ignore=("logger",)):
"""
Return a tuple containing dictionary of calling function's. named arguments and a list of
......@@ -492,7 +482,7 @@ class JobConfig(object):
def get_parameter(self, key_user_opts, val_json=None, attr_db_job_record='', fallback=None):
# 1. JobConfig parameters: parameters that are directly passed to JobConfig
if key_user_opts in self.kwargs and self.kwargs[key_user_opts] != self.kwargs_defaults[key_user_opts]:
if key_user_opts in self.kwargs:
return self.kwargs[key_user_opts]
# 2. WebUI parameters: parameters that have been defined via WebUI
......@@ -504,8 +494,6 @@ class JobConfig(object):
return val_json
# fallback: if nothing has been returned until here
if fallback is None and key_user_opts in self.kwargs_defaults:
fallback = self.kwargs_defaults[key_user_opts]
return fallback
@property
......@@ -562,24 +550,52 @@ class JobConfig(object):
NOTE: Reads the default options from options_default.json and updates the values with those from database.
"""
def update_dict(d, u):
    """Recursively update dictionary 'd' with the items of mapping 'u' (in-place deep merge).

    Nested mappings are merged key by key instead of replacing the whole
    sub-dictionary; all other values in 'u' overwrite those in 'd'.

    :param d:  dictionary to be updated (modified in-place)
    :param u:  mapping providing the new values
    :return:   the updated dictionary 'd'
    """
    for k, v in u.items():
        if isinstance(v, Mapping):
            # Merge into the existing sub-dict. If the current value is
            # missing or not a mapping (e.g., a scalar default overridden by
            # a nested JSON object), start from an empty dict instead of
            # recursing into the scalar and crashing.
            base = d.get(k)
            d[k] = update_dict(base if isinstance(base, Mapping) else {}, v)
        else:
            d[k] = v
    return d
# read options_default.json
default_options = get_options(path_options_default, validation=validate)
if 'json_config' in self.kwargs and self.kwargs['json_config']:
if self.kwargs['json_config'].startswith("{"):
#############################################
# update default options with those from DB #
#############################################
if self.DB_job_record.analysis_parameter:
try:
params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
except JSONDecodeError:
warnings.warn('The advanced options given in the WebUI could not be decoded. '
'JSON decoder failed with the following error:')
raise
# convert values to useful data types and update the default values
params_dict = json_to_python(params_dict)
update_dict(default_options, params_dict)
###############################################################################################################
# if json config is provided (via python bindings or CLI parser -> override all options with that json config #
###############################################################################################################
if self.json_config:
if self.json_config.startswith("{"):
try:
params_dict = json.loads(jsmin(self.kwargs['json_config']))
params_dict = json.loads(jsmin(self.json_config))
except JSONDecodeError:
warnings.warn('The given JSON options string could not be decoded. '
'JSON decoder failed with the following error:')
raise
elif os.path.isfile(self.kwargs['json_config']):
elif os.path.isfile(self.json_config):
try:
with open(self.kwargs['json_config'], 'r') as inF:
with open(self.json_config, 'r') as inF:
params_dict = json.loads(jsmin(inF.read()))
except JSONDecodeError:
warnings.warn('The given JSON options file %s could not be decoded. '
'JSON decoder failed with the following error:' % self.kwargs['json_config'])
'JSON decoder failed with the following error:' % self.json_config)
raise
else:
......@@ -587,20 +603,7 @@ class JobConfig(object):
# convert values to useful data types and update the default values
params_dict = json_to_python(params_dict)
default_options.update(params_dict)
# update default options with those from DB
if self.DB_job_record.analysis_parameter:
try:
params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
except JSONDecodeError:
warnings.warn('The advanced options given in the WebUI could not be decoded. '
'JSON decoder failed with the following error:')
raise
# convert values to useful data types and update the default values
params_dict = json_to_python(params_dict)
default_options.update(params_dict)
update_dict(default_options, params_dict)
if validate:
GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(default_options)
......@@ -699,10 +702,7 @@ class JobConfig(object):
def validate_exec_configs(self):
for i in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
try:
exec_lvl = self.kwargs['exec_%s' % i]
except KeyError:
continue
exec_lvl = getattr(self, 'exec_%s' % i)
if exec_lvl is None:
continue
......@@ -717,7 +717,7 @@ class JobConfig(object):
"because any operations on GMS_obj.arr read the intermediate results from disk. "
"Turning it on.." % i)
write = True
self.kwargs['exec_%s' % i] = [execute, write, delete]
setattr(self, 'exec_%s' % i, [execute, write, delete])
else:
raise ValueError('Execution mode must be provided as list of 3 elements containing only boolean '
......
......@@ -55,6 +55,27 @@ class Base_CLITester:
parsed_args.func(parsed_args)
self.assertEqual(self.current_CFG.db_host, db_host)
def test_param_acceptance(self):
    """Check that --virtual_sensor_id given via CLI is respected by the resulting job configuration."""
    for sensor_id in (10, 11):
        cli_argv = self.baseargs + ['--db_host', db_host, '--virtual_sensor_id', str(sensor_id)]
        parsed = self.parser_run.parse_args(cli_argv)
        parsed.func(parsed)
        self.assertEqual(self.current_CFG.virtual_sensor_id, sensor_id)
def test_json_opts(self):
    """Check that options passed as an inline JSON string via --json_config override the config defaults."""
    cases = (('{"global_opts": {"inmem_serialization": true}}', True),
             ('{"global_opts": {"inmem_serialization": false}}', False))
    for json_opts, expected in cases:
        parsed = self.parser_run.parse_args(
            self.baseargs + ['--db_host', db_host, '--json_config', json_opts])
        parsed.func(parsed)
        self.assertEqual(self.current_CFG.inmem_serialization, expected)
class Test_run_jobid(Base_CLITester.Base_CLITestCase):
def setUp(self):
......
......@@ -79,6 +79,15 @@ class Test_JobConfig(TestCase):
cfg = JobConfig(self.jobID, db_host=self.db_host, json_config=cfg)
self.assertIsInstance(cfg, JobConfig)
def test_jsonconfig_param_acceptance(self):
    """Check that parameters given via the json_config string are respected by JobConfig."""
    cases = (('{"global_opts": {"inmem_serialization": true}}', True),
             ('{"global_opts": {"inmem_serialization": false}}', False))
    for json_opts, expected in cases:
        cfg = JobConfig(self.jobID, db_host=self.db_host, json_config=json_opts)
        self.assertIsInstance(cfg, JobConfig)
        self.assertTrue(cfg.inmem_serialization is expected)
def test_to_jsonable_dict(self):
cfg = JobConfig(self.jobID, db_host=self.db_host)
jsonable_dict = cfg.to_jsonable_dict()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment