Commit f2915453 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Added options schema and activated options validation. Improved Test_JobConfig(). Bugfixes.

parent 5daf3659
Pipeline #1609 canceled with stage
in 2 minutes and 33 seconds
......@@ -19,6 +19,8 @@ from cerberus import Validator
import pkgutil
from typing import TYPE_CHECKING
from .options_schema import gms_schema
if TYPE_CHECKING:
from .misc.database_tools import GMS_JOB # noqa F401 # flake8 issue
......@@ -40,8 +42,8 @@ class GMS_configuration(object):
GMS_config = GMS_configuration()
def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, exec_L1AP=None, exec_L1BP=None,
exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, json_config='', exec_L1AP=None,
exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
path_procdata_scenes=None, path_procdata_MGRS=None, path_archive=None):
......@@ -52,6 +54,7 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, exe
'Flink': keeps intermediate results in memory in order to save IO time
:param db_host: host name of the server that runs the postgreSQL database
:param reset: whether to reset the job status or not (default=False)
:param json_config: path to a JSON file containing configuration parameters or a string in JSON format
:param exec_L1AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
......@@ -217,7 +220,7 @@ class JobConfig(object):
# privates
self._DB_job_record = None # type: GMS_JOB
self._DB_config_table = None # type: dict
self._user_opts_defaults = None
self._kwargs_defaults = None
# fixed attributes
# possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings',
......@@ -235,14 +238,14 @@ class JobConfig(object):
# args
self.ID = ID
self.db_host = db_host
self.user_opts = user_opts
self.kwargs = user_opts
# database connection
self.conn_database = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" \
% self.db_host
# get validated options dict from JSON-options
json_opts = self.get_json_opts_from_db(validate=True)
json_opts = self.get_json_opts(validate=True)
gp = self.get_parameter
......@@ -431,17 +434,17 @@ class JobConfig(object):
self.data_list = self.get_data_list_of_current_jobID()
@property
def kwargs_defaults(self):
    """Return a dict mapping the keyword arguments of set_config to their default values.

    The mapping is computed once from the signature of set_config via
    getfullargspec and cached in self._kwargs_defaults.
    """
    if not self._kwargs_defaults:
        a = getfullargspec(set_config)
        # a.defaults only covers the trailing arguments that have defaults,
        # so zip it with the tail of a.args of the same length
        self._kwargs_defaults = dict(zip(a.args[-len(a.defaults):], a.defaults))
    return self._kwargs_defaults
def get_parameter(self, key_user_opts, val_json=None, attr_db_job_record='', fallback=None):
# 1. JobConfig parameters: parameters that are directly passed to JobConfig
if key_user_opts in self.user_opts and self.user_opts[key_user_opts] != self.user_opts_defaults[key_user_opts]:
return self.user_opts[key_user_opts]
if key_user_opts in self.kwargs and self.kwargs[key_user_opts] != self.kwargs_defaults[key_user_opts]:
return self.kwargs[key_user_opts]
# 2. WebUI parameters: parameters that have been defined via WebUI
if attr_db_job_record:
......@@ -452,8 +455,8 @@ class JobConfig(object):
return val_json
# fallback: if nothing has been returned until here
if not fallback and key_user_opts in self.user_opts_defaults:
fallback = self.user_opts_defaults[key_user_opts]
if not fallback and key_user_opts in self.kwargs_defaults:
fallback = self.kwargs_defaults[key_user_opts]
return fallback
@property
......@@ -505,7 +508,7 @@ class JobConfig(object):
return VSSpecs
def get_json_opts_from_db(self, validate=True):
def get_json_opts(self, validate=True):
"""Get a dictionary of GMS config parameters according to the jobs table of the database.
NOTE: Reads the default options from options_default.json and updates the values with those from database.
......@@ -514,20 +517,45 @@ class JobConfig(object):
default_options = get_options(os.path.join(os.path.dirname(pkgutil.get_loader("gms_preprocessing").path),
'options_default.json'), validation=validate)
if 'json_config' in self.kwargs and self.kwargs['json_config']:
if self.kwargs['json_config'].startswith("{"):
try:
params_dict = json.loads(jsmin(self.kwargs['json_config']))
except JSONDecodeError:
warnings.warn('The given JSON options string could not be decoded. '
'JSON decoder failed with the following error:')
raise
elif os.path.isfile(self.kwargs['json_config']):
try:
with open(self.kwargs['json_config'], 'r') as inF:
params_dict = json.loads(jsmin(inF.read()))
except JSONDecodeError:
warnings.warn('The given JSON options file %s could not be decoded. '
'JSON decoder failed with the following error:' % self.kwargs['json_config'])
raise
else:
raise ValueError("The parameter 'json_config' must be a JSON formatted string or a JSON file on disk.")
# convert values to useful data types and update the default values
params_dict = json_to_python(params_dict)
default_options.update(params_dict)
# update default options with those from DB
if self.DB_job_record.analysis_parameter:
try:
params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
except JSONDecodeError:
warnings.warn('The given JSON options file could not be decoded. JSON decoder failed with the '
'following error:')
warnings.warn('The advanced options given in the WebUI could not be decoded. '
'JSON decoder failed with the following error:')
raise
db_options = json_to_python(params_dict) # type: dict
default_options.update(db_options)
# convert values to useful data types and update the default values
params_dict = json_to_python(params_dict)
default_options.update(params_dict)
if validate:
GMSValidator().validate(default_options)
GMSValidator(allow_unknown=True, schema=gms_schema).validate(default_options)
json_options = default_options
return json_options
......@@ -984,9 +1012,6 @@ def is_GMSConfig_available():
return False
gms_schema = None
def json_to_python(value):
def is_number(s):
try:
......@@ -1022,17 +1047,14 @@ def json_to_python(value):
class GMSValidator(Validator):
    """Cerberus validator for GMS options dictionaries.

    The validation schema and further options are passed by the caller,
    e.g. GMSValidator(allow_unknown=True, schema=gms_schema).
    """

    def __init__(self, *args, **kwargs):
        # type: (list, dict) -> None
        """
        :param args: Arguments to be passed to cerberus.Validator
        :param kwargs: Keyword arguments to be passed to cerberus.Validator
        """
        super(GMSValidator, self).__init__(*args, **kwargs)

    def validate(self, document2validate, **kwargs):
        """Validate the given options dictionary against the schema.

        :param document2validate: options dictionary to be validated
        :raises ValueError: if the document violates the schema
        """
        if super(GMSValidator, self).validate(document=document2validate, **kwargs) is False:
            raise ValueError("Options is malformed: %s" % str(self.errors))
......@@ -1051,7 +1073,7 @@ def get_options(target, validation=True):
options = json_to_python(json.loads(jsmin(fl.read())))
if validation is True:
GMSValidator().validate(options)
GMSValidator(allow_unknown=True, schema=gms_schema).validate(options)
return options
else:
......
"""Definition of gms options schema (as used by cerberus library)."""
gms_schema = {
}
# Cerberus validation schema for the GMS options dictionary.
# Every top-level key mirrors the structure of the options JSON; nested dicts
# use cerberus' 'schema' key to describe their sub-structure.
gms_schema = dict(
    # --- general job execution options ---
    exec_mode=dict(type='string', required=True, allowed=['Python', 'Flink']),
    db_host=dict(type='string', required=True),
    CPUs=dict(type='integer', required=True, nullable=True),  # nullable: an explicit null/None is accepted
    allow_subMultiprocessing=dict(type='boolean', required=True),
    disable_exception_handler=dict(type='boolean', required=True),
    log_level=dict(type='string', required=True, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
    # exactly two integers: tile size in X and Y direction
    tiling_block_size_XY=dict(type='list', required=True, schema=dict(type="integer"), minlength=2, maxlength=2),
    is_test=dict(type='boolean', required=True),
    profiling=dict(type='boolean', required=True),
    benchmark_global=dict(type='boolean', required=True),
    # --- file system paths used by the processing chain ---
    paths=dict(
        type='dict', required=True,
        schema=dict(
            path_fileserver=dict(type='string', required=True),
            path_archive=dict(type='string', required=True),
            path_procdata_scenes=dict(type='string', required=True),
            path_procdata_MGRS=dict(type='string', required=True),
            path_tempdir=dict(type='string', required=True),
            path_benchmarks=dict(type='string', required=True),
            path_job_logs=dict(type='string', required=True),
            path_spatIdxSrv=dict(type='string', required=True),
            path_ac_tables=dict(type='string', required=True),
            path_SNR_models=dict(type='string', required=True),
            path_SRFs=dict(type='string', required=True),
            path_dem_proc_srtm_90m=dict(type='string', required=True),
            path_earthSunDist=dict(type='string', required=True),
            path_solar_irr=dict(type='string', required=True),
            path_cloud_classif=dict(type='string', required=True),
            path_ECMWF_db=dict(type='string', required=True),
        )),
    # --- per-processor options (processing levels L1A .. L2C) ---
    processors=dict(
        type='dict', required=True,
        schema=dict(
            general_opts=dict(type='dict', required=True, schema=dict(
                skip_thermal=dict(type='boolean', required=True),
                skip_pan=dict(type='boolean', required=True),
                sort_bands_by_cwl=dict(type='boolean', required=True),
                conversion_type_optical=dict(type='string', required=True, allowed=['Rad', 'TOA_Ref', 'BOA_Ref']),
                conversion_type_thermal=dict(type='string', required=True, allowed=['Rad', 'Temp']),
                scale_factor_TOARef=dict(type='integer', required=True),
                scale_factor_BOARef=dict(type='integer', required=True),
            )),
            # each processor block shares the run/write/delete switches
            L1A_P=dict(type='dict', required=True, schema=dict(
                run_processor=dict(type='boolean', required=True),
                write_output=dict(type='boolean', required=True),
                delete_output=dict(type='boolean', required=True),
                # NOTE: key spelling 'accurracy' must match the options JSON — do not "fix" it here alone
                SZA_SAA_calculation_accurracy=dict(type='string', required=False, allowed=['coarse', 'fine']),
                export_VZA_SZA_SAA_RAA_stats=dict(type='boolean', required=True),
            )),
            L1B_P=dict(type='dict', required=True, schema=dict(
                run_processor=dict(type='boolean', required=True),
                write_output=dict(type='boolean', required=True),
                delete_output=dict(type='boolean', required=True),
                skip_coreg=dict(type='boolean', required=True),
            )),
            L1C_P=dict(type='dict', required=True, schema=dict(
                run_processor=dict(type='boolean', required=True),
                write_output=dict(type='boolean', required=True),
                delete_output=dict(type='boolean', required=True),
                # one cloud masking algorithm per supported satellite
                cloud_masking_algorithm=dict(type='dict', required=False, schema={
                    'Landsat-4': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
                    'Landsat-5': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
                    'Landsat-7': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
                    'Landsat-8': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
                    'Sentinel-2A': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
                    'Sentinel-2B': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
                }),
                export_L1C_obj_dumps=dict(type='boolean', required=True),
                scale_factor_errors_ac=dict(type='integer', required=True),
                auto_download_ecmwf=dict(type='boolean', required=True),
            )),
            L2A_P=dict(type='dict', required=True, schema=dict(
                run_processor=dict(type='boolean', required=True),
                write_output=dict(type='boolean', required=True),
                delete_output=dict(type='boolean', required=True),
                align_coord_grids=dict(type='boolean', required=True),
                match_gsd=dict(type='boolean', required=True),
            )),
            L2B_P=dict(type='dict', required=True, schema=dict(
                run_processor=dict(type='boolean', required=True),
                write_output=dict(type='boolean', required=True),
                delete_output=dict(type='boolean', required=True),
            )),
            L2C_P=dict(type='dict', required=True, schema=dict(
                run_processor=dict(type='boolean', required=True),
                write_output=dict(type='boolean', required=True),
                delete_output=dict(type='boolean', required=True),
            )),
        )),
    # --- usecase / target sensor definition ---
    usecase=dict(
        type='dict', required=True, schema=dict(
            virtual_sensor_id=dict(type='integer', required=True),  # TODO add possible values
            datasetid_spatial_ref=dict(type='integer', required=True, nullable=True),
            datasetid_spectral_ref=dict(type='integer', required=True),
            target_CWL=dict(type='list', required=False, schema=dict(type='float')),
            target_FWHM=dict(type='list', required=False, schema=dict(type='float')),
            target_gsd=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
            target_epsg_code=dict(type='integer', required=True, nullable=True),
            spatial_ref_gridx=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
            spatial_ref_gridy=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
        )),
)
......@@ -26,3 +26,5 @@ psycopg2
# fmask # not pip installable
six
tqdm
jsmin
cerberus
......@@ -12,3 +12,5 @@ geoalchemy2
sqlalchemy
six
tqdm
jsmin
cerberus
......@@ -14,7 +14,7 @@ with open('HISTORY.rst') as history_file:
requirements = [
'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'pytz',
'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy',
'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.1', 'arosics>=0.6.6', 'six', 'tqdm'
'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.1', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus'
# spectral<0.16 has some problems with writing signed integer 8bit data
# fmask # conda install -c conda-forge python-fmask
# 'pyhdf', # conda install --yes -c conda-forge pyhdf
......
......@@ -69,6 +69,8 @@ dependencies:
- sqlalchemy
- psycopg2
- pandas
- jsmin
- cerberus
- py_tools_ds>=0.12.4
- geoarray>=0.7.0
- arosics>=0.6.6
......@@ -10,6 +10,7 @@ Tests for gms_preprocessing.config
import os
from unittest import TestCase
from json import JSONDecodeError
from gms_preprocessing import __path__
from gms_preprocessing.config import get_options
......@@ -31,6 +32,37 @@ class Test_JobConfig(TestCase):
self.jobID = 26186662
self.db_host = 'geoms'
def test(self):
def test_plain_args(self):
    """JobConfig can be created from plain job ID / DB host arguments alone."""
    cfg = JobConfig(self.jobID, self.db_host)
    print(cfg)
    self.assertIsInstance(cfg, JobConfig)
def test_jsonconfig_str_allfine(self):
    """A valid JSON string (including /* */ comments) is accepted via json_config."""
    cfg = '{"a": 1 /*comment*/, "b":2}'
    cfg = JobConfig(self.jobID, self.db_host, json_config=cfg)
    self.assertIsInstance(cfg, JobConfig)
def test_jsonconfig_str_nojson(self):
    """A string that is neither JSON nor an existing file path raises ValueError."""
    cfg = 'dict(a=1 /*comment*/, b=2)'
    with self.assertRaises(ValueError):
        JobConfig(self.jobID, self.db_host, json_config=cfg)
def test_jsonconfig_str_badcomment(self):
    """A malformed comment in the JSON string yields a UserWarning plus JSONDecodeError."""
    cfg = '{"a": 1 /comment*/, "b":2}'
    with self.assertWarns(UserWarning), self.assertRaises(JSONDecodeError):
        JobConfig(self.jobID, self.db_host, json_config=cfg)
def test_jsonconfig_str_undecodable_val(self):
    """A non-JSON value (Python None instead of null) yields a UserWarning plus JSONDecodeError."""
    cfg = '{"a": None /comment*/, "b":2}'
    with self.assertWarns(UserWarning), self.assertRaises(JSONDecodeError):
        JobConfig(self.jobID, self.db_host, json_config=cfg)
def test_jsonconfig_str_schema_violation(self):
    """An option value violating the options schema raises ValueError during validation."""
    cfg = '{"exec_mode": "badvalue"}'
    with self.assertRaises(ValueError):
        JobConfig(self.jobID, self.db_host, json_config=cfg)
def test_jsonconfig_file(self):
    """A path to a JSON options file on disk is accepted via json_config."""
    cfg = os.path.join(__path__[0], 'options_default.json')
    cfg = JobConfig(self.jobID, self.db_host, json_config=cfg)
    self.assertIsInstance(cfg, JobConfig)
......@@ -24,6 +24,7 @@ class BaseTest_ExceptionHandler:
def get_process_controller(self, jobID):
self.PC = process_controller(jobID, parallelization_level='scenes', db_host='geoms',
job_config_kwargs=dict(is_test=True, log_level='DEBUG'))
self.PC.config.disable_exception_handler = False
# update attributes of DB_job_record and related DB entry
self.PC.DB_job_record.reset_job_progress()
......@@ -57,7 +58,7 @@ class BaseTest_ExceptionHandler:
class Test_ExceptionHandler_NoSubsystems(BaseTest_ExceptionHandler.Test_ExceptionHandler):
def setUp(self):
    """Set up a process controller for a Landsat-8 Collection Data test job."""
    super(Test_ExceptionHandler_NoSubsystems, self).get_process_controller(26186261)  # Landsat-8 Coll. Data
def test_L1A_mapper_success_progress_stats(self):
"""Check correctness of progress stats if scene succeeds."""
......@@ -73,7 +74,8 @@ class Test_ExceptionHandler_NoSubsystems(BaseTest_ExceptionHandler.Test_Exceptio
def test_disabled_exception_handler(self):
    """Check that exceptions are raised if CFG.Job.disable_exception_handler is True."""
    self.PC.config.disable_exception_handler = True
    # with the handler disabled, the mapper's failure must propagate as RuntimeError
    with self.assertRaises(RuntimeError):
        self.dummy_gms_mapper_fail(self.PC.config.data_list[0])
class Test_ExceptionHandler_Subsystems(BaseTest_ExceptionHandler.Test_ExceptionHandler):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment