L1B_P.py 35.5 KB
Newer Older
Daniel Scheffler's avatar
Daniel Scheffler committed
1
# -*- coding: utf-8 -*-
2
3
4
5
6
7
"""
Level 1B Processor:

Detection of global/local geometric displacements.
"""

Daniel Scheffler's avatar
Daniel Scheffler committed
8

9
import collections
10
import os
11
import socket
12
import time
13
import warnings
14
from datetime import datetime, timedelta
15
16

import numpy as np
17
from geopandas import GeoDataFrame
18
from shapely.geometry import box
19
import pytz
20
from typing import Union  # noqa F401  # flake8 issue
Daniel Scheffler's avatar
Daniel Scheffler committed
21

22
from arosics import COREG, DESHIFTER
23
from geoarray import GeoArray
24
25
26
27
28
29
from py_tools_ds.geo.coord_grid import is_coord_grid_equal
from py_tools_ds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo import reproject_shapelyGeometry, transform_any_prj
from py_tools_ds.geo.projection import prj_equal, EPSG2WKT, WKT2EPSG
from py_tools_ds.geo.vector.topology import get_overlap_polygon

30
from ..options.config import GMS_config as CFG
31
from ..model.gms_object import GMS_object
32
33
34
35
from .L1A_P import L1A_object
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
36
from ..misc.spatial_index_mediator import SpatialIndexMediator
37
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated
38

39
__author__ = 'Daniel Scheffler'
40
41


42
class Scene_finder(object):
    """Scene_finder class to query the postgreSQL database to find a suitable reference scene for co-registration."""

    def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, sceneID_excluded=None,
                 min_overlap=20, min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10):
        """Initialize Scene_finder.

        :param src_boundsLonLat:    geographic bounds of the target scene (lon/lat)
        :param src_AcqDate:         acquisition date of the target scene (datetime)
        :param src_prj:             projection of the target scene
        :param src_footprint_poly:  footprint polygon of the target scene (lon/lat)
        :param sceneID_excluded:    a scene ID to exclude from the query results
                                    (usually the ID of the target scene itself)
        :param min_overlap:         minimum overlap of reference scene in percent
        :param min_cloudcov:        minimum cloud cover of reference scene in percent
        :param max_cloudcov:        maximum cloud cover of reference scene in percent
        :param plusminus_days:      maximum time interval between target and reference scene in days
        :param plusminus_years:     maximum time interval between target and reference scene in years
        """
        # target scene properties
        self.boundsLonLat = src_boundsLonLat
        self.src_AcqDate = src_AcqDate
        self.src_prj = src_prj
        self.src_footprint_poly = src_footprint_poly
        self.sceneID_excluded = sceneID_excluded

        # filter thresholds for the reference scene candidates
        self.min_overlap = min_overlap
        self.min_cloudcov = min_cloudcov
        self.max_cloudcov = max_cloudcov
        self.plusminus_days = plusminus_days
        self.plusminus_years = plusminus_years

        # get temporal constraints
        # NOTE: Feb 29 does not exist in non-leap years -> shift it to Mar 1 when adding whole years
        def add_years(dt, years): return dt.replace(dt.year + years) \
            if not (dt.month == 2 and dt.day == 29) else dt.replace(dt.year + years, 3, 1)
        self.timeStart = add_years(self.src_AcqDate, -plusminus_years)
        timeEnd = add_years(self.src_AcqDate, +plusminus_years)
        # clip the upper end of the search interval to the current UTC time
        timeNow = datetime.utcnow().replace(tzinfo=pytz.UTC)
        self.timeEnd = timeEnd if timeEnd <= timeNow else timeNow

        self.possib_ref_scenes = None  # set by self.spatial_query()
        self.GDF_ref_scenes = GeoDataFrame()  # set by self.spatial_query()
        self.ref_scene = None

    def spatial_query(self, timeout=5):
        """Query the postgreSQL database to find possible reference scenes matching the specified criteria.

        :param timeout:     maximum query duration allowed (seconds)
        """
        # retry up to 10 times in case the spatial index server does not answer within 'timeout'
        for i in range(10):
            try:
                SpIM = SpatialIndexMediator(timeout=timeout)
                self.possib_ref_scenes = \
                    SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
                                                    self.max_cloudcov, CFG.datasetid_spatial_ref,
                                                    refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
                break
            except socket.timeout:
                if i < 9:
                    continue
                else:
                    raise TimeoutError('Spatial query timed out 10 times!')
                    # TODO catch error when index server is not running:
                    # TODO ConnectionRefusedError: [Errno 111] Connection refused

        if self.possib_ref_scenes:
            # fill GeoDataFrame with possible ref scene parameters
            GDF = GeoDataFrame(self.possib_ref_scenes, columns=['object'])
            GDF['sceneid'] = list(GDF['object'].map(lambda scene: scene.sceneid))
            GDF['acquisitiondate'] = list(GDF['object'].map(lambda scene: scene.acquisitiondate))
            GDF['cloudcover'] = list(GDF['object'].map(lambda scene: scene.cloudcover))
            GDF['polyLonLat'] = list(GDF['object'].map(lambda scene: scene.polyLonLat))

            # reproject the lon/lat footprints to the projection of the target scene
            def LonLat2UTM(polyLL):
                return reproject_shapelyGeometry(polyLL, 4326, self.src_prj)

            GDF['polyUTM'] = list(GDF['polyLonLat'].map(LonLat2UTM))
            self.GDF_ref_scenes = GDF

    def _collect_refscene_metadata(self):
        """Collect some reference scene metadata needed for later filtering."""
        GDF = self.GDF_ref_scenes

        # get overlap parameter
        def get_OL_prms(poly): return get_overlap_polygon(poly, self.src_footprint_poly)

        GDF['overlapParams'] = list(GDF['polyLonLat'].map(get_OL_prms))
        GDF['overlap area'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area']))
        GDF['overlap percentage'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage']))
        GDF['overlap poly'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly']))
        del GDF['overlapParams']

        # get processing level of reference scenes
        procL = GeoDataFrame(
            DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes_proc', ['sceneid', 'proc_level'],
                                            {'sceneid': list(GDF.sceneid)}), columns=['sceneid', 'proc_level'])
        GDF = GDF.merge(procL, on='sceneid', how='left')
        GDF = GDF.where(GDF.notnull(), None)  # replace NaN values with None

        # get path of binary file
        # NOTE: scenes without a processing level have no processed dataset on disk -> path is None
        def get_path_binary(GDF_row):
            return PG.path_generator(scene_ID=GDF_row['sceneid'], proc_level=GDF_row['proc_level']) \
                .get_path_imagedata() if GDF_row['proc_level'] else None
        GDF['path_ref'] = GDF.apply(lambda GDF_row: get_path_binary(GDF_row), axis=1)
        GDF['refDs_exists'] = list(GDF['path_ref'].map(lambda p: os.path.exists(p) if p else False))

        # check if a proper entity ID can be gathered from database
        eID = GeoDataFrame(DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['id', 'entityid'],
                                                           {'id': list(GDF.sceneid)}), columns=['sceneid', 'entityid'])
        GDF = GDF.merge(eID, on='sceneid', how='left')
        self.GDF_ref_scenes = GDF.where(GDF.notnull(), None)

    def _filter_excluded_sceneID(self):
        """Filter reference scene with the same scene ID like the target scene."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF.loc[GDF['sceneid'] != self.sceneID_excluded]

    def _filter_by_overlap(self):
        """Filter all scenes with less spatial overlap than self.min_overlap."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF.loc[GDF['overlap percentage'] >= self.min_overlap]

    def _filter_by_proc_status(self):
        """Filter all scenes that have not been processed before according to proc. status (at least L1A is needed)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF[GDF['proc_level'].notnull()]

    def _filter_by_dataset_existance(self):
        """Filter all scenes where no processed data can be found on fileserver."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF[GDF['refDs_exists']]

    def _filter_by_entity_ID_availability(self):
        """Filter all scenes where no proper entity ID can be found in the database (database errors)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.GDF_ref_scenes = GDF[GDF['entityid'].notnull()]

    def _filter_by_projection(self):
        """Filter all scenes that have a different projection than the target image."""
        # only scenes whose dataset exists on disk can be opened to read the projection
        GDF = self.GDF_ref_scenes[self.GDF_ref_scenes.refDs_exists]
        if not GDF.empty:
            # compare projections of target and reference image
            GDF['prj_equal'] = \
                list(GDF['path_ref'].map(lambda path_ref: prj_equal(self.src_prj, GeoArray(path_ref).prj)))

            self.GDF_ref_scenes = GDF[GDF['prj_equal']]

    def choose_ref_scene(self):
        """Choose reference scene with minimum cloud cover and maximum overlap.

        :return:    a ref_Scene instance or None if no suitable reference scene is found
        """
        if self.possib_ref_scenes:
            # First, collect some relevant reference scene metadata
            self._collect_refscene_metadata()

            # Filter possible scenes by running all filter functions
            self._filter_excluded_sceneID()
            self._filter_by_overlap()
            self._filter_by_proc_status()
            self._filter_by_dataset_existance()
            self._filter_by_entity_ID_availability()
            self._filter_by_projection()

        # Choose the reference scene out of the filtered DataFrame
        if not self.GDF_ref_scenes.empty:
            GDF = self.GDF_ref_scenes
            # keep the scenes with minimal cloud cover, then the one with maximal overlap
            GDF = GDF[GDF['cloudcover'] == GDF['cloudcover'].min()]
            GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]

            if not GDF.empty:
                GDF_res = GDF.iloc[0]
                return ref_Scene(GDF_res)
        else:
            return None
216

217

218
219
class ref_Scene:
    """Simple value container for the spatial reference scene chosen by Scene_finder."""

    # mapping of instance attribute name -> column name within the GeoDataFrame record
    _ATTR2COL = (('entity_ID', 'entityid'),
                 ('AcqDate', 'acquisitiondate'),
                 ('cloudcover', 'cloudcover'),
                 ('polyLonLat', 'polyLonLat'),
                 ('polyUTM', 'polyUTM'),
                 ('proc_level', 'proc_level'),
                 ('filePath', 'path_ref'))

    def __init__(self, GDF_record):
        """Copy the relevant columns of the given record into instance attributes.

        :param GDF_record:  a single record of the GeoDataFrame of possible reference scenes
        """
        # the scene ID is stored as a plain integer
        self.scene_ID = int(GDF_record['sceneid'])
        for attr, col in self._ATTR2COL:
            setattr(self, attr, GDF_record[col])
228
229
230
231
232


class L1B_object(L1A_object):
    def __init__(self, L1A_obj=None):
        """Initialize the Level 1B processor object.

        :param L1A_obj:     an optional L1A_object instance whose attributes are copied into this object
        """
        super(L1B_object, self).__init__()

        # set defaults
        self._spatRef_available = None
        self.spatRef_scene = None  # set by self.get_spatial_reference_scene()
        self.deshift_results = collections.OrderedDict()

        if L1A_obj:
            # populate attributes
            # (plain loop instead of a side-effect list comprehension)
            for key, value in L1A_obj.__dict__.items():
                setattr(self, key, value)

        self.proc_level = 'L1B'

    @property
    def spatRef_available(self):
        """Return whether a spatial reference scene is available (lazily runs the spatial query once)."""
        if self._spatRef_available is None:
            # not yet determined -> query the database; this sets self._spatRef_available
            self.get_spatial_reference_scene()
        return self._spatRef_available

    @spatRef_available.setter
    def spatRef_available(self, spatRef_available):
        self._spatRef_available = spatRef_available

    def get_spatial_reference_scene(self):
        """Query the database for a suitable spatial reference scene and store it in self.spatRef_scene.

        Sets self.spatRef_available accordingly.
        """
        bounds_lonlat = corner_coord_to_minmax(self.trueDataCornerLonLat)
        fp_poly = HLP_F.CornerLonLat_to_shapelyPoly(self.trueDataCornerLonLat)
        scene_finder = Scene_finder(bounds_lonlat,
                                    self.acq_datetime,
                                    self.meta_odict['coordinate system string'],
                                    fp_poly,
                                    self.scene_ID,
                                    min_overlap=CFG.spatial_ref_min_overlap,
                                    min_cloudcov=CFG.spatial_ref_min_cloudcov,
                                    max_cloudcov=CFG.spatial_ref_max_cloudcov,
                                    plusminus_days=CFG.spatial_ref_plusminus_days,
                                    plusminus_years=CFG.spatial_ref_plusminus_years)

        # run spatial query
        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')
        scene_finder.spatial_query(timeout=5)

        if not scene_finder.possib_ref_scenes:
            self.logger.warning('Spatial query returned no matches. Coregistration impossible.')
            self.spatRef_available = False
            return

        self.logger.info('Query result: %s reference scenes with matching metadata.'
                         % len(scene_finder.possib_ref_scenes))

        # try to get a spatial reference scene by applying some filter criteria
        self.spatRef_scene = scene_finder.choose_ref_scene()  # type: Union[ref_Scene, None]
        if self.spatRef_scene:
            self.spatRef_available = True
            self.logger.info('Found a suitable reference image for coregistration: scene ID %s (entity ID %s).'
                             % (self.spatRef_scene.scene_ID, self.spatRef_scene.entity_ID))
        else:
            self.spatRef_available = False
            self.logger.warning('No scene fulfills all requirements to serve as spatial reference for scene %s '
                                '(entity ID %s). Coregistration impossible.' % (self.scene_ID, self.entity_ID))
289
290

    def _get_reference_image_params_pgSQL(self):
        # TODO implement earlier version of this function as a backup for SpatialIndexMediator
        """postgreSQL query: get IDs of overlapping scenes and select most suitable scene_ID
            (with respect to DGM, cloud cover"""
        warnings.warn('_get_reference_image_params_pgSQL is deprecated an will not work anymore.', DeprecationWarning)

        # preliminary check based on the cloud mask which region of im2shift is usable for shift correction at all
        # -> pass this region as an argument to the postgreSQL query
        # scene_ID            = 14536400 # LE71510322000093SGS00 im2shift

        # set query conditions
        min_overlap = 20  # %
        max_cloudcov = 20  # %
        plusminus_days = 30
        AcqDate = self.im2shift_objDict['acquisition_date']
        date_minmax = [AcqDate - timedelta(days=plusminus_days), AcqDate + timedelta(days=plusminus_days)]
        dataset_cond = 'datasetid=%s' % CFG.datasetid_spatial_ref
        cloudcov_cond = 'cloudcover < %s' % max_cloudcov
        # FIXME cloudcover is not yet available for all scenes at proc_level METADATA
        dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
                        "BETWEEN (%s, %s) AND (%s, %s)" \
                        % (date_minmax[0].month, date_minmax[0].day, date_minmax[1].month, date_minmax[1].day)
        # TODO add further query criteria!

        def query_scenes(condlist):
            # query the full 'scenes' table, cheapest (cloud-free) scenes first
            return DB_T.get_overlapping_scenes_from_postgreSQLdb(
                CFG.conn_database,
                table='scenes',
                tgt_corners_lonlat=self.trueDataCornerLonLat,
                conditions=condlist,
                add_cmds='ORDER BY scenes.cloudcover ASC',
                timeout=30000)
        conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]

        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')

        count, filt_overlap_scenes = 0, []
        while not filt_overlap_scenes:
            if count == 0:
                # search within already processed scenes
                # this only yields results from scenes_proc
                # -> a reference is only present there if the scene was already in the dataset list at CFG.job start
                res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
                    CFG.conn_database,
                    tgt_corners_lonlat=self.trueDataCornerLonLat,
                    conditions=['datasetid=%s' % CFG.datasetid_spatial_ref],
                    add_cmds='ORDER BY scenes.cloudcover ASC',
                    timeout=25000)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)

            else:
                # search within complete scenes table using less filter criteria with each run
                # TODO: integrate the spatial index here, otherwise this may become too slow as the table grows
                res = query_scenes(conds_descImportance)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                if len(conds_descImportance) > 1:  # FIXME use a different reference sensor?
                    # drop the least important condition and retry
                    del conds_descImportance[-1]
                else:  # reduce min_overlap to 10 percent if there are overlapping scenes
                    if res:
                        min_overlap = 10
                        filt_overlap_scenes = \
                            self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                    # raise warnings if no match found
                    if not filt_overlap_scenes:
                        if not res:
                            warnings.warn('No reference scene found for %s (scene ID %s). Coregistration of this scene '
                                          'failed.' % (self.baseN, self.scene_ID))
                        else:
                            warnings.warn('Reference scenes for %s (scene ID %s) have been found but none has more '
                                          'than %s percent overlap. Coregistration of this scene failed.'
                                          % (self.baseN, self.scene_ID, min_overlap))
                        break
            count += 1

        if filt_overlap_scenes:
            ref_available = False
            for count, sc in enumerate(filt_overlap_scenes):
                if count == 2:  # FIXME abort already at the 3rd scene?
                    warnings.warn('No reference scene for %s (scene ID %s) available. '
                                  'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                    break

                # start download of scene data not available and start L1A processing
                def dl_cmd(scene_ID): print('%s %s %s' % (
                    CFG.java_commands['keyword'].strip(),  # FIXME CFG.java_commands is deprecated
                    CFG.java_commands["value_download"].strip(), scene_ID))

                path = PG.path_generator(scene_ID=sc['scene_ID']).get_path_imagedata()

                if not os.path.exists(path):
                    # add scene 2 download to scenes_jobs.missing_scenes

                    # print JAVA download command
                    t_dl_start = time.time()
                    dl_cmd(sc['scene_ID'])

                    # check if scene is downloading
                    download_start_timeout = 5  # seconds
                    # set timout for external processing
                    # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG WITHIN CFG IS NOT ALLOWED
                    processing_timeout = 5  # seconds # FIXME increase timeout if processing is really started
                    proc_level = None
                    # poll the database until the reference scene reaches L1A or a timeout fires
                    while True:
                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(
                            CFG.conn_database, 'scenes', ['proc_level'], {'id': sc['scene_ID']})[0][0]
                        if proc_level != proc_level_chk:
                            print('Reference scene %s, current processing level: %s' % (sc['scene_ID'], proc_level_chk))
                        proc_level = proc_level_chk
                        if proc_level_chk in ['SCHEDULED', 'METADATA'] and \
                           time.time() - t_dl_start >= download_start_timeout:
                            warnings.warn('Download of reference scene %s (entity ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                        if proc_level_chk == 'L1A':
                            ref_available = True
                            break
                        elif proc_level_chk == 'DOWNLOADED' and time.time() - t_dl_start >= processing_timeout:
                            # proc_level='DOWNLOADED' but scene has not been processed
                            warnings.warn('L1A processing of reference scene %s (entity ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                            # DB_T.set_info_in_postgreSQLdb(CFG.conn_database,'scenes',
                            #                             {'proc_level':'METADATA'},{'id':sc['scene_ID']})

                        time.sleep(5)
                else:
                    ref_available = True

                if not ref_available:
                    continue
                else:
                    # store the chosen reference scene parameters on the instance
                    self.path_imref = path
                    self.imref_scene_ID = sc['scene_ID']
                    self.imref_footprint_poly = sc['scene poly']
                    self.overlap_poly = sc['overlap poly']
                    self.overlap_percentage = sc['overlap percentage']
                    self.overlap_area = sc['overlap area']

                    query_res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'],
                                                                {'id': self.imref_scene_ID}, records2fetch=1)
                    assert query_res != [], 'No entity-ID found for scene number %s' % self.imref_scene_ID
                    self.imref_entity_ID = query_res[0][0]  # [('LC81510322013152LGN00',)]
                    break
        self.logger.close()
436

437
    def _sceneIDList_to_filt_overlap_scenes(self, sceneIDList, min_overlap):
        """Compute overlap parameters for the given scene IDs and filter them by minimum overlap.

        find reference scenes that cover at least 20% of the scene with the given ID
        ONLY FIRST 50 scenes are considered

        :param sceneIDList:     list of scene IDs to be checked
        :param min_overlap:     minimum overlap percentage; if falsy, no filtering is applied
        :return:    list of dicts {scene_ID, scene poly, overlap poly, overlap percentage, overlap area}
        """
        scenes = []
        for scene_id in sceneIDList:
            record = {'scene_ID': scene_id,
                      'scene poly': HLP_F.scene_ID_to_shapelyPolygon(scene_id)}  # always a LonLat polygon
            # adds {overlap poly, overlap percentage, overlap area} for this scene
            record.update(get_overlap_polygon(record['scene poly'], self.arr.footprint_poly))
            scenes.append(record)

        # keep only those scenes whose overlap percentage reaches the threshold
        if not min_overlap:
            return scenes
        return [scene for scene in scenes if scene['overlap percentage'] >= min_overlap]
458

459
460
461
462
463
464
    def get_opt_bands4matching(self, target_cwlPos_nm=550, v=False):
        """Automatically determine the optimal bands used for fourier shift theorem matching.

        :param target_cwlPos_nm:   the desired wavelength used for matching
        :param v:                  verbose mode
        :return:    tuple (reference band number, shift band number), both 1-based
        """
        # get GMS_object for reference scene
        path_gmsFile = PG.path_generator(scene_ID=self.spatRef_scene.scene_ID).get_path_gmsfile()
        ref_obj = GMS_object().from_disk((path_gmsFile, ['cube', None]))

        # get spectral characteristics
        ref_cwl, shift_cwl = [[float(i) for i in GMS_obj.meta_odict['wavelength']] for GMS_obj in [ref_obj, self]]
        ref_fwhm, shift_fwhm = [[float(i) for i in GMS_obj.meta_odict['bandwidths']] for GMS_obj in [ref_obj, self]]

        # exclude cirrus/oxygen band of Landsat-8/Sentinel-2
        shift_bbl, ref_bbl = [False] * len(shift_cwl), [False] * len(ref_cwl)  # bad band lists
        for GMS_obj, s_r, bbl in zip([self, ref_obj], ['shift', 'ref'], [shift_bbl, ref_bbl]):
            GMS_obj.GMS_identifier['logger'] = None  # set a dummy value in order to avoid Exception
            sensorcode = get_GMS_sensorcode(GMS_obj.GMS_identifier)
            if sensorcode in ['LDCM', 'S2A', 'S2B'] and '9' in GMS_obj.LayerBandsAssignment:
                bbl[GMS_obj.LayerBandsAssignment.index('9')] = True
            if sensorcode in ['S2A', 'S2B'] and '10' in GMS_obj.LayerBandsAssignment:
                bbl[GMS_obj.LayerBandsAssignment.index('10')] = True

        # cwl_overlap = (max(min(shift_cwl),min(ref_cwl)),  min(max(shift_cwl),max(ref_cwl))) # -> (min wvl, max wvl)
        # find matching band of reference image for each band of image to be shifted
        match_dic = collections.OrderedDict()
        for idx, cwl, fwhm in zip(range(len(shift_cwl)), shift_cwl, shift_fwhm):
            if shift_bbl[idx]:
                continue  # skip cwl if it is declared as bad band above

            def is_inside(r_cwl, s_cwl, s_fwhm): return s_cwl - s_fwhm / 2 < r_cwl < s_cwl + s_fwhm / 2

            matching_r_cwls = [r_cwl for i, r_cwl in enumerate(ref_cwl) if
                               is_inside(r_cwl, cwl, fwhm) and not ref_bbl[i]]
            if matching_r_cwls:
                # FIX: the closest-match branch was previously unreachable ('len(matching_r_cwls)' is always
                # truthy inside this if-block); now the reference wavelength closest to cwl is chosen if
                # there is more than one candidate
                match_dic[cwl] = matching_r_cwls[0] if len(matching_r_cwls) == 1 else \
                    matching_r_cwls[np.abs(np.array(matching_r_cwls) - cwl).argmin()]

        # set bands4 match based on the above results
        poss_cwls = [cwl for cwl in shift_cwl if cwl in match_dic]
        if poss_cwls:
            if not target_cwlPos_nm:
                shift_band4match = shift_cwl.index(poss_cwls[0]) + 1  # first possible shift band
                ref_band4match = ref_cwl.index(match_dic[poss_cwls[0]]) + 1  # matching reference band
            else:  # target_cwlPos is given
                closestWvl_to_target = poss_cwls[np.abs(np.array(poss_cwls) - target_cwlPos_nm).argmin()]
                shift_band4match = shift_cwl.index(closestWvl_to_target) + 1  # the shift band closest to target
                ref_band4match = ref_cwl.index(match_dic[closestWvl_to_target]) + 1  # matching ref
        else:  # all reference bands are outside of shift-cwl +- fwhm/2
            # FIX: added missing space between 'of' and 'each' in the warning message
            warnings.warn('Optimal bands for matching could not be automatically determined. Choosing first band of '
                          'each image.')
            shift_band4match = 1
            ref_band4match = 1

        if v:
            print('Shift band for matching:     %s (%snm)' % (shift_band4match, shift_cwl[shift_band4match - 1]))
            print('Reference band for matching: %s (%snm)' % (ref_band4match, ref_cwl[ref_band4match - 1]))

        return ref_band4match, shift_band4match

520
    def compute_global_shifts(self):
521
522
523
524
525
        spatIdxSrv_status = os.environ['GMS_SPAT_IDX_SRV_STATUS'] if 'GMS_SPAT_IDX_SRV_STATUS' in os.environ else True

        if spatIdxSrv_status == 'unavailable':
            self.logger.warning('Coregistration skipped due to unavailable Spatial Index Mediator Server!"')

526
        elif CFG.skip_coreg:
527
            self.logger.warning('Coregistration skipped according to user configuration.')
528

529
        elif self.coreg_needed and self.spatRef_available:
530
531
            geoArr_ref = GeoArray(self.spatRef_scene.filePath)
            geoArr_shift = GeoArray(self.arr)
532
            r_b4match, s_b4match = self.get_opt_bands4matching(target_cwlPos_nm=CFG.coreg_band_wavelength_for_matching,
533
                                                               v=False)
534
535
536
537
538
            coreg_kwargs = dict(
                r_b4match=r_b4match,
                s_b4match=s_b4match,
                align_grids=True,  # FIXME not needed here
                match_gsd=True,  # FIXME not needed here
539
                max_shift=CFG.coreg_max_shift_allowed,
Daniel Scheffler's avatar
Fix.    
Daniel Scheffler committed
540
                ws=CFG.coreg_window_size,
541
542
543
544
545
546
547
548
549
                data_corners_ref=[[x, y] for x, y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
                data_corners_tgt=[transform_any_prj(EPSG2WKT(4326), self.meta_odict['coordinate system string'], x, y)
                                  for x, y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
                nodata=(get_outFillZeroSaturated(geoArr_ref.dtype)[0],
                        get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
                ignore_errors=True,
                v=False,
                q=True
            )
550
551
552
553

            COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
            COREG_obj.calculate_spatial_shifts()

554
555
556
557
            self.coreg_info.update(
                COREG_obj.coreg_info)  # no clipping to trueCornerLonLat until here -> only shift correction
            self.coreg_info.update({'reference scene ID': self.spatRef_scene.scene_ID})
            self.coreg_info.update({'reference entity ID': self.spatRef_scene.entity_ID})
558
559

            if COREG_obj.success:
560
                self.coreg_info['success'] = True
561
                self.logger.info("Calculated map shifts (X,Y): %s / %s"
562
563
                                 % (COREG_obj.x_shift_map,
                                    COREG_obj.y_shift_map))  # FIXME direkt in calculate_spatial_shifts loggen
564

565
566
567
            else:
                # TODO add database entry with error hint
                [self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
568
                                   % (self.scene_ID, self.entity_ID, err)) for err in COREG_obj.tracked_errors]
569

570
        else:
571
            if self.coreg_needed:
572
573
                self.logger.warning('Coregistration skipped because no suitable reference scene is available or '
                                    'spatial query failed.')
574
575
            else:
                self.logger.info('Coregistration of scene %s (entity ID %s) skipped because target dataset ID equals '
576
577
                                 'reference dataset ID.' % (self.scene_ID, self.entity_ID))

578
579
    def correct_spatial_shifts(self, cliptoextent=True, clipextent=None, clipextent_prj=None, v=False):
        # type: (bool, list, any, bool) -> None
        """Correct the spatial shifts calculated by self.compute_global_shifts().

        Deshifts/resamples all grid-carrying attributes ('arr', 'masks', 'dem', 'ac_errors',
        'mask_clouds_confidence') to the target reference grid and optionally clips them to the
        given extent. Updates geoinformation metadata ('map info', projection, lines/samples)
        accordingly.

        :param cliptoextent:    whether to clip the output to the given extent
        :param clipextent:      list of XY-coordinate tuples giving the target extent (if not given and cliptoextent is
                                True, the 'trueDataCornerLonLat' attribute of the GMS object is used)
        :param clipextent_prj:  WKT projection string or EPSG code of the projection for the coordinates in clipextent
        :param v:               verbose mode (enables progress bars and verbose DESHIFTER output)
        :return:
        """
        # cliptoextent is automatically True if an extent is given
        cliptoextent = cliptoextent if not clipextent else True

        if cliptoextent or self.resamp_needed or (self.coreg_needed and self.coreg_info['success']):

            # get target bounds # TODO implement boxObj call instead here
            if not clipextent:
                # derive the clip extent from the scene's true data corners, reprojected to the image projection
                trueDataCornerUTM = [transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
                                     for x, y in self.trueDataCornerLonLat]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(trueDataCornerUTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds
            else:
                assert clipextent_prj, \
                    "'clipextent_prj' must be given together with 'clipextent'. Received only 'clipextent'."
                # reproject the given extent to the image projection if needed
                clipextent_UTM = clipextent if prj_equal(self.MetaObj.projection, clipextent_prj) else \
                    [transform_any_prj(clipextent_prj, self.MetaObj.projection, x, y) for x, y in clipextent]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(clipextent_UTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds

            # correct shifts and clip to extent
            # ensure self.masks exists (does not exist in case of inmem_serialization mode because
            # then self.fill_from_disk() is skipped)
            if not hasattr(self, 'masks') or self.masks is None:
                self.build_combined_masks_array()  # creates self.masks and self.masks_meta

            # exclude self.mask_nodata, self.mask_clouds from warping
            del self.mask_nodata, self.mask_clouds

            # only deshift attributes that are actually set on this object
            attributes2deshift = [attrname for attrname in
                                  ['arr', 'masks', 'dem', 'ac_errors', 'mask_clouds_confidence']
                                  if getattr(self, '_%s' % attrname) is not None]
            for attrname in attributes2deshift:
                geoArr = getattr(self, attrname)

                # do some logging
                if self.coreg_needed:
                    if self.coreg_info['success']:
                        self.logger.info("Correcting spatial shifts for attribute '%s'..." % attrname)
                    elif cliptoextent and is_coord_grid_equal(
                         geoArr.gt, CFG.spatial_ref_gridx, CFG.spatial_ref_gridy):
                        self.logger.info("Attribute '%s' has only been clipped to it's extent because no valid "
                                         "shifts have been detected and the pixel grid equals the target grid."
                                         % attrname)
                    elif self.resamp_needed:
                        self.logger.info("Resampling attribute '%s' to target grid..." % attrname)
                elif self.resamp_needed:
                    self.logger.info("Resampling attribute '%s' to target grid..." % attrname)

                # correct shifts
                DS = DESHIFTER(geoArr, self.coreg_info,
                               target_xyGrid=[CFG.spatial_ref_gridx, CFG.spatial_ref_gridy],
                               cliptoextent=cliptoextent,
                               clipextent=mapBounds,
                               align_grids=True,
                               # masks must be resampled with nearest neighbour to keep their categorical values
                               resamp_alg='nearest' if attrname == 'masks' else CFG.spatial_resamp_alg,
                               CPUs=None if CFG.allow_subMultiprocessing else 1,
                               progress=True if v else False,
                               q=True,
                               v=v)
                DS.correct_shifts()

                # update coreg_info
                if attrname == 'arr':
                    self.coreg_info['is shifted'] = DS.is_shifted
                    self.coreg_info['is resampled'] = DS.is_resampled

                # update geoinformations and array shape related attributes
                self.logger.info("Updating geoinformations of '%s' attribute..." % attrname)
                if attrname == 'arr':
                    self.meta_odict['map info'] = DS.updated_map_info
                    self.meta_odict['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.shape_fullArr = DS.arr_shifted.shape
                    self.meta_odict['lines'], self.meta_odict['samples'] = DS.arr_shifted.shape[:2]
                else:
                    self.masks_meta['map info'] = DS.updated_map_info
                    self.masks_meta['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.masks_meta['lines'], self.masks_meta['samples'] = DS.arr_shifted.shape[:2]

                    # NOTE: mask_nodata and mask_clouds are updated later by L2A_map mapper function (module pipeline)

                # update the GeoArray instance without loosing its inherent metadata (nodata, ...)
                geoArr.arr, geoArr.gt, geoArr.prj = \
                    DS.GeoArray_shifted.arr, DS.GeoArray_shifted.gt, DS.GeoArray_shifted.prj
                # setattr(self,attrname, DS.GeoArray_shifted) # NOTE: don't set array earlier because setter will also
                #                                            # update arr.gt/.prj/.nodata from meta_odict

            self.resamp_needed = False
            self.coreg_needed = False

            # recreate self.masks_nodata and self.mask_clouds from self.masks
            self.mask_nodata = self.mask_nodata
            self.mask_clouds = self.mask_clouds
            # FIXME move functionality of self.masks only to output writer and remove self.masks completely