# -*- coding: utf-8 -*-

import datetime
import socket
import os
import warnings
import numpy as np
import builtins
import re
import psycopg2
import psycopg2.extras
from collections import OrderedDict
import multiprocessing
from inspect import getargvalues, stack, getfullargspec, signature, _empty
import json
from json import JSONDecodeError
from jsmin import jsmin
from cerberus import Validator
import pkgutil
import logging
import time
import psutil
from pprint import pformat
from typing import TYPE_CHECKING

from .options_schema import gms_schema_input, gms_schema_config_output

if TYPE_CHECKING:
    from gms_preprocessing.misc.database_tools import GMS_JOB  # noqa F401  # flake8 issue

__author__ = 'Daniel Scheffler'


class GMS_configuration(object):
    def __getattr__(self, attr):
        if hasattr(builtins, 'GMS_JobConfig'):
            if attr in ['job', 'usecase']:
                # This is only to keep compatibility with HU-INF codes
                return getattr(builtins, 'GMS_JobConfig')
            return getattr(builtins.GMS_JobConfig, attr)
        else:
            raise EnvironmentError("Config has not been set yet on this machine. Run 'set_config()' first!")

    def __repr__(self):
        return getattr(builtins, 'GMS_JobConfig').__repr__()


GMS_config = GMS_configuration()  # type: JobConfig


path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')


def set_config(job_ID, json_config='', inmem_serialization=False, parallelization_level='scenes', db_host='localhost',
               spatial_index_server_host='localhost', spatial_index_server_port=8654,
               reset_status=False, delete_old_output=False, exec_L1AP=None,
               exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
               allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
               tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
               path_procdata_scenes=None, path_procdata_MGRS=None, path_archive=None, virtual_sensor_id=10,
               datasetid_spatial_ref=249):
    """Set up a configuration for a new gms_preprocessing job!

    :param job_ID:          job ID of the job to be executed, e.g. 123456 (must be present in database)
67
    :param json_config      path to JSON file containing configuration parameters or a string in JSON format
68
69
    :param inmem_serialization:     False: write intermediate results to disk in order to save memory
                                    True:  keep intermediate results in memory in order to save IO time
70
    :param parallelization_level:   <str> choices: 'scenes' - parallelization on scene-level
71
                                                   'tiles'  - parallelization on tile-level
72
    :param db_host:         host name of the server that runs the postgreSQL database
73
74
    :param spatial_index_server_host:   host name of the server that runs the SpatialIndexMediator
    :param spatial_index_server_port:   port used for connecting to SpatialIndexMediator
75
76
77
    :param reset_status:    whether to reset the job status or not (default=False)
    :param delete_old_output:        <bool> whether to delete previously created output of the given job ID
                                     before running the job (default = False)
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
    :param exec_L1AP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L1BP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L1CP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L2AP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L2BP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L2CP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param CPUs:            number of CPU cores to be used for processing (default: None -> use all available)
    :param allow_subMultiprocessing:
                            allow multiprocessing within workers
    :param disable_exception_handler:
                            enable/disable automatic handling of unexpected exceptions (default: True -> enabled)
    :param log_level:       the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';
                            default: 'INFO')
    :param tiling_block_size_XY:
                            X/Y block size to be used for any tiling process (default: (2048,2048)
    :param is_test:         whether the current job represents a software test job (run by a test runner) or not
                            (default=False)
    :param profiling:       enable/disable code profiling (default: False)
    :param benchmark_global:
                            enable/disable benchmark of the whole processing pipeline
    :param path_procdata_scenes:
                            output path to store processed scenes
    :param path_procdata_MGRS:
                            output path to store processed MGRS tiles
    :param path_archive:    input path where downloaded data are stored
103
104
    :param virtual_sensor_id:   1:  Landsat-8,  10: Sentinel-2A 10m
    :param datasetid_spatial_ref:   249 Sentinel-2A
Daniel Scheffler's avatar
Daniel Scheffler committed
105
    :rtype: JobConfig
106
    """
    #################################
    # set GMS_JobConfig in builtins #
    #################################

    # FIXME virtual_sensor_id and datasetid_spatial_ref are not respected by JobConfig.
    if not hasattr(builtins, 'GMS_JobConfig') or reset_status:
        kwargs = dict([x for x in locals().items() if x[0] != "self" and not x[0].startswith('__')])
        builtins.GMS_JobConfig = JobConfig(job_ID, **kwargs)

    #####################
    # check environment #
    #####################
    if not hasattr(builtins, 'GMS_EnvOK') or not getattr(builtins, 'GMS_EnvOK'):
        # check environment
        from ..misc import environment as ENV

        logger = logging.getLogger('GMSEnvironmentChecker')
        logger.setLevel(logging.DEBUG)

        GMSEnv = ENV.GMSEnvironment()
        GMSEnv.check_dependencies()
        GMSEnv.check_read_write_permissions()
        GMSEnv.ensure_properly_activated_GDAL()
        GMSEnv.check_ecmwf_api_creds()

        # close unclosed locks from previous runs
        from ..misc.locks import release_unclosed_locks
        release_unclosed_locks(logger)

        builtins.GMS_EnvOK = True

    return getattr(builtins, 'GMS_JobConfig')
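
# Minimal usage sketch (hypothetical job ID; assumes the job exists in the 'jobs'
# table of the 'geomultisens' database; the module path is inferred from the
# relative imports in this file):
#
#     from gms_preprocessing.options.config import set_config, GMS_config
#
#     config = set_config(job_ID=123456, db_host='localhost', CPUs=8)
#     print(GMS_config.CPUs)  # the configuration is now globally accessible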


def get_conn_database(hostname='localhost', timeout=3):
    # type: (str, int) -> str
    """Return database connection string.

    :param hostname:    the host that runs the GMS postgreSQL database
    :param timeout:     connection timeout in seconds
    :return:
    """
    return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=%d" \
           % (hostname, timeout)
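
# Example (hypothetical host name): get_conn_database('gms-db.example.org') returns
#     "dbname='geomultisens' user='gmsdb' password='gmsdb' host='gms-db.example.org' connect_timeout=3"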


def get_config_kwargs_default():
    """Return a dictionary mapping each keyword argument of set_config() to its default value."""
    a = getfullargspec(set_config)
    return dict(zip(a.args[-len(a.defaults):], a.defaults))
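
# Note on the zip above: getfullargspec() only returns defaults for the trailing
# keyword arguments, so the last len(a.defaults) names in a.args line up
# one-to-one with a.defaults, yielding e.g. {'json_config': '', 'inmem_serialization': False, ...}.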


class JobConfig(object):
    def __init__(self, ID, **user_opts):
        """Create a job configuration.

        Workflow:
        # 0. Environment
        # 1. The JSON options can come from two sources: a console command or the database
        #       - in case of a console command: GMS_JOB.from_... must write the default options to the database
        # => first create the JobConfig based on the JSON options
        # 2. then override them with user-defined parameters (either init parameters or database settings
        #    made via the web app)

        :param ID:          job ID of the job to be executed, e.g. 123456 (must be present in database)
        :param user_opts:   keyword arguments to be passed to gms_preprocessing.set_config()
        """
        # privates
        self._DB_job_record = None  # type: GMS_JOB
        self._DB_config_table = None  # type: dict
        self._kwargs_defaults = None

        # fixed attributes
        # possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings',
        # 'finished_with_errors', 'finished'
        self.status = 'pending'
        self.start_time = datetime.datetime.now()
        self.end_time = None
        self.computation_time = None
        self.hostname = socket.gethostname()

        #######################
        # POPULATE PARAMETERS #
        #######################

        # args
        self.ID = ID
        self.kwargs = user_opts

        # database connection
        self.db_host = user_opts['db_host']
        self.conn_database = get_conn_database(hostname=self.db_host)

        # get validated options dict from the JSON options
        json_opts = self.get_json_opts(validate=True)
        gp = self.get_parameter

        ##################
        # global options #
        ##################

        json_globts = json_opts['global_opts']  # type: dict

        self.inmem_serialization = \
            gp('inmem_serialization', json_globts['inmem_serialization'])
        self.parallelization_level = \
            gp('parallelization_level', json_globts['parallelization_level'])
        self.spatial_index_server_host = \
            gp('spatial_index_server_host', json_globts['spatial_index_server_host'])
        self.spatial_index_server_port = \
            gp('spatial_index_server_port', json_globts['spatial_index_server_port'])
        self.CPUs = \
            gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
        self.CPUs_all_jobs = \
            gp('CPUs_all_jobs', json_globts['CPUs_all_jobs'])
        self.delete_old_output = \
            gp('delete_old_output', json_globts['delete_old_output'])
        self.max_parallel_reads_writes = \
            gp('max_parallel_reads_writes', json_globts['max_parallel_reads_writes'])
        self.allow_subMultiprocessing = \
            gp('allow_subMultiprocessing', json_globts['allow_subMultiprocessing'])
        self.disable_exception_handler = \
            gp('disable_exception_handler', json_globts['disable_exception_handler'])
        self.log_level = \
            gp('log_level', json_globts['log_level'])
        self.tiling_block_size_XY = \
            gp('tiling_block_size_XY', json_globts['tiling_block_size_XY'])
        self.is_test = \
            gp('is_test', json_globts['is_test'])
        self.profiling = \
            gp('profiling', json_globts['profiling'])
        self.benchmark_global = \
            gp('benchmark_global', json_globts['benchmark_global'])

        #########
        # paths #
        #########

        json_paths = json_opts['paths']  # type: dict

        # external
        self.path_spatIdxSrv = self.DB_config_table['path_spatial_index_mediator_server']
        self.path_tempdir = self.absP(self.DB_config_table['path_tempdir'])
        self.path_custom_sicor_options = gp('path_custom_sicor_options', json_paths['path_custom_sicor_options'])
        self.path_dem_proc_srtm_90m = self.absP(self.DB_config_table['path_dem_proc_srtm_90m'])

        # internal (included in gms_preprocessing repository)
        self.path_spechomo_classif = self.joinP(path_gmslib, 'database', 'spechomo_classifier')
        self.path_earthSunDist = self.joinP(path_gmslib, 'database', 'earth_sun_distance',
                                            'Earth_Sun_distances_per_day_edited.csv')
        self.path_SNR_models = self.joinP(path_gmslib, 'database', 'snr')
        self.path_SRFs = self.joinP(path_gmslib, 'database', 'srf')
        self.path_cloud_classif = self.joinP(path_gmslib, 'database', 'cloud_classifier')
        self.path_solar_irr = self.joinP(path_gmslib, 'database', 'solar_irradiance',
                                         'SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')

        if not self.is_test:
            # normal mode
            self.path_fileserver = self.DB_config_table['path_data_root']

            self.path_archive = \
                gp('path_archive', json_paths['path_archive'],
                   fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_download']))

            self.path_procdata_scenes = \
                gp('path_procdata_scenes', json_paths['path_procdata_scenes'],
                   fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_procdata_scenes']))

            self.path_procdata_MGRS = \
                gp('path_procdata_MGRS', json_paths['path_procdata_MGRS'],
                   fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_procdata_MGRS']))

            self.path_ECMWF_db = self.absP(self.DB_config_table['path_ECMWF_db'])

            self.path_benchmarks = \
                gp('path_benchmarks', json_paths['path_benchmarks'],
                   fallback=self.absP(self.DB_config_table['path_benchmarks']))

            self.path_job_logs = \
                gp('path_job_logs', json_paths['path_job_logs'],
                   fallback=self.absP(self.DB_config_table['path_job_logs']))

        else:
            # software test mode; the repository should be self-contained -> use only relative paths
            self.path_fileserver = self.joinP(path_gmslib, '..', 'tests', 'data')
            self.path_archive = self.joinP(path_gmslib, '..', 'tests', 'data', 'archive_data')
            self.path_procdata_scenes = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_scenes')
            self.path_procdata_MGRS = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_mgrs_tiles')
            self.path_ECMWF_db = self.joinP(path_gmslib, '..', 'tests', 'data', 'processed_ECMWF')
            self.path_benchmarks = self.joinP(path_gmslib, '..', 'tests', 'data', 'benchmarks')
            self.path_job_logs = self.joinP(path_gmslib, 'logs', 'job_logs')

        ###########################
        # processor configuration #
        ###########################

        json_processors = json_opts['processors']  # type: dict

        # general_opts
        self.skip_thermal = \
            gp('skip_thermal', json_processors['general_opts']['skip_thermal'])
        self.skip_pan = \
            gp('skip_pan', json_processors['general_opts']['skip_pan'])
        self.sort_bands_by_cwl = \
            gp('sort_bands_by_cwl', json_processors['general_opts']['sort_bands_by_cwl'])
        self.target_radunit_optical = \
            gp('target_radunit_optical', json_processors['general_opts']['target_radunit_optical'])
        self.target_radunit_thermal = \
            gp('target_radunit_thermal', json_processors['general_opts']['target_radunit_thermal'])
        self.scale_factor_TOARef = \
            gp('scale_factor_TOARef', json_processors['general_opts']['scale_factor_TOARef'])
        self.scale_factor_BOARef = \
            gp('scale_factor_BOARef', json_processors['general_opts']['scale_factor_BOARef'])
        self.mgrs_pixel_buffer = \
            gp('mgrs_pixel_buffer', json_processors['general_opts']['mgrs_pixel_buffer'])
        self.output_data_compression = \
            gp('output_data_compression', json_processors['general_opts']['output_data_compression'])

        # processor-specific opts

        # L1A
        self.exec_L1AP = gp('exec_L1AP', [
            json_processors['L1A']['run_processor'],
            json_processors['L1A']['write_output'],
            json_processors['L1A']['delete_output']])
        self.SZA_SAA_calculation_accurracy = \
            gp('SZA_SAA_calculation_accurracy', json_processors['L1A']['SZA_SAA_calculation_accurracy'])
        self.export_VZA_SZA_SAA_RAA_stats = \
            gp('export_VZA_SZA_SAA_RAA_stats', json_processors['L1A']['export_VZA_SZA_SAA_RAA_stats'])

        # L1B
        self.exec_L1BP = gp('exec_L1BP', [
            json_processors['L1B']['run_processor'],
            json_processors['L1B']['write_output'],
            json_processors['L1B']['delete_output']])
        self.skip_coreg = gp('skip_coreg', json_processors['L1B']['skip_coreg'])
        self.spatial_ref_min_overlap = \
            gp('spatial_ref_min_overlap', json_processors['L1B']['spatial_ref_min_overlap'])
        self.spatial_ref_min_cloudcov = \
            gp('spatial_ref_min_cloudcov', json_processors['L1B']['spatial_ref_min_cloudcov'])
        self.spatial_ref_max_cloudcov = \
            gp('spatial_ref_max_cloudcov', json_processors['L1B']['spatial_ref_max_cloudcov'])
        self.spatial_ref_plusminus_days = \
            gp('spatial_ref_plusminus_days', json_processors['L1B']['spatial_ref_plusminus_days'])
        self.spatial_ref_plusminus_years = \
            gp('spatial_ref_plusminus_years', json_processors['L1B']['spatial_ref_plusminus_years'])
        self.coreg_band_wavelength_for_matching = \
            gp('coreg_band_wavelength_for_matching', json_processors['L1B']['coreg_band_wavelength_for_matching'])
        self.coreg_max_shift_allowed = \
            gp('coreg_max_shift_allowed', json_processors['L1B']['coreg_max_shift_allowed'])
        self.coreg_window_size = \
            gp('coreg_window_size', json_processors['L1B']['coreg_window_size'])
        self.spathomo_estimate_accuracy = \
            gp('spathomo_estimate_accuracy', json_processors['L1B']['spathomo_estimate_accuracy'])

        # L1C
        self.exec_L1CP = gp('exec_L1CP', [
            json_processors['L1C']['run_processor'],
            json_processors['L1C']['write_output'],
            json_processors['L1C']['delete_output']])
        self.cloud_masking_algorithm = \
            gp('cloud_masking_algorithm', json_processors['L1C']['cloud_masking_algorithm'])
        self.export_L1C_obj_dumps = \
            gp('export_L1C_obj_dumps', json_processors['L1C']['export_L1C_obj_dumps'])
        self.auto_download_ecmwf = \
            gp('auto_download_ecmwf', json_processors['L1C']['auto_download_ecmwf'])
        self.ac_fillnonclear_areas = \
            gp('ac_fillnonclear_areas', json_processors['L1C']['ac_fillnonclear_areas'])
        self.ac_clear_area_labels = \
            gp('ac_clear_area_labels', json_processors['L1C']['ac_clear_area_labels'])
        self.ac_scale_factor_errors = \
            gp('ac_scale_factor_errors', json_processors['L1C']['ac_scale_factor_errors'])
        self.ac_max_ram_gb = \
            gp('ac_max_ram_gb', json_processors['L1C']['ac_max_ram_gb'])
        self.ac_estimate_accuracy = \
            gp('ac_estimate_accuracy', json_processors['L1C']['ac_estimate_accuracy'])
        self.ac_bandwise_accuracy = \
            gp('ac_bandwise_accuracy', json_processors['L1C']['ac_bandwise_accuracy'])

        # L2A
        self.exec_L2AP = gp('exec_L2AP', [
            json_processors['L2A']['run_processor'],
            json_processors['L2A']['write_output'],
            json_processors['L2A']['delete_output']])
        self.align_coord_grids = gp('align_coord_grids', json_processors['L2A']['align_coord_grids'])
        self.match_gsd = gp('match_gsd', json_processors['L2A']['match_gsd'])
        self.spatial_resamp_alg = gp('spatial_resamp_alg', json_processors['L2A']['spatial_resamp_alg'])
        self.clip_to_extent = gp('clip_to_extent', json_processors['L2A']['clip_to_extent'])

        # L2B
        self.exec_L2BP = gp('exec_L2BP', [
            json_processors['L2B']['run_processor'],
            json_processors['L2B']['write_output'],
            json_processors['L2B']['delete_output']])
        self.spechomo_method = gp('spechomo_method', json_processors['L2B']['spechomo_method'])
        self.spechomo_estimate_accuracy = \
            gp('spechomo_estimate_accuracy', json_processors['L2B']['spechomo_estimate_accuracy'])
        self.spechomo_bandwise_accuracy = \
            gp('spechomo_bandwise_accuracy', json_processors['L2B']['spechomo_bandwise_accuracy'])

        # L2C
        self.exec_L2CP = gp('exec_L2CP', [
            json_processors['L2C']['run_processor'],
            json_processors['L2C']['write_output'],
            json_processors['L2C']['delete_output']])

        ################################
        # target sensor specifications #
        ################################

        self.virtual_sensor_id = gp('virtual_sensor_id', attr_db_job_record='virtualsensorid')
        # FIXME Why is datasetid_spatial_ref missing in the virtual_sensors table?
        self.datasetid_spatial_ref = gp('datasetid_spatial_ref', attr_db_job_record='datasetid_spatial_ref')

        VSSpecs = self.get_virtual_sensor_specs()
        self.virtual_sensor_name = VSSpecs['name']

        # spectral specifications
        self.datasetid_spectral_ref = VSSpecs['spectral_characteristics_datasetid']  # None in case of custom sensor
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
        self.target_CWL = VSSpecs['wavelengths_pos']
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
        self.target_FWHM = VSSpecs['band_width']

        # spatial specifications
        target_gsd_tmp = VSSpecs['spatial_resolution']  # table features only 1 value for X/Y dims FIXME user inputs?
        # FIXME the target GSD setting is a duplicate of datasetid_spatial_ref!
        self.target_gsd = xgsd, ygsd = \
            [target_gsd_tmp] * 2 if isinstance(target_gsd_tmp, (int, float)) else target_gsd_tmp
        self.target_epsg_code = VSSpecs['projection_epsg']
        # FIXME which values in case the user defines only Landsat?
        self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd).tolist()  # e.g. [15, 45]
        self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd).tolist()
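
        # Worked example for the grid above (hypothetical GSD): with xgsd = 30 m,
        # np.arange(15., 75., 30.) yields [15.0, 45.0], i.e. the pixel-center
        # anchor coordinates defining the 30 m target grid in X; Y is analogous.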

        #############
        # data list #
        #############

        self.data_list = self.get_data_list_of_current_jobID()

        ############
        # validate #
        ############

        self.validate_exec_configs()

        GMSValidator(allow_unknown=True, schema=gms_schema_config_output).validate(self.to_dict())

        # check if parallel read/write processes have been limited on a storage
        # mountpoint shared between multiple hosts
        if self.max_parallel_reads_writes != 0:
            self.check_no_read_write_limit_on_xtfs_mountpoint()

    @property
    def kwargs_defaults(self):
        if not self._kwargs_defaults:
            self._kwargs_defaults = get_config_kwargs_default()

        return self._kwargs_defaults

    def get_init_argskwargs(self, ignore=("logger",)):
        """Return a dictionary of the calling function's argument values, split into positional
        arguments ('args') and keyword arguments ('kwargs').
        """
        posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
        argskwargs.update(argskwargs.pop(kwname, []))
        argskwargs = {k: v for k, v in argskwargs.items()
                      if k not in ignore and k != 'self' and not k.startswith('__')}
        sig = signature(self.__init__)
        argsnames = [k for k in sig.parameters if sig.parameters[k].default == _empty]
        return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
                'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}
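
    # Frame-inspection note: stack()[1][0] above is the caller's frame object, so
    # when called from within __init__, the live argument values of that call are
    # split into positional ('args') and keyword ('kwargs') groups according to
    # which parameters of __init__ have no default value.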

    def get_parameter(self, key_user_opts, val_json=None, attr_db_job_record='', fallback=None):
        # 1. JobConfig parameters: parameters that are directly passed to JobConfig
        if key_user_opts in self.kwargs and self.kwargs[key_user_opts] != self.kwargs_defaults[key_user_opts]:
            return self.kwargs[key_user_opts]

        # 2. WebUI parameters: parameters that have been defined via the WebUI
        if attr_db_job_record:
            return getattr(self.DB_job_record, attr_db_job_record)

        # 3. JSON parameters: parameters that have been defined via JSON input (command line or advanced UI params)
        if val_json or val_json is False:
            return val_json

        # fallback: if nothing has been returned until here
        if fallback is None and key_user_opts in self.kwargs_defaults:
            fallback = self.kwargs_defaults[key_user_opts]

        return fallback
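
    # Resolution-order example (hypothetical values): for JobConfig(123456, CPUs=8),
    # get_parameter('CPUs', val_json=16) returns 8, because the explicitly passed
    # keyword argument differs from the set_config() default (None) and therefore
    # takes precedence over the JSON value 16.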

    @property
    def DB_job_record(self):
        # type: () -> GMS_JOB
        if not self._DB_job_record:
            # check if the job ID exists in the database
            from ..misc.database_tools import GMS_JOB  # noqa F811  # redefinition of unused 'GMS_JOB' from line 22

            try:
                self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)
            except ValueError:
                raise

        return self._DB_job_record

    @property
    def DB_config_table(self):
        # type: () -> dict
        """Return the content of the config table of the postgreSQL database as dictionary."""
        if not self._DB_config_table:
            from ..misc.database_tools import get_info_from_postgreSQLdb

            db_cfg = dict(get_info_from_postgreSQLdb(self.conn_database, 'config', ['key', 'value']))

            # convert relative to absolute paths
            self._DB_config_table = {k: self.absP(v) if k.startswith('path_') and v.startswith('./') else v
                                     for k, v in db_cfg.items()}

        return self._DB_config_table

    def get_virtual_sensor_specs(self):
        # type: () -> dict
        """Return the content of the virtual_sensors table of the postgreSQL database as dictionary."""
        from ..misc.database_tools import get_info_from_postgreSQLdb

        # column spectral_characteristics_datasetid is not used later because it is given by jobs.datasetid_spatial_ref
        cols2read = ['name', 'projection_epsg', 'spatial_resolution', 'spectral_characteristics_datasetid',
                     'wavelengths_pos', 'band_width']

        res = get_info_from_postgreSQLdb(self.conn_database, 'virtual_sensors',
                                         cols2read, {'id': self.virtual_sensor_id})[0]

        VSSpecs = dict()
        for i, col in enumerate(cols2read):
            val = res[i]
            if col == 'spectral_characteristics_datasetid' and val == -1:  # nodata value
                val = None

            VSSpecs[col] = val

        return VSSpecs

    def get_json_opts(self, validate=True):
        """Get a dictionary of GMS config parameters according to the jobs table of the database.

        NOTE: Reads the default options from options_default.json and updates the values with those from the database.
        """
        # read options_default.json
        default_options = get_options(path_options_default, validation=validate)

        if 'json_config' in self.kwargs and self.kwargs['json_config']:
            if self.kwargs['json_config'].startswith("{"):
                try:
                    params_dict = json.loads(jsmin(self.kwargs['json_config']))
                except JSONDecodeError:
                    warnings.warn('The given JSON options string could not be decoded. '
                                  'JSON decoder failed with the following error:')
                    raise
            elif os.path.isfile(self.kwargs['json_config']):
                try:
                    with open(self.kwargs['json_config'], 'r') as inF:
                        params_dict = json.loads(jsmin(inF.read()))
                except JSONDecodeError:
                    warnings.warn('The given JSON options file %s could not be decoded. '
                                  'JSON decoder failed with the following error:' % self.kwargs['json_config'])
                    raise
            else:
                raise ValueError("The parameter 'json_config' must be a JSON formatted string or a JSON file on disk.")

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            default_options.update(params_dict)

        # update the default options with those from the database
        if self.DB_job_record.analysis_parameter:
            try:
                params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
            except JSONDecodeError:
                warnings.warn('The advanced options given in the WebUI could not be decoded. '
                              'JSON decoder failed with the following error:')
                raise

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            default_options.update(params_dict)

        if validate:
            GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(default_options)

        return default_options

    @staticmethod
    def absP(relP):
        """Convert a path relative to the directory of this module into an absolute path."""
        return os.path.abspath(os.path.join(os.path.dirname(__file__), relP))

    @staticmethod
    def joinP(*items):
        """Join the given path components (thin wrapper around os.path.join)."""
        return os.path.join(*items)

    def get_data_list_of_current_jobID(self):
        """Get a list of datasets to be processed from the database and return it together with some metadata.

        :return:    <list> of OrderedDicts, e.g. [OrderedDict([('proc_level', None), ('scene_ID', 5895940),
                    ('datasetid', 104), ('image_type', 'RSD'), ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'),
                    ('subsystem', ''), ('acquisition_date', datetime.datetime(2015, 2, 5, 10, 2, 52)),
                    ('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
                    ('sensormode', 'M'), ('logger', None)]), ...]
        """
        from ..model.metadata import get_sensormode

        data_list = []
        with psycopg2.connect(self.conn_database) as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute("""
                    WITH jobs_unnested AS (
                            SELECT id, unnest(sceneids) AS sceneid FROM jobs
                        )
                    SELECT jobs_unnested.sceneid,
                           scenes.datasetid,
                           scenes.acquisitiondate,
                           scenes.entityid,
                           scenes.filename,
                           COALESCE(scenes_proc.proc_level::text, 'L1A') AS proc_level,
                           datasets.image_type,
                           satellites.name AS satellite,
                           sensors.name AS sensor,
                           subsystems.name AS subsystem
                    FROM jobs_unnested
                    LEFT OUTER JOIN scenes ON scenes.id = jobs_unnested.sceneid
                    LEFT OUTER JOIN scenes_proc ON scenes_proc.sceneid = jobs_unnested.sceneid
                    LEFT OUTER JOIN datasets ON datasets.id = datasetid
                    LEFT OUTER JOIN satellites ON satellites.id = satelliteid
                    LEFT OUTER JOIN sensors ON sensors.id = sensorid
                    LEFT OUTER JOIN subsystems ON subsystems.id = subsystemid
                    WHERE jobs_unnested.id = %s
                    """,
                            (self.ID,))

                for row in cur.fetchall():
                    ds = OrderedDict()
                    ds["proc_level"] = row["proc_level"] if not self.is_test else None
                    ds["scene_ID"] = row["sceneid"]
                    ds["dataset_ID"] = row["datasetid"]
                    ds["image_type"] = row["image_type"]
                    ds["satellite"] = row["satellite"]
                    ds["sensor"] = row["sensor"]
                    ds["subsystem"] = row["subsystem"]
                    ds["acq_datetime"] = row["acquisitiondate"]
                    ds["entity_ID"] = row["entityid"]
                    ds["filename"] = row["filename"]

                    ds['sensor'] = 'ETM+' if re.search('ETM', ds['sensor']) else ds['sensor']
                    if self.skip_thermal and ds['subsystem'] == 'TIR':
                        continue  # removes ASTER TIR in case of skip_thermal
                    ds['subsystem'] = '' if ds['subsystem'] is None else ds['subsystem']
                    ds['sensormode'] = get_sensormode(ds)
                    if self.skip_pan and ds['sensormode'] == 'P':
                        continue  # removes e.g. SPOT PAN in case of skip_pan

                    if re.search("Sentinel-2A", ds['satellite'], re.I):
                        for subsystem in ['S2A10', 'S2A20', 'S2A60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search("Sentinel-2B", ds['satellite'], re.I):
                        for subsystem in ['S2B10', 'S2B20', 'S2B60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search("Terra", ds['satellite'], re.I):
                        for subsystem in ['VNIR1', 'VNIR2', 'SWIR', 'TIR']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    else:
                        data_list.append(ds)

        self.data_list = data_list
        return self.data_list

    def validate_exec_configs(self):
        for i in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
            try:
                exec_lvl = self.kwargs['exec_%s' % i]
            except KeyError:
                continue

            if exec_lvl is None:
                continue

            # check input format
            if all([len(exec_lvl) == 3, (np.array(exec_lvl) == np.array(np.array(exec_lvl, bool), int)).all()]):
                execute, write, delete = exec_lvl

                # written output cannot be turned off in execution mode 'Python'
                if not self.inmem_serialization and execute and not write:
                    warnings.warn("If CFG.inmem_serialization is False, the output writer for %s has to be enabled "
                                  "because any operations on GMS_obj.arr read the intermediate results from disk. "
                                  "Turning it on.." % i)
                    write = True
                    self.kwargs['exec_%s' % i] = [execute, write, delete]

            else:
                raise ValueError('Execution mode must be provided as list of 3 elements containing only boolean '
                                 'values. Got %s for %s.' % (exec_lvl, i))

    def check_no_read_write_limit_on_xtfs_mountpoint(self):
        intensive_IO_paths = [self.path_fileserver, self.path_archive, self.path_benchmarks,
                              self.path_dem_proc_srtm_90m, self.path_ECMWF_db, self.path_procdata_MGRS,
                              self.path_procdata_scenes]

        mount_points = {el.mountpoint: el for el in psutil.disk_partitions(all=True)}

        for path in intensive_IO_paths:
            for mp, mp_object in mount_points.items():
                if path.startswith(mp) and mp_object.device.startswith('xtreemfs'):
                    warnings.warn("Path %s appears to be on an XtreemFS mountpoint. It is highly recommended to set "
                                  "the configuration parameter 'max_parallel_reads_writes' to 0 in that case! "
                                  "Otherwise read/write processes might be slowed down! Continuing in 20 seconds.."
                                  % path)
                    time.sleep(20)
                    break
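
        # Matching note: psutil.disk_partitions(all=True) returns namedtuples with
        # .mountpoint and .device attributes; any intensive-IO path located below a
        # mountpoint whose device name starts with 'xtreemfs' (e.g. a hypothetical
        # 'xtreemfs@192.168.0.1/...') triggers the warning above.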

    def to_dict(self):
        """Generate a dictionary with the same structure as the one in options_default.json from the current config."""
        opts_default = get_options(path_options_default)

        # add all keys included in options_default.json
        outdict = dict()
        for key in opts_default.keys():
            if not isinstance(opts_default[key], (dict, OrderedDict)):
                outdict[key] = getattr(self, key)
            else:
                group = key
                if group not in outdict:
                    outdict[group] = dict()

                for group_key in opts_default[group]:
                    if not isinstance(opts_default[group][group_key], (dict, OrderedDict)):
                        outdict[group][group_key] = getattr(self, group_key)
                    else:
                        subgroup = group_key
                        if subgroup not in outdict[group]:
                            outdict[group][subgroup] = dict()

                        for subgroup_key in opts_default[group][subgroup]:
                            try:
                                outdict[group][subgroup][subgroup_key] = getattr(self, subgroup_key)
                            except AttributeError:
                                procexec_keys = ['run_processor', 'write_output', 'delete_output']
                                if subgroup_key in procexec_keys:
                                    proc_code = subgroup
                                    outdict[group][subgroup][subgroup_key] = \
                                        getattr(self, 'exec_%sP' % proc_code)[procexec_keys.index(subgroup_key)]
                                else:
                                    raise

        # add job metadata and the data list
        outdict.update(dict(
            job_meta={k: getattr(self, k) for k in ['ID', 'start_time', 'end_time', 'computation_time', 'hostname']},
            data_list={'dataset_%s' % i: ds for i, ds in enumerate(self.data_list)}))

        return outdict

    def to_jsonable_dict(self):
        return python_to_json(self.to_dict())

    def __repr__(self):
        return pformat(self.to_dict())


def is_GMSConfig_available():
    try:
        if GMS_config is not None:
            return True
    except (EnvironmentError, OSError):
        return False


def json_to_python(value):
    def is_number(s):
        try:
            float(s)
            return True
        except ValueError:
            return False

    if type(value) is dict:
        return {json_to_python(k): json_to_python(v) for k, v in value.items()}
    elif type(value) is list:
        return [json_to_python(v) for v in value]
    else:
        if value == "None":
            return None
        if value == "slice(None, None, None)":
            return slice(None)
        if value in [True, "true"]:
            return True
        if value in [False, "false"]:
            return False
        if is_number(value):
            try:
                if str(int(value)) != str(float(value)):
                    return int(value)
                else:
                    return float(value)
            except ValueError:
                return float(value)
        else:
            return value
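
# A few conversions implied by the branches above:
#     json_to_python("None")  -> None
#     json_to_python("false") -> False
#     json_to_python("2048")  -> 2048  (int, since str(int("2048")) != str(float("2048")))
#     json_to_python("0.25")  -> 0.25  (float)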


def python_to_json(value):
    if type(value) in [dict, OrderedDict]:
        return {python_to_json(k): python_to_json(v) for k, v in value.items()}
    elif type(value) is list:
        return [python_to_json(v) for v in value]
    elif type(value) is np.ndarray:
        return [python_to_json(v) for v in value.tolist()]
    else:
        if value is None:
            return "None"
        if value == slice(None):
            return "slice(None, None, None)"
        if value is True:
            return "true"
        if value is False:
            return "false"
        if type(value) is datetime.datetime:
            return datetime.datetime.strftime(value, '%Y-%m-%d %H:%M:%S.%f%z')
        else:
            return value
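
# Round-trip note: python_to_json is the inverse of json_to_python for the value
# types used in the config, e.g. python_to_json(None) -> "None" and
# json_to_python("None") -> None again.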


class GMSValidator(Validator):
    def __init__(self, *args, **kwargs):
        """Initialize GMSValidator.

        :param args:    arguments to be passed to cerberus.Validator
        :param kwargs:  keyword arguments to be passed to cerberus.Validator
        """
        super(GMSValidator, self).__init__(*args, **kwargs)

    def validate(self, document2validate, **kwargs):
        if super(GMSValidator, self).validate(document=document2validate, **kwargs) is False:
            raise ValueError("The options are malformed: %s" % str(self.errors))
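
# Usage sketch: unlike plain cerberus, GMSValidator.validate() raises on schema
# violations instead of returning False, e.g.:
#     GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(options)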


def get_options(target, validation=True):
    """Return a dictionary with all options.

    :param target:      path to the JSON options file to be loaded (jsmin is used to strip comments)
    :param validation:  True / False, whether to validate the options read from the file
    :return:            dictionary with options
    """
    if os.path.isfile(target):
        with open(target, "r") as fl:
            options = json_to_python(json.loads(jsmin(fl.read())))

        if validation is True:
            GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(options)

        return options
    else:
        raise FileNotFoundError("Options file not found at file path %s." % target)
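

# Minimal usage sketch:
#     opts = get_options(path_options_default)  # load and validate the default options
#     opts_raw = get_options(path_options_default, validation=False)  # skip schema validation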