# -*- coding: utf-8 -*-
"""
Level 1B Processor:

Detection of global/local geometric displacements.
"""


import collections
import os
import time
import warnings
from datetime import datetime, timedelta

import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import box
import pytz
import traceback
from typing import Union, TYPE_CHECKING  # noqa F401  # flake8 issue

from arosics import COREG, DESHIFTER
from geoarray import GeoArray
from py_tools_ds.geo.coord_grid import is_coord_grid_equal
from py_tools_ds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo import reproject_shapelyGeometry, transform_any_prj
from py_tools_ds.geo.projection import prj_equal, EPSG2WKT, WKT2EPSG
from py_tools_ds.geo.vector.topology import get_overlap_polygon

from ..options.config import GMS_config as CFG
from ..model.gms_object import GMS_object
from .L1A_P import L1A_object
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.logging import GMS_logger, close_logger
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated

if TYPE_CHECKING:
    from shapely.geometry import Polygon  # noqa F401  # flake8 issue
    from logging import Logger  # noqa F401  # flake8 issue

__author__ = 'Daniel Scheffler'


class Scene_finder(object):
    """Scene_finder class to query the PostgreSQL database to find a suitable reference scene for co-registration."""

    def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, sceneID_excluded=None,
                 min_overlap=20, min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10, logger=None):
        # type: (list, datetime, str, Polygon, int, int, int, int, int, int, Logger) -> None
        """Initialize Scene_finder.

        :param src_boundsLonLat:    lon/lat bounds of the target scene as (xmin, xmax, ymin, ymax)
        :param src_AcqDate:         acquisition date of the target scene
        :param src_prj:             projection of the target scene (WKT string)
        :param src_footprint_poly:  footprint polygon of the target scene (lon/lat)
        :param sceneID_excluded:    scene ID to exclude from the query results (usually the target scene ID)
        :param min_overlap:         minimum overlap of reference scene in percent
        :param min_cloudcov:        minimum cloud cover of reference scene in percent
        :param max_cloudcov:        maximum cloud cover of reference scene in percent
        :param plusminus_days:      maximum time interval between target and reference scene in days
        :param plusminus_years:     maximum time interval between target and reference scene in years
        :param logger:              instance of logging.Logger
        """
        self.boundsLonLat = src_boundsLonLat
        self.src_AcqDate = src_AcqDate
        self.src_prj = src_prj
        self.src_footprint_poly = src_footprint_poly
        self.sceneID_excluded = sceneID_excluded
        self.min_overlap = min_overlap
        self.min_cloudcov = min_cloudcov
        self.max_cloudcov = max_cloudcov
        self.plusminus_days = plusminus_days
        self.plusminus_years = plusminus_years
        self.logger = logger or GMS_logger('ReferenceSceneFinder')

        # get temporal constraints
        def add_years(dt, years):
            return dt.replace(dt.year + years) \
                if not (dt.month == 2 and dt.day == 29) else dt.replace(dt.year + years, 3, 1)
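        # NOTE: Feb 29 is mapped to Mar 1 because the shifted year may not be a leap year,
        # e.g., add_years(datetime(2016, 2, 29), 1) -> datetime(2017, 3, 1).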
        self.timeStart = add_years(self.src_AcqDate, -plusminus_years)
        timeEnd = add_years(self.src_AcqDate, +plusminus_years)
        timeNow = datetime.utcnow().replace(tzinfo=pytz.UTC)
        self.timeEnd = timeEnd if timeEnd <= timeNow else timeNow

        self.possib_ref_scenes = None  # set by self.spatial_query()
        self.GDF_ref_scenes = GeoDataFrame()  # set by self.spatial_query()
        self.ref_scene = None

    def __getstate__(self):
        """Defines how the attributes of Scene_finder instances are pickled."""
        close_logger(self.logger)
        self.logger = None

        return self.__dict__

    def __del__(self):
        close_logger(self.logger)
        self.logger = None

    def spatial_query(self, timeout=5):
        """Query the postgreSQL database to find possible reference scenes matching the specified criteria.

        :param timeout:     maximum query duration allowed (seconds)
        """
        SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
                                    timeout=timeout, retries=10)
        self.possib_ref_scenes = SpIM.getFullSceneDataForDataset(envelope=self.boundsLonLat,
                                                                 timeStart=self.timeStart,
                                                                 timeEnd=self.timeEnd,
                                                                 minCloudCover=self.min_cloudcov,
                                                                 maxCloudCover=self.max_cloudcov,
                                                                 datasetid=CFG.datasetid_spatial_ref,
                                                                 refDate=self.src_AcqDate,
                                                                 maxDaysDelta=self.plusminus_days)

        if self.possib_ref_scenes:
            # fill GeoDataFrame with possible ref scene parameters
            GDF = GeoDataFrame(self.possib_ref_scenes, columns=['object'])
            GDF['sceneid'] = list(GDF['object'].map(lambda scene: scene.sceneid))
            GDF['acquisitiondate'] = list(GDF['object'].map(lambda scene: scene.acquisitiondate))
            GDF['cloudcover'] = list(GDF['object'].map(lambda scene: scene.cloudcover))
            GDF['polyLonLat'] = list(GDF['object'].map(lambda scene: scene.polyLonLat))

            def LonLat2UTM(polyLL):
                return reproject_shapelyGeometry(polyLL, 4326, self.src_prj)
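            # (reprojects each WGS84 lon/lat footprint polygon into the projection of the target scene)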

            GDF['polyUTM'] = list(GDF['polyLonLat'].map(LonLat2UTM))
            self.GDF_ref_scenes = GDF

    def _collect_refscene_metadata(self):
        """Collect some reference scene metadata needed for later filtering."""
        GDF = self.GDF_ref_scenes

        # get overlap parameter
        def get_OL_prms(poly): return get_overlap_polygon(poly, self.src_footprint_poly)

        GDF['overlapParams'] = list(GDF['polyLonLat'].map(get_OL_prms))
        GDF['overlap area'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area']))
        GDF['overlap percentage'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage']))
        GDF['overlap poly'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly']))
        del GDF['overlapParams']

        # get processing level of reference scenes
        procL = GeoDataFrame(
            DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes_proc', ['sceneid', 'proc_level'],
                                            {'sceneid': list(GDF.sceneid)}), columns=['sceneid', 'proc_level'])
        GDF = GDF.merge(procL, on='sceneid', how='left')
        GDF = GDF.where(GDF.notnull(), None)  # replace NaN values with None

        # get path of binary file
        def get_path_binary(GDF_row):
            return PG.path_generator(scene_ID=GDF_row['sceneid'], proc_level=GDF_row['proc_level']) \
                .get_path_imagedata() if GDF_row['proc_level'] else None
        GDF['path_ref'] = GDF.apply(lambda GDF_row: get_path_binary(GDF_row), axis=1)
        GDF['refDs_exists'] = list(GDF['path_ref'].map(lambda p: os.path.exists(p) if p else False))

        # check if a proper entity ID can be gathered from database
        eID = GeoDataFrame(DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['id', 'entityid'],
                                                           {'id': list(GDF.sceneid)}), columns=['sceneid', 'entityid'])
        GDF = GDF.merge(eID, on='sceneid', how='left')
        self.GDF_ref_scenes = GDF.where(GDF.notnull(), None)

    def _filter_excluded_sceneID(self):
        """Filter out the reference scene with the same scene ID as the target scene."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('Same ID filter:  Excluding the scene with the same ID as the target scene.')
            self.GDF_ref_scenes = GDF.loc[GDF['sceneid'] != self.sceneID_excluded]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_overlap(self):
        """Filter all scenes with less spatial overlap than self.min_overlap."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('Overlap filter:  Excluding all scenes with less than %s percent spatial overlap.'
                             % self.min_overlap)
            self.GDF_ref_scenes = GDF.loc[GDF['overlap percentage'] >= self.min_overlap]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_proc_status(self):
        """Filter all scenes that have not been processed before according to proc. status (at least L1A is needed)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('Processing level filter:  Exclude all scenes that have not been processed before '
                             'according to processing status (at least L1A is needed).')
            self.GDF_ref_scenes = GDF[GDF['proc_level'].notnull()]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_dataset_existence(self):
        """Filter all scenes where no processed data can be found on the fileserver."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('Existence filter:  Excluding all scenes where no processed data have been found.')
            self.GDF_ref_scenes = GDF[GDF['refDs_exists']]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_entity_ID_availability(self):
        """Filter all scenes where no proper entity ID can be found in the database (database errors)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('DB validity filter:  Exclude all scenes where no proper entity ID can be found in the '
                             'database (database errors).')
            self.GDF_ref_scenes = GDF[GDF['entityid'].notnull()]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_projection(self):
        """Filter all scenes that have a different projection than the target image."""
        GDF = self.GDF_ref_scenes[self.GDF_ref_scenes.refDs_exists]
        if not GDF.empty:
            # compare projections of target and reference image
            GDF['prj_equal'] = \
                list(GDF['path_ref'].map(lambda path_ref: prj_equal(self.src_prj, GeoArray(path_ref).prj)))

            self.logger.info('Projection filter:  Exclude all scenes that have a different projection than the target '
                             'image.')
            self.GDF_ref_scenes = GDF[GDF['prj_equal']]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def choose_ref_scene(self):
        """Choose the reference scene with minimum cloud cover and maximum overlap."""
        if self.possib_ref_scenes:
            # First, collect some relevant reference scene metadata
            self._collect_refscene_metadata()

            # Filter possible scenes by running all filter functions
            self._filter_excluded_sceneID()
            self._filter_by_overlap()
            self._filter_by_proc_status()
            self._filter_by_dataset_existence()
            self._filter_by_entity_ID_availability()
            self._filter_by_projection()

        # Choose the reference scene out of the filtered DataFrame
        if not self.GDF_ref_scenes.empty:
            GDF = self.GDF_ref_scenes
            GDF = GDF[GDF['cloudcover'] == GDF['cloudcover'].min()]
            GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]

            if not GDF.empty:
                GDF_res = GDF.iloc[0]
                return ref_Scene(GDF_res)
        else:
            return None


class ref_Scene:
    """Container for the metadata of a chosen spatial reference scene."""

    def __init__(self, GDF_record):
        self.scene_ID = int(GDF_record['sceneid'])
        self.entity_ID = GDF_record['entityid']
        self.AcqDate = GDF_record['acquisitiondate']
        self.cloudcover = GDF_record['cloudcover']
        self.polyLonLat = GDF_record['polyLonLat']
        self.polyUTM = GDF_record['polyUTM']
        self.proc_level = GDF_record['proc_level']
        self.filePath = GDF_record['path_ref']


class L1B_object(L1A_object):
    """GMS object for processing level 1B: detection of global/local geometric displacements."""

    def __init__(self, L1A_obj=None):
        super(L1B_object, self).__init__()

        # set defaults
        self._spatRef_available = None
        self.spatRef_scene = None  # set by self.get_spatial_reference_scene()
        self.deshift_results = collections.OrderedDict()

        if L1A_obj:
            # populate attributes
            for key, value in L1A_obj.__dict__.items():
                setattr(self, key, value)

        self.proc_level = 'L1B'
        self.proc_status = 'initialized'

    @property
    def spatRef_available(self):
        if self._spatRef_available is not None:
            return self._spatRef_available
        else:
            self.get_spatial_reference_scene()
            return self._spatRef_available

    @spatRef_available.setter
    def spatRef_available(self, spatRef_available):
        self._spatRef_available = spatRef_available

    def get_spatial_reference_scene(self):
        """Find a suitable spatial reference scene and set self.spatRef_scene and self.spatRef_available."""
        boundsLonLat = corner_coord_to_minmax(self.trueDataCornerLonLat)
        footprint_poly = HLP_F.CornerLonLat_to_shapelyPoly(self.trueDataCornerLonLat)
        RSF = Scene_finder(boundsLonLat, self.acq_datetime, self.MetaObj.projection,
                           footprint_poly, self.scene_ID,
                           min_overlap=CFG.spatial_ref_min_overlap,
                           min_cloudcov=CFG.spatial_ref_min_cloudcov,
                           max_cloudcov=CFG.spatial_ref_max_cloudcov,
                           plusminus_days=CFG.spatial_ref_plusminus_days,
                           plusminus_years=CFG.spatial_ref_plusminus_years,
                           logger=self.logger)

        # run spatial query
        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')
        RSF.spatial_query(timeout=5)
        if RSF.possib_ref_scenes:
            self.logger.info('Query result: %s reference scenes with matching metadata.' % len(RSF.possib_ref_scenes))

            # try to get a spatial reference scene by applying some filter criteria
            self.spatRef_scene = RSF.choose_ref_scene()  # type: Union[ref_Scene, None]
            if self.spatRef_scene:
                self.spatRef_available = True
                self.logger.info('Found a suitable reference image for coregistration: scene ID %s (entity ID %s).'
                                 % (self.spatRef_scene.scene_ID, self.spatRef_scene.entity_ID))
            else:
                self.spatRef_available = False
                self.logger.warning('No scene fulfills all requirements to serve as spatial reference for scene %s '
                                    '(entity ID %s). Coregistration impossible.' % (self.scene_ID, self.entity_ID))

        else:
            self.logger.warning('Spatial query returned no matches. Coregistration impossible.')
            self.spatRef_available = False

    def _get_reference_image_params_pgSQL(self):
        # TODO implement earlier version of this function as a backup for SpatialIndexMediator
        """PostgreSQL query: get IDs of overlapping scenes and select the most suitable scene_ID
        (with respect to DGM and cloud cover)."""
        warnings.warn('_get_reference_image_params_pgSQL is deprecated and will no longer work.', DeprecationWarning)

        # pre-check using the cloud mask which region of im2shift is suitable for shift correction at all
        # -> pass this region as an argument to the PostgreSQL query
        # scene_ID            = 14536400 # LE71510322000093SGS00 im2shift

        # set query conditions
        min_overlap = 20  # %
        max_cloudcov = 20  # %
        plusminus_days = 30
        AcqDate = self.im2shift_objDict['acquisition_date']
        date_minmax = [AcqDate - timedelta(days=plusminus_days), AcqDate + timedelta(days=plusminus_days)]
        dataset_cond = 'datasetid=%s' % CFG.datasetid_spatial_ref
        cloudcov_cond = 'cloudcover < %s' % max_cloudcov
        # FIXME cloudcover is not yet available for all scenes at proc_level METADATA
        dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
                        "BETWEEN (%s, %s) AND (%s, %s)" \
                        % (date_minmax[0].month, date_minmax[0].day, date_minmax[1].month, date_minmax[1].day)
        # TODO add further query criteria!

        def query_scenes(condlist):
            return DB_T.get_overlapping_scenes_from_postgreSQLdb(
                CFG.conn_database,
                table='scenes',
                tgt_corners_lonlat=self.trueDataCornerLonLat,
                conditions=condlist,
                add_cmds='ORDER BY scenes.cloudcover ASC',
                timeout=30000)

        conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]

        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')

        count, filt_overlap_scenes = 0, []
        while not filt_overlap_scenes:
            if count == 0:
                # search within already processed scenes
                # (this only yields results from scenes_proc, i.e., a reference is only found there
                #  if the scene was already in the dataset list when the CFG.job was started)
                res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
                    CFG.conn_database,
                    tgt_corners_lonlat=self.trueDataCornerLonLat,
                    conditions=['datasetid=%s' % CFG.datasetid_spatial_ref],
                    add_cmds='ORDER BY scenes.cloudcover ASC',
                    timeout=25000)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)

            else:
                # search within the complete scenes table, using fewer filter criteria with each run
                # TODO: integrate Daniel's index; otherwise this may become too slow as the table grows
                res = query_scenes(conds_descImportance)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                if len(conds_descImportance) > 1:  # FIXME use a different reference sensor?
                    del conds_descImportance[-1]
                else:  # reduce min_overlap to 10 percent if there are overlapping scenes
                    if res:
                        min_overlap = 10
                        filt_overlap_scenes = \
                            self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                    # raise warnings if no match found
                    if not filt_overlap_scenes:
                        if not res:
                            warnings.warn('No reference scene found for %s (scene ID %s). Coregistration of this scene '
                                          'failed.' % (self.baseN, self.scene_ID))
                        else:
                            warnings.warn('Reference scenes for %s (scene ID %s) have been found but none has more '
                                          'than %s percent overlap. Coregistration of this scene failed.'
                                          % (self.baseN, self.scene_ID, min_overlap))
                        break
            count += 1

        if filt_overlap_scenes:
            ref_available = False
            for count, sc in enumerate(filt_overlap_scenes):
                if count == 2:  # FIXME abort already at the 3rd scene?
                    warnings.warn('No reference scene for %s (scene ID %s) available. '
                                  'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                    break

                # start download of scene data if not yet available and start L1A processing
                def dl_cmd(scene_ID): print('%s %s %s' % (
                    CFG.java_commands['keyword'].strip(),  # FIXME CFG.java_commands is deprecated
                    CFG.java_commands["value_download"].strip(), scene_ID))

                path = PG.path_generator(scene_ID=sc['scene_ID']).get_path_imagedata()

                if not os.path.exists(path):
                    # add scene to be downloaded to scenes_jobs.missing_scenes

                    # print JAVA download command
                    t_dl_start = time.time()
                    dl_cmd(sc['scene_ID'])

                    # check if scene is downloading
                    download_start_timeout = 5  # seconds
                    # set timeout for external processing
                    # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG WITHIN CFG IS NOT ALLOWED
                    processing_timeout = 5  # seconds # FIXME increase timeout if processing is really started
                    proc_level = None
                    while True:
                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(
                            CFG.conn_database, 'scenes', ['proc_level'], {'id': sc['scene_ID']})[0][0]
                        if proc_level != proc_level_chk:
                            print('Reference scene %s, current processing level: %s' % (sc['scene_ID'], proc_level_chk))
                        proc_level = proc_level_chk
                        if proc_level_chk in ['SCHEDULED', 'METADATA'] and \
                           time.time() - t_dl_start >= download_start_timeout:
                            warnings.warn('Download of reference scene for %s (scene ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                        if proc_level_chk == 'L1A':
                            ref_available = True
                            break
                        elif proc_level_chk == 'DOWNLOADED' and time.time() - t_dl_start >= processing_timeout:
                            # proc_level='DOWNLOADED' but scene has not been processed
                            warnings.warn('L1A processing of reference scene for %s (scene ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                            # DB_T.set_info_in_postgreSQLdb(CFG.conn_database,'scenes',
                            #                             {'proc_level':'METADATA'},{'id':sc['scene_ID']})

                        time.sleep(5)
                else:
                    ref_available = True

                if not ref_available:
                    continue
                else:
                    self.path_imref = path
                    self.imref_scene_ID = sc['scene_ID']
                    self.imref_footprint_poly = sc['scene poly']
                    self.overlap_poly = sc['overlap poly']
                    self.overlap_percentage = sc['overlap percentage']
                    self.overlap_area = sc['overlap area']

                    query_res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'],
                                                                {'id': self.imref_scene_ID}, records2fetch=1)
                    assert query_res != [], 'No entity-ID found for scene number %s' % self.imref_scene_ID
                    self.imref_entity_ID = query_res[0][0]  # [('LC81510322013152LGN00',)]
                    break
        self.logger.close()

    def _sceneIDList_to_filt_overlap_scenes(self, sceneIDList, min_overlap):
        """Find reference scenes that cover at least `min_overlap` percent of the scene with the given ID.

        NOTE: Only the first 50 scenes are considered.
        """
        # t0 = time.time()
        dict_sceneID_poly = [{'scene_ID': ID, 'scene poly': HLP_F.scene_ID_to_shapelyPolygon(ID)}
                             for ID in sceneIDList]  # always returns LonLat polygons

        # get overlap polygons and their parameters. result: [{overlap poly, overlap percentage, overlap area}]
        for dic in dict_sceneID_poly:  # input: dicts {scene_ID, ref_poly}
            dict_overlap_poly_params = get_overlap_polygon(dic['scene poly'], self.arr.footprint_poly)
            dic.update(dict_overlap_poly_params)  # adds {overlap poly, overlap percentage, overlap area}
        # print('polygon creation time', time.time()-t0)

        # filter out those scene_IDs where the overlap percentage is below min_overlap
        if min_overlap:
            filt_overlap_scenes = [scene for scene in dict_sceneID_poly if scene['overlap percentage'] >= min_overlap]
        else:
            filt_overlap_scenes = dict_sceneID_poly

        return filt_overlap_scenes

    def get_opt_bands4matching(self, target_cwlPos_nm=550):
        """Automatically determines the optimal bands used für fourier shift theorem matching

        :param target_cwlPos_nm:   the desired wavelength used for matching
        """
        # get GMS_object for reference scene
        path_gmsFile = PG.path_generator(scene_ID=self.spatRef_scene.scene_ID).get_path_gmsfile()
        ref_obj = GMS_object.from_disk((path_gmsFile, ['cube', None]))

        # get spectral characteristics
        ref_cwl = [float(ref_obj.MetaObj.CWL[bN]) for bN in ref_obj.MetaObj.LayerBandsAssignment]
        shift_cwl = [float(self.MetaObj.CWL[bN]) for bN in self.MetaObj.LayerBandsAssignment]
        shift_fwhm = [float(self.MetaObj.FWHM[bN]) for bN in self.MetaObj.LayerBandsAssignment]

        # exclude cirrus/oxygen band of Landsat-8/Sentinel-2
        shift_bbl, ref_bbl = [False] * len(shift_cwl), [False] * len(ref_cwl)  # bad band lists
        for GMS_obj, s_r, bbl in zip([self, ref_obj], ['shift', 'ref'], [shift_bbl, ref_bbl]):
            GMS_obj.GMS_identifier.logger = None  # set a dummy value in order to avoid Exception
            sensorcode = get_GMS_sensorcode(GMS_obj.GMS_identifier)
            if sensorcode in ['LDCM', 'S2A', 'S2B'] and '9' in GMS_obj.LayerBandsAssignment:
                bbl[GMS_obj.LayerBandsAssignment.index('9')] = True
            if sensorcode in ['S2A', 'S2B'] and '10' in GMS_obj.LayerBandsAssignment:
                bbl[GMS_obj.LayerBandsAssignment.index('10')] = True
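            # (These bands lie in strong atmospheric absorption regions (cirrus / water vapour),
            #  contain hardly any surface signal and would therefore degrade the image matching.)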

        # cwl_overlap = (max(min(shift_cwl),min(ref_cwl)),  min(max(shift_cwl),max(ref_cwl))) # -> (min wvl, max wvl)
        # find matching band of reference image for each band of image to be shifted
        match_dic = collections.OrderedDict()
        for idx, cwl, fwhm in zip(range(len(shift_cwl)), shift_cwl, shift_fwhm):
            if shift_bbl[idx]:
                continue  # skip cwl if it is declared as bad band above

            def is_inside(r_cwl, s_cwl, s_fwhm): return s_cwl - s_fwhm / 2 < r_cwl < s_cwl + s_fwhm / 2
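            # e.g., is_inside(560, 550, 60) -> True, since 520 < 560 < 580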

            matching_r_cwls = [r_cwl for i, r_cwl in enumerate(ref_cwl) if
                               is_inside(r_cwl, cwl, fwhm) and not ref_bbl[i]]
            if matching_r_cwls:
                # if multiple reference bands match, choose the one spectrally closest to the target band
                match_dic[cwl] = matching_r_cwls[0] if len(matching_r_cwls) == 1 else \
                    matching_r_cwls[np.abs(np.array(matching_r_cwls) - cwl).argmin()]

        # set bands4match based on the above results
        poss_cwls = [cwl for cwl in shift_cwl if cwl in match_dic]
        if poss_cwls:
            if not target_cwlPos_nm:
                shift_band4match = shift_cwl.index(poss_cwls[0]) + 1  # first possible shift band
                ref_band4match = ref_cwl.index(match_dic[poss_cwls[0]]) + 1  # matching reference band
            else:  # target_cwlPos is given
                closestWvl_to_target = poss_cwls[np.abs(np.array(poss_cwls) - target_cwlPos_nm).argmin()]
                shift_band4match = shift_cwl.index(closestWvl_to_target) + 1  # the shift band closest to target
                ref_band4match = ref_cwl.index(match_dic[closestWvl_to_target]) + 1  # matching ref
        else:  # all reference bands are outside of shift-cwl +- fwhm/2
            self.logger.warning('Optimal bands for matching could not be automatically determined. '
                                'Choosing first band of each image.')
            shift_band4match = 1
            ref_band4match = 1

        self.logger.info(
            'Target band for matching:    %s (%snm)' % (shift_band4match, shift_cwl[shift_band4match - 1]))
        self.logger.info(
            'Reference band for matching: %s (%snm)' % (ref_band4match, ref_cwl[ref_band4match - 1]))

        return ref_band4match, shift_band4match

    def compute_global_shifts(self):
        """Compute the global X/Y shift between the target scene and the chosen spatial reference scene."""
        spatIdxSrv_status = os.environ.get('GMS_SPAT_IDX_SRV_STATUS', True)

        if spatIdxSrv_status == 'unavailable':
            self.logger.warning('Coregistration skipped due to unavailable Spatial Index Mediator Server!')

        elif CFG.skip_coreg:
            self.logger.warning('Coregistration skipped according to user configuration.')

        elif self.coreg_needed and self.spatRef_available:
            self.coreg_info.update({'reference scene ID': self.spatRef_scene.scene_ID,
                                    'reference entity ID': self.spatRef_scene.entity_ID})

            geoArr_ref = GeoArray(self.spatRef_scene.filePath)
            geoArr_shift = GeoArray(self.arr)
            r_b4match, s_b4match = self.get_opt_bands4matching(target_cwlPos_nm=CFG.coreg_band_wavelength_for_matching)
            coreg_kwargs = dict(
                r_b4match=r_b4match,
                s_b4match=s_b4match,
                align_grids=True,  # FIXME not needed here
                match_gsd=True,  # FIXME not needed here
                max_shift=CFG.coreg_max_shift_allowed,
                ws=CFG.coreg_window_size,
                data_corners_ref=[[x, y] for x, y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
                data_corners_tgt=[transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
                                  for x, y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
                nodata=(get_outFillZeroSaturated(geoArr_ref.dtype)[0],
                        get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
                ignore_errors=True,
                v=False,
                q=True
            )
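            # NOTE: COREG (arosics package) detects one global X/Y shift between a reference and a
            # target band via phase correlation within a matching window. A minimal standalone
            # sketch (hypothetical file paths; other parameters as above):
            #
            #   from arosics import COREG
            #   CR = COREG('/path/to/reference.tif', '/path/to/target.tif', max_shift=5)
            #   CR.calculate_spatial_shifts()
            #   print(CR.x_shift_map, CR.y_shift_map, CR.shift_reliability)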

            # initialize COREG object
            try:
                COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
            except Exception as e:
                COREG_obj = None
                self.logger.error('\nAn error occurred during coregistration. BE AWARE THAT THE SCENE %s '
                                  '(ENTITY ID %s) HAS NOT BEEN COREGISTERED! Error message was: \n%s\n'
                                  % (self.scene_ID, self.entity_ID, repr(e)))
                self.logger.error(traceback.format_exc())
                # TODO include that in the job summary

            # calculate_spatial_shifts
            if COREG_obj:
                COREG_obj.calculate_spatial_shifts()

                self.coreg_info.update(
                    COREG_obj.coreg_info)  # no clipping to trueCornerLonLat until here -> only shift correction
                self.coreg_info.update({'shift_reliability': COREG_obj.shift_reliability})

                if COREG_obj.success:
                    self.coreg_info['success'] = True
                    self.logger.info("Calculated map shifts (X,Y): %s / %s"
                                     % (COREG_obj.x_shift_map,
                                        COREG_obj.y_shift_map))  # FIXME direkt in calculate_spatial_shifts loggen
                    self.logger.info("Reliability of calculated shift: %.1f percent" % COREG_obj.shift_reliability)

                else:
                    # TODO add database entry with error hint
                    for err in COREG_obj.tracked_errors:
                        self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
                                          % (self.scene_ID, self.entity_ID, err))

        else:
            if self.coreg_needed:
                self.logger.warning('Coregistration skipped because no suitable reference scene is available or '
                                    'spatial query failed.')
            else:
                self.logger.info('Coregistration of scene %s (entity ID %s) skipped because target dataset ID equals '
                                 'reference dataset ID.' % (self.scene_ID, self.entity_ID))

    def correct_spatial_shifts(self, cliptoextent=True, clipextent=None, clipextent_prj=None, v=False):
        # type: (bool, list, any, bool) -> None
        """Corrects the spatial shifts calculated by self.compute_global_shifts().

        :param cliptoextent:    whether to clip the output to the given extent
        :param clipextent:      list of XY-coordinate tuples giving the target extent (if not given while cliptoextent
                                is True, the 'trueDataCornerLonLat' attribute of the GMS object is used)
        :param clipextent_prj:  WKT projection string or EPSG code of the projection for the coordinates in clipextent
        :param v:               verbose mode
        :return:
        """

        # cliptoextent is automatically True if an extent is given
        cliptoextent = cliptoextent if not clipextent else True

        if cliptoextent or self.resamp_needed or (self.coreg_needed and self.coreg_info['success']):

            # get target bounds # TODO implement boxObj call instead here
            if not clipextent:
                trueDataCornerUTM = [transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
                                     for x, y in self.trueDataCornerLonLat]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(trueDataCornerUTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds
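                # (box(...).bounds returns the (minx, miny, maxx, maxy) tuple of the rectangle)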
            else:
                assert clipextent_prj, \
                    "'clipextent_prj' must be given together with 'clipextent'. Received only 'clipextent'."
                clipextent_UTM = clipextent if prj_equal(self.MetaObj.projection, clipextent_prj) else \
                    [transform_any_prj(clipextent_prj, self.MetaObj.projection, x, y) for x, y in clipextent]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(clipextent_UTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds

            # correct shifts and clip to extent
            # ensure self.masks exists (does not exist in case of inmem_serialization mode because
            # then self.fill_from_disk() is skipped)
            if not hasattr(self, 'masks') or self.masks is None:
                self.build_combined_masks_array()  # creates self.masks and self.masks_meta

            # exclude self.mask_nodata, self.mask_clouds from warping
            del self.mask_nodata, self.mask_clouds

            attributes2deshift = [attrname for attrname in
                                  ['arr', 'masks', 'dem', 'ac_errors', 'mask_clouds_confidence']
                                  if getattr(self, '_%s' % attrname) is not None]
            for attrname in attributes2deshift:
                geoArr = getattr(self, attrname)

                # do some logging
                if self.coreg_needed:
                    if self.coreg_info['success']:
                        self.logger.info("Correcting spatial shifts for attribute '%s'..." % attrname)
                    elif cliptoextent and is_coord_grid_equal(
                         geoArr.gt, CFG.spatial_ref_gridx, CFG.spatial_ref_gridy):
                        self.logger.info("Attribute '%s' has only been clipped to it's extent because no valid "
                                         "shifts have been detected and the pixel grid equals the target grid."
                                         % attrname)
                    elif self.resamp_needed:
                        self.logger.info("Resampling attribute '%s' to target grid..." % attrname)
                elif self.resamp_needed:
                    self.logger.info("Resampling attribute '%s' to target grid..." % attrname)

                # correct shifts
                DS = DESHIFTER(geoArr, self.coreg_info,
                               target_xyGrid=[CFG.spatial_ref_gridx, CFG.spatial_ref_gridy],
                               cliptoextent=cliptoextent,
                               clipextent=mapBounds,
                               align_grids=True,
                               resamp_alg='nearest' if attrname == 'masks' else CFG.spatial_resamp_alg,
                               CPUs=None if CFG.allow_subMultiprocessing else 1,
                               progress=v,
                               q=True,
                               v=v)
                DS.correct_shifts()

                # update coreg_info
                if attrname == 'arr':
                    self.coreg_info['is shifted'] = DS.is_shifted
                    self.coreg_info['is resampled'] = DS.is_resampled

                # update geoinformation and array-shape-related attributes
                self.logger.info("Updating geoinformations of '%s' attribute..." % attrname)
                if attrname == 'arr':
                    self.MetaObj.map_info = DS.updated_map_info
                    self.MetaObj.projection = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.shape_fullArr = DS.arr_shifted.shape
                    self.MetaObj.rows, self.MetaObj.cols = DS.arr_shifted.shape[:2]
                else:
                    self.masks_meta['map info'] = DS.updated_map_info
                    self.masks_meta['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.masks_meta['lines'], self.masks_meta['samples'] = DS.arr_shifted.shape[:2]

                    # NOTE: mask_nodata and mask_clouds are updated later by L2A_map mapper function (module pipeline)

                # update the GeoArray instance without losing its inherent metadata (nodata, ...)
                geoArr.arr, geoArr.gt, geoArr.prj = \
                    DS.GeoArray_shifted.arr, DS.GeoArray_shifted.gt, DS.GeoArray_shifted.prj
                # setattr(self,attrname, DS.GeoArray_shifted) # NOTE: don't set array earlier because setter will also
                #                                             # update arr.gt/.prj/.nodata from MetaObj

            self.resamp_needed = False
            self.coreg_needed = False

            # recreate self.mask_nodata and self.mask_clouds from self.masks
            self.mask_nodata = self.mask_nodata
            self.mask_clouds = self.mask_clouds
            # FIXME move the functionality of self.masks to the output writer only and remove self.masks completely