Commit 15b51ddc authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fix tests.

parent e4a672bc
...@@ -93,7 +93,7 @@ To enable lock functionality (needed for CPU / memory / disk IO management), ins ...@@ -93,7 +93,7 @@ To enable lock functionality (needed for CPU / memory / disk IO management), ins
Code history Code history
------------ ------------
Release notes on earlier versions of gms_preprocessing can be found here_. Release notes for the current and earlier versions of gms_preprocessing can be found here_.
Credits Credits
...@@ -101,6 +101,9 @@ Credits ...@@ -101,6 +101,9 @@ Credits
This package was created with Cookiecutter_ and the `audreyr/cookiecutter-pypackage`_ project template. This package was created with Cookiecutter_ and the `audreyr/cookiecutter-pypackage`_ project template.
Landsat-5/7/8 satellite data and SRTM digital elevation models have been have been provided by the US Geological
Survey. Sentinel-2 data have been provided by ESA.
.. _Cookiecutter: https://github.com/audreyr/cookiecutter .. _Cookiecutter: https://github.com/audreyr/cookiecutter
.. _`audreyr/cookiecutter-pypackage`: https://github.com/audreyr/cookiecutter-pypackage .. _`audreyr/cookiecutter-pypackage`: https://github.com/audreyr/cookiecutter-pypackage
.. _coverage: http://geomultisens.gitext.gfz-potsdam.de/gms_preprocessing/coverage/ .. _coverage: http://geomultisens.gitext.gfz-potsdam.de/gms_preprocessing/coverage/
......
...@@ -400,7 +400,7 @@ class process_controller(object): ...@@ -400,7 +400,7 @@ class process_controller(object):
self.create_job_summary() self.create_job_summary()
self.logger.info('Execution finished.') self.logger.info('Execution finished.')
self.logger.info('The job logfile and the summary files have been saved here: \n' self.logger.info('The job logfile, options file and the summary files have been saved here: \n'
'%s.*' % os.path.splitext(self.logger.path_logfile)[0]) '%s.*' % os.path.splitext(self.logger.path_logfile)[0])
# TODO implement failed_with_warnings: # TODO implement failed_with_warnings:
self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors' self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
......
...@@ -43,6 +43,7 @@ class Base_CLITester: ...@@ -43,6 +43,7 @@ class Base_CLITester:
@property @property
def current_CFG(self): def current_CFG(self):
from gms_preprocessing.options.config import GMS_config from gms_preprocessing.options.config import GMS_config
GMS_config.is_test = True
return GMS_config return GMS_config
#################################### ####################################
......
...@@ -37,59 +37,61 @@ class Test_JobConfig(TestCase): ...@@ -37,59 +37,61 @@ class Test_JobConfig(TestCase):
self.db_host = db_host self.db_host = db_host
def test_plain_args(self): def test_plain_args(self):
cfg = JobConfig(self.jobID, db_host=self.db_host) cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host)
self.assertIsInstance(cfg, JobConfig) self.assertIsInstance(cfg, JobConfig)
def test_inmem_serialization(self): def test_inmem_serialization(self):
cfg = JobConfig(self.jobID, db_host=self.db_host, inmem_serialization=True) cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host, inmem_serialization=True)
self.assertIsInstance(cfg, JobConfig) self.assertIsInstance(cfg, JobConfig)
self.assertTrue(cfg.inmem_serialization is True) self.assertTrue(cfg.inmem_serialization is True)
cfg = JobConfig(self.jobID, db_host=self.db_host, inmem_serialization=False) cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host, inmem_serialization=False)
self.assertIsInstance(cfg, JobConfig) self.assertIsInstance(cfg, JobConfig)
self.assertTrue(cfg.inmem_serialization is False) self.assertTrue(cfg.inmem_serialization is False)
def test_jsonconfig_str_allfine(self): def test_jsonconfig_str_allfine(self):
cfg = '{"a": 1 /*comment*/, "b":2}' cfg = '{"a": 1 /*comment*/, "b":2}'
cfg = JobConfig(self.jobID, db_host=self.db_host, json_config=cfg) cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host, json_config=cfg)
self.assertIsInstance(cfg, JobConfig) self.assertIsInstance(cfg, JobConfig)
def test_jsonconfig_str_nojson(self): def test_jsonconfig_str_nojson(self):
cfg = 'dict(a=1 /*comment*/, b=2)' cfg = 'dict(a=1 /*comment*/, b=2)'
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
JobConfig(self.jobID, db_host=self.db_host, json_config=cfg) JobConfig(self.jobID, is_test=True, db_host=self.db_host, json_config=cfg)
def test_jsonconfig_str_badcomment(self): def test_jsonconfig_str_badcomment(self):
cfg = '{"a": 1 /comment*/, "b":2}' cfg = '{"a": 1 /comment*/, "b":2}'
with self.assertWarns(UserWarning), self.assertRaises(JSONDecodeError): with self.assertWarns(UserWarning), self.assertRaises(JSONDecodeError):
JobConfig(self.jobID, db_host=self.db_host, json_config=cfg) JobConfig(self.jobID, is_test=True, db_host=self.db_host, json_config=cfg)
def test_jsonconfig_str_undecodable_val(self): def test_jsonconfig_str_undecodable_val(self):
cfg = '{"a": None /comment*/, "b":2}' cfg = '{"a": None /comment*/, "b":2}'
with self.assertWarns(UserWarning), self.assertRaises(JSONDecodeError): with self.assertWarns(UserWarning), self.assertRaises(JSONDecodeError):
JobConfig(self.jobID, db_host=self.db_host, json_config=cfg) JobConfig(self.jobID, is_test=True, db_host=self.db_host, json_config=cfg)
def test_jsonconfig_str_schema_violation(self): def test_jsonconfig_str_schema_violation(self):
cfg = '{"global_opts": {"inmem_serialization": "badvalue"}}' cfg = '{"global_opts": {"inmem_serialization": "badvalue"}}'
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
JobConfig(self.jobID, db_host=self.db_host, json_config=cfg) JobConfig(self.jobID, is_test=True, db_host=self.db_host, json_config=cfg)
def test_jsonconfig_file(self): def test_jsonconfig_file(self):
cfg = os.path.join(path_options_default) cfg = os.path.join(path_options_default)
cfg = JobConfig(self.jobID, db_host=self.db_host, json_config=cfg) cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host, json_config=cfg)
self.assertIsInstance(cfg, JobConfig) self.assertIsInstance(cfg, JobConfig)
def test_jsonconfig_param_acceptance(self): def test_jsonconfig_param_acceptance(self):
cfg = JobConfig(self.jobID, db_host=self.db_host, json_config='{"global_opts": {"inmem_serialization": true}}') cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host,
json_config='{"global_opts": {"inmem_serialization": true}}')
self.assertIsInstance(cfg, JobConfig) self.assertIsInstance(cfg, JobConfig)
self.assertTrue(cfg.inmem_serialization is True) self.assertTrue(cfg.inmem_serialization is True)
cfg = JobConfig(self.jobID, db_host=self.db_host, json_config='{"global_opts": {"inmem_serialization": false}}') cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host,
json_config='{"global_opts": {"inmem_serialization": false}}')
self.assertIsInstance(cfg, JobConfig) self.assertIsInstance(cfg, JobConfig)
self.assertTrue(cfg.inmem_serialization is False) self.assertTrue(cfg.inmem_serialization is False)
def test_to_jsonable_dict(self): def test_to_jsonable_dict(self):
cfg = JobConfig(self.jobID, db_host=self.db_host) cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host)
jsonable_dict = cfg.to_jsonable_dict() jsonable_dict = cfg.to_jsonable_dict()
self.assertIsInstance(cfg.to_jsonable_dict(), dict) self.assertIsInstance(cfg.to_jsonable_dict(), dict)
...@@ -97,7 +99,7 @@ class Test_JobConfig(TestCase): ...@@ -97,7 +99,7 @@ class Test_JobConfig(TestCase):
dumps(jsonable_dict) dumps(jsonable_dict)
def test_to_dict_validity(self): def test_to_dict_validity(self):
cfg = JobConfig(self.jobID, db_host=self.db_host) cfg = JobConfig(self.jobID, is_test=True, db_host=self.db_host)
params = cfg.to_dict() params = cfg.to_dict()
self.assertIsInstance(cfg.to_jsonable_dict(), dict) self.assertIsInstance(cfg.to_jsonable_dict(), dict)
......
...@@ -27,7 +27,7 @@ class BaseTest_ExceptionHandler: ...@@ -27,7 +27,7 @@ class BaseTest_ExceptionHandler:
def get_process_controller(self, jobID): def get_process_controller(self, jobID):
self.PC = process_controller(jobID, parallelization_level='scenes', db_host=db_host, self.PC = process_controller(jobID, parallelization_level='scenes', db_host=db_host,
is_test=True, log_level='DEBUG') is_test=True, log_level='DEBUG', reset_status=True)
self.PC.config.disable_exception_handler = False self.PC.config.disable_exception_handler = False
# update attributes of DB_job_record and related DB entry # update attributes of DB_job_record and related DB entry
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment