# -*- coding: utf-8 -*-
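"""Process controller of gms_preprocessing: manages and runs the GeoMultiSens pre-processing chain (L1A-L2C)."""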

from __future__ import (division, print_function, unicode_literals, absolute_import)

import datetime
import os
import re
import shutil
import signal
import sys
import time
from itertools import chain
from typing import TYPE_CHECKING

import numpy as np
from pandas import DataFrame
from py_tools_ds.numeric.array import get_array_tilebounds

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object, GMS_object, GMS_identifier
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..options.config import set_config
from .multiproc import MAP, imap_unordered
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
from ..misc.locks import release_unclosed_locks
from ..version import __version__, __versionalias__

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue
    from typing import List  # noqa F401  # flake8 issue
    from ..options.config import GMS_config  # noqa F401  # flake8 issue

__author__ = 'Daniel Scheffler'


class process_controller(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller.

        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
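
        Example (sketch; the job ID 123456 is hypothetical and must belong to an existing record in table 'jobs'):
            >>> PC = process_controller(123456)
            >>> PC.run_all_processors()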
        """

        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        self.config = set_config(job_ID, **config_kwargs)  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        # save config
        self._path_job_optionsfile = os.path.join(self.config.path_job_logs, '%s_options.json' % self.config.ID)
        self.config.save(self._path_job_optionsfile)

        # create job log
        self._path_job_logfile = os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID)
        if os.path.exists(self._path_job_logfile):
            HLP_F.silentremove(self._path_job_logfile)

        self.logger.info("Executing gms_preprocessing, version: %s (%s)" % (__version__, __versionalias__))
        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))
        self.logger.info('Job logfile: %s' % self._path_job_logfile)
        self.logger.info('Job options file: %s' % self._path_job_optionsfile)

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
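        """Return the job-level logger (created lazily; re-created if its handlers have been closed)."""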
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('ProcessController__%s' % self.config.ID, fmt_suffix='ProcessController',
                                      path_logfile=self._path_job_logfile, log_level=self.config.log_level,
                                      append=True)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self._logger.close()
            self._logger = None

    @property
    def DB_job_record(self):
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
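        """Return the scene IDs of all GMS objects that failed so far."""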
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
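        """Check whether the given dataset is already available on disk and set its 'proc_level' entry accordingly
        (based on the database record, the corresponding logfile and the GMS file written to disk)."""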
        # TODO revise this function
        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfile
        path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
        path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Returns all processing level that have been successfully written according to logfile."""

Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
155
156
157
158
            if not os.path.exists(path_log):
                if path_log == path_logfile:  # path_logfile_merged_ss has already been searched
                    self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                     % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
                logfile = open(path_log, 'r').read()
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
                if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss:  # AllWrittenProcL_dueLog = []
                    self.logger.info('%s: According to logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog

        # check that there is no more than one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            # NOTE: first check for merged subsystem datasets because they have higher processing levels
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
            if not AllWrittenProcL:
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            else:
                # A L2A+ dataset with merged subsystems has been found. Use that logfile.
                path_logfile = path_logfile_merged_ss

            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS_file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(GMS_identifier(
                            image_type=dataset['image_type'],
                            satellite=dataset['satellite'],
                            sensor=dataset['sensor'],
                            subsystem=dataset['subsystem'] if path_logfile != path_logfile_merged_ss else '',
                            proc_level=ProcL,  # must be respected because LBA changes after atm. Corr.
                            dataset_ID=dataset['dataset_ID']), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but'
                                             ' according to logfile %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching %s dataset for %s but with a different '
                                         'LayerBandsAssignment (desired: %s; found %s). Dataset has to be reprocessed.'
                                         % (ProcL, dataset['entity_ID'],
                                            target_LayerBandsAssignment, GMS_file_dict['LayerBandsAssignment']))
                else:
                    self.logger.info('%s for dataset %s has been written due to logfile but no corresponding '
                                     'dataset has been found.' % (ProcL, dataset['entity_ID']) +
                                     ' Searching for lower processing level...'
                                     if AllWrittenProcL.index(ProcL) != 0 else '')

        elif len(DB_match) > 1:
            self.logger.info('According to database there are multiple matches for the dataset %s. Dataset has to '
                             'be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset

    def add_local_availability(self, datasets):
        # type: (List[OrderedDict]) -> List[OrderedDict]
        """Check availability of all subsets per scene and processing level.

        NOTE: The processing level of scenes whose subsystems are not all available at the same processing level
              is reset.

        :param datasets:    List of one OrderedDict per subsystem as generated by CFG.data_list
        """
        datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets]

        ######################################################################################################
        # validate that all subsystems of the same sceneid are at the same processing level; otherwise reset #
        ######################################################################################################
        datasets_validated = []
        datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

        for ds_group in datasets_grouped:
            proc_lvls = [ds['proc_level'] for ds in ds_group]

            if not len(list(set(proc_lvls))) == 1:
                # reset processing level of those scenes where not all subsystems are available
                self.logger.info('%s: Found already processed subsystems at different processing levels %s. '
                                 'Dataset has to be reprocessed to avoid errors.'
                                 % (ds_group[0]['entity_ID'], proc_lvls))

                for ds in ds_group:
                    ds['proc_level'] = None
                    datasets_validated.append(ds)
            else:
                datasets_validated.extend(ds_group)

        return datasets_validated

    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, OrderedDict) -> bool
        """Checks whether a dataset within a dataset list has been processed in the previous processing level.
        :param GMS_objects: <list> a list of GMS objects that has been recently processed
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]

    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Returns a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:          processing level of the processor requesting the data
        :param prevLvl_objects:  in-memory GMS objects that have already been produced by the previous processor
        :return:
        """
        def is_procL_lower(dataset):
            return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)]  # TODO generator?
        else:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]

    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """
        Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
        :return:
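
        Example (sketch, mirroring the call made in L2A_processing):
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')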
        """
        # TODO get prevLvl_objects automatically from self
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev or self.config.parallelization_level
            blocksize = blocksize or self.config.tiling_block_size_XY
            prevLvl = proc_chain[proc_chain.index(procLvl) - 1]  # TODO replace by enum

            # get GMSfile list
            dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                def get_tilepos_list(GMSfile):
                    return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'],
                                                tile_shape=blocksize)

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                        for tp in get_tilepos_list(GMSfile)]

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory
            # -> subsystem attribute will be overwritten each time
            DB_objs = [HLP_F.parentObjDict[prevLvl].from_disk(tuple_GMS_subset=w) for w in work]

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs

    def run_all_processors_OLD(self, custom_data_list=None):
        """
        Run all processors at once.
        """
        # enable clean shutdown possibility
        # NOTE: a signal.SIGKILL (kill -9 ...) forcibly kills the process and can neither be caught nor handled
        signal.signal(signal.SIGINT, self.stop)  # catches a KeyboardInterrupt
        signal.signal(signal.SIGTERM, self.stop)  # catches a 'kill' or 'pkill'

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)
            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            self.update_DB_job_record()  # TODO implement that into job.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            self.L1A_processing()
            self.L1B_processing()
            self.L1C_processing()
            self.L2A_processing()
            self.L2B_processing()
            self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            self.logger.info('The job logfile, options file and the summary files have been saved here: \n'
                             '%s.*' % os.path.splitext(self.logger.path_logfile)[0])
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

        except Exception:  # noqa E722  # bare except
            self.config.status = 'failed'

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
            else:
                self.logger.error('Execution failed with an error:')
                raise

        finally:
            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

    def run_all_processors(self, custom_data_list=None):
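        """Run the entire pre-processing chain (L1A-L2C) for all datasets of the current job in one go."""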
        # enable clean shutdown possibility
        # NOTE: a signal.SIGKILL (kill -9 ...) forcibly kills the process and can neither be caught nor handled
        signal.signal(signal.SIGINT, self.stop)  # catches a KeyboardInterrupt
        signal.signal(signal.SIGTERM, self.stop)  # catches a 'kill' or 'pkill'

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)

            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            GMS_object.proc_status_all_GMSobjs.clear()  # reset
            self.update_DB_job_record()  # TODO implement that into config.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            # group dataset dicts by sceneid
            dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')

            # close logger to release FileHandler of job log (workers will log into job logfile)
            del self.logger

            # RUN PREPROCESSING
            from .pipeline import run_complete_preprocessing
            GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)

            # separate results into successful and failed objects
            def assign_attr(tgt_procL):
                return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]

            self.L1A_newObjects = assign_attr('L1A')
            self.L1B_newObjects = assign_attr('L1B')
            self.L1C_newObjects = assign_attr('L1C')
            self.L2A_newObjects = assign_attr('L2A')
            self.L2B_newObjects = assign_attr('L2B')
            self.L2C_newObjects = assign_attr('L2C')
            self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            self.logger.info('The job logfile and the summary files have been saved here: \n'
                             '%s.*' % os.path.splitext(self.logger.path_logfile)[0])
            # TODO implement failed_with_warnings
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_warnings'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

        except Exception:  # noqa E722  # bare except
            self.config.status = 'failed'

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
            else:
                self.logger.error('Execution failed with an error:')
                raise

        finally:
            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""

        self.logger.info('Process controller stopped via %s.'
                         % ('KeyboardInterrupt' if signum == 2 else 'SIGTERM command'))
        self.config.status = 'canceled'
        self.update_DB_job_record()

        self.shutdown()

        if signum == 2:
            raise KeyboardInterrupt('Received a KeyboardInterrupt.')  # terminate execution and show traceback
        elif signum == 15:
            sys.exit(0)
            # raise SystemExit()

    def shutdown(self):
        """Shutdown the process controller instance (loggers, remove temporary directories, ...)."""
        self.logger.info('Shutting down gracefully...')

        # release unclosed locks
        release_unclosed_locks()

        # clear any temporary files
        tempdir = os.path.join(self.config.path_tempdir)
        self.logger.info('Deleting temporary directory %s.' % tempdir)
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir, ignore_errors=True)

        del self.logger
        shutdown_loggers()

    def benchmark(self):
        """
        Run a benchmark.
        """
        data_list_bench = self.config.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.config.path_database):
                    os.remove(self.config.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)

    def L1A_processing(self):
        """
        Run Level 1A processing: Data import and metadata homogenization
        """
        if self.config.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.config.parallelization_level == 'scenes':
                # map
                L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P,
                                         flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
                    L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects = MAP(L1A_P.L1A_object.from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects

    def L1B_processing(self):
        """
        Run Level 1B processing: calculation of geometric shifts
        """
        # TODO implement check for running spatial index mediator server
        # run on full cubes

        if self.config.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects

    def L1C_processing(self):
        """
        Run Level 1C processing: atmospheric correction
        """
        if self.config.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.config.parallelization_level == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True,
                                     CPUs=15)  # FIXME CPUs set to 15 for testing

            else:  # tiles
                raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use "
                                          "parallelization level 'scenes' instead!")
                # blocksize = (5000, 5000)
                # """if newly processed L1A objects are present: cut them into tiles"""
                # L1B_newTiles = []
                # if self.L1B_newObjects:
                #     tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                #     L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)
                #
                # """combine newly and earlier processed L1B data"""
                # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                # L1B_tiles = L1B_newTiles + L1B_newDBTiles
                #
                # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                # L1C_tiles = MAP(L1C_map, L1B_tiles)
                # grouped_L1C_Tiles = \
                #     HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects

    def L2A_processing(self):
        """
        Run Level 2A processing: geometric homogenization
        """
        if self.config.exec_L2AP[0]:
            self.logger.info(
                '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            """combine newly and earlier processed L1C data"""
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects  # combine newly and earlier processed L1C data

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles

    def L2B_processing(self):
        """
        Run Level 2B processing: spectral homogenization
        """
        if self.config.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.config.parallelization_level == 'scenes':
                # don't know if scenes makes sense in L2B processing because full objects are very big!
                """if newly processed L2A objects are present: merge them to scenes"""
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects = [L2A_P.L2A_object.from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                """combine newly and earlier processed L2A data"""
                L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances = L2A_newObjects + L2A_DBObjects  # combine newly and earlier processed L2A data

                L2B_resObjects = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles = self.L2A_tiles  # tiles have the block size specified in L2A_map_2

                """combine newly and earlier processed L2A data"""
                blocksize = (2048, 2048)  # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles = L2A_newTiles + L2A_newDBTiles

                L2B_tiles = MAP(L2B_map, L2A_tiles)

                # group results  # FIXME necessary at this point?
                grouped_L2B_Tiles = HLP_F.group_objects_by_attributes(L2B_tiles, 'scene_ID')
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects = [L2B_P.L2B_object.from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects

    def L2C_processing(self):
        """
        Run Level 2C processing: accuracy assessment and MGRS tiling
        """
        # FIXME only parallelization_level == 'scenes' implemented
        if self.config.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            """combine newly and earlier processed L2B data"""
            L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2B data

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in case of inmem_serialization mode results are too big to be back-pickled
            self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects

    def update_DB_job_record(self):
        """
        Update the database records of the current job (table 'jobs').
        """
        # TODO move this method to config.Job
        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.config.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
             'finishtime': self.config.end_time,  # add job finish timestamp
             'status': self.config.status},  # update 'job_status' column
            {'id': self.config.ID}, timeout=30000)

    def update_DB_job_statistics(self, usecase_datalist):
        """
        Update job statistics of the running job in the database.
        """
        # TODO move this method to config.Job
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']],
                    timeout=30000)

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])

    def create_job_summary(self):
        """
        Create job success summary
        """

        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.config, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = \
                    getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else (self.L2A_tiles or self.L2A_newObjects)
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID))
            detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string())

            self.summary_detailed = detailed_JS
            self.summary_quick = quick_JS

        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")

    def clear_lists_procObj(self):
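        """Clear all lists of newly processed objects and the list of failed objects."""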
        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []


def get_job_summary(list_GMS_objects):
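    """Create a detailed and a quick job summary DataFrame from the given list of GMS objects."""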
    # get detailed job summary
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    for col in DJS_cols[1:]:
        def get_val(obj): return getattr(obj, col) if hasattr(obj, col) else None
        DJS[col] = list(DJS['GMS_object'].map(get_val))

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # get quick job summary
    QJS = DataFrame(columns=['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite'] = all_sat
    QJS['sensor'] = all_sen
    # count objects with the same satellite/sensor/sceneid combination
    QJS['count'] = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                    for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen) &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed'] = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])
    return DJS, QJS