# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2019  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import (division, print_function, unicode_literals, absolute_import)

import numpy as np
from pandas import DataFrame
import datetime
import os
import time
from itertools import chain
import signal
import re
from typing import TYPE_CHECKING
import shutil
import sys

from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object, GMS_object, GMS_identifier
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                       L2A_map, L2B_map, L2C_map)
from ..options.config import set_config
from .multiproc import MAP, imap_unordered
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
from ..misc.locks import release_unclosed_locks
from ..version import __version__, __versionalias__

from py_tools_ds.numeric.array import get_array_tilebounds

if TYPE_CHECKING:
    from collections import OrderedDict  # noqa F401  # flake8 issue
    from typing import List  # noqa F401  # flake8 issue
    from ..options.config import GMS_config  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'


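# Minimal usage sketch (illustrative; the job ID below is hypothetical and must refer to an existing
# record in the 'jobs' database table expected by set_config()):
#
#   PC = ProcessController(123456)
#   PC.run_all_processors()
#   failed = PC.failed_objects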
class ProcessController(object):
    def __init__(self, job_ID, **config_kwargs):
        """gms_preprocessing process controller

        :param job_ID:          job ID belonging to a valid database record within table 'jobs'
        :param config_kwargs:   keyword arguments to be passed to gms_preprocessing.set_config()
        """

        # assertions
        if not isinstance(job_ID, int):
            raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))

        # set GMS configuration
        config_kwargs.update(dict(reset_status=True))
        self.config = set_config(job_ID, **config_kwargs)  # type: GMS_config

        # defaults
        self._logger = None
        self._DB_job_record = None
        self.profiler = None

        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []

        self.summary_detailed = None
        self.summary_quick = None

        # check if process_controller is executed by debugger
        # isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
        # if isdebugging:  # override the existing settings in order to get write access everywhere
        #    pass

        # called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0

        # create job log
        self._path_job_logfile = os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID)
        if os.path.exists(self._path_job_logfile):
            HLP_F.silentremove(self._path_job_logfile)

        self.logger.info("Executing gms_preprocessing, version: %s (%s)" % (__version__, __versionalias__))
        self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
                         % (self.config.ID, self.DB_job_record.comment))
        self.logger.info('Job logfile: %s' % self._path_job_logfile)

        # save config
        self._path_job_optionsfile = os.path.join(self.config.path_job_logs, '%s_options.json' % self.config.ID)
        self.config.save(self._path_job_optionsfile)
        self.logger.info('Job options file: %s' % self._path_job_optionsfile)

        if self.config.delete_old_output:
            self.logger.info('Deleting previously processed data...')
            self.DB_job_record.delete_procdata_of_entire_job(force=True)

    @property
    def logger(self):
        if self._logger and self._logger.handlers[:]:
            return self._logger
        else:
            self._logger = GMS_logger('ProcessController__%s' % self.config.ID, fmt_suffix='ProcessController',
                                      path_logfile=self._path_job_logfile, log_level=self.config.log_level, append=True)
            return self._logger

    @logger.setter
    def logger(self, logger):
        self._logger = logger

    @logger.deleter
    def logger(self):
        if self._logger not in [None, 'not set']:
            self._logger.close()
            self._logger = None

    @property
    def DB_job_record(self):
        if self._DB_job_record:
            return self._DB_job_record
        else:
            self._DB_job_record = DB_T.GMS_JOB(self.config.conn_database)
            self._DB_job_record.from_job_ID(self.config.ID)
            return self._DB_job_record

    @DB_job_record.setter
    def DB_job_record(self, value):
        self._DB_job_record = value

    @property
    def sceneids_failed(self):
        return [obj.scene_ID for obj in self.failed_objects]

    def _add_local_availability_single_dataset(self, dataset):
        # type: (OrderedDict) -> OrderedDict
        # TODO revise this function
        # query the database and get the last written processing level and LayerBandsAssignment
        DB_match = DB_T.get_info_from_postgreSQLdb(
            self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
            dict(sceneid=dataset['scene_ID']))

        # get the corresponding logfile
        path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
        path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)

        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
            """Returns all processing levels that have been successfully written according to the logfile."""

            if not os.path.exists(path_log):
                if path_log == path_logfile:  # path_logfile_merged_ss has already been searched
                    self.logger.info("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed."
                                     % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
                AllWrittenProcL_dueLog = []
            else:
                with open(path_log) as inF:
                    logfile = inF.read()
                AllWrittenProcL_dueLog = re.findall(r":*(\S*\s*) data successfully saved.", logfile, re.I)
                if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss:  # AllWrittenProcL_dueLog = []
                    self.logger.info('%s: According to logfile no completely processed data exist at any '
                                     'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
                else:
                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
            return AllWrittenProcL_dueLog
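
        # NOTE (illustrative): the regular expression in get_AllWrittenProcL_dueLog() matches logfile lines such as
        # "L1A data successfully saved." and would therefore yield entries like ['L1A', 'L1B', 'L1C'].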

        # check that there is no more than one database record for this dataset
        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':

            # get all processing levels that have been successfully written
            # NOTE: first check for merged subsystem datasets because they have higher processing levels
            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
            if not AllWrittenProcL:
                AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
            else:
                # An L2A+ dataset with merged subsystems has been found. Use that logfile.
                path_logfile = path_logfile_merged_ss

            dataset['proc_level'] = None  # default (dataset has to be reprocessed)

            # loop through all the found proc. levels and find the one that fulfills all requirements
            for ProcL in reversed(AllWrittenProcL):
                if dataset['proc_level']:
                    break  # proc_level found; no further searching for lower proc_levels
                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)

                # check if there is also a corresponding GMS_file on disk
                if os.path.isfile(assumed_path_GMS_file):
                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
                    target_LayerBandsAssignment = \
                        get_LayerBandsAssignment(GMS_identifier(
                            image_type=dataset['image_type'],
                            satellite=dataset['satellite'],
                            sensor=dataset['sensor'],
                            subsystem=dataset['subsystem'] if path_logfile != path_logfile_merged_ss else '',
                            proc_level=ProcL,  # must be respected because LBA changes after atm. correction
                            dataset_ID=dataset['dataset_ID']), nBands=(1 if dataset['sensormode'] == 'P' else None))

                    # check if the LayerBandsAssignment of the written dataset on disk equals the
                    # desired LayerBandsAssignment
                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:

                        # update the database record if the dataset could not be found in the database
                        if DB_match == [] or DB_match == 'database connection fault':
                            self.logger.info('The dataset %s is not included in the database of processed data but'
                                             ' according to the logfile %s has been written successfully. Recreating '
                                             'missing database entry.' % (dataset['entity_ID'], ProcL))
                            DB_T.data_DB_updater(GMS_file_dict)

                            dataset['proc_level'] = ProcL

                        # if the dataset could be found in the database
                        elif len(DB_match) == 1:
                            try:
                                self.logger.info('Found a matching %s dataset for %s. Processing skipped until %s.'
                                                 % (ProcL, dataset['entity_ID'],
                                                    proc_chain[proc_chain.index(ProcL) + 1]))
                            except IndexError:
                                self.logger.info('Found a matching %s dataset for %s. Processing already done.'
                                                 % (ProcL, dataset['entity_ID']))

                            if DB_match[0][0] == ProcL:
                                dataset['proc_level'] = DB_match[0][0]
                            else:
                                dataset['proc_level'] = ProcL

                    else:
                        self.logger.info('Found a matching %s dataset for %s but with a different '
                                         'LayerBandsAssignment (desired: %s; found: %s). Dataset has to be '
                                         'reprocessed.'
                                         % (ProcL, dataset['entity_ID'],
                                            target_LayerBandsAssignment, GMS_file_dict['LayerBandsAssignment']))
                else:
                    self.logger.info('%s for dataset %s has been written according to the logfile but no matching '
                                     'dataset has been found on disk.' % (ProcL, dataset['entity_ID']) +
                                     (' Searching for lower processing level...'
                                      if AllWrittenProcL.index(ProcL) != 0 else ''))

        elif len(DB_match) > 1:
            self.logger.info('According to the database there are multiple matches for the dataset %s. Dataset has '
                             'to be reprocessed.' % dataset['entity_ID'])
            dataset['proc_level'] = None

        else:
            dataset['proc_level'] = None

        return dataset

    def add_local_availability(self, datasets):
        # type: (List[OrderedDict]) -> List[OrderedDict]
        """Check availability of all subsets per scene and processing level.

        NOTE: The processing level of scenes for which not all subsystems are available at the same processing
              level is reset.

        :param datasets:    List of one OrderedDict per subsystem as generated by CFG.data_list
        """
        datasets = [self._add_local_availability_single_dataset(ds) for ds in datasets]

        ######################################################################################################
        # validate that all subsystems of the same sceneid are at the same processing level; otherwise reset #
        ######################################################################################################
        datasets_validated = []
        datasets_grouped = HLP_F.group_dicts_by_key(datasets, key='scene_ID')

        for ds_group in datasets_grouped:
            proc_lvls = [ds['proc_level'] for ds in ds_group]

            if not len(list(set(proc_lvls))) == 1:
                # reset processing level of those scenes where not all subsystems are available
                self.logger.info('%s: Found already processed subsystems at different processing levels %s. '
                                 'Dataset has to be reprocessed to avoid errors.'
                                 % (ds_group[0]['entity_ID'], proc_lvls))

                for ds in ds_group:
                    ds['proc_level'] = None
                    datasets_validated.append(ds)
            else:
                datasets_validated.extend(ds_group)

        return datasets_validated

    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, OrderedDict) -> bool
        """Checks whether a dataset within a dataset list has been processed at the previous processing level.

        :param GMS_objects: <list> a list of GMS objects that have been recently processed
        :param dataset:     <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
        """
        # check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
        return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]

    def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
        """Returns a list of datasets that have to be read from disk and then processed by a specific processor.

        :param procLvl:          <str> processing level of the current processor
        :param prevLvl_objects:  <list> of in-memory GMS objects produced by the previous processor
        :return:
        """
        def is_procL_lower(dataset):
            return HLP_F.is_proc_level_lower(dataset['proc_level'], target_lvl=procLvl)

        if prevLvl_objects is None:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset)]  # TODO generator?
        else:
            return [dataset for dataset in self.config.data_list if is_procL_lower(dataset) and
                    not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]

    def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
        """
        Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.

        :param procLvl:         <str> processing level of the current processor
        :param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
        :param parallLev:       <str> parallelization level ('scenes' or 'tiles')
                                -> defines if full cubes or blocks are to be returned
        :param blocksize:       <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
        :return:
        """
        # TODO get prevLvl_objects automatically from self
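        # Illustrative call (parameter values are examples only):
        #   L1B_objs = self.get_DB_objects('L1C', prevLvl_objects=self.L1B_newObjects,
        #                                  parallLev='tiles', blocksize=(2000, 2000))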
        if procLvl == 'L1A':
            return []
        else:
            # handle input parameters
            parallLev = parallLev or self.config.parallelization_level
            blocksize = blocksize or self.config.tiling_block_size_XY
            prevLvl = proc_chain[proc_chain.index(procLvl) - 1]  # TODO replace by enum

            # get GMSfile list
            dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
            GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)

            # create GMS objects from disk with respect to parallelization level and block size
            if parallLev == 'scenes':
                # get input parameters for creating GMS objects as full cubes
                work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
            else:
                # define tile positions and size
                def get_tilepos_list(GMSfile):
                    return get_array_tilebounds(array_shape=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'],
                                                tile_shape=blocksize)

                # get input parameters for creating GMS objects as blocks
                work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
                        for tp in get_tilepos_list(GMSfile)]
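                # (illustrative) each work item pairs a GMS file path with ['block', <tile bounds>],
                # where <tile bounds> is one element of the list returned by get_array_tilebounds()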

            # create GMS objects for the found files on disk
            # NOTE: DON'T multiprocess that with MAP(GMS_object(*initargs).from_disk, work)
            # in case of multiple subsystems GMS_object(*initargs) would always point to the same object in memory
            # -> subsystem attribute will be overwritten each time
            DB_objs = [HLP_F.parentObjDict[prevLvl].from_disk(tuple_GMS_subset=w) for w in work]

            if DB_objs:
                DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)

            return DB_objs

    def run_all_processors(self, custom_data_list=None, serialize_after_each_mapper=False):
        """
        Run all processors at once.

        :param custom_data_list:              custom list of dataset dictionaries to be processed instead of the
                                              job's default config.data_list
        :param serialize_after_each_mapper:   if True, run the processing levels one after another and serialize the
                                              intermediate results to disk after each level instead of running the
                                              complete pipeline per scene in one go
        """
        # enable clean shutdown possibility
        # NOTE: a signal.SIGKILL (kill -9 ...) forces the process to be killed and cannot be caught or handled
        signal.signal(signal.SIGINT, self.stop)  # catches a KeyboardInterrupt
        signal.signal(signal.SIGTERM, self.stop)  # catches a 'kill' or 'pkill'

        # noinspection PyBroadException
        try:
            if self.config.profiling:
                from pyinstrument import Profiler
                self.profiler = Profiler()  # or Profiler(use_signal=False), see below
                self.profiler.start()

            self.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                             % self.config.ID)

            self.DB_job_record.reset_job_progress()  # updates attributes of DB_job_record and related DB entry
            self.config.status = 'running'
            GMS_object.proc_status_all_GMSobjs.clear()  # reset
            self.update_DB_job_record()  # TODO implement that into config.status.setter

            self.failed_objects = []

            # get list of datasets to be processed
            if custom_data_list:
                self.config.data_list = custom_data_list

            # add local availability
            self.config.data_list = self.add_local_availability(self.config.data_list)
            self.update_DB_job_statistics(self.config.data_list)

            if not serialize_after_each_mapper:
                # group dataset dicts by sceneid
                dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')
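                # (illustrative) dataset_groups is a list of lists: one inner list per scene ID,
                # containing the dataset dictionaries of all subsystems that belong to that scene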

                # close logger to release FileHandler of job log (workers will log into job logfile)
                del self.logger

                # RUN PREPROCESSING
                from .pipeline import run_complete_preprocessing
                GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)

                # separate results into successful and failed objects
                def assign_attr(tgt_procL):
                    return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]

                self.L1A_newObjects = assign_attr('L1A')
                self.L1B_newObjects = assign_attr('L1B')
                self.L1C_newObjects = assign_attr('L1C')
                self.L2A_newObjects = assign_attr('L2A')
                self.L2B_newObjects = assign_attr('L2B')
                self.L2C_newObjects = assign_attr('L2C')
                self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]

            else:
                self.L1A_processing()
                self.L1B_processing()
                self.L1C_processing()
                self.L2A_processing()
                self.L2B_processing()
                self.L2C_processing()

            # create summary
            self.create_job_summary()

            self.logger.info('Execution finished.')
            self.logger.info('The job logfile, options file and the summary files have been saved here: \n'
                             '%s.*' % os.path.splitext(self.logger.path_logfile)[0])
            # TODO implement failed_with_warnings:
            self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
            self.config.end_time = datetime.datetime.now()
            self.config.computation_time = self.config.end_time - self.config.start_time
            self.logger.info('Time for execution: %s' % self.config.computation_time)

        except Exception:  # noqa E722  # bare except
            self.config.status = 'failed'

            if not self.config.disable_exception_handler:
                self.logger.error('Execution failed with an error:', exc_info=True)
            else:
                self.logger.error('Execution failed with an error:')
                raise

        finally:
            # update database entry of current job
            self.update_DB_job_record()

            if self.config.profiling:
                self.profiler.stop()
                print(self.profiler.output_text(unicode=True, color=True))

            self.shutdown()

    def stop(self, signum, frame):
        """Interrupt the running process controller gracefully."""

        self.logger.info('Process controller stopped via %s.'
                         % ('KeyboardInterrupt' if signum == 2 else 'SIGTERM command'))
        self.config.status = 'canceled'
        self.update_DB_job_record()

        self.shutdown()

        if signum == 2:
            raise KeyboardInterrupt('Received a KeyboardInterrupt.')  # terminate execution and show traceback
        elif signum == 15:
            sys.exit(0)
            # raise SystemExit()

    def shutdown(self):
        """Shutdown the process controller instance (loggers, remove temporary directories, ...)."""
        self.logger.info('Shutting down gracefully...')

        # release unclosed locks
        release_unclosed_locks()

        # clear any temporary files
        tempdir = os.path.join(self.config.path_tempdir)
        self.logger.info('Deleting temporary directory %s.' % tempdir)
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir, ignore_errors=True)

        del self.logger
        shutdown_loggers()

    def benchmark(self):
        """
        Run a benchmark.
        """
        data_list_bench = self.config.data_list
        for count_datasets in range(len(data_list_bench)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = data_list_bench[0:count_datasets + 1]
                if os.path.exists(self.config.path_database):
                    os.remove(self.config.path_database)
                t_start = time.time()
                self.run_all_processors(current_data_list)
                t_processing_all_runs.append(time.time() - t_start)
                t_IO_all_runs.append(globals()['time_IO'])

            assert current_data_list, 'Empty data list.'
            OUT_W.write_global_benchmark_output(t_processing_all_runs, t_IO_all_runs, current_data_list)

    def L1A_processing(self):
        """
        Run Level 1A processing: Data import and metadata homogenization
        """
        if self.config.exec_L1AP[0]:
            self.logger.info('\n\n##### Level 1A Processing started - raster format and metadata homogenization ####\n')

            datalist_L1A_P = self._get_processor_data_list('L1A')

            if self.config.parallelization_level == 'scenes':
                # map
                L1A_resObjects = MAP(L1A_map, datalist_L1A_P, CPUs=12)
            else:  # tiles
                all_L1A_tiles_map1 = MAP(L1A_map_1, datalist_L1A_P,
                                         flatten_output=True)  # map_1 # merge results to new list of splits

                L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1)  # map_2
                grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
                    L1A_obj_tiles, 'scene_ID', 'subsystem')  # group results

                L1A_objects = MAP(L1A_P.L1A_object.from_tiles, grouped_L1A_Tiles)  # reduce

                L1A_resObjects = MAP(L1A_map_3, L1A_objects)  # map_3

            self.L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
            self.failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1A_newObjects

    def L1B_processing(self):
        """
        Run Level 1B processing: calculation of geometric shifts
        """
        # TODO implement check for running spatial index mediator server
        # run on full cubes

        if self.config.exec_L1BP[0]:
            self.logger.info('\n\n####### Level 1B Processing started - detection of geometric displacements #######\n')

            L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
            L1A_Instances = self.L1A_newObjects + L1A_DBObjects  # combine newly and earlier processed L1A data

            L1B_resObjects = MAP(L1B_map, L1A_Instances)

            self.L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj, L1B_P.L1B_object)]
            self.failed_objects += [obj for obj in L1B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1B_newObjects

    def L1C_processing(self):
        """
        Run Level 1C processing: atmospheric correction
        """
        if self.config.exec_L1CP[0]:
            self.logger.info('\n\n############## Level 1C Processing started - atmospheric correction ##############\n')

            if self.config.parallelization_level == 'scenes':
                L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
                L1B_Instances = self.L1B_newObjects + L1B_DBObjects  # combine newly and earlier processed L1B data

                # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
                grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')

                L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True,
                                     CPUs=15)  # FIXME CPUs set to 15 for testing

            else:  # tiles
                raise NotImplementedError("Tiled processing is not yet completely implemented for L1C processor. Use "
                                          "parallelization level 'scenes' instead!")
                # blocksize = (5000, 5000)
                # """if newly processed L1A objects are present: cut them into tiles"""
                # L1B_newTiles = []
                # if self.L1B_newObjects:
                #     tuples_obj_blocksize = [(obj, blocksize) for obj in self.L1B_newObjects]
                #     L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize, flatten_output=True)
                #
                # """combine newly and earlier processed L1B data"""
                # L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
                # L1B_tiles = L1B_newTiles + L1B_newDBTiles
                #
                # # TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
                # L1C_tiles = MAP(L1C_map, L1B_tiles)
                # grouped_L1C_Tiles = \
                #     HLP_F.group_objects_by_attributes(L1C_tiles, 'scene_ID', 'subsystem')  # group results
                # [L1C_tiles_group[0].delete_tempFiles() for L1C_tiles_group in grouped_L1C_Tiles]
                # L1C_resObjects = MAP(L1C_P.L1C_object().from_tiles, grouped_L1C_Tiles)  # reduce

            self.L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
            self.failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L1C_newObjects

    def L2A_processing(self):
        """
        Run Level 2A processing: geometric homogenization
        """
        if self.config.exec_L2AP[0]:
            self.logger.info(
                '\n\n#### Level 2A Processing started - shift correction / geometric homogenization ####\n')

            """combine newly and earlier processed L1C data"""
            L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
            L1C_Instances = self.L1C_newObjects + L1C_DBObjects  # combine newly and earlier processed L1C data

            # group by scene ID (all subsystems belonging to the same scene ID must be processed together)
            grouped_L1C_Instances = HLP_F.group_objects_by_attributes(L1C_Instances, 'scene_ID')

            L2A_resTiles = MAP(L2A_map, grouped_L1C_Instances, flatten_output=True)

            self.L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
            self.failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2A_tiles

    def L2B_processing(self):
        """
        Run Level 2B processing: spectral homogenization
        """
        if self.config.exec_L2BP[0]:
            self.logger.info('\n\n############# Level 2B Processing started - spectral homogenization ##############\n')

            if self.config.parallelization_level == 'scenes':
                # don't know if scenes makes sense in L2B processing because full objects are very big!
                """if newly processed L2A objects are present: merge them to scenes"""
                grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID')  # group results
                # reduce # will be too slow because it has to pickle back really large L2A_newObjects
                # L2A_newObjects  = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
                L2A_newObjects = [L2A_P.L2A_object.from_tiles(tileList) for tileList in grouped_L2A_Tiles]

                """combine newly and earlier processed L2A data"""
                L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
                L2A_Instances = L2A_newObjects + L2A_DBObjects  # combine newly and earlier processed L2A data

                L2B_resObjects = MAP(L2B_map, L2A_Instances)

            else:  # tiles
                L2A_newTiles = self.L2A_tiles  # tiles have the block size specified in L2A_map_2

                """combine newly and earlier processed L2A data"""
                blocksize = (2048, 2048)  # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
                L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
                L2A_tiles = L2A_newTiles + L2A_newDBTiles

                L2B_tiles = MAP(L2B_map, L2A_tiles)

                # group results  # FIXME necessary at this point?
                grouped_L2B_Tiles = HLP_F.group_objects_by_attributes(L2B_tiles, 'scene_ID')
                [L2B_tiles_group[0].delete_tempFiles() for L2B_tiles_group in grouped_L2B_Tiles]

                L2B_resObjects = [L2B_P.L2B_object.from_tiles(tileList) for tileList in grouped_L2B_Tiles]

            self.L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
            self.failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2B_newObjects

    def L2C_processing(self):
        """
        Run Level 2C processing: accuracy assessment and MGRS tiling
        """
        # FIXME only parallelization_level == 'scenes' implemented
        if self.config.exec_L2CP[0]:
            self.logger.info('\n\n########## Level 2C Processing started - calculation of quality layers ###########\n')

            """combine newly and earlier processed L2B data"""
            L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
            L2B_Instances = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2B data

            L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
            # FIXME in case of inmem_serialization mode results are too big to be back-pickled
            self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
            self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                    obj.scene_ID not in self.sceneids_failed]

        return self.L2C_newObjects

    def update_DB_job_record(self):
        """
        Update the database records of the current job (table 'jobs').
        """
        # TODO move this method to config.Job
        # update 'failed_sceneids' column of job record within jobs table
        sceneids_failed = list(set([obj.scene_ID for obj in self.failed_objects]))
        DB_T.update_records_in_postgreSQLdb(
            self.config.conn_database, 'jobs',
            {'failed_sceneids': sceneids_failed,  # update 'failed_sceneids' column
             'finishtime': self.config.end_time,  # add job finish timestamp
             'status': self.config.status},  # update 'job_status' column
            {'id': self.config.ID}, timeout=30000)

    def update_DB_job_statistics(self, usecase_datalist):
        """
        Update job statistics of the running job in the database.
        """
        # TODO move this method to config.Job
        already_updated_IDs = []
        for ds in usecase_datalist:
            if ds['proc_level'] is not None and ds['scene_ID'] not in already_updated_IDs:
                # update statistics column of jobs table
                DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
                    self.config.conn_database, 'jobs', 'statistics', cond_dict={'id': self.config.ID},
                    idx_val2decrement=db_jobs_statistics_def['pending'],
                    idx_val2increment=db_jobs_statistics_def[ds['proc_level']],
                    timeout=30000)

                # avoid double updating in case of subsystems belonging to the same scene ID
                already_updated_IDs.append(ds['scene_ID'])

    def create_job_summary(self):
        """
        Create job success summary
        """
745
746
747

        # get objects with highest requested processing level
        highest_procL_Objs = []
        for pL in reversed(proc_chain):
            if getattr(self.config, 'exec_%sP' % pL)[0]:
                highest_procL_Objs = \
                    getattr(self, '%s_newObjects' % pL) if pL != 'L2A' else (self.L2A_tiles or self.L2A_newObjects)
                break

        gms_objects2summarize = highest_procL_Objs + self.failed_objects
        if gms_objects2summarize:
            # create summaries
            detailed_JS, quick_JS = get_job_summary(gms_objects2summarize)
            detailed_JS.to_excel(os.path.join(self.config.path_job_logs, '%s_summary.xlsx' % self.config.ID))
            detailed_JS.to_csv(os.path.join(self.config.path_job_logs, '%s_summary.csv' % self.config.ID), sep='\t')
            self.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % self.config.ID + quick_JS.to_string())

            self.summary_detailed = detailed_JS
            self.summary_quick = quick_JS

        else:
            # TODO implement check if proc level with lowest procL has to be processed at all (due to job.exec_L1X)
            # TODO otherwise it is possible that get_job_summary receives an empty list
            self.logger.warning("Job summary skipped because get_job_summary() received an empty list of GMS objects.")

    def clear_lists_procObj(self):
        self.failed_objects = []
        self.L1A_newObjects = []
        self.L1B_newObjects = []
        self.L1C_newObjects = []
        self.L2A_tiles = []
        self.L2B_newObjects = []
        self.L2C_newObjects = []


def get_job_summary(list_GMS_objects):
    # get detailed job summary
    DJS_cols = ['GMS_object', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
                'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    DJS = DataFrame(columns=DJS_cols)
    DJS['GMS_object'] = list_GMS_objects

    for col in DJS_cols[1:]:
        def get_val(obj): return getattr(obj, col) if hasattr(obj, col) else None
        DJS[col] = list(DJS['GMS_object'].map(get_val))

    del DJS['GMS_object']
    DJS = DJS.sort_values(by=['satellite', 'sensor', 'entity_ID'])

    # get quick job summary
    QJS = DataFrame(columns=['satellite', 'sensor', 'count', 'proc_successfully', 'proc_failed'])
    all_sat, all_sen = zip(*[i.split('__') for i in (np.unique(DJS['satellite'] + '__' + DJS['sensor']))])
    QJS['satellite'] = all_sat
    QJS['sensor'] = all_sen
    # count unique scene IDs per satellite/sensor combination
    QJS['count'] = [len(DJS[(DJS['satellite'] == sat) & (DJS['sensor'] == sen)]['scene_ID'].unique())
                    for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_successfully'] = [len(DJS[(DJS['satellite'] == sat) &
                                        (DJS['sensor'] == sen) &
                                        (DJS['failedMapper'].isnull())]['scene_ID'].unique())
                                for sat, sen in zip(all_sat, all_sen)]
    QJS['proc_failed'] = QJS['count'] - QJS['proc_successfully']
    QJS = QJS.sort_values(by=['satellite', 'sensor'])
    return DJS, QJS
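

# Usage sketch (illustrative only; 'objs' stands for a list of processed GMS objects):
#
#   detailed_summary, quick_summary = get_job_summary(objs)
#   print(quick_summary.to_string())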