# -*- coding: utf-8 -*-

from __future__ import (division, print_function, unicode_literals, absolute_import)

import numpy as np
from pandas import DataFrame
import datetime
import os
import time
from itertools import chain
import signal
import re
from typing import TYPE_CHECKING
import shutil

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object, GMS_object
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..options.config import set_config
from .multiproc import MAP, imap_unordered
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
from ..misc.locks import release_unclosed_locks
from ..version import __version__, __versionalias__

from py_tools_ds.numeric.array import get_array_tilebounds

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue
    from typing import List  # noqa F401  # flake8 issue
    from ..options.config import GMS_config  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'


class process_controller(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller.

        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
        """
        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        self.config = set_config(job_ID, **config_kwargs)  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        # remove a possibly existing logfile from an earlier run of the same job ID
        self._path_job_logfile = os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID)
        if os.path.exists(self._path_job_logfile):
            HLP_F.silentremove(self._path_job_logfile)

        self.logger.info("Executing gms_preprocessing, version: %s (%s)" % (__version__, __versionalias__))
        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))
        self.logger.info('Job logfile: %s' % self._path_job_logfile)

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('log__%s' % self.config.ID, fmt_suffix='ProcessController',
                                      path_logfile=self._path_job_logfile, log_level=self.config.log_level,
                                      append=True)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self._logger.close()
            self._logger = None

    @property
    def DB_job_record(self):
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
        # TODO revise this function

        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfiles (one per single subsystem and one for merged subsystems)
        path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
        path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Returns all processing level that have been successfully written according to logfile."""

Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
148
149
150
151
            if not os.path.exists(path_log):
                if path_log == path_logfile:  # path_logfile_merged_ss has already been searched
                    self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                     % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
152
153
154
155
                AllWrittenProcL_dueLog = []
            else:
                logfile = open(path_log, 'r').read()
                AllWrittenProcL_dueLog = re.findall(":*(\S*\s*) data successfully saved.", logfile, re.I)
156
                if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss:  # AllWrittenProcL_dueLog = []
157
158
159
160
161
162
163
164
165
166
                    self.logger.info('%s: According to logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog
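
        # NOTE: the returned list is sorted in ascending processing-level order (e.g., ['L1A', 'L1B', 'L1C']),
        #       so the reversed(AllWrittenProcL) loop further below starts with the highest level written so far.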

        # check that there are no multiple database records for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            # NOTE: first check for merged subsystem datasets because they have higher processing levels
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
            if not AllWrittenProcL:
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            else:
                # an L2A+ dataset with merged subsystems has been found -> use that logfile
                path_logfile = path_logfile_merged_ss


            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(dict(
                            image_type=dataset['image_type'],
                            Satellite=dataset['satellite'],
                            Sensor=dataset['sensor'],
                            Subsystem=dataset['subsystem'] if path_logfile != path_logfile_merged_ss else '',
                            proc_level=ProcL,  # must be respected because the LBA changes after atm. correction
                            dataset_ID=dataset['dataset_ID'],
                            logger=None), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in the database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but '
                                             'according to logfile %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in the database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching %s dataset for %s but with a different '
                                         'LayerBandsAssignment (desired: %s; found: %s). Dataset has to be '
                                         'reprocessed.'
                                         % (ProcL, dataset['entity_ID'],
                                            target_LayerBandsAssignment, GMS_file_dict['LayerBandsAssignment']))
                else:
                    self.logger.info('According to the logfile, %s has been written for dataset %s but no '
                                     'corresponding dataset has been found on disk.'
                                     % (ProcL, dataset['entity_ID']) +
                                     (' Searching for lower processing level...'
                                      if AllWrittenProcL.index(ProcL) != 0 else ''))

        elif len(DB_match) > 1:
            self.logger.info('According to the database there are multiple matches for the dataset %s. Dataset has '
                             'to be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset

    def add_local_availability(self, datasets):
        # type: (List[OrderedDict]) -> List[OrderedDict]
        """Check the availability of all subsets per scene and processing level.

        NOTE: The processing level of a scene is reset if not all of its subsystems are available at the same
              processing level.

        :param datasets:    List of one OrderedDict per subsystem as generated by CFG.data_list
        """
        datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets]

        ######################################################################################################
        # validate that all subsystems of the same sceneid are at the same processing level; otherwise reset #
        ######################################################################################################

        datasets_validated = []
        datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

        for ds_group in datasets_grouped:
            proc_lvls = [ds['proc_level'] for ds in ds_group]

            if len(set(proc_lvls)) != 1:
                # reset the processing level of those scenes where not all subsystems are available
                self.logger.info('%s: Found already processed subsystems at different processing levels %s. '
                                 'Dataset has to be reprocessed to avoid errors.'
                                 % (ds_group[0]['entity_ID'], proc_lvls))

                for ds in ds_group:
                    ds['proc_level'] = None
                    datasets_validated.append(ds)
            else:
                datasets_validated.extend(ds_group)

        return datasets_validated
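
    # NOTE (illustrative): if, e.g., the subsystems of one scene were last written at ['L1C', 'L1C', 'L1A'],
    # add_local_availability() resets 'proc_level' to None for all of them so that the whole scene is
    # reprocessed consistently.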

    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, OrderedDict) -> bool
        """Check whether a dataset within a dataset list has already been processed in the previous processing level.

        :param GMS_objects: <list> a list of GMS objects that have recently been processed
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is among the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]

    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Return a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :return:
        """
        def is_procL_lower(dataset):
            return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)]  # TODO generator?
        else:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]

    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """Return a list of GMS objects for datasets available on disk that have to be processed by the current
        processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
        :return:
        """
        # TODO get prevLvl_objects automatically from self
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev or self.config.parallelization_level
            blocksize = blocksize or self.config.tiling_block_size_XY
            prevLvl = proc_chain[proc_chain.index(procLvl) - 1]  # TODO replace by enum

            # get GMSfile list
            dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                def get_tilepos_list(GMSfile):
                    return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'],
                                                tile_shape=blocksize)

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                        for tp in get_tilepos_list(GMSfile)]

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            #       in case of multiple subsystems GMS_object(*initargs) would always point to the same object in
            #       memory -> the subsystem attribute would be overwritten each time
            DB_objs = [HLP_F.parentObjDict[prevLvl].from_disk(tuple_GMS_subset=w) for w in work]

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs
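
    # NOTE: each 'work' item built in get_DB_objects() pairs a GMS file with a subset specification, i.e.,
    #       ['cube', None] for whole scenes or ['block', <tile bounds from get_array_tilebounds()>] when the
    #       parallelization level is 'tiles'.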

    def run_all_processors_OLD(self, custom_data_list=None):
        """Run all processors at once."""
        signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            self.update_DB_job_record()  # TODO implement that into job.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            self.L1A_processing()
            self.L1B_processing()
            self.L1C_processing()
            self.L2A_processing()
            self.L2B_processing()
            self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            self.logger.info('The job logfile and the summary files have been saved here: \n'
                             '%s.*' % os.path.splitext(self.logger.path_logfile)[0])
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

        except Exception:  # noqa E722  # bare except
            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.config.status = 'failed'
            self.update_DB_job_record()

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
                self.shutdown()
            else:
                self.logger.error('Execution failed with an error:')
                self.shutdown()
                raise

    def run_all_processors(self, custom_data_list=None):
        """Run all processors at once."""
        signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)

            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            GMS_object.proc_status_all_GMSobjs.clear()  # reset
            self.update_DB_job_record()  # TODO implement that into config.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            # group dataset dicts by sceneid
            dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')

            # close logger to release FileHandler of job log (workers will log into job logfile)
            del self.logger

            # RUN PREPROCESSING
            from .pipeline import run_complete_preprocessing
            from ..misc.logging import LogReceiver  # noqa F401  # only needed for the commented-out usage below
            # with LogReceiver():
            GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)

            # separate results into successful and failed objects
            def assign_attr(tgt_procL):
                return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]

            self.L1A_newObjects = assign_attr('L1A')
            self.L1B_newObjects = assign_attr('L1B')
            self.L1C_newObjects = assign_attr('L1C')
            self.L2A_newObjects = assign_attr('L2A')
            self.L2B_newObjects = assign_attr('L2B')
            self.L2C_newObjects = assign_attr('L2C')
            self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            self.logger.info('The job logfile and the summary files have been saved here: \n'
                             '%s.*' % os.path.splitext(self.logger.path_logfile)[0])
            # TODO implement failed_with_warnings
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

        except Exception:  # noqa E722  # bare except
            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.config.status = 'failed'
            self.update_DB_job_record()

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
                self.shutdown()
            else:
                self.logger.error('Execution failed with an error:')
                self.shutdown()
                raise
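
    # NOTE (illustrative): unlike run_all_processors_OLD(), which advances the whole job level by level,
    # run_all_processors() hands each scene-ID group to pipeline.run_complete_preprocessing(), so each worker
    # runs the complete L1A-L2C chain for one scene before the results are collected and sorted by processing
    # level here.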

    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""
        self.config.status = 'canceled'
        self.update_DB_job_record()

        self.shutdown()
        self.logger.warning('Process controller stopped by user.')

        raise KeyboardInterrupt  # terminate execution and show traceback

    def shutdown(self):
        """Shut down the process controller instance (close loggers, remove temporary directories, ...)."""
        # release unclosed locks
        release_unclosed_locks(self.logger)

        # clear any temporary files
        tempdir = self.config.path_tempdir
        self.logger.info('Deleting temporary directory %s.' % tempdir)
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir)

        del self.logger
        shutdown_loggers()
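
    # NOTE: stop() is registered as the SIGINT handler in run_all_processors()/run_all_processors_OLD(), so
    # pressing Ctrl-C marks the job as 'canceled' in the database, cleans up via shutdown() and then re-raises
    # KeyboardInterrupt.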

    def benchmark(self):
        """Run a benchmark."""
        data_list_bench = self.config.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.config.path_database):
                    os.remove(self.config.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)

    def L1A_processing(self):
        """Run Level 1A processing: data import and metadata homogenization."""
        if self.config.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.config.parallelization_level == 'scenes':
                # map
                L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P,
                                         flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
                    L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects = MAP(L1A_P.L1A_object.from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects
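
    # NOTE (illustrative): the tiled branch of L1A_processing() follows a map -> group -> reduce -> map pattern:
    # L1A_map_1 produces per-tile splits, L1A_map_2 processes them, the results are grouped by scene ID and
    # subsystem and merged back into full L1A objects via L1A_object.from_tiles(), and L1A_map_3 runs on the
    # merged objects.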

    def L1B_processing(self):
        """Run Level 1B processing: calculation of geometric shifts."""
        # TODO implement check for running spatial index mediator server
        # run on full cubes
        if self.config.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects

    def L1C_processing(self):
        """Run Level 1C processing: atmospheric correction."""
        if self.config.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.config.parallelization_level == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True,
                                     CPUs=15)  # FIXME CPUs set to 15 for testing

            else:  # tiles
                raise NotImplementedError("Tiled processing is not yet completely implemented for the L1C "
                                          "processor. Use parallelization level 'scenes' instead!")

                # blocksize = (5000, 5000)
                # """if newly processed L1A objects are present: cut them into tiles"""
                # L1B_newTiles = []
                # if self.L1B_newObjects:
                #     tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                #     L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)
                #
                # """combine newly and earlier processed L1B data"""
                # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                # L1B_tiles = L1B_newTiles + L1B_newDBTiles
                #
                # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                # L1C_tiles = MAP(L1C_map, L1B_tiles)
                # grouped_L1C_Tiles = \
                #     HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects

    def L2A_processing(self):
        """Run Level 2A processing: geometric homogenization."""
        if self.config.exec_L2AP[0]:
            self.logger.info(
                '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            # combine newly and earlier processed L1C data
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles

    def L2B_processing(self):
        """Run Level 2B processing: spectral homogenization."""
        if self.config.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.config.parallelization_level == 'scenes':
                # NOTE: it is questionable whether 'scenes' makes sense for L2B processing because full objects
                #       are very large

                # if newly processed L2A objects are present: merge them to scenes
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects = [L2A_P.L2A_object.from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                # combine newly and earlier processed L2A data
                L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances = L2A_newObjects + L2A_DBObjects

                L2B_resObjects = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles = self.L2A_tiles  # tiles have the block size specified in L2A_map_2

                # combine newly and earlier processed L2A data
                blocksize = (2048, 2048)  # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles = L2A_newTiles + L2A_newDBTiles

                L2B_tiles = MAP(L2B_map, L2A_tiles)

                # group results  # FIXME is this really needed at this point?
                grouped_L2B_Tiles = HLP_F.group_objects_by_attributes(L2B_tiles, 'scene_ID')
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects = [L2B_P.L2B_object.from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects

    def L2C_processing(self):
        """Run Level 2C processing: accuracy assessment and MGRS tiling."""
        # FIXME only parallelization_level == 'scenes' implemented
        if self.config.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            # combine newly and earlier processed L2B data
            L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances = self.L2B_newObjects + L2B_DBObjects

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in case of inmem_serialization mode results are too big to be back-pickled
            self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects

    def update_DB_job_record(self):
        """Update the database record of the current job (table 'jobs')."""
        # TODO move this method to config.Job

        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.config.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
             'finishtime': self.config.end_time,  # add job finish timestamp
             'status': self.config.status},  # update 'job_status' column
            {'id': self.config.ID}, timeout=30000)

    def update_DB_job_statistics(self, usecase_datalist):
        """Update the job statistics of the running job in the database."""
        # TODO move this method to config.Job
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']],
                    timeout=30000)

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])

    def create_job_summary(self):
        """Create the job success summary."""
        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.config, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = \
                    getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else (self.L2A_tiles or self.L2A_newObjects)
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID))
            detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string())

            self.summary_detailed = detailed_JS
            self.summary_quick = quick_JS

        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")

    def clear_lists_procObj(self):
        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []


def get_job_summary(list_GMS_objects):
    """Create a detailed and a quick job summary (two DataFrames) from the given list of GMS objects."""
    # get detailed job summary
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    for col in DJS_cols[1:]:
        def get_val(obj):
            return getattr(obj, col) if hasattr(obj, col) else None
        DJS[col] = list(DJS['GMS_object'].map(get_val))

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # get quick job summary
    QJS = DataFrame(columns=['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite'] = all_sat
    QJS['sensor'] = all_sen
    # count unique scene IDs per satellite/sensor combination
    QJS['count'] = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                    for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen) &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed'] = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])
    return DJS, QJS
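

# Typical usage (hypothetical sketch; a valid job ID within the database table 'jobs' is required and the accepted
# keyword arguments are those understood by set_config()):
#
#     PC = process_controller(<job_ID>, parallelization_level='scenes')
#     PC.run_all_processors()
#     print(PC.summary_quick)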