# -*- coding: utf-8 -*-
"""
Level 1B Processor:

Detection of global/local geometric displacements.
"""

import collections
import json
import os
import socket
import time
import warnings
from datetime import datetime, timedelta

import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import box
import pytz

from arosics import COREG, DESHIFTER
from geoarray import GeoArray
from py_tools_ds.geo.coord_grid import is_coord_grid_equal
from py_tools_ds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo import reproject_shapelyGeometry, transform_any_prj
from py_tools_ds.geo.projection import prj_equal, EPSG2WKT, WKT2EPSG
from py_tools_ds.geo.vector.topology import get_overlap_polygon

from ..config import GMS_config as CFG
from .L1A_P import L1A_object
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated

__author__ = 'Daniel Scheffler'

class Scene_finder(object):
    def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, min_overlap=20,
                 min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10):
        self.boundsLonLat = src_boundsLonLat
        self.src_AcqDate = src_AcqDate
        self.src_prj = src_prj
        self.src_footprint_poly = src_footprint_poly
        self.min_overlap = min_overlap
        self.min_cloudcov = min_cloudcov
        self.max_cloudcov = max_cloudcov
        self.plusminus_days = plusminus_days
        self.plusminus_years = plusminus_years

        # get temporal constraints
        def add_years(dt, years): return dt.replace(dt.year + years) \
            if not (dt.month == 2 and dt.day == 29) else dt.replace(dt.year + years, 3, 1)
        self.timeStart = add_years(self.src_AcqDate, -plusminus_years)
        timeEnd = add_years(self.src_AcqDate, +plusminus_years)
        timeNow = datetime.utcnow().replace(tzinfo=pytz.UTC)
        self.timeEnd = timeEnd if timeEnd <= timeNow else timeNow

        self.possib_ref_scenes = None  # set by self.spatial_query()
        self.GDF_ref_scenes = GeoDataFrame()  # set by self.spatial_query()
        self.ref_scene = None

    def spatial_query(self, timeout=5):
        """Query the postgreSQL database to find possible reference scenes matching the specified criteria.

        :param timeout:     maximum query duration allowed (seconds)
        """
        for i in range(10):
            try:
                SpIM = SpatialIndexMediator(timeout=timeout)
                self.possib_ref_scenes = \
                    SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
                                                    self.max_cloudcov, CFG.usecase.datasetid_spatial_ref,
                                                    refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
                break
            except socket.timeout:
                if i < 9:
                    continue
                else:
                    raise TimeoutError('Spatial query timed out 10 times!')
                    # TODO catch error when index server is not running:
                    # TODO ConnectionRefusedError: [Errno 111] Connection refused

        if self.possib_ref_scenes:
            # fill GeoDataFrame with possible ref scene parameters
            GDF = GeoDataFrame(self.possib_ref_scenes, columns=['object'])
            GDF['sceneid'] = list(GDF['object'].map(lambda scene: scene.sceneid))
            GDF['acquisitiondate'] = list(GDF['object'].map(lambda scene: scene.acquisitiondate))
            GDF['cloudcover'] = list(GDF['object'].map(lambda scene: scene.cloudcover))
            GDF['polyLonLat'] = list(GDF['object'].map(lambda scene: scene.polyLonLat))

            def LonLat2UTM(polyLL):
                return reproject_shapelyGeometry(polyLL, 4326, self.src_prj)

            GDF['polyUTM'] = list(GDF['polyLonLat'].map(LonLat2UTM))
            self.GDF_ref_scenes = GDF

    def filter_possib_ref_scenes(self):
        """Filter possible scenes by running all filter functions."""
        self._filter_by_overlap()
        self._filter_by_proc_status()
        self._filter_by_dataset_existance()
        self._filter_by_entity_ID_availability()
        self._filter_by_projection()

    def choose_ref_scene(self):
        """Choose the reference scene with minimum cloud cover and maximum overlap."""
        if not self.GDF_ref_scenes.empty:
            GDF = self.GDF_ref_scenes
            GDF = GDF[GDF['cloudcover'] == GDF['cloudcover'].min()]
            GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]

            if not GDF.empty:
                GDF_res = GDF.iloc[0]
                return ref_Scene(GDF_res)
        else:
            return None

    def _filter_by_overlap(self):
        """Filter out all scenes with less spatial overlap than self.min_overlap."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get overlap parameters
            def get_OL_prms(poly): return get_overlap_polygon(poly, self.src_footprint_poly)

            GDF['overlapParams'] = list(GDF['polyLonLat'].map(get_OL_prms))
            GDF['overlap area'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area']))
            GDF['overlap percentage'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage']))
            GDF['overlap poly'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly']))
            del GDF['overlapParams']

            # filter out scenes that have too little overlap
            self.GDF_ref_scenes = GDF.loc[GDF['overlap percentage'] >= self.min_overlap]

    def _filter_by_proc_status(self):
        """Filter out all scenes that have not been processed before, according to their processing status
        (at least L1A is needed)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get processing level of reference scenes
            def query_procL(sceneID):
                return DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes_proc', ['proc_level'],
                                                       {'sceneid': sceneID})

            GDF['temp_queryRes'] = list(GDF['sceneid'].map(query_procL))
            GDF['proc_level'] = list(GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None))
            GDF.drop('temp_queryRes', axis=1, inplace=True)

            # filter out scenes that have not yet been processed
            self.GDF_ref_scenes = GDF[GDF['proc_level'].notnull()]

    def _filter_by_dataset_existance(self):
        """Filter out all scenes for which no processed data can be found on disk."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get the path of the binary file and check if the corresponding dataset exists
            def get_path_binary(GDF_row):
                return PG.path_generator(scene_ID=GDF_row['sceneid'], proc_level=GDF_row['proc_level'])\
                    .get_path_imagedata()

            def check_exists(path): return os.path.exists(path)

            GDF['path_ref'] = GDF.apply(lambda GDF_row: get_path_binary(GDF_row), axis=1)
            GDF['refDs_exists'] = list(GDF['path_ref'].map(check_exists))

            # filter out scenes where the corresponding dataset does not exist on the fileserver
            self.GDF_ref_scenes = GDF[GDF['refDs_exists']]

    def _filter_by_entity_ID_availability(self):
        """Filter out all scenes for which no proper entity ID can be found in the database."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # check if a proper entity ID can be gathered from the database
            def query_eID(sceneID):
                return DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
                                                       {'id': sceneID}, records2fetch=1)

            GDF['temp_queryRes'] = list(GDF['sceneid'].map(query_eID))
            GDF['entityid'] = list(GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None))
            GDF.drop('temp_queryRes', axis=1, inplace=True)

            # filter out scenes that have no entity ID (database errors)
            self.GDF_ref_scenes = GDF[GDF['entityid'].notnull()]

    def _filter_by_projection(self):
        """Filter out all scenes that have a different projection than the target image."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # compare projections of target and reference image
            from ..io.input_reader import read_ENVIhdr_to_dict

            def get_prj(path_binary):
                return read_ENVIhdr_to_dict(os.path.splitext(path_binary)[0] + '.hdr')['coordinate system string']

            def is_prj_equal(path_binary): return prj_equal(self.src_prj, get_prj(path_binary))

            GDF['prj_equal'] = list(GDF['path_ref'].map(is_prj_equal))

            # filter out scenes that have a different projection
            self.GDF_ref_scenes = GDF[GDF['prj_equal']]
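

# Illustrative usage of Scene_finder (a hedged sketch, not executed anywhere in this module;
# all argument values below are placeholders):
#
#     SF = Scene_finder(bounds_lonlat, acq_date, src_prj_wkt, footprint_poly,
#                       min_overlap=20, min_cloudcov=0, max_cloudcov=20,
#                       plusminus_days=30, plusminus_years=10)
#     SF.spatial_query(timeout=5)        # fills SF.possib_ref_scenes and SF.GDF_ref_scenes
#     SF.filter_possib_ref_scenes()      # runs all _filter_by_* functions
#     ref_scene = SF.choose_ref_scene()  # returns a ref_Scene instance or None
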
class ref_Scene:
    def __init__(self, GDF_record):
        self.scene_ID = int(GDF_record['sceneid'])
        self.entity_ID = GDF_record['entityid']
        self.AcqDate = GDF_record['acquisitiondate']
        self.cloudcover = GDF_record['cloudcover']
        self.polyLonLat = GDF_record['polyLonLat']
        self.polyUTM = GDF_record['polyUTM']
        self.proc_level = GDF_record['proc_level']
        self.filePath = GDF_record['path_ref']


class L1B_object(L1A_object):
    def __init__(self, L1A_obj=None):
        super(L1B_object, self).__init__()

        # set defaults
        self._spatRef_available = None
        self.spatRef_scene = None  # set by self.get_spatial_reference_scene()
        self.coreg_info = {}
        self.deshift_results = collections.OrderedDict()

        if L1A_obj:
            # take over all attributes of the given L1A object
            for key, value in L1A_obj.__dict__.items():
                setattr(self, key, value)

        self.proc_level = 'L1B'

    @property
    def spatRef_available(self):
        if self._spatRef_available is not None:
            return self._spatRef_available
        else:
            self.get_spatial_reference_scene()
            return self._spatRef_available

    @spatRef_available.setter
    def spatRef_available(self, spatRef_available):
        self._spatRef_available = spatRef_available
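    # NOTE: accessing 'spatRef_available' triggers get_spatial_reference_scene() on first use, so the
    # (potentially expensive) database query only runs once a spatial reference is actually requested.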

    def get_spatial_reference_scene(self):
        boundsLonLat = corner_coord_to_minmax(self.trueDataCornerLonLat)
        footprint_poly = HLP_F.CornerLonLat_to_shapelyPoly(self.trueDataCornerLonLat)
        RSF = Scene_finder(boundsLonLat, self.acq_datetime, self.meta_odict['coordinate system string'],
                           footprint_poly, 20, 0, 20, 30, 10)

        # run spatial query
        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')
        RSF.spatial_query(timeout=5)
        if RSF.possib_ref_scenes:
            self.logger.info('Query result: %s reference scenes with matching metadata.' % len(RSF.possib_ref_scenes))
        else:
            self.logger.warning('Spatial query returned no matches. Coregistration impossible.')
            self.spatRef_available = False
            return None

        # filter results
        RSF.filter_possib_ref_scenes()
        if RSF.GDF_ref_scenes.empty:
            self.logger.warning('No scene fulfills all requirements to serve as spatial reference for scene %s '
                                '(entity ID %s). Coregistration impossible.' % (self.scene_ID, self.entity_ID))
            self.spatRef_available = False
            return None

        # assign spatial reference scene
        self.spatRef_scene = RSF.choose_ref_scene()
        self.spatRef_available = True
        self.logger.info('Found a suitable reference image for coregistration: scene ID %s (entity ID %s).'
                         % (self.spatRef_scene.scene_ID, self.spatRef_scene.entity_ID))

    def _get_reference_image_params_pgSQL(self):
        # TODO implement the earlier version of this function as a backup for SpatialIndexMediator
        """postgreSQL query: get the IDs of overlapping scenes and select the most suitable scene_ID
        (with respect to DEM and cloud cover)."""
        warnings.warn('_get_reference_image_params_pgSQL is deprecated and will not work anymore.', DeprecationWarning)

        # preliminary check based on the cloud mask which region of im2shift is suitable for shift correction at all
        # -> pass that region as an argument to the postgreSQL query
        # scene_ID            = 14536400 # LE71510322000093SGS00 im2shift

        # set query conditions
        min_overlap = 20  # %
        max_cloudcov = 20  # %
        plusminus_days = 30
        AcqDate = self.im2shift_objDict['acquisition_date']
        date_minmax = [AcqDate - timedelta(days=plusminus_days), AcqDate + timedelta(days=plusminus_days)]
        dataset_cond = 'datasetid=%s' % CFG.usecase.datasetid_spatial_ref
        cloudcov_cond = 'cloudcover < %s' % max_cloudcov
        # FIXME cloudcover is not yet available for all scenes at proc_level METADATA
        dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
                        "BETWEEN (%s, %s) AND (%s, %s)" \
                        % (date_minmax[0].month, date_minmax[0].day, date_minmax[1].month, date_minmax[1].day)
        # TODO add further query criteria!

        def query_scenes(condlist):
            return DB_T.get_overlapping_scenes_from_postgreSQLdb(
                CFG.job.conn_database,
                table='scenes',
                tgt_corners_lonlat=self.trueDataCornerLonLat,
                conditions=condlist,
                add_cmds='ORDER BY scenes.cloudcover ASC',
                timeout=30000)

        conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]

        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')

        count, filt_overlap_scenes = 0, []
        while not filt_overlap_scenes:
            if count == 0:
                # search within already processed scenes
                # (this only queries scenes_proc -> a reference is only found there if the scene was already
                # contained in the dataset list when CFG.job was started)
                res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
                    CFG.job.conn_database,
                    tgt_corners_lonlat=self.trueDataCornerLonLat,
                    conditions=['datasetid=%s' % CFG.usecase.datasetid_spatial_ref],
                    add_cmds='ORDER BY scenes.cloudcover ASC',
                    timeout=25000)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)

            else:
                # search within the complete scenes table, using fewer filter criteria with each run
                # TODO: use Daniel's index here, otherwise this may become too slow as the table grows
                res = query_scenes(conds_descImportance)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                if len(conds_descImportance) > 1:  # FIXME use a different reference sensor?
                    del conds_descImportance[-1]
                else:  # reduce min_overlap to 10 percent if there are overlapping scenes
                    if res:
                        min_overlap = 10
                        filt_overlap_scenes = \
                            self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                    # raise warnings if no match found
                    if not filt_overlap_scenes:
                        if not res:
                            warnings.warn('No reference scene found for %s (scene ID %s). Coregistration of this scene '
                                          'failed.' % (self.baseN, self.scene_ID))
                        else:
                            warnings.warn('Reference scenes for %s (scene ID %s) have been found but none has more '
                                          'than %s percent overlap. Coregistration of this scene failed.'
                                          % (self.baseN, self.scene_ID, min_overlap))
                        break
            count += 1

        if filt_overlap_scenes:
            ref_available = False
            for count, sc in enumerate(filt_overlap_scenes):
                if count == 2:  # FIXME abort already at the 3rd scene?
                    warnings.warn('No reference scene for %s (scene ID %s) available. '
                                  'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                    break

                # if the scene data is not available, start its download and L1A processing
                def dl_cmd(scene_ID): print('%s %s %s' % (
                    CFG.job.java_commands['keyword'].strip(),  # FIXME CFG.job.java_commands is deprecated
                    CFG.job.java_commands["value_download"].strip(), scene_ID))

                path = PG.path_generator(scene_ID=sc['scene_ID']).get_path_imagedata()

                if not os.path.exists(path):
                    # add the scene to download to scenes_jobs.missing_scenes

                    # print the JAVA download command
                    t_dl_start = time.time()
                    dl_cmd(sc['scene_ID'])

                    # check if the scene is downloading
                    download_start_timeout = 5  # seconds
                    # set timeout for external processing
                    # -> DEPRECATED BECAUSE CREATION OF AN EXTERNAL CFG.job WITHIN CFG.job IS NOT ALLOWED
                    processing_timeout = 5  # seconds # FIXME increase timeout if processing is really started
                    proc_level = None
                    while True:
                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(
                            CFG.job.conn_database, 'scenes', ['proc_level'], {'id': sc['scene_ID']})[0][0]
                        if proc_level != proc_level_chk:
                            print('Reference scene %s, current processing level: %s' % (sc['scene_ID'], proc_level_chk))
                        proc_level = proc_level_chk
                        if proc_level_chk in ['SCHEDULED', 'METADATA'] and \
                           time.time() - t_dl_start >= download_start_timeout:
                            warnings.warn('Download of reference scene %s (entity ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                        if proc_level_chk == 'L1A':
                            ref_available = True
                            break
                        elif proc_level_chk == 'DOWNLOADED' and time.time() - t_dl_start >= processing_timeout:
                            # proc_level is 'DOWNLOADED' but the scene has not been processed
                            warnings.warn('L1A processing of reference scene %s (entity ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                            # DB_T.set_info_in_postgreSQLdb(CFG.job.conn_database,'scenes',
                            #                             {'proc_level':'METADATA'},{'id':sc['scene_ID']})

                        time.sleep(5)
                else:
                    ref_available = True

                if not ref_available:
                    continue
                else:
                    self.path_imref = path
                    self.imref_scene_ID = sc['scene_ID']
                    self.imref_footprint_poly = sc['scene poly']
                    self.overlap_poly = sc['overlap poly']
                    self.overlap_percentage = sc['overlap percentage']
                    self.overlap_area = sc['overlap area']

                    query_res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
                                                                {'id': self.imref_scene_ID}, records2fetch=1)
                    assert query_res != [], 'No entity ID found for scene number %s' % self.imref_scene_ID
                    self.imref_entity_ID = query_res[0][0]  # [('LC81510322013152LGN00',)]
                    break
        self.logger.close()

    def _sceneIDList_to_filt_overlap_scenes(self, sceneIDList, min_overlap):
        """Find reference scenes that cover at least min_overlap percent of the scene with the given ID.
        NOTE: Only the first 50 scenes are considered."""
        # t0 = time.time()
        dict_sceneID_poly = [{'scene_ID': ID, 'scene poly': HLP_F.scene_ID_to_shapelyPolygon(ID)}
                             for ID in sceneIDList]  # always returns LonLat polygons

        # get overlap polygons and their parameters. result: [{overlap poly, overlap percentage, overlap area}]
        for dic in dict_sceneID_poly:  # input: dicts {scene_ID, ref_poly}
            dict_overlap_poly_params = get_overlap_polygon(dic['scene poly'], self.arr.footprint_poly)
            dic.update(dict_overlap_poly_params)  # adds {overlap poly, overlap percentage, overlap area}
        # print('polygon creation time', time.time()-t0)

        # filter out those scene_IDs where the overlap percentage is below min_overlap
        if min_overlap:
            filt_overlap_scenes = [scene for scene in dict_sceneID_poly if scene['overlap percentage'] >= min_overlap]
        else:
            filt_overlap_scenes = dict_sceneID_poly

        return filt_overlap_scenes
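    # NOTE (illustrative): each element of the returned list is a dict of the form
    #   {'scene_ID': <ID>, 'scene poly': <shapely polygon>, 'overlap poly': <shapely polygon>,
    #    'overlap percentage': <number>, 'overlap area': <number>}
    # as assembled above; the concrete value types are an assumption based on that construction.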

    def get_opt_bands4matching(self, target_cwlPos_nm=550, v=False):
        """Automatically determine the optimal bands used for Fourier shift theorem matching.

        :param target_cwlPos_nm:   the desired wavelength used for matching
        :param v:                  verbose mode
        """
        # get spectral characteristics
        shift_cwl = [float(i) for i in self.meta_odict['wavelength']]
        shift_fwhm = [float(i) for i in self.meta_odict['bandwidths']]
        with open(PG.path_generator(scene_ID=self.spatRef_scene.scene_ID).get_path_gmsfile()) as inF:
            ref_gmsDict = json.load(inF)
        ref_cwl = [float(i) for i in ref_gmsDict['meta']['wavelength']]
        ref_fwhm = [float(i) for i in ref_gmsDict['meta']['bandwidths']]

        # exclude the cirrus/oxygen band of Landsat-8/Sentinel-2
        shift_bbl, ref_bbl = [False] * len(shift_cwl), [False] * len(ref_cwl)  # bad band lists
        for dic, s_r in zip([self.__dict__, ref_gmsDict], ['shift', 'ref']):
            dic['GMS_identifier']['logger'] = None  # set a dummy value in order to avoid an exception
            sensorcode = get_GMS_sensorcode(dic['GMS_identifier'])
            if sensorcode in ['LDCM', 'S2A', 'S2B'] and '9' in dic['LayerBandsAssignment']:
                locals()['%s_bbl' % s_r][dic['LayerBandsAssignment'].index('9')] = True
            if sensorcode in ['S2A', 'S2B'] and '10' in dic['LayerBandsAssignment']:
                locals()['%s_bbl' % s_r][dic['LayerBandsAssignment'].index('10')] = True

        # cwl_overlap = (max(min(shift_cwl),min(ref_cwl)),  min(max(shift_cwl),max(ref_cwl))) # -> (min wvl, max wvl)
        # find the matching band of the reference image for each band of the image to be shifted
        match_dic = collections.OrderedDict()
        for idx, cwl, fwhm in zip(range(len(shift_cwl)), shift_cwl, shift_fwhm):
            if shift_bbl[idx]:
                continue  # skip cwl if it is declared as bad band above

            def is_inside(r_cwl, s_cwl, s_fwhm): return s_cwl - s_fwhm / 2 < r_cwl < s_cwl + s_fwhm / 2

            matching_r_cwls = [r_cwl for i, r_cwl in enumerate(ref_cwl) if
                               is_inside(r_cwl, cwl, fwhm) and not ref_bbl[i]]
            if matching_r_cwls:
                match_dic[cwl] = matching_r_cwls[0] if len(matching_r_cwls) == 1 else \
                    matching_r_cwls[np.abs(np.array(matching_r_cwls) - cwl).argmin()]

        # set the bands for matching based on the above results
        poss_cwls = [cwl for cwl in shift_cwl if cwl in match_dic]
        if poss_cwls:
            if not target_cwlPos_nm:
                shift_band4match = shift_cwl.index(poss_cwls[0]) + 1  # first possible shift band
                ref_band4match = ref_cwl.index(match_dic[poss_cwls[0]]) + 1  # matching reference band
            else:  # target_cwlPos is given
                closestWvl_to_target = poss_cwls[np.abs(np.array(poss_cwls) - target_cwlPos_nm).argmin()]
                shift_band4match = shift_cwl.index(closestWvl_to_target) + 1  # the shift band closest to target
                ref_band4match = ref_cwl.index(match_dic[closestWvl_to_target]) + 1  # matching reference band
        else:  # all reference bands are outside of shift-cwl +- fwhm/2
            warnings.warn('Optimal bands for matching could not be automatically determined. Choosing the first band '
                          'of each image.')
            shift_band4match = 1
            ref_band4match = 1

        if v:
            print('Shift band for matching:     %s (%snm)' % (shift_band4match, shift_cwl[shift_band4match - 1]))
            print('Reference band for matching: %s (%snm)' % (ref_band4match, ref_cwl[ref_band4match - 1]))

        return ref_band4match, shift_band4match
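    # Illustrative example of the matching criterion used above (made-up numbers): a reference band with
    # center wavelength r_cwl matches a target band (cwl, fwhm) if cwl - fwhm/2 < r_cwl < cwl + fwhm/2.
    # A target band at 560 nm with 80 nm FWHM would therefore accept reference bands between 520 and
    # 600 nm; if several candidates fall inside, the one closest to the target wavelength is chosen.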

    def coregister_spatially(self):
        if self.coreg_needed and self.spatRef_available:
            geoArr_ref = GeoArray(self.spatRef_scene.filePath)
            geoArr_shift = GeoArray(self.arr)
            r_b4match, s_b4match = self.get_opt_bands4matching(target_cwlPos_nm=550, v=False)
            coreg_kwargs = dict(
                r_b4match=r_b4match,
                s_b4match=s_b4match,
                align_grids=True,  # FIXME not needed here
                match_gsd=True,  # FIXME not needed here
                data_corners_ref=[[x, y] for x, y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
                data_corners_tgt=[transform_any_prj(EPSG2WKT(4326), self.meta_odict['coordinate system string'], x, y)
                                  for x, y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
                nodata=(get_outFillZeroSaturated(geoArr_ref.dtype)[0],
                        get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
                ignore_errors=True,
                v=False,
                q=True
            )

            COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
            COREG_obj.calculate_spatial_shifts()

            self.coreg_info.update(
                COREG_obj.coreg_info)  # no clipping to trueCornerLonLat until here -> only shift correction
            self.coreg_info.update({'reference scene ID': self.spatRef_scene.scene_ID})
            self.coreg_info.update({'reference entity ID': self.spatRef_scene.entity_ID})

            if COREG_obj.success:
                self.logger.info("Calculated map shifts (X,Y): %s / %s"
                                 % (COREG_obj.x_shift_map,
                                    COREG_obj.y_shift_map))  # FIXME log this directly within calculate_spatial_shifts
            else:
                # TODO add a database entry with an error hint
                [self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
                                   % (self.scene_ID, self.entity_ID, err)) for err in COREG_obj.tracked_errors]
        else:
            if self.coreg_needed:
                self.logger.warning('Coregistration skipped because no suitable reference scene is available or '
                                    'the spatial query failed.')
            else:
                self.logger.info('Coregistration of scene %s (entity ID %s) skipped because the target dataset ID '
                                 'equals the reference dataset ID.' % (self.scene_ID, self.entity_ID))

            self.coreg_info.update({'corrected_shifts_px': {'x': 0, 'y': 0}})
            self.coreg_info.update({'corrected_shifts_map': {'x': 0, 'y': 0}})
            self.coreg_info.update({'original map info': self.meta_odict['map info']})
            self.coreg_info.update({'updated map info': None})
            self.coreg_info.update({'reference scene ID': None})
            self.coreg_info.update({'reference entity ID': None})
            self.coreg_info.update({'reference geotransform': None})
            # the reference projection must be the scene's own projection in order to avoid overwriting it with a
            # wrong EPSG code
            self.coreg_info.update({'reference projection': self.meta_odict['coordinate system string']})
            self.coreg_info.update({'reference extent': {'rows': None, 'cols': None}})
            self.coreg_info.update({'reference grid': [list(CFG.usecase.spatial_ref_gridx),
                                                       list(CFG.usecase.spatial_ref_gridy)]})
            self.coreg_info.update(
                {'success': True if not self.coreg_needed else False})  # False means no spatial reference is available

    def correct_spatial_shifts(self, cliptoextent=True, clipextent=None, clipextent_prj=None, v=False):
        # type: (bool, list, any, bool) -> None
        """Correct the spatial shifts calculated by self.coregister_spatially().

        :param cliptoextent:    whether to clip the output to the given extent
        :param clipextent:      list of XY-coordinate tuples giving the target extent (if not given and cliptoextent
                                is True, the 'trueDataCornerLonLat' attribute of the GMS object is used)
        :param clipextent_prj:  WKT projection string or EPSG code of the projection of the coordinates in clipextent
        :param v:               verbose mode
        :return:
        """
        # cliptoextent is automatically True if an extent is given
        cliptoextent = cliptoextent if not clipextent else True

        if cliptoextent or self.resamp_needed or (self.coreg_needed and self.coreg_info['success']):

            # get target bounds # TODO implement boxObj call instead here
            if not clipextent:
                trueDataCornerUTM = [transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
                                     for x, y in self.trueDataCornerLonLat]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(trueDataCornerUTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds
            else:
                assert clipextent_prj, \
                    "'clipextent_prj' must be given together with 'clipextent'. Received only 'clipextent'."
                clipextent_UTM = clipextent if prj_equal(self.MetaObj.projection, clipextent_prj) else \
                    [transform_any_prj(clipextent_prj, self.MetaObj.projection, x, y) for x, y in clipextent]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(clipextent_UTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds

            # correct shifts and clip to extent
            # ensure that self.masks exists (it does not exist in Flink mode because
            # self.fill_from_disk() is skipped in that case)
            if not hasattr(self, 'masks') or self.masks is None:
                self.build_combined_masks_array()  # creates self.masks and self.masks_meta

            # exclude self.mask_nodata and self.mask_clouds from warping
            del self.mask_nodata, self.mask_clouds

            attributes2deshift = [attrname for attrname in
                                  ['arr', 'masks', 'dem', 'ac_errors', 'mask_clouds_confidence']
                                  if getattr(self, '_%s' % attrname) is not None]
            for attrname in attributes2deshift:
                geoArr = getattr(self, attrname)

                # do some logging
                if self.coreg_needed:
                    if self.coreg_info['success']:
                        self.logger.info("Correcting spatial shifts for attribute '%s'..." % attrname)
                    elif cliptoextent and is_coord_grid_equal(
                         geoArr.gt, CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy):
                        self.logger.info("Attribute '%s' has only been clipped to its extent because no valid "
                                         "shifts have been detected and the pixel grid equals the target grid."
                                         % attrname)
                    elif self.resamp_needed:
                        self.logger.info("Resampling attribute '%s' to target grid..." % attrname)
                elif self.resamp_needed:
                    self.logger.info("Resampling attribute '%s' to target grid..." % attrname)

                # correct the shifts
                DS = DESHIFTER(geoArr, self.coreg_info,
                               target_xyGrid=[CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy],
                               cliptoextent=cliptoextent,
                               clipextent=mapBounds,
                               align_grids=True,
                               resamp_alg='nearest' if attrname == 'masks' else 'cubic',
                               CPUs=None if CFG.job.allow_subMultiprocessing else 1,
                               progress=True if v else False,
                               q=True,
                               v=v)
                DS.correct_shifts()

                # update coreg_info
                if attrname == 'arr':
                    self.coreg_info['is shifted'] = DS.is_shifted
                    self.coreg_info['is resampled'] = DS.is_resampled

                # update geoinformation and array-shape-related attributes
                self.logger.info("Updating geoinformation of '%s' attribute..." % attrname)
                if attrname == 'arr':
                    self.meta_odict['map info'] = DS.updated_map_info
                    self.meta_odict['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.shape_fullArr = DS.arr_shifted.shape
                    self.meta_odict['lines'], self.meta_odict['samples'] = DS.arr_shifted.shape[:2]
                else:
                    self.masks_meta['map info'] = DS.updated_map_info
                    self.masks_meta['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.masks_meta['lines'], self.masks_meta['samples'] = DS.arr_shifted.shape[:2]

                    # NOTE: mask_nodata and mask_clouds are updated later by the L2A_map mapper function
                    # (module pipeline)

                # update the GeoArray instance without losing its inherent metadata (nodata, ...)
                geoArr.arr, geoArr.gt, geoArr.prj = \
                    DS.GeoArray_shifted.arr, DS.GeoArray_shifted.gt, DS.GeoArray_shifted.prj
                # setattr(self, attrname, DS.GeoArray_shifted) # NOTE: don't set the array earlier because the setter
                #                                              # would also update arr.gt/.prj/.nodata from meta_odict

            self.resamp_needed = False
            self.coreg_needed = False

            # recreate self.mask_nodata and self.mask_clouds from self.masks
            self.mask_nodata = self.mask_nodata
            self.mask_clouds = self.mask_clouds
            # FIXME move the functionality of self.masks to the output writer only and remove self.masks completely
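

# Illustrative end-to-end usage of this module (a hedged sketch; 'l1a_obj' is a placeholder for an
# L1A_object produced by the L1A processor):
#
#     l1b_obj = L1B_object(l1a_obj)
#     l1b_obj.coregister_spatially()      # queries for a reference scene (if needed) and runs COREG
#     l1b_obj.correct_spatial_shifts()    # applies the calculated shifts via DESHIFTER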