pipeline.py 15.1 KB
Newer Older
1
2
# -*- coding: utf-8 -*-

Daniel Scheffler's avatar
Daniel Scheffler committed
3
from typing import List, Tuple, Generator, Iterable, Union  # noqa F401  # flake8 issue
4
from psutil import virtual_memory
5

6
from ..options.config import GMS_config as CFG
7
from ..misc import exception_handler as EXC_H
Daniel Scheffler's avatar
Daniel Scheffler committed
8
from ..misc.path_generator import path_generator
9
from ..misc.logging import GMS_logger
10
from ..misc.locks import ProcessLock, MemoryReserver, redis_conn
11
12
13
14
15
16
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
from ..algorithms import L2A_P
from ..algorithms import L2B_P
from ..algorithms import L2C_P
17
from ..model.gms_object import \
18
    failed_GMS_object, update_proc_status, return_proc_reports_only, estimate_mem_usage
Daniel Scheffler's avatar
Daniel Scheffler committed
19
from ..model.gms_object import GMS_object  # noqa F401  # flake8 issue
20
from ..algorithms.geoprocessing import get_common_extent
21

22
23
# module metadata
__author__ = 'Daniel Scheffler'

24
25

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map(dataset_dict):  # map (scene-wise parallelization)
    # type: (dict) -> L1A_P.L1A_object
    """Run all L1A processing steps for a single subsystem dataset at once (no tiling).

    The L1A object is built from the dataset dictionary, its raster data and metadata are imported and
    validated, the data is referenced to UTM and calc_TOARadRefTemp() is applied. Afterwards corner positions,
    acquisition time, mean VAA and orbit/overpass parameters are derived. The result is exported via to_ENVI()
    if CFG.exec_L1AP[1] is set.

    :param dataset_dict:    keyword arguments used to instantiate the L1A_object
    :return:                the processed L1A_object
    """
    l1a = L1A_P.L1A_object(**dataset_dict)
    l1a.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # import and validate the input data
    l1a.import_rasterdata()
    l1a.import_metadata()
    l1a.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    l1a.apply_nodata_mask_to_ObjAttr('arr')  # the nodata mask is calculated automatically

    # georeferencing and radiometric conversion
    l1a.add_rasterInfo_to_MetaObj()
    l1a.reference_data('UTM')
    l1a.calc_TOARadRefTemp()

    # scene geometry and acquisition parameters
    l1a.calc_corner_positions()  # needs mask_nodata
    l1a.calc_center_AcqTime()  # only if necessary; needs the corner positions
    l1a.calc_mean_VAA()
    l1a.calc_orbit_overpassParams()  # needs the corner positions
    l1a.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    # output and cleanup
    if CFG.exec_L1AP[1]:
        l1a.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    l1a.record_mem_usage()
    l1a.delete_tempFiles()

    return l1a

52

53
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None):  # map (scene-wise parallelization)
    # type: (dict, Union[Tuple[int, int], None]) -> List[L1A_P.L1A_object]
    """First part of the tiled L1A processing: import and georeference a scene, then cut it into tiles.

    :param dataset_dict:    keyword arguments used to instantiate the L1A_object
    :param block_size:      X/Y size of the output tiles in pixels, e.g. (1024, 1024);
                            falls back to CFG.tiling_block_size_XY if not given (or falsy)
    :return:                list of L1A_object tiles
    """
    L1A_obj = L1A_P.L1A_object(**dataset_dict)
    L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L1A_obj.import_rasterdata()
    L1A_obj.import_metadata()
    L1A_obj.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    L1A_obj.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated
    L1A_obj.add_rasterInfo_to_MetaObj()
    L1A_obj.reference_data('UTM')

    # cut (block-wise parallelization)
    return list(L1A_obj.to_tiles(block_size or CFG.tiling_block_size_XY))

70

71
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_2(L1A_tile):  # map (block-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Second part of the tiled L1A processing, applied to a single tile.

    Runs calc_TOARadRefTemp() on the tile. Unless in-memory serialization is enabled in the config, the tile is
    then written to a temporary ENVI file.

    :param L1A_tile:    a single L1A tile (presumably one of those returned by L1A_map_1())
    :return:            the same, now modified, tile
    """
    L1A_tile.calc_TOARadRefTemp()

    write_tempfile = not CFG.inmem_serialization
    if write_tempfile:
        L1A_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask, is_tempfile=True)

    return L1A_tile

80

81
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_3(L1A_obj):  # map (scene-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Third part of the tiled L1A processing, applied scene-wise.

    Derives corner positions, acquisition time, mean VAA and orbit/overpass parameters. The object is exported
    via to_ENVI() if CFG.exec_L1AP[1] is set; temporary files are deleted in any case.

    :param L1A_obj: L1A object of a whole scene
    :return:        the same, now modified, L1A object
    """
    L1A_obj.calc_corner_positions()  # requires mask_nodata
    L1A_obj.calc_center_AcqTime()  # (if necessary); requires corner positions
    L1A_obj.calc_mean_VAA()
    L1A_obj.calc_orbit_overpassParams()  # requires corner positions
    L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    if CFG.exec_L1AP[1]:
        L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    # temporary files are removed no matter whether the output was written or not
    L1A_obj.delete_tempFiles()
    L1A_obj.record_mem_usage()

    return L1A_obj

98

99
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1B_map(L1A_obj):
    # type: (L1A_P.L1A_object) -> L1B_P.L1B_object
    """Run the L1B processing (computation of global shifts) for a single L1A object.

    NOTE: In the Python implementation (as opposed to the inmem_serialization implementation), L1A_obj contains
          NO ARRAY DATA, only the metadata that is valid for the entire scene.

    :param L1A_obj: the L1A object to be processed
    :return:        the resulting L1B object
    """
    L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    l1b = L1B_P.L1B_object(L1A_obj)
    l1b.compute_global_shifts()

    if CFG.exec_L1BP[1]:
        l1b.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    l1b.record_mem_usage()
    l1b.delete_tempFiles()

    return l1b

117

118
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1C_map(L1B_objs):
    # type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
    """Atmospheric correction.

    NOTE: all subsystems (containing all spatial samplings) belonging to the same scene ID are needed

    :param L1B_objs: list (or any other iterable) containing one or multiple L1B objects belonging to the same
                     scene ID.
    :return:         list of L1C objects
    """
    # Materialize the input exactly once. Calling list() on a one-shot iterable (e.g. a generator) only to pick
    # the first element would exhaust it, leaving nothing for the L1C object initialization below.
    L1B_objs = list(L1B_objs)
    L1B_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # initialize L1C objects
    L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]

    # check in config if atmospheric correction is desired
    if CFG.target_radunit_optical == 'BOA_Ref':
        # atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
        if L1C_objs[0].ac_options:
            # perform atmospheric correction
            L1C_objs = L1C_P.AtmCorr(*L1C_objs).run_atmospheric_correction(dump_ac_input=False)
        else:
            for L1C_obj in L1C_objs:
                L1C_obj.logger.warning('Atmospheric correction is not yet supported for %s %s and has been skipped.'
                                       % (L1C_obj.satellite, L1C_obj.sensor))
    else:
        for L1C_obj in L1C_objs:
            L1C_obj.logger.warning('Atmospheric correction skipped because optical conversion type is set to %s.'
                                   % CFG.target_radunit_optical)

    # write outputs and delete temporary data
    for L1C_obj in L1C_objs:
        if CFG.exec_L1CP[1]:
            L1C_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
        if L1C_obj.arr_shape == 'cube':
            L1C_obj.delete_tempFiles()
        L1C_obj.delete_ac_input_arrays()

    # record memory usage only after all subsystems have been written/cleaned up
    for L1C_obj in L1C_objs:
        L1C_obj.record_mem_usage()

    return L1C_objs


@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs,  # type: Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object, ...]]
            block_size=None,  # type: Union[Tuple[int, int], None]
            return_tiles=True  # type: bool
            ):
    # type: (...) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object]
    """Geometric homogenization.

    Performs correction of geometric displacements, resampling to target grid of the usecase and merges multiple
    GMS objects belonging to the same scene ID (subsystems) to a single L2A object.
    NOTE: Output L2A_object must be cut into tiles because L2A_obj size in memory exceeds maximum serialization size.

    :param L1C_objs:        list containing one or multiple L1C objects belonging to the same scene ID.
    :param block_size:      X/Y size of output tiles in pixels, e.g. (1024,1024);
                            falls back to CFG.tiling_block_size_XY if not given (or falsy)
    :param return_tiles:    return computed L2A object in tiles
    :return:                list of L2A_object tiles if return_tiles is True, otherwise the single L2A_object
    """
    L1C_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # initialize L2A objects (one per subsystem)
    L2A_objs = [L2A_P.L2A_object(L1C_obj) for L1C_obj in L1C_objs]

    # get common extent (NOTE: using lon/lat coordinates here would cause black borders around the UTM image
    #                          because get_common_extent() just uses the envelop without regard to the projection
    clip_extent = \
        get_common_extent([obj.trueDataCornerUTM for obj in L2A_objs]) \
        if len(L2A_objs) > 1 else L2A_objs[0].trueDataCornerUTM

    # correct geometric displacements and resample to target grid
    for subsystem_obj in L2A_objs:
        subsystem_obj.correct_spatial_shifts(cliptoextent=CFG.clip_to_extent,
                                             clipextent=clip_extent, clipextent_prj=subsystem_obj.arr.prj)

    # merge multiple subsystems belonging to the same scene ID to a single GMS object
    if len(L2A_objs) > 1:
        L2A_obj = L2A_P.L2A_object.from_sensor_subsystems(L2A_objs)
    else:
        L2A_obj = L2A_objs[0]

        # update attributes
        L2A_obj.calc_mask_nodata(overwrite=True)
        L2A_obj.calc_corner_positions()

    # write output
    if CFG.exec_L2AP[1]:
        L2A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    # delete tempfiles of separate subsystem GMS objects
    # NOTE: A dedicated loop variable is used on purpose. A list comprehension reusing the name `L2A_obj` would,
    #       under Python 2 scoping rules, rebind the merged object to the last subsystem object.
    for subsystem_obj in L2A_objs:
        subsystem_obj.delete_tempFiles()

    if return_tiles:
        L2A_tiles = list(L2A_obj.to_tiles(blocksize=block_size or CFG.tiling_block_size_XY))
        for L2A_tile in L2A_tiles:
            L2A_tile.record_mem_usage()
        return L2A_tiles
    else:
        L2A_obj.record_mem_usage()
        return L2A_obj
213
214


215
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2B_map(L2A_obj):
    # type: (L2A_P.L2A_object) -> L2B_P.L2B_object
    """Spectral homogenization of a single L2A object.

    The result is exported via to_ENVI() if CFG.exec_L2BP[1] is set. Temporary files are only deleted if the
    object holds a whole cube (arr_shape == 'cube').

    :param L2A_obj: the L2A object to be processed
    :return:        the resulting L2B object
    """
    L2A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    l2b = L2B_P.L2B_object(L2A_obj)
    l2b.spectral_homogenization()

    if CFG.exec_L2BP[1]:
        l2b.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    holds_full_cube = l2b.arr_shape == 'cube'
    if holds_full_cube:
        l2b.delete_tempFiles()

    l2b.record_mem_usage()
    return l2b

229

230
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj):
    # type: (L2B_P.L2B_object) -> L2C_P.L2C_object
    """Run the L2C processing for a single L2B object.

    If CFG.exec_L2CP[1] is set, the L2C object is cut into MGRS tiles (with a buffer of CFG.mgrs_pixel_buffer
    pixels) and each tile is exported via to_ENVI() using CFG.output_data_compression.

    :param L2B_obj: the L2B object to be processed
    :return:        the resulting L2C object
    """
    L2B_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    L2C_obj = L2C_P.L2C_object(L2B_obj)

    if CFG.exec_L2CP[1]:
        L2C_MGRS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
        for MGRS_tile in L2C_MGRS_tiles:
            MGRS_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask, compression=CFG.output_data_compression)

    L2C_obj.record_mem_usage()
    L2C_obj.delete_tempFiles()

    return L2C_obj  # contains no array data in Python mode
Daniel Scheffler's avatar
Daniel Scheffler committed
243
244


245
def _read_gms_objects_from_disk(dataset_dicts, proc_level, gms_class):
    # type: (List[dict], str, type) -> List
    """Read earlier processed GMS objects of the given processing level from disk (one per dataset dict).

    :param dataset_dicts:   dataset dictionaries, one per subsystem
    :param proc_level:      processing level of the data to read, e.g. 'L1A'
    :param gms_class:       class providing the `from_disk` constructor, e.g. L1A_P.L1A_object
    :return:                list of objects in the order of `dataset_dicts`
    """
    return [gms_class.from_disk([path_generator(ds, proc_level=proc_level).get_path_gmsfile(), ['cube', None]])
            for ds in dataset_dicts]


@return_proc_reports_only
# @return_GMS_objs_without_arrays
def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise parallelization)
    # type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
    """Run the pre-processing chain (L1A to L2C) for all subsystems of a single scene.

    Processing starts from the raw data if the datasets have no processing level yet (proc_level None).
    Otherwise the earlier processed data of the input level is read from disk right before the next processing
    level is computed. Processing stops as soon as a step returns a failed_GMS_object, or after the last
    processing level that is enabled in the config.

    NOTE: Exceptions in this function must be caught by the calling function (process controller).

    :param list_dataset_dicts_per_scene:    list of dataset dictionaries, one per subsystem of the same scene;
                                            all of them must have the same 'proc_level'
    :return:                                the GMS object(s) of the last executed processing level, or the
                                            failed object(s) of the step that failed (presumably reduced further
                                            by the @return_proc_reports_only decorator)
    :raises ValueError:                     if the datasets have different processing levels, or if more than
                                            one dataset is given for an L2A/L2B input
    """
    first_ds = list_dataset_dicts_per_scene[0]
    pipeline_logger = GMS_logger('log__%s' % CFG.ID, fmt_suffix=first_ds['scene_ID'],
                                 log_level=CFG.log_level, append=True)

    # set CPU and memory limits
    cpulimit = CFG.CPUs_all_jobs
    mem2reserve = 15  # GB reserved per process if no statistics are available

    if redis_conn:
        mem_estim = estimate_mem_usage(first_ds['dataset_ID'], first_ds['satellite'])
        if mem_estim:
            mem2reserve = mem_estim
        else:
            # Limit the number of processes to what fits into the currently unused part of 80 % of the total RAM.
            # Allow at least one process: a limit of 0 (or less, if more than 80 % of the RAM is already in use)
            # would leave no slot at all in the ProcessLock below.
            vmem = virtual_memory()
            cpulimit = max(1, int((vmem.total * .8 - vmem.used) / 1024 ** 3 / mem2reserve))
            pipeline_logger.info('No memory usage statistics from earlier jobs found for the current configuration. '
                                 'Limiting processes to %s in order to collect memory statistics first.' % cpulimit)

    # start processing
    with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage), \
            ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):

        if len({ds['proc_level'] for ds in list_dataset_dicts_per_scene}) != 1:
            raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
                             'Received %s.' % list_dataset_dicts_per_scene)

        input_proc_level = first_ds['proc_level']

        ##################
        # L1A processing #
        ##################

        L1A_objects = []
        if CFG.exec_L1AP[0] and input_proc_level is None:
            L1A_objects = \
                [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]

            if any(isinstance(obj, failed_GMS_object) for obj in L1A_objects):
                return L1A_objects

        ##################
        # L1B processing #
        ##################

        # TODO: select the subsystem with the optimal band for co-registration, run L1B_map only for that one and
        #       copy the co-registration information to the remaining subsystems

        L1B_objects = L1A_objects
        if CFG.exec_L1BP[0]:
            # add earlier processed L1A data
            if input_proc_level == 'L1A':
                L1A_objects.extend(_read_gms_objects_from_disk(list_dataset_dicts_per_scene, 'L1A', L1A_P.L1A_object))

            L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
            del L1A_objects

            if any(isinstance(obj, failed_GMS_object) for obj in L1B_objects):
                return L1B_objects

        ##################
        # L1C processing #
        ##################

        L1C_objects = L1B_objects
        if CFG.exec_L1CP[0]:
            # add earlier processed L1B data
            if input_proc_level == 'L1B':
                L1B_objects.extend(_read_gms_objects_from_disk(list_dataset_dicts_per_scene, 'L1B', L1B_P.L1B_object))

            L1C_objects = L1C_map(L1B_objects)
            del L1B_objects

            if any(isinstance(obj, failed_GMS_object) for obj in L1C_objects):
                return L1C_objects

        if not CFG.exec_L2AP[0]:
            return L1C_objects

        ##################
        # L2A processing #
        ##################

        # add earlier processed L1C data
        if input_proc_level == 'L1C':
            L1C_objects.extend(_read_gms_objects_from_disk(list_dataset_dicts_per_scene, 'L1C', L1C_P.L1C_object))

        L2A_obj = L2A_map(L1C_objects, return_tiles=False)
        del L1C_objects

        if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
            return L2A_obj

        ##################
        # L2B processing #
        ##################

        # add earlier processed L2A data
        if input_proc_level == 'L2A':
            if len(list_dataset_dicts_per_scene) != 1:
                raise ValueError('Expected only a single L2A dataset since subsystems are merged.')
            L2A_obj = _read_gms_objects_from_disk(list_dataset_dicts_per_scene, 'L2A', L2A_P.L2A_object)[0]

        L2B_obj = L2B_map(L2A_obj)
        del L2A_obj

        if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
            return L2B_obj

        ##################
        # L2C processing #
        ##################

        # add earlier processed L2B data
        if input_proc_level == 'L2B':
            if len(list_dataset_dicts_per_scene) != 1:
                raise ValueError('Expected only a single L2B dataset since subsystems are merged.')
            L2B_obj = _read_gms_objects_from_disk(list_dataset_dicts_per_scene, 'L2B', L2B_P.L2B_object)[0]

        L2C_obj = L2C_map(L2B_obj)  # type: Union[GMS_object, failed_GMS_object, List]
        del L2B_obj

        return L2C_obj