Commit 76a2d7b4 authored by Daniel Scheffler


Refactored class process_controller to ProcessController. Merged ProcessController.run_all_processors and ProcessController.run_all_processors_OLD.
parent f176df9e
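For callers, the rename is a drop-in change of the imported name; the constructor arguments are unchanged. A minimal before/after sketch (the job ID and db_host are example values copied from the notebook cell further below):

# before this commit
# from gms_preprocessing import process_controller
# PC = process_controller(26186937, db_host='localhost', is_test=True)

# after this commit
from gms_preprocessing import ProcessController
PC = ProcessController(26186937, db_host='localhost', is_test=True)
PC.run_all_processors()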
@@ -205,7 +205,7 @@ Credits
This package was created with Cookiecutter_ and the `audreyr/cookiecutter-pypackage`_ project template.
-Landsat-5/7/8 satellite data and SRTM digital elevation models have been provided by the US Geological
+Landsat-5/7/8 satellite data and SRTM/ASTER digital elevation models have been provided by the US Geological
Survey. Sentinel-2 data have been provided by ESA.
.. _Cookiecutter: https://github.com/audreyr/cookiecutter
@@ -9,7 +9,7 @@ import matplotlib
matplotlib.use('Agg', warn=False) # switch matplotlib backend to 'Agg' and disable the warning in case it's already 'Agg'
-from gms_preprocessing import process_controller, __version__ # noqa: E402
+from gms_preprocessing import ProcessController, __version__ # noqa: E402
from gms_preprocessing.misc.database_tools import GMS_JOB # noqa: E402
from gms_preprocessing.options.config import get_conn_database # noqa: E402
from gms_preprocessing.options.config import path_options_default # noqa: E402
@@ -63,7 +63,7 @@ def run_from_jobid(args):
if 'GMS_IS_TEST' in os.environ and os.environ['GMS_IS_TEST'] == 'True':
kwargs['is_test'] = True
-PC = process_controller(args.jobid, **kwargs)
+PC = ProcessController(args.jobid, **kwargs)
# run the job
if 'GMS_IS_TEST_CONFIG' in os.environ and os.environ['GMS_IS_TEST_CONFIG'] == 'True':
@@ -135,7 +135,7 @@ def _run_job(dbJob, **config_kwargs):
if 'GMS_IS_TEST' in os.environ and os.environ['GMS_IS_TEST'] == 'True':
config_kwargs['is_test'] = True
-PC = process_controller(dbJob.id, **config_kwargs)
+PC = ProcessController(dbJob.id, **config_kwargs)
# run the job
if 'GMS_IS_TEST_CONFIG' in os.environ and os.environ['GMS_IS_TEST_CONFIG'] == 'True':
@@ -361,8 +361,8 @@
}
],
"source": [
"from gms_preprocessing import process_controller \n",
"PC = process_controller(26186937, db_host='localhost', is_test=True)\n",
"from gms_preprocessing import ProcessController \n",
"PC = ProcessController(26186937, db_host='localhost', is_test=True)\n",
"PC.run_all_processors()"
]
},
@@ -405,6 +405,12 @@
}
],
"source": [
"import os\n",
"import glob\n",
"from gms_preprocessing.misc import database_tools as DB_T\n",
"\n",
"conn_DB = PC.config.conn_database\n",
"\n",
"# set parameters\n",
"src_folder = '/home/gfz-fe/Downloads/GMS_beta_usecase/Landsat-7/'\n",
"filenames = [os.path.basename(i) for i in glob.glob(os.path.join(src_folder,'L*.tar.gz'))]\n",
@@ -905,7 +911,6 @@
{
"ename": "AssertionError",
"evalue": "Mixed data types in postgreSQL matching expressions are not supported. Got [].",
"output_type": "error",
"traceback": [
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[1;31mAssertionError\u001b[0m Traceback (most recent call last)",
@@ -916,7 +921,8 @@
"\u001b[1;32m/home/gfz-fe/GeoMultiSens/misc/database_tools.py\u001b[0m in \u001b[0;36m<listcomp>\u001b[1;34m(.0)\u001b[0m\n\u001b[0;32m 110\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[1;34m'database connection fault'\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 111\u001b[0m \u001b[0mcursor\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mconnection\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcursor\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 112\u001b[1;33m \u001b[0mcondition\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;34m\"WHERE \"\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;34m\" AND \"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m[\u001b[0m\u001b[0mget_postgreSQL_matchingExp\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mk\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mv\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mk\u001b[0m\u001b[1;33m,\u001b[0m\u001b[0mv\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mcond_dict\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mitems\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 113\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mcond_dict\u001b[0m \u001b[1;32melse\u001b[0m \u001b[1;34m\"\"\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 114\u001b[0m \u001b[0mcmd\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;34m\"SELECT \"\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;34m','\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mvals2return\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;34m\" FROM \"\u001b[0m \u001b[1;33m+\u001b[0m \u001b[0mtablename\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;34m\" \"\u001b[0m \u001b[1;33m+\u001b[0m \u001b[0mcondition\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/home/gfz-fe/GeoMultiSens/misc/database_tools.py\u001b[0m in \u001b[0;36mget_postgreSQL_matchingExp\u001b[1;34m(key, value)\u001b[0m\n\u001b[0;32m 83\u001b[0m \u001b[0mdTypes_in_value\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlist\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mset\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m[\u001b[0m\u001b[0mtype\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mi\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 84\u001b[0m \u001b[1;32massert\u001b[0m \u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mdTypes_in_value\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m==\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m,\u001b[0m\u001b[0;31m\\\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 85\u001b[1;33m \u001b[1;34m'Mixed data types in postgreSQL matching expressions are not supported. Got %s.'\u001b[0m \u001b[1;33m%\u001b[0m\u001b[0mdTypes_in_value\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 86\u001b[0m \u001b[1;32massert\u001b[0m \u001b[0mdTypes_in_value\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m]\u001b[0m \u001b[1;32min\u001b[0m \u001b[1;33m[\u001b[0m\u001b[0mint\u001b[0m\u001b[1;33m,\u001b[0m\u001b[0mstr\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 87\u001b[0m \u001b[0mpgList\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;34m\",\"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;34m\"'%s'\"\u001b[0m \u001b[1;33m%\u001b[0m\u001b[0mi\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mvalue\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m,\u001b[0m\u001b[0mstr\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32melse\u001b[0m \u001b[1;34m\"%s\"\u001b[0m \u001b[1;33m%\u001b[0m\u001b[0mi\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;31mAssertionError\u001b[0m: Mixed data types in postgreSQL matching expressions are not supported. Got []."
-]
+],
+"output_type": "error"
}
],
"source": [
@@ -1375,7 +1381,7 @@
"navigate_menu": true,
"number_sections": true,
"sideBar": true,
"threshold": 6,
"threshold": 6.0,
"toc_cell": false,
"toc_section_display": "block",
"toc_window_display": false
@@ -11,7 +11,7 @@ from . import processing # noqa: E402
from . import options # noqa: F401 (imported but unused)
from .options import config # noqa: F401 (imported but unused)
from .options.config import set_config # noqa: F401 (imported but unused)
-from .processing.process_controller import process_controller # noqa: E402
+from .processing.process_controller import ProcessController # noqa: E402
from .version import __version__, __versionalias__ # noqa (E402 + F401)
__author__ = """Daniel Scheffler"""
@@ -25,5 +25,5 @@ __all__ = ['__version__',
'config', # only to keep compatibility with HU-INF codes
'options',
'set_config',
-'process_controller',
+'ProcessController',
]
@@ -42,7 +42,7 @@ if TYPE_CHECKING:
__author__ = 'Daniel Scheffler'
-class process_controller(object):
+class ProcessController(object):
def __init__(self, job_ID, **config_kwargs):
"""gms_preprocessing process controller
@@ -357,7 +357,7 @@ class process_controller(object):
return DB_objs
-def run_all_processors_OLD(self, custom_data_list=None):
+def run_all_processors(self, custom_data_list=None, serialize_after_each_mapper=False):
"""
Run all processors at once.
"""
@@ -366,73 +366,6 @@ class process_controller(object):
signal.signal(signal.SIGINT, self.stop) # catches a KeyboardInterrupt
signal.signal(signal.SIGTERM, self.stop) # catches a 'kill' or 'pkill'
-# noinspection PyBroadException
-try:
-if self.config.profiling:
-from pyinstrument import Profiler
-self.profiler = Profiler() # or Profiler(use_signal=False), see below
-self.profiler.start()
-self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
-% self.config.ID)
-self.DB_job_record.reset_job_progress() # updates attributes of DB_job_record and related DB entry
-self.config.status = 'running'
-self.update_DB_job_record() # TODO implement that into job.status.setter
-self.failed_objects = []
-# get list of datasets to be processed
-if custom_data_list:
-self.config.data_list = custom_data_list
-# add local availability
-self.config.data_list = self.add_local_availability(self.config.data_list)
-self.update_DB_job_statistics(self.config.data_list)
-self.L1A_processing()
-self.L1B_processing()
-self.L1C_processing()
-self.L2A_processing()
-self.L2B_processing()
-self.L2C_processing()
-# create summary
-self.create_job_summary()
-self.logger.info('Execution finished.')
-self.logger.info('The job logfile, options file and the summary files have been saved here: \n'
-'%s.*' % os.path.splitext(self.logger.path_logfile)[0])
-# TODO implement failed_with_warnings:
-self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
-self.config.end_time = datetime.datetime.now()
-self.config.computation_time = self.config.end_time - self.config.start_time
-self.logger.info('Time for execution: %s' % self.config.computation_time)
-except Exception: # noqa E722 # bare except
-self.config.status = 'failed'
-if not self.config.disable_exception_handler:
-self.logger.error('Execution failed with an error:', exc_info=True)
-else:
-self.logger.error('Execution failed with an error:')
-raise
-finally:
-# update database entry of current job
-self.update_DB_job_record()
-if self.config.profiling:
-self.profiler.stop()
-print(self.profiler.output_text(unicode=True, color=True))
-self.shutdown()
-def run_all_processors(self, custom_data_list=None):
-# enable clean shutdown possibility
-# NOTE: a signal.SIGKILL (kill -9 ...) forces the process to be killed and cannot be caught or handled
-signal.signal(signal.SIGINT, self.stop) # catches a KeyboardInterrupt
-signal.signal(signal.SIGTERM, self.stop) # catches a 'kill' or 'pkill'
# noinspection PyBroadException
try:
if self.config.profiling:
@@ -458,36 +391,45 @@ class process_controller(object):
self.config.data_list = self.add_local_availability(self.config.data_list)
self.update_DB_job_statistics(self.config.data_list)
-# group dataset dicts by sceneid
-dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')
+if not serialize_after_each_mapper:
+    # group dataset dicts by sceneid
+    dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')
-# close logger to release FileHandler of job log (workers will log into job logfile)
-del self.logger
+    # close logger to release FileHandler of job log (workers will log into job logfile)
+    del self.logger
-# RUN PREPROCESSING
-from .pipeline import run_complete_preprocessing
-GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)
+    # RUN PREPROCESSING
+    from .pipeline import run_complete_preprocessing
+    GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)
-# separate results into successful and failed objects
-def assign_attr(tgt_procL):
-return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]
+    # separate results into successful and failed objects
+    def assign_attr(tgt_procL):
+        return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]
-self.L1A_newObjects = assign_attr('L1A')
-self.L1B_newObjects = assign_attr('L1B')
-self.L1C_newObjects = assign_attr('L1C')
-self.L2A_newObjects = assign_attr('L2A')
-self.L2B_newObjects = assign_attr('L2B')
-self.L2C_newObjects = assign_attr('L2C')
-self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]
+    self.L1A_newObjects = assign_attr('L1A')
+    self.L1B_newObjects = assign_attr('L1B')
+    self.L1C_newObjects = assign_attr('L1C')
+    self.L2A_newObjects = assign_attr('L2A')
+    self.L2B_newObjects = assign_attr('L2B')
+    self.L2C_newObjects = assign_attr('L2C')
+    self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]
+else:
+    self.L1A_processing()
+    self.L1B_processing()
+    self.L1C_processing()
+    self.L2A_processing()
+    self.L2B_processing()
+    self.L2C_processing()
# create summary
self.create_job_summary()
self.logger.info('Execution finished.')
-self.logger.info('The job logfile and the summary files have been saved here: \n'
+self.logger.info('The job logfile, options file and the summary files have been saved here: \n'
'%s.*' % os.path.splitext(self.logger.path_logfile)[0])
-# TODO implement failed_with_warnings
-self.config.status = 'finished' if not self.failed_objects else 'finished_with_warnings'
+# TODO implement failed_with_warnings:
+self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
self.config.end_time = datetime.datetime.now()
self.config.computation_time = self.config.end_time - self.config.start_time
self.logger.info('Time for execution: %s' % self.config.computation_time)
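Usage-wise, the merged run_all_processors keeps the parallel complete-pipeline mode as its default and exposes the old per-level behaviour through the new keyword argument. A minimal sketch (constructor arguments as in the notebook example above; the two calls are shown as alternatives, not as a sequence this diff guarantees to work):

from gms_preprocessing import ProcessController

PC = ProcessController(26186937, db_host='localhost', is_test=True)

# default: group the datasets by scene ID and run the complete
# preprocessing pipeline per scene group via parallel mappers
PC.run_all_processors()

# alternative: run the L1A...L2C processors as separate blocks,
# serializing intermediate results after each mapper (roughly the
# behaviour of the former run_all_processors_OLD)
PC.run_all_processors(serialize_after_each_mapper=True)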
@@ -11,7 +11,7 @@ Tests for gms_preprocessing.misc.exception_handler.ExceptionHandler
import unittest
from collections import OrderedDict # noqa F401 # flake8 issue
-from gms_preprocessing import process_controller
+from gms_preprocessing import ProcessController
from gms_preprocessing.misc.exception_handler import log_uncaught_exceptions
from gms_preprocessing.algorithms.L1A_P import L1A_object
from gms_preprocessing.misc.database_tools import get_info_from_postgreSQLdb
@@ -26,8 +26,8 @@ class BaseTest_ExceptionHandler:
PC = None # default
def get_process_controller(self, jobID):
-self.PC = process_controller(jobID, parallelization_level='scenes', db_host=db_host,
-is_test=True, log_level='DEBUG', reset_status=True)
+self.PC = ProcessController(jobID, parallelization_level='scenes', db_host=db_host,
+is_test=True, log_level='DEBUG', reset_status=True)
self.PC.config.disable_exception_handler = False
# update attributes of DB_job_record and related DB entry
@@ -44,7 +44,7 @@ import time
import unittest
# Imports regarding the 'gms_preprocessing' module.
-from gms_preprocessing import process_controller, __file__
+from gms_preprocessing import ProcessController, __file__
from gms_preprocessing.model.gms_object import GMS_object
from gms_preprocessing.algorithms.L1A_P import L1A_object
from gms_preprocessing.algorithms.L1B_P import L1B_object
@@ -97,7 +97,7 @@ class BaseTestCases:
@classmethod
def create_job(cls, jobID, config):
-cls.PC = process_controller(jobID, **config)
+cls.PC = ProcessController(jobID, **config)
# update attributes of DB_job_record and related DB entry
cls.PC.config.DB_job_record.reset_job_progress()
@@ -203,7 +203,7 @@ class BaseTestCases:
@classmethod
def create_job(cls, jobID, config):
-cls.PC = process_controller(jobID, **config)
+cls.PC = ProcessController(jobID, **config)
[cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]
@@ -410,7 +410,7 @@ class Test_ProcessContinuing_CompletePipeline(unittest.TestCase):
@classmethod
def create_job(cls, jobID, config):
-cls.PC = process_controller(jobID, **config)
+cls.PC = ProcessController(jobID, **config)
cls.PC.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
% cls.PC.config.ID)
@@ -562,8 +562,8 @@ class Test_in_normal_mode(unittest.TestCase):
# self.job_id = 26188163 # GEOMS: pandas.errors.ParserError: Expected 2 fields in line 31, saw 3
self.job_id = 26189301 # GEOMS: process continuation
-self.PC = process_controller(self.job_id, **dict(is_test=False, parallelization_level='scenes', db_host=db_host,
-delete_old_output=True, disable_exception_handler=True))
+self.PC = ProcessController(self.job_id, **dict(is_test=False, parallelization_level='scenes', db_host=db_host,
+delete_old_output=True, disable_exception_handler=True))
# self.PC.config.spathomo_estimate_accuracy = True
# self.PC.config.ac_estimate_accuracy = True
# self.PC.config.spechomo_estimate_accuracy = True