# -*- coding: utf-8 -*-

from __future__ import (division, print_function, unicode_literals, absolute_import)
__author__ = 'Daniel Scheffler'

import collections
import datetime
import os
import re
import signal
import time
from itertools import chain

import numpy as np
from pandas import DataFrame

from ..io import Output_writer as OUT_W
from ..io import Input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import environment as ENV
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.METADATA import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..config import set_config, GMS_config
from .multiproc import MAP
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def


class process_controller(object):
    def __init__(self, job_ID, call_type='webapp', exec_mode='Python', db_host='localhost',
                 parallelization_level='scenes', delete_old_output=False, job_config_kwargs=None):
        # type: (int, str, str, str, str, bool, dict) -> None

        """gms_preprocessing process controller

        :param job_ID:                  <int> a job ID belonging to a valid database record within table 'jobs'
        :param call_type:               <str> choices: 'webapp' and 'console'
        :param exec_mode:               <str> choices: 'Python' - writes all intermediate data to disk
                                                       'Flink'  - keeps all intermediate data in memory
        :param db_host:                 <str> hostname of the server hosting the database
        :param parallelization_level:   <str> choices: 'scenes' - parallelization on scene-level
                                                       'tiles'  - parallelization on tile-level
        :param delete_old_output:       <bool> whether to delete previously created output of the given job ID
                                        before running the job (default = False)
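
        Example (a minimal usage sketch; 123456 stands for an existing job ID within the 'jobs' table):

        >>> PC = process_controller(123456, call_type='webapp', db_host='localhost',
        ...                         parallelization_level='scenes')
        >>> PC.run_all_processors()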
        """

        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))
        if call_type not in ['webapp', 'console']:
            raise ValueError("Unexpected call_type '%s'!" % call_type)
        if exec_mode not in ['Python', 'Flink']:
            raise ValueError("Unexpected exec_mode '%s'!" % exec_mode)
        if parallelization_level not in ['scenes', 'tiles']:
            raise ValueError("Unexpected parallelization_level '%s'!" % parallelization_level)

        self.call_type      = call_type
        self.parallLev      = parallelization_level
        self._logger        = None
        self._DB_job_record = None
        self.profiler       = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles      = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick    = None

        # set GMS configuration
        set_config(call_type=call_type, exec_mode=exec_mode, job_ID=job_ID, db_host=db_host, reset=True,
                   job_kwargs=job_config_kwargs)
        self.job     = GMS_config.job
        self.usecase = GMS_config.usecase

        # check environment
        self.logger.info('Checking system environment...')
        ENV.check_dependencies(self.logger)
        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        #if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        #called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.job.ID, self.DB_job_record.comment))

        if delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)


    @property
    def logger(self):
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('log__%s' % self.job.ID,
                                      path_logfile=os.path.join(self.job.path_job_logs, '%s.log' % self.job.ID),
                                      log_level=self.job.log_level, append=False)
            return self._logger


    @logger.setter
    def logger(self, logger):
        self._logger = logger


    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self.logger.close()
            self.logger = None


    @property
    def DB_job_record(self):
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.job.conn_database)
            self._DB_job_record.from_job_ID(self.job.ID)
            return self._DB_job_record


    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value


    @property
    def sceneids_failed(self):
        return [obj.scene_ID for obj in self.failed_objects]


    @staticmethod
    def add_local_availability(dataset):
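        """Check whether the given dataset has already been processed before and update dataset['proc_level']
        accordingly: the highest processing level found on disk that is consistent with database and logfile,
        or None if the dataset has to be (re)processed."""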
        # TODO revise this function
        # TODO this does not respect that all subsystems of the same scene ID must be available at the same proc level!

        # query the database and get the last written processing level and LayerBandsAssignment
        if GMS_config.job.call_type == 'webapp':
            DB_match = DB_T.get_info_from_postgreSQLdb(
                GMS_config.job.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
                dict(sceneid=dataset['scene_ID']))

        else:  # call_type == 'console'
            DB_match = DB_T.get_info_from_SQLdb(
                GMS_config.job.path_database, 'processed_data', ['proc_level', 'LayerBandsAssignment'],
                dict(image_type = dataset['image_type'],
                     satellite  = dataset['satellite'],
                     sensor     = dataset['sensor'],
                     subsystem  = dataset['subsystem'],
                     sensormode = dataset['sensormode'],
                     entity_ID  = dataset['entity_ID']))

        # get the corresponding logfile
        # FIXME this always returns the logfile for the subsystem
        # FIXME -> merged logfiles (L2A+) are ignored
        # FIXME -> for subsystems the highest start procL is L2A
        path_logfile = path_generator(dataset).get_path_logfile()


        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by a database query + os.path.exists
            """Return all processing levels that have been successfully written according to the logfile."""

            if not os.path.exists(path_log):
                print("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                      % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
                with open(path_log, 'r') as logF:
                    logfile = logF.read()
                # find all processing levels that the logfile reports as successfully saved
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
                if not AllWrittenProcL_dueLog:  # AllWrittenProcL_dueLog = []
                    print('%s: According to the logfile no completely processed data exist at any processing level. '
                          'Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog

        # check that there is no more than one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS_file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(dict(
                            image_type = dataset['image_type'],
                            Satellite  = dataset['satellite'],
                            Sensor     = dataset['sensor'],
                            Subsystem  = dataset['subsystem'],
                            proc_level = ProcL,  # must be respected because LBA changes after atmospheric correction
                            dataset_ID = dataset['datasetid'],
                            logger     = None), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in the database
                        if DB_match == [] or DB_match == 'database connection fault':
                            print('The dataset %s is not included in the database of processed data but according to ' \
                                  'logfile %s has been written successfully. Recreating missing database entry.' \
                                  % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            if GMS_config.job.call_type == 'console':
                                DB_T.SQL_DB_to_csv()
                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in the database
                        elif len(DB_match) == 1:
                            try:
                                print('Found a matching %s dataset for %s. Processing skipped until %s.' \
                                      % (ProcL, dataset['entity_ID'], proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                print('Found a matching %s dataset for %s. Processing already done.' \
                                      % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        print('Found a matching dataset for %s but with a different LayerBandsAssignment. '
                              'Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    print('%s for dataset %s has been written according to the logfile but no corresponding dataset '
                          'has been found.' % (ProcL, dataset['entity_ID']) +
                          (' Searching for lower processing level...' if AllWrittenProcL.index(ProcL) != 0 else ''))

        elif len(DB_match) > 1:
            print('According to the database there are multiple matches for the dataset %s. Dataset has to be '
                  'reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset


    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, collections.OrderedDict) -> bool
        """Checks whether a dataset within a dataset list has been processed in the previous processing level.
        :param GMS_objects: <list> a list of GMS objects that has been recently processed
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]


    @staticmethod
    def _is_already_present(dataset, procLvl):
        """Checks if the given dataset is already available on disk.

        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        :param procLvl:     <str> processing level to be checked
        :return:            <bool>
        """
        return HLP_F.proc_level_already_present(dataset['proc_level'], procLvl)


    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Returns a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:         <str> processing level of the specific processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :return:
        """
        is_already_present = lambda dataset: HLP_F.proc_level_already_present(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.usecase.data_list if not is_already_present(dataset)] # TODO generator?
        else:
            return [dataset for dataset in self.usecase.data_list if not is_already_present(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]


    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """
        Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
        :return:
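
        Example (a minimal sketch; this mirrors how the L1C processor requests previously written L1B data as blocks):
            L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=(5000, 5000))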
        """
        # TODO get prevLvl_objects automatically from self
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev if parallLev else self.parallLev
            blocksize = blocksize if blocksize else self.job.tiling_block_size_XY
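            # NOTE: prevLvl is the processing level directly preceding procLvl within the processing chain,
            #       e.g. procLvl='L1C' -> prevLvl='L1B' (assuming proc_chain is ordered from L1A to L2C)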
            prevLvl   = proc_chain[proc_chain.index(procLvl) - 1] # TODO replace by enum

            # get GMSfile list
            dataset_dicts             = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                get_tilepos_list = lambda GMSfile: HLP_F.get_image_tileborders(
                    'block', blocksize, shape_fullArr=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'])

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                                                 for tp      in get_tilepos_list(GMSfile)]

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory
            # -> subsystem attribute will be overwritten each time
            init_GMS_obj = lambda: HLP_F.parentObjDict[prevLvl](*HLP_F.initArgsDict[prevLvl])
            DB_objs = [init_GMS_obj().from_disk(tuple_GMS_subset=w) for w in work]  # init

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs


    def run_all_processors(self, custom_data_list=None):
        """
        Run all processors at once.
        """

        signal.signal(signal.SIGINT, self.stop) # enable clean shutdown possibility

        try:
            if self.job.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started...')
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.job.status = 'running'
            self.update_DB_job_record() # TODO implement that into job.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.usecase.data_list = custom_data_list

            # add local availability
            self.usecase.data_list = MAP(self.add_local_availability, self.usecase.data_list)
            self.update_DB_job_statistics(self.usecase.data_list)

            self.L1A_processing()
            self.L1B_processing()
            self.L1C_processing()
            self.L2A_processing()
            self.L2B_processing()
            self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            self.job.status           = 'finished' if not self.failed_objects else 'finished_with_errors' # TODO implement failed_with_warnings
            self.job.end_time         = datetime.datetime.now()
            self.job.computation_time = self.job.end_time - self.job.start_time
            self.logger.info('Time for execution: %s' % self.job.computation_time)

            # update database entry of current job
            self.update_DB_job_record()

            if self.job.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            shutdown_loggers()

        except Exception as e:
            if self.job.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.job.status = 'failed'
            self.update_DB_job_record()
            self.logger.error('Execution failed with an error: %s' % e)

            if not self.job.disable_exception_handler:
                self.logger.error(e)
                shutdown_loggers()
            else:
                shutdown_loggers()
                raise


    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""

        self.job.status = 'canceled'
        self.update_DB_job_record()

        self.logger.warning('Process controller stopped by user.')
        del self.logger
        shutdown_loggers()

        raise KeyboardInterrupt # terminate execution and show traceback


    def benchmark(self):
        """
        Run a benchmark: process a stepwise growing number of datasets (10 runs each) and write the measured
        processing and IO times to a global benchmark output file.
        """
        data_list_bench = self.usecase.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.job.path_database):
                    os.remove(self.job.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)


    def L1A_processing(self):
        """
        Run Level 1A processing: Data import and metadata homogenization
        """
        if self.job.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.parallLev == 'scenes':
                # map
                L1A_resObjects     = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P, flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles      = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles  = HLP_F.group_objects_by_attributes(
                                        L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects        = MAP(L1A_P.L1A_object().from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects     = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects  = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects


    def L1B_processing(self):
        """
        Run Level 1B processing: calculation of geometric shifts
        """
        # TODO implement check for running spatial index mediator server
        # run on full cubes

        if self.job.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects         = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances         = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects        = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects   = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects  += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                     obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects


    def L1C_processing(self):
        """
        Run Level 1C processing: atmospheric correction
        """
        if self.job.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.parallLev == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True, CPUs=15) # FIXME CPUs set to 15 for testing

            else:  # tiles
                blocksize = (5000, 5000)
                """if newly processed L1A objects are present: cut them into tiles"""
                L1B_newTiles = []
                if self.L1B_newObjects:
                    tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                    L1B_newTiles  = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)

                """combine newly and earlier processed L1B data"""
                L1B_newDBTiles    = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                L1B_tiles         = L1B_newTiles + L1B_newDBTiles

                # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use "
                                          "parallelization level 'scenes' instead!")
                L1C_tiles         = MAP(L1C_map, L1B_tiles)
                grouped_L1C_Tiles = \
                    HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                L1C_resObjects    = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects  = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects


    def L2A_processing(self):
        """
        Run Level 2A processing: geometric homogenization
        """
        if self.job.exec_L2AP[0]:
            self.logger.info('\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            """combine newly and earlier processed L1C data"""
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects  # combine newly and earlier processed L1C data

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles  = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles       = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles


    def L2B_processing(self):
        """
        Run Level 2B processing: spectral homogenization
        """
        if self.job.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.parallLev == 'scenes':
                # don't know if scenes makes sense in L2B processing because full objects are very big!
                """if newly processed L2A objects are present: merge them to scenes"""
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects   = [L2A_P.L2A_object().from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                """combine newly and earlier processed L2A data"""
                L2A_DBObjects    = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances    = L2A_newObjects + L2A_DBObjects  # combine newly and earlier processed L2A data

                L2B_resObjects   = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles     = self.L2A_tiles # tiles have the block size specified in L2A_map_2

                """combine newly and earlier processed L2A data"""
                blocksize        = (2048, 2048) # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles   = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles        = L2A_newTiles + L2A_newDBTiles

                L2B_tiles        = MAP(L2B_map, L2A_tiles)

                grouped_L2B_Tiles = \
                    HLP_F.group_objects_by_attributes(L2B_tiles, 'scene_ID')  # group results  # FIXME necessary at this point?
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects   = [L2B_P.L2B_object().from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects  = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects


    def L2C_processing(self):
        """
        Run Level 2C processing: accuracy assessment and MGRS tiling
        """
        # FIXME only parallelization_level == 'scenes' implemented
        if self.job.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            """combine newly and earlier processed L2A data"""
            L2B_DBObjects  = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances  = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2B data

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in Flink mode results are too big to be back-pickled
            self.L2C_newObjects  = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects


    def update_DB_job_record(self):
        """
        Update the database records of the current job (table 'jobs').
        """
        # TODO move this method to config.Job
        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.job.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed, # update 'failed_sceneids' column
             'finishtime': self.job.end_time, # add job finish timestamp
             'status': self.job.status}, # update 'status' column
            {'id': self.job.ID})


    def update_DB_job_statistics(self, usecase_datalist):
        """
        Update job statistics of the running job in the database.
        """
        # TODO move this method to config.Job
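        # NOTE: the 'statistics' column of the jobs table holds one counter per processing status / level;
        #       for each dataset that is already available at some processing level, the 'downloaded' counter is
        #       decremented and the counter of that level is incremented (indices taken from db_jobs_statistics_def)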
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.job.conn_database, 'jobs', 'statistics', cond_dict={'id': self.job.ID},
                    idx_val2decrement=db_jobs_statistics_def['downloaded'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']])

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])


    def create_job_summary(self):
        """
        Create a job success summary.
        """

        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.job, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else self.L2A_tiles
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.job.path_job_logs, '%s_summary.xlsx' % self.job.ID))
            detailed_JS.to_csv(os.path.join(self.job.path_job_logs, '%s_summary.csv' % self.job.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.job.ID + quick_JS.to_string())

            self.summary_detailed = detailed_JS
            self.summary_quick    = quick_JS

        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")



    def clear_lists_procObj(self):
        """Clear all lists of processed GMS objects collected so far, including the list of failed objects."""
        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles      = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []



def get_job_summary(list_GMS_objects):
    """Create a detailed and a quick job summary (two pandas DataFrames) from a list of GMS objects."""
    # get detailed job summary
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    for col in DJS_cols[1:]:
        get_val  = lambda obj: getattr(obj, col) if hasattr(obj, col) else None
        DJS[col] = [*DJS['GMS_object'].map(get_val)]

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # get quick job summary
    QJS = DataFrame(columns  =['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen         = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite']         = all_sat
    QJS['sensor']            = all_sen
    # count objects with the same satellite/sensor/sceneid combination
    QJS['count']             = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen)    &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed']       = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])
    return DJS, QJS