# -*- coding: utf-8 -*-

from __future__ import (division, print_function, unicode_literals, absolute_import)

import numpy as np
from pandas import DataFrame
import datetime
import os
import time
from itertools import chain
import signal
import re
from typing import TYPE_CHECKING

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..options.config import set_config, GMS_config
from .multiproc import MAP
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def

from py_tools_ds.numeric.array import get_array_tilebounds

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'


class process_controller(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller

        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
        """

        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        set_config(job_ID, **config_kwargs)
        self.config = GMS_config  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
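        """Return the logger instance of the current job (lazily created on first access)."""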
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('log__%s' % self.config.ID,
                                      path_logfile=os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID),
                                      log_level=self.config.log_level, append=False)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self.logger.close()
            self.logger = None

    @property
    def DB_job_record(self):
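        """Return the database record of the current job as a DB_T.GMS_JOB instance (lazily fetched)."""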
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
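        """Return the scene IDs of all GMS objects that failed during processing so far."""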
        return [obj.scene_ID for obj in self.failed_objects]

    def add_local_availability(self, datasets):
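        """Check whether the given dataset(s) have already been processed and set their 'proc_level' key accordingly.

        :param datasets:    a single dataset dictionary or a list of dataset dictionaries
        :return:            the dataset(s) with an updated 'proc_level' key
        """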
        # TODO revise this function
        if isinstance(datasets, (list, tuple)):
            datasets = MAP(self.add_local_availability, datasets)

            # check availability of all subsets per scene and proc_level
            datasets_validated = []
            datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

            for ds_group in datasets_grouped:
                proc_lvls = [ds['proc_level'] for ds in ds_group]

                if len(set(proc_lvls)) != 1:
                    # reset processing level of those scenes where not all subsystems are available
                    self.logger.info('%s: Found already processed subsystems %s. Remaining subsystems are missing. '
                                     'Therefore, the dataset has to be reprocessed.'
                                     % (ds_group[0]['entity_ID'], proc_lvls))

                    for ds in ds_group:
                        ds['proc_level'] = None
                        datasets_validated.append(ds)
                else:
                    datasets_validated.extend(ds_group)

            return datasets_validated

        else:
            dataset = datasets

        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            GMS_config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfile
        path_logfile = path_generator(
            dataset).get_path_logfile()  # FIXME this always returns the logfile for the subsystem.

        # FIXME -> merged logfiles (L2A+) are ignored
        # FIXME -> for subsystems the highest start procL is L2A

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Returns all processing level that have been successfully written according to logfile."""

            if not os.path.exists(path_log):
                self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                 % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
                with open(path_log) as inF:
                    logfile = inF.read()
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
                if not AllWrittenProcL_dueLog:  # AllWrittenProcL_dueLog = []
                    self.logger.info('%s: According to logfile no completely processed data exist at any processing '
                                     'level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog

        # check that there is at most one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS_file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(dict(
                            image_type=dataset['image_type'],
                            Satellite=dataset['satellite'],
                            Sensor=dataset['sensor'],
                            Subsystem=dataset['subsystem'],
                            proc_level=ProcL,  # must be respected because LBA changes after atm. Corr.
                            dataset_ID=dataset['dataset_ID'],
                            logger=None), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but'
                                             ' according to logfile %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching dataset for %s but with a different LayerBandsAssignment. '
                                         'Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    self.logger.info('%s for dataset %s has been written according to logfile but no corresponding '
                                     'dataset has been found.' % (ProcL, dataset['entity_ID']) +
                                     (' Searching for lower processing level...'
                                      if AllWrittenProcL.index(ProcL) != 0 else ''))

        elif len(DB_match) > 1:
            self.logger.info('According to database there are multiple matches for the dataset %s. Dataset has to be '
                             'reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset

    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, OrderedDict) -> bool
        """Checks whether a dataset within a dataset list has been processed in the previous processing level.
        :param GMS_objects: <list> a list of GMS objects that has been recently processed
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]

    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Returns a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:         processing level of the processor requesting the data
        :param prevLvl_objects: list of in-memory GMS objects produced by the previous processor
        :return:                list of dataset dictionaries
        """
        def is_procL_lower(dataset):
            return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)]  # TODO generator?
        else:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]

    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """
        Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
        :return:
        """
        # TODO get prevLvl_objects automatically from self
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev or self.config.parallelization_level
            blocksize = blocksize or self.config.tiling_block_size_XY
            prevLvl = proc_chain[proc_chain.index(procLvl) - 1]  # TODO replace by enum

            # get GMSfile list
            dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                def get_tilepos_list(GMSfile):
                    return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'],
                                                tile_shape=blocksize)

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                        for tp in get_tilepos_list(GMSfile)]

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory
            # -> subsystem attribute will be overwritten each time
            def init_GMS_obj(): return HLP_F.parentObjDict[prevLvl](*HLP_F.initArgsDict[prevLvl])
            DB_objs = [init_GMS_obj().from_disk(tuple_GMS_subset=w) for w in work]  # init

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs

    def run_all_processors_OLD(self, custom_data_list=None):
        """
        Run all processors at once.
        """

        signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            self.update_DB_job_record()  # TODO implement that into job.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            self.L1A_processing()
            self.L1B_processing()
            self.L1C_processing()
            self.L2A_processing()
            self.L2B_processing()
            self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            shutdown_loggers()

        except Exception:  # noqa E722  # bare except
            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.config.status = 'failed'
            self.update_DB_job_record()

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
                shutdown_loggers()
            else:
                self.logger.error('Execution failed with an error:')
                shutdown_loggers()
                raise

    def run_all_processors(self, custom_data_list=None):
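        """Run the entire pre-processing chain for all datasets of the current job, grouped by scene ID.

        :param custom_data_list:    custom list of dataset dictionaries to be processed instead of the job's data list
        """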
        signal.signal(signal.SIGINT, self.stop)  # enable clean shutdown possibility

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            self.update_DB_job_record()  # TODO implement that into config.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            # group dataset dicts by sceneid
            dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')

            from .pipeline import run_complete_preprocessing
            GMS_objs = MAP(run_complete_preprocessing, dataset_groups)

            # separate results into successful and failed objects
            self.L2C_newObjects = [obj for obj in GMS_objs if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            shutdown_loggers()

        except Exception:  # noqa E722  # bare except
            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.config.status = 'failed'
            self.update_DB_job_record()

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
                shutdown_loggers()
            else:
                self.logger.error('Execution failed with an error:')
                shutdown_loggers()
                raise

    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""

        self.config.status = 'canceled'
        self.update_DB_job_record()

        self.logger.warning('Process controller stopped by user.')
        del self.logger
        shutdown_loggers()

        raise KeyboardInterrupt  # terminate execution and show traceback

    def benchmark(self):
        """
        Run a benchmark of the entire processing chain with an increasing number of datasets (10 runs each).
        """
        data_list_bench = self.config.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.config.path_database):
                    os.remove(self.config.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)

    def L1A_processing(self):
        """
        Run Level 1A processing: Data import and metadata homogenization
        """
        if self.config.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.config.parallelization_level == 'scenes':
                # map
                L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P,
                                         flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
                    L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects = MAP(L1A_P.L1A_object().from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects

    def L1B_processing(self):
        """
        Run Level 1B processing: calculation of geometric shifts
        """
        # TODO implement check for running spatial index mediator server
        # run on full cubes

        if self.config.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects

    def L1C_processing(self):
        """
        Run Level 1C processing: atmospheric correction
        """
        if self.config.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.config.parallelization_level == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True,
                                     CPUs=15)  # FIXME CPUs set to 15 for testing

            else:  # tiles
                raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use "
                                          "parallelization level 'scenes' instead!")
                # blocksize = (5000, 5000)
                # """if newly processed L1A objects are present: cut them into tiles"""
                # L1B_newTiles = []
                # if self.L1B_newObjects:
                #     tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                #     L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)
                #
                # """combine newly and earlier processed L1B data"""
                # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                # L1B_tiles = L1B_newTiles + L1B_newDBTiles
                #
                # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                # L1C_tiles = MAP(L1C_map, L1B_tiles)
                # grouped_L1C_Tiles = \
                #     HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects

    def L2A_processing(self):
        """
        Run Level 2A processing: geometric homogenization
        """
        if self.config.exec_L2AP[0]:
            self.logger.info(
                '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            """combine newly and earlier processed L1C data"""
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects  # combine newly and earlier processed L1C data

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles

    def L2B_processing(self):
        """
        Run Level 2B processing: spectral homogenization
        """
        if self.config.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.config.parallelization_level == 'scenes':
                # don't know if scenes makes sense in L2B processing because full objects are very big!
                """if newly processed L2A objects are present: merge them to scenes"""
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects = [L2A_P.L2A_object().from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                """combine newly and earlier processed L2A data"""
                L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances = L2A_newObjects + L2A_DBObjects  # combine newly and earlier processed L2A data

                L2B_resObjects = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles = self.L2A_tiles  # tiles have the block size specified in L2A_map_2

                """combine newly and earlier processed L2A data"""
                blocksize = (2048, 2048)  # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles = L2A_newTiles + L2A_newDBTiles

                L2B_tiles = MAP(L2B_map, L2A_tiles)

                grouped_L2B_Tiles = \
                    HLP_F.group_objects_by_attributes(L2B_tiles,
                                                      'scene_ID')  # group results  # FIXME still necessary at this point?
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects = [L2B_P.L2B_object().from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects

    def L2C_processing(self):
        """
        Run Level 2C processing: accuracy assessment and MGRS tiling
        """
        # FIXME only parallelization_level == 'scenes' implemented
        if self.config.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            """combine newly and earlier processed L2A data"""
675
676
            L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2A data

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in case of inmem_serialization mode results are too big to be back-pickled
            self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects

    def update_DB_job_record(self):
        """
        Update the database records of the current job (table 'jobs').
        """
        # TODO move this method to config.Job
        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.config.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
             'finishtime': self.config.end_time,  # add job finish timestamp
             'status': self.config.status},  # update 'job_status' column
            {'id': self.config.ID})

    def update_DB_job_statistics(self, usecase_datalist):
        """
        Update job statistics of the running job in the database.
        """
        # TODO move this method to config.Job
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
                    idx_val2decrement=db_jobs_statistics_def['downloaded'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']])

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])

    def create_job_summary(self):
        """
        Create job success summary
        """

        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.config, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else self.L2A_tiles
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID))
            detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string())

            self.summary_detailed = detailed_JS
            self.summary_quick = quick_JS

        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")

    def clear_lists_procObj(self):
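        """Clear all lists of processed and failed GMS objects."""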
        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []


def get_job_summary(list_GMS_objects):
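    """Create a detailed (per object) and a quick (per satellite/sensor) summary of the given GMS objects.

    :param list_GMS_objects:    list of GMS objects and/or failed_GMS_objects to be summarized
    :return:                    two DataFrames: detailed job summary, quick job summary
    """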
    # get detailed job summary
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    for col in DJS_cols[1:]:
        def get_val(obj): return getattr(obj, col) if hasattr(obj, col) else None
        DJS[col] = list(DJS['GMS_object'].map(get_val))

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # get quick job summary
    QJS = DataFrame(columns=['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite'] = all_sat
    QJS['sensor'] = all_sen
    # count objects with the same satellite/sensor/sceneid combination
    QJS['count'] = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                    for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen) &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed'] = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])
    return DJS, QJS