# -*- coding: utf-8 -*-
"""
Level 1B Processor:

Detection of global/local geometric displacements.

Written by Daniel Scheffler
"""

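# Illustrative processing flow of this module (a sketch, not executable as-is;
# assumes a fully configured GMS job/database and an existing L1A_object 'l1a'):
#
#     l1b = L1B_object(l1a)              # takes over all attributes of the L1A object
#     if l1b.spatRef_available:          # triggers the spatial query if necessary
#         l1b.coregister_spatially()     # shift detection via arosics.COREG
#     l1b.correct_spatial_shifts()       # shift correction via arosics.DESHIFTER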
import collections
import json
import os
import socket
import time
import warnings
from datetime import datetime, timedelta

import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import box

from arosics import COREG, DESHIFTER
from geoarray import GeoArray

from py_tools_ds.geo.coord_grid import is_coord_grid_equal
from py_tools_ds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo import reproject_shapelyGeometry, transform_any_prj
from py_tools_ds.geo.projection import prj_equal, EPSG2WKT, WKT2EPSG
from py_tools_ds.geo.vector.topology import get_overlap_polygon

from ..config import GMS_config as CFG
from .L1A_P import L1A_object
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.SpatialIndexMediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated

# if socket.gethostname() == 'geoms':
#    sys.path.append('/usr/lib/otb/python/')
#    os.environ['ITK_AUTOLOAD_PATH'] = "/usr/lib/otb/applications"
#    sys.path.append('/usr/lib/python2.7/dist-packages') # cv2
#    sys.path.append('/usr/local/lib/python2.7/site-packages')
#    sys.path.append('/home/gfz-fe/scheffler/python')
# if socket.gethostname() == 'mefe18':
#    sys.path.append('/usr/lib64/otb/python/')
#    os.environ['ITK_AUTOLOAD_PATH'] = "/usr/lib64/otb/applications"
#    sys.path.append('/misc/hy5/scheffler/python')

# try:   import otbApplication
# except ImportError: print('otbApplication-lib missing..')
# except SyntaxError: print('The installed otbApplication-lib throws syntax errors.. Maybe too old?')
# try:   import cv2
# except ImportError: print('cv2-lib missing..')
# <editor-fold desc="deprecated/unused functions">
# def calculate_TiePoints(im_ref,im_rpc,distance = 200):
#     detector   = cv2.FeatureDetector_create    ("SIFT")
#     descriptor = cv2.DescriptorExtractor_create("SIFT")
#
#     TieP_ref = detector.detect(im_ref)  # gibt Liste von Keypoints aus -> type(skp[0]): cv2.KeyPoint
62
63
#     # Detector = numpy-array - shape: Anzahl Keypoints x 128
#     TieP_ref, Detector_ref = descriptor.compute(im_ref, TieP_ref)
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#     print('%s temporary master tie points found.' %len(TieP_ref))
#     TieP_rpc = detector.detect(im_rpc)
#     TieP_rpc, Detector_rpc = descriptor.compute(im_rpc, TieP_rpc)
#     print('%s temporary slave tie points found.' %len(TieP_rpc))
#
#     flann_params = dict(algorithm=1, trees=4)
#     flann = cv2.flann_Index(Detector_ref, flann_params)
#     idx, dist = flann.knnSearch(Detector_rpc, 1, params={})
#     del flann
#     dist = dist[:,0]/2500.0
#     dist = dist.reshape(-1,).tolist()
#     idx = idx.reshape(-1).tolist()
#     indices = range(len(dist))
#     indices.sort(key=lambda i: dist[i])
#     dist = [dist[i] for i in indices]
#     idx = [idx[i] for i in indices]
#     TieP_ref_final = []
#     for i, dis in itertools.izip(idx, dist):
#         if dis < distance:
#             TieP_ref_final.append(TieP_ref[i])
#
#     flann = cv2.flann_Index(Detector_rpc, flann_params)
#     idx, dist = flann.knnSearch(Detector_ref, 1, params={})
#     del flann
#     dist = dist[:,0]/2500.0
#     dist = dist.reshape(-1,).tolist()
#     idx = idx.reshape(-1).tolist()
#     indices = range(len(dist))
#     indices.sort(key=lambda i: dist[i])
#     dist = [dist[i] for i in indices]
#     idx = [idx[i] for i in indices]
#     TieP_rpc_final = []
#     for i, dis in itertools.izip(idx, dist):
#         if dis < distance:
#             TieP_rpc_final.append(TieP_rpc[i])
#
#     return TieP_ref_final,TieP_rpc_final
#
# def Orfeo_homologous_points_extraction(im_ref,im_rpc):
#     HomologousPointsExtraction = otbApplication.Registry.CreateApplication("HomologousPointsExtraction")
#     # The following lines set all the application parameters:
# #    HomologousPointsExtraction.SetParameterString("in1", "sensor_stereo_left.tif")
# #    HomologousPointsExtraction.SetParameterString("in2", "sensor_stereo_right.tif")
#     HomologousPointsExtraction.SetParameterString("in1", im_ref)
#     HomologousPointsExtraction.SetParameterString("in2", im_rpc)
#     HomologousPointsExtraction.SetParameterString("mode","full")
#     HomologousPointsExtraction.SetParameterString("out", "homologous.txt")
#     # The following line execute the application
#     HomologousPointsExtraction.ExecuteAndWriteOutput()
#
# def generate_RPCs(RSD_L1A, DGM_L1A, masks_L1A,path_out_baseN =''):
#     ''' Generates RPC model and returns RPC points as list. '''
#     print('\n##################### Level 1B Processing #####################')
#     logging.info('Level 1B Processing started.')
#     if isinstance(RSD_L1A,np.ndarray):
#         # The following line creates an instance of the GenerateRPCSensorModel application
#         GenerateRPCSensorModel = otbApplication.Registry.CreateApplication("GenerateRPCSensorModel")
#
#         # The following lines set all the application parameters:
#         GenerateRPCSensorModel.SetParameterString("outgeom", "output.geom")
#         GenerateRPCSensorModel.SetParameterString("inpoints", "points.txt")
#         GenerateRPCSensorModel.SetParameterString("map","epsg")
#         GenerateRPCSensorModel.SetParameterInt   ("map.epsg.code", 32631)
#
#         # The following line execute the application
#         GenerateRPCSensorModel.ExecuteAndWriteOutput()
#     else:
#         logging.info('L1B-Processor accepts only numpy arrays as first input argument. Execution stopped.')
#         raise ValueError('L1B-Processor accepts only numpy arrays as first input argument.')
#
#     print('Generating dummy RPCs...')
#     logging.info('Generating dummy RPCs...')
#     list_RPCs = [0]*93
#     return list_RPCs
#
# def update_metadata(list_RPCs, L1A_meta2update, path_out_baseN =''):
#     ''' Adds RPC points to metadata of RS data, masks, atmospheric layers and DGM. '''
#     if isinstance(L1A_meta2update,dict):
#         # metadata dictionary updater
#         L1A_meta2update['rpc coeffs'] = list_RPCs
#         L1B_meta = L1A_meta2update
# #        L1B_meta = L1A_meta2update.update({'rpc coeffs':list_RPCs})  # result = None
#         return L1B_meta
#
148
149
#     elif isinstance(L1A_meta2update,str) and os.path.splitext(L1A_meta2update)[1] in ['.BSQ', '.bsq'] and \
#          os.path.isfile(os.path.splitext(L1A_meta2update)[0] + '.hdr'):
150
151
152
153
154
155
156
157
#         # header file updater
#         hdr_path                      = os.path.splitext(L1A_meta2update)[0] + '.hdr'
#         L1A_meta2update               = envi.read_envi_header(hdr_path)
#         L1A_meta2update['rpc coeffs'] = list_RPCs
#         L1B_meta                      = L1A_meta2update
#         return L1B_meta
#
#     else:
158
159
160
161
#         logging.info('L1B-Processor accepts only L1A metadata dictionaries or path-strings to L1A header files as '
#                      'second input argument. Execution stopped.')
#         raise ValueError('L1B-Processor accepts only L1A metadata dictionaries or path-strings to L1A header files '
#                          'as second input argument.')
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#
# def convert_to_8bit(Inimage,In_dtype):
#     image_min,image_max = (np.min(Inimage), np.max(Inimage))
# #    lut   = np.arange(np.iinfo(In_dtype).max, dtype=In_dtype)
#     lut   = np.arange(2**16, dtype='uint16')
#     Outimage = np.array(lut, copy=True)
#     Outimage.clip(image_min, image_max, out=Outimage)
#     Outimage -= image_min
#     Outimage //= (image_max - image_min + 1) / 256.
#     lut = Outimage.astype(np.uint8)
#     return np.take(lut, Inimage)
#
# def L1B_P__main(L1A_Instances):
#     for i in L1A_Instances:
#         if i.image_type == 'RSD':
#             if i.georef == 'Master':
#                 if i.arr_shape == 'cube':
#                     im_ref = convert_to_8bit(i.arr[:,:,0],i.arr.dtype)
#                 else:
#                     im_ref = convert_to_8bit(i.arr,i.arr.dtype)
#                 print('Calling tie point calculation with master image %s' %i.entity_ID)
#             else:
#                 if i.arr_shape == 'cube':
#                     im_rpc = convert_to_8bit(i.arr[:,:,0],i.arr.dtype)
#                 else:
#                     im_rpc = convert_to_8bit(i.arr,i.arr.dtype)
#                 print('Calling tie point calculation with slave image %s' %i.entity_ID)
#     print(im_ref.shape,im_rpc.shape)
# #    Orfeo_homologous_points_extraction(im_ref,im_rpc)  # funktioniert nur von Platte
#     TieP_ref,TieP_rpc = calculate_TiePoints(im_ref,im_rpc)
#     print('%s master tie points calculated.' %len(TieP_ref))
# </editor-fold>


class Scene_finder(object):
    def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, min_overlap=20,
                 min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10):
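        """Find suitable reference scenes for co-registration.

        :param src_boundsLonLat:    lon/lat bounding box of the target scene
        :param src_AcqDate:         acquisition date of the target scene (datetime)
        :param src_prj:             projection of the target scene (WKT string)
        :param src_footprint_poly:  footprint polygon of the target scene (lon/lat)
        :param min_overlap:         minimum overlap of the reference scene in percent
        :param min_cloudcov:        minimum cloud cover of the reference scene in percent
        :param max_cloudcov:        maximum cloud cover of the reference scene in percent
        :param plusminus_days:      maximum acquisition date deviation in days
        :param plusminus_years:     maximum acquisition date deviation in years
        """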
        self.boundsLonLat = src_boundsLonLat
        self.src_AcqDate = src_AcqDate
        self.src_prj = src_prj
        self.src_footprint_poly = src_footprint_poly
        self.min_overlap = min_overlap
        self.min_cloudcov = min_cloudcov
        self.max_cloudcov = max_cloudcov
        self.plusminus_days = plusminus_days
        self.plusminus_years = plusminus_years

        # get temporal constraints
        add_years = lambda dt, years: dt.replace(dt.year + years) \
            if not (dt.month == 2 and dt.day == 29) else dt.replace(dt.year + years, 3, 1)
        self.timeStart = add_years(self.src_AcqDate, -plusminus_years)
        timeEnd = add_years(self.src_AcqDate, +plusminus_years)
        self.timeEnd = timeEnd if timeEnd <= datetime.now() else datetime.now()
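        # NOTE: add_years moves a Feb 29 source date to Mar 1 to avoid a ValueError in
        # non-leap target years, e.g. add_years(datetime(2016, 2, 29), -10) -> datetime(2006, 3, 1)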

        self.possib_ref_scenes = None  # set by self.spatial_query()
        self.GDF_ref_scenes = GeoDataFrame()  # set by self.spatial_query()
        self.ref_scene = None

    def spatial_query(self, timeout=5):
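        """Run a spatial query against the SpatialIndexMediator to find possible reference scenes.

        Fills self.possib_ref_scenes and self.GDF_ref_scenes; retries up to 10 times on socket timeouts.

        :param timeout:  timeout of a single index query (presumably in seconds)
        """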
        for i in range(10):
            try:
                SpIM = SpatialIndexMediator(timeout=timeout)
                self.possib_ref_scenes = \
                    SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
                                                    self.max_cloudcov, CFG.usecase.datasetid_spatial_ref,
                                                    refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
                break
            except socket.timeout:
                if i < 9:
                    continue
                else:
                    raise TimeoutError('Spatial query timed out 10 times!')
                    # TODO catch error when index server is not running:
                    # TODO ConnectionRefusedError: [Errno 111] Connection refused

        if self.possib_ref_scenes:
            # fill GeoDataFrame with possible ref scene parameters
            GDF = GeoDataFrame(self.possib_ref_scenes, columns=['object'])
            GDF['sceneid'] = list(GDF['object'].map(lambda scene: scene.sceneid))
            GDF['acquisitiondate'] = list(GDF['object'].map(lambda scene: scene.acquisitiondate))
            GDF['cloudcover'] = list(GDF['object'].map(lambda scene: scene.cloudcover))
            GDF['polyLonLat'] = list(GDF['object'].map(lambda scene: scene.polyLonLat))
            LonLat2UTM = lambda polyLL: reproject_shapelyGeometry(polyLL, 4326, self.src_prj)
            GDF['polyUTM'] = list(GDF['polyLonLat'].map(LonLat2UTM))
            self.GDF_ref_scenes = GDF

    def filter_possib_ref_scenes(self):
        self._filter_by_overlap()
        self._filter_by_proc_status()
        self._filter_by_dataset_existance()
        self._filter_by_entity_ID_availability()
        self._filter_by_projection()

    def choose_ref_scene(self):
        """Choose the reference scene with minimum cloud cover and maximum overlap."""

        if not self.GDF_ref_scenes.empty:
            GDF = self.GDF_ref_scenes
            GDF = GDF[GDF['cloudcover'] == GDF['cloudcover'].min()]
            GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]

            if not GDF.empty:
                GDF_res = GDF.iloc[0]
                return ref_Scene(GDF_res)
        else:
            return None

    def _filter_by_overlap(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get overlap parameter
            get_OL_prms = lambda poly: get_overlap_polygon(poly, self.src_footprint_poly)
            GDF['overlapParams'] = list(GDF['polyLonLat'].map(get_OL_prms))
            GDF['overlap area'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area']))
            GDF['overlap percentage'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage']))
            GDF['overlap poly'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly']))
            del GDF['overlapParams']

            # filter out scenes with insufficient overlap
            self.GDF_ref_scenes = GDF.loc[GDF['overlap percentage'] >= self.min_overlap]

    def _filter_by_proc_status(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get processing level of reference scenes
            query_procL = lambda sceneID: \
                DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes_proc', ['proc_level'],
                                                {'sceneid': sceneID})
            GDF['temp_queryRes'] = list(GDF['sceneid'].map(query_procL))
            GDF['proc_level'] = list(GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None))
            GDF.drop('temp_queryRes', axis=1, inplace=True)

            # filter scenes out that have not yet been processed
            self.GDF_ref_scenes = GDF[GDF['proc_level'].notnull()]
    def _filter_by_dataset_existance(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get path of binary file and check if the corresponding dataset exists
            get_path_binary = lambda GDF_row: \
                PG.path_generator(scene_ID=GDF_row['sceneid'], proc_level=GDF_row['proc_level']).get_path_imagedata()
            check_exists = lambda path: os.path.exists(path)
            GDF['path_ref'] = GDF.apply(lambda GDF_row: get_path_binary(GDF_row), axis=1)
            GDF['refDs_exists'] = list(GDF['path_ref'].map(check_exists))

            # filter scenes out where the corresponding dataset does not exist on fileserver
            self.GDF_ref_scenes = GDF[GDF['refDs_exists'] == True]
    def _filter_by_entity_ID_availability(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # check if a proper entity ID can be gathered from database
            query_eID = lambda sceneID: DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
                                                                        {'id': sceneID}, records2fetch=1)
            GDF['temp_queryRes'] = list(GDF['sceneid'].map(query_eID))
            GDF['entityid'] = list(GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None))
            GDF.drop('temp_queryRes', axis=1, inplace=True)

            # filter out scenes that have no entity ID (database errors)
            self.GDF_ref_scenes = GDF[GDF['entityid'].notnull()]

    def _filter_by_projection(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # compare projections of target and reference image
            from ..io.Input_reader import read_ENVIhdr_to_dict
            get_prj = lambda path_binary: \
                read_ENVIhdr_to_dict(os.path.splitext(path_binary)[0] + '.hdr')['coordinate system string']
            is_prj_equal = lambda path_binary: prj_equal(self.src_prj, get_prj(path_binary))
            GDF['prj_equal'] = list(GDF['path_ref'].map(is_prj_equal))

            # filter scenes out that have a different projection
            self.GDF_ref_scenes = GDF[GDF['prj_equal'] == True]
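
# Typical usage of Scene_finder (a sketch mirroring get_spatial_reference_scene() below;
# 'bounds', 'acq_date', 'prj' and 'footprint' are placeholders describing the target scene):
#
#     SF = Scene_finder(bounds, acq_date, prj, footprint,
#                       min_overlap=20, min_cloudcov=0, max_cloudcov=20,
#                       plusminus_days=30, plusminus_years=10)
#     SF.spatial_query(timeout=5)
#     SF.filter_possib_ref_scenes()
#     ref_scene = SF.choose_ref_scene()  # ref_Scene instance or None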


class ref_Scene:
    def __init__(self, GDF_record):
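        """Create a ref_Scene instance from one GeoDataFrame record as selected by Scene_finder.choose_ref_scene()."""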
        self.scene_ID = int(GDF_record['sceneid'])
        self.entity_ID = GDF_record['entityid']
        self.AcqDate = GDF_record['acquisitiondate']
        self.cloudcover = GDF_record['cloudcover']
        self.polyLonLat = GDF_record['polyLonLat']
        self.polyUTM = GDF_record['polyUTM']
        self.proc_level = GDF_record['proc_level']
        self.filePath = GDF_record['path_ref']


class L1B_object(L1A_object):
    def __init__(self, L1A_obj=None):

        super(L1B_object, self).__init__()

        # set defaults
        self._spatRef_available = None
        self.spatRef_scene = None  # set by self.get_spatial_reference_scene()
        self.coreg_info = {}
        self.deshift_results = collections.OrderedDict()

        if L1A_obj:
            # populate attributes
            for key, value in L1A_obj.__dict__.items():
                setattr(self, key, value)

        self.proc_level = 'L1B'

    @property
    def spatRef_available(self):
        if self._spatRef_available is None:
            self.get_spatial_reference_scene()
        return self._spatRef_available

    @spatRef_available.setter
    def spatRef_available(self, spatRef_available):
        self._spatRef_available = spatRef_available

    def get_spatial_reference_scene(self):
        boundsLonLat = corner_coord_to_minmax(self.trueDataCornerLonLat)
        footprint_poly = HLP_F.CornerLonLat_to_shapelyPoly(self.trueDataCornerLonLat)
        RSF = Scene_finder(boundsLonLat, self.acq_datetime, self.meta_odict['coordinate system string'],
                           footprint_poly, min_overlap=20, min_cloudcov=0, max_cloudcov=20,
                           plusminus_days=30, plusminus_years=10)

        # run spatial query
        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')
        RSF.spatial_query(timeout=5)
        if RSF.possib_ref_scenes:
            self.logger.info('Query result: %s reference scenes with matching metadata.' % len(RSF.possib_ref_scenes))
        else:
            self.logger.warning('Spatial query returned no matches. Coregistration impossible.')
            self.spatRef_available = False
            return None

        # filter results
        RSF.filter_possib_ref_scenes()
        if RSF.GDF_ref_scenes.empty:
            self.logger.warning('No scene fulfills all requirements to serve as spatial reference for scene %s '
                                '(entity ID %s). Coregistration impossible.' % (self.scene_ID, self.entity_ID))
            self.spatRef_available = False
            return None

        # assign spatial reference scene
        self.spatRef_scene = RSF.choose_ref_scene()
        self.spatRef_available = True
        self.logger.info('Found a suitable reference image for coregistration: scene ID %s (entity ID %s).'
                         % (self.spatRef_scene.scene_ID, self.spatRef_scene.entity_ID))

    def _get_reference_image_params_pgSQL(self):
        """postgreSQL query: get IDs of overlapping scenes and select the most suitable scene ID
        (with respect to DEM and cloud cover)."""
        # TODO implement earlier version of this function as a backup for SpatialIndexMediator
        warnings.warn('_get_reference_image_params_pgSQL is deprecated and will no longer work.', DeprecationWarning)

        # preliminary check using the cloud mask which region of im2shift is usable for shift correction at all
        # -> pass that region as an argument to the postgreSQL query
        # scene_ID            = 14536400 # LE71510322000093SGS00 im2shift

        # set query conditions
        min_overlap = 20  # %
        max_cloudcov = 20  # %
        plusminus_days = 30
        AcqDate = self.im2shift_objDict['acquisition_date']
        date_minmax = [AcqDate - timedelta(days=plusminus_days), AcqDate + timedelta(days=plusminus_days)]
        dataset_cond = 'datasetid=%s' % CFG.usecase.datasetid_spatial_ref
        cloudcov_cond = 'cloudcover < %s' % max_cloudcov
        # FIXME cloudcover is not yet available for all scenes at proc_level METADATA
        dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
                        "BETWEEN (%s, %s) AND (%s, %s)" \
                        % (date_minmax[0].month, date_minmax[0].day, date_minmax[1].month, date_minmax[1].day)
        # TODO add further criteria!

        query_scenes = lambda condlist: DB_T.get_overlapping_scenes_from_postgreSQLdb(
            CFG.job.conn_database,
            table='scenes',
            tgt_corners_lonlat=self.trueDataCornerLonLat,
            conditions=condlist,
            add_cmds='ORDER BY scenes.cloudcover ASC',
            timeout=30000)
        conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]
        conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]

        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')

        count, filt_overlap_scenes = 0, []
        while not filt_overlap_scenes:
            if count == 0:
                # search within already processed scenes
                # this is only the result from scenes_proc
                # -> a reference is only available there if the scene was already in the
                #    dataset list when the CFG.job was started
                res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
                    CFG.job.conn_database,
                    tgt_corners_lonlat=self.trueDataCornerLonLat,
                    conditions=['datasetid=%s' % CFG.usecase.datasetid_spatial_ref],
                    add_cmds='ORDER BY scenes.cloudcover ASC',
                    timeout=25000)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)

            else:
                # search within the complete scenes table using fewer filter criteria with each run
                # TODO: integrate Daniel's index, otherwise this may become too slow as the table grows
                res = query_scenes(conds_descImportance)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                if len(conds_descImportance) > 1:  # FIXME different reference sensor?
                    del conds_descImportance[-1]
                else:  # reduce min_overlap to 10 percent if there are overlapping scenes
                    if res:
                        min_overlap = 10
                        filt_overlap_scenes = \
                            self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                    # raise warnings if no match found
                    if not filt_overlap_scenes:
                        if not res:
                            warnings.warn('No reference scene found for %s (scene ID %s). Coregistration of this scene '
                                          'failed.' % (self.baseN, self.scene_ID))
                        else:
                            warnings.warn('Reference scenes for %s (scene ID %s) have been found but none has more '
                                          'than %s percent overlap. Coregistration of this scene failed.'
                                          % (self.baseN, self.scene_ID, min_overlap))
                        break
            count += 1

        if filt_overlap_scenes:
            ref_available = False
            for count, sc in enumerate(filt_overlap_scenes):
                if count == 2:  # FIXME abort already at the 3rd scene?
                    warnings.warn('No reference scene for %s (scene ID %s) available. '
                                  'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                    break

                # start download of scene data if not yet available and start L1A processing
                dl_cmd = lambda scene_ID: print('%s %s %s' % (
                    CFG.job.java_commands['keyword'].strip(),  # FIXME CFG.job.java_commands is deprecated
                    CFG.job.java_commands["value_download"].strip(), scene_ID))

                path = PG.path_generator(scene_ID=sc['scene_ID']).get_path_imagedata()

                if not os.path.exists(path):
                    # add scene to download to scenes_jobs.missing_scenes

                    # print JAVA download command
                    t_dl_start = time.time()
                    dl_cmd(sc['scene_ID'])

                    # check if scene is downloading
                    download_start_timeout = 5  # seconds
                    # set timeout for external processing
                    # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG.job WITHIN CFG.job IS NOT ALLOWED
                    processing_timeout = 5  # seconds # FIXME increase timeout if processing is really started
                    proc_level = None
                    while True:
                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(
                            CFG.job.conn_database, 'scenes', ['proc_level'], {'id': sc['scene_ID']})[0][0]
                        if proc_level != proc_level_chk:
                            print('Reference scene %s, current processing level: %s' % (sc['scene_ID'], proc_level_chk))
                        proc_level = proc_level_chk
                        if proc_level_chk in ['SCHEDULED', 'METADATA'] and \
                           time.time() - t_dl_start >= download_start_timeout:
                            warnings.warn('Download of the reference scene for %s (scene ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                        if proc_level_chk == 'L1A':
                            ref_available = True
                            break
                        elif proc_level_chk == 'DOWNLOADED' and time.time() - t_dl_start >= processing_timeout:
                            # proc_level='DOWNLOADED' but scene has not been processed
                            warnings.warn('L1A processing of the reference scene for %s (scene ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                            # DB_T.set_info_in_postgreSQLdb(CFG.job.conn_database,'scenes',
                            #                             {'proc_level':'METADATA'},{'id':sc['scene_ID']})

                        time.sleep(5)
                else:
                    ref_available = True

                if not ref_available:
                    continue
                else:
                    self.path_imref = path
                    self.imref_scene_ID = sc['scene_ID']
                    self.imref_footprint_poly = sc['scene poly']
                    self.overlap_poly = sc['overlap poly']
                    self.overlap_percentage = sc['overlap percentage']
                    self.overlap_area = sc['overlap area']

                    query_res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
                                                                {'id': self.imref_scene_ID}, records2fetch=1)
                    assert query_res != [], 'No entity-ID found for scene number %s' % self.imref_scene_ID
                    self.imref_entity_ID = query_res[0][0]  # [('LC81510322013152LGN00',)]
                    break
        self.logger.close()

    def _sceneIDList_to_filt_overlap_scenes(self, sceneIDList, min_overlap):
        """Find reference scenes that cover at least `min_overlap` percent of the scene with the given ID.
        ONLY THE FIRST 50 scenes are considered."""

        # t0 = time.time()
        dict_sceneID_poly = [{'scene_ID': ID, 'scene poly': HLP_F.scene_ID_to_shapelyPolygon(ID)}
                             for ID in sceneIDList]  # always returns LonLat polygons

        # get overlap polygons and their parameters. result: [{overlap poly, overlap percentage, overlap area}]
        for dic in dict_sceneID_poly:  # input: dicts {scene_ID, ref_poly}
            dict_overlap_poly_params = get_overlap_polygon(dic['scene poly'], self.arr.footprint_poly)
            dic.update(dict_overlap_poly_params)  # adds {overlap poly, overlap percentage, overlap area}
        # print('polygon creation time', time.time()-t0)

        # filter out those scene_IDs where the overlap percentage is below min_overlap
        if min_overlap:
            filt_overlap_scenes = [scene for scene in dict_sceneID_poly if scene['overlap percentage'] >= min_overlap]
        else:
            filt_overlap_scenes = dict_sceneID_poly

        return filt_overlap_scenes
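
        # Structure of the returned list (illustrative values only):
        #     [{'scene_ID': 14536400,
        #       'scene poly': <shapely Polygon (lon/lat)>,
        #       'overlap poly': <shapely Polygon>,
        #       'overlap percentage': 73.2,
        #       'overlap area': ...}, ...]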

    def get_opt_bands4matching(self, target_cwlPos_nm=550, v=False):
        """Automatically determine the optimal bands used for Fourier shift theorem matching.

        :param target_cwlPos_nm:   the desired wavelength used for matching
        :param v:                  verbose mode
        """
        # get spectral characteristics
        shift_cwl = [float(i) for i in self.meta_odict['wavelength']]
        shift_fwhm = [float(i) for i in self.meta_odict['bandwidths']]
        with open(PG.path_generator(scene_ID=self.spatRef_scene.scene_ID).get_path_gmsfile()) as inF:
            ref_gmsDict = json.load(inF)
        ref_cwl = [float(i) for i in ref_gmsDict['meta']['wavelength']]
        ref_fwhm = [float(i) for i in ref_gmsDict['meta']['bandwidths']]

        # exclude cirrus/oxygen band of Landsat-8/Sentinel-2
        shift_bbl, ref_bbl = [False] * len(shift_cwl), [False] * len(ref_cwl)  # bad band lists
        for dic, s_r in zip([self.__dict__, ref_gmsDict], ['shift', 'ref']):
            dic['GMS_identifier']['logger'] = None  # set a dummy value in order to avoid Exception
            sensorcode = get_GMS_sensorcode(dic['GMS_identifier'])
            bbl = shift_bbl if s_r == 'shift' else ref_bbl
            if sensorcode in ['LDCM', 'S2A', 'S2B'] and '9' in dic['LayerBandsAssignment']:
                bbl[dic['LayerBandsAssignment'].index('9')] = True
            if sensorcode in ['S2A', 'S2B'] and '10' in dic['LayerBandsAssignment']:
                bbl[dic['LayerBandsAssignment'].index('10')] = True

        # cwl_overlap = (max(min(shift_cwl),min(ref_cwl)),  min(max(shift_cwl),max(ref_cwl))) # -> (min wvl, max wvl)
        # find matching band of reference image for each band of image to be shifted
        match_dic = collections.OrderedDict()
        for idx, cwl, fwhm in zip(range(len(shift_cwl)), shift_cwl, shift_fwhm):
            if shift_bbl[idx]:
                continue  # skip cwl if it is declared as bad band above
            is_inside = lambda r_cwl, s_cwl, s_fwhm: s_cwl - s_fwhm / 2 < r_cwl < s_cwl + s_fwhm / 2
            matching_r_cwls = [r_cwl for i, r_cwl in enumerate(ref_cwl) if
                               is_inside(r_cwl, cwl, fwhm) and not ref_bbl[i]]
            if matching_r_cwls:
                match_dic[cwl] = matching_r_cwls[0] if len(matching_r_cwls) == 1 else \
                    matching_r_cwls[np.abs(np.array(matching_r_cwls) - cwl).argmin()]

        # set bands4match based on the above results
        poss_cwls = [cwl for cwl in shift_cwl if cwl in match_dic]
        if poss_cwls:
            if not target_cwlPos_nm:
                shift_band4match = shift_cwl.index(poss_cwls[0]) + 1  # first possible shift band
                ref_band4match = ref_cwl.index(match_dic[poss_cwls[0]]) + 1  # matching reference band
            else:  # target_cwlPos is given
                closestWvl_to_target = poss_cwls[np.abs(np.array(poss_cwls) - target_cwlPos_nm).argmin()]
                shift_band4match = shift_cwl.index(closestWvl_to_target) + 1  # the shift band closest to target
                ref_band4match = ref_cwl.index(match_dic[closestWvl_to_target]) + 1  # matching ref
        else:  # all reference bands are outside of shift-cwl +- fwhm/2
            warnings.warn('Optimal bands for matching could not be automatically determined. Choosing first band of '
                          'each image.')
            shift_band4match = 1
            ref_band4match = 1

        if v:
            print('Shift band for matching:     %s (%snm)' % (shift_band4match, shift_cwl[shift_band4match - 1]))
            print('Reference band for matching: %s (%snm)' % (ref_band4match, ref_cwl[ref_band4match - 1]))

        return ref_band4match, shift_band4match
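
        # Illustrative outcome (hypothetical scenario): for a Landsat-8 target and a
        # Landsat-8 reference with target_cwlPos_nm=550, the green bands (~560 nm centre
        # wavelength) of both images would be selected, e.g. (3, 3).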

    def coregister_spatially(self):
        if self.coreg_needed and self.spatRef_available:
            geoArr_ref = GeoArray(self.spatRef_scene.filePath)
            geoArr_shift = GeoArray(self.arr)
            r_b4match, s_b4match = self.get_opt_bands4matching(target_cwlPos_nm=550, v=False)
            coreg_kwargs = dict(
                r_b4match=r_b4match,
                s_b4match=s_b4match,
                align_grids=True,  # FIXME not needed here
                match_gsd=True,  # FIXME not needed here
                data_corners_ref=[[x, y] for x, y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
                data_corners_tgt=[transform_any_prj(EPSG2WKT(4326), self.meta_odict['coordinate system string'], x, y)
                                  for x, y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
                nodata=(get_outFillZeroSaturated(geoArr_ref.dtype)[0],
                        get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
                ignore_errors=True,
                v=False,
                q=True
            )

            COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
            COREG_obj.calculate_spatial_shifts()

            self.coreg_info.update(
                COREG_obj.coreg_info)  # no clipping to trueCornerLonLat until here -> only shift correction
            self.coreg_info.update({'reference scene ID': self.spatRef_scene.scene_ID})
            self.coreg_info.update({'reference entity ID': self.spatRef_scene.entity_ID})

            if COREG_obj.success:
                self.logger.info("Calculated map shifts (X,Y): %s / %s"
                                 % (COREG_obj.x_shift_map,
                                    COREG_obj.y_shift_map))  # FIXME log this directly in calculate_spatial_shifts

            else:
                # TODO add database entry with error hint
                for err in COREG_obj.tracked_errors:
                    self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
                                      % (self.scene_ID, self.entity_ID, err))
        else:
            if self.coreg_needed:
                self.logger.warning('Coregistration skipped because no suitable reference scene is available or '
                                    'spatial query failed.')
            else:
                self.logger.info('Coregistration of scene %s (entity ID %s) skipped because target dataset ID equals '
                                 'reference dataset ID.' % (self.scene_ID, self.entity_ID))

            self.coreg_info.update({'corrected_shifts_px': {'x': 0, 'y': 0}})
            self.coreg_info.update({'corrected_shifts_map': {'x': 0, 'y': 0}})
            self.coreg_info.update({'original map info': self.meta_odict['map info']})
            self.coreg_info.update({'updated map info': None})
            self.coreg_info.update({'reference scene ID': None})
            self.coreg_info.update({'reference entity ID': None})
            self.coreg_info.update({'reference geotransform': None})
            # the reference projection must be the scene's own projection to avoid overwriting it with a wrong EPSG
            self.coreg_info.update({'reference projection': self.meta_odict['coordinate system string']})
            self.coreg_info.update({'reference extent': {'rows': None, 'cols': None}})
            self.coreg_info.update({'reference grid': [list(CFG.usecase.spatial_ref_gridx),
                                                       list(CFG.usecase.spatial_ref_gridy)]})
            self.coreg_info.update(
                {'success': True if not self.coreg_needed else False})  # False means spatRef not available
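
    # Minimal standalone sketch of the arosics calls used above (illustrative only;
    # the file paths are hypothetical, further COREG keyword arguments as in coreg_kwargs):
    #
    #     from arosics import COREG
    #     CR = COREG('/path/to/reference.bsq', '/path/to/target.bsq', ignore_errors=True, q=True)
    #     CR.calculate_spatial_shifts()
    #     if CR.success:
    #         print(CR.x_shift_map, CR.y_shift_map)  # detected map shifts
    #         info = CR.coreg_info                   # dict consumed by DESHIFTER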

    def correct_spatial_shifts(self, cliptoextent=True, clipextent=None, clipextent_prj=None, v=False):
        # type: (bool, list, any, bool) -> None
        """Corrects the spatial shifts calculated by self.coregister_spatially().

        :param cliptoextent:    whether to clip the output to the given extent
        :param clipextent:      list of XY-coordinate tuples giving the target extent (if not given and cliptoextent
                                is True, the 'trueDataCornerLonLat' attribute of the GMS object is used)
        :param clipextent_prj:  WKT projection string or EPSG code of the projection for the coordinates in clipextent
        :param v:               verbose mode
        """

        # cliptoextent is automatically True if an extent is given
        cliptoextent = cliptoextent if not clipextent else True

        if cliptoextent or self.resamp_needed or (self.coreg_needed and self.coreg_info['success']):

            # get target bounds # TODO implement boxObj call instead here
            if not clipextent:
                trueDataCornerUTM = [transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
                                     for x, y in self.trueDataCornerLonLat]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(trueDataCornerUTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds
            else:
                assert clipextent_prj, \
                    "'clipextent_prj' must be given together with 'clipextent'. Received only 'clipextent'."
                clipextent_UTM = clipextent if prj_equal(self.MetaObj.projection, clipextent_prj) else \
                    [transform_any_prj(clipextent_prj, self.MetaObj.projection, x, y) for x, y in clipextent]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(clipextent_UTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds

            # correct shifts and clip to extent
            # ensure self.masks exists (does not exist in Flink mode because
            # in that case self.fill_from_disk() is skipped)
            if not hasattr(self, 'masks') or self.masks is None:
                self.build_combined_masks_array()  # creates self.masks and self.masks_meta

            # exclude self.mask_nodata, self.mask_clouds from warping
            del self.mask_nodata, self.mask_clouds

            attributes2deshift = [attrname for attrname in
                                  ['arr', 'masks', 'dem', 'ac_errors', 'mask_clouds_confidence']
                                  if getattr(self, '_%s' % attrname) is not None]
            for attrname in attributes2deshift:
                geoArr = getattr(self, attrname)

                # do some logging
                if self.coreg_needed:
                    if self.coreg_info['success']:
                        self.logger.info("Correcting spatial shifts for attribute '%s'..." % attrname)
                    elif cliptoextent and is_coord_grid_equal(
                         geoArr.gt, CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy):
                        self.logger.info("Attribute '%s' has only been clipped to its extent because no valid "
                                         "shifts have been detected and the pixel grid equals the target grid."
                                         % attrname)
                    elif self.resamp_needed:
                        self.logger.info("Resampling attribute '%s' to target grid..." % attrname)
                elif self.resamp_needed:
                    self.logger.info("Resampling attribute '%s' to target grid..." % attrname)

                # correct shifts
                DS = DESHIFTER(geoArr, self.coreg_info,
                               target_xyGrid=[CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy],
                               cliptoextent=cliptoextent,
                               clipextent=mapBounds,
                               align_grids=True,
                               resamp_alg='nearest' if attrname == 'masks' else 'cubic',
                               CPUs=None if CFG.job.allow_subMultiprocessing else 1,
                               progress=True if v else False,
                               q=True,
                               v=v)
                DS.correct_shifts()

                # update coreg_info
                if attrname == 'arr':
                    self.coreg_info['is shifted'] = DS.is_shifted
                    self.coreg_info['is resampled'] = DS.is_resampled

                # update geoinformation and array-shape-related attributes
                self.logger.info("Updating geoinformation of '%s' attribute..." % attrname)
                if attrname == 'arr':
                    self.meta_odict['map info'] = DS.updated_map_info
                    self.meta_odict['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.shape_fullArr = DS.arr_shifted.shape
                    self.meta_odict['lines'], self.meta_odict['samples'] = DS.arr_shifted.shape[:2]
                else:
                    self.masks_meta['map info'] = DS.updated_map_info
                    self.masks_meta['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.masks_meta['lines'], self.masks_meta['samples'] = DS.arr_shifted.shape[:2]

                    # NOTE: mask_nodata and mask_clouds are updated later by L2A_map mapper function (module pipeline)

                # update the GeoArray instance without losing its inherent metadata (nodata, ...)
                geoArr.arr, geoArr.gt, geoArr.prj = \
                    DS.GeoArray_shifted.arr, DS.GeoArray_shifted.gt, DS.GeoArray_shifted.prj
                # setattr(self,attrname, DS.GeoArray_shifted) # NOTE: don't set array earlier because setter will also
                #                                            # update arr.gt/.prj/.nodata from meta_odict

            self.resamp_needed = False
            self.coreg_needed = False

            # recreate self.mask_nodata and self.mask_clouds from self.masks
            self.mask_nodata = self.mask_nodata
            self.mask_clouds = self.mask_clouds
            # FIXME move functionality of self.masks only to output writer and remove self.masks completely
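
# Standalone sketch of the DESHIFTER call used in correct_spatial_shifts() above
# (illustrative only; 'geoArr' and 'coreg_info' are assumed to originate from a
# previous arosics.COREG run):
#
#     from arosics import DESHIFTER
#     DS = DESHIFTER(geoArr, coreg_info, align_grids=True, resamp_alg='cubic', q=True)
#     DS.correct_shifts()
#     corrected_geoArr = DS.GeoArray_shifted
#     new_map_info = DS.updated_map_info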