# -*- coding: utf-8 -*-

from __future__ import (division, print_function, unicode_literals, absolute_import)

import datetime
import os
import re
import shutil
import signal
import time
from itertools import chain
from typing import TYPE_CHECKING

import numpy as np
from pandas import DataFrame
from py_tools_ds.numeric.array import get_array_tilebounds

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object, GMS_object
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..options.config import set_config
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from .multiproc import MAP, imap_unordered

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue
    from typing import List  # noqa F401  # flake8 issue
    from ..options.config import GMS_config  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'


class process_controller(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller

        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
        """

        # input validation
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        self.config = set_config(job_ID, **config_kwargs)  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('log__%s' % self.config.ID,
                                      path_logfile=os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID),
                                      log_level=self.config.log_level, append=False)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self.logger.close()
            self.logger = None

    @property
    def DB_job_record(self):
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
        # TODO revise this function

        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfiles
        path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
        path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Return all processing levels that have been successfully written according to the logfile."""
            if not os.path.exists(path_log):
                if path_log == path_logfile:  # path_logfile_merged_ss has already been searched
                    self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                     % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))

                AllWrittenProcL_dueLog = []
            else:
                with open(path_log, 'r') as inF:
                    logfile = inF.read()
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
                if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss:  # AllWrittenProcL_dueLog = []
                    self.logger.info('%s: According to the logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog
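        # Illustrative note (log wording assumed from the regex above): a logfile line such as
        #     "L1C data successfully saved."
        # is matched and yields 'L1C' as a successfully written processing level.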

        # proceed only if there is not more than one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            # NOTE: check for merged subsystem datasets first because they have higher processing levels
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
            if not AllWrittenProcL:
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            else:
                # an L2A+ dataset with merged subsystems has been found -> use that logfile
                path_logfile = path_logfile_merged_ss

            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(dict(
                            image_type=dataset['image_type'],
                            Satellite=dataset['satellite'],
                            Sensor=dataset['sensor'],
                            Subsystem=dataset['subsystem'] if path_logfile != path_logfile_merged_ss else '',
                            proc_level=ProcL,  # must be respected because LBA changes after atm. corr.
                            dataset_ID=dataset['dataset_ID'],
                            logger=None), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in the database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but '
                                             'according to the logfile %s has been written successfully. Recreating '
                                             'the missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in the database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching %s dataset for %s but with a different '
                                         'LayerBandsAssignment (desired: %s; found: %s). Dataset has to be '
                                         'reprocessed.'
                                         % (ProcL, dataset['entity_ID'],
                                            target_LayerBandsAssignment, GMS_file_dict['LayerBandsAssignment']))
                else:
                    self.logger.info('%s for dataset %s has been written due to logfile but no corresponding '
                                     'dataset has been found.' % (ProcL, dataset['entity_ID']) +
                                     (' Searching for lower processing level...'
                                      if AllWrittenProcL.index(ProcL) != 0 else ''))

        elif len(DB_match) > 1:
            self.logger.info('According to the database there are multiple matches for the dataset %s. Dataset has '
                             'to be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset

    def add_local_availability(self, datasets):
        # type: (List[OrderedDict]) -> List[OrderedDict]
        """Check availability of all subsets per scene and processing level.

        NOTE: The processing level of scenes for which not all subsystems are available at the same processing
              level is reset.

        :param datasets:    list of one OrderedDict per subsystem as generated by CFG.data_list
        """
        datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets]

        ######################################################################################################
        # validate that all subsystems of the same sceneid are at the same processing level; otherwise reset #
        ######################################################################################################
        datasets_validated = []
        datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

        for ds_group in datasets_grouped:
            proc_lvls = [ds['proc_level'] for ds in ds_group]

            if len(set(proc_lvls)) != 1:
                # reset the processing level of those scenes where not all subsystems are at the same level
                self.logger.info('%s: Found already processed subsystems at different processing levels %s. '
                                 'Dataset has to be reprocessed to avoid errors.'
                                 % (ds_group[0]['entity_ID'], proc_lvls))

                for ds in ds_group:
                    ds['proc_level'] = None
                    datasets_validated.append(ds)
            else:
                datasets_validated.extend(ds_group)

        return datasets_validated

    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, OrderedDict) -> bool
        """Checks whether a dataset within a dataset list has been processed in the previous processing level.

        :param GMS_objects: <list> a list of GMS objects that have recently been processed
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is among the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]

    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Returns a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :return:
        """
        def is_procL_lower(dataset):
            return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)]  # TODO generator?
        else:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]

    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """
        Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000, 2000)
        :return:
        """
        # TODO get prevLvl_objects automatically from self
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev or self.config.parallelization_level
            blocksize = blocksize or self.config.tiling_block_size_XY
            prevLvl = proc_chain[proc_chain.index(procLvl) - 1]  # TODO replace by enum

            # get GMS file list
            dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                def get_tilepos_list(GMSfile):
                    return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'],
                                                tile_shape=blocksize)

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                        for tp in get_tilepos_list(GMSfile)]
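                # Illustrative sketch (the exact bounds format is defined by get_array_tilebounds):
                # a single work item is assumed to look like
                #     [<path to GMS file>, ['block', ((0, 1999), (0, 1999))]]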

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory
            # -> subsystem attribute will be overwritten each time
            def init_GMS_obj(): return HLP_F.parentObjDict[prevLvl](*HLP_F.initArgsDict[prevLvl])
            DB_objs = [init_GMS_obj().from_disk(tuple_GMS_subset=w) for w in work]  # init

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs

    def run_all_processors_OLD(self, custom_data_list=None):
        """
        Run all processors at once.
        """
        signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            self.update_DB_job_record()  # TODO implement that into job.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            self.L1A_processing()
            self.L1B_processing()
            self.L1C_processing()
            self.L2A_processing()
            self.L2B_processing()
            self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

        except Exception:  # noqa E722  # bare except
            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.config.status = 'failed'
            self.update_DB_job_record()

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
                self.shutdown()
            else:
                self.logger.error('Execution failed with an error:')
                self.shutdown()
                raise

    def run_all_processors(self, custom_data_list=None):
        """
        Run all processors at once.
        """
        signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            GMS_object.proc_status_all_GMSobjs.clear()  # reset
            self.update_DB_job_record()  # TODO implement that into config.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            # group dataset dicts by sceneid
            dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')

            # RUN PREPROCESSING
            from .pipeline import run_complete_preprocessing
            GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)

            # separate results into successful and failed objects
            def assign_attr(tgt_procL):
                return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]

            self.L1A_newObjects = assign_attr('L1A')
            self.L1B_newObjects = assign_attr('L1B')
            self.L1C_newObjects = assign_attr('L1C')
            self.L2A_newObjects = assign_attr('L2A')
            self.L2B_newObjects = assign_attr('L2B')
            self.L2C_newObjects = assign_attr('L2C')
            self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            # TODO implement failed_with_warnings
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

        except Exception:  # noqa E722  # bare except
            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.config.status = 'failed'
            self.update_DB_job_record()

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
                self.shutdown()
            else:
                self.logger.error('Execution failed with an error:')
                self.shutdown()
                raise

    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""
        self.config.status = 'canceled'
        self.update_DB_job_record()

        # log the warning before the loggers are shut down
        self.logger.warning('Process controller stopped by user.')
        self.shutdown()

        raise KeyboardInterrupt  # terminate execution and show traceback

    def shutdown(self):
        """Shutdown the process controller instance (loggers, remove temporary directories, ...)."""
        # clear any temporary files (logged before the loggers are shut down)
        tempdir = os.path.join(self.config.path_tempdir)
        self.logger.warning('Deleting temporary directory %s.' % tempdir)
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir)

        del self.logger
        shutdown_loggers()

    def benchmark(self):
        """
        Run a benchmark.
        """
        data_list_bench = self.config.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.config.path_database):
                    os.remove(self.config.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)

    def L1A_processing(self):
        """
        Run Level 1A processing: Data import and metadata homogenization
        """
        if self.config.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.config.parallelization_level == 'scenes':
                # map
                L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P,
                                         flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
                    L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects = MAP(L1A_P.L1A_object().from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects

    def L1B_processing(self):
        """
        Run Level 1B processing: calculation of geometric shifts
        """
        # TODO implement check for running spatial index mediator server
        # run on full cubes
        if self.config.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects

    def L1C_processing(self):
        """
        Run Level 1C processing: atmospheric correction
        """
        if self.config.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.config.parallelization_level == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True,
                                     CPUs=15)  # FIXME CPUs set to 15 for testing

            else:  # tiles
                raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use "
                                          "parallelization level 'scenes' instead!")

                # blocksize = (5000, 5000)
                # """if newly processed L1A objects are present: cut them into tiles"""
                # L1B_newTiles = []
                # if self.L1B_newObjects:
                #     tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                #     L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)
                #
                # """combine newly and earlier processed L1B data"""
                # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                # L1B_tiles = L1B_newTiles + L1B_newDBTiles
                #
                # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                # L1C_tiles = MAP(L1C_map, L1B_tiles)
                # grouped_L1C_Tiles = \
                #     HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects

    def L2A_processing(self):
        """
        Run Level 2A processing: geometric homogenization
        """
        if self.config.exec_L2AP[0]:
            self.logger.info(
                '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            """combine newly and earlier processed L1C data"""
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects  # combine newly and earlier processed L1C data

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles

    def L2B_processing(self):
        """
        Run Level 2B processing: spectral homogenization
        """
        if self.config.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.config.parallelization_level == 'scenes':
                # don't know if 'scenes' makes sense in L2B processing because full objects are very big!
                """if newly processed L2A objects are present: merge them to scenes"""
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects = [L2A_P.L2A_object().from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                """combine newly and earlier processed L2A data"""
                L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances = L2A_newObjects + L2A_DBObjects  # combine newly and earlier processed L2A data

                L2B_resObjects = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles = self.L2A_tiles  # tiles have the block size specified in L2A_map_2

                """combine newly and earlier processed L2A data"""
                blocksize = (2048, 2048)  # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles = L2A_newTiles + L2A_newDBTiles

                L2B_tiles = MAP(L2B_map, L2A_tiles)

                # group results  # FIXME is this necessary at this point?
                grouped_L2B_Tiles = HLP_F.group_objects_by_attributes(L2B_tiles, 'scene_ID')
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects = [L2B_P.L2B_object().from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects

    def L2C_processing(self):
        """
        Run Level 2C processing: accuracy assessment and MGRS tiling
        """
        # FIXME only parallelization_level == 'scenes' implemented
        if self.config.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            """combine newly and earlier processed L2B data"""
            L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2B data

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in case of inmem_serialization mode results are too big to be back-pickled
            self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects

    def update_DB_job_record(self):
        """
        Update the database record of the current job (table 'jobs').
        """
        # TODO move this method to config.Job
        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.config.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
             'finishtime': self.config.end_time,  # add job finish timestamp
             'status': self.config.status},  # update 'job_status' column
            {'id': self.config.ID}, timeout=30000)

    def update_DB_job_statistics(self, usecase_datalist):
        """
        Update job statistics of the running job in the database.
        """
        # TODO move this method to config.Job
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']])

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])

    def create_job_summary(self):
        """
        Create job success summary
        """
        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.config, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else self.L2A_tiles
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID))
            detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string())

            self.summary_detailed = detailed_JS
            self.summary_quick = quick_JS
        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")

    def clear_lists_procObj(self):
        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []


def get_job_summary(list_GMS_objects):
    """Create a detailed and a quick job summary (two DataFrames) from a list of GMS objects."""
    # get detailed job summary
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    for col in DJS_cols[1:]:
        def get_val(obj): return getattr(obj, col) if hasattr(obj, col) else None
        DJS[col] = list(DJS['GMS_object'].map(get_val))

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # get quick job summary
    QJS = DataFrame(columns=['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite'] = all_sat
    QJS['sensor'] = all_sen
    # count objects with the same satellite/sensor/sceneid combination
    QJS['count'] = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                    for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen) &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed'] = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])
    return DJS, QJS
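

# Illustrative use of get_job_summary (a sketch mirroring process_controller.create_job_summary above;
# 'successful_objects' and 'failed_objects' are hypothetical variables):
#     detailed_JS, quick_JS = get_job_summary(successful_objects + failed_objects)
#     print(quick_JS.to_string())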