# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version. Please note the following exception: `spechomo` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import (division, print_function, unicode_literals, absolute_import)

import numpy as np
from pandas import DataFrame
import datetime
import os
import time
from itertools import chain
import signal
import re
from typing import TYPE_CHECKING
import shutil
import sys

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object, GMS_object, GMS_identifier
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..options.config import set_config
from .multiproc import MAP, imap_unordered
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
from ..misc.locks import release_unclosed_locks
from ..version import __version__, __versionalias__

from py_tools_ds.numeric.array import get_array_tilebounds

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue
    from typing import List  # noqa F401  # flake8 issue
    from ..options.config import GMS_config  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'


class ProcessController(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller
72

73
74
        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
75
76
77
        """

        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        self.config = set_config(job_ID, **config_kwargs)  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        # create job log
        self._path_job_logfile = os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID)
        if os.path.exists(self._path_job_logfile):
            HLP_F.silentremove(self._path_job_logfile)

        self.logger.info("Executing gms_preprocessing, version: %s (%s)" % (__version__, __versionalias__))
        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))
        self.logger.info('Job logfile: %s' % self._path_job_logfile)

        # save config
        self._path_job_optionsfile = os.path.join(self.config.path_job_logs, '%s_options.json' % self.config.ID)
        self.config.save(self._path_job_optionsfile)
        self.logger.info('Job options file: %s' % self._path_job_optionsfile)

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('ProcessController__%s' % self.config.ID, fmt_suffix='ProcessController',
                                      path_logfile=self._path_job_logfile, log_level=self.config.log_level, append=True)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self._logger.close()
            self._logger = None

    @property
    def DB_job_record(self):
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
        # TODO revise this function
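        # Basic idea: compare the database record, the scene logfile(s) and the GMS files on disk,
        # and set dataset['proc_level'] to the highest processing level for which they agree;
        # otherwise leave it at None so that the dataset is reprocessed.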
        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfile
        path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
        path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Returns all processing level that have been successfully written according to logfile."""

            if not os.path.exists(path_log):
                if path_log == path_logfile:  # path_logfile_merged_ss has already been searched
                    self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                     % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
                with open(path_log, 'r') as inF:
                    logfile = inF.read()
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
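                # illustrative example: a log line like "...: L1A data successfully saved." yields 'L1A'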
                if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss:  # AllWrittenProcL_dueLog = []
                    self.logger.info('%s: According to logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog

        # check that there is not more than one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            # NOTE: first check for merged subsystem datasets because they have higher processing levels
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
            if not AllWrittenProcL:
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            else:
                # An L2A+ dataset with merged subsystems has been found. Use that logfile.
                path_logfile = path_logfile_merged_ss

            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS_file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(GMS_identifier(
                            image_type=dataset['image_type'],
                            satellite=dataset['satellite'],
                            sensor=dataset['sensor'],
                            subsystem=dataset['subsystem'] if path_logfile != path_logfile_merged_ss else '',
                            proc_level=ProcL,  # must be respected because LBA changes after atm. Corr.
                            dataset_ID=dataset['dataset_ID']), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but'
                                             ' according to logfile %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching %s dataset for %s but with a different '
                                         'LayerBandsAssignment (desired: %s; found: %s). Dataset has to be '
                                         'reprocessed.'
                                         % (ProcL, dataset['entity_ID'],
                                            target_LayerBandsAssignment, GMS_file_dict['LayerBandsAssignment']))
                else:
                    self.logger.info('%s for dataset %s has been written according to the logfile but no '
                                     'corresponding dataset has been found on disk.'
                                     % (ProcL, dataset['entity_ID']) +
                                     (' Searching for lower processing level...'
                                      if AllWrittenProcL.index(ProcL) != 0 else ''))

        elif len(DB_match) > 1:
            self.logger.info('According to database there are multiple matches for the dataset %s. Dataset has to '
                             'be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset

    def add_local_availability(self, datasets):
        # type: (List[OrderedDict]) -> List[OrderedDict]
        """Check availability of all subsets per scene and processing level.

        NOTE: The processing level is reset for scenes where not all subsystems are available
              at the same processing level.

        :param datasets:    List of one OrderedDict per subsystem as generated by CFG.data_list
        """
        datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets]

        ######################################################################################################
        # validate that all subsystems of the same sceneid are at the same processing level; otherwise reset #
        ######################################################################################################
        datasets_validated = []
        datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

        for ds_group in datasets_grouped:
            proc_lvls = [ds['proc_level'] for ds in ds_group]

            if len(set(proc_lvls)) != 1:
                # reset processing level of those scenes where not all subsystems are available
                self.logger.info('%s: Found already processed subsystems at different processing levels %s. '
                                 'Dataset has to be reprocessed to avoid errors.'
                                 % (ds_group[0]['entity_ID'], proc_lvls))

                for ds in ds_group:
                    ds['proc_level'] = None
                    datasets_validated.append(ds)
            else:
                datasets_validated.extend(ds_group)

        return datasets_validated

    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, OrderedDict) -> bool
        """Checks whether a dataset within a dataset list has been processed in the previous processing level.
        :param GMS_objects: <list> a list of GMS objects that have been recently processed
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]

    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Returns a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :return:
        """
        def is_procL_lower(dataset):
            return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)]  # TODO generator?
        else:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]

    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """
        Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
        :return:
        """
        # TODO get prevLvl_objects automatically from self
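        # Illustrative structure of the 'work' items built below (paths/bounds are examples only):
        #   parallLev == 'scenes': [<path to GMS file>, ['cube', None]]
        #   parallLev == 'tiles':  [<path to GMS file>, ['block', <tile bounds from get_array_tilebounds()>]]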
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev or self.config.parallelization_level
            blocksize = blocksize or self.config.tiling_block_size_XY
            prevLvl = proc_chain[proc_chain.index(procLvl) - 1]  # TODO replace by enum

            # get GMSfile list
            dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                def get_tilepos_list(GMSfile):
                    return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'],
                                                tile_shape=blocksize)

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                        for tp in get_tilepos_list(GMSfile)]

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory
            # -> subsystem attribute will be overwritten each time
            DB_objs = [HLP_F.parentObjDict[prevLvl].from_disk(tuple_GMS_subset=w) for w in work]

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs

    def run_all_processors(self, custom_data_list=None, serialize_after_each_mapper=False):
        """
        Run all processors at once.
        """
        # enable clean shutdown possibility
        # NOTE: a signal.SIGKILL (kill -9 ...) forces to kill the process and cannot be caught or handled
        signal.signal(signal.SIGINT, self.stop)  # catches a KeyboardInterrupt
        signal.signal(signal.SIGTERM, self.stop)  # catches a 'kill' or 'pkill'

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)

            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            GMS_object.proc_status_all_GMSobjs.clear()  # reset
            self.update_DB_job_record()  # TODO implement that into config.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            if not serialize_after_each_mapper:
                # group dataset dicts by sceneid
                dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')

                # close logger to release FileHandler of job log (workers will log into job logfile)
                del self.logger

                # RUN PREPROCESSING
                from .pipeline import run_complete_preprocessing
                GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)

                # separate results into successful and failed objects
                def assign_attr(tgt_procL):
                    return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]

                self.L1A_newObjects = assign_attr('L1A')
                self.L1B_newObjects = assign_attr('L1B')
                self.L1C_newObjects = assign_attr('L1C')
                self.L2A_newObjects = assign_attr('L2A')
                self.L2B_newObjects = assign_attr('L2B')
                self.L2C_newObjects = assign_attr('L2C')
                self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]

            else:
                self.L1A_processing()
                self.L1B_processing()
                self.L1C_processing()
                self.L2A_processing()
                self.L2B_processing()
                self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            self.logger.info('The job logfile, options file and the summary files have been saved here: \n'
                             '%s.*' % os.path.splitext(self.logger.path_logfile)[0])
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

        except Exception:  # noqa E722  # bare except
            self.config.status = 'failed'

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
            else:
                self.logger.error('Execution failed with an error:')
                raise

        finally:
            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""

        self.logger.info('Process controller stopped via %s.'
                         % ('KeyboardInterrupt' if signum == 2 else 'SIGTERM command'))
        self.config.status = 'canceled'
        self.update_DB_job_record()

        self.shutdown()

        if signum == 2:
            raise KeyboardInterrupt('Received a KeyboardInterrupt.')  # terminate execution and show traceback
        elif signum == 15:
            sys.exit(0)
            # raise SystemExit()

    def shutdown(self):
        """Shutdown the process controller instance (loggers, remove temporary directories, ...)."""
        self.logger.info('Shutting down gracefully...')

        # release unclosed locks
        release_unclosed_locks()

        # clear any temporary files
        tempdir = self.config.path_tempdir
        self.logger.info('Deleting temporary directory %s.' % tempdir)
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir, ignore_errors=True)

        del self.logger
        shutdown_loggers()

    def benchmark(self):
        """
        Run a benchmark.
        """
        data_list_bench = self.config.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.config.path_database):
                    os.remove(self.config.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)

    def L1A_processing(self):
        """
        Run Level 1A processing: Data import and metadata homogenization
        """
        if self.config.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.config.parallelization_level == 'scenes':
                # map
                L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P,
                                         flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
                    L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects = MAP(L1A_P.L1A_object.from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects

    def L1B_processing(self):
        """
        Run Level 1B processing: calculation of geometric shifts
        """
        # TODO implement check for running spatial index mediator server
        # run on full cubes

        if self.config.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects

    def L1C_processing(self):
        """
        Run Level 1C processing: atmospheric correction
        """
        if self.config.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.config.parallelization_level == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True,
                                     CPUs=15)  # FIXME CPUs set to 15 for testing

            else:  # tiles
                raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use "
                                          "parallelization level 'scenes' instead!")
                # blocksize = (5000, 5000)
                # """if newly processed L1A objects are present: cut them into tiles"""
                # L1B_newTiles = []
                # if self.L1B_newObjects:
                #     tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                #     L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)
                #
                # """combine newly and earlier processed L1B data"""
                # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                # L1B_tiles = L1B_newTiles + L1B_newDBTiles
                #
                # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                # L1C_tiles = MAP(L1C_map, L1B_tiles)
                # grouped_L1C_Tiles = \
                #     HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects

    def L2A_processing(self):
        """
        Run Level 2A processing: geometric homogenization
        """
        if self.config.exec_L2AP[0]:
            self.logger.info(
                '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            """combine newly and earlier processed L1C data"""
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects  # combine newly and earlier processed L1C data

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles

    def L2B_processing(self):
        """
        Run Level 2B processing: spectral homogenization
        """
        if self.config.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.config.parallelization_level == 'scenes':
                # don't know if scenes makes sense in L2B processing because full objects are very big!
                """if newly processed L2A objects are present: merge them to scenes"""
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects = [L2A_P.L2A_object.from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                """combine newly and earlier processed L2A data"""
                L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances = L2A_newObjects + L2A_DBObjects  # combine newly and earlier processed L2A data

                L2B_resObjects = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles = self.L2A_tiles  # tiles have the block size specified in L2A_map_2

                """combine newly and earlier processed L2A data"""
                blocksize = (2048, 2048)  # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles = L2A_newTiles + L2A_newDBTiles

                L2B_tiles = MAP(L2B_map, L2A_tiles)

                # group results  # FIXME is this necessary at this point?
                grouped_L2B_Tiles = HLP_F.group_objects_by_attributes(L2B_tiles, 'scene_ID')
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects = [L2B_P.L2B_object.from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects

    def L2C_processing(self):
        """
        Run Level 2C processing: accuracy assessment and MGRS tiling
        """
        # FIXME only parallelization_level == 'scenes' implemented
        if self.config.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            """combine newly and earlier processed L2B data"""
            L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2B data

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in case of inmem_serialization mode results are too big to be back-pickled
            self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects

    def update_DB_job_record(self):
        """
        Update the database records of the current job (table 'jobs').
        """
        # TODO move this method to config.Job
        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.config.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
             'finishtime': self.config.end_time,  # add job finish timestamp
             'status': self.config.status},  # update 'job_status' column
            {'id': self.config.ID}, timeout=30000)

    def update_DB_job_statistics(self, usecase_datalist):
        """
        Update job statistics of the running job in the database.
        """
        # TODO move this method to config.Job
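        # Illustrative example: a scene that already reached 'L1C' moves one count from the
        # 'pending' position of the job's 'statistics' array to the 'L1C' position
        # (index mapping as defined in db_jobs_statistics_def).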
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']],
                    timeout=30000)

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])

    def create_job_summary(self):
        """
        Create job success summary
        """

        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.config, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = \
                    getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else (self.L2A_tiles or self.L2A_newObjects)
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID))
            detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string())

            self.summary_detailed = detailed_JS
            self.summary_quick = quick_JS

        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")

    def clear_lists_procObj(self):
        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []


def get_job_summary(list_GMS_objects):
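    """Create the detailed and the quick job summary (two DataFrames) for the given GMS objects."""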
    # get detailed job summary
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    for col in DJS_cols[1:]:
        def get_val(obj): return getattr(obj, col) if hasattr(obj, col) else None
        DJS[col] = list(DJS['GMS_object'].map(get_val))

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # get quick job summary
    QJS = DataFrame(columns=['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite'] = all_sat
    QJS['sensor'] = all_sen
    # count objects with the same satellite/sensor/sceneid combination
    QJS['count'] = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                    for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen) &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed'] = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])
    return DJS, QJS