# -*- coding: utf-8 -*-

from __future__ import (division, print_function, unicode_literals, absolute_import)

import numpy as np
from pandas import DataFrame
import datetime
import os
import time
from itertools import chain
import signal
import re
from typing import TYPE_CHECKING

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object, GMS_object
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..options.config import set_config
from .multiproc import MAP, imap_unordered
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def

from py_tools_ds.numeric.array import get_array_tilebounds

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue
    from typing import List  # noqa F401  # flake8 issue
    from ..options.config import GMS_config  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'
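# Typical usage (sketch): assuming a valid record with the given ID exists in the 'jobs' database table,
# the controller is instantiated with that job ID (plus optional set_config() keyword arguments) and then
# runs the whole chain:
#
#     PC = process_controller(123456)   # 123456 is an illustrative job ID
#     PC.run_all_processors()           # L1A -> L2C processing, job summary and database updates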


class process_controller(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller

        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
        """

        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        self.config = set_config(job_ID, **config_kwargs)  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('log__%s' % self.config.ID,
                                      path_logfile=os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID),
                                      log_level=self.config.log_level, append=False)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self.logger.close()
            self.logger = None

    @property
    def DB_job_record(self):
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
        # TODO revise this function

        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfile
        path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
        path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Returns all processing level that have been successfully written according to logfile."""

            if not os.path.exists(path_log) and path_log != path_logfile_merged_ss:
                self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                 % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
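                # the logfile contains one "<proc_level> data successfully saved." line per successfully
                # written processing level; the regex below collects those levels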
                with open(path_log, 'r') as logF:
                    logfile = logF.read()
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
                if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss:  # AllWrittenProcL_dueLog = []
                    self.logger.info('%s: According to logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog

        # check that there is no more than one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            # NOTE: first check for merged subsystem datasets because they have higher processing levels
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
            if not AllWrittenProcL:
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)

            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS_file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(dict(
                            image_type=dataset['image_type'],
                            Satellite=dataset['satellite'],
                            Sensor=dataset['sensor'],
                            Subsystem=dataset['subsystem'],
                            proc_level=ProcL,  # must be respected because LBA changes after atm. Corr.
                            dataset_ID=dataset['dataset_ID'],
                            logger=None), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but'
                                             ' according to logfile %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching dataset for %s but with a different '
                                         'LayerBandsAssignment. Dataset has to be reprocessed.'
                                         % dataset['entity_ID'])
                else:
                    self.logger.info('%s for dataset %s has been written due to logfile but no corresponding '
                                     'dataset has been found.' % (ProcL, dataset['entity_ID']) +
                                     ' Searching for lower processing level...'
                                     if AllWrittenProcL.index(ProcL) != 0 else '')

        elif len(DB_match) > 1:
            self.logger.info('According to database there are multiple matches for the dataset %s. Dataset has to '
                             'be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset

    def add_local_availability(self, datasets):
        # type: (List[OrderedDict]) -> List[OrderedDict]
        """Check availability of all subsets per scene and processing level.

        NOTE: The processing level is reset for those scenes where not all subsystems are available at the
              same processing level.

        :param datasets:    List of one OrderedDict per subsystem as generated by CFG.data_list
        """
        datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets]

        datasets_validated = []
        datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

        for ds_group in datasets_grouped:
            proc_lvls = [ds['proc_level'] for ds in ds_group]

            if not len(list(set(proc_lvls))) == 1:
                # reset processing level of those scenes where not all subsystems are available
                self.logger.info('%s: Found already processed subsystems at different processing levels %s. '
                                 'Dataset has to be reprocessed to avoid errors.'
                                 % (ds_group[0]['entity_ID'], proc_lvls))

                for ds in ds_group:
                    ds['proc_level'] = None
                    datasets_validated.append(ds)
            else:
                datasets_validated.extend(ds_group)

        return datasets_validated

    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, OrderedDict) -> bool
        """Checks whether a dataset within a dataset list has been processed in the previous processing level.
        :param GMS_objects: <list> a list of GMS objects that has been recently processed
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]

    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Returns a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:          processing level of the processor to be fed (target level)
        :param prevLvl_objects:  list of in-memory GMS objects produced by the previous processor
        :return:
        """
        def is_procL_lower(dataset):
            return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)]  # TODO generator?
        else:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]

    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """
        Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
        :return:
        """
        # TODO get prevLvl_objects automatically from self
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev or self.config.parallelization_level
            blocksize = blocksize or self.config.tiling_block_size_XY
            prevLvl = proc_chain[proc_chain.index(procLvl) - 1]  # TODO replace by enum

            # get GMSfile list
            dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                def get_tilepos_list(GMSfile):
                    return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'],
                                                tile_shape=blocksize)

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                        for tp in get_tilepos_list(GMSfile)]

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory
            # -> subsystem attribute will be overwritten each time
            def init_GMS_obj(): return HLP_F.parentObjDict[prevLvl](*HLP_F.initArgsDict[prevLvl])
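            # each call to init_GMS_obj() returns a fresh parent object, which avoids the shared-instance
            # problem described in the NOTE above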
            DB_objs = [init_GMS_obj().from_disk(tuple_GMS_subset=w) for w in work]  # init

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs

    def run_all_processors_OLD(self, custom_data_list=None):
        """
        Run all processors at once.
        """

        signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            self.update_DB_job_record()  # TODO implement that into job.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            self.L1A_processing()
            self.L1B_processing()
            self.L1C_processing()
            self.L2A_processing()
            self.L2B_processing()
            self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            shutdown_loggers()

        except Exception:  # noqa E722  # bare except
            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.config.status = 'failed'
            self.update_DB_job_record()

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
                shutdown_loggers()
            else:
                self.logger.error('Execution failed with an error:')
                shutdown_loggers()
                raise

    def run_all_processors(self, custom_data_list=None):
        signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            GMS_object.proc_status_all_GMSobjs.clear()  # reset
            self.update_DB_job_record()  # TODO implement that into config.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            # group dataset dicts by sceneid
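            # each group holds all subsystem datasets of one scene, so run_complete_preprocessing always
            # receives complete scenes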
            dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')

            from .pipeline import run_complete_preprocessing
            GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups)

            # separate results into successful and failed objects
            def assign_attr(tgt_procL):
                return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]

            self.L1A_newObjects = assign_attr('L1A')
            self.L1B_newObjects = assign_attr('L1B')
            self.L1C_newObjects = assign_attr('L1C')
            self.L2A_newObjects = assign_attr('L2A')
            self.L2B_newObjects = assign_attr('L2B')
            self.L2C_newObjects = assign_attr('L2C')
            self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            # TODO implement failed_with_warnings
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            shutdown_loggers()

        except Exception:  # noqa E722  # bare except
            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.config.status = 'failed'
            self.update_DB_job_record()

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
                shutdown_loggers()
            else:
                self.logger.error('Execution failed with an error:')
                shutdown_loggers()
                raise

    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""

        self.config.status = 'canceled'
        self.update_DB_job_record()

        self.logger.warning('Process controller stopped by user.')
        del self.logger
        shutdown_loggers()

        raise KeyboardInterrupt  # terminate execution and show traceback

    def benchmark(self):
        """
        Run a benchmark: execute the full processing chain for increasing numbers of input datasets
        (10 runs each) and record the processing and I/O times.
        """
        data_list_bench = self.config.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.config.path_database):
                    os.remove(self.config.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)

    def L1A_processing(self):
        """
        Run Level 1A processing: Data import and metadata homogenization
        """
        if self.config.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.config.parallelization_level == 'scenes':
                # map
                L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P,
                                         flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
                    L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects = MAP(L1A_P.L1A_object().from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects

    def L1B_processing(self):
        """
        Run Level 1B processing: calculation of geometric shifts
        """
        # TODO implement check for running spatial index mediator server
        # run on full cubes

        if self.config.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects

    def L1C_processing(self):
        """
        Run Level 1C processing: atmospheric correction
        """
        if self.config.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.config.parallelization_level == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True,
                                     CPUs=15)  # FIXME CPUs set to 15 for testing

            else:  # tiles
                raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use "
                                          "parallelization level 'scenes' instead!")
                # blocksize = (5000, 5000)
                # """if newly processed L1A objects are present: cut them into tiles"""
                # L1B_newTiles = []
                # if self.L1B_newObjects:
                #     tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                #     L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)
                #
                # """combine newly and earlier processed L1B data"""
                # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                # L1B_tiles = L1B_newTiles + L1B_newDBTiles
                #
                # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                # L1C_tiles = MAP(L1C_map, L1B_tiles)
                # grouped_L1C_Tiles = \
                #     HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects

    def L2A_processing(self):
        """
        Run Level 2A processing: geometric homogenization
        """
        if self.config.exec_L2AP[0]:
            self.logger.info(
                '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            """combine newly and earlier processed L1C data"""
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects  # combine newly and earlier processed L1C data

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles

    def L2B_processing(self):
        """
        Run Level 2B processing: spectral homogenization
        """
        if self.config.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.config.parallelization_level == 'scenes':
                # don't know if scenes makes sense in L2B processing because full objects are very big!
                """if newly processed L2A objects are present: merge them to scenes"""
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects = [L2A_P.L2A_object().from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                """combine newly and earlier processed L2A data"""
                L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances = L2A_newObjects + L2A_DBObjects  # combine newly and earlier processed L2A data

                L2B_resObjects = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles = self.L2A_tiles  # tiles have the block size specified in L2A_map_2

                """combine newly and earlier processed L2A data"""
                blocksize = (2048, 2048)  # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles = L2A_newTiles + L2A_newDBTiles

                L2B_tiles = MAP(L2B_map, L2A_tiles)

                grouped_L2B_Tiles = \
                    HLP_F.group_objects_by_attributes(L2B_tiles,
                                                      'scene_ID')  # group results # FIXME necessary at this point?
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects = [L2B_P.L2B_object().from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects

    def L2C_processing(self):
        """
        Run Level 2C processing: accuracy assessment and MGRS tiling
        """
        # FIXME only parallelization_level == 'scenes' implemented
        if self.config.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            """combine newly and earlier processed L2A data"""
            L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2B data

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in case of inmem_serialization mode results are too big to be back-pickled
            self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects

    def update_DB_job_record(self):
        """
        Update the database records of the current job (table 'jobs').
        """
        # TODO move this method to config.Job
        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.config.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
             'finishtime': self.config.end_time,  # add job finish timestamp
             'status': self.config.status},  # update 'job_status' column
            {'id': self.config.ID})

    def update_DB_job_statistics(self, usecase_datalist):
        """
        Update job statistics of the running job in the database.
        """
        # TODO move this method to config.Job
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']])

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])

    def create_job_summary(self):
        """
        Create job success summary
        """

        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.config, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else self.L2A_tiles
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID))
            detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string())

            self.summary_detailed = detailed_JS
            self.summary_quick = quick_JS

        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")

    def clear_lists_procObj(self):
        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []


def get_job_summary(list_GMS_objects):
    # get detailed job summary
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    for col in DJS_cols[1:]:
        def get_val(obj): return getattr(obj, col) if hasattr(obj, col) else None
        DJS[col] = list(DJS['GMS_object'].map(get_val))

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # get quick job summary
    QJS = DataFrame(columns=['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite'] = all_sat
    QJS['sensor'] = all_sen
    # count objects with the same satellite/sensor/sceneid combination
    QJS['count'] = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                    for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen) &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed'] = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])
    return DJS, QJS