# -*- coding: utf-8 -*-
"""
Level 1B Processor:

Detection of global/local geometric displacements.
"""

import collections
import os
import socket
import time
import warnings
from datetime import datetime, timedelta

import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import box
import pytz

from arosics import COREG, DESHIFTER
from geoarray import GeoArray
from py_tools_ds.geo.coord_grid import is_coord_grid_equal
from py_tools_ds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo import reproject_shapelyGeometry, transform_any_prj
from py_tools_ds.geo.projection import prj_equal, EPSG2WKT, WKT2EPSG
from py_tools_ds.geo.vector.topology import get_overlap_polygon

from ..config import GMS_config as CFG
from ..model.gms_object import GMS_object
from .L1A_P import L1A_object
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated

__author__ = 'Daniel Scheffler'


class Scene_finder(object):
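    """Finder for suitable spatial reference scenes, based on a spatial database query and several filter criteria.

    Usage sketch (argument values depend on the calling GMS object, cf. L1B_object.get_spatial_reference_scene()):

        SF = Scene_finder(boundsLonLat, acq_datetime, prj, footprint_poly, sceneID_excluded)
        SF.spatial_query(timeout=5)
        ref_scene = SF.choose_ref_scene()  # a ref_Scene instance or None
    """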
    def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, sceneID_excluded=None,
                 min_overlap=20, min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10):
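        """Create a Scene_finder instance.

        :param src_boundsLonLat:    lon/lat bounds of the target scene
        :param src_AcqDate:         acquisition date of the target scene
        :param src_prj:             projection of the target scene (WKT string)
        :param src_footprint_poly:  footprint polygon of the target scene
        :param sceneID_excluded:    scene ID to exclude from the query, e.g., the one of the target scene
        :param min_overlap:         minimum overlap of the reference scene in percent
        :param min_cloudcov:        minimum cloud cover of the reference scene in percent
        :param max_cloudcov:        maximum cloud cover of the reference scene in percent
        :param plusminus_days:      maximum allowed difference of the acquisition date in days
        :param plusminus_years:     maximum allowed difference of the acquisition date in years
        """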
        self.boundsLonLat = src_boundsLonLat
        self.src_AcqDate = src_AcqDate
        self.src_prj = src_prj
        self.src_footprint_poly = src_footprint_poly
        self.sceneID_excluded = sceneID_excluded
        self.min_overlap = min_overlap
        self.min_cloudcov = min_cloudcov
        self.max_cloudcov = max_cloudcov
        self.plusminus_days = plusminus_days
        self.plusminus_years = plusminus_years

        # get temporal constraints
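        # NOTE: add_years maps a Feb 29 acquisition date to Mar 1 in case the shifted year is not a leap year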
        def add_years(dt, years): return dt.replace(dt.year + years) \
            if not (dt.month == 2 and dt.day == 29) else dt.replace(dt.year + years, 3, 1)

        self.timeStart = add_years(self.src_AcqDate, -plusminus_years)
        timeEnd = add_years(self.src_AcqDate, +plusminus_years)
        timeNow = datetime.utcnow().replace(tzinfo=pytz.UTC)
        self.timeEnd = timeEnd if timeEnd <= timeNow else timeNow

        self.possib_ref_scenes = None  # set by self.spatial_query()
        self.GDF_ref_scenes = GeoDataFrame()  # set by self.spatial_query()
        self.ref_scene = None

    def spatial_query(self, timeout=5):
        """Query the postgreSQL database to find possible reference scenes matching the specified criteria.

        :param timeout:     maximum query duration allowed (seconds)
        """
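        # retry the query up to 10 times in case the spatial index server does not answer within the timeout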
        for i in range(10):
            try:
                SpIM = SpatialIndexMediator(timeout=timeout)
                self.possib_ref_scenes = \
                    SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
                                                    self.max_cloudcov, CFG.usecase.datasetid_spatial_ref,
                                                    refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
                break
            except socket.timeout:
                if i < 9:
                    continue
                else:
                    raise TimeoutError('Spatial query timed out 10 times!')
                    # TODO catch error when index server is not running:
                    # TODO ConnectionRefusedError: [Errno 111] Connection refused

        if self.possib_ref_scenes:
            # fill GeoDataFrame with possible ref scene parameters
            GDF = GeoDataFrame(self.possib_ref_scenes, columns=['object'])
            GDF['sceneid'] = list(GDF['object'].map(lambda scene: scene.sceneid))
            GDF['acquisitiondate'] = list(GDF['object'].map(lambda scene: scene.acquisitiondate))
            GDF['cloudcover'] = list(GDF['object'].map(lambda scene: scene.cloudcover))
            GDF['polyLonLat'] = list(GDF['object'].map(lambda scene: scene.polyLonLat))

            def LonLat2UTM(polyLL):
                return reproject_shapelyGeometry(polyLL, 4326, self.src_prj)

            GDF['polyUTM'] = list(GDF['polyLonLat'].map(LonLat2UTM))
            self.GDF_ref_scenes = GDF

    def _collect_refscene_metadata(self):
        """Collect some reference scene metadata needed for later filtering."""
        GDF = self.GDF_ref_scenes

        # get overlap parameter
        def get_OL_prms(poly): return get_overlap_polygon(poly, self.src_footprint_poly)

        GDF['overlapParams'] = list(GDF['polyLonLat'].map(get_OL_prms))
        GDF['overlap area'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area']))
        GDF['overlap percentage'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage']))
        GDF['overlap poly'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly']))
        del GDF['overlapParams']

        # get processing level of reference scenes
        procL = GeoDataFrame(
            DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes_proc', ['sceneid', 'proc_level'],
                                            {'sceneid': list(GDF.sceneid)}), columns=['sceneid', 'proc_level'])
        GDF = GDF.merge(procL, on='sceneid', how='left')
        GDF = GDF.where(GDF.notnull(), None)  # replace NaN values with None

        # get path of binary file
        def get_path_binary(GDF_row):
            return PG.path_generator(scene_ID=GDF_row['sceneid'], proc_level=GDF_row['proc_level']) \
                .get_path_imagedata() if GDF_row['proc_level'] else None
        GDF['path_ref'] = GDF.apply(lambda GDF_row: get_path_binary(GDF_row), axis=1)
        GDF['refDs_exists'] = list(GDF['path_ref'].map(lambda p: os.path.exists(p) if p else False))

        # check if a proper entity ID can be gathered from the database
        eID = GeoDataFrame(DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['id', 'entityid'],
                                                           {'id': list(GDF.sceneid)}), columns=['sceneid', 'entityid'])
        GDF = GDF.merge(eID, on='sceneid', how='left')
        self.GDF_ref_scenes = GDF.where(GDF.notnull(), None)

    def _filter_excluded_sceneID(self):
        """Filter out the reference scene that has the same scene ID as the target scene."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF.loc[GDF['sceneid'] != self.sceneID_excluded]

    def _filter_by_overlap(self):
        """Filter all scenes with less spatial overlap than self.min_overlap."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF.loc[GDF['overlap percentage'] >= self.min_overlap]

    def _filter_by_proc_status(self):
        """Filter out all scenes that have not been processed yet according to the processing status (at least L1A is
        required)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF[GDF['proc_level'].notnull()]

    def _filter_by_dataset_existance(self):
        """Filter out all scenes for which no processed data can be found on the fileserver."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF[GDF['refDs_exists']]

    def _filter_by_entity_ID_availability(self):
        """Filter all scenes where no proper entity ID can be found in the database (database errors)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF[GDF['entityid'].notnull()]

    def _filter_by_projection(self):
        """Filter all scenes that have a different projection than the target image."""
        GDF = self.GDF_ref_scenes[self.GDF_ref_scenes.refDs_exists]
        if not GDF.empty:
            # compare projections of target and reference image
            GDF['prj_equal'] = \
                list(GDF['path_ref'].map(lambda path_ref: prj_equal(self.src_prj, GeoArray(path_ref).prj)))

            self.GDF_ref_scenes = GDF[GDF['prj_equal']]

    def choose_ref_scene(self):
        """Choose reference scene with minimum cloud cover and maximum overlap."""
        if self.possib_ref_scenes:
            # First, collect some relevant reference scene metadata
            self._collect_refscene_metadata()

            # Filter possible scenes by running all filter functions
            self._filter_excluded_sceneID()
            self._filter_by_overlap()
            self._filter_by_proc_status()
            self._filter_by_dataset_existance()
            self._filter_by_entity_ID_availability()
            self._filter_by_projection()

        # Choose the reference scene out of the filtered DataFrame
        if not self.GDF_ref_scenes.empty:
            GDF = self.GDF_ref_scenes
            GDF = GDF[GDF['cloudcover'] == GDF['cloudcover'].min()]
            GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]

            if not GDF.empty:
                GDF_res = GDF.iloc[0]
                return ref_Scene(GDF_res)
        else:
            return None


class ref_Scene:
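    """Container holding the attributes of the chosen reference scene (one record of Scene_finder.GDF_ref_scenes)."""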
    def __init__(self, GDF_record):
        self.scene_ID = int(GDF_record['sceneid'])
        self.entity_ID = GDF_record['entityid']
        self.AcqDate = GDF_record['acquisitiondate']
        self.cloudcover = GDF_record['cloudcover']
        self.polyLonLat = GDF_record['polyLonLat']
        self.polyUTM = GDF_record['polyUTM']
        self.proc_level = GDF_record['proc_level']
        self.filePath = GDF_record['path_ref']


class L1B_object(L1A_object):
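    """GMS Level 1B object: detects global/local geometric displacements with respect to a spatial reference scene."""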
    def __init__(self, L1A_obj=None):
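        """Initialize an L1B_object, optionally taking over all attributes of the given L1A_object."""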

        super(L1B_object, self).__init__()

        # set defaults
        self._spatRef_available = None
        self.spatRef_scene = None  # set by self.get_spatial_reference_scene()
        self.deshift_results = collections.OrderedDict()

        if L1A_obj:
            # populate attributes
            for key, value in L1A_obj.__dict__.items():
                setattr(self, key, value)

        self.proc_level = 'L1B'

    @property
    def spatRef_available(self):
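        """Return whether a spatial reference scene is available (runs the spatial query on first access)."""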
        if self._spatRef_available is not None:
            return self._spatRef_available
        else:
            self.get_spatial_reference_scene()
            return self._spatRef_available

    @spatRef_available.setter
    def spatRef_available(self, spatRef_available):
        self._spatRef_available = spatRef_available

    def get_spatial_reference_scene(self):
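        """Find a suitable spatial reference scene and update self.spatRef_scene and self.spatRef_available."""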
        boundsLonLat = corner_coord_to_minmax(self.trueDataCornerLonLat)
        footprint_poly = HLP_F.CornerLonLat_to_shapelyPoly(self.trueDataCornerLonLat)
        RSF = Scene_finder(boundsLonLat, self.acq_datetime, self.meta_odict['coordinate system string'],
                           footprint_poly, self.scene_ID, 20, 0, 20, 30, 10)

        # run spatial query
        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')
        RSF.spatial_query(timeout=5)
        if RSF.possib_ref_scenes:
            self.logger.info('Query result: %s reference scenes with matching metadata.' % len(RSF.possib_ref_scenes))
        else:
            self.logger.warning('Spatial query returned no matches. Coregistration impossible.')
            self.spatRef_available = False

        # try to get a spatial reference scene by applying some filter criteria
        self.spatRef_scene = RSF.choose_ref_scene()
        if self.spatRef_scene:
            self.spatRef_available = True
            self.logger.info('Found a suitable reference image for coregistration: scene ID %s (entity ID %s).'
                             % (self.spatRef_scene.scene_ID, self.spatRef_scene.entity_ID))
        else:
            self.spatRef_available = False
            self.logger.warning('No scene fulfills all requirements to serve as spatial reference for scene %s '
                                '(entity ID %s). Coregistration impossible.' % (self.scene_ID, self.entity_ID))

    def _get_reference_image_params_pgSQL(self):
        # TODO implement earlier version of this function as a backup for SpatialIndexMediator
        """postgreSQL query: get IDs of overlapping scenes and select the most suitable scene ID
        (with respect to DGM and cloud cover)."""
        warnings.warn('_get_reference_image_params_pgSQL is deprecated and will no longer work.', DeprecationWarning)

        # pre-check using the cloud mask which region of im2shift is usable for the shift correction at all
        # -> pass that region as an argument to the postgreSQL query
        # scene_ID            = 14536400 # LE71510322000093SGS00 im2shift

        # set query conditions
        min_overlap = 20  # %
        max_cloudcov = 20  # %
        plusminus_days = 30
        AcqDate = self.im2shift_objDict['acquisition_date']
        date_minmax = [AcqDate - timedelta(days=plusminus_days), AcqDate + timedelta(days=plusminus_days)]
        dataset_cond = 'datasetid=%s' % CFG.usecase.datasetid_spatial_ref
        cloudcov_cond = 'cloudcover < %s' % max_cloudcov
        # FIXME cloudcover is not yet available for all scenes at proc_level METADATA
        dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
                        "BETWEEN (%s, %s) AND (%s, %s)" \
                        % (date_minmax[0].month, date_minmax[0].day, date_minmax[1].month, date_minmax[1].day)
        # TODO add further criteria!

        def query_scenes(condlist):
            return DB_T.get_overlapping_scenes_from_postgreSQLdb(
                CFG.job.conn_database,
                table='scenes',
                tgt_corners_lonlat=self.trueDataCornerLonLat,
                conditions=condlist,
                add_cmds='ORDER BY scenes.cloudcover ASC',
                timeout=30000)
        conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]

        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')

        count, filt_overlap_scenes = 0, []
        while not filt_overlap_scenes:
            if count == 0:
                # search within already processed scenes
                # this only yields results from scenes_proc
                # -> a reference is only found there if the scene was already in the dataset list at CFG.job startup
                res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
                    CFG.job.conn_database,
                    tgt_corners_lonlat=self.trueDataCornerLonLat,
                    conditions=['datasetid=%s' % CFG.usecase.datasetid_spatial_ref],
                    add_cmds='ORDER BY scenes.cloudcover ASC',
                    timeout=25000)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)

            else:
                # search within the complete scenes table, using fewer filter criteria with each run
                # TODO: integrate Daniel's index; otherwise this may become too slow as the table grows
                res = query_scenes(conds_descImportance)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                if len(conds_descImportance) > 1:  # FIXME use a different reference sensor?
                    del conds_descImportance[-1]
                else:  # reduce min_overlap to 10 percent if there are overlapping scenes
                    if res:
                        min_overlap = 10
                        filt_overlap_scenes = \
                            self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                    # raise warnings if no match found
                    if not filt_overlap_scenes:
                        if not res:
                            warnings.warn('No reference scene found for %s (scene ID %s). Coregistration of this scene '
                                          'failed.' % (self.baseN, self.scene_ID))
                        else:
                            warnings.warn('Reference scenes for %s (scene ID %s) have been found but none has more '
                                          'than %s percent overlap. Coregistration of this scene failed.'
                                          % (self.baseN, self.scene_ID, min_overlap))
                        break
            count += 1

        if filt_overlap_scenes:
            ref_available = False
            for count, sc in enumerate(filt_overlap_scenes):
                if count == 2:  # FIXME really abort already at the 3rd scene?
                    warnings.warn('No reference scene for %s (scene ID %s) available. '
                                  'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                    break

                # start download of scene data if not yet available and start L1A processing
                def dl_cmd(scene_ID): print('%s %s %s' % (
                    CFG.job.java_commands['keyword'].strip(),  # FIXME CFG.job.java_commands is deprecated
                    CFG.job.java_commands["value_download"].strip(), scene_ID))

                path = PG.path_generator(scene_ID=sc['scene_ID']).get_path_imagedata()

                if not os.path.exists(path):
                    # add scene to download to scenes_jobs.missing_scenes

                    # print JAVA download command
                    t_dl_start = time.time()
                    dl_cmd(sc['scene_ID'])

                    # check if scene is downloading
                    download_start_timeout = 5  # seconds
                    # set timeout for external processing
                    # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG.job WITHIN CFG.job IS NOT ALLOWED
                    processing_timeout = 5  # seconds # FIXME increase timeout if processing is really started
                    proc_level = None
                    while True:
                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(
                            CFG.job.conn_database, 'scenes', ['proc_level'], {'id': sc['scene_ID']})[0][0]
                        if proc_level != proc_level_chk:
                            print('Reference scene %s, current processing level: %s' % (sc['scene_ID'], proc_level_chk))
                        proc_level = proc_level_chk
                        if proc_level_chk in ['SCHEDULED', 'METADATA'] and \
                           time.time() - t_dl_start >= download_start_timeout:
                            warnings.warn('Download of reference scene %s (entity ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                        if proc_level_chk == 'L1A':
                            ref_available = True
                            break
                        elif proc_level_chk == 'DOWNLOADED' and time.time() - t_dl_start >= processing_timeout:
                            # proc_level='DOWNLOADED' but scene has not been processed
                            warnings.warn('L1A processing of reference scene %s (entity ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                            # DB_T.set_info_in_postgreSQLdb(CFG.job.conn_database,'scenes',
                            #                             {'proc_level':'METADATA'},{'id':sc['scene_ID']})

                        time.sleep(5)
                else:
                    ref_available = True

                if not ref_available:
                    continue
                else:
                    self.path_imref = path
                    self.imref_scene_ID = sc['scene_ID']
                    self.imref_footprint_poly = sc['scene poly']
                    self.overlap_poly = sc['overlap poly']
                    self.overlap_percentage = sc['overlap percentage']
                    self.overlap_area = sc['overlap area']

                    query_res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
                                                                {'id': self.imref_scene_ID}, records2fetch=1)
                    assert query_res != [], 'No entity-ID found for scene number %s' % self.imref_scene_ID
                    self.imref_entity_ID = query_res[0][0]  # [('LC81510322013152LGN00',)]
                    break
        self.logger.close()

    def _sceneIDList_to_filt_overlap_scenes(self, sceneIDList, min_overlap):
        """Find those scenes within the given list of scene IDs that overlap the target scene footprint by at least
        min_overlap percent.

        NOTE: Only the first 50 scenes are considered.
        """

        # t0 = time.time()
        dict_sceneID_poly = [{'scene_ID': ID, 'scene poly': HLP_F.scene_ID_to_shapelyPolygon(ID)}
                             for ID in sceneIDList]  # always returns LonLat polygons

        # get overlap polygons and their parameters. result: [{overlap poly, overlap percentage, overlap area}]
        for dic in dict_sceneID_poly:  # input: dicts {scene_ID, ref_poly}
            dict_overlap_poly_params = get_overlap_polygon(dic['scene poly'], self.arr.footprint_poly)
            dic.update(dict_overlap_poly_params)  # adds {overlap poly, overlap percentage, overlap area}
        # print('polygon creation time', time.time()-t0)

        # filter out those scene IDs where the overlap percentage is below min_overlap
        if min_overlap:
            filt_overlap_scenes = [scene for scene in dict_sceneID_poly if scene['overlap percentage'] >= min_overlap]
        else:
            filt_overlap_scenes = dict_sceneID_poly

        return filt_overlap_scenes

    def get_opt_bands4matching(self, target_cwlPos_nm=550, v=False):
        """Automatically determine the optimal bands used for Fourier shift theorem matching.

        :param target_cwlPos_nm:   the desired wavelength used for matching
        :param v:                  verbose mode
        """
        # get GMS_object for reference scene
        path_gmsFile = PG.path_generator(scene_ID=self.spatRef_scene.scene_ID).get_path_gmsfile()
        ref_obj = GMS_object().from_disk((path_gmsFile, ['cube', None]))

        # get spectral characteristics
        ref_cwl, shift_cwl = [[float(i) for i in GMS_obj.meta_odict['wavelength']] for GMS_obj in [ref_obj, self]]
        ref_fwhm, shift_fwhm = [[float(i) for i in GMS_obj.meta_odict['bandwidths']] for GMS_obj in [ref_obj, self]]

        # exclude cirrus/oxygen band of Landsat-8/Sentinel-2
        shift_bbl, ref_bbl = [False] * len(shift_cwl), [False] * len(ref_cwl)  # bad band lists
        for GMS_obj, s_r, bbl in zip([self, ref_obj], ['shift', 'ref'], [shift_bbl, ref_bbl]):
            GMS_obj.GMS_identifier['logger'] = None  # set a dummy value in order to avoid Exception
            sensorcode = get_GMS_sensorcode(GMS_obj.GMS_identifier)
            if sensorcode in ['LDCM', 'S2A', 'S2B'] and '9' in GMS_obj.LayerBandsAssignment:
                bbl[GMS_obj.LayerBandsAssignment.index('9')] = True
            if sensorcode in ['S2A', 'S2B'] and '10' in GMS_obj.LayerBandsAssignment:
                bbl[GMS_obj.LayerBandsAssignment.index('10')] = True

        # cwl_overlap = (max(min(shift_cwl),min(ref_cwl)),  min(max(shift_cwl),max(ref_cwl))) # -> (min wvl, max wvl)
        # find matching band of reference image for each band of image to be shifted
        match_dic = collections.OrderedDict()
        for idx, cwl, fwhm in zip(range(len(shift_cwl)), shift_cwl, shift_fwhm):
            if shift_bbl[idx]:
                continue  # skip cwl if it is declared as bad band above

            def is_inside(r_cwl, s_cwl, s_fwhm): return s_cwl - s_fwhm / 2 < r_cwl < s_cwl + s_fwhm / 2

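            # a reference band is a potential match if its center wavelength lies within cwl +- fwhm/2 of the
            # respective band of the image to be shifted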
            matching_r_cwls = [r_cwl for i, r_cwl in enumerate(ref_cwl) if
                               is_inside(r_cwl, cwl, fwhm) and not ref_bbl[i]]
            if matching_r_cwls:
                # a single match is used directly; otherwise the reference band closest to the shift band cwl is used
                match_dic[cwl] = matching_r_cwls[0] if len(matching_r_cwls) == 1 else \
                    matching_r_cwls[np.abs(np.array(matching_r_cwls) - cwl).argmin()]

        # set bands4match based on the above results
        poss_cwls = [cwl for cwl in shift_cwl if cwl in match_dic]
        if poss_cwls:
            if not target_cwlPos_nm:
                shift_band4match = shift_cwl.index(poss_cwls[0]) + 1  # first possible shift band
                ref_band4match = ref_cwl.index(match_dic[poss_cwls[0]]) + 1  # matching reference band
            else:  # target_cwlPos is given
                closestWvl_to_target = poss_cwls[np.abs(np.array(poss_cwls) - target_cwlPos_nm).argmin()]
                shift_band4match = shift_cwl.index(closestWvl_to_target) + 1  # the shift band closest to target
                ref_band4match = ref_cwl.index(match_dic[closestWvl_to_target]) + 1  # matching ref
        else:  # all reference bands are outside of shift-cwl +- fwhm/2
            warnings.warn('Optimal bands for matching could not be automatically determined. Choosing first band of '
                          'each image.')
            shift_band4match = 1
            ref_band4match = 1

        if v:
            print('Shift band for matching:     %s (%snm)' % (shift_band4match, shift_cwl[shift_band4match - 1]))
            print('Reference band for matching: %s (%snm)' % (ref_band4match, ref_cwl[ref_band4match - 1]))

        return ref_band4match, shift_band4match

    def coregister_spatially(self):
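        """Compute the spatial shifts between the target image and the reference image using AROSICS.

        The results are stored in self.coreg_info; the shifts are applied later by self.correct_spatial_shifts().
        """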
        spatIdxSrv_status = os.environ.get('GMS_SPAT_IDX_SRV_STATUS', True)

        if spatIdxSrv_status == 'unavailable':
            self.logger.warning('Coregistration skipped due to unavailable Spatial Index Mediator Server!')

        elif CFG.job.skip_coreg:
            self.logger.warning('Coregistration skipped according to user configuration.')

        elif self.coreg_needed and self.spatRef_available:
            geoArr_ref = GeoArray(self.spatRef_scene.filePath)
            geoArr_shift = GeoArray(self.arr)
            r_b4match, s_b4match = self.get_opt_bands4matching(target_cwlPos_nm=550, v=False)
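            # arguments for the AROSICS COREG class (global coregistration based on the two bands selected above)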
            coreg_kwargs = dict(
                r_b4match=r_b4match,
                s_b4match=s_b4match,
                align_grids=True,  # FIXME not needed here
                match_gsd=True,  # FIXME not needed here
                data_corners_ref=[[x, y] for x, y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
                data_corners_tgt=[transform_any_prj(EPSG2WKT(4326), self.meta_odict['coordinate system string'], x, y)
                                  for x, y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
                nodata=(get_outFillZeroSaturated(geoArr_ref.dtype)[0],
                        get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
                ignore_errors=True,
                v=False,
                q=True
            )

            COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
            COREG_obj.calculate_spatial_shifts()

            self.coreg_info.update(
                COREG_obj.coreg_info)  # no clipping to trueCornerLonLat until here -> only shift correction
            self.coreg_info.update({'reference scene ID': self.spatRef_scene.scene_ID})
            self.coreg_info.update({'reference entity ID': self.spatRef_scene.entity_ID})

            if COREG_obj.success:
                self.coreg_info['success'] = True
                self.logger.info("Calculated map shifts (X,Y): %s / %s"
                                 % (COREG_obj.x_shift_map,
                                    COREG_obj.y_shift_map))  # FIXME log this directly in calculate_spatial_shifts

            else:
                # TODO add database entry with error hint
                for err in COREG_obj.tracked_errors:
                    self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
                                      % (self.scene_ID, self.entity_ID, err))

        else:
            if self.coreg_needed:
                self.logger.warning('Coregistration skipped because no suitable reference scene is available or '
                                    'spatial query failed.')
            else:
                self.logger.info('Coregistration of scene %s (entity ID %s) skipped because target dataset ID equals '
                                 'reference dataset ID.' % (self.scene_ID, self.entity_ID))

    def correct_spatial_shifts(self, cliptoextent=True, clipextent=None, clipextent_prj=None, v=False):
        # type: (bool, list, any, bool) -> None
        """Corrects the spatial shifts calculated by self.coregister_spatially().

        :param cliptoextent:    whether to clip the output to the given extent
        :param clipextent:      list of XY-coordinate tuples giving the target extent (if not given and cliptoextent is
                                True, the 'trueDataCornerLonLat' attribute of the GMS object is used)
        :param clipextent_prj:  WKT projection string or EPSG code of the projection for the coordinates in clipextent
        :param v:               verbose mode
        :return:
        """

        # cliptoextent is automatically True if an extent is given
        cliptoextent = cliptoextent if not clipextent else True

        if cliptoextent or self.resamp_needed or (self.coreg_needed and self.coreg_info['success']):

            # get target bounds # TODO implement boxObj call instead here
            if not clipextent:
                trueDataCornerUTM = [transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
                                     for x, y in self.trueDataCornerLonLat]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(trueDataCornerUTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds
            else:
                assert clipextent_prj, \
                    "'clipextent_prj' must be given together with 'clipextent'. Received only 'clipextent'."
                clipextent_UTM = clipextent if prj_equal(self.MetaObj.projection, clipextent_prj) else \
                    [transform_any_prj(clipextent_prj, self.MetaObj.projection, x, y) for x, y in clipextent]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(clipextent_UTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds

            # correct shifts and clip to extent
            # ensure self.masks exists (does not exist in Flink mode because
            # in that case self.fill_from_disk() is skipped)
            if not hasattr(self, 'masks') or self.masks is None:
                self.build_combined_masks_array()  # creates self.masks and self.masks_meta

            # exclude self.mask_nodata, self.mask_clouds from warping
            del self.mask_nodata, self.mask_clouds

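            # deshift all array attributes that are currently set on this object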
            attributes2deshift = [attrname for attrname in
                                  ['arr', 'masks', 'dem', 'ac_errors', 'mask_clouds_confidence']
                                  if getattr(self, '_%s' % attrname) is not None]
            for attrname in attributes2deshift:
                geoArr = getattr(self, attrname)

                # do some logging
                if self.coreg_needed:
                    if self.coreg_info['success']:
                        self.logger.info("Correcting spatial shifts for attribute '%s'..." % attrname)
                    elif cliptoextent and is_coord_grid_equal(
                         geoArr.gt, CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy):
                        self.logger.info("Attribute '%s' has only been clipped to its extent because no valid "
                                         "shifts have been detected and the pixel grid equals the target grid."
                                         % attrname)
                    elif self.resamp_needed:
                        self.logger.info("Resampling attribute '%s' to target grid..." % attrname)
                elif self.resamp_needed:
                    self.logger.info("Resampling attribute '%s' to target grid..." % attrname)

                # correct shifts
                DS = DESHIFTER(geoArr, self.coreg_info,
                               target_xyGrid=[CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy],
                               cliptoextent=cliptoextent,
                               clipextent=mapBounds,
                               align_grids=True,
                               resamp_alg='nearest' if attrname == 'masks' else 'cubic',
                               CPUs=None if CFG.job.allow_subMultiprocessing else 1,
                               progress=True if v else False,
                               q=True,
                               v=v)
                DS.correct_shifts()

                # update coreg_info
                if attrname == 'arr':
                    self.coreg_info['is shifted'] = DS.is_shifted
                    self.coreg_info['is resampled'] = DS.is_resampled

                # update geoinformation and array-shape-related attributes
                self.logger.info("Updating geoinformation of '%s' attribute..." % attrname)
                if attrname == 'arr':
                    self.meta_odict['map info'] = DS.updated_map_info
                    self.meta_odict['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.shape_fullArr = DS.arr_shifted.shape
                    self.meta_odict['lines'], self.meta_odict['samples'] = DS.arr_shifted.shape[:2]
                else:
                    self.masks_meta['map info'] = DS.updated_map_info
                    self.masks_meta['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.masks_meta['lines'], self.masks_meta['samples'] = DS.arr_shifted.shape[:2]

                    # NOTE: mask_nodata and mask_clouds are updated later by L2A_map mapper function (module pipeline)

                # update the GeoArray instance without losing its inherent metadata (nodata, ...)
                geoArr.arr, geoArr.gt, geoArr.prj = \
                    DS.GeoArray_shifted.arr, DS.GeoArray_shifted.gt, DS.GeoArray_shifted.prj
                # setattr(self,attrname, DS.GeoArray_shifted) # NOTE: don't set array earlier because setter will also
                #                                            # update arr.gt/.prj/.nodata from meta_odict

            self.resamp_needed = False
            self.coreg_needed = False

            # recreate self.mask_nodata and self.mask_clouds from self.masks
            self.mask_nodata = self.mask_nodata
            self.mask_clouds = self.mask_clouds
            # FIXME move functionality of self.masks only to output writer and remove self.masks completely