import collections
import glob
import itertools
import os
import re
import shutil
import sys
import traceback
import warnings
from datetime import datetime
from typing import Union  # noqa F401  # flake8 issue

import numpy as np
import pandas as pd
from pandas.io.sql import pandasSQL_builder, SQLTable
import psycopg2
from shapely.wkb import loads as wkb_loads
from geoalchemy2.types import Geometry as GEOMETRY
from geopandas import GeoDataFrame, GeoSeries
from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine

from ..options.config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain

# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)

__author__ = 'Daniel Scheffler'


def execute_pgSQL_query(cursor, query_command):
    """Executes a postgreSQL query and catches the full error message if there is one.
    """

    try:
        cursor.execute(query_command)
    except psycopg2.ProgrammingError as e:
        raise psycopg2.ProgrammingError((e.pgerror or '') + 'Query failed. Command was:\n%s' % query_command)


def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
    # type: (int) -> collections.OrderedDict
    """Creates an OrderedDict containing further information about a given scene ID by querying the pgSQL database.

    :param sceneid:   <int> the GMS scene ID to get information for
    """

    def query(tablename, vals2return, cond_dict, records2fetch=0):
        return get_info_from_postgreSQLdb(CFG.conn_database, tablename, vals2return, cond_dict, records2fetch)
    resultset = query('scenes', ['datasetid', 'satelliteid', 'sensorid', 'subsystemid', 'acquisitiondate', 'entityid',
                                 'filename'], {'id': sceneid})
    if len(resultset) == 0:
        sys.stderr.write("Scene with id %s not found. Skipping.." % sceneid)
        return None  # without this early return, resultset[0] below would raise an IndexError

    scenedata = resultset[0]
    ds = collections.OrderedDict()
    proc_level_tmp = query('scenes_proc', 'proc_level', {'sceneid': sceneid})
    ds.update({'proc_level': 'L0A' if proc_level_tmp == [] else proc_level_tmp[0][0]})
    ds.update({'scene_ID': sceneid})
    ds.update({'datasetid': scenedata[0]})
    ds.update({'image_type': query('datasets', 'image_type', {'id': scenedata[0]})[0][0]})
    ds.update({'satellite': query('satellites', 'name', {'id': scenedata[1]})[0][0]})
    ds.update({'sensor': query('sensors', 'name', {'id': scenedata[2]})[0][0]})
    ds.update({'subsystem': query('subsystems', 'name', {'id': scenedata[3]})[0][0] if scenedata[3] else None})
    ds.update({'acq_datetime': scenedata[4]})
    ds.update({'entity_ID': scenedata[5]})
    ds.update({'filename': scenedata[6]})
    return ds
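
# Illustrative result of get_scene_and_dataset_infos_from_postgreSQLdb() (hypothetical values):
#     OrderedDict([('proc_level', 'L0A'), ('scene_ID', 26781907), ('datasetid', 104), ('image_type', 'RSD'),
#                  ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'), ('subsystem', None),
#                  ('acq_datetime', datetime(2015, 2, 5, 10, 2, 52)), ('entity_ID', 'LC81930242015036LGN00'),
#                  ('filename', 'LC81930242015036LGN00.tar.gz')])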


def get_postgreSQL_value(value):
    # type: (any) -> str
    """Converts a Python variable to a postgreSQL value, respecting postgreSQL type casts.
    The resulting value can be directly inserted into a postgreSQL query."""

    assert type(value) in [int, float, bool, str, Polygon, datetime, list, tuple] or value is None, \
        "Unsupported value type within postgreSQL matching expression. Got %s." % type(value)
    if isinstance(value, (int, float, bool)):
        pgV = value
    elif value is None:
        pgV = 'NULL'
    elif isinstance(value, str):
        pgV = "'%s'" % value.replace("'", "")
    elif isinstance(value, Polygon):
        pgV = "'%s'" % value.wkb_hex
    elif isinstance(value, datetime):
        pgV = "TIMESTAMP '%s'" % str(value)
    else:  # list or tuple in value
        if not value:  # empty list/tuple
            pgV = 'NULL'
        else:
            dTypes_in_value = list(set([type(i) for i in value]))
            assert len(dTypes_in_value) == 1, \
                'Mixed data types in postgreSQL matching expressions are not supported. Got %s.' % dTypes_in_value
            assert dTypes_in_value[0] in [int, str, float, np.int64, bool]
            pgList = ",".join(["'%s'" % i if isinstance(value[0], str) else "%s" % i for i in value])
            pgV = "'{%s}'" % pgList
    return pgV
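
# A few illustrative conversions (hypothetical inputs; uses the datetime import from above):
#     get_postgreSQL_value(42)                    -> 42
#     get_postgreSQL_value(None)                  -> 'NULL'
#     get_postgreSQL_value("O'Hare")              -> "'OHare'" (single quotes are stripped)
#     get_postgreSQL_value([1, 2, 3])             -> "'{1,2,3}'"
#     get_postgreSQL_value(datetime(2015, 2, 5))  -> "TIMESTAMP '2015-02-05 00:00:00'"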


def get_postgreSQL_matchingExp(key, value):
    # type: (str,any) -> str
    """Converts a key/value pair to a postgreSQL matching expression in the form "column=value" respecting postgreSQL
    type casts. The resulting string can be directly inserted into a postgreSQL query.
    """
    pgVal = get_postgreSQL_value(value)
    if isinstance(pgVal, str) and pgVal.startswith("'{") and pgVal.endswith("}'"):
        return '%s in %s' % (key, pgVal.replace("'{", '(').replace("}'", ')'))  # '{1,2,3}' => (1,2,3)
    elif pgVal == 'NULL':
        return '%s is NULL' % key
    else:
        return '%s=%s' % (key, pgVal)
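
# Illustrative matching expressions built from hypothetical key/value pairs:
#     get_postgreSQL_matchingExp('id', 123)         -> "id=123"
#     get_postgreSQL_matchingExp('id', [1, 2, 3])   -> "id in (1,2,3)"
#     get_postgreSQL_matchingExp('filename', None)  -> "filename is NULL"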


def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000):
    # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str]
    """Queries a postgreSQL database for the given parameters.

    :param conn_params:     <str> connection parameters as provided by CFG.conn_params
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                            HINT: <value> can also be a list or a tuple of elements to match, BUT note that the order
                                  of the list items is NOT respected!
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    :param timeout:         <int> allows to set a custom statement timeout (milliseconds)
    """

    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), "get_info_from_postgreSQLdb: Expected an integer for the argument " \
                                           "'records2fetch'. Got %s" % type(records2fetch)
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    cmd = "SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition
    execute_pgSQL_query(cursor, cmd)

    records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
        cursor.fetchmany(size=records2fetch)  # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
    cursor.close()
    connection.close()
    return records2return
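
# Illustrative query (hypothetical condition values):
#     get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'],
#                                {'satelliteid': 8, 'proc_level': 'METADATA'}, records2fetch=2)
#     -> [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]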


def update_records_in_postgreSQLdb(conn_params, tablename, vals2update_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters and updates the given columns of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2update_dict:  <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    update_cond = "SET " + ', '.join(['%s=%s' % (k, get_postgreSQL_value(vals2update_dict[k]))
                                      for k in vals2update_dict.keys()])
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + update_cond + " " + condition)

    if 'connection' in locals():
        connection.commit()
        connection.close()
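
# Illustrative update (hypothetical scene ID): set the processing level of one scene record
#     update_records_in_postgreSQLdb(CFG.conn_database, 'scenes',
#                                    {'proc_level': 'L1A'}, {'id': 26781907})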


def append_item_to_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2append_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and appends the given value to the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2append_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    assert len(vals2append_dict) == 1, 'Values can be appended to only one column at once.'
    if type(list(vals2append_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Appending multiple values to one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2append_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2append_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2append_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    append_cond = "SET %s = array_cat(%s, '%s')" % (col2update, col2update, pgSQL_val)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + append_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()
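
# Illustrative call (hypothetical IDs): append scene ID 26781907 to the 'sceneids'
# array column of the jobs record with ID 151
#     append_item_to_arrayCol_in_postgreSQLdb(CFG.conn_database, 'jobs',
#                                             {'sceneids': 26781907}, {'id': 151})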


def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remove_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and removes the given value from the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2remove_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    assert len(vals2remove_dict) == 1, 'Values can be removed from only one column at once.'
    if type(list(vals2remove_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Removing multiple values from one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2remove_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2remove_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2remove_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    remove_cond = "SET %s = array_remove(%s, '%s')" % (col2update, col2update, pgSQL_val)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + remove_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()
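
# Illustrative call (hypothetical IDs): the inverse of the append example above
#     remove_item_from_arrayCol_in_postgreSQLdb(CFG.conn_database, 'jobs',
#                                               {'sceneids': 26781907}, {'id': 151})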


def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None,
                                                 idx_val2increment=None, cond_dict=None, timeout=15000):
    # type: (str, str, str, int, int, dict, int) -> Union[None, str]
    """Updates an array column of a specific postgreSQL table by incrementing or decrementing the array element
    at a given position. HINT: The column must contain values like [0,1,0,3,1,0].

    :param conn_params:         <str> connection parameters as provided by CFG.conn_params
    :param tablename:           <str> name of the table within the database to be updated
    :param col2update:          <str> column name of the column to be updated
    :param idx_val2decrement:   <int> the index of the array element to be decremented (starts with 1)
    :param idx_val2increment:   <int> the index of the array element to be incremented (starts with 1)
    :param cond_dict:           <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                                HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:             <int> allows to set a custom statement timeout (milliseconds)
    :return:
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""

    dec_str = '' if idx_val2decrement is None else \
        "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement)
    inc_str = '' if idx_val2increment is None else \
        "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment)

    if dec_str or inc_str:
        dec_inc_str = ','.join([s for s in [dec_str, inc_str] if s])  # drop empty parts to keep the SQL valid
        execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, dec_inc_str, condition))

    if 'connection' in locals():
        connection.commit()
        connection.close()
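
# Illustrative effect (hypothetical values): with col2update='statistics', idx_val2decrement=1
# and idx_val2increment=2, the generated SQL is roughly
#     UPDATE jobs SET statistics[1] = statistics[1]-1,statistics[2] = statistics[2]+1 WHERE id=151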


def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000):
    # type: (str, str, dict, int) -> Union[int, str]
    """Creates a single new record in a postgreSQL database and populates its columns with the given values.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2write_dict:   <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in vals2write_dict.items()])

    execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s);" % (tablename, ','.join(keys), ','.join(vals)))
    execute_pgSQL_query(cursor, "SELECT id FROM %s ORDER BY id DESC LIMIT 1" % tablename)
    newID = cursor.fetchone()[0]

    if 'connection' in locals():
        connection.commit()
        connection.close()

    return newID
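
# Illustrative call (hypothetical column values): returns the ID of the newly created record
#     new_job_id = create_record_in_postgreSQLdb(CFG.conn_database, 'jobs',
#                                                {'creationtime': datetime.now(), 'comment': 'Beta job'})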


def delete_record_in_postgreSQLdb(conn_params, tablename, record_id, timeout=15000):
    # type: (str, str, int, int) -> str
    """Delete a single record in a postgreSQL database.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param record_id:         <int> ID of the record to be deleted
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    execute_pgSQL_query(cursor, "DELETE FROM %s WHERE id=%s;" % (tablename, record_id))
    execute_pgSQL_query(cursor, "SELECT id FROM %s WHERE id=%s" % (tablename, record_id))

    res = cursor.fetchone()

    if 'connection' in locals():
        connection.commit()
        connection.close()

    return 'success' if res is None else 'fail'
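
# Illustrative call (hypothetical record ID): returns 'success' if the record is gone afterwards
#     delete_record_in_postgreSQLdb(CFG.conn_database, 'scenes', 26781907)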


def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
                                    scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    if tgt_corners_lonlat:
        # handle coordinates crossing the 180 degrees meridian (dateline)
        # FIXME in that case the polygon has to be split at the dateline, otherwise pgSQL may yield wrong results
        if crossing_dateline_check:
            xvals = [x for x, y in tgt_corners_lonlat]
            if max(xvals) - min(xvals) > 180:
                tgt_corners_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corners_lonlat]

        from .helper_functions import cornerLonLat_to_postgreSQL_poly
        pGSQL_poly = cornerLonLat_to_postgreSQL_poly(tgt_corners_lonlat)
        src_geom = "'SRID=4326;%s'::geometry" % pGSQL_poly  # source geometry is given
        # FIXME the scenes table stores "geography" geoinfos -> its own index is not used when casting to "geometry":
        tgt_geom = "%s.%s::geometry" % (table2query, geomCol2use)
        geocond = "%s(%s, %s)" % (queryfunc, src_geom, tgt_geom)
    else:  # scene_ID is not None:
        connection = psycopg2.connect(conn_params)
        if connection is None:
            return 'database connection fault'
        cursor = connection.cursor()
        cmd = "SELECT ST_AsText(bounds) FROM scenes WHERE scenes.id = %s" % scene_ID
        execute_pgSQL_query(cursor, cmd)
        res = cursor.fetchone()
        cursor.close()
        connection.close()
        if res:
            src_geom = "'SRID=4326;%s'::geometry" % res
        else:
            print('The scene with the ID %s does not exist in the scenes table.' % scene_ID)
            return []
        geocond = "%s(%s, %s.%s::geometry)" % (queryfunc, src_geom, table2query, geomCol2use)
    return geocond
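
# Illustrative output (hypothetical corner coordinates): for tgt_corners_lonlat of a lon/lat
# rectangle, the returned condition string looks roughly like
#     ST_Intersects('SRID=4326;POLYGON((10 50,11 50,11 51,10 51,10 50))'::geometry, scenes.bounds::geometry)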


def get_overlapping_scenes_from_postgreSQLdb(conn_params, table='scenes_proc', scene_ID=None,
                                             tgt_corners_lonlat=None, conditions=None, add_cmds='', timeout=15000):
    # type: (str, str, int, list, Union[list, str], str, int) -> Union[list, str]
    """Queries the postgreSQL database in order to find those scenes of a specified reference satellite (Landsat-8 or
    Sentinel-2) that have an overlap to the given corner coordinates AND that fulfill the given conditions.

    :param conn_params:             <str> connection parameters as provided by CFG.conn_params
    :param table:                   <str> name of the table within the database to be queried
    :param scene_ID:                <int> a sceneID to get the target geographical extent from
                                        (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat:      <list> a list of coordinates defining the target geographical extent
                                           (needed if scene_ID is not provided)
    :param conditions:              <list> a list of additional query conditions
    :param add_cmds:                <str> additional pgSQL commands to be added to the pgSQL query
    :param timeout:                 <int> allows to set a custom statement timeout (milliseconds)
    """

    conditions = [] if conditions is None else conditions if isinstance(conditions, list) else [conditions]
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'
    datasetids = [int(d.split('=')[1].strip()) for d in conditions if d.startswith('datasetid')]
    datasetid = datasetids[0] if datasetids else 104  # Landsat-8
    # FIXME: use Landsat-8 instead of Sentinel-2 as long as S2 L1A_P is not working:
    datasetid = 104 if datasetid == 249 else datasetid

    if table != 'scenes_proc':
        assert datasetid, "A datasetid is needed if the table is not 'scenes_proc'."
    if scene_ID is None:
        assert tgt_corners_lonlat, "Provide either scene_ID or tgt_corners_lonlat!"
    if tgt_corners_lonlat is None:
        assert scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    val2get = "scenes.id" if table == 'scenes' else "%s.sceneid" % table
    # refcond  = ['scenes_proc.georef = True'] if not datasetids else ['scenes.datasetid = %s' %datasetid]
    refcond = ['scenes.datasetid = %s' % datasetid]

    geocond = [get_pgSQL_geospatial_query_cond(conn_params, table, tgt_corners_lonlat=tgt_corners_lonlat,
                                               scene_ID=scene_ID, queryfunc='ST_Intersects',
                                               crossing_dateline_check=True)]

    join = "INNER JOIN scenes ON (%s.sceneid = scenes.id) " % table if table != 'scenes' and datasetids else ''
    conditions = [c for c in conditions if not c.startswith('datasetid')]
    where = "WHERE %s" % " AND ".join(geocond + refcond + conditions)
    usedtbls = "scenes" if table == 'scenes' else "%s, scenes" % table if 'scenes.' in where and join == '' else table
    query = "SELECT %s FROM %s %s%s %s" % (val2get, usedtbls, join, where, add_cmds)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records2return = cursor.fetchall()
    cursor.close()
    connection.close()
    return records2return


def get_overlapping_MGRS_tiles(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """In contrast to pgSQL, 'overlapping' here means that both geometries share some spatial area,
    so this combines ST_Overlaps and ST_Contains."""
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['grid100k', 'grid1mil', 'geom']
    # FIXME this is covered by ST_Intersects:
    # geocond1 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Overlaps',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond2 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Contains',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond3 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Within',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Intersects',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID,
                                              crossing_dateline_check=True)
    # query = "SELECT %s FROM %s WHERE %s OR %s OR %s"
    #     % (', '.join(vals2get), 'mgrs_tiles', geocond1, geocond2, geocond3)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'mgrs_tiles', geocond)

    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['grid100k', 'grid1mil', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))
    GDF['granuleid'] = GDF['grid1mil'].str.cat(GDF['grid100k'])
    return GDF[['granuleid', 'shapelyPoly_LonLat']]
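
# Illustrative result (hypothetical granule): a GeoDataFrame like
#       granuleid                                shapelyPoly_LonLat
#     0     32UQD  POLYGON ((11.9 52.2, 13.5 52.2, 13.5 53.3, ...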


def get_overlapping_MGRS_tiles2(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['granuleid', 'footprint_wgs84']
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'footprints_sentinel2_granules',
                                              geomCol2use='footprint_wgs84',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'footprints_sentinel2_granules', geocond)

    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['granuleid', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_dict_satellite_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with satellite names as keys and satellite IDs as values as read from pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """

    res = get_info_from_postgreSQLdb(conn_params, 'satellites', ['name', 'id'])
    assert len(res) > 0, 'Error getting satellite names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))


def get_dict_sensor_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with sensor names as keys and sensor IDs as values as read from pgSQL database.
    :param conn_params:     <str> pgSQL database connection parameters """

    res = get_info_from_postgreSQLdb(conn_params, 'sensors', ['name', 'id'])
    assert len(res) > 0, 'Error getting sensor names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))


def get_entityIDs_from_filename(conn_DB, filename):
    # type: (str, str) -> list
    """Returns entityID(s) for the given filename. In case of Sentinel-2 there can be multiple entity IDs if
    multiple granules are saved in one .zip file.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param filename:    <str> the filename to get the corresponding entity ID(s) for
    """

    if filename[:2] in ['LE', 'LC', 'LO'] and filename.endswith('.tar.gz'):  # Landsat
        entityIDs = [filename.split('.tar.gz')[0]]
    else:
        print('Querying database in order to get entityIDs for %s...' % filename)
        res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['entityid'], {'filename': filename}, timeout=40000)
        entityIDs = [subres[0] for subres in res] if len(res) > 0 else []
    return entityIDs


def get_filename_by_entityID(conn_DB, entityid, satellite):
    # type: (str,str,str) -> str
    """Returns the filename for the given entity ID.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityid:    <str> entity ID
    :param satellite:   <str> satellite name to which the entity ID is belonging
    """

    if re.search('Landsat', satellite, re.I):
        filename = '%s.tar.gz' % entityid
    elif re.search('Sentinel-2', satellite, re.I):
        filename = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['filename'],
                                              {'entityid': entityid}, records2fetch=1)[0][0]
    else:
        raise NotImplementedError
    return filename


def get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder):
    # type: (str,list,str,str,str) -> np.ndarray
    """Takes a list of entity IDs and extracts those records that have the corresponding archive file in the given
    source folder and that have the processing level 'METADATA' in the pgSQL database. Based on this subset a numpy
    array containing the corresponding scene IDs and the target filenames for the fileserver is returned.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityIDs:   <list> a list of entity IDs
    :param satellite:   <str> the name of the satellite to restrict the query on
    :param sensor:      <str> the name of the sensor to restrict the query on
    :param src_folder:  <str> the source directory where archive files are saved
    """

    columns = ['id', 'entityid', 'satelliteid', 'sensorid', 'filename', 'proc_level']
    result = get_info_from_postgreSQLdb(conn_DB, 'scenes', columns, {'entityid': entityIDs})
    df = pd.DataFrame(result, columns=columns)

    satNameID_dic = get_dict_satellite_name_id(conn_DB)
    satID = satNameID_dic[satellite]
    target_folder = os.path.join(CFG.path_archive, satellite, sensor)

    def get_fName(entityid): return get_filename_by_entityID(conn_DB, entityid, satellite)

    def tgt_exists(fileName): return os.path.exists(os.path.join(target_folder, fileName))

    def src_exists(entityid):
        return os.path.exists(os.path.join(src_folder, get_filename_by_entityID(conn_DB, entityid, satellite)))
    df['tgt_fileName'] = list(df['entityid'].map(get_fName))
    df['tgtFile_exists'] = list(df['tgt_fileName'].map(tgt_exists))
    df['srcFile_exists'] = list(df['entityid'].map(src_exists))
    tgt_satID = (df.satelliteid == float(satID))
    # isDL = (df.proc_level == 'DOWNLOADED')
    isMET = (df.proc_level == 'METADATA')
    # tgtE = (df.tgtFile_exists == True)
    srcE = df.srcFile_exists  # (df.srcFile_exists == True)
    # sceneIDs_notDL_tgtE = df[tgt_satID & (isDL == 0) & tgtE]  # maybe needed later
    # sceneIDs_DL_tgtNE = df[tgt_satID & isDL & (tgtE == 0)]  # maybe needed later
    # sceneIDs_DL_tgtE = df[tgt_satID & isDL & tgtE]  # maybe needed later
    sceneIDs_isMET_srcE = df[tgt_satID & isMET & srcE]
    return sceneIDs_isMET_srcE[['id', 'tgt_fileName']].values


class GMS_JOB(object):
    """gms_preprocessing job manager"""

    def __init__(self, conn_db):
        # type: (str) -> None
        """
        :param conn_db: <str> the database connection parameters as given by CFG.conn_params
        """
        # privates
        self._virtualsensorid = None

        # defaults
        self.conn = conn_db
        self.dataframe = GeoDataFrame()
        self.scene_counts = {}  # set by self.create()

        self.exists_in_db = False
        self.id = None  #: int
        self.creationtime = datetime.now()  # default, needed to create new job
        self.finishtime = None
        self.sceneids = []
        self.timerange_start = datetime.min
        self.timerange_end = datetime.max
        self.bounds = box(-180, -90, 180, 90)  # default, needed to create new job
        self.distribution_index = None
        self.progress = None
        self.feedback = None
        self.failed_sceneids = []
        self.ref_job_id = None
        self.datacube_mgrs_tiles_proc = []
        self.non_ref_datasetids = []
        self.max_cloudcover = None
        self.season_code = None  # type: int
        self.path_analysis_script = ''  # TODO
        self.job_mode = 'processing_only'  # FIXME download/processing/...
        self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start',
                                   'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback',
                                   'failed_sceneids', 'datasetid_spatial_ref',
                                   'virtualsensorid', 'ref_job_id', 'datacube_mgrs_tiles_proc', 'comment',
                                   'non_ref_datasetids', 'max_cloudcover', 'season_code', 'status',
                                   'path_analysis_script', 'analysis_parameter', 'statistics', 'job_mode']
        self.datasetid_spatial_ref = 249  # overwritten when reading an existing job from DB; needed to create a new job
        self.datasetname_spatial_ref = 'SENTINEL-2A'  # same here
        self.status = None
        self.statistics = []
        self.comment = None
        self.epsg = None  # set by self._set_target_sensor_specs()
        self.ground_spatial_sampling = None  # set by self._set_target_sensor_specs()
        self.analysis_parameter = None

    def __repr__(self):
        return 'GMS job:\n\n' + GeoSeries(self.db_entry).to_string()

    @property
    def virtualsensorid(self):
        return self._virtualsensorid

    @virtualsensorid.setter
    def virtualsensorid(self, value):
        """Set virtual sensor ID but continue if no data value is received
        NOTE:  set by self._set_target_sensor_specs() and self.from_ID()"""
        if value != -1:  # no data value
            self._virtualsensorid = value

    def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref):
        self.virtualsensorid = virtual_sensor_id

        if not isinstance(datasetid_spatial_ref, int):
            raise ValueError(datasetid_spatial_ref)

        res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution',
                                                                        "projection_epsg"], {'id': virtual_sensor_id})
        assert res, \
            "'virtual_sensor_id'=%s does not exist in the table 'virtual_sensors' of the database." % virtual_sensor_id
        target_gsd = res[0][0]
        self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd
        self.epsg = int(res[0][1])

        self.datasetid_spatial_ref = datasetid_spatial_ref
        res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref})
        assert res, \
            "'datasetid_spatial_ref'=%s does not exist in the table 'datasets' of the database." % datasetid_spatial_ref
        self.datasetname_spatial_ref = res[0][0]  # res is a list of tuples, e.g. [('SENTINEL-2A',)]

    @property
    def db_entry(self):
        """Returns an OrderedDict containing keys and values of the database entry.
        """

        db_entry = collections.OrderedDict()
        for i in self.jobs_table_columns:
            val = getattr(self, i)

            if i == 'virtualsensorid' and val is None:
                val = -1  # nodata value

            db_entry[i] = val

        return db_entry

    def from_dictlist(self, dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> GMS_JOB
        """
        :param dictlist_data2process:  <list> a list of dictionaries containing the keys "satellite", "sensor" and
                                        "filenames",
                                        e.g. [{'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS',
                                               'filenames': 'file.tar.gz'}, {...}]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        dictlist_data2process = dictlist_data2process if dictlist_data2process else []

        for idx, datadict in enumerate(dictlist_data2process):
            assert isinstance(datadict, dict), "Expected only dictionaries within 'dictlist_data2process'. " \
                                               "Got %s in there." % type(datadict)
            assert False not in [i in datadict for i in ['satellite', 'sensor', 'filenames']]
            assert type(datadict['filenames']) in [list, str]

            if isinstance(datadict['filenames'], str):
                if datadict['filenames'].endswith('.csv'):
                    assert os.path.exists(datadict['filenames'])
                else:
                    datadict['filenames'] = [datadict['filenames']]

        # find all duplicates in input datadicts and build common geodataframe
        all_gdfs = []
        for datadict in dictlist_data2process:
            assert isinstance(datadict, dict)

            if isinstance(datadict['filenames'], str) and datadict['filenames'].endswith('.csv'):
                datadict['filenames'] = None  # TODO implement csv reader here
                raise NotImplementedError
            else:
                temp_gdf = GeoDataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
                if re.search('Landsat-7', datadict['satellite'], re.I) and \
                        re.search(r'ETM\+', datadict['sensor'], re.I):
                    from .helper_functions import Landsat_entityID_decrypter as LED

                    def get_L7_sensor(fN): return LED(fN.split('.tar.gz')[0]).sensorIncSLC
                    temp_gdf['sensor'] = list(temp_gdf['filenames'].map(get_L7_sensor))

                all_gdfs.append(temp_gdf)

        gdf = GeoDataFrame(pd.concat(all_gdfs)).drop_duplicates()
        gdf.columns = ['satellite', 'sensor', 'filename']

        # run self.from_dictlist
        sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

        # populate attributes
        self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self
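
    # Illustrative usage (hypothetical filename and virtual sensor ID):
    #     job = GMS_JOB(CFG.conn_database).from_dictlist(
    #         [{'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS',
    #           'filenames': 'LC81930242015036LGN00.tar.gz'}],
    #         virtual_sensor_id=10, datasetid_spatial_ref=249, comment='Beta job')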

    def from_sceneIDlist(self, list_sceneIDs, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        Create a GMS_JOB instance based on the given list of scene IDs.

        :param list_sceneIDs:          <list> of scene IDs, e.g. [26781907, 26781917, 26542650, 26542451, 26541679]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        list_sceneIDs = list(list_sceneIDs)

        # query 'satellite', 'sensor', 'filename' from database and summarize in GeoDataFrame
        with psycopg2.connect(self.conn) as conn:
            with conn.cursor() as cursor:
                execute_pgSQL_query(cursor,
                                    """SELECT scenes.id, satellites.name, sensors.name, scenes.filename FROM scenes
                                    LEFT JOIN satellites on scenes.satelliteid=satellites.id
                                    LEFT JOIN sensors on scenes.sensorid=sensors.id
                                    WHERE scenes.id in (%s)""" % ','.join([str(i) for i in list_sceneIDs]))
                gdf = GeoDataFrame(cursor.fetchall(), columns=['sceneid', 'satellite', 'sensor', 'filename'])

        # FIXME overwriting 'ETM+_SLC_OFF' with 'ETM+' causes _get_validated_sceneInfoGDFs() to fail because the
        # FIXME sensorid for ETM+_SLC_OFF cannot be retrieved
        # gdf['sensor'] = gdf['sensor'].apply(lambda senN: senN if senN != 'ETM+_SLC_OFF' else 'ETM+')
        gdf = gdf.drop_duplicates()

        if gdf.empty:
            raise ValueError('None of the given scene IDs could be found in the GeoMultiSens database. '
                             'Job creation failed.')
        else:
            missing_IDs = [i for i in list_sceneIDs if i not in gdf['sceneid'].values]
            if missing_IDs:
                warnings.warn('The following scene IDs could not be found in the GeoMultiSens database: \n%s'
                              % '\n'.join([str(i) for i in missing_IDs]))

            # run self.from_dictlist
            sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

            # populate attributes
            self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self

    def from_entityIDlist(self, list_entityids, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of entity IDs.

        :param list_entityids:
        :param virtual_sensor_id:
        :param datasetid_spatial_ref:
        :param comment:
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'entityid': list_entityids})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given entity IDs.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_entityids) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.'
                          % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def from_filenames(self, list_filenames, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of provider archive filenames.

        :param list_filenames:
        :param virtual_sensor_id:
        :param datasetid_spatial_ref:
        :param comment:
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'filename': list_filenames})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given filenames.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_filenames) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.'
                          % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def _get_validated_sceneInfoGDFs(self, GDF_SatSenFname):
        # type: (GeoDataFrame) -> GeoDataFrame
        """
        :param GDF_SatSenFname:
        :return:
        """

        gdf = GDF_SatSenFname

        # loop through all satellite-sensor combinations and get scene information from database
        all_gdf_recs, all_gdf_miss = [], []
        all_satellites, all_sensors = zip(
            *[i.split('__') for i in (np.unique(gdf['satellite'] + '__' + gdf['sensor']))])

        for satellite, sensor in zip(all_satellites, all_sensors):
            cur_gdf = gdf.loc[(gdf['satellite'] == satellite) & (gdf['sensor'] == sensor)]
            filenames = list(cur_gdf['filename'])

            satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': satellite})
            senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': sensor})
            assert len(satID_res), "No satellite named '%s' found in database." % satellite
            assert len(senID_res), "No sensor named '%s' found in database." % sensor

            # append sceneid and wkb_hex bounds
            if 'sceneid' in gdf.columns:
                sceneIDs = list(cur_gdf['sceneid'])
                conddict = dict(id=sceneIDs, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])
            else:
                conddict = dict(filename=filenames, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])

            records = get_info_from_postgreSQLdb(
                self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], conddict)
            records = GeoDataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
            if 'sceneid' in gdf.columns:
                del records['sceneid']

            cur_gdf = cur_gdf.merge(records, on='filename', how="outer", copy=False)

            # separate records with valid matches in database from invalid matches (filename not found in database)
            gdf_recs = cur_gdf[
                cur_gdf.sceneid.notnull()].copy()  # creates a copy (needed to be able to apply maps later)
            gdf_miss = cur_gdf[cur_gdf.sceneid.isnull()]  # creates a view

            # convert scene ids from floats to integers
            gdf_recs['sceneid'] = list(gdf_recs.sceneid.map(lambda sceneid: int(sceneid)))

            # wkb_hex bounds to shapely polygons
            gdf_recs['polygons'] = list(gdf_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

            all_gdf_recs.append(gdf_recs)
            all_gdf_miss.append(gdf_miss)

        # merge all dataframes of all satellite-sensor combinations
        gdf_recs_compl = GeoDataFrame(pd.concat(all_gdf_recs))
        gdf_miss_compl = GeoDataFrame(pd.concat(all_gdf_miss))

        # populate attributes
        if not gdf_miss_compl.empty:
            warnings.warn('The following scene filenames could not be found in the GeoMultiSens database: \n%s'
                          % '\n'.join(list(gdf_miss_compl['filename'])))

        return gdf_recs_compl

    def _populate_jobAttrs_from_sceneInfoGDF(self, sceneInfoGDF):
        # type: (GeoDataFrame) -> None
        """
        :param sceneInfoGDF:
        :return:
        """

        if not sceneInfoGDF.empty:
            self.dataframe = sceneInfoGDF
            self.sceneids = list(self.dataframe['sceneid'])
            self.statistics = [len(self.sceneids)] + [0] * 8
            self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
            self.timerange_start = self.dataframe.acquisitiondate.min().to_pydatetime()
            self.timerange_end = self.dataframe.acquisitiondate.max().to_pydatetime()

    def from_job_ID(self, job_ID):
        # type: (int) -> GMS_JOB
        """
        Create a GMS_JOB instance by querying the database for a specific job ID.

        :param job_ID:  <int> a valid id from the database table 'jobs'
        """

        res = get_info_from_postgreSQLdb(self.conn, 'jobs', self.jobs_table_columns, {'id': job_ID})
        if not res:
            raise ValueError("No job with ID %s found in 'jobs' table of the database." % job_ID)

        self.exists_in_db = True
        for i, attrName in enumerate(self.jobs_table_columns):
            setattr(self, attrName, res[0][i])
        self.bounds = wkb_loads(self.bounds, hex=True)

        # fill self.dataframe
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['satelliteid', 'sensorid', 'filename',
                                                                   'id', 'acquisitiondate', 'bounds'],
                                             {'id': self.sceneids})
        gdf = GeoDataFrame(records,
                           columns=['satelliteid', 'sensorid', 'filename', 'sceneid', 'acquisitiondate', 'geom'])
        all_satIDs = gdf.satelliteid.unique().tolist()
        all_senIDs = gdf.sensorid.unique().tolist()
        satName_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['name'], {'id': all_satIDs})
        senName_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['name'], {'id': all_senIDs})
        all_satNames = [i[0] for i in satName_res]
        all_senNames = [i[0] for i in senName_res]
        id_satName_dict = dict(zip(all_satIDs, all_satNames))
        id_senName_dict = dict(zip(all_senIDs, all_senNames))
        gdf.insert(0, 'satellite', list(gdf.satelliteid.map(lambda satID: id_satName_dict[satID])))
        gdf.insert(1, 'sensor', list(gdf.sensorid.map(lambda senID: id_senName_dict[senID])))
        gdf['polygons'] = list(gdf.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))
