# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version. Please note the following exception: `spechomo` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Level 1B Processor:

Detection of global/local geometric displacements.
"""

import collections
import os
import time
import traceback
import warnings
from datetime import datetime, timedelta
from typing import Union, TYPE_CHECKING  # noqa F401  # flake8 issue

import numpy as np
import pytz
from geopandas import GeoDataFrame
from shapely.geometry import box

from arosics import COREG, DESHIFTER
from geoarray import GeoArray
from py_tools_ds.geo.coord_grid import is_coord_grid_equal
from py_tools_ds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo import reproject_shapelyGeometry, transform_any_prj
from py_tools_ds.geo.projection import prj_equal, EPSG2WKT, WKT2EPSG
from py_tools_ds.geo.vector.topology import get_overlap_polygon

from ..options.config import GMS_config as CFG
from ..model.gms_object import GMS_object
from .L1A_P import L1A_object
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.logging import GMS_logger, close_logger
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated

if TYPE_CHECKING:
    from shapely.geometry import Polygon  # noqa F401  # flake8 issue
    from logging import Logger  # noqa F401  # flake8 issue

__author__ = 'Daniel Scheffler'
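
# Typical place of this module in the GMS processing chain (illustrative sketch only;
# in production, these steps are triggered by the pipeline's mapper functions):
#
#     L1B_obj = L1B_object(L1A_obj)      # promote an existing L1A object to L1B
#     L1B_obj.compute_global_shifts()    # find a reference scene and compute X/Y shifts
#     L1B_obj.correct_spatial_shifts()   # apply the shifts and resample to the target grid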


class Scene_finder(object):
73
74
    """Scene_finder class to query the postgreSQL database to find a suitable reference scene for co-registration."""

    def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, sceneID_excluded=None,
                 min_overlap=20, min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10, logger=None):
        # type: (list, datetime, str, Polygon, int, int, int, int, int, int, Logger) -> None
        """Initialize Scene_finder.

        :param src_boundsLonLat:    geographical (lon/lat) bounds of the target scene
        :param src_AcqDate:         acquisition date of the target scene
        :param src_prj:             projection of the target scene (WKT string)
        :param src_footprint_poly:  footprint polygon of the target scene (lon/lat coordinates)
        :param sceneID_excluded:    scene ID to be excluded from the query (e.g., the ID of the target scene itself)
        :param min_overlap:         minimum overlap of reference scene in percent
        :param min_cloudcov:        minimum cloud cover of reference scene in percent
        :param max_cloudcov:        maximum cloud cover of reference scene in percent
        :param plusminus_days:      maximum time interval between target and reference scene in days
        :param plusminus_years:     maximum time interval between target and reference scene in years
        :param logger:              instance of logging.Logger (optional)
        """
        self.boundsLonLat = src_boundsLonLat
        self.src_AcqDate = src_AcqDate
        self.src_prj = src_prj
        self.src_footprint_poly = src_footprint_poly
        self.sceneID_excluded = sceneID_excluded
        self.min_overlap = min_overlap
        self.min_cloudcov = min_cloudcov
        self.max_cloudcov = max_cloudcov
        self.plusminus_days = plusminus_days
        self.plusminus_years = plusminus_years
        self.logger = logger or GMS_logger('ReferenceSceneFinder')

        # get temporal constraints
        def add_years(dt, years):
            # NOTE: Feb 29 is mapped to Mar 1 of the target year to avoid invalid dates in non-leap years
            if dt.month == 2 and dt.day == 29:
                return dt.replace(dt.year + years, 3, 1)
            return dt.replace(dt.year + years)
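
        # illustrative examples (derived from the logic above):
        #   add_years(datetime(2015, 6, 1), -10) -> datetime(2005, 6, 1)
        #   add_years(datetime(2016, 2, 29), 10) -> datetime(2026, 3, 1)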
        self.timeStart = add_years(self.src_AcqDate, -plusminus_years)
        timeEnd = add_years(self.src_AcqDate, +plusminus_years)
        timeNow = datetime.utcnow().replace(tzinfo=pytz.UTC)
        self.timeEnd = timeEnd if timeEnd <= timeNow else timeNow

        self.possib_ref_scenes = None  # set by self.spatial_query()
        self.GDF_ref_scenes = GeoDataFrame()  # set by self.spatial_query()
        self.ref_scene = None

    def __getstate__(self):
        """Defines how the attributes of Scene_finder instances are pickled."""
        close_logger(self.logger)
        self.logger = None

        return self.__dict__

    def __del__(self):
        """Close the logger before the instance is destroyed."""
        close_logger(self.logger)
        self.logger = None

    def spatial_query(self, timeout=5):
        """Query the PostgreSQL database to find possible reference scenes matching the specified criteria.

        :param timeout:     maximum query duration allowed (seconds)
        """
        SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
                                    timeout=timeout, retries=10)
        self.possib_ref_scenes = SpIM.getFullSceneDataForDataset(envelope=self.boundsLonLat,
                                                                 timeStart=self.timeStart,
                                                                 timeEnd=self.timeEnd,
                                                                 minCloudCover=self.min_cloudcov,
                                                                 maxCloudCover=self.max_cloudcov,
                                                                 datasetid=CFG.datasetid_spatial_ref,
                                                                 refDate=self.src_AcqDate,
                                                                 maxDaysDelta=self.plusminus_days)
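
        # NOTE: each scene record returned above is expected to expose at least the attributes
        # 'sceneid', 'acquisitiondate', 'cloudcover' and 'polyLonLat' (see their usage below)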

        if self.possib_ref_scenes:
            # fill GeoDataFrame with possible ref scene parameters
            GDF = GeoDataFrame(self.possib_ref_scenes, columns=['object'])
            GDF['sceneid'] = list(GDF['object'].map(lambda scene: scene.sceneid))
            GDF['acquisitiondate'] = list(GDF['object'].map(lambda scene: scene.acquisitiondate))
            GDF['cloudcover'] = list(GDF['object'].map(lambda scene: scene.cloudcover))
            GDF['polyLonLat'] = list(GDF['object'].map(lambda scene: scene.polyLonLat))

            def LonLat2UTM(polyLL):
                return reproject_shapelyGeometry(polyLL, 4326, self.src_prj)

            GDF['polyUTM'] = list(GDF['polyLonLat'].map(LonLat2UTM))
            self.GDF_ref_scenes = GDF

    def _collect_refscene_metadata(self):
        """Collect some reference scene metadata needed for later filtering."""
        GDF = self.GDF_ref_scenes

        # get overlap parameters
        def get_OL_prms(poly): return get_overlap_polygon(poly, self.src_footprint_poly)

        GDF['overlapParams'] = list(GDF['polyLonLat'].map(get_OL_prms))
        GDF['overlap area'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area']))
        GDF['overlap percentage'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage']))
        GDF['overlap poly'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly']))
        del GDF['overlapParams']

        # get processing level of reference scenes
        procL = GeoDataFrame(
            DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes_proc', ['sceneid', 'proc_level'],
                                            {'sceneid': list(GDF.sceneid)}), columns=['sceneid', 'proc_level'])
        GDF = GDF.merge(procL, on='sceneid', how='left')
        GDF = GDF.where(GDF.notnull(), None)  # replace NaN values with None

        # get path of binary file
        def get_path_binary(GDF_row):
            return PG.path_generator(scene_ID=GDF_row['sceneid'], proc_level=GDF_row['proc_level']) \
                .get_path_imagedata() if GDF_row['proc_level'] else None
        GDF['path_ref'] = GDF.apply(lambda GDF_row: get_path_binary(GDF_row), axis=1)
        GDF['refDs_exists'] = list(GDF['path_ref'].map(lambda p: os.path.exists(p) if p else False))

        # check if a proper entity ID can be gathered from database
        eID = GeoDataFrame(DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['id', 'entityid'],
                                                           {'id': list(GDF.sceneid)}), columns=['sceneid', 'entityid'])
        GDF = GDF.merge(eID, on='sceneid', how='left')
        self.GDF_ref_scenes = GDF.where(GDF.notnull(), None)

    def _filter_excluded_sceneID(self):
        """Filter reference scene with the same scene ID like the target scene."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('Same ID filter:  Excluding the scene with the same ID as the target scene.')
            self.GDF_ref_scenes = GDF.loc[GDF['sceneid'] != self.sceneID_excluded]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_overlap(self):
        """Filter out all scenes with less spatial overlap than self.min_overlap."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('Overlap filter:  Excluding all scenes with less than %s percent spatial overlap.'
                             % self.min_overlap)
            self.GDF_ref_scenes = GDF.loc[GDF['overlap percentage'] >= self.min_overlap]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_proc_status(self):
        """Filter out all scenes that have not been processed yet (at least L1A processing level is needed)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('Processing level filter:  Excluding all scenes that have not been processed yet '
                             '(at least L1A processing level is needed).')
            self.GDF_ref_scenes = GDF[GDF['proc_level'].notnull()]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_dataset_existence(self):
        """Filter out all scenes for which no processed data can be found on the fileserver."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('Existence filter:  Excluding all scenes where no processed data have been found.')
            self.GDF_ref_scenes = GDF[GDF['refDs_exists']]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_entity_ID_availability(self):
        """Filter out all scenes for which no proper entity ID can be found in the database (database errors)."""
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            self.logger.info('DB validity filter:  Excluding all scenes where no proper entity ID can be found in '
                             'the database (database errors).')
            self.GDF_ref_scenes = GDF[GDF['entityid'].notnull()]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def _filter_by_projection(self):
        """Filter out all scenes that have a different projection than the target image."""
        GDF = self.GDF_ref_scenes[self.GDF_ref_scenes.refDs_exists]
        if not GDF.empty:
            # compare projections of target and reference image
            GDF['prj_equal'] = \
                list(GDF['path_ref'].map(lambda path_ref: prj_equal(self.src_prj, GeoArray(path_ref).prj)))

            self.logger.info('Projection filter:  Excluding all scenes that have a different projection than the '
                             'target image.')
            self.GDF_ref_scenes = GDF[GDF['prj_equal']]
            self.logger.info('%s scenes => %s scenes' % (len(GDF), len(self.GDF_ref_scenes)))

    def choose_ref_scene(self):
        """Choose reference scene with minimum cloud cover and maximum overlap."""
247
248
249
        if self.possib_ref_scenes:
            # First, collect some relavant reference scene metadata
            self._collect_refscene_metadata()
250

            # Filter possible scenes by running all filter functions
            self._filter_excluded_sceneID()
            self._filter_by_overlap()
            self._filter_by_proc_status()
            self._filter_by_dataset_existence()
            self._filter_by_entity_ID_availability()
            self._filter_by_projection()

        # Choose the reference scene out of the filtered DataFrame
        if not self.GDF_ref_scenes.empty:
            GDF = self.GDF_ref_scenes
            GDF = GDF[GDF['cloudcover'] == GDF['cloudcover'].min()]
            GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]

            if not GDF.empty:
                GDF_res = GDF.iloc[0]
                return ref_Scene(GDF_res)
        else:
            return None
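

# Illustrative usage of Scene_finder (hypothetical values; normally it is instantiated by
# L1B_object.get_spatial_reference_scene() below):
#
#     SF = Scene_finder(boundsLonLat, acq_datetime, projection_wkt, footprint_poly,
#                       sceneID_excluded=12345, min_overlap=20, max_cloudcov=20)
#     SF.spatial_query(timeout=5)
#     ref_scene = SF.choose_ref_scene()  # returns a ref_Scene instance or None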


class ref_Scene:
    """Container for the metadata of a reference scene chosen by Scene_finder."""

    def __init__(self, GDF_record):
        self.scene_ID = int(GDF_record['sceneid'])
        self.entity_ID = GDF_record['entityid']
        self.AcqDate = GDF_record['acquisitiondate']
        self.cloudcover = GDF_record['cloudcover']
        self.polyLonLat = GDF_record['polyLonLat']
        self.polyUTM = GDF_record['polyUTM']
        self.proc_level = GDF_record['proc_level']
        self.filePath = GDF_record['path_ref']


class L1B_object(L1A_object):
    """Level 1B processing object: detection and correction of geometric displacements."""

    def __init__(self, L1A_obj=None):

        super(L1B_object, self).__init__()

        # set defaults
        self._spatRef_available = None
        self.spatRef_scene = None  # set by self.get_spatial_reference_scene()
        self.deshift_results = collections.OrderedDict()

        if L1A_obj:
            # populate attributes from the given L1A object
            for key, value in L1A_obj.__dict__.items():
                setattr(self, key, value)

        self.proc_level = 'L1B'
        self.proc_status = 'initialized'

    @property
    def spatRef_available(self):
        if self._spatRef_available is not None:
            return self._spatRef_available
        else:
            self.get_spatial_reference_scene()
            return self._spatRef_available

    @spatRef_available.setter
    def spatRef_available(self, spatRef_available):
        self._spatRef_available = spatRef_available

    def get_spatial_reference_scene(self):
        """Query the database for a suitable spatial reference scene and set self.spatRef_scene accordingly."""
        boundsLonLat = corner_coord_to_minmax(self.trueDataCornerLonLat)
        footprint_poly = HLP_F.CornerLonLat_to_shapelyPoly(self.trueDataCornerLonLat)
        RSF = Scene_finder(boundsLonLat, self.acq_datetime, self.MetaObj.projection,
                           footprint_poly, self.scene_ID,
                           min_overlap=CFG.spatial_ref_min_overlap,
                           min_cloudcov=CFG.spatial_ref_min_cloudcov,
                           max_cloudcov=CFG.spatial_ref_max_cloudcov,
                           plusminus_days=CFG.spatial_ref_plusminus_days,
                           plusminus_years=CFG.spatial_ref_plusminus_years,
                           logger=self.logger)

        # run spatial query
        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')
        RSF.spatial_query(timeout=5)
        if RSF.possib_ref_scenes:
            self.logger.info('Query result: %s reference scenes with matching metadata.' % len(RSF.possib_ref_scenes))

            # try to get a spatial reference scene by applying some filter criteria
            self.spatRef_scene = RSF.choose_ref_scene()  # type: Union[ref_Scene, None]
            if self.spatRef_scene:
                self.spatRef_available = True
                self.logger.info('Found a suitable reference image for coregistration: scene ID %s (entity ID %s).'
                                 % (self.spatRef_scene.scene_ID, self.spatRef_scene.entity_ID))
            else:
                self.spatRef_available = False
                self.logger.warning('No scene fulfills all requirements to serve as spatial reference for scene %s '
                                    '(entity ID %s). Coregistration impossible.' % (self.scene_ID, self.entity_ID))

        else:
            self.logger.warning('Spatial query returned no matches. Coregistration impossible.')
            self.spatRef_available = False

    def _get_reference_image_params_pgSQL(self):
        # TODO implement earlier version of this function as a backup for SpatialIndexMediator
        """PostgreSQL query: get IDs of overlapping scenes and select the most suitable scene ID
        (with respect to DEM and cloud cover)."""
        warnings.warn('_get_reference_image_params_pgSQL is deprecated and will not work anymore.',
                      DeprecationWarning)

        # pre-check, based on the cloud mask, which region of im2shift is usable for shift correction at all
        # -> pass that region as an argument to the PostgreSQL query
        # scene_ID            = 14536400 # LE71510322000093SGS00 im2shift

        # set query conditions
        min_overlap = 20  # %
        max_cloudcov = 20  # %
        plusminus_days = 30
        AcqDate = self.im2shift_objDict['acquisition_date']
        date_minmax = [AcqDate - timedelta(days=plusminus_days), AcqDate + timedelta(days=plusminus_days)]
        dataset_cond = 'datasetid=%s' % CFG.datasetid_spatial_ref
        cloudcov_cond = 'cloudcover < %s' % max_cloudcov
        # FIXME cloud cover is not yet available for all scenes at proc_level METADATA
        dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
                        "BETWEEN (%s, %s) AND (%s, %s)" \
                        % (date_minmax[0].month, date_minmax[0].day, date_minmax[1].month, date_minmax[1].day)
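
        # e.g., for an acquisition date of 2000-04-02 and plusminus_days=30, dayrange_cond matches
        # acquisitions between (3, 3) and (5, 2), i.e., March 3 to May 2 of any year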
        # TODO add further criteria!

        def query_scenes(condlist):
            return DB_T.get_overlapping_scenes_from_postgreSQLdb(
                CFG.conn_database,
                table='scenes',
                tgt_corners_lonlat=self.trueDataCornerLonLat,
                conditions=condlist,
                add_cmds='ORDER BY scenes.cloudcover ASC',
                timeout=30000)
        conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]

        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')

        count, filt_overlap_scenes = 0, []
        while not filt_overlap_scenes:
            if count == 0:
                # search within already processed scenes
                # this is only the result from 'scenes_proc'
                # -> a reference only exists there if the scene was already in the dataset list when CFG.job started
                res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
                    CFG.conn_database,
                    tgt_corners_lonlat=self.trueDataCornerLonLat,
                    conditions=['datasetid=%s' % CFG.datasetid_spatial_ref],
                    add_cmds='ORDER BY scenes.cloudcover ASC',
                    timeout=25000)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)

            else:
                # search within complete scenes table using less filter criteria with each run
                # TODO: integrate Daniel's index; otherwise this may become too slow as the table grows
                res = query_scenes(conds_descImportance)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                if len(conds_descImportance) > 1:  # FIXME use a different reference sensor?
                    del conds_descImportance[-1]
                else:  # reduce min_overlap to 10 percent if there are overlapping scenes
                    if res:
                        min_overlap = 10
                        filt_overlap_scenes = \
                            self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                    # raise warnings if no match found
                    if not filt_overlap_scenes:
                        if not res:
                            warnings.warn('No reference scene found for %s (scene ID %s). Coregistration of this scene '
                                          'failed.' % (self.baseN, self.scene_ID))
                        else:
                            warnings.warn('Reference scenes for %s (scene ID %s) have been found but none has more '
                                          'than %s percent overlap. Coregistration of this scene failed.'
                                          % (self.baseN, self.scene_ID, min_overlap))
                        break
            count += 1

        if filt_overlap_scenes:
            ref_available = False
            for count, sc in enumerate(filt_overlap_scenes):
                if count == 2:  # FIXME abort already at the 3rd scene?
                    warnings.warn('No reference scene for %s (scene ID %s) available. '
427
                                  'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                    break

                # start download of scene data if not yet available and start L1A processing
                def dl_cmd(scene_ID): print('%s %s %s' % (
                    CFG.java_commands['keyword'].strip(),  # FIXME CFG.java_commands is deprecated
                    CFG.java_commands["value_download"].strip(), scene_ID))

                path = PG.path_generator(scene_ID=sc['scene_ID']).get_path_imagedata()

                if not os.path.exists(path):
                    # add the scene to be downloaded to scenes_jobs.missing_scenes

                    # print JAVA download command
                    t_dl_start = time.time()
                    dl_cmd(sc['scene_ID'])

                    # check if scene is downloading
                    download_start_timeout = 5  # seconds
                    # set timeout for external processing
                    # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG WITHIN CFG IS NOT ALLOWED
                    processing_timeout = 5  # seconds # FIXME increase timeout if processing is really started
                    proc_level = None
                    while True:
                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(
                            CFG.conn_database, 'scenes', ['proc_level'], {'id': sc['scene_ID']})[0][0]
                        if proc_level != proc_level_chk:
                            print('Reference scene %s, current processing level: %s' % (sc['scene_ID'], proc_level_chk))
                        proc_level = proc_level_chk
                        if proc_level_chk in ['SCHEDULED', 'METADATA'] and \
                           time.time() - t_dl_start >= download_start_timeout:
                            warnings.warn('Download of the reference scene for %s (scene ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                        if proc_level_chk == 'L1A':
                            ref_available = True
                            break
                        elif proc_level_chk == 'DOWNLOADED' and time.time() - t_dl_start >= processing_timeout:
                            # proc_level='DOWNLOADED' but scene has not been processed
                            warnings.warn('L1A processing of the reference scene for %s (scene ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                            # DB_T.set_info_in_postgreSQLdb(CFG.conn_database,'scenes',
                            #                             {'proc_level':'METADATA'},{'id':sc['scene_ID']})

                        time.sleep(5)
                else:
                    ref_available = True

                if not ref_available:
                    continue
                else:
                    self.path_imref = path
                    self.imref_scene_ID = sc['scene_ID']
                    self.imref_footprint_poly = sc['scene poly']
                    self.overlap_poly = sc['overlap poly']
                    self.overlap_percentage = sc['overlap percentage']
                    self.overlap_area = sc['overlap area']

                    query_res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'],
                                                                {'id': self.imref_scene_ID}, records2fetch=1)
                    assert query_res != [], 'No entity-ID found for scene number %s' % self.imref_scene_ID
                    self.imref_entity_ID = query_res[0][0]  # [('LC81510322013152LGN00',)]
                    break
        self.logger.close()

    def _sceneIDList_to_filt_overlap_scenes(self, sceneIDList, min_overlap):
        """Find reference scenes that cover at least 'min_overlap' percent of the target scene.

        NOTE: only the first 50 scenes of the given scene ID list are considered.
        """

        # t0 = time.time()
        dict_sceneID_poly = [{'scene_ID': ID, 'scene poly': HLP_F.scene_ID_to_shapelyPolygon(ID)}
                             for ID in sceneIDList]  # always returns LonLat polygons

        # get overlap polygons and their parameters. result: [{overlap poly, overlap percentage, overlap area}]
        for dic in dict_sceneID_poly:  # input: dicts {scene_ID, ref_poly}
            dict_overlap_poly_params = get_overlap_polygon(dic['scene poly'], self.arr.footprint_poly)
            dic.update(dict_overlap_poly_params)  # adds {overlap poly, overlap percentage, overlap area}
        # print('polygon creation time', time.time()-t0)

        # filter out those scene_IDs where the overlap percentage is below 'min_overlap'
        if min_overlap:
            filt_overlap_scenes = [scene for scene in dict_sceneID_poly if scene['overlap percentage'] >= min_overlap]
        else:
            filt_overlap_scenes = dict_sceneID_poly

        return filt_overlap_scenes

    def get_opt_bands4matching(self, target_cwlPos_nm=550):
        """Automatically determine the optimal bands used for Fourier shift theorem matching.

        :param target_cwlPos_nm:    the desired center wavelength (in nm) used for matching
        :return:                    tuple of (ref_band4match, shift_band4match) as 1-based band numbers
        """
        # get GMS_object for reference scene
        path_gmsFile = PG.path_generator(scene_ID=self.spatRef_scene.scene_ID).get_path_gmsfile()
        ref_obj = GMS_object.from_disk((path_gmsFile, ['cube', None]))

        # get spectral characteristics
        ref_cwl = [float(ref_obj.MetaObj.CWL[bN]) for bN in ref_obj.MetaObj.LayerBandsAssignment]
        shift_cwl = [float(self.MetaObj.CWL[bN]) for bN in self.MetaObj.LayerBandsAssignment]
        shift_fwhm = [float(self.MetaObj.FWHM[bN]) for bN in self.MetaObj.LayerBandsAssignment]

        # exclude cirrus/oxygen band of Landsat-8/Sentinel-2
        shift_bbl, ref_bbl = [False] * len(shift_cwl), [False] * len(ref_cwl)  # bad band lists
        for GMS_obj, s_r, bbl in zip([self, ref_obj], ['shift', 'ref'], [shift_bbl, ref_bbl]):
            GMS_obj.GMS_identifier.logger = None  # set a dummy value in order to avoid Exception
            sensorcode = get_GMS_sensorcode(GMS_obj.GMS_identifier)
            if sensorcode in ['LDCM', 'S2A', 'S2B'] and '9' in GMS_obj.LayerBandsAssignment:
                bbl[GMS_obj.LayerBandsAssignment.index('9')] = True
            if sensorcode in ['S2A', 'S2B'] and '10' in GMS_obj.LayerBandsAssignment:
                bbl[GMS_obj.LayerBandsAssignment.index('10')] = True

        # cwl_overlap = (max(min(shift_cwl),min(ref_cwl)),  min(max(shift_cwl),max(ref_cwl))) # -> (min wvl, max wvl)
        # find matching band of reference image for each band of image to be shifted
        match_dic = collections.OrderedDict()
        for idx, cwl, fwhm in zip(range(len(shift_cwl)), shift_cwl, shift_fwhm):
            if shift_bbl[idx]:
                continue  # skip cwl if it is declared as bad band above

            def is_inside(r_cwl, s_cwl, s_fwhm): return s_cwl - s_fwhm / 2 < r_cwl < s_cwl + s_fwhm / 2

            matching_r_cwls = [r_cwl for i, r_cwl in enumerate(ref_cwl) if
                               is_inside(r_cwl, cwl, fwhm) and not ref_bbl[i]]
            if matching_r_cwls:
                # if multiple reference bands match, choose the one with the CWL closest to the target band CWL
                match_dic[cwl] = matching_r_cwls[0] if len(matching_r_cwls) == 1 else \
                    matching_r_cwls[np.abs(np.array(matching_r_cwls) - cwl).argmin()]
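
        # e.g., a target band at 560 nm with a FWHM of 60 nm accepts reference CWLs within the open
        # interval (530 nm, 590 nm), as defined by is_inside() above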

        # set the bands used for matching based on the above results
        poss_cwls = [cwl for cwl in shift_cwl if cwl in match_dic]
        if poss_cwls:
            if not target_cwlPos_nm:
                shift_band4match = shift_cwl.index(poss_cwls[0]) + 1  # first possible shift band
                ref_band4match = ref_cwl.index(match_dic[poss_cwls[0]]) + 1  # matching reference band
            else:  # target_cwlPos is given
                closestWvl_to_target = poss_cwls[np.abs(np.array(poss_cwls) - target_cwlPos_nm).argmin()]
                shift_band4match = shift_cwl.index(closestWvl_to_target) + 1  # the shift band closest to target
                ref_band4match = ref_cwl.index(match_dic[closestWvl_to_target]) + 1  # matching ref
        else:  # all reference bands are outside of shift-cwl +- fwhm/2
            self.logger.warning('Optimal bands for matching could not be automatically determined. '
                                'Choosing first band of each image.')
            shift_band4match = 1
            ref_band4match = 1

        self.logger.info(
            'Target band for matching:    %s (%snm)' % (shift_band4match, shift_cwl[shift_band4match - 1]))
        self.logger.info(
            'Reference band for matching: %s (%snm)' % (ref_band4match, ref_cwl[ref_band4match - 1]))

        return ref_band4match, shift_band4match

    def compute_global_shifts(self):
        """Compute the global X/Y shift of the target scene with respect to the chosen spatial reference scene."""
        spatIdxSrv_status = os.environ.get('GMS_SPAT_IDX_SRV_STATUS', True)

        if spatIdxSrv_status == 'unavailable':
            self.logger.warning('Coregistration skipped due to unavailable Spatial Index Mediator Server!')

        elif CFG.skip_coreg:
            self.logger.warning('Coregistration skipped according to user configuration.')

        elif self.coreg_needed and self.spatRef_available:
            self.coreg_info.update({'reference scene ID': self.spatRef_scene.scene_ID})
            self.coreg_info.update({'reference entity ID': self.spatRef_scene.entity_ID})

            geoArr_ref = GeoArray(self.spatRef_scene.filePath)
            geoArr_shift = GeoArray(self.arr)
            r_b4match, s_b4match = self.get_opt_bands4matching(target_cwlPos_nm=CFG.coreg_band_wavelength_for_matching)
            coreg_kwargs = dict(
                r_b4match=r_b4match,
                s_b4match=s_b4match,
                align_grids=True,  # FIXME not needed here
                match_gsd=True,  # FIXME not needed here
                max_shift=CFG.coreg_max_shift_allowed,
                ws=CFG.coreg_window_size,
                data_corners_ref=[[x, y] for x, y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
                data_corners_tgt=[transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
                                  for x, y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
                nodata=(get_outFillZeroSaturated(geoArr_ref.dtype)[0],
                        get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
                ignore_errors=True,
                v=False,
                q=True
            )

            # initialize COREG object
            try:
                COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
            except Exception as e:
                COREG_obj = None
                self.logger.error('\nAn error occurred during coregistration. BE AWARE THAT THE SCENE %s '
                                  '(ENTITY ID %s) HAS NOT BEEN COREGISTERED! Error message was: \n%s\n'
                                  % (self.scene_ID, self.entity_ID, repr(e)))
                self.logger.error(traceback.format_exc())
                # TODO include that in the job summary

            # calculate_spatial_shifts
            if COREG_obj:
                COREG_obj.calculate_spatial_shifts()

                self.coreg_info.update(
                    COREG_obj.coreg_info)  # no clipping to trueCornerLonLat until here -> only shift correction
                self.coreg_info.update({'shift_reliability': COREG_obj.shift_reliability})

                if COREG_obj.success:
                    self.coreg_info['success'] = True
                    self.logger.info("Calculated map shifts (X,Y): %s / %s"
                                     % (COREG_obj.x_shift_map,
                                        COREG_obj.y_shift_map))  # FIXME log this directly in calculate_spatial_shifts
                    self.logger.info("Reliability of calculated shift: %.1f percent" % COREG_obj.shift_reliability)

                else:
                    # TODO add database entry with error hint
                    for err in COREG_obj.tracked_errors:
                        self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
                                          % (self.scene_ID, self.entity_ID, err))

        else:
            if self.coreg_needed:
                self.logger.warning('Coregistration skipped because no suitable reference scene is available or '
                                    'spatial query failed.')
            else:
                self.logger.info('Coregistration of scene %s (entity ID %s) skipped because target dataset ID equals '
                                 'reference dataset ID.' % (self.scene_ID, self.entity_ID))

    def correct_spatial_shifts(self, cliptoextent=True, clipextent=None, clipextent_prj=None, v=False):
        # type: (bool, list, any, bool) -> None
        """Correct the spatial shifts calculated by self.compute_global_shifts().

        :param cliptoextent:    whether to clip the output to the given extent
        :param clipextent:      list of XY-coordinate tuples giving the target extent (if not given and cliptoextent is
                                True, the 'trueDataCornerLonLat' attribute of the GMS object is used)
        :param clipextent_prj:  WKT projection string or EPSG code of the projection for the coordinates in clipextent
        :param v:               verbose mode
        :return:                None
        """

        # cliptoextent is automatically True if an extent is given
        cliptoextent = cliptoextent if not clipextent else True

        if cliptoextent or self.resamp_needed or (self.coreg_needed and self.coreg_info['success']):

            # get target bounds # TODO implement boxObj call instead here
            if not clipextent:
                trueDataCornerUTM = [transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
                                     for x, y in self.trueDataCornerLonLat]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(trueDataCornerUTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds
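                # NOTE: shapely's box(xmin, ymin, xmax, ymax).bounds returns the tuple (xmin, ymin, xmax, ymax)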
            else:
                assert clipextent_prj, \
                    "'clipextent_prj' must be given together with 'clipextent'. Received only 'clipextent'."
                clipextent_UTM = clipextent if prj_equal(self.MetaObj.projection, clipextent_prj) else \
                    [transform_any_prj(clipextent_prj, self.MetaObj.projection, x, y) for x, y in clipextent]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(clipextent_UTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds

            # correct shifts and clip to extent
            # ensure self.masks exists (does not exist in case of inmem_serialization mode because
            # then self.fill_from_disk() is skipped)
            if not hasattr(self, 'masks') or self.masks is None:
                self.build_combined_masks_array()  # creates self.masks and self.masks_meta

            # exclude self.mask_nodata, self.mask_clouds from warping
            del self.mask_nodata, self.mask_clouds

            attributes2deshift = [attrname for attrname in
                                  ['arr', 'masks', 'dem', 'ac_errors', 'mask_clouds_confidence']
                                  if getattr(self, '_%s' % attrname) is not None]
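
            # NOTE: the private '_<attrname>' attributes are checked here, presumably to avoid triggering
            # the corresponding property getters (which may lazily load data from disk)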
            for attrname in attributes2deshift:
                geoArr = getattr(self, attrname)

                # do some logging
                if self.coreg_needed:
                    if self.coreg_info['success']:
                        self.logger.info("Correcting spatial shifts for attribute '%s'..." % attrname)
                    elif cliptoextent and is_coord_grid_equal(
                         geoArr.gt, CFG.spatial_ref_gridx, CFG.spatial_ref_gridy):
                        self.logger.info("Attribute '%s' has only been clipped to it's extent because no valid "
                                         "shifts have been detected and the pixel grid equals the target grid."
                                         % attrname)
                    elif self.resamp_needed:
                        self.logger.info("Resampling attribute '%s' to target grid..." % attrname)
                elif self.resamp_needed:
                    self.logger.info("Resampling attribute '%s' to target grid..." % attrname)

                # correct shifts
                DS = DESHIFTER(geoArr, self.coreg_info,
                               target_xyGrid=[CFG.spatial_ref_gridx, CFG.spatial_ref_gridy],
                               cliptoextent=cliptoextent,
                               clipextent=mapBounds,
                               align_grids=True,
                               resamp_alg='nearest' if attrname == 'masks' else CFG.spatial_resamp_alg,
                               CPUs=None if CFG.allow_subMultiprocessing else 1,
                               progress=v,
                               q=True,
                               v=v)
                DS.correct_shifts()

                # update coreg_info
                if attrname == 'arr':
                    self.coreg_info['is shifted'] = DS.is_shifted
                    self.coreg_info['is resampled'] = DS.is_resampled

                # update geoinformation and array-shape-related attributes
                self.logger.info("Updating geoinformations of '%s' attribute..." % attrname)
                if attrname == 'arr':
                    self.MetaObj.map_info = DS.updated_map_info
                    self.MetaObj.projection = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.shape_fullArr = DS.arr_shifted.shape
                    self.MetaObj.rows, self.MetaObj.cols = DS.arr_shifted.shape[:2]
                else:
                    self.masks_meta['map info'] = DS.updated_map_info
                    self.masks_meta['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.masks_meta['lines'], self.masks_meta['samples'] = DS.arr_shifted.shape[:2]

                    # NOTE: mask_nodata and mask_clouds are updated later by L2A_map mapper function (module pipeline)

741
                # update the GeoArray instance without loosing its inherent metadata (nodata, ...)
742
743
744
                geoArr.arr, geoArr.gt, geoArr.prj = \
                    DS.GeoArray_shifted.arr, DS.GeoArray_shifted.gt, DS.GeoArray_shifted.prj
                # setattr(self,attrname, DS.GeoArray_shifted) # NOTE: don't set array earlier because setter will also
                #                                             # update arr.gt/.prj/.nodata from MetaObj

            self.resamp_needed = False
            self.coreg_needed = False

            # recreate self.mask_nodata and self.mask_clouds from self.masks
            self.mask_nodata = self.mask_nodata
            self.mask_clouds = self.mask_clouds
            # FIXME move functionality of self.masks only to output writer and remove self.masks completely