# -*- coding: utf-8 -*-
###############################################################################
#
#   Level 1B Processor:
#
#   Performed operations:
#   Generation of RPCs for later Orthorectification:
#   - for satellite data
#   - for DGMs (SRTM)
#   - for atmospheric layers (ozone,aerosols,water vapour)
#   => results are added to header files
#
#   Written by Daniel Scheffler
#
###############################################################################

########################### Library import ####################################
import collections
import json
import os
import socket
import time
import warnings
from datetime import datetime, timedelta

import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import box

from arosics import COREG, DESHIFTER
from geoarray import GeoArray
from py_tools_ds.geo.coord_grid       import is_coord_grid_equal
from py_tools_ds.geo.coord_calc       import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo      import reproject_shapelyGeometry, transform_any_prj
from py_tools_ds.geo.projection       import prj_equal, EPSG2WKT, WKT2EPSG
from py_tools_ds.geo.vector.topology  import get_overlap_polygon

from ..config                    import GMS_config       as CFG
from .L1A_P                      import L1A_object
from ..misc                      import database_tools   as DB_T
from ..misc                      import helper_functions as HLP_F
from ..misc                      import path_generator   as PG
from ..misc.SpatialIndexMediator import SpatialIndexMediator
from ..misc.definition_dicts     import get_GMS_sensorcode, get_outFillZeroSaturated

#if socket.gethostname() == 'geoms':
#    sys.path.append('/usr/lib/otb/python/')
#    os.environ['ITK_AUTOLOAD_PATH'] = "/usr/lib/otb/applications"
#    sys.path.append('/usr/lib/python2.7/dist-packages') # cv2
#    sys.path.append('/usr/local/lib/python2.7/site-packages')
#    sys.path.append('/home/gfz-fe/scheffler/python')
#if socket.gethostname() == 'mefe18':
#    sys.path.append('/usr/lib64/otb/python/')
#    os.environ['ITK_AUTOLOAD_PATH'] = "/usr/lib64/otb/applications"
#    sys.path.append('/misc/hy5/scheffler/python')
#try:   import otbApplication
#except ImportError: print('otbApplication-lib missing..')
#except SyntaxError: print('The installed otbApplication-lib throws syntax errors.. Maybe too old?')
#try:   import cv2
#except ImportError: print('cv2-lib missing..')
# <editor-fold desc="deprecated/unused functions">
# def calculate_TiePoints(im_ref,im_rpc,distance = 200):
#     detector   = cv2.FeatureDetector_create    ("SIFT")
#     descriptor = cv2.DescriptorExtractor_create("SIFT")
#
#     TieP_ref = detector.detect(im_ref)  # returns a list of keypoints -> type(skp[0]): cv2.KeyPoint
#     TieP_ref, Detector_ref = descriptor.compute(im_ref, TieP_ref)  # Detector = numpy array - shape: number of keypoints x 128
#     print('%s temporary master tie points found.' %len(TieP_ref))
#     TieP_rpc = detector.detect(im_rpc)
#     TieP_rpc, Detector_rpc = descriptor.compute(im_rpc, TieP_rpc)
#     print('%s temporary slave tie points found.' %len(TieP_rpc))
#
#     flann_params = dict(algorithm=1, trees=4)
#     flann = cv2.flann_Index(Detector_ref, flann_params)
#     idx, dist = flann.knnSearch(Detector_rpc, 1, params={})
#     del flann
#     dist = dist[:,0]/2500.0
#     dist = dist.reshape(-1,).tolist()
#     idx = idx.reshape(-1).tolist()
#     indices = range(len(dist))
#     indices.sort(key=lambda i: dist[i])
#     dist = [dist[i] for i in indices]
#     idx = [idx[i] for i in indices]
#     TieP_ref_final = []
#     for i, dis in itertools.izip(idx, dist):
#         if dis < distance:
#             TieP_ref_final.append(TieP_ref[i])
#
#     flann = cv2.flann_Index(Detector_rpc, flann_params)
#     idx, dist = flann.knnSearch(Detector_ref, 1, params={})
#     del flann
#     dist = dist[:,0]/2500.0
#     dist = dist.reshape(-1,).tolist()
#     idx = idx.reshape(-1).tolist()
#     indices = range(len(dist))
#     indices.sort(key=lambda i: dist[i])
#     dist = [dist[i] for i in indices]
#     idx = [idx[i] for i in indices]
#     TieP_rpc_final = []
#     for i, dis in itertools.izip(idx, dist):
#         if dis < distance:
#             TieP_rpc_final.append(TieP_rpc[i])
#
#     return TieP_ref_final,TieP_rpc_final
#
# def Orfeo_homologous_points_extraction(im_ref,im_rpc):
#     HomologousPointsExtraction = otbApplication.Registry.CreateApplication("HomologousPointsExtraction")
#     # The following lines set all the application parameters:
# #    HomologousPointsExtraction.SetParameterString("in1", "sensor_stereo_left.tif")
# #    HomologousPointsExtraction.SetParameterString("in2", "sensor_stereo_right.tif")
#     HomologousPointsExtraction.SetParameterString("in1", im_ref)
#     HomologousPointsExtraction.SetParameterString("in2", im_rpc)
#     HomologousPointsExtraction.SetParameterString("mode","full")
#     HomologousPointsExtraction.SetParameterString("out", "homologous.txt")
#     # The following line executes the application
#     HomologousPointsExtraction.ExecuteAndWriteOutput()
#
# def generate_RPCs(RSD_L1A, DGM_L1A, masks_L1A,path_out_baseN =''):
#     ''' Generates RPC model and returns RPC points as list. '''
#     print('\n##################### Level 1B Processing #####################')
#     logging.info('Level 1B Processing started.')
#     if isinstance(RSD_L1A,np.ndarray):
#         # The following line creates an instance of the GenerateRPCSensorModel application
#         GenerateRPCSensorModel = otbApplication.Registry.CreateApplication("GenerateRPCSensorModel")
#
#         # The following lines set all the application parameters:
#         GenerateRPCSensorModel.SetParameterString("outgeom", "output.geom")
#         GenerateRPCSensorModel.SetParameterString("inpoints", "points.txt")
#         GenerateRPCSensorModel.SetParameterString("map","epsg")
#         GenerateRPCSensorModel.SetParameterInt   ("map.epsg.code", 32631)
#
#         # The following line executes the application
#         GenerateRPCSensorModel.ExecuteAndWriteOutput()
#     else:
#         logging.info('L1B-Processor accepts only numpy arrays as first input argument. Execution stopped.')
#         raise ValueError('L1B-Processor accepts only numpy arrays as first input argument.')
#
#     print('Generating dummy RPCs...')
#     logging.info('Generating dummy RPCs...')
#     list_RPCs = [0]*93
#     return list_RPCs
#
# def update_metadata(list_RPCs, L1A_meta2update, path_out_baseN =''):
#     ''' Adds RPC points to metadata of RS data, masks, atmospheric layers and DGM. '''
#     if isinstance(L1A_meta2update,dict):
#         # metadata dictionary updater
#         L1A_meta2update['rpc coeffs'] = list_RPCs
#         L1B_meta = L1A_meta2update
# #        L1B_meta = L1A_meta2update.update({'rpc coeffs':list_RPCs})  # result = None
#         return L1B_meta
#
#     elif isinstance(L1A_meta2update,str) and os.path.splitext(L1A_meta2update)[1] in ['.BSQ', '.bsq'] and os.path.isfile(os.path.splitext(L1A_meta2update)[0] + '.hdr'):
#         # header file updater
#         hdr_path                      = os.path.splitext(L1A_meta2update)[0] + '.hdr'
#         L1A_meta2update               = envi.read_envi_header(hdr_path)
#         L1A_meta2update['rpc coeffs'] = list_RPCs
#         L1B_meta                      = L1A_meta2update
#         return L1B_meta
#
#     else:
#         logging.info('L1B-Processor accepts only L1A metadata dictionaries or path-strings to L1A header files as second input argument. Execution stopped.')
#         raise ValueError('L1B-Processor accepts only L1A metadata dictionaries or path-strings to L1A header files as second input argument.')
#
# def convert_to_8bit(Inimage,In_dtype):
#     image_min,image_max = (np.min(Inimage), np.max(Inimage))
# #    lut   = np.arange(np.iinfo(In_dtype).max, dtype=In_dtype)
#     lut   = np.arange(2**16, dtype='uint16')
#     Outimage = np.array(lut, copy=True)
#     Outimage.clip(image_min, image_max, out=Outimage)
#     Outimage -= image_min
#     Outimage //= (image_max - image_min + 1) / 256.
#     lut = Outimage.astype(np.uint8)
#     return np.take(lut, Inimage)
#
# def L1B_P__main(L1A_Instances):
#     for i in L1A_Instances:
#         if i.image_type == 'RSD':
#             if i.georef == 'Master':
#                 if i.arr_shape == 'cube':
#                     im_ref = convert_to_8bit(i.arr[:,:,0],i.arr.dtype)
#                 else:
#                     im_ref = convert_to_8bit(i.arr,i.arr.dtype)
#                 print('Calling tie point calculation with master image %s' %i.entity_ID)
#             else:
#                 if i.arr_shape == 'cube':
#                     im_rpc = convert_to_8bit(i.arr[:,:,0],i.arr.dtype)
#                 else:
#                     im_rpc = convert_to_8bit(i.arr,i.arr.dtype)
#                 print('Calling tie point calculation with slave image %s' %i.entity_ID)
#     print(im_ref.shape,im_rpc.shape)
# #    Orfeo_homologous_points_extraction(im_ref,im_rpc)  # only works from disk
#     TieP_ref,TieP_rpc = calculate_TiePoints(im_ref,im_rpc)
#     print('%s master tie points calculated.' %len(TieP_ref))
# </editor-fold>


class Scene_finder(object):
    def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, min_overlap=20,
                 min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10):
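        """Create a Scene_finder instance.

        :param src_boundsLonLat:    lon/lat bounding box of the source scene (xmin, xmax, ymin, ymax)
        :param src_AcqDate:         acquisition date of the source scene (datetime)
        :param src_prj:             projection of the source scene (WKT string)
        :param src_footprint_poly:  footprint polygon of the source scene (lon/lat)
        :param min_overlap:         minimum overlap of the reference scene in percent
        :param min_cloudcov:        minimum cloud cover of the reference scene in percent
        :param max_cloudcov:        maximum cloud cover of the reference scene in percent
        :param plusminus_days:      maximum difference of the day of year between target and reference scene
        :param plusminus_years:     maximum difference in years between target and reference scene
        """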
        self.boundsLonLat       = src_boundsLonLat
        self.src_AcqDate        = src_AcqDate
        self.src_prj            = src_prj
        self.src_footprint_poly = src_footprint_poly
        self.min_overlap        = min_overlap
        self.min_cloudcov       = min_cloudcov
        self.max_cloudcov       = max_cloudcov
        self.plusminus_days     = plusminus_days
        self.plusminus_years    = plusminus_years

        # get temporal constraints (Feb 29 maps to Mar 1 if the target year is not a leap year)
        add_years      = lambda dt, years: dt.replace(dt.year + years) \
            if not (dt.month == 2 and dt.day == 29) else dt.replace(dt.year + years, 3, 1)

        self.timeStart = add_years(self.src_AcqDate, -plusminus_years)
        timeEnd        = add_years(self.src_AcqDate, +plusminus_years)
        self.timeEnd   = timeEnd if timeEnd <= datetime.now() else datetime.now()

        self.possib_ref_scenes  = None            # set by self.spatial_query()
        self.GDF_ref_scenes     = GeoDataFrame()  # set by self.spatial_query()
        self.ref_scene          = None


    def spatial_query(self, timeout=5):
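        """Query the SpatialIndexMediator for reference scenes matching the spatial and temporal constraints.

        Retries up to 10 times on socket timeouts. Results are stored in self.possib_ref_scenes and,
        enriched with per-scene parameters, in self.GDF_ref_scenes.

        :param timeout:  timeout of a single index server query in seconds
        """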
        for i in range(10):
            try:
                SpIM = SpatialIndexMediator(timeout=timeout)
                self.possib_ref_scenes = \
                    SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
                                                    self.max_cloudcov, CFG.usecase.datasetid_spatial_ref,
                                                    refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
                break
            except socket.timeout:
                if i < 9:
                    continue
                else:
                    raise TimeoutError('Spatial query timed out 10 times!')
            # TODO catch error when index server is not running: ConnectionRefusedError: [Errno 111] Connection refused

        if self.possib_ref_scenes:
            # fill GeoDataFrame with possible ref scene parameters
            GDF = GeoDataFrame(self.possib_ref_scenes, columns=['object'])
            GDF['sceneid']         = [*GDF['object'].map(lambda scene: scene.sceneid)]
            GDF['acquisitiondate'] = [*GDF['object'].map(lambda scene: scene.acquisitiondate)]
            GDF['cloudcover']      = [*GDF['object'].map(lambda scene: scene.cloudcover)]
            GDF['polyLonLat']      = [*GDF['object'].map(lambda scene: scene.polyLonLat)]
            LonLat2UTM             = lambda polyLL: reproject_shapelyGeometry(polyLL, 4326, self.src_prj)
            GDF['polyUTM']         = [*GDF['polyLonLat'].map(LonLat2UTM)]
            self.GDF_ref_scenes    = GDF


    def filter_possib_ref_scenes(self):
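        """Filter the possible reference scenes step by step: by overlap, processing status,
        dataset existence on disk, entity ID availability and projection."""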
        self._filter_by_overlap()
        self._filter_by_proc_status()
        self._filter_by_dataset_existance()
        self._filter_by_entity_ID_availability()
        self._filter_by_projection()


    def choose_ref_scene(self):
        """choose refence scene with minimum cloud cover and maximum overlap"""

        if not self.GDF_ref_scenes.empty:
            GDF = self.GDF_ref_scenes
            GDF = GDF[GDF['cloudcover']         == GDF['cloudcover']        .min()]
            GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]
            if not GDF.empty:
                GDF_res = GDF.iloc[0]
                return ref_Scene(GDF_res)
        else:
            return None


    def _filter_by_overlap(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get overlap parameters
            get_OL_prms               = lambda poly: get_overlap_polygon(poly, self.src_footprint_poly)
            GDF['overlapParams']      = list(GDF['polyLonLat'].map(get_OL_prms))
            GDF['overlap area']       = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area']))
            GDF['overlap percentage'] = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage']))
            GDF['overlap poly']       = list(GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly']))
            del GDF['overlapParams']

            # filter out scenes that have too little overlap
            self.GDF_ref_scenes = GDF.loc[GDF['overlap percentage'] >= self.min_overlap]


    def _filter_by_proc_status(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get processing level of reference scenes
            query_procL = lambda sceneID: \
                DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes_proc', ['proc_level'], {'sceneid': sceneID})

            GDF['temp_queryRes'] = list(GDF['sceneid']      .map(query_procL))
            GDF['proc_level']    = list(GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None))
            GDF.drop('temp_queryRes', axis=1, inplace=True)

            # filter out scenes that have not yet been processed
            self.GDF_ref_scenes = GDF[GDF['proc_level'].notnull()]


    def _filter_by_dataset_existance(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # get the path of the binary file and check whether the corresponding dataset exists
            get_path_binary = lambda GDF_row: \
                PG.path_generator(scene_ID=GDF_row['sceneid'], proc_level=GDF_row['proc_level']).get_path_imagedata()
            check_exists = lambda path: os.path.exists(path)
            GDF['path_ref']     = GDF.apply(lambda GDF_row: get_path_binary(GDF_row), axis=1)
            GDF['refDs_exists'] = list(GDF['path_ref'].map(check_exists))

            # filter out scenes where the corresponding dataset does not exist on the fileserver
            self.GDF_ref_scenes = GDF[GDF['refDs_exists'] == True]


    def _filter_by_entity_ID_availability(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # check if a proper entity ID can be gathered from database
            query_eID = lambda sceneID: DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
                                                                        {'id': sceneID}, records2fetch=1)
            GDF['temp_queryRes'] = list(GDF['sceneid']      .map(query_eID))
            GDF['entityid']      = list(GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None))
            GDF.drop('temp_queryRes', axis=1, inplace=True)

            # filter out scenes that have no entity ID (database errors)
            self.GDF_ref_scenes = GDF[GDF['entityid'].notnull()]


    def _filter_by_projection(self):
        GDF = self.GDF_ref_scenes
        if not GDF.empty:
            # compare projections of target and reference image
            from ..io.Input_reader import read_ENVIhdr_to_dict
            get_prj = lambda path_binary: \
                read_ENVIhdr_to_dict(os.path.splitext(path_binary)[0] + '.hdr')['coordinate system string']
            is_prj_equal     = lambda path_binary: prj_equal(self.src_prj, get_prj(path_binary))
            GDF['prj_equal'] = list(GDF['path_ref'].map(is_prj_equal))

            # filter out scenes that have a different projection
            self.GDF_ref_scenes = GDF[GDF['prj_equal'] == True]
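

# Example usage of Scene_finder (a minimal sketch; the bounding box, date, projection and
# footprint polygon below are placeholder values, not taken from the GMS pipeline):
#
#     from datetime import datetime
#     from shapely.geometry import box
#     footprint = box(12.0, 50.0, 13.0, 51.0)
#     SF = Scene_finder((12.0, 13.0, 50.0, 51.0), datetime(2015, 6, 1), 'EPSG:32633', footprint)
#     SF.spatial_query(timeout=5)
#     SF.filter_possib_ref_scenes()
#     ref_scene = SF.choose_ref_scene()  # -> ref_Scene instance or None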


class ref_Scene:
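    """Container for the metadata of a chosen spatial reference scene
    (built from one row of the Scene_finder results GeoDataFrame)."""
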
    def __init__(self, GDF_record):
        self.scene_ID   = int(GDF_record['sceneid'])
        self.entity_ID  = GDF_record['entityid']
        self.AcqDate    = GDF_record['acquisitiondate']
        self.cloudcover = GDF_record['cloudcover']
        self.polyLonLat = GDF_record['polyLonLat']
        self.polyUTM    = GDF_record['polyUTM']
        self.proc_level = GDF_record['proc_level']
        self.filePath   = GDF_record['path_ref']


class L1B_object(L1A_object):
    def __init__(self, L1A_obj=None):
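        """Create an L1B object from an L1A object, inheriting all of its attributes.

        :param L1A_obj:  instance of L1A_object to be promoted to processing level L1B
        """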

        super().__init__()

        # set defaults
        self._spatRef_available = None
        self.spatRef_scene      = None  # set by self.get_spatial_reference_scene()
        self.coreg_info         = {}
        self.deshift_results    = collections.OrderedDict()

        if L1A_obj:
            # populate attributes from the given L1A object
            for key, value in L1A_obj.__dict__.items():
                setattr(self, key, value)

        self.proc_level = 'L1B'


    @property
    def spatRef_available(self):
        # lazy evaluation: run the spatial query only on first access
        if self._spatRef_available is None:
            self.get_spatial_reference_scene()
        return self._spatRef_available


    @spatRef_available.setter
    def spatRef_available(self, spatRef_available):
        self._spatRef_available = spatRef_available


    def get_spatial_reference_scene(self):
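        """Run a spatial query for suitable reference scenes, filter the results and store the chosen
        scene in self.spatRef_scene. self.spatRef_available is set accordingly."""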
        boundsLonLat   = corner_coord_to_minmax(self.trueDataCornerLonLat)
        footprint_poly = HLP_F.CornerLonLat_to_shapelyPoly(self.trueDataCornerLonLat)
        RSF            = Scene_finder(boundsLonLat, self.acq_datetime, self.meta_odict['coordinate system string'],
                                      footprint_poly, 20, 0, 20, 30, 10)

        # run spatial query
        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')
        RSF.spatial_query(timeout=5)
        if RSF.possib_ref_scenes:
            self.logger.info('Query result: %s reference scenes with matching metadata.' % len(RSF.possib_ref_scenes))
        else:
            self.logger.warning('Spatial query returned no matches. Coregistration impossible.')
            self.spatRef_available = False
            return None

        # filter results
        RSF.filter_possib_ref_scenes()
        if RSF.GDF_ref_scenes.empty:
            self.logger.warning('No scene fulfills all requirements to serve as spatial reference for scene %s '
                                '(entity ID %s). Coregistration impossible.' % (self.scene_ID, self.entity_ID))
            self.spatRef_available = False
            return None

        # assign the spatial reference scene
        self.spatRef_scene     = RSF.choose_ref_scene()
        self.spatRef_available = True
        self.logger.info('Found a suitable reference image for coregistration: scene ID %s (entity ID %s).'
                         % (self.spatRef_scene.scene_ID, self.spatRef_scene.entity_ID))


    def _get_reference_image_params_pgSQL(self):
        """postgreSQL query: get IDs of overlapping scenes and select the most suitable scene ID
        (with respect to DGM and cloud cover)."""
        # TODO implement earlier version of this function as a backup for SpatialIndexMediator
        warnings.warn('_get_reference_image_params_pgSQL is deprecated and will not work anymore.', DeprecationWarning)


        # preliminary check using the cloud mask which region of im2shift is suitable for shift correction at all
        # -> pass that region as an argument to the postgreSQL query
        #scene_ID            = 14536400 # LE71510322000093SGS00 im2shift

        # set query conditions
        min_overlap    = 20 # %
        max_cloudcov   = 20 # %
        plusminus_days = 30
        AcqDate        = self.im2shift_objDict['acquisition_date']
        date_minmax    = [AcqDate-timedelta(days=plusminus_days),AcqDate+timedelta(days=plusminus_days)]
        dataset_cond   = 'datasetid=%s' %CFG.usecase.datasetid_spatial_ref
        cloudcov_cond  = 'cloudcover < %s' %max_cloudcov # FIXME not yet available for all scenes at proc_level METADATA
        dayrange_cond  = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
                         "BETWEEN (%s, %s) AND (%s, %s)" \
                         %(date_minmax[0].month, date_minmax[0].day, date_minmax[1].month, date_minmax[1].day)
        # TODO add further query criteria!

        query_scenes = lambda condlist: DB_T.get_overlapping_scenes_from_postgreSQLdb(
            CFG.job.conn_database,
            table               = 'scenes',
            tgt_corners_lonlat  = self.trueDataCornerLonLat,
            conditions          = condlist,
            add_cmds            = 'ORDER BY scenes.cloudcover ASC',
            timeout             = 30000)
        conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]

        self.logger.info('Querying database in order to find a suitable reference scene for co-registration.')

        count, filt_overlap_scenes = 0, []
        while not filt_overlap_scenes:
            if count == 0:
                # search within already processed scenes
                # NOTE: this query only returns results from scenes_proc, i.e., a reference is only found there
                #       if the scene was already contained in the dataset list when CFG.job was started
                res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
                    CFG.job.conn_database,
                    tgt_corners_lonlat  = self.trueDataCornerLonLat,
                    conditions          = ['datasetid=%s' %CFG.usecase.datasetid_spatial_ref],
                    add_cmds            = 'ORDER BY scenes.cloudcover ASC',
                    timeout             = 25000)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)

            else:
                # search within the complete scenes table, using fewer filter criteria with each iteration
                # TODO: integrate Daniel's index; otherwise this may become too slow as the table grows
                res = query_scenes(conds_descImportance)
                filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                if len(conds_descImportance) > 1: # FIXME use a different reference sensor?
                    del conds_descImportance[-1]
                else:  # reduce min_overlap to 10 percent if there are overlapping scenes
                    if res:
                        min_overlap = 10
                        filt_overlap_scenes = \
                            self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], min_overlap)

                    # raise warnings if no match found
                    if not filt_overlap_scenes:
                        if not res:
                            warnings.warn('No reference scene found for %s (scene ID %s). Coregistration of this scene '
                                          'failed.' %(self.baseN, self.scene_ID))
                        else:
                            warnings.warn('Reference scenes for %s (scene ID %s) have been found but none has more '
                                          'than %s percent overlap. Coregistration of this scene failed.'
                                          %(self.baseN, self.scene_ID, min_overlap))
                        break
            count += 1

        if filt_overlap_scenes:
            ref_available = False
            for count, sc in enumerate(filt_overlap_scenes):
                if count == 2: # FIXME really abort after the 3rd scene already?
                    warnings.warn('No reference scene for %s (scene ID %s) available. '
                                  'Coregistration of this scene failed.' %(self.baseN, self.scene_ID))
                    break

                # start the download of scene data if not yet available and trigger L1A processing
                dl_cmd = lambda scene_ID: \
                    print('%s %s %s' %(CFG.job.java_commands['keyword'].strip(), # FIXME CFG.job.java_commands is deprecated
                                       CFG.job.java_commands["value_download"].strip(), scene_ID))

                path = PG.path_generator(scene_ID = sc['scene_ID']).get_path_imagedata()
                if not os.path.exists(path):
                    # add the scene to be downloaded to scenes_jobs.missing_scenes

                    # print JAVA download command
                    t_dl_start = time.time()
                    dl_cmd(sc['scene_ID'])

                    # check if scene is downloading
                    download_start_timeout = 5 # seconds
                    # set timeout for external processing
                    # DEPRECATED because creation of an external CFG.job within a CFG.job is not allowed
                    processing_timeout     = 5 # seconds # FIXME increase timeout if processing is really started
                    proc_level = None
                    while True:
                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(
                                            CFG.job.conn_database,'scenes',['proc_level'],{'id':sc['scene_ID']})[0][0]
                        if proc_level != proc_level_chk:
                            print('Reference scene %s, current processing level: %s' %(sc['scene_ID'], proc_level_chk))
                        proc_level = proc_level_chk
                        if proc_level_chk in ['SCHEDULED', 'METADATA'] and\
                                                time.time()-t_dl_start >= download_start_timeout:
                            warnings.warn('Download of reference scene %s (entity ID %s) timed out. '
                                          'Coregistration of this scene failed.' %(self.baseN, self.scene_ID))
                            break
                        if proc_level_chk == 'L1A':
                            ref_available = True
                            break
                        elif proc_level_chk == 'DOWNLOADED' and time.time()-t_dl_start >= processing_timeout:
                            # proc_level is 'DOWNLOADED' but the scene has not been processed
                            warnings.warn('L1A processing of reference scene %s (entity ID %s) timed out. '
                                          'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                            break
                            #DB_T.set_info_in_postgreSQLdb(CFG.job.conn_database,'scenes',
                            #                              {'proc_level':'METADATA'},{'id':sc['scene_ID']})

                        time.sleep(5)
                else:
                    ref_available = True

                if not ref_available:
                    continue
                else:
                    self.path_imref           = path
                    self.imref_scene_ID       = sc['scene_ID']
                    self.imref_footprint_poly = sc['scene poly']
                    self.overlap_poly         = sc['overlap poly']
                    self.overlap_percentage   = sc['overlap percentage']
                    self.overlap_area         = sc['overlap area']

                    query_res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database,'scenes',['entityid'],
                                                                {'id':self.imref_scene_ID},records2fetch=1)
                    assert query_res != [], 'No entity-ID found for scene number %s' %self.imref_scene_ID
                    self.imref_entity_ID      = query_res[0][0] # [('LC81510322013152LGN00',)]
                    break
        self.logger.close()


    def _sceneIDList_to_filt_overlap_scenes(self, sceneIDList, min_overlap):
        """Find reference scenes that overlap the scene with the given ID by at least min_overlap percent.

        NOTE: only the first 50 given scene IDs are considered.
        """

        #t0 = time.time()
        dict_sceneID_poly   = [{'scene_ID':ID,'scene poly':HLP_F.scene_ID_to_shapelyPolygon(ID)}
                               for ID in sceneIDList] # always returns LonLat polygons

        # get overlap polygons and their parameters. result: [{overlap poly, overlap percentage, overlap area}]
        for dic in dict_sceneID_poly: # input: dicts {scene_ID, ref_poly}
            dict_overlap_poly_params = get_overlap_polygon(dic['scene poly'], self.footprint_poly)
            dic.update(dict_overlap_poly_params)  # adds {overlap poly, overlap percentage, overlap area}
        #print('polygon creation time', time.time()-t0)

        # filter out scene IDs where the overlap percentage is below min_overlap
        if min_overlap:
            filt_overlap_scenes = [scene for scene in dict_sceneID_poly if scene['overlap percentage'] >= min_overlap]
        else:
            filt_overlap_scenes = dict_sceneID_poly

        return filt_overlap_scenes


    def get_opt_bands4matching(self, target_cwlPos_nm=550, v=False):
        """Automatically determines the optimal bands used für fourier shift theorem matching

        :param target_cwlPos_nm:   the desired wavelength used for matching
        :param v:                  verbose mode
        """
        # get spectral characteristics
        shift_cwl   = [float(i) for i in self.meta_odict['wavelength']]
        shift_fwhm  = [float(i) for i in self.meta_odict['bandwidths']]
        with open(PG.path_generator(scene_ID=self.spatRef_scene.scene_ID).get_path_gmsfile()) as inF:
            ref_gmsDict = json.load(inF)
        ref_cwl     = [float(i) for i in ref_gmsDict['meta']['wavelength']]
        ref_fwhm    = [float(i) for i in ref_gmsDict['meta']['bandwidths']]

        # exclude cirrus/oxygen band of Landsat-8/Sentinel-2
        shift_bbl, ref_bbl = [False]*len(shift_cwl), [False]*len(ref_cwl) # bad band lists
        for dic,s_r in zip([self.__dict__, ref_gmsDict], ['shift', 'ref']):
            dic['GMS_identifier']['logger'] = None # set a dummy value in order to avoid an exception
            sensorcode = get_GMS_sensorcode(dic['GMS_identifier'])
            bbl = shift_bbl if s_r == 'shift' else ref_bbl  # explicit alias instead of a fragile locals() lookup
            if sensorcode in ['LDCM','S2A','S2B'] and '9' in dic['LayerBandsAssignment']:
                bbl[dic['LayerBandsAssignment'].index('9')]  = True
            if sensorcode in ['S2A','S2B'] and '10' in dic['LayerBandsAssignment']:
                bbl[dic['LayerBandsAssignment'].index('10')] = True

        #cwl_overlap = (max(min(shift_cwl),min(ref_cwl)),  min(max(shift_cwl),max(ref_cwl))) # -> (min wvl, max wvl)
        # find matching band of reference image for each band of image to be shifted
        match_dic   = collections.OrderedDict()
        for idx,cwl,fwhm in zip(range(len(shift_cwl)), shift_cwl,shift_fwhm):
            if shift_bbl[idx]: continue # skip cwl if it is declared as bad band above
            is_inside       = lambda r_cwl,s_cwl,s_fwhm: s_cwl-s_fwhm/2 < r_cwl < s_cwl+s_fwhm/2
            matching_r_cwls = [r_cwl for i, r_cwl in enumerate(ref_cwl) if is_inside(r_cwl,cwl,fwhm) and not ref_bbl[i]]
            if matching_r_cwls:
                # if multiple reference bands match, choose the one closest to the current center wavelength
                match_dic[cwl] = matching_r_cwls[0] if len(matching_r_cwls) == 1 else \
                    matching_r_cwls[np.abs(np.array(matching_r_cwls)-cwl).argmin()]

        # set bands4match based on the above results
        poss_cwls = [cwl for cwl in shift_cwl if cwl in match_dic]
        if poss_cwls:
            if not target_cwlPos_nm:
                shift_band4match = shift_cwl.index(poss_cwls[0])+1             # first possible shift band
                ref_band4match   = ref_cwl  .index(match_dic[poss_cwls[0]])+1  # matching reference band
            else: # target_cwlPos is given
                closestWvl_to_target = poss_cwls[np.abs(np.array(poss_cwls)-target_cwlPos_nm).argmin()]
                shift_band4match = shift_cwl.index(closestWvl_to_target)+1     # the shift band closest to target
                ref_band4match   = ref_cwl  .index(match_dic[closestWvl_to_target])+1 # matching reference band
        else: # all reference bands are outside of shift-cwl +- fwhm/2
            warnings.warn('Optimal bands for matching could not be automatically determined. Choosing the first band '
                          'of each image.')
            shift_band4match = 1
            ref_band4match   = 1

        if v:  print('Shift band for matching:     %s (%snm)' %(shift_band4match, shift_cwl[shift_band4match-1]))
        if v:  print('Reference band for matching: %s (%snm)' %(ref_band4match,   ref_cwl  [ref_band4match-1]))

        return ref_band4match, shift_band4match


    def coregister_spatially(self):
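        """Calculate the spatial shifts of this scene relative to the chosen spatial reference scene
        (using the AROSICS COREG class) and store the results in self.coreg_info."""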
        if self.coreg_needed and self.spatRef_available:
            geoArr_ref           = GeoArray(self.spatRef_scene.filePath)
            geoArr_shift         = GeoArray(self.arr)
            r_b4match, s_b4match = self.get_opt_bands4matching(target_cwlPos_nm=550,v=False)
            coreg_kwargs = {
                'r_b4match'        : r_b4match,
                's_b4match'        : s_b4match,
                'align_grids'      : True, # FIXME not needed here
                'match_gsd'        : True, # FIXME not needed here
                'data_corners_ref' : [[x,y] for x,y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
                'data_corners_tgt' : [transform_any_prj(EPSG2WKT(4326), self.meta_odict['coordinate system string'],x,y)
                                      for x,y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
                'nodata'           : (get_outFillZeroSaturated(geoArr_ref  .dtype)[0],
                                      get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
                'ignore_errors'    : True,
                'v'                : False,
                'q'                : True
            }

            COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
            COREG_obj.calculate_spatial_shifts()

            self.coreg_info.update(COREG_obj.coreg_info) # no clipping to trueCornerLonLat until here -> only shift correction
            self.coreg_info.update({'reference scene ID'  : self.spatRef_scene.scene_ID})
            self.coreg_info.update({'reference entity ID' : self.spatRef_scene.entity_ID})

            if COREG_obj.success:
                self.logger.info("Calculated map shifts (X,Y): %s / %s"
                                 % (COREG_obj.x_shift_map, COREG_obj.y_shift_map))  # FIXME direkt in calculate_spatial_shifts loggen
            else:
                # TODO add database entry with error hint
                for err in COREG_obj.tracked_errors:
                    self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
                                      %(self.scene_ID, self.entity_ID, err))
        else:
            if self.coreg_needed:
                self.logger.warning('Coregistration skipped because no suitable reference scene is available or '
                                    'spatial query failed.')
            else:
                self.logger.info('Coregistration of scene %s (entity ID %s) skipped because target dataset ID equals '
                                 'reference dataset ID.' %(self.scene_ID, self.entity_ID))

            self.coreg_info.update({
                'corrected_shifts_px'   : {'x': 0, 'y': 0},
                'corrected_shifts_map'  : {'x': 0, 'y': 0},
                'original map info'     : self.meta_odict['map info'],
                'updated map info'      : None,
                'reference scene ID'    : None,
                'reference entity ID'   : None,
                'reference geotransform': None,
                # must be the scene's own projection in order to avoid overwriting it with a wrong EPSG code
                'reference projection'  : self.meta_odict['coordinate system string'],
                'reference extent'      : {'rows': None, 'cols': None},
                'reference grid'        : [list(CFG.usecase.spatial_ref_gridx),
                                           list(CFG.usecase.spatial_ref_gridy)],
                'success'               : True if not self.coreg_needed else False})  # False means that no spatial reference is available


    def correct_spatial_shifts(self, cliptoextent=True, clipextent=None, clipextent_prj=None, v=False):
        # type: (bool, list, any, bool) -> None
        """Corrects the spatial shifts calculated by self.coregister_spatially().

        :param cliptoextent:    whether to clip the output to the given extent
        :param clipextent:      list of XY-coordinate tuples giving the target extent (if not given and cliptoextent is
                                True, the 'trueDataCornerLonLat' attribute of the GMS object is used)
        :param clipextent_prj:  WKT projection string or EPSG code of the projection for the coordinates in clipextent
        :param v:               verbose mode
        """

        cliptoextent = cliptoextent if not clipextent else True # cliptoextent is automatically True if an extent is given
        if cliptoextent or self.resamp_needed or (self.coreg_needed and self.coreg_info['success']):

            ## get target bounds # TODO implement boxObj call instead here
            if not clipextent:
                trueDataCornerUTM      = [transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x,y)
                                            for x, y in self.trueDataCornerLonLat]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(trueDataCornerUTM)
                mapBounds              = box(xmin, ymin, xmax, ymax).bounds
            else:
                assert clipextent_prj, \
                    "'clipextent_prj' must be given together with 'clipextent'. Received only 'clipextent'."
                clipextent_UTM = clipextent if prj_equal(self.MetaObj.projection, clipextent_prj) else \
                    [transform_any_prj(clipextent_prj, self.MetaObj.projection, x, y) for x, y in clipextent]
                xmin, xmax, ymin, ymax = corner_coord_to_minmax(clipextent_UTM)
                mapBounds = box(xmin, ymin, xmax, ymax).bounds

            # correct shifts and clip to extent
            # ensure self.masks exists (it does not exist in Flink mode because self.fill_from_disk() is skipped there)
            if not hasattr(self, 'masks') or self.masks is None:
                self.build_combined_masks_array()  # creates self.masks and self.masks_meta

            # exclude self.mask_nodata, self.mask_clouds from warping
            del self.mask_nodata, self.mask_clouds

            attributes2deshift = [attrname for attrname in ['arr', 'masks', 'dem', 'ac_errors', 'mask_clouds_confidence']
                                  if getattr(self,'_%s'%attrname) is not None]
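            # each attribute is warped with identical deshift parameters below so that all arrays stay pixel-aligned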
            for attrname in attributes2deshift:
                geoArr  = getattr(self, attrname)

                # do some logging
                if self.coreg_needed:
                    if self.coreg_info['success']:
                        self.logger.info("Correcting spatial shifts for attribute '%s'..." %attrname)
                    elif cliptoextent and \
                            is_coord_grid_equal(geoArr.gt, CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy):
                        self.logger.info("Attribute '%s' has only been clipped to its extent because no valid "
                                         "shifts have been detected and the pixel grid equals the target grid." % attrname)
                    elif self.resamp_needed:
                        self.logger.info("Resampling attribute '%s' to target grid..." % attrname)
                elif self.resamp_needed:
                    self.logger.info("Resampling attribute '%s' to target grid..." % attrname)

                # correct shifts
                DS = DESHIFTER(geoArr, self.coreg_info,
                               target_xyGrid = [CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy],
                               cliptoextent  = cliptoextent,
                               clipextent    = mapBounds,
                               align_grids   = True,
                               resamp_alg    = 'nearest' if attrname=='masks' else 'cubic',
                               CPUs          = None if CFG.job.allow_subMultiprocessing else 1,
                               progress      = True if v else False,
                               q             = True,
                               v             = v)
                DS.correct_shifts()

                # update coreg_info
                if attrname == 'arr':
                    self.coreg_info['is shifted']   = DS.is_shifted
                    self.coreg_info['is resampled'] = DS.is_resampled

                # update geoinformation and array-shape-related attributes
                self.logger.info("Updating geoinformation of '%s' attribute..." %attrname)
                if attrname == 'arr':
                    self.meta_odict['map info']                 = DS.updated_map_info
                    self.meta_odict['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.shape_fullArr                          = DS.arr_shifted.shape
                    self.meta_odict['lines'], self.meta_odict['samples'] = DS.arr_shifted.shape[:2]
                else:
                    self.masks_meta['map info']                          = DS.updated_map_info
                    self.masks_meta['coordinate system string']          = EPSG2WKT(WKT2EPSG(DS.updated_projection))
                    self.masks_meta['lines'], self.masks_meta['samples'] = DS.arr_shifted.shape[:2]

                    # NOTE: mask_nodata and mask_clouds are updated later by L2A_map mapper function (module pipeline)

                # update the GeoArray instance without losing its inherent metadata (nodata, ...)
                geoArr.arr, geoArr.gt, geoArr.prj = DS.GeoArray_shifted.arr, DS.GeoArray_shifted.gt, DS.GeoArray_shifted.prj
                #setattr(self, attrname, DS.GeoArray_shifted)  # NOTE: don't set the array earlier because the setter
                #                                              # would also update arr.gt/.prj/.nodata from meta_odict

            self.resamp_needed = False
            self.coreg_needed  = False
            # recreate self.mask_nodata and self.mask_clouds from self.masks
            self.mask_nodata = self.mask_nodata
            self.mask_clouds = self.mask_clouds # FIXME move the functionality of self.masks to the output writer and remove self.masks completely