import collections
import csv
import glob
import itertools
import os
import re
import shutil
import sqlite3
import sys
import traceback
import warnings
from datetime import datetime
from typing import Union  # noqa F401  # flake8 issue

import numpy as np
import pandas as pd
import psycopg2
import shapely
import shapely.wkb  # shapely.wkb.loads() is used below; the submodule needs an explicit import
from geoalchemy2.types import Geometry as GEOMETRY
from geopandas import GeoDataFrame, GeoSeries
from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine

from ..config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain

# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
# + misc.path_generator.get_path_metaCSV: (left out here in order to avoid circular dependencies)

__author__ = 'Daniel Scheffler'


def execute_pgSQL_query(cursor, query_command):
    """Executes a postgreSQL query and catches the full error message if there is one.
    """

    try:
        cursor.execute(query_command)
    except psycopg2.ProgrammingError as e:
        raise psycopg2.ProgrammingError(e.pgerror + 'Query failed. Command was:\n%s' % query_command)


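# Illustrative usage of execute_pgSQL_query() (a sketch; the query shown is hypothetical):
#
#   connection = psycopg2.connect(CFG.job.conn_database)
#   cursor = connection.cursor()
#   execute_pgSQL_query(cursor, "SELECT id FROM scenes LIMIT 5")
#   print(cursor.fetchall())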
def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
    # type: (int) -> collections.OrderedDict
    """Creates an OrderedDict containing further information about a given scene ID by querying the pgSQL database.

    :param sceneid:   <int> the GMS scene ID to get information for
    """

    def query(tablename, vals2return, cond_dict, records2fetch=0):
        return get_info_from_postgreSQLdb(CFG.job.conn_database, tablename, vals2return, cond_dict, records2fetch)
    resultset = query('scenes', ['datasetid', 'satelliteid', 'sensorid', 'subsystemid', 'acquisitiondate', 'entityid',
                                 'filename'], {'id': sceneid})
    if len(resultset) == 0:
        sys.stderr.write("Scene with id %s not found. Skipping.." % sceneid)
        return  # without this, resultset[0] below would raise an IndexError for missing scenes

    scenedata = resultset[0]
    ds = collections.OrderedDict()
    proc_level_tmp = query('scenes_proc', 'proc_level', {'sceneid': sceneid})
    ds.update({'proc_level': 'L0A' if proc_level_tmp == [] else proc_level_tmp[0][0]})
    ds.update({'scene_ID': sceneid})
    ds.update({'datasetid': scenedata[0]})
    ds.update({'image_type': query('datasets', 'image_type', {'id': scenedata[0]})[0][0]})
    ds.update({'satellite': query('satellites', 'name', {'id': scenedata[1]})[0][0]})
    ds.update({'sensor': query('sensors', 'name', {'id': scenedata[2]})[0][0]})
    ds.update({'subsystem': query('subsystems', 'name', {'id': scenedata[3]})[0][0] if scenedata[3] else None})
    ds.update({'acq_datetime': scenedata[4]})
    ds.update({'entity_ID': scenedata[5]})
    ds.update({'filename': scenedata[6]})
    return ds


def get_info_from_SQLdb(path_db, tablename, vals2return, cond_dict, records2fetch=0):
    # type: (str,str,list,dict,int) -> Union[list, str]
    """Queries an SQL database for the given parameters.

    :param path_db:         <str> the physical path of the SQL database on disk
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    """

    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), \
        "get_info_from_SQLdb: Expected an integer for the argument 'records2fetch'. Got %s" % type(records2fetch)
    if not os.path.isfile(path_db):
        return 'database connection fault'
    connection = sqlite3.connect(path_db)
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join(["%s=?" % (list(cond_dict.keys())[i]) for i in range(len(cond_dict))])
    cursor.execute("SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition, list(cond_dict.values()))
    records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
        cursor.fetchmany(size=records2fetch)  # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
    cursor.close()
    connection.close()
    return records2return

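# Illustrative usage of get_info_from_SQLdb() (a sketch; the database path and condition values are hypothetical):
#
#   recs = get_info_from_SQLdb('/path/to/metadata.db', 'scenes', ['entityid', 'filename'],
#                              {'satelliteid': 8}, records2fetch=2)
#   # -> e.g. [('LE71950282003121EDC00', 'LE71950282003121EDC00.tar.gz'), ...]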

def get_postgreSQL_value(value):
    # type: (any) -> str
    """Converts Python variable to a postgreSQL value respecting postgreSQL type casts.
    The resulting value can be directly inserted into a postgreSQL query."""

    assert type(value) in [int, float, bool, str, Polygon, datetime, list, tuple] or value is None, \
        "Unsupported value type within postgreSQL matching expression. Got %s." % type(value)
    if isinstance(value, (int, float, bool)):
        pgV = value
    elif value is None:
        pgV = 'NULL'
    elif isinstance(value, str):
        pgV = "'%s'" % value.replace("'", "")
    elif isinstance(value, Polygon):
        pgV = "'%s'" % value.wkb_hex
    elif isinstance(value, datetime):
        pgV = "TIMESTAMP '%s'" % str(value)
    else:  # list or tuple in value
        if not value:  # empty list/tuple
            pgV = 'NULL'
        else:
            dTypes_in_value = list(set([type(i) for i in value]))
            assert len(dTypes_in_value) == 1, \
                'Mixed data types in postgreSQL matching expressions are not supported. Got %s.' % dTypes_in_value
            assert dTypes_in_value[0] in [int, str, float, np.int64]
            pgList = ",".join(["'%s'" % i if isinstance(value[0], str) else "%s" % i for i in value])
            pgV = "'{%s}'" % pgList
    return pgV

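# Illustrative conversions performed by get_postgreSQL_value() (a sketch, not executed at import):
#
#   get_postgreSQL_value(14)                    -> 14
#   get_postgreSQL_value("O'Brien")             -> "'OBrien'"   (single quotes are stripped)
#   get_postgreSQL_value(None)                  -> 'NULL'
#   get_postgreSQL_value([1, 2, 3])             -> "'{1,2,3}'"  (a postgreSQL array literal)
#   get_postgreSQL_value(datetime(2017, 1, 1))  -> "TIMESTAMP '2017-01-01 00:00:00'"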

def get_postgreSQL_matchingExp(key, value):
    # type: (str,any) -> str
    """Converts a key/value pair to a postgreSQL matching expression in the form "column=value" respecting postgreSQL
    type casts. The resulting string can be directly inserted into a postgreSQL query.
    """
    pgVal = get_postgreSQL_value(value)
    if isinstance(pgVal, str) and pgVal.startswith("'{") and pgVal.endswith("}'"):
        return '%s in %s' % (key, pgVal.replace("'{", '(').replace("}'", ')'))  # '{1,2,3}' => (1,2,3)
    elif pgVal == 'NULL':
        return '%s is NULL' % key
    else:
        return '%s=%s' % (key, pgVal)

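# Illustrative output of get_postgreSQL_matchingExp() (a sketch):
#
#   get_postgreSQL_matchingExp('sensorid', 8)                -> 'sensorid=8'
#   get_postgreSQL_matchingExp('proc_level', None)           -> 'proc_level is NULL'
#   get_postgreSQL_matchingExp('id', [26781907, 26781917])   -> 'id in (26781907,26781917)'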

def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000):
    # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str]
    """Queries a postgreSQL database for the given parameters.

    :param conn_params:     <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                            HINT: <value> can also be a list or a tuple of elements to match
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    :param timeout:         <int> allows setting a custom statement timeout (milliseconds)
    """

    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), \
        "get_info_from_postgreSQLdb: Expected an integer for the argument 'records2fetch'. Got %s" % type(
            records2fetch)
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    cmd = "SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition
    execute_pgSQL_query(cursor, cmd)

    records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
        cursor.fetchmany(size=records2fetch)  # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
    cursor.close()
    connection.close()
    return records2return

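# Illustrative usage of get_info_from_postgreSQLdb() (a sketch; the condition values are hypothetical):
#
#   recs = get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['id', 'entityid'],
#                                     {'satelliteid': 8, 'proc_level': None}, records2fetch=10)
#   # builds: SELECT id,entityid FROM scenes WHERE satelliteid=8 AND proc_level is NULL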

def update_records_in_postgreSQLdb(conn_params, tablename, vals2update_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters and updates the given columns of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2update_dict:  <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows setting a custom statement timeout (milliseconds)
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    update_cond = "SET " + ', '.join(['%s=%s' % (k, get_postgreSQL_value(vals2update_dict[k]))
                                      for k in vals2update_dict.keys()])
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + update_cond + " " + condition)

    if 'connection' in locals():
        connection.commit()
        connection.close()

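# Illustrative usage of update_records_in_postgreSQLdb() (a sketch; values are hypothetical):
#
#   update_records_in_postgreSQLdb(CFG.job.conn_database, 'scenes',
#                                  {'proc_level': 'L1A'}, {'id': 26781907})
#   # builds: UPDATE scenes SET proc_level='L1A' WHERE id=26781907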

def append_item_to_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2append_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and appends the given value to the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2append_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows setting a custom statement timeout (milliseconds)
    """

    assert len(vals2append_dict) == 1, 'Values can be appended to only one column at once.'
    if type(list(vals2append_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Appending multiple values to one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2append_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2append_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2append_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    append_cond = "SET %s = array_cat(%s, '%s')" % (col2update, col2update, pgSQL_val)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + append_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()

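# Illustrative usage of append_item_to_arrayCol_in_postgreSQLdb() (a sketch; values are hypothetical):
#
#   append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
#                                           {'failed_sceneids': 26781907}, {'id': 123})
#   # builds: UPDATE jobs SET failed_sceneids = array_cat(failed_sceneids, '{26781907}') WHERE id=123;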

def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remove_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and removes the given value from the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2remove_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows setting a custom statement timeout (milliseconds)
    """

    assert len(vals2remove_dict) == 1, 'Values can be removed from only one column at once.'
    if type(list(vals2remove_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Removing multiple values from one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2remove_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2remove_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2remove_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    remove_cond = "SET %s = array_remove(%s, '%s')" % (col2update, col2update, pgSQL_val)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + remove_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()


def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None,
                                                 idx_val2increment=None, cond_dict=None, timeout=15000):
    # type: (str, str, str, int, int, dict, int) -> Union[None, str]
    """Updates an array column of a specific postgreSQL table by incrementing or decrementing the elements
    at a given position. HINT: The column must contain array values like [0,1,0,3,1,0].

    :param conn_params:         <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:           <str> name of the table within the database to be updated
    :param col2update:          <str> column name of the column to be updated
    :param idx_val2decrement:   <int> the index of the array element to be decremented (starts with 1)
    :param idx_val2increment:   <int> the index of the array element to be incremented (starts with 1)
    :param cond_dict:           <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                                HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:             <int> allows setting a custom statement timeout (milliseconds)
    :return:
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""

    dec_str = '' if idx_val2decrement is None else \
        "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement)
    inc_str = '' if idx_val2increment is None else \
        "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment)

    if dec_str or inc_str:
        dec_inc_str = ','.join([s for s in (dec_str, inc_str) if s])  # avoid a dangling comma if only one is set
        execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, dec_inc_str, condition))

    if 'connection' in locals():
        connection.commit()
        connection.close()

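# Illustrative usage of increment_decrement_arrayCol_in_postgreSQLdb() (a sketch; values are hypothetical):
#
#   increment_decrement_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs', 'statistics',
#                                                idx_val2decrement=2, idx_val2increment=3, cond_dict={'id': 123})
#   # builds: UPDATE jobs SET statistics[2] = statistics[2]-1,statistics[3] = statistics[3]+1 WHERE id=123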

def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000):
    # type: (str, str, dict, int) -> Union[int, str]
    """Creates a single new record in a postgreSQL database and populates its columns with the given values.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2write_dict:   <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param timeout:           <int> allows setting a custom statement timeout (milliseconds)
    """

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in vals2write_dict.items()])

    execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s);" % (tablename, ','.join(keys), ','.join(vals)))
    execute_pgSQL_query(cursor, "SELECT id FROM %s ORDER BY id DESC LIMIT 1" % tablename)
    newID = cursor.fetchone()[0]

    if 'connection' in locals():
        connection.commit()
        connection.close()

    return newID

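# Illustrative usage of create_record_in_postgreSQLdb() (a sketch; the column values are hypothetical):
#
#   new_id = create_record_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
#                                          {'comment': 'test job', 'status': 'pending'})
#   # note: the new ID is read back via "ORDER BY id DESC LIMIT 1", which assumes that no
#   # concurrent insert happens in between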

def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
                                    scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    if tgt_corners_lonlat:
        # handle coordinates crossing the 180 degrees meridian (dateline)
        # FIXME in that case the polygon has to be split at the dateline. otherwise pgSQL may yield wrong results
        if crossing_dateline_check:
            xvals = [x for x, y in tgt_corners_lonlat]
            if max(xvals) - min(xvals) > 180:
                tgt_corners_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corners_lonlat]

        from .helper_functions import cornerLonLat_to_postgreSQL_poly
        pGSQL_poly = cornerLonLat_to_postgreSQL_poly(tgt_corners_lonlat)
        src_geom = "'SRID=4326;%s'::geometry" % pGSQL_poly  # source geometry is given
        # FIXME the scenes table stores "geography" geoinfos -> its own index is not used when casting to "geometry":
        tgt_geom = "%s.%s::geometry" % (table2query, geomCol2use)
        geocond = "%s(%s, %s)" % (queryfunc, src_geom, tgt_geom)
    else:  # scene_ID is not None:
        connection = psycopg2.connect(conn_params)
        if connection is None:
            return 'database connection fault'
        cursor = connection.cursor()
        cmd = "SELECT ST_AsText(bounds) FROM scenes WHERE scenes.id = %s" % scene_ID
        execute_pgSQL_query(cursor, cmd)
        res = cursor.fetchone()
        cursor.close()
        connection.close()
        if len(res):
            src_geom = "'SRID=4326;%s'::geometry" % res
        else:
            print('The scene with the ID %s does not exist in the scenes table.' % scene_ID)
            return []
        geocond = "%s(%s, %s.%s::geometry)" % (queryfunc, src_geom, table2query, geomCol2use)
    return geocond

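# Illustrative dateline handling in get_pgSQL_geospatial_query_cond() (a sketch):
#
#   corners = [(179.5, 50.0), (-179.5, 50.0), (-179.5, 49.0), (179.5, 49.0)]  # spans the dateline
#   # max(x) - min(x) = 359 > 180, so negative longitudes are shifted by +360:
#   # -> [(179.5, 50.0), (180.5, 50.0), (180.5, 49.0), (179.5, 49.0)]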

def get_overlapping_scenes_from_postgreSQLdb(conn_params, table='scenes_proc', scene_ID=None,
                                             tgt_corners_lonlat=None, conditions=None, add_cmds='', timeout=15000):
    # type: (str, str, int, list, Union[list, str], str, int) -> Union[list, str]
    """Queries the postgreSQL database in order to find those scenes of a specified reference satellite (Landsat-8 or
    Sentinel-2) that have an overlap to the given corner coordinates AND that fulfill the given conditions.

    :param conn_params:             <str> connection parameters as provided by CFG.job.conn_params
    :param table:                   <str> name of the table within the database to be updated
    :param scene_ID:                <int> a sceneID to get the target geographical extent from
                                        (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat:      <list> a list of coordinates defining the target geographical extent
                                           (needed if scene_ID is not provided)
    :param conditions:              <list> a list of additional query conditions
    :param add_cmds:                <str> additional pgSQL commands to be added to the pgSQL query
    :param timeout:                 <int> allows setting a custom statement timeout (milliseconds)
    """

    conditions = [] if conditions is None else conditions if isinstance(conditions, list) else [conditions]
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'
    datasetids = [int(d.split('=')[1].strip()) for d in conditions if d.startswith('datasetid')]
    datasetid = datasetids[0] if datasetids else 104  # Landsat-8
    # FIXME: use Landsat-8 instead of Sentinel-2 as long as S2 L1A_P is not working:
    datasetid = 104 if datasetid == 249 else datasetid

    if table != 'scenes_proc':
        assert datasetid, "datasetid is needed if table is not 'scenes_proc'"
    if scene_ID is None:
        assert tgt_corners_lonlat, "Provide either scene_ID or tgt_corners_lonlat!"
    if tgt_corners_lonlat is None:
        assert scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    val2get = "scenes.id" if table == 'scenes' else "%s.sceneid" % table
    # refcond  = ['scenes_proc.georef = True'] if not datasetids else ['scenes.datasetid = %s' %datasetid]
    refcond = ['scenes.datasetid = %s' % datasetid]

    geocond = [get_pgSQL_geospatial_query_cond(conn_params, table, tgt_corners_lonlat=tgt_corners_lonlat,
                                               scene_ID=scene_ID, queryfunc='ST_Intersects',
                                               crossing_dateline_check=True)]

    join = "INNER JOIN scenes ON (%s.sceneid = scenes.id) " % table if table != 'scenes' and datasetids else ''
    conditions = [c for c in conditions if not c.startswith('datasetid')]
    where = "WHERE %s" % " AND ".join(geocond + refcond + conditions)
    usedtbls = "scenes" if table == 'scenes' else "%s, scenes" % table if 'scenes.' in where and join == '' else table
    query = "SELECT %s FROM %s %s%s %s" % (val2get, usedtbls, join, where, add_cmds)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records2return = cursor.fetchall()
    cursor.close()
    connection.close()
    return records2return

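# Illustrative usage of get_overlapping_scenes_from_postgreSQLdb() (a sketch; coordinates are hypothetical):
#
#   recs = get_overlapping_scenes_from_postgreSQLdb(
#       CFG.job.conn_database, table='scenes',
#       tgt_corners_lonlat=[(10, 50), (11, 50), (11, 49), (10, 49)],
#       conditions=['datasetid = 104'])
#   # -> a list of scene ID tuples whose footprints intersect the given extent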

def get_overlapping_MGRS_tiles(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """In contrast to pgSQL, 'overlapping' here means that both geometries share some spatial area,
    so it combines ST_Overlaps and ST_Contains."""
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['grid100k', 'grid1mil', 'geom']
    # FIXME this is covered by ST_Intersects:
    # geocond1 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Overlaps',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond2 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Contains',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond3 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Within',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Intersects',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID,
                                              crossing_dateline_check=True)
    # query = "SELECT %s FROM %s WHERE %s OR %s OR %s"
    #     % (', '.join(vals2get), 'mgrs_tiles', geocond1, geocond2, geocond3)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'mgrs_tiles', geocond)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['grid100k', 'grid1mil', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: shapely.wkb.loads(wkb_hex, hex=True)))
    GDF['granuleid'] = GDF['grid1mil'].str.cat(GDF['grid100k'])
    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_overlapping_MGRS_tiles2(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['granuleid', 'footprint_wgs84']
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'footprints_sentinel2_granules',
                                              geomCol2use='footprint_wgs84',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'footprints_sentinel2_granules', geocond)

    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['granuleid', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: shapely.wkb.loads(wkb_hex, hex=True)))

    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_dict_satellite_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with satellite names as keys and satellite IDs as values as read from pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """

    res = get_info_from_postgreSQLdb(conn_params, 'satellites', ['name', 'id'])
    assert len(res) > 0, 'Error getting satellite names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))

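# Illustrative output of get_dict_satellite_name_id() (a sketch; the IDs are hypothetical):
#
#   get_dict_satellite_name_id(CFG.job.conn_database)
#   # -> e.g. {'Landsat-8': '8', 'Sentinel-2A': '14'}  (values are strings due to the np.array() round trip)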

def get_dict_sensor_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with sensor names as keys and sensor IDs as values as read from pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """

    res = get_info_from_postgreSQLdb(conn_params, 'sensors', ['name', 'id'])
    assert len(res) > 0, 'Error getting sensor names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))


def get_entityIDs_from_filename(conn_DB, filename):
    # type: (str, str) -> list
    """Returns entityID(s) for the given filename. In case of Sentinel-2 there can be multiple entity IDs if
    multiple granules are saved in one .zip file.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param filename:    <str> the filename to get the corresponding entity ID(s) for
    """

    if filename[:2] in ['LE', 'LC', 'LO'] and filename.endswith('.tar.gz'):  # Landsat
        entityIDs = [filename.split('.tar.gz')[0]]
    else:
        print('Querying database in order to get entityIDs for %s...' % filename)
        res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['entityid'], {'filename': filename}, timeout=40000)
        entityIDs = [subres[0] for subres in res] if len(res) > 0 else []
    return entityIDs


def get_filename_by_entityID(conn_DB, entityid, satellite):
    # type: (str,str,str) -> str
    """Returns the filename for the given entity ID.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityid:    <str> entity ID
    :param satellite:   <str> name of the satellite the entity ID belongs to
    """

    if re.search('Landsat', satellite, re.I):
        filename = '%s.tar.gz' % entityid
    elif re.search('Sentinel-2', satellite, re.I):
        filename = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['filename'],
                                              {'entityid': entityid}, records2fetch=1)[0][0]
    else:
        raise NotImplementedError
    return filename

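# Illustrative usage of get_filename_by_entityID() (a sketch):
#
#   get_filename_by_entityID(CFG.job.conn_database, 'LE71950282003121EDC00', 'Landsat-7')
#   # -> 'LE71950282003121EDC00.tar.gz' (Landsat names are derived directly; Sentinel-2 is queried from the DB)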

def get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder):
    # type: (str,list,str,str,str) -> np.ndarray
    """Takes a list of entity IDs and extracts those records that have the corresponding archive file in the given
    source folder and that have the processing level 'METADATA' in the pgSQL database. Based on this subset a numpy
    array containing the corresponding scene IDs and the target filenames for the fileserver is returned.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityIDs:   <list> a list of entity IDs
    :param satellite:   <str> the name of the satellite to restrict the query on
    :param sensor:      <str> the name of the sensor to restrict the query on
    :param src_folder:  <str> the source directory where archive files are saved
    """

    columns = ['id', 'entityid', 'satelliteid', 'sensorid', 'filename', 'proc_level']
    result = get_info_from_postgreSQLdb(conn_DB, 'scenes', columns, {'entityid': entityIDs})
    df = pd.DataFrame(result, columns=columns)

    satNameID_dic = get_dict_satellite_name_id(conn_DB)
    satID = satNameID_dic[satellite]
    target_folder = os.path.join(CFG.job.path_archive, satellite, sensor)

    def get_fName(entityid): return get_filename_by_entityID(conn_DB, entityid, satellite)

    def tgt_exists(fileName): return os.path.exists(os.path.join(target_folder, fileName))

    def src_exists(entityid):
        return os.path.exists(os.path.join(src_folder, get_filename_by_entityID(conn_DB, entityid, satellite)))
    df['tgt_fileName'] = list(df['entityid'].map(get_fName))
    df['tgtFile_exists'] = list(df['tgt_fileName'].map(tgt_exists))
    df['srcFile_exists'] = list(df['entityid'].map(src_exists))
    tgt_satID = (df.satelliteid == float(satID))
    # isDL = (df.proc_level == 'DOWNLOADED')
    isMET = (df.proc_level == 'METADATA')
    # tgtE = (df.tgtFile_exists == True)
    srcE = df.srcFile_exists  # (df.srcFile_exists == True)
    # sceneIDs_notDL_tgtE = df[tgt_satID & (isDL == 0) & tgtE]  # maybe needed later
    # sceneIDs_DL_tgtNE = df[tgt_satID & isDL & (tgtE == 0)]  # maybe needed later
    # sceneIDs_DL_tgtE = df[tgt_satID & isDL & tgtE]  # maybe needed later
    sceneIDs_isMET_srcE = df[tgt_satID & isMET & srcE]
    return sceneIDs_isMET_srcE[['id', 'tgt_fileName']].values

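# Illustrative usage of the GMS_JOB class below (a sketch; the scene IDs are hypothetical):
#
#   job = GMS_JOB(CFG.job.conn_database)
#   job.from_sceneIDlist([26781907, 26781917], virtual_sensor_id=1)
#   # job.create() is referenced by __init__ below but defined outside this excerpt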

class GMS_JOB(object):
    """gms_preprocessing job manager"""

    def __init__(self, conn_db):
        # type: (str) -> None
        """
        :param conn_db: <str> the database connection parameters as given by CFG.job.conn_params
        """

        self.conn = conn_db
        self.dataframe = GeoDataFrame()
        self.scene_counts = {}  # set by self.create()

        self.exists_in_db = False
        self.id = None  #: int
        self.creationtime = datetime.now()
        self.finishtime = None
        self.sceneids = []
        self.timerange_start = datetime.min
        self.timerange_end = datetime.max
        self.bounds = box(-180, -90, 180, 90)
        self.distribution_index = None
        self.progress = None
        self.feedback = None
        self.failed_sceneids = []
        self.ref_job_id = None
        self.datacube_mgrs_tiles_proc = []
        self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start',
                                   'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback',
                                   'failed_sceneids', 'datasetid_spatial_ref',
                                   'virtualsensorid', 'ref_job_id', 'datacube_mgrs_tiles_proc', 'comment',
                                   'status', 'statistics']
        self.virtualsensorid = None  # set by self._set_target_sensor_specs()
        # FIXME notnull but not getable via virtual sensor id and not needed anymore in DB (gsd is given):
        self.datasetid_spatial_ref = 249
        self.datasetname_spatial_ref = 'SENTINEL-2A'
        self.status = None
        self.statistics = []
        self.comment = None
        self.epsg = None  # set by self._set_target_sensor_specs()
        self.ground_spatial_sampling = None  # set by self._set_target_sensor_specs()

    def __repr__(self):
        return 'GMS job:\n\n' + GeoSeries(self.db_entry).to_string()

    def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref):
        self.virtualsensorid = virtual_sensor_id
        res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution',
                                                                        "projection_epsg"], {'id': virtual_sensor_id})
        assert res, \
            "'virtual_sensor_id'=%s does not exist in the table 'virtual_sensors' of the database." % virtual_sensor_id
        target_gsd = res[0][0]
        self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd
        self.epsg = int(res[0][1])

        assert isinstance(datasetid_spatial_ref, int)
        self.datasetid_spatial_ref = datasetid_spatial_ref
        res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref})
        assert res, \
            "'datasetid_spatial_ref'=%s does not exist in the table 'datasets' of the database." % datasetid_spatial_ref
        self.datasetname_spatial_ref = res[0][0]

    @property
    def db_entry(self):
        """Returns an OrderedDict containing keys and values of the database entry.
        """
        db_entry = collections.OrderedDict()
        for i in self.jobs_table_columns:
            db_entry[i] = getattr(self, i)
        return db_entry

    def from_dictlist(self, dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        :param dictlist_data2process:  <list> a list of dictionaries containing the keys "satellite", "sensor" and
                                        "filenames",
                                        e.g. [{'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS',
                                               'filenames': 'file.tar.gz'}, {...}]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        dictlist_data2process = dictlist_data2process if dictlist_data2process else []

        for idx, datadict in enumerate(dictlist_data2process):
            assert isinstance(datadict, dict), "Expected only dictionaries within 'dictlist_data2process'. " \
                                               "Got %s in there." % type(datadict)
            assert False not in [i in datadict for i in ['satellite', 'sensor', 'filenames']]
            assert type(datadict['filenames']) in [list, str]

            if isinstance(datadict['filenames'], str):
                if datadict['filenames'].endswith('.csv'):
                    assert os.path.exists(datadict['filenames'])
                else:
                    datadict['filenames'] = [datadict['filenames']]

        # find all duplicates in input datadicts and build common geodataframe
        all_gdfs = []
        for datadict in dictlist_data2process:
            assert isinstance(datadict, dict)

            if isinstance(datadict['filenames'], str) and datadict['filenames'].endswith('.csv'):
                datadict['filenames'] = None  # TODO implement csv reader here
                raise NotImplementedError
            else:
                temp_gdf = GeoDataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
                if re.search('Landsat-7', datadict['satellite'], re.I) and re.search('ETM+', datadict['sensor'], re.I):
                    from .helper_functions import Landsat_entityID_decrypter as LED

                    def get_L7_sensor(fN): return LED(fN.split('.tar.gz')[0]).sensorIncSLC
                    temp_gdf['sensor'] = list(temp_gdf['filenames'].map(get_L7_sensor))

                all_gdfs.append(temp_gdf)

        gdf = GeoDataFrame(pd.concat(all_gdfs)).drop_duplicates()
        gdf.columns = ['satellite', 'sensor', 'filename']

        # run self.from_dictlist
        sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

        # populate attributes
        self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self

    def from_sceneIDlist(self, list_sceneIDs, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        Create a GMS_JOB instance based on the given list of scene IDs.

        :param list_sceneIDs:          <list> of scene IDs, e.g. [26781907, 26781917, 26542650, 26542451, 26541679]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        list_sceneIDs = list(list_sceneIDs)

        # query 'satellite', 'sensor', 'filename' from database and summarize in GeoDataFrame
        with psycopg2.connect(self.conn) as conn:
            with conn.cursor() as cursor:
                execute_pgSQL_query(cursor,
                                    """SELECT scenes.id, satellites.name, sensors.name, scenes.filename FROM scenes
                                    LEFT JOIN satellites on scenes.satelliteid=satellites.id
                                    LEFT JOIN sensors on scenes.sensorid=sensors.id
                                    WHERE scenes.id in (%s)""" % ','.join([str(i) for i in list_sceneIDs]))
                gdf = GeoDataFrame(cursor.fetchall(), columns=['sceneid', 'satellite', 'sensor', 'filename'])

        # FIXME overwriting 'ETM+_SLC_OFF' with 'ETM+' causes _get_validated_sceneInfoGDFs() to fail because the
        # FIXME sensorid for ETM+_SLC_OFF cannot be retrieved
        # gdf['sensor'] = gdf['sensor'].apply(lambda senN: senN if senN != 'ETM+_SLC_OFF' else 'ETM+')
        gdf = gdf.drop_duplicates()

        if gdf.empty:
            raise ValueError('None of the given scene IDs could be found in the GeoMultiSens database. '
                             'Job creation failed.')
        else:
            missing_IDs = [i for i in list_sceneIDs if i not in gdf['sceneid'].values]
            if missing_IDs:
                warnings.warn('The following scene IDs could not be found in the GeoMultiSens database: \n%s'
                              % '\n'.join([str(i) for i in missing_IDs]))

            # run self.from_dictlist
            sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

            # populate attributes
            self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self

    def from_entityIDlist(self, list_entityids, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of entity IDs.

        :param list_entityids:
        :param virtual_sensor_id:
        :param datasetid_spatial_ref:
        :param comment:
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'entityid': list_entityids})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given entity IDs.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_entityids) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.'
                          % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def from_filenames(self, list_filenames, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of provider archive filenames.

        :param list_filenames:
        :param virtual_sensor_id:
        :param datasetid_spatial_ref:
        :param comment:
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'filename': list_filenames})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given filenames.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_filenames) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.'
                          % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def _get_validated_sceneInfoGDFs(self, GDF_SatSenFname):
        # type: (GeoDataFrame) -> GeoDataFrame
        """

        :param GDF_SatSenFname:
        :return:
        """

        gdf = GDF_SatSenFname

        # loop through all satellite-sensor combinations and get scene information from database
        all_gdf_recs, all_gdf_miss = [], []
        all_satellites, all_sensors = zip(
            *[i.split('__') for i in (np.unique(gdf['satellite'] + '__' + gdf['sensor']))])

        for satellite, sensor in zip(all_satellites, all_sensors):
            cur_gdf = gdf.loc[(gdf['satellite'] == satellite) & (gdf['sensor'] == sensor)]
            filenames = list(cur_gdf['filename'])

            satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': satellite})
            senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': sensor})
            assert len(satID_res), "No satellite named '%s' found in database." % satellite
            assert len(senID_res), "No sensor named '%s' found in database." % sensor

            # append sceneid and wkb_hex bounds
            if 'sceneid' in gdf.columns:
                sceneIDs = list(cur_gdf['sceneid'])
                conddict = dict(id=sceneIDs, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])
            else:
                conddict = dict(filename=filenames, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])

            records = get_info_from_postgreSQLdb(
                self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], conddict)
            records = GeoDataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
            if 'sceneid' in gdf.columns:
                del records['sceneid']

            cur_gdf = cur_gdf.merge(records, on='filename', how="outer", copy=False)

            # separate records with valid matches in database from invalid matches (filename not found in database)
            gdf_recs = cur_gdf[
                cur_gdf.sceneid.notnull()].copy()  # creates a copy (needed to be able to apply maps later)
            gdf_miss = cur_gdf[cur_gdf.sceneid.isnull()]  # creates a view

            # convert scene ids from floats to integers
            gdf_recs['sceneid'] = list(gdf_recs.sceneid.map(lambda sceneid: int(sceneid)))

            # wkb_hex bounds to shapely polygons
            gdf_recs['polygons'] = list(gdf_recs.geom.map(lambda wkb_hex: shapely.wkb.loads(wkb_hex, hex=True)))

            all_gdf_recs.append(gdf_recs)
            all_gdf_miss.append(gdf_miss)

        # merge all dataframes of all satellite-sensor combinations
        gdf_recs_compl = GeoDataFrame(pd.concat(all_gdf_recs))
        gdf_miss_compl = GeoDataFrame(pd.concat(all_gdf_miss))

        # populate attributes
        if not gdf_miss_compl.empty:
            warnings.warn('The following scene filenames could not be found in the GeoMultiSens database: \n%s'
                          % '\n'.join(list(gdf_miss_compl['filename'])))

        return gdf_recs_compl

    def _populate_jobAttrs_from_sceneInfoGDF(self, sceneInfoGDF):
        # type: (GeoDataFrame) -> None
        """

        :param sceneInfoGDF:
        :return:
        """

        if not sceneInfoGDF.empty:
            self.dataframe = sceneInfoGDF
            self.sceneids = list(self.dataframe['sceneid'])
            self.statistics = [len(self.sceneids)] + [0] * 8
            self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
            self.timerange_start = self.dataframe.acquisitiondate.min().to_pydatetime()
            self.timerange_end = self.dataframe.acquisitiondate.max().to_pydatetime()

    def from_job_ID(self, job_ID):
        # type: (int) -> object
        """
        Create a GMS_JOB instance by querying the database for a specific job ID.
        :param job_ID:  <int> a valid id from the database table 'jobs'
        """

        res = get_info_from_postgreSQLdb(self.conn, 'jobs', self.jobs_table_columns, {'id': job_ID})
        if not res:
            raise ValueError("No job with ID %s found in 'jobs' table of the database." % job_ID)

        self.exists_in_db = True
        [setattr(self, attrName, res[0][i]) for i, attrName in enumerate(self.jobs_table_columns)]
        self.bounds = shapely.wkb.loads(self.bounds, hex=True)

        # fill self.dataframe
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['satelliteid', 'sensorid', 'filename',
                                                                   'id', 'acquisitiondate', 'bounds'],
                                             {'id': self.sceneids})
        gdf = GeoDataFrame(records,
                           columns=['satelliteid', 'sensorid', 'filename', 'sceneid', 'acquisitiondate', 'geom'])
        all_satIDs = gdf.satelliteid.unique().tolist()
        all_senIDs = gdf.sensorid.unique().tolist()
        satName_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['name'], {'id': all_satIDs})
        senName_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['name'], {'id': all_senIDs})
        all_satNames = [i[0] for i in satName_res]
        all_senNames = [i[0] for i in senName_res]
        id_satName_dict = dict(zip(all_satIDs, all_satNames))
        id_senName_dict = dict(zip(all_senIDs, all_senNames))
        gdf.insert(0, 'satellite', list(gdf.satelliteid.map(lambda satID: id_satName_dict[satID])))
        gdf.insert(1, 'sensor', list(gdf.sensorid.map(lambda senID: id_senName_dict[senID])))
        gdf['polygons'] = list(gdf.geom.map(lambda wkb_hex: shapely.wkb.loads(wkb_hex, hex=True)))

        self.dataframe = gdf[['satellite', 'sensor', 'filename', 'sceneid', 'acquisitiondate', 'geom', 'polygons']]

        return self

    def reset_job_progress(self):
        """Resets everything in the database entry that has been written during the last run of the job.
        """

        self.finishtime = None
        self.failed_sceneids = []
        self.progress = None
        self.status = 'pending'
        self.statistics = [len(self.sceneids)] + [0] * 8

        self.update_db_entry()

    def _get_dataframe(self, datadict):  # FIXME deprecated
        gdf = GeoDataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
        gdf.columns = ['satellite', 'sensor', 'filename']

        satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': datadict['satellite']})
        senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': datadict['sensor']})
        assert len(satID_res), "No satellite named '%s' found in database." % datadict['satellite']
        assert len(senID_res), "No sensor named '%s' found in database." % datadict['sensor']

        # append sceneid and wkb_hex bounds
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'],
                                             {'filename': datadict['filenames'],
                                              'satelliteid': satID_res[0][0], 'sensorid': senID_res[0][0]})
        records = GeoDataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
        gdf = gdf.merge(records, on='filename', how="outer")

        # separate records with valid matches in database from invalid matches (filename not found in database)