# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import (division, print_function, unicode_literals, absolute_import)

import numpy as np
from pandas import DataFrame
import datetime
import os
import time
from itertools import chain
import signal
import re
from typing import TYPE_CHECKING
import shutil
import sys
from natsort import natsorted

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object, GMS_object, GMS_identifier
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..options.config import set_config
from .multiproc import MAP, imap_unordered
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
from ..misc.locks import release_unclosed_locks
from ..version import __version__, __versionalias__

from py_tools_ds.numeric.array import get_array_tilebounds

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue
    from typing import List  # noqa F401  # flake8 issue
    from ..options.config import GMS_config  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'
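
# A minimal usage sketch (not part of the original code; import path and job ID are assumptions): given a valid
# record in the 'jobs' database table, the ProcessController defined below is typically driven like this:
#
#     from gms_preprocessing.processing.process_controller import ProcessController
#
#     PC = ProcessController(job_ID=123456)  # hypothetical job ID; further config kwargs are passed to set_config()
#     PC.run_all_processors()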


class ProcessController(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller

        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
        """

        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        self.config = set_config(job_ID, **config_kwargs)  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        # create job log
        self._path_job_logfile = os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID)
        if os.path.exists(self._path_job_logfile):
            HLP_F.silentremove(self._path_job_logfile)

        self.logger.info("Executing gms_preprocessing, version: %s (%s)" % (__version__, __versionalias__))
        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))
        self.logger.info('Job logfile: %s' % self._path_job_logfile)

        # save config
        self._path_job_optionsfile = os.path.join(self.config.path_job_logs, '%s_options.json' % self.config.ID)
        self.config.save(self._path_job_optionsfile)
        self.logger.info('Job options file: %s' % self._path_job_optionsfile)

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('ProcessController__%s' % self.config.ID, fmt_suffix='ProcessController',
                                      path_logfile=self._path_job_logfile, log_level=self.config.log_level, append=True)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self._logger.close()
            self._logger = None

    @property
    def DB_job_record(self):
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
        # TODO revise this function
        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfile
        path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
        path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Returns all processing level that have been successfully written according to logfile."""

            if not os.path.exists(path_log):
                if path_log == path_logfile:  # path_logfile_merged_ss has already been searched
                    self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                     % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
                with open(path_log) as inF:
                    logfile = inF.read()
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
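                # illustrative example: a log line like "L1B data successfully saved." would be captured as 'L1B' here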
                if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss:  # AllWrittenProcL_dueLog = []
                    self.logger.info('%s: According to logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = natsorted(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog

        # make sure there is no more than one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            # NOTE: first check for merged subsystem datasets because they have higher processing levels
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
            if not AllWrittenProcL:
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            else:
                # An L2A+ dataset with merged subsystems has been found. Use that logfile.
                path_logfile = path_logfile_merged_ss

            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS_file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(
                            GMS_identifier(
                                image_type=dataset['image_type'],
                                satellite=dataset['satellite'],
                                sensor=dataset['sensor'],
                                subsystem=dataset['subsystem'] if path_logfile != path_logfile_merged_ss else '',
                                proc_level=ProcL,  # must be respected because LBA changes after atm. Corr.
                                dataset_ID=dataset['dataset_ID']),
                            nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but'
                                             ' according to logfile %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching %s dataset for %s but with a different '
                                         'LayerBandsAssignment (desired: %s; found %s). Dataset has to be reprocessed.'
                                         % (ProcL, dataset['entity_ID'],
                                            target_LayerBandsAssignment, GMS_file_dict['LayerBandsAssignment']))
                else:
                    self.logger.info('%s for dataset %s has been written according to the logfile but no '
                                     'corresponding dataset has been found on disk.' % (ProcL, dataset['entity_ID']) +
                                     (' Searching for lower processing level...'
                                      if AllWrittenProcL.index(ProcL) != 0 else ''))

        elif len(DB_match) > 1:
            self.logger.info('According to database there are multiple matches for the dataset %s. Dataset has to '
                             'be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset

    def add_local_availability(self, datasets):
        # type: (List[OrderedDict]) -> List[OrderedDict]
        """Check availability of all subsets per scene and processing level.

        NOTE: The processing level is reset for scenes where not all subsystems are available at the same
              processing level.

        :param datasets:    List of one OrderedDict per subsystem as generated by CFG.data_list
        """
        datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets]

        ######################################################################################################
        # validate that all subsystems of the same sceneid are at the same processing level; otherwise reset #
        ######################################################################################################
        datasets_validated = []
        datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

        for ds_group in datasets_grouped:
            proc_lvls = [ds['proc_level'] for ds in ds_group]

            if not len(list(set(proc_lvls))) == 1:
                # reset processing level of those scenes where not all subsystems are available
                self.logger.info('%s: Found already processed subsystems at different processing levels %s. '
                                 'Dataset has to be reprocessed to avoid errors.'
                                 % (ds_group[0]['entity_ID'], proc_lvls))

                for ds in ds_group:
                    ds['proc_level'] = None
                    datasets_validated.append(ds)
            else:
                datasets_validated.extend(ds_group)

        return datasets_validated

    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, OrderedDict) -> bool
        """Checks whether a dataset within a dataset list has been processed in the previous processing level.
        :param GMS_objects: <list> a list of recently processed GMS objects
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]

    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Returns a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:         <str> processing level of the processor requesting the data list
        :param prevLvl_objects: <list> of in-memory GMS objects produced by the previous processor
        :return:                <list> of dataset dictionaries that still need to be processed
        """
        def is_procL_lower(dataset):
            return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)]  # TODO generator?
        else:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]

    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """
        Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
        :return:
        """
        # TODO get prevLvl_objects automatically from self
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev or self.config.parallelization_level
            blocksize = blocksize or self.config.tiling_block_size_XY
            prevLvl = proc_chain[proc_chain.index(procLvl) - 1]  # TODO replace by enum

            # get GMSfile list
            dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                def get_tilepos_list(GMSfile):
                    return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'],
                                                tile_shape=blocksize)

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                        for tp in get_tilepos_list(GMSfile)]
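
                # illustrative sketch (exact bounds format depends on get_array_tilebounds): for a 5000 x 5000 scene
                # and blocksize=(2000, 2000), 'work' would contain items like
                #     [<path_to_GMS_file>, ['block', ((0, 1999), (0, 1999))]],
                #     [<path_to_GMS_file>, ['block', ((0, 1999), (2000, 3999))]], ...
                # i.e., one GMS object is later instantiated per tile instead of per full cube.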

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory
            # -> subsystem attribute will be overwritten each time
            from ..misc.helper_functions import get_parentObjDict
            parentObjDict = get_parentObjDict()
            DB_objs = [parentObjDict[prevLvl].from_disk(tuple_GMS_subset=w) for w in work]

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs

    def run_all_processors(self, custom_data_list=None, serialize_after_each_mapper=False):
        """
        Run all processors at once.

        :param custom_data_list:             custom list of dataset dictionaries to be processed instead of
                                              config.data_list
        :param serialize_after_each_mapper:  if True, run the processors level by level and serialize the results
                                              to disk in between instead of running the complete pipeline per scene
        """
        # enable clean shutdown possibility
        # NOTE: a signal.SIGKILL (kill -9 ...) forces the process to be killed and cannot be caught or handled
        signal.signal(signal.SIGINT, self.stop)  # catches a KeyboardInterrupt
        signal.signal(signal.SIGTERM, self.stop)  # catches a 'kill' or 'pkill'
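        # e.g., pressing Ctrl-C or sending SIGTERM to the process calls self.stop(), which sets the job status to
        # 'canceled', updates the database record and shuts the controller down (see stop() below)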

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)

            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            GMS_object.proc_status_all_GMSobjs.clear()  # reset
            self.update_DB_job_record()  # TODO implement that into config.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            if not serialize_after_each_mapper:
                # group dataset dicts by sceneid
                dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')

                # close logger to release FileHandler of job log (workers will log into job logfile)
                del self.logger

                # RUN PREPROCESSING
                from .pipeline import run_complete_preprocessing
                GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)

                # separate results into successful and failed objects
                def assign_attr(tgt_procL):
                    return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]

                self.L1A_newObjects = assign_attr('L1A')
                self.L1B_newObjects = assign_attr('L1B')
                self.L1C_newObjects = assign_attr('L1C')
                self.L2A_newObjects = assign_attr('L2A')
                self.L2B_newObjects = assign_attr('L2B')
                self.L2C_newObjects = assign_attr('L2C')
                self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]

            else:
                self.L1A_processing()
                self.L1B_processing()
                self.L1C_processing()
                self.L2A_processing()
                self.L2B_processing()
                self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            self.logger.info('The job logfile, options file and the summary files have been saved here: \n'
                             '%s.*' % os.path.splitext(self.logger.path_logfile)[0])
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

        except Exception:  # noqa E722  # bare except
            self.config.status = 'failed'

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
            else:
                self.logger.error('Execution failed with an error:')
                raise

        finally:
            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""

        self.logger.info('Process controller stopped via %s.'
                         % ('KeyboardInterrupt' if signum == 2 else 'SIGTERM command'))
        self.config.status = 'canceled'
        self.update_DB_job_record()

        self.shutdown()

        if signum == 2:
            raise KeyboardInterrupt('Received a KeyboardInterrupt.')  # terminate execution and show traceback
        elif signum == 15:
            sys.exit(0)
            # raise SystemExit()

    def shutdown(self):
        """Shutdown the process controller instance (loggers, remove temporary directories, ...)."""
        self.logger.info('Shutting down gracefully...')

        # release unclosed locks
        release_unclosed_locks()

        # clear any temporary files
        tempdir = self.config.path_tempdir
        self.logger.info('Deleting temporary directory %s.' % tempdir)
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir, ignore_errors=True)

        del self.logger
        shutdown_loggers()

    def benchmark(self):
        """
        Run a benchmark.
        """
        data_list_bench = self.config.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.config.path_database):
                    os.remove(self.config.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)

    def L1A_processing(self):
        """
        Run Level 1A processing: Data import and metadata homogenization
        """
        if self.config.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.config.parallelization_level == 'scenes':
                # map
                L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P,
                                         flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
                    L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects = MAP(L1A_P.L1A_object.from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects

    def L1B_processing(self):
        """
        Run Level 1B processing: calculation of geometric shifts
        """
        # TODO implement check for running spatial index mediator server
        # run on full cubes

        if self.config.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects

    def L1C_processing(self):
        """
        Run Level 1C processing: atmospheric correction
        """
        if self.config.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.config.parallelization_level == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True,
                                     CPUs=15)  # FIXME CPUs set to 15 for testing

            else:  # tiles
                raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use "
                                          "parallelization level 'scenes' instead!")
                # blocksize = (5000, 5000)
                # """if newly processed L1A objects are present: cut them into tiles"""
                # L1B_newTiles = []
                # if self.L1B_newObjects:
                #     tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                #     L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)
                #
                # """combine newly and earlier processed L1B data"""
                # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                # L1B_tiles = L1B_newTiles + L1B_newDBTiles
                #
                # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                # L1C_tiles = MAP(L1C_map, L1B_tiles)
                # grouped_L1C_Tiles = \
                #     HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects

    def L2A_processing(self):
        """
        Run Level 2A processing: geometric homogenization
        """
        if self.config.exec_L2AP[0]:
            self.logger.info(
                '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            """combine newly and earlier processed L1C data"""
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects  # combine newly and earlier processed L1C data

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles

    def L2B_processing(self):
        """
        Run Level 2B processing: spectral homogenization
        """
        if self.config.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.config.parallelization_level == 'scenes':
                # don't know if scenes makes sense in L2B processing because full objects are very big!
                """if newly processed L2A objects are present: merge them to scenes"""
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects = [L2A_P.L2A_object.from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                """combine newly and earlier processed L2A data"""
                L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances = L2A_newObjects + L2A_DBObjects  # combine newly and earlier processed L2A data

                L2B_resObjects = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles = self.L2A_tiles  # tiles have the block size specified in L2A_map_2

                """combine newly and earlier processed L2A data"""
                blocksize = (2048, 2048)  # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles = L2A_newTiles + L2A_newDBTiles

                L2B_tiles = MAP(L2B_map, L2A_tiles)

                # group results  # FIXME necessary at this point?
                grouped_L2B_Tiles = HLP_F.group_objects_by_attributes(L2B_tiles, 'scene_ID')
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects = [L2B_P.L2B_object.from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects

    def L2C_processing(self):
        """
        Run Level 2C processing: accuracy assessment and MGRS tiling
        """
        # FIXME only parallelization_level == 'scenes' implemented
        if self.config.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            """combine newly and earlier processed L2A data"""
            L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2B data

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in case of inmem_serialization mode results are too big to be back-pickled
            self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects

    def update_DB_job_record(self):
        """
        Update the database records of the current job (table 'jobs').
        """
        # TODO move this method to config.Job
        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.config.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
             'finishtime': self.config.end_time,  # add job finish timestamp
             'status': self.config.status},  # update 'job_status' column
            {'id': self.config.ID}, timeout=30000)

    def update_DB_job_statistics(self, usecase_datalist):
        """
        Update job statistics of the running job in the database.
        """
        # TODO move this method to config.Job
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']],
                    timeout=30000)
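
                # illustrative example (indices taken from db_jobs_statistics_def): when a scene reaches e.g. L1A,
                # the counter at index db_jobs_statistics_def['pending'] within the 'statistics' array column is
                # decremented and the counter at index db_jobs_statistics_def['L1A'] is incremented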

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])

    def create_job_summary(self):
        """
        Create job success summary
        """

        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.config, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = \
                    getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else (self.L2A_tiles or self.L2A_newObjects)
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID))
            detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string())
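
            # e.g., for a hypothetical job ID 123456 this writes 123456_summary.xlsx and 123456_summary.csv into
            # config.path_job_logs and logs the quick summary table to the job logfile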

            self.summary_detailed = detailed_JS
            self.summary_quick = quick_JS

        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")