pipeline.py 12.8 KB
Newer Older
1
2
# -*- coding: utf-8 -*-

Daniel Scheffler's avatar
Daniel Scheffler committed
3
from typing import List, Tuple, Generator, Iterable, Union  # noqa F401  # flake8 issue
4

5
from ..options.config import GMS_config as CFG
6
from ..misc import exception_handler as EXC_H
Daniel Scheffler's avatar
Daniel Scheffler committed
7
from ..misc.path_generator import path_generator
8
from ..misc.logging import GMS_logger
9
from ..misc.locks import ProcessLock
10
11
12
13
14
15
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
from ..algorithms import L2A_P
from ..algorithms import L2B_P
from ..algorithms import L2C_P
Daniel Scheffler's avatar
Daniel Scheffler committed
16
17
from ..model.gms_object import failed_GMS_object, update_proc_status, return_GMS_objs_without_arrays
from ..model.gms_object import GMS_object  # noqa F401  # flake8 issue
18
from ..algorithms.geoprocessing import get_common_extent
19

20
21
__author__ = 'Daniel Scheffler'

22
23

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map(dataset_dict):  # map (scene-wise parallelization)
    # type: (dict) -> L1A_P.L1A_object
    """Run the complete Level-1A processing chain for a single scene.

    :param dataset_dict: keyword arguments used to instantiate the L1A object
    :return:             the fully processed L1A object
    """
    obj = L1A_P.L1A_object(**dataset_dict)

    # ingest raster data and metadata, then validate geocoding
    obj.import_rasterdata()
    obj.import_metadata(v=False)
    obj.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    obj.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated

    # convert to target radiometric unit and derive geometric attributes
    obj.add_rasterInfo_to_MetaObj()
    obj.reference_data('UTM')
    obj.calc_TOARadRefTemp()
    obj.calc_corner_positions()  # requires mask_nodata
    obj.calc_center_AcqTime()  # (if necessary); requires corner positions
    obj.calc_mean_VAA()
    obj.calc_orbit_overpassParams()  # requires corner positions
    obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    # optionally write L1A output to disk, then clean up
    if CFG.exec_L1AP[1]:
        obj.to_ENVI()
    obj.delete_tempFiles()

    return obj

48

49
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None):  # map (scene-wise parallelization)
    # type: (dict, tuple) -> List[L1A_P.L1A_object]
    """Run the first part of Level-1A processing and cut the scene into tiles.

    :param dataset_dict: keyword arguments used to instantiate the L1A object
    :param block_size:   X/Y size of the output tiles in pixels, e.g. (1024, 1024);
                         defaults to CFG.tiling_block_size_XY if not given
    :return:             list of L1A object tiles (block-wise parallelization input)
    """
    L1A_obj = L1A_P.L1A_object(**dataset_dict)
    L1A_obj.import_rasterdata()
    L1A_obj.import_metadata(v=False)
    L1A_obj.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    L1A_obj.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated
    L1A_obj.add_rasterInfo_to_MetaObj()
    L1A_obj.reference_data('UTM')
    tiles = list(L1A_obj.to_tiles(
        block_size if block_size else CFG.tiling_block_size_XY))  # cut (block-wise parallelization)
    return tiles

65

66
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_2(L1A_tile):  # map (block-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Run the tile-level part of Level-1A processing (radiometric conversion).

    :param L1A_tile: a single L1A object tile
    :return:         the processed tile
    """
    L1A_tile.calc_TOARadRefTemp()

    # without in-memory serialization, intermediate results are dumped to disk
    if not CFG.inmem_serialization:
        L1A_tile.to_ENVI(is_tempfile=True)

    return L1A_tile

75

76
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_3(L1A_obj):  # map (scene-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Run the final part of Level-1A processing on a re-assembled scene.

    :param L1A_obj: the L1A object to finalize
    :return:        the finalized L1A object
    """
    L1A_obj.calc_corner_positions()  # requires mask_nodata
    L1A_obj.calc_center_AcqTime()  # (if necessary); requires corner positions
    L1A_obj.calc_mean_VAA()
    L1A_obj.calc_orbit_overpassParams()  # requires corner positions
    L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    if CFG.exec_L1AP[1]:
        L1A_obj.to_ENVI()

    # NOTE: tempfiles were previously deleted in both branches of the condition
    # above; the call is hoisted here since it runs unconditionally either way
    L1A_obj.delete_tempFiles()

    return L1A_obj

92

93
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1B_map(L1A_obj):
    # type: (L1A_P.L1A_object) -> L1B_P.L1B_object
    """Compute global spatial shifts (co-registration) for a Level-1A object.

    NOTE: In the Python (as opposed to the inmem_serialization) implementation,
    L1A_obj contains NO ARRAY DATA — only the metadata valid for the whole scene.
    """
    obj = L1B_P.L1B_object(L1A_obj)
    obj.compute_global_shifts()

    # optionally write L1B output to disk
    if CFG.exec_L1BP[1]:
        obj.to_ENVI()

    obj.delete_tempFiles()
    return obj

108

109
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1C_map(L1B_objs):
    # type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
    """Atmospheric correction.

    NOTE: all subsystems (containing all spatial samplings) belonging to the same scene ID are needed

    :param L1B_objs: list containing one or multiple L1B objects belonging to the same scene ID.
    :return:         list of L1C objects
    """
    # initialize L1C objects
    L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]

    # check in config if atmospheric correction is desired
    if CFG.target_radunit_optical == 'BOA_Ref':
        # atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
        if L1C_objs[0].ac_options:
            # perform atmospheric correction
            L1C_objs = L1C_P.AtmCorr(*L1C_objs).run_atmospheric_correction(dump_ac_input=False)
        else:
            # plain loops instead of side-effect list comprehensions (PEP 8)
            for L1C_obj in L1C_objs:
                L1C_obj.logger.warning('Atmospheric correction is not yet supported for %s %s and has been skipped.'
                                       % (L1C_obj.satellite, L1C_obj.sensor))
    else:
        for L1C_obj in L1C_objs:
            L1C_obj.logger.warning('Atmospheric correction skipped because optical conversion type is set to %s.'
                                   % CFG.target_radunit_optical)

    # write outputs and delete temporary data
    for L1C_obj in L1C_objs:
        if CFG.exec_L1CP[1]:
            L1C_obj.to_ENVI()
        if L1C_obj.arr_shape == 'cube':
            L1C_obj.delete_tempFiles()
        L1C_obj.delete_ac_input_arrays()

    return L1C_objs


@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs, block_size=None, return_tiles=True):
    # type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]], tuple, bool) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object]  # noqa
    """Geometric homogenization.

    Performs correction of geometric displacements, resampling to target grid of the usecase and merges multiple
    GMS objects belonging to the same scene ID (subsystems) to a single L2A object.
    NOTE: Output L2A_object must be cut into tiles because L2A_obj size in memory exceeds maximum serialization size.

    :param L1C_objs:        list containing one or multiple L1C objects belonging to the same scene ID.
    :param block_size:      X/Y size of output tiles in pixels, e.g. (1024,1024)
    :param return_tiles:    return computed L2A object in tiles
    :return:                list of L2A_object tiles (or the whole L2A object if return_tiles is False)
    """
    # initialize L2A objects
    L2A_objs = [L2A_P.L2A_object(L1C_obj) for L1C_obj in L1C_objs]

    # correct geometric displacements and resample to target grid
    common_extent = get_common_extent([obj.trueDataCornerLonLat for obj in L2A_objs]) if len(L2A_objs) > 1 else None
    for L2A_obj in L2A_objs:
        L2A_obj.correct_spatial_shifts(cliptoextent=CFG.clip_to_extent, clipextent=common_extent, clipextent_prj=4326)

    # merge multiple subsystems belonging to the same scene ID to a single GMS object
    L2A_obj = L2A_P.L2A_object().from_sensor_subsystems(L2A_objs) if len(L2A_objs) > 1 else L2A_objs[0]

    # update some attributes
    L2A_obj.calc_mask_nodata(overwrite=True)  # update no data mask
    L2A_obj.calc_corner_positions()  # update corner coordinates

    # write output
    if CFG.exec_L2AP[1]:
        L2A_obj.to_ENVI()

    # delete tempfiles of separate subsystem GMS objects
    # (distinct loop variable to avoid shadowing the merged L2A_obj)
    for subsystem_obj in L2A_objs:
        subsystem_obj.delete_tempFiles()

    if return_tiles:
        L2A_tiles = L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY)
        return list(L2A_tiles)
    else:
        return L2A_obj
190
191


192
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2B_map(L2A_obj):
    # type: (L2A_P.L2A_object) -> L2B_P.L2B_object
    """Run spectral homogenization on a Level-2A object.

    :param L2A_obj: the L2A object (scene cube or tile) to process
    :return:        the resulting L2B object
    """
    obj = L2B_P.L2B_object(L2A_obj)
    obj.spectral_homogenization()

    # optionally write L2B output to disk
    if CFG.exec_L2BP[1]:
        obj.to_ENVI()

    # tempfiles are only removed for whole-scene cubes, not for single tiles
    if obj.arr_shape == 'cube':
        obj.delete_tempFiles()

    return obj

204

205
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj):
    # type: (L2B_P.L2B_object) -> L2C_P.L2C_object
    """Run Level-2C processing: cut into MGRS tiles and write them to disk.

    :param L2B_obj: the L2B object to process
    :return:        the resulting L2C object
    """
    L2C_obj = L2C_P.L2C_object(L2B_obj)

    if CFG.exec_L2CP[1]:
        # fixed misspelled local name (was 'L2C_MRGS_tiles'); write each tile via a plain loop
        L2C_MGRS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
        for MGRS_tile in L2C_MGRS_tiles:
            MGRS_tile.to_ENVI(compression=CFG.output_data_compression)

    L2C_obj.delete_tempFiles()
    return L2C_obj  # contains no array data in Python mode
Daniel Scheffler's avatar
Daniel Scheffler committed
215
216


217
@return_GMS_objs_without_arrays
def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise parallelization)
    # type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
    """Run the complete preprocessing chain (L1A..L2C) for all subsystem datasets of a single scene.

    Each processing level is only executed if enabled in the configuration (CFG.exec_*[0]); earlier
    processed results are loaded from disk where the input processing level indicates so. As soon as
    a level produces a failed_GMS_object (or a disabled level is reached), processing stops and the
    current result is returned.

    NOTE: Exceptions in this function must be caught by the calling function (process controller).

    :param list_dataset_dicts_per_scene: list of dataset dictionaries, one per subsystem, all
                                         belonging to the same scene ID
    :return: the GMS object (or list of objects) of the highest processing level reached
    """
    # limit the number of concurrently running processes via an inter-process lock
    with ProcessLock(processes=CFG.CPUs_all_jobs,
                     logger=GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
                                       log_level=CFG.log_level, append=True)):

        # all subsystem datasets of one scene must share the same input processing level
        if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
            raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
                             'Received %s.' % list_dataset_dicts_per_scene)

        input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']

        ##################
        # L1A processing #
        ##################

        L1A_objects = []
        if CFG.exec_L1AP[0] and input_proc_level is None:
            L1A_objects = [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]

            # abort early if any subsystem failed
            if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]):
                return L1A_objects

        ##################
        # L1B processing #
        ##################

        # select subsystem with optimal band for co-registration
        # L1B_obj_coreg = L1B_map(L1A_objects[0])
        # copy coreg information to remaining subsets

        L1B_objects = L1A_objects
        if CFG.exec_L1BP[0]:
            # add earlier processed L1A data
            if input_proc_level == 'L1A':
                for ds in list_dataset_dicts_per_scene:
                    GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
                    L1A_objects.append(L1A_P.L1A_object().from_disk([GMSfile, ['cube', None]]))

            L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
            del L1A_objects  # free memory of the lower-level objects

            if any([isinstance(obj, failed_GMS_object) for obj in L1B_objects]):
                return L1B_objects

        ##################
        # L1C processing #
        ##################

        L1C_objects = L1B_objects
        if CFG.exec_L1CP[0]:
            # add earlier processed L1B data
            if input_proc_level == 'L1B':
                for ds in list_dataset_dicts_per_scene:
                    GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
                    L1B_objects.append(L1B_P.L1B_object().from_disk([GMSfile, ['cube', None]]))

            # L1C_map needs all subsystems of the scene at once (atmospheric correction)
            L1C_objects = L1C_map(L1B_objects)
            del L1B_objects

            if any([isinstance(obj, failed_GMS_object) for obj in L1C_objects]):
                return L1C_objects

        if not CFG.exec_L2AP[0]:
            return L1C_objects

        ##################
        # L2A processing #
        ##################

        # add earlier processed L1C data
        if input_proc_level == 'L1C':
            for ds in list_dataset_dicts_per_scene:
                GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
                L1C_objects.append(L1C_P.L1C_object().from_disk([GMSfile, ['cube', None]]))

        # L2A_map merges all subsystems of a scene into a single object
        L2A_obj = L2A_map(L1C_objects, return_tiles=False)
        del L1C_objects

        if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
            return L2A_obj

        ##################
        # L2B processing #
        ##################

        # add earlier processed L2A data
        if input_proc_level == 'L2A':
            assert len(list_dataset_dicts_per_scene) == 1, \
                'Expected only a single L2A dataset since subsystems are merged.'
            GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
            L2A_obj = L2A_P.L2A_object().from_disk([GMSfile, ['cube', None]])

        L2B_obj = L2B_map(L2A_obj)
        del L2A_obj

        if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
            return L2B_obj

        ##################
        # L2C processing #
        ##################

        # add earlier processed L2B data
        if input_proc_level == 'L2B':
            assert len(list_dataset_dicts_per_scene) == 1, \
                'Expected only a single L2B dataset since subsystems are merged.'
            GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
            L2B_obj = L2B_P.L2B_object().from_disk([GMSfile, ['cube', None]])

        L2C_obj = L2C_map(L2B_obj)  # type: Union[GMS_object, failed_GMS_object, List]
        del L2B_obj

        return L2C_obj