import collections
import glob
import itertools
import os
import re
import shutil
import sqlite3
import sys
import traceback
import warnings
from datetime import datetime
from typing import Union  # noqa F401  # flake8 issue
from logging import getLogger
from urllib.request import urlretrieve

import numpy as np
import pandas as pd
from pandas.io.sql import pandasSQL_builder, SQLTable
import psycopg2
from shapely.wkb import loads as wkb_loads
from geoalchemy2.types import Geometry as GEOMETRY
from geopandas import GeoDataFrame, GeoSeries
from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine

from py_tools_ds.compression.decompress import decompress
from py_tools_ds.processing.progress_mon import ProgressBar

from ..options.config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain

# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
# + misc.path_generator.get_path_metaCSV: (left out here in order to avoid circular dependencies)

__author__ = 'Daniel Scheffler'


def execute_pgSQL_query(cursor, query_command):
    """Executes a postgreSQL query and catches the full error message if there is one.
    """

    try:
        cursor.execute(query_command)
    except psycopg2.ProgrammingError as e:
        raise psycopg2.ProgrammingError(e.pgerror + 'Query failed. Command was:\n%s' % query_command)
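
# Usage sketch (the connection string is hypothetical; any psycopg2 cursor works):
#     conn = psycopg2.connect("dbname=geomultisens user=gms")
#     curs = conn.cursor()
#     execute_pgSQL_query(curs, "SELECT id FROM scenes LIMIT 3")
#     print(curs.fetchall())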


def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
    # type: (int) -> collections.OrderedDict
    """Creates an OrderedDict containing further information about a given scene ID by querying the pgSQL database.

    :param sceneid:   <int> the GMS scene ID to get information for
    """

    def query(tablename, vals2return, cond_dict, records2fetch=0):
        return get_info_from_postgreSQLdb(CFG.conn_database, tablename, vals2return, cond_dict, records2fetch)
    resultset = query('scenes', ['datasetid', 'satelliteid', 'sensorid', 'subsystemid', 'acquisitiondate', 'entityid',
                                 'filename'], {'id': sceneid})
    if len(resultset) == 0:
        sys.stderr.write("Scene with id %s not found. Skipping.." % sceneid)
        return None  # avoid an IndexError on the empty result set below

    scenedata = resultset[0]
    ds = collections.OrderedDict()
    proc_level_tmp = query('scenes_proc', 'proc_level', {'sceneid': sceneid})
    ds.update({'proc_level': 'L0A' if proc_level_tmp == [] else proc_level_tmp[0][0]})
    ds.update({'scene_ID': sceneid})
    ds.update({'datasetid': scenedata[0]})
    ds.update({'image_type': query('datasets', 'image_type', {'id': scenedata[0]})[0][0]})
    ds.update({'satellite': query('satellites', 'name', {'id': scenedata[1]})[0][0]})
    ds.update({'sensor': query('sensors', 'name', {'id': scenedata[2]})[0][0]})
    ds.update({'subsystem': query('subsystems', 'name', {'id': scenedata[3]})[0][0] if scenedata[3] else None})
    ds.update({'acq_datetime': scenedata[4]})
    ds.update({'entity_ID': scenedata[5]})
    ds.update({'filename': scenedata[6]})
    return ds


def get_postgreSQL_value(value):
    # type: (any) -> str
    """Converts a Python variable to a postgreSQL value respecting postgreSQL type casts.
    The resulting value can be directly inserted into a postgreSQL query."""

    assert type(value) in [int, float, bool, str, Polygon, datetime, list, tuple] or value is None, \
        "Unsupported value type within postgreSQL matching expression. Got %s." % type(value)
    if isinstance(value, int):
        pgV = value
    elif isinstance(value, float):
        pgV = value
    elif isinstance(value, bool):
        pgV = value
    elif value is None:
        pgV = 'NULL'
    elif isinstance(value, str):
        pgV = "'%s'" % value.replace("'", "")
    elif isinstance(value, Polygon):
        pgV = "'%s'" % value.wkb_hex
    elif isinstance(value, datetime):
        pgV = "TIMESTAMP '%s'" % str(value)
    else:  # list or tuple in value
        if not value:  # empty list/tuple
            pgV = 'NULL'
        else:
            dTypes_in_value = list(set([type(i) for i in value]))
            assert len(dTypes_in_value) == 1, \
                'Mixed data types in postgreSQL matching expressions are not supported. Got %s.' % dTypes_in_value
            assert dTypes_in_value[0] in [int, str, float, np.int64]
            pgList = ",".join(["'%s'" % i if isinstance(value[0], str) else "%s" % i for i in value])
            pgV = "'{%s}'" % pgList
    return pgV
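
# Illustrative conversions, derived from the branches above (the values are made up):
#     get_postgreSQL_value(42)                    -> 42
#     get_postgreSQL_value('L1A')                 -> "'L1A'"
#     get_postgreSQL_value(None)                  -> 'NULL'
#     get_postgreSQL_value([1, 2, 3])             -> "'{1,2,3}'"
#     get_postgreSQL_value(datetime(2017, 1, 1))  -> "TIMESTAMP '2017-01-01 00:00:00'"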


def get_postgreSQL_matchingExp(key, value):
    # type: (str,any) -> str
    """Converts a key/value pair to a postgreSQL matching expression in the form "column=value" respecting postgreSQL
    type casts. The resulting string can be directly inserted into a postgreSQL query.
    """
    pgVal = get_postgreSQL_value(value)
    if isinstance(pgVal, str) and pgVal.startswith("'{") and pgVal.endswith("}'"):
        return '%s in %s' % (key, pgVal.replace("'{", '(').replace("}'", ')'))  # '{1,2,3}' => (1,2,3)
    elif pgVal == 'NULL':
        return '%s is NULL' % key
    else:
        return '%s=%s' % (key, pgVal)
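
# Examples of generated matching expressions (column names are illustrative):
#     get_postgreSQL_matchingExp('id', 42)          -> "id=42"
#     get_postgreSQL_matchingExp('id', [1, 2, 3])   -> "id in (1,2,3)"
#     get_postgreSQL_matchingExp('feedback', None)  -> "feedback is NULL"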


def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000):
    # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str]
    """Queries a postgreSQL database for the given parameters.

    :param conn_params:     <str> connection parameters as provided by CFG.conn_params
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                            HINT: <value> can also be a list or a tuple of elements to match, BUT note that the order
                                  of the list items is NOT respected!
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    :param timeout:         <int> allows to set a custom statement timeout (milliseconds)
    """

    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), "get_info_from_postgreSQLdb: Expected an integer for the argument " \
                                           "'records2fetch'. Got %s" % type(records2fetch)
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    cmd = "SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition
    execute_pgSQL_query(cursor, cmd)

    records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
        cursor.fetchmany(size=records2fetch)  # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
    cursor.close()
    connection.close()
    return records2return
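
# Usage sketch (hypothetical scene ID and table contents):
#     res = get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid', 'filename'],
#                                      {'id': 26781907}, records2fetch=1)
#     # -> e.g. [('LE71950282003121EDC00', 'LE71950282003121EDC00.tar.gz')]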


def update_records_in_postgreSQLdb(conn_params, tablename, vals2update_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters and updates the given columns of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2update_dict:  <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    update_cond = "SET " + ', '.join(['%s=%s' % (k, get_postgreSQL_value(vals2update_dict[k]))
                                      for k in vals2update_dict.keys()])
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + update_cond + " " + condition)

    if 'connection' in locals():
        connection.commit()
        connection.close()
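
# Usage sketch (hypothetical condition; sets the processing level of one scene):
#     update_records_in_postgreSQLdb(CFG.conn_database, 'scenes_proc',
#                                    {'proc_level': 'L1A'}, {'sceneid': 26781907})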


def append_item_to_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2append_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and appends the given value to the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2append_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    assert len(vals2append_dict) == 1, 'Values can be appended to only one column at once.'
    if type(list(vals2append_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Appending multiple values to one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2append_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2append_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2append_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    append_cond = "SET %s = array_cat(%s, '%s')" % (col2update, col2update, pgSQL_val)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + append_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()
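
# Sketch of the generated SQL (table, column and values are illustrative): appending 99 to the
# array column 'statistics' of job 123 runs
#     UPDATE jobs SET statistics = array_cat(statistics, '{99}') WHERE id=123;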


def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remove_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and removes the given value from the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2remove_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    assert len(vals2remove_dict) == 1, 'Values can be removed from only one column at once.'
    if type(list(vals2remove_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Removing multiple values from one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2remove_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2remove_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2remove_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    remove_cond = "SET %s = array_remove(%s, '%s')" % (col2update, col2update, pgSQL_val)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + remove_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()


def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None,
                                                 idx_val2increment=None, cond_dict=None, timeout=15000):
    # type: (str, str, str, int, int, dict, int) -> Union[None, str]
    """Updates an array column of a specific postgreSQL table by incrementing or decrementing the element
    at a given position. HINT: The column must contain integer arrays like [0,1,0,3,1,0].

    :param conn_params:         <str> connection parameters as provided by CFG.conn_params
    :param tablename:           <str> name of the table within the database to be updated
    :param col2update:          <str> column name of the column to be updated
    :param idx_val2decrement:   <int> the index of the array element to be decremented (starts with 1)
    :param idx_val2increment:   <int> the index of the array element to be incremented (starts with 1)
    :param cond_dict:           <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                                HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:             <int> allows to set a custom statement timeout (milliseconds)
    :return:
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""

    dec_str = '' if idx_val2decrement is None else \
        "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement)
    inc_str = '' if idx_val2increment is None else \
        "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment)

    if dec_str or inc_str:
        dec_inc_str = ','.join([s for s in [dec_str, inc_str] if s])  # skip empty parts to avoid a dangling comma
        execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, dec_inc_str, condition))

    if 'connection' in locals():
        connection.commit()
        connection.close()
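
# Sketch of the generated SQL (table, column and values are illustrative): decrementing element 1
# and incrementing element 2 of the array column 'statistics' of job 123 runs
#     UPDATE jobs SET statistics[1] = statistics[1]-1,statistics[2] = statistics[2]+1 WHERE id=123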


def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000):
    # type: (str, str, dict, int) -> Union[int, str]
    """Creates a single new record in a postgreSQL database and populates its columns with the given values.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2write_dict:   <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in vals2write_dict.items()])

    execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s);" % (tablename, ','.join(keys), ','.join(vals)))
    execute_pgSQL_query(cursor, "SELECT id FROM %s ORDER BY id DESC LIMIT 1" % tablename)
    newID = cursor.fetchone()[0]

    if 'connection' in locals():
        connection.commit()
        connection.close()

    return newID
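
# Usage sketch (hypothetical column values; returns the ID of the newly created record):
#     new_job_id = create_record_in_postgreSQLdb(CFG.conn_database, 'jobs',
#                                                {'comment': 'Beta job', 'status': 'pending'})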


def delete_record_in_postgreSQLdb(conn_params, tablename, record_id, timeout=15000):
    # type: (str, str, int, int) -> str
    """Deletes a single record in a postgreSQL database.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param record_id:         <int> ID of the record to be deleted
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    execute_pgSQL_query(cursor, "DELETE FROM %s WHERE id=%s;" % (tablename, record_id))
    execute_pgSQL_query(cursor, "SELECT id FROM %s WHERE id=%s" % (tablename,  record_id))

    res = cursor.fetchone()

    if 'connection' in locals():
        connection.commit()
        connection.close()

    return 'success' if res is None else 'fail'
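
# Usage sketch (hypothetical record ID):
#     outcome = delete_record_in_postgreSQLdb(CFG.conn_database, 'jobs', 123)  # -> 'success' or 'fail'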


def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
                                    scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    if tgt_corners_lonlat:
        # handle coordinates crossing the 180 degrees meridian (dateline)
        # FIXME in that case the polygon has to be split at the dateline, otherwise pgSQL may yield wrong results
        if crossing_dateline_check:
            xvals = [x for x, y in tgt_corners_lonlat]
            if max(xvals) - min(xvals) > 180:
                tgt_corners_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corners_lonlat]

        from .helper_functions import cornerLonLat_to_postgreSQL_poly
        pGSQL_poly = cornerLonLat_to_postgreSQL_poly(tgt_corners_lonlat)
        src_geom = "'SRID=4326;%s'::geometry" % pGSQL_poly  # source geometry is given
        # FIXME the scenes table stores "geography" geodata -> its spatial index is not used when casting to "geometry":
        tgt_geom = "%s.%s::geometry" % (table2query, geomCol2use)
        geocond = "%s(%s, %s)" % (queryfunc, src_geom, tgt_geom)
    else:  # scene_ID is not None:
        connection = psycopg2.connect(conn_params)
        if connection is None:
            return 'database connection fault'
        cursor = connection.cursor()
        cmd = "SELECT ST_AsText(bounds) FROM scenes WHERE scenes.id = %s" % scene_ID
        execute_pgSQL_query(cursor, cmd)
        res = cursor.fetchone()
        cursor.close()
        connection.close()
        if res:  # fetchone() returns None if the scene ID does not exist
            src_geom = "'SRID=4326;%s'::geometry" % res
        else:
            print('The scene with the ID %s does not exist in the scenes table.' % scene_ID)
            return []
        geocond = "%s(%s, %s.%s::geometry)" % (queryfunc, src_geom, table2query, geomCol2use)
    return geocond
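
# The returned string is meant to be embedded into a WHERE clause, e.g. (illustrative coordinates):
#     ST_Intersects('SRID=4326;POLYGON((10 45, 11 45, 11 46, 10 46, 10 45))'::geometry,
#                   scenes_proc.bounds::geometry)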


def get_overlapping_scenes_from_postgreSQLdb(conn_params, table='scenes_proc', scene_ID=None,
                                             tgt_corners_lonlat=None, conditions=None, add_cmds='', timeout=15000):
    # type: (str, str, int, list, Union[list, str], str, int) -> Union[list, str]

    """Queries the postgreSQL database in order to find those scenes of a specified reference satellite (Landsat-8 or
    Sentinel-2) that have an overlap to the given corner coordinates AND that fulfill the given conditions.

    :param conn_params:             <str> connection parameters as provided by CFG.conn_params
    :param table:                   <str> name of the table within the database to be updated
    :param scene_ID:                <int> a sceneID to get the target geographical extent from
                                        (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat:      <list> a list of coordinates defining the target geographical extent
                                           (needed if scene_ID is not provided)
    :param conditions:              <list> a list of additional query conditions
    :param add_cmds:                <str> additional pgSQL commands to be added to the pgSQL query
    :param timeout:                 <int> allows to set a custom statement timeout (milliseconds)
    """

    conditions = [] if conditions is None else conditions if isinstance(conditions, list) else [conditions]
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'
    datasetids = [int(d.split('=')[1].strip()) for d in conditions if d.startswith('datasetid')]
    datasetid = datasetids[0] if datasetids else 104  # Landsat-8
    # FIXME: use Landsat-8 instead of Sentinel-2 as long as S2 L1A_P is not working:
    datasetid = 104 if datasetid == 249 else datasetid

    if table != 'scenes_proc':
        assert datasetid, "A datasetid is needed if the table is not 'scenes_proc'."
    if scene_ID is None:
        assert tgt_corners_lonlat, "Provide either scene_ID or tgt_corners_lonlat!"
    if tgt_corners_lonlat is None:
        assert scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    val2get = "scenes.id" if table == 'scenes' else "%s.sceneid" % table
    # refcond  = ['scenes_proc.georef = True'] if not datasetids else ['scenes.datasetid = %s' %datasetid]
    refcond = ['scenes.datasetid = %s' % datasetid]

    geocond = [get_pgSQL_geospatial_query_cond(conn_params, table, tgt_corners_lonlat=tgt_corners_lonlat,
                                               scene_ID=scene_ID, queryfunc='ST_Intersects',
                                               crossing_dateline_check=True)]

    join = "INNER JOIN scenes ON (%s.sceneid = scenes.id) " % table if table != 'scenes' and datasetids else ''
    conditions = [c for c in conditions if not c.startswith('datasetid')]
    where = "WHERE %s" % " AND ".join(geocond + refcond + conditions)
    usedtbls = "scenes" if table == 'scenes' else "%s, scenes" % table if 'scenes.' in where and join == '' else table
    query = "SELECT %s FROM %s %s%s %s" % (val2get, usedtbls, join, where, add_cmds)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records2return = cursor.fetchall()
    cursor.close()
    connection.close()
    return records2return
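
# Usage sketch (hypothetical corner coordinates given as (lon, lat) tuples and an illustrative extra condition):
#     scenes = get_overlapping_scenes_from_postgreSQLdb(
#         CFG.conn_database, table='scenes',
#         tgt_corners_lonlat=[(10, 45), (11, 45), (11, 46), (10, 46)],
#         conditions=['cloudcover < 20'])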


def get_overlapping_MGRS_tiles(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """In contrast to pgSQL's ST_Overlaps, 'overlapping' here means that both geometries share some spatial area,
    so it combines ST_Overlaps and ST_Contains."""
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['grid100k', 'grid1mil', 'geom']
    # FIXME this is covered by ST_Intersects:
    # geocond1 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Overlaps',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond2 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Contains',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond3 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Within',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Intersects',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID,
                                              crossing_dateline_check=True)
    # query = "SELECT %s FROM %s WHERE %s OR %s OR %s"
    #     % (', '.join(vals2get), 'mgrs_tiles', geocond1, geocond2, geocond3)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'mgrs_tiles', geocond)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['grid100k', 'grid1mil', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))
    GDF['granuleid'] = GDF['grid1mil'].str.cat(GDF['grid100k'])
    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_overlapping_MGRS_tiles2(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['granuleid', 'footprint_wgs84']
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'footprints_sentinel2_granules',
                                              geomCol2use='footprint_wgs84',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'footprints_sentinel2_granules', geocond)

    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['granuleid', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_dict_satellite_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with satellite names as keys and satellite IDs as values as read from pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """

    res = get_info_from_postgreSQLdb(conn_params, 'satellites', ['name', 'id'])
    assert len(res) > 0, 'Error getting satellite names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))
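
# Example result (illustrative IDs): {'Landsat-8': '8', 'Sentinel-2A': '14'}
# NOTE: the values come back as strings because np.array() casts the mixed-type records to a common dtype.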


def get_dict_sensor_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with sensor names as keys and sensor IDs as values as read from pgSQL database.
    :param conn_params:     <str> pgSQL database connection parameters """

    res = get_info_from_postgreSQLdb(conn_params, 'sensors', ['name', 'id'])
    assert len(res) > 0, 'Error getting sensor names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))


def get_entityIDs_from_filename(conn_DB, filename):
    # type: (str, str) -> list
    """Returns entityID(s) for the given filename. In the case of Sentinel-2, there can be multiple entity IDs if
    multiple granules are saved in one .zip file.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param filename:    <str> the filename to get the corresponding entity ID(s) for
    """

    if filename[:2] in ['LE', 'LC', 'LO'] and filename.endswith('.tar.gz'):  # Landsat
        entityIDs = [filename.split('.tar.gz')[0]]
    else:
        print('Querying database in order to get entityIDs for %s...' % filename)
        res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['entityid'], {'filename': filename}, timeout=40000)
        entityIDs = [subres[0] for subres in res] if len(res) > 0 else []
    return entityIDs


def get_filename_by_entityID(conn_DB, entityid, satellite):
    # type: (str,str,str) -> str
    """Returns the filename for the given entity ID.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityid:    <str> entity ID
    :param satellite:   <str> satellite name to which the entity ID is belonging
    """

    if re.search('Landsat', satellite, re.I):
        filename = '%s.tar.gz' % entityid
    elif re.search('Sentinel-2', satellite, re.I):
        filename = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['filename'],
                                              {'entityid': entityid}, records2fetch=1)[0][0]
    else:
        raise NotImplementedError
    return filename


def get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder):
    # type: (str,list,str,str,str) -> np.ndarray
    """Takes a list of entity IDs and extracts those records that have the corresponding archive file in the given
    source folder and that have the processing level 'METADATA' in the pgSQL database. Based on this subset a numpy
    array containing the corresponding scene IDs and the target filenames for the fileserver is returned.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityIDs:   <list> a list of entity IDs
    :param satellite:   <str> the name of the satellite to restrict the query on
    :param sensor:      <str> the name of the sensor to restrict the query on
    :param src_folder:  <str> the source directory where archive files are saved
    """

    columns = ['id', 'entityid', 'satelliteid', 'sensorid', 'filename', 'proc_level']
    result = get_info_from_postgreSQLdb(conn_DB, 'scenes', columns, {'entityid': entityIDs})
    df = pd.DataFrame(result, columns=columns)

    satNameID_dic = get_dict_satellite_name_id(conn_DB)
    satID = satNameID_dic[satellite]
    target_folder = os.path.join(CFG.path_archive, satellite, sensor)

    def get_fName(entityid): return get_filename_by_entityID(conn_DB, entityid, satellite)

    def tgt_exists(fileName): return os.path.exists(os.path.join(target_folder, fileName))

    def src_exists(entityid):
        return os.path.exists(os.path.join(src_folder, get_filename_by_entityID(conn_DB, entityid, satellite)))
    df['tgt_fileName'] = list(df['entityid'].map(get_fName))
    df['tgtFile_exists'] = list(df['tgt_fileName'].map(tgt_exists))
    df['srcFile_exists'] = list(df['entityid'].map(src_exists))
    tgt_satID = (df.satelliteid == float(satID))
    # isDL = (df.proc_level == 'DOWNLOADED')
    isMET = (df.proc_level == 'METADATA')
    # tgtE = (df.tgtFile_exists == True)
    srcE = df.srcFile_exists  # (df.srcFile_exists == True)
    # sceneIDs_notDL_tgtE = df[tgt_satID & (isDL == 0) & tgtE]  # maybe needed later
    # sceneIDs_DL_tgtNE = df[tgt_satID & isDL & (tgtE == 0)]  # maybe needed later
    # sceneIDs_DL_tgtE = df[tgt_satID & isDL & tgtE]  # maybe needed later
    sceneIDs_isMET_srcE = df[tgt_satID & isMET & srcE]
    return sceneIDs_isMET_srcE[['id', 'tgt_fileName']].values


class GMS_JOB(object):
    """gms_preprocessing job manager"""

    def __init__(self, conn_db):
        # type: (str) -> None
        """
        :param conn_db: <str> the database connection parameters as given by CFG.conn_params
        """
        # privates
        self._virtualsensorid = None

        # defaults
        self.conn = conn_db
        self.dataframe = GeoDataFrame()
        self.scene_counts = {}  # set by self.create()

        self.exists_in_db = False
        self.id = None  #: int
        self.creationtime = datetime.now()  # default, needed to create new job
        self.finishtime = None
        self.sceneids = []
        self.timerange_start = datetime.min
        self.timerange_end = datetime.max
        self.bounds = box(-180, -90, 180, 90)  # default, needed to create new job
        self.distribution_index = None
        self.progress = None
        self.feedback = None
        self.failed_sceneids = []
        self.ref_job_id = None
        self.datacube_mgrs_tiles_proc = []
        self.non_ref_datasetids = []
        self.max_cloudcover = None
        self.season_code = None  # type: int
        self.path_analysis_script = ''  # TODO
        self.job_mode = 'processing_only'  # FIXME download/processing/...
        self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start',
                                   'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback',
                                   'failed_sceneids', 'datasetid_spatial_ref',
                                   'virtualsensorid', 'ref_job_id', 'datacube_mgrs_tiles_proc', 'comment',
                                   'non_ref_datasetids', 'max_cloudcover', 'season_code', 'status',
                                   'path_analysis_script', 'analysis_parameter', 'statistics', 'job_mode']
        self.datasetid_spatial_ref = 249  # overwritten if an existing job is read from the DB; needed to create a new job
        self.datasetname_spatial_ref = 'SENTINEL-2A'  # same here
        self.status = None
        self.statistics = []
        self.comment = None
        self.epsg = None  # set by self._set_target_sensor_specs()
        self.ground_spatial_sampling = None  # set by self._set_target_sensor_specs()
        self.analysis_parameter = None

    def __repr__(self):
        return 'GMS job:\n\n' + GeoSeries(self.db_entry).to_string()

    @property
    def virtualsensorid(self):
        return self._virtualsensorid

    @virtualsensorid.setter
    def virtualsensorid(self, value):
        """Set the virtual sensor ID unless the nodata value (-1) is received.
        NOTE:  set by self._set_target_sensor_specs() and self.from_job_ID()"""
        if value != -1:  # no data value
            self._virtualsensorid = value

    def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref):
        self.virtualsensorid = virtual_sensor_id

        if not isinstance(datasetid_spatial_ref, int):
            raise ValueError(datasetid_spatial_ref)

        res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution',
                                                                        "projection_epsg"], {'id': virtual_sensor_id})
        assert res, \
            "'virtual_sensor_id'=%s does not exist in the table 'virtual_sensors' of the database." % virtual_sensor_id
        target_gsd = res[0][0]
        self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd
        self.epsg = int(res[0][1])

        self.datasetid_spatial_ref = datasetid_spatial_ref
        res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref})
        assert res, \
            "'datasetid_spatial_ref'=%s does not exist in the table 'datasets' of the database." % datasetid_spatial_ref
        self.datasetname_spatial_ref = res[0][0]

    @property
    def db_entry(self):
        """Returns an OrderedDict containing keys and values of the database entry.
        """

        db_entry = collections.OrderedDict()
        for i in self.jobs_table_columns:
            val = getattr(self, i)

            if i == 'virtualsensorid' and val is None:
                val = -1  # nodata value

            db_entry[i] = val

        return db_entry

    def from_dictlist(self, dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> GMS_JOB
        """
        :param dictlist_data2process:  <list> a list of dictionaries containing the keys "satellite", "sensor" and
                                        "filenames",
                                        e.g. [{'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS', 'filenames': 'file.tar.gz'}, {...}]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        dictlist_data2process = dictlist_data2process if dictlist_data2process else []

        for idx, datadict in enumerate(dictlist_data2process):
            assert isinstance(datadict, dict), "Expected only dictionaries within 'dictlist_data2process'. " \
                                               "Got %s in there." % type(datadict)
            assert False not in [i in datadict for i in ['satellite', 'sensor', 'filenames']]
            assert type(datadict['filenames']) in [list, str]

            if isinstance(datadict['filenames'], str):
                if datadict['filenames'].endswith('.csv'):
                    assert os.path.exists(datadict['filenames'])
                else:
                    datadict['filenames'] = [datadict['filenames']]

        # find all duplicates in input datadicts and build common geodataframe
        all_gdfs = []
        for datadict in dictlist_data2process:
            assert isinstance(datadict, dict)

            if isinstance(datadict['filenames'], str) and datadict['filenames'].endswith('.csv'):
                datadict['filenames'] = None  # TODO implement csv reader here
                raise NotImplementedError
            else:
                temp_gdf = GeoDataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
                if re.search('Landsat-7', datadict['satellite'], re.I) and re.search(r'ETM\+', datadict['sensor'], re.I):
                    from .helper_functions import Landsat_entityID_decrypter as LED

                    def get_L7_sensor(fN): return LED(fN.split('.tar.gz')[0]).sensorIncSLC
                    temp_gdf['sensor'] = list(temp_gdf['filenames'].map(get_L7_sensor))

                all_gdfs.append(temp_gdf)

        gdf = GeoDataFrame(pd.concat(all_gdfs)).drop_duplicates()
        gdf.columns = ['satellite', 'sensor', 'filename']

        # validate scene info against the database
        sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

        # populate attributes
        self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self

    def from_sceneIDlist(self, list_sceneIDs, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        Create a GMS_JOB instance based on the given list of scene IDs.

        :param list_sceneIDs:          <list> of scene IDs, e.g. [26781907, 26781917, 26542650, 26542451, 26541679]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        list_sceneIDs = list(list_sceneIDs)

        # query 'satellite', 'sensor', 'filename' from database and summarize in GeoDataFrame
        with psycopg2.connect(self.conn) as conn:
            with conn.cursor() as cursor:
                execute_pgSQL_query(cursor,
                                    """SELECT scenes.id, satellites.name, sensors.name, scenes.filename FROM scenes
                                    LEFT JOIN satellites on scenes.satelliteid=satellites.id
                                    LEFT JOIN sensors on scenes.sensorid=sensors.id
                                    WHERE scenes.id in (%s)""" % ','.join([str(i) for i in list_sceneIDs]))
                gdf = GeoDataFrame(cursor.fetchall(), columns=['sceneid', 'satellite', 'sensor', 'filename'])

        # FIXME overwriting 'ETM+_SLC_OFF' with 'ETM+' causes _get_validated_sceneInfoGDFs() to fail because the
        # FIXME sensorid for ETM+_SLC_OFF cannot be retrieved
        # gdf['sensor'] = gdf['sensor'].apply(lambda senN: senN if senN != 'ETM+_SLC_OFF' else 'ETM+')
        gdf = gdf.drop_duplicates()

        if gdf.empty:
            raise ValueError('None of the given scene IDs could be found in the GeoMultiSens database. '
                             'Job creation failed.')
        else:
            missing_IDs = [i for i in list_sceneIDs if i not in gdf['sceneid'].values]
            if missing_IDs:
                warnings.warn('The following scene IDs could not be found in the GeoMultiSens database: \n%s'
                              % '\n'.join([str(i) for i in missing_IDs]))

            # validate scene info against the database
            sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

            # populate attributes
            self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self
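
    # Usage sketch (hypothetical virtual sensor ID; the scene IDs repeat the docstring example):
    #     job = GMS_JOB(CFG.conn_database).from_sceneIDlist([26781907, 26781917], virtual_sensor_id=1,
    #                                                       comment='Beta job')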

    def from_entityIDlist(self, list_entityids, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of entity IDs.

        :param list_entityids:         <list> of provider entity IDs
        :param virtual_sensor_id:      <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'entityid': list_entityids})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given entity IDs.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_entityids) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.' % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def from_filenames(self, list_filenames, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of provider archive filenames.

        :param list_filenames:         <list> of provider archive filenames
        :param virtual_sensor_id:      <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'filename': list_filenames})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given filenames.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_filenames) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.' % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def _get_validated_sceneInfoGDFs(self, GDF_SatSenFname):
        # type: (GeoDataFrame) -> GeoDataFrame
        """

        :param GDF_SatSenFname:
        :return:
        """

        gdf = GDF_SatSenFname

        # loop through all satellite-sensor combinations and get scene information from database
        all_gdf_recs, all_gdf_miss = [], []
        all_satellites, all_sensors = zip(
            *[i.split('__') for i in (np.unique(gdf['satellite'] + '__' + gdf['sensor']))])

        for satellite, sensor in zip(all_satellites, all_sensors):
            cur_gdf = gdf.loc[(gdf['satellite'] == satellite) & (gdf['sensor'] == sensor)]
            filenames = list(cur_gdf['filename'])

            satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': satellite})
            senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': sensor})
            assert len(satID_res), "No satellite named '%s' found in database." % satellite
            assert len(senID_res), "No sensor named '%s' found in database." % sensor

            # append sceneid and wkb_hex bounds
            if 'sceneid' in gdf.columns:
                sceneIDs = list(cur_gdf['sceneid'])
                conddict = dict(id=sceneIDs, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])
            else:
                conddict = dict(filename=filenames, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])

            records = get_info_from_postgreSQLdb(
                self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], conddict)
            records = GeoDataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
            if 'sceneid' in gdf.columns:
                del records['sceneid']

            cur_gdf = cur_gdf.merge(records, on='filename', how="outer", copy=False)

            # separate records with valid matches in database from invalid matches (filename not found in database)
            gdf_recs = cur_gdf[
                cur_gdf.sceneid.notnull()].copy()  # creates a copy (needed to be able to apply maps later)
            gdf_miss = cur_gdf[cur_gdf.sceneid.isnull()]  # creates a view

            # convert scene ids from floats to integers
            gdf_recs['sceneid'] = list(gdf_recs.sceneid.map(lambda sceneid: int(sceneid)))

            # wkb_hex bounds to shapely polygons
            gdf_recs['polygons'] = list(gdf_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

            all_gdf_recs.append(gdf_recs)
            all_gdf_miss.append(gdf_miss)

        # merge all dataframes of all satellite-sensor combinations
        gdf_recs_compl = GeoDataFrame(pd.concat(all_gdf_recs))
        gdf_miss_compl = GeoDataFrame(pd.concat(all_gdf_miss))

        # populate attributes
        if not gdf_miss_compl.empty:
            warnings.warn('The following scene filenames could not be found in the GeoMultiSens database: \n%s'
                          % '\n'.join(list(gdf_miss_compl['filename'])))

        return gdf_recs_compl

    def _populate_jobAttrs_from_sceneInfoGDF(self, sceneInfoGDF):
        # type: (GeoDataFrame) -> None
        """

        :param sceneInfoGDF:
        :return:
        """

        if not sceneInfoGDF.empty:
            self.dataframe = sceneInfoGDF
            self.sceneids = list(self.dataframe['sceneid'])
            self.statistics = [len(self.sceneids)] + [0] * 8
            self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
            self.timerange_start = self.dataframe.acquisitiondate.min().to_pydatetime()
            self.timerange_end = self.dataframe.acquisitiondate.max().to_pydatetime()

    def from_job_ID(self, job_ID):
        # type: (int) -> GMS_JOB
        """
        Create a GMS_JOB instance by querying the database for a specific job ID.
        :param job_ID:  <int> a valid id from the database table 'jobs'
        """

        res = get_info_from_postgreSQLdb(self.conn, 'jobs', self.jobs_table_columns, {'id': job_ID})
        if not res:
            raise ValueError("No job with ID %s found in 'jobs' table of the database." % job_ID)

        self.exists_in_db = True
        for i, attrName in enumerate(self.jobs_table_columns):
            setattr(self, attrName, res[0][i])
        self.bounds = wkb_loads(self.bounds, hex=True)

        # fill self.dataframe
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['satelliteid', 'sensorid', 'filename',
                                                                   'id', 'acquisitiondate', 'bounds'],
                                             {'id': self.sceneids})
        gdf = GeoDataFrame(records,
                           columns=['satelliteid', 'sensorid', 'filename', 'sceneid', 'acquisitiondate', 'geom'])
        all_satIDs = gdf.satelliteid.unique().tolist()
        all_senIDs = gdf.sensorid.unique().tolist()
        satName_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['name'], {'id': all_satIDs})
        senName_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['name'], {'id': all_senIDs})
        all_satNames = [i[0] for i in satName_res]
        all_senNames = [i[0] for i in senName_res]
        id_satName_dict = dict(zip(all_satIDs, all_satNames))
        id_senName_dict = dict(zip(all_senIDs, all_senNames))
        gdf.insert(0, 'satellite', list(gdf.satelliteid.map(lambda satID: id_satName_dict[satID])))
        gdf.insert(1, 'sensor', list(gdf.sensorid.map(lambda senID: id_senName_dict[senID])))
        gdf['polygons'] = list(gdf.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))