import collections
import csv
import glob
import itertools
import os
import re
import shutil
import sqlite3
import sys
import traceback
import warnings
from datetime import datetime
from typing import Union  # noqa F401  # flake8 issue

from six import PY3

import numpy as np
import pandas as pd
from pandas.io.sql import pandasSQL_builder, SQLTable
import psycopg2
from shapely.wkb import loads as wkb_loads
from geoalchemy2.types import Geometry as GEOMETRY
from geopandas import GeoDataFrame, GeoSeries
from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine

from ..config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain

# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
# + misc.path_generator.get_path_metaCSV: (left out here in order to avoid circular dependencies)

__author__ = 'Daniel Scheffler'

def execute_pgSQL_query(cursor, query_command):
    """Executes a postgreSQL query and catches the full error message if there is one.
    """

    try:
        cursor.execute(query_command)
    except psycopg2.ProgrammingError as e:
        raise psycopg2.ProgrammingError(e.pgerror + 'Query failed. Command was:\n%s' % query_command)
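
# A minimal usage sketch (assumes an open psycopg2 cursor `cur`); a failing
# statement re-raises a ProgrammingError with the offending SQL appended:
#
#   execute_pgSQL_query(cur, "SELECT some_col FROM scenes")
#   # on failure -> psycopg2.ProgrammingError: ... Query failed. Command was:
#   #               SELECT some_col FROM scenes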


def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
    # type: (int) -> collections.OrderedDict
    """Creates an OrderedDict containing further information about a given scene ID by querying the pgSQL database.

    :param sceneid:   <int> the GMS scene ID to get information for
    """

    def query(tablename, vals2return, cond_dict, records2fetch=0):
        return get_info_from_postgreSQLdb(CFG.job.conn_database, tablename, vals2return, cond_dict, records2fetch)
    resultset = query('scenes', ['datasetid', 'satelliteid', 'sensorid', 'subsystemid', 'acquisitiondate', 'entityid',
                                 'filename'], {'id': sceneid})
    if len(resultset) == 0:
        sys.stderr.write("Scene with id %s not found. Skipping.." % sceneid)

    scenedata = resultset[0]
    ds = collections.OrderedDict()
    proc_level_tmp = query('scenes_proc', 'proc_level', {'sceneid': sceneid})
    ds.update({'proc_level': 'L0A' if proc_level_tmp == [] else proc_level_tmp[0][0]})
    ds.update({'scene_ID': sceneid})
    ds.update({'datasetid': scenedata[0]})
    ds.update({'image_type': query('datasets', 'image_type', {'id': scenedata[0]})[0][0]})
    ds.update({'satellite': query('satellites', 'name', {'id': scenedata[1]})[0][0]})
    ds.update({'sensor': query('sensors', 'name', {'id': scenedata[2]})[0][0]})
    ds.update({'subsystem': query('subsystems', 'name', {'id': scenedata[3]})[0][0] if scenedata[3] else None})
    ds.update({'acq_datetime': scenedata[4]})
    ds.update({'entity_ID': scenedata[5]})
    ds.update({'filename': scenedata[6]})
    return ds
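
# Usage sketch (the scene ID and returned values below are illustrative only):
#
#   ds = get_scene_and_dataset_infos_from_postgreSQLdb(26781907)
#   # -> OrderedDict([('proc_level', 'L1A'), ('scene_ID', 26781907),
#   #                 ('datasetid', 104), ('image_type', 'RSD'),
#   #                 ('satellite', 'Landsat-8'), ...])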


def get_info_from_SQLdb(path_db, tablename, vals2return, cond_dict, records2fetch=0):
    # type: (str,str,list,dict,int) -> Union[list, str]
    """Queries an SQL database for the given parameters.

    :param path_db:         <str> the physical path of the SQL database on disk
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    """

    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), \
        "get_info_from_SQLdb: Expected an integer for the argument 'records2fetch'. Got %s" % type(records2fetch)
    if not os.path.isfile(path_db):
        return 'database connection fault'
    connection = sqlite3.connect(path_db)
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join(["%s=?" % (list(cond_dict.keys())[i]) for i in range(len(cond_dict))])
    cursor.execute("SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition,
                   list(cond_dict.values()))
    records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
        cursor.fetchmany(size=records2fetch)  # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
    cursor.close()
    connection.close()
    return records2return


def get_postgreSQL_value(value):
    # type: (any) -> str
    """Converts Python variable to a postgreSQL value respecting postgreSQL type casts.
    The resulting value can be directly inserted into a postgreSQL query."""

    assert type(value) in [int, float, bool, str, Polygon, datetime, list, tuple] or value is None, \
        "Unsupported value type within postgreSQL matching expression. Got %s." % type(value)
    if isinstance(value, bool):  # NOTE: bool must be checked before int because bool is a subclass of int
        pgV = value
    elif isinstance(value, int):
        pgV = value
    elif isinstance(value, float):
        pgV = value
    elif value is None:
        pgV = 'NULL'
    elif isinstance(value, str):
        pgV = "'%s'" % value.replace("'", "")
    elif isinstance(value, Polygon):
        pgV = "'%s'" % value.wkb_hex
    elif isinstance(value, datetime):
        pgV = "TIMESTAMP '%s'" % str(value)
    else:  # list or tuple in value
        if not value:  # empty list/tuple
            pgV = 'NULL'
        else:
            dTypes_in_value = list(set([type(i) for i in value]))
            assert len(dTypes_in_value) == 1, \
                'Mixed data types in postgreSQL matching expressions are not supported. Got %s.' % dTypes_in_value
            assert dTypes_in_value[0] in [int, str, float, np.int64]
            pgList = ",".join(["'%s'" % i if isinstance(value[0], str) else "%s" % i for i in value])
            pgV = "'{%s}'" % pgList
    return pgV
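
# Illustrative conversions (Python value -> returned pgSQL literal):
#
#   get_postgreSQL_value(14)                     # -> 14
#   get_postgreSQL_value("O'Hare")               # -> "'OHare'" (single quotes are stripped)
#   get_postgreSQL_value(None)                   # -> 'NULL'
#   get_postgreSQL_value([1, 2, 3])              # -> "'{1,2,3}'" (pgSQL array literal)
#   get_postgreSQL_value(datetime(2017, 1, 1))   # -> "TIMESTAMP '2017-01-01 00:00:00'"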


def get_postgreSQL_matchingExp(key, value):
    # type: (str,any) -> str
    """Converts a key/value pair to a postgreSQL matching expression in the form "column=value" respecting postgreSQL
    type casts. The resulting string can be directly inserted into a postgreSQL query.
    """
    pgVal = get_postgreSQL_value(value)
    if isinstance(pgVal, str) and pgVal.startswith("'{") and pgVal.endswith("}'"):
        return '%s in %s' % (key, pgVal.replace("'{", '(').replace("}'", ')'))  # '{1,2,3}' => (1,2,3)
    elif pgVal == 'NULL':
        return '%s is NULL' % key
    else:
        return '%s=%s' % (key, pgVal)
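
# Illustrative matching expressions built from key/value pairs:
#
#   get_postgreSQL_matchingExp('id', 42)            # -> 'id=42'
#   get_postgreSQL_matchingExp('proc_level', None)  # -> 'proc_level is NULL'
#   get_postgreSQL_matchingExp('id', [1, 2, 3])     # -> 'id in (1,2,3)'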


def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000):
    # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str]
    """Queries a postgreSQL database for the given parameters.

    :param conn_params:     <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                            HINT: <value> can also be a list or a tuple of elements to match, BUT note that the order
                                  of the list items is NOT respected!
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    :param timeout:         <int> allows to set a custom statement timeout (milliseconds)
    """

    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), "get_info_from_postgreSQLdb: Expected an integer for the argument " \
                                           "'records2fetch'. Got %s" % type(records2fetch)
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    cmd = "SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition
    execute_pgSQL_query(cursor, cmd)

    records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
        cursor.fetchmany(size=records2fetch)  # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
    cursor.close()
    connection.close()
    return records2return
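
# Usage sketch (assumes valid connection parameters; condition values and the
# returned records are made up). The result is a list of tuples, one per match:
#
#   res = get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes',
#                                    ['id', 'filename'],
#                                    {'proc_level': 'METADATA'}, records2fetch=2)
#   # -> e.g. [(26781907, 'LC81930242015036LGN00.tar.gz'), (26781917, ...)]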

def update_records_in_postgreSQLdb(conn_params, tablename, vals2update_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters and updates the given columns of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2update_dict:  <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    update_cond = "SET " + ', '.join(['%s=%s' % (k, get_postgreSQL_value(vals2update_dict[k]))
                                      for k in vals2update_dict.keys()])
    # check whether any record in the target table fulfills the condition
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + update_cond + " " + condition)

    if 'connection' in locals():
        connection.commit()
        connection.close()
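
# Usage sketch (made-up scene ID): set the processing level of a single scene:
#
#   update_records_in_postgreSQLdb(CFG.job.conn_database, 'scenes',
#                                  {'proc_level': 'L1A'}, {'id': 26781907})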


def append_item_to_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2append_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and appends the given value to the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2append_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    assert len(vals2append_dict) == 1, 'Values can be appended to only one column at once.'
    if type(list(vals2append_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Appending multiple values to one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2append_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2append_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2append_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    append_cond = "SET %s = array_cat(%s, '%s')" % (col2update, col2update, pgSQL_val)
    # check whether any record in the target table fulfills the condition
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + append_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()
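
# Usage sketch (made-up job ID; assumes the 'failed_sceneids' array column
# used by GMS_JOB below): append one scene ID to a job record:
#
#   append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
#                                           {'failed_sceneids': 26781907},
#                                           {'id': 123})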


def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remove_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and removes the given value from the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2remove_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    assert len(vals2remove_dict) == 1, 'Values can be removed from only one column at once.'
    if type(list(vals2remove_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Removing multiple values from one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2remove_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2remove_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2remove_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    remove_cond = "SET %s = array_remove(%s, '%s')" % (col2update, col2update, pgSQL_val)
    # check whether any record in the target table fulfills the condition
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + remove_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()


def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None,
                                                 idx_val2increment=None, cond_dict=None, timeout=15000):
    # type: (str, str, str, int, int, dict, int) -> Union[None, str]
    """Updates an array column of a specific postgreSQL table in the form that it increments or decrements the elements
    at a given position. HINT: The column must have values like [0,1,0,3,1,0].

    :param conn_params:         <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:           <str> name of the table within the database to be updated
    :param col2update:          <str> column name of the column to be updated
    :param idx_val2decrement:   <int> the index of the array element to be decremented (starts with 1)
    :param idx_val2increment:   <int> the index of the array element to be incremented (starts with 1)
    :param cond_dict:           <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                                HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:             <int> allows to set a custom statement timeout (milliseconds)
    :return:
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""

    dec_str = '' if idx_val2decrement is None else \
        "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement)
    inc_str = '' if idx_val2increment is None else \
        "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment)

    if dec_str or inc_str:
        # join only the non-empty parts to avoid a dangling comma if just one index is given
        dec_inc_str = ','.join([s for s in [dec_str, inc_str] if s])
        execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, dec_inc_str, condition))

    if 'connection' in locals():
        connection.commit()
        connection.close()
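
# Usage sketch (made-up job ID; assumes the 'statistics' array column used by
# GMS_JOB below): move one scene from counter position 2 to position 3
# (pgSQL array indices start at 1):
#
#   increment_decrement_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
#                                                col2update='statistics',
#                                                idx_val2decrement=2,
#                                                idx_val2increment=3,
#                                                cond_dict={'id': 123})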


def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000):
    # type: (str, str, dict, int) -> Union[int, str]
    """Creates a single new record in a postgreSQL database and populates its columns with the given values.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2write_dict:   <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in vals2write_dict.items()])

    execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s);" % (tablename, ','.join(keys), ','.join(vals)))
    execute_pgSQL_query(cursor, "SELECT id FROM %s ORDER BY id DESC LIMIT 1" % tablename)
    newID = cursor.fetchone()[0]

    if 'connection' in locals():
        connection.commit()
        connection.close()

    return newID
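
# Usage sketch (made-up values; assumes the target table has an 'id' column,
# which the function queries for the returned record ID):
#
#   newID = create_record_in_postgreSQLdb(CFG.job.conn_database, 'datasets',
#                                         {'name': 'TEST-DATASET'})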


def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
                                    scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    if tgt_corners_lonlat:
        # handle coordinates crossing the 180 degrees meridian (dateline)
        # FIXME in that case the polygon has to be split at the dateline. otherwise pgSQL may yield wrong results
        if crossing_dateline_check:
            xvals = [x for x, y in tgt_corners_lonlat]
            if max(xvals) - min(xvals) > 180:
                tgt_corners_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corners_lonlat]

        from .helper_functions import cornerLonLat_to_postgreSQL_poly
        pGSQL_poly = cornerLonLat_to_postgreSQL_poly(tgt_corners_lonlat)
        src_geom = "'SRID=4326;%s'::geometry" % pGSQL_poly  # source geometry is given
        # FIXME the scenes table stores "geography" geodata -> its own index is not used when casting to "geometry":
        tgt_geom = "%s.%s::geometry" % (table2query, geomCol2use)
        geocond = "%s(%s, %s)" % (queryfunc, src_geom, tgt_geom)
    else:  # scene_ID is not None:
        connection = psycopg2.connect(conn_params)
        if connection is None:
            return 'database connection fault'
        cursor = connection.cursor()
        cmd = "SELECT ST_AsText(bounds) FROM scenes WHERE scenes.id = %s" % scene_ID
        execute_pgSQL_query(cursor, cmd)
        res = cursor.fetchone()
        cursor.close()
        connection.close()
        if len(res):
            src_geom = "'SRID=4326;%s'::geometry" % res
        else:
            print('The scene with the ID %s does not exist in the scenes table.' % scene_ID)
            return []
        geocond = "%s(%s, %s.%s::geometry)" % (queryfunc, src_geom, table2query, geomCol2use)
    return geocond
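
# Usage sketch (made-up corner coordinates): build a geospatial WHERE fragment
# for a lon/lat footprint:
#
#   corners = [(10., 50.), (11., 50.), (11., 51.), (10., 51.)]
#   cond = get_pgSQL_geospatial_query_cond(CFG.job.conn_database, 'scenes_proc',
#                                          tgt_corners_lonlat=corners)
#   # -> "ST_Intersects('SRID=4326;POLYGON((...))'::geometry, scenes_proc.bounds::geometry)"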


def get_overlapping_scenes_from_postgreSQLdb(conn_params, table='scenes_proc', scene_ID=None,
                                             tgt_corners_lonlat=None, conditions=None, add_cmds='', timeout=15000):
    # type: (str, str, int, list, Union[list, str], str, int) -> Union[list, str]
    """Queries the postgreSQL database in order to find those scenes of a specified reference satellite (Landsat-8 or
    Sentinel-2) that have an overlap to the given corner coordinates AND that fulfill the given conditions.

    :param conn_params:             <str> connection parameters as provided by CFG.job.conn_params
    :param table:                   <str> name of the table within the database to be queried
    :param scene_ID:                <int> a sceneID to get the target geographical extent from
                                        (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat:      <list> a list of coordinates defining the target geographical extent
                                           (needed if scene_ID is not provided)
    :param conditions:              <list> a list of additional query conditions
    :param add_cmds:                <str> additional pgSQL commands to be added to the pgSQL query
    :param timeout:                 <int> allows to set a custom statement timeout (milliseconds)
    """

    conditions = [] if conditions is None else conditions if isinstance(conditions, list) else [conditions]
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'
    datasetids = [int(d.split('=')[1].strip()) for d in conditions if d.startswith('datasetid')]
    datasetid = datasetids[0] if datasetids else 104  # Landsat-8
    # FIXME: use Landsat-8 instead of Sentinel-2 as long as S2 L1A_P is not working:
    datasetid = 104 if datasetid == 249 else datasetid

    if table != 'scenes_proc':
        assert datasetid, "datasetid is needed if table is not 'scenes_proc'"
    if scene_ID is None:
        assert tgt_corners_lonlat, "Provide either scene_ID or tgt_corners_lonlat!"
    if tgt_corners_lonlat is None:
        assert scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    val2get = "scenes.id" if table == 'scenes' else "%s.sceneid" % table
    # refcond  = ['scenes_proc.georef = True'] if not datasetids else ['scenes.datasetid = %s' %datasetid]
    refcond = ['scenes.datasetid = %s' % datasetid]

    geocond = [get_pgSQL_geospatial_query_cond(conn_params, table, tgt_corners_lonlat=tgt_corners_lonlat,
                                               scene_ID=scene_ID, queryfunc='ST_Intersects',
                                               crossing_dateline_check=True)]

    join = "INNER JOIN scenes ON (%s.sceneid = scenes.id) " % table if table != 'scenes' and datasetids else ''
    conditions = [c for c in conditions if not c.startswith('datasetid')]
    where = "WHERE %s" % " AND ".join(geocond + refcond + conditions)
    usedtbls = "scenes" if table == 'scenes' else "%s, scenes" % table if 'scenes.' in where and join == '' else table
    query = "SELECT %s FROM %s %s%s %s" % (val2get, usedtbls, join, where, add_cmds)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records2return = cursor.fetchall()
    cursor.close()
    connection.close()
    return records2return
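
# Usage sketch (made-up footprint): find reference scenes intersecting a
# target extent, with an additional dataset restriction:
#
#   recs = get_overlapping_scenes_from_postgreSQLdb(
#       CFG.job.conn_database, table='scenes',
#       tgt_corners_lonlat=[(10., 50.), (11., 50.), (11., 51.), (10., 51.)],
#       conditions=['datasetid=104'])
#   # -> e.g. [(26781907,), (26781917,), ...]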

def get_overlapping_MGRS_tiles(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """In contrast to pgSQL 'Overlapping' here means that both geometries share some spatial area.
    So it combines ST_Overlaps and ST_Contains."""
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['grid100k', 'grid1mil', 'geom']
    # FIXME this is covered by ST_Intersects:
    # geocond1 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Overlaps',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond2 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Contains',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond3 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Within',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Intersects',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID,
                                              crossing_dateline_check=True)
    # query = "SELECT %s FROM %s WHERE %s OR %s OR %s"
    #     % (', '.join(vals2get), 'mgrs_tiles', geocond1, geocond2, geocond3)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'mgrs_tiles', geocond)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['grid100k', 'grid1mil', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))
    GDF['granuleid'] = GDF['grid1mil'].str.cat(GDF['grid100k'])
    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_overlapping_MGRS_tiles2(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['granuleid', 'footprint_wgs84']
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'footprints_sentinel2_granules',
                                              geomCol2use='footprint_wgs84',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'footprints_sentinel2_granules', geocond)

    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['granuleid', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_dict_satellite_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with satellite names as keys and satellite IDs as values as read from pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """

    res = get_info_from_postgreSQLdb(conn_params, 'satellites', ['name', 'id'])
    assert len(res) > 0, 'Error getting satellite names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))
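
# Usage sketch: resolve a satellite ID by name (the returned IDs depend on the
# database content):
#
#   satID = get_dict_satellite_name_id(CFG.job.conn_database)['Landsat-8']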


def get_dict_sensor_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with sensor names as keys and sensor IDs as values as read from pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """

    res = get_info_from_postgreSQLdb(conn_params, 'sensors', ['name', 'id'])
    assert len(res) > 0, 'Error getting sensor names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))


def get_entityIDs_from_filename(conn_DB, filename):
    # type: (str, str) -> list
    """Returns entityID(s) for the given filename. In case of Sentinel-2 there can be multiple entity IDs if
    multiple granules are saved in one .zip file.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param filename:    <str> the filename to get the corresponding entity ID(s) for
    """

    if filename[:2] in ['LE', 'LC', 'LO'] and filename.endswith('.tar.gz'):  # Landsat
        entityIDs = [filename.split('.tar.gz')[0]]
    else:
        print('Querying database in order to get entityIDs for %s...' % filename)
        res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['entityid'], {'filename': filename}, timeout=40000)
        entityIDs = [subres[0] for subres in res] if len(res) > 0 else []
    return entityIDs


def get_filename_by_entityID(conn_DB, entityid, satellite):
    # type: (str,str,str) -> str
    """Returns the filename for the given entity ID.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityid:    <str> entity ID
    :param satellite:   <str> satellite name to which the entity ID is belonging
    """

    if re.search('Landsat', satellite, re.I):
        filename = '%s.tar.gz' % entityid
    elif re.search('Sentinel-2', satellite, re.I):
        filename = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['filename'],
                                              {'entityid': entityid}, records2fetch=1)[0][0]
    else:
        raise NotImplementedError
    return filename


def get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder):
    # type: (str,list,str,str,str) -> np.ndarray
    """Takes a list of entity IDs and extracts those records that have the corresponding archive file in the given
    source folder and that have the processing level 'METADATA' in the pgSQL database. Based on this subset a numpy
    array containing the corresponding scene IDs and the target filenames for the fileserver is returned.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityIDs:   <list> a list of entity IDs
    :param satellite:   <str> the name of the satellite to restrict the query on
    :param sensor:      <str> the name of the sensor to restrict the query on
    :param src_folder:  <str> the source directory where archive files are saved
    """

    columns = ['id', 'entityid', 'satelliteid', 'sensorid', 'filename', 'proc_level']
    result = get_info_from_postgreSQLdb(conn_DB, 'scenes', columns, {'entityid': entityIDs})
    df = pd.DataFrame(result, columns=columns)

    satNameID_dic = get_dict_satellite_name_id(conn_DB)
    satID = satNameID_dic[satellite]
    target_folder = os.path.join(CFG.job.path_archive, satellite, sensor)

    def get_fName(entityid): return get_filename_by_entityID(conn_DB, entityid, satellite)

    def tgt_exists(fileName): return os.path.exists(os.path.join(target_folder, fileName))

    def src_exists(entityid):
        return os.path.exists(os.path.join(src_folder, get_filename_by_entityID(conn_DB, entityid, satellite)))
    df['tgt_fileName'] = list(df['entityid'].map(get_fName))
    df['tgtFile_exists'] = list(df['tgt_fileName'].map(tgt_exists))
    df['srcFile_exists'] = list(df['entityid'].map(src_exists))
    tgt_satID = (df.satelliteid == float(satID))
    # isDL = (df.proc_level == 'DOWNLOADED')
    isMET = (df.proc_level == 'METADATA')
    # tgtE = (df.tgtFile_exists == True)
    srcE = df.srcFile_exists  # (df.srcFile_exists == True)
    # sceneIDs_notDL_tgtE = df[tgt_satID & (isDL == 0) & tgtE]  # maybe needed later
    # sceneIDs_DL_tgtNE = df[tgt_satID & isDL & (tgtE == 0)]  # maybe needed later
    # sceneIDs_DL_tgtE = df[tgt_satID & isDL & tgtE]  # maybe needed later
    sceneIDs_isMET_srcE = df[tgt_satID & isMET & srcE]
    return sceneIDs_isMET_srcE[['id', 'tgt_fileName']].values


class GMS_JOB(object):
    """gms_preprocessing job manager"""

    def __init__(self, conn_db):
        # type: (str) -> None
        """
        :param conn_db: <str> the database connection parameters as given by CFG.job.conn_params
        """

        self.conn = conn_db
        self.dataframe = GeoDataFrame()
        self.scene_counts = {}  # set by self.create()

        self.exists_in_db = False
        self.id = None  #: int
        self.creationtime = datetime.now()
        self.finishtime = None
        self.sceneids = []
        self.timerange_start = datetime.min
        self.timerange_end = datetime.max
        self.bounds = box(-180, -90, 180, 90)
        self.distribution_index = None
        self.progress = None
        self.feedback = None
        self.failed_sceneids = []
        self.ref_job_id = None
        self.datacube_mgrs_tiles_proc = []
        self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start',
                                   'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback',
                                   'failed_sceneids', 'datasetid_spatial_ref',
                                   'virtualsensorid', 'ref_job_id', 'datacube_mgrs_tiles_proc', 'comment',
                                   'status', 'statistics']
        self.virtualsensorid = None  # set by self._set_target_sensor_specs()
        # FIXME notnull but not getable via virtual sensor id and not needed anymore in DB (gsd is given):
        self.datasetid_spatial_ref = 249
        self.datasetname_spatial_ref = 'SENTINEL-2A'
        self.status = None
        self.statistics = []
        self.comment = None
        self.epsg = None  # set by self._set_target_sensor_specs()
        self.ground_spatial_sampling = None  # set by self._set_target_sensor_specs()

    def __repr__(self):
        return 'GMS job:\n\n' + GeoSeries(self.db_entry).to_string()

    def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref):
        self.virtualsensorid = virtual_sensor_id
        res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution',
                                                                        "projection_epsg"], {'id': virtual_sensor_id})
        assert res, \
            "'virtual_sensor_id'=%s does not exist in the table 'virtual_sensors' of the database." % virtual_sensor_id
        target_gsd = res[0][0]
        self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd
        self.epsg = int(res[0][1])

        assert isinstance(datasetid_spatial_ref, int)
        self.datasetid_spatial_ref = datasetid_spatial_ref
        res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref})
        assert res, \
            "'datasetid_spatial_ref'=%s does not exist in the table 'datasets' of the database." % datasetid_spatial_ref
        self.datasetname_spatial_ref = res[0][0]  # NOTE: res is a list of tuples, so unpack the plain name string

    @property
    def db_entry(self):
        """Returns an OrderedDict containing keys and values of the database entry.
        """

        db_entry = collections.OrderedDict()
        for i in self.jobs_table_columns:
            db_entry[i] = getattr(self, i)
        return db_entry

    def from_dictlist(self, dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        :param dictlist_data2process:  <list> a list of dictionaries containing the keys "satellite", "sensor" and
                                        "filenames", e.g.
                                        [{'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS',
                                          'filenames': 'file.tar.gz'}, {...}]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        dictlist_data2process = dictlist_data2process if dictlist_data2process else []

        for idx, datadict in enumerate(dictlist_data2process):
            assert isinstance(datadict, dict), "Expected only dictionaries within 'dictlist_data2process'. " \
                                               "Got %s in there." % type(datadict)
            assert False not in [i in datadict for i in ['satellite', 'sensor', 'filenames']]
            assert type(datadict['filenames']) in [list, str]

            if isinstance(datadict['filenames'], str):
                if datadict['filenames'].endswith('.csv'):
                    assert os.path.exists(datadict['filenames'])
                else:
                    datadict['filenames'] = [datadict['filenames']]

        # find all duplicates in input datadicts and build common geodataframe
        all_gdfs = []
        for datadict in dictlist_data2process:
            assert isinstance(datadict, dict)

            if isinstance(datadict['filenames'], str) and datadict['filenames'].endswith('.csv'):
                datadict['filenames'] = None  # TODO implement csv reader here
                raise NotImplementedError
            else:
                temp_gdf = GeoDataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
                if re.search('Landsat-7', datadict['satellite'], re.I) and re.search('ETM+', datadict['sensor'], re.I):
                    from .helper_functions import Landsat_entityID_decrypter as LED

                    def get_L7_sensor(fN): return LED(fN.split('.tar.gz')[0]).sensorIncSLC
                    temp_gdf['sensor'] = list(temp_gdf['filenames'].map(get_L7_sensor))

                all_gdfs.append(temp_gdf)

        gdf = GeoDataFrame(pd.concat(all_gdfs)).drop_duplicates()
        gdf.columns = ['satellite', 'sensor', 'filename']

        # validate scene information against the database
        sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

        # populate attributes
        self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self

    def from_sceneIDlist(self, list_sceneIDs, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        Create a GMS_JOB instance based on the given list of scene IDs.

        :param list_sceneIDs:          <list> of scene IDs, e.g. [26781907, 26781917, 26542650, 26542451, 26541679]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        list_sceneIDs = list(list_sceneIDs)

        # query 'satellite', 'sensor', 'filename' from database and summarize in GeoDataFrame
        with psycopg2.connect(self.conn) as conn:
            with conn.cursor() as cursor:
                execute_pgSQL_query(cursor,
                                    """SELECT scenes.id, satellites.name, sensors.name, scenes.filename FROM scenes
                                    LEFT JOIN satellites on scenes.satelliteid=satellites.id
                                    LEFT JOIN sensors on scenes.sensorid=sensors.id
                                    WHERE scenes.id in (%s)""" % ','.join([str(i) for i in list_sceneIDs]))
                gdf = GeoDataFrame(cursor.fetchall(), columns=['sceneid', 'satellite', 'sensor', 'filename'])

        # FIXME overwriting 'ETM+_SLC_OFF' with 'ETM+' causes _get_validated_sceneInfoGDFs() to fail because the
        # FIXME sensorid for ETM+_SLC_OFF cannot be retrieved
        # gdf['sensor'] = gdf['sensor'].apply(lambda senN: senN if senN != 'ETM+_SLC_OFF' else 'ETM+')
        gdf = gdf.drop_duplicates()

        if gdf.empty:
            raise ValueError('None of the given scene IDs could be found in the GeoMultiSens database. '
                             'Job creation failed.')
        else:
            missing_IDs = [i for i in list_sceneIDs if i not in gdf['sceneid'].values]
            if missing_IDs:
                warnings.warn('The following scene IDs could not be found in the GeoMultiSens database: \n%s'
                              % '\n'.join([str(i) for i in missing_IDs]))

            # validate scene information against the database
            sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

            # populate attributes
            self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self
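
    # Usage sketch (made-up IDs): create a job from scene IDs and inspect it;
    # virtual_sensor_id must exist in the 'virtual_sensors' table:
    #
    #   job = GMS_JOB(CFG.job.conn_database).from_sceneIDlist(
    #       [26781907, 26781917], virtual_sensor_id=1)
    #   print(job)  # GMS_JOB.__repr__ prints the database entry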

    def from_entityIDlist(self, list_entityids, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of entity IDs.

        :param list_entityids:
        :param virtual_sensor_id:
        :param datasetid_spatial_ref:
        :param comment:
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'entityid': list_entityids})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given entity IDs.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_entityids) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.'
                          % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def from_filenames(self, list_filenames, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of provider archive filenames.

        :param list_filenames:
        :param virtual_sensor_id:
        :param datasetid_spatial_ref:
        :param comment:
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'filename': list_filenames})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given filenames.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_filenames) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.'
                          % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def _get_validated_sceneInfoGDFs(self, GDF_SatSenFname):
        # type: (GeoDataFrame) -> GeoDataFrame
        """
        :param GDF_SatSenFname:
        :return:
        """

        gdf = GDF_SatSenFname

        # loop through all satellite-sensor combinations and get scene information from database
        all_gdf_recs, all_gdf_miss = [], []
        all_satellites, all_sensors = zip(
            *[i.split('__') for i in (np.unique(gdf['satellite'] + '__' + gdf['sensor']))])

        for satellite, sensor in zip(all_satellites, all_sensors):
            cur_gdf = gdf.loc[(gdf['satellite'] == satellite) & (gdf['sensor'] == sensor)]
            filenames = list(cur_gdf['filename'])

            satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': satellite})
            senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': sensor})
            assert len(satID_res), "No satellite named '%s' found in database." % satellite
            assert len(senID_res), "No sensor named '%s' found in database." % sensor

            # append sceneid and wkb_hex bounds
            if 'sceneid' in gdf.columns:
                sceneIDs = list(cur_gdf['sceneid'])
                conddict = dict(id=sceneIDs, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])
            else:
                conddict = dict(filename=filenames, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])

            records = get_info_from_postgreSQLdb(
                self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], conddict)
            records = GeoDataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
            if 'sceneid' in gdf.columns:
                del records['sceneid']

            cur_gdf = cur_gdf.merge(records, on='filename', how="outer", copy=False)

            # separate records with valid matches in database from invalid matches (filename not found in database)
            gdf_recs = cur_gdf[
                cur_gdf.sceneid.notnull()].copy()  # creates a copy (needed to be able to apply maps later)
            gdf_miss = cur_gdf[cur_gdf.sceneid.isnull()]  # creates a view

            # convert scene ids from floats to integers
            gdf_recs['sceneid'] = list(gdf_recs.sceneid.map(lambda sceneid: int(sceneid)))

            # wkb_hex bounds to shapely polygons
            gdf_recs['polygons'] = list(gdf_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

            all_gdf_recs.append(gdf_recs)
            all_gdf_miss.append(gdf_miss)

        # merge all dataframes of all satellite-sensor combinations
        gdf_recs_compl = GeoDataFrame(pd.concat(all_gdf_recs))
        gdf_miss_compl = GeoDataFrame(pd.concat(all_gdf_miss))

        # populate attributes
        if not gdf_miss_compl.empty:
            warnings.warn('The following scene filenames could not be found in the GeoMultiSens database: \n%s'
                          % '\n'.join(list(gdf_miss_compl['filename'])))

        return gdf_recs_compl

    def _populate_jobAttrs_from_sceneInfoGDF(self, sceneInfoGDF):
        # type: (GeoDataFrame) -> None
        """
        :param sceneInfoGDF:
        :return:
        """

        if not sceneInfoGDF.empty:
            self.dataframe = sceneInfoGDF
            self.sceneids = list(self.dataframe['sceneid'])
            self.statistics = [len(self.sceneids)] + [0] * 8
            self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
            self.timerange_start = self.dataframe.acquisitiondate.min().to_pydatetime()
            self.timerange_end = self.dataframe.acquisitiondate.max().to_pydatetime()

    def from_job_ID(self, job_ID):
        # type: (int) -> object
        """
        Create a GMS_JOB instance by querying the database for a specific job ID.

        :param job_ID:  <int> a valid id from the database table 'jobs'
        """

        res = get_info_from_postgreSQLdb(self.conn, 'jobs', self.jobs_table_columns, {'id': job_ID})
        if not res:
            raise ValueError("No job with ID %s found in 'jobs' table of the database." % job_ID)

        self.exists_in_db = True
        [setattr(self, attrName, res[0][i]) for i, attrName in enumerate(self.jobs_table_columns)]
        self.bounds = wkb_loads(self.bounds, hex=True)

        # fill self.dataframe
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['satelliteid', 'sensorid', 'filename',
                                                                   'id', 'acquisitiondate', 'bounds'],
                                             {'id': self.sceneids})
        gdf = GeoDataFrame(records,
                           columns=['satelliteid', 'sensorid', 'filename', 'sceneid', 'acquisitiondate', 'geom'])
        all_satIDs = gdf.satelliteid.unique().tolist()
        all_senIDs = gdf.sensorid.unique().tolist()
        satName_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['name'], {'id': all_satIDs})
        senName_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['name'], {'id': all_senIDs})
        all_satNames = [i[0] for i in satName_res]
        all_senNames = [i[0] for i in senName_res]
        id_satName_dict = dict(zip(all_satIDs, all_satNames))
        id_senName_dict = dict(zip(all_senIDs, all_senNames))
        gdf.insert(0, 'satellite', list(gdf.satelliteid.map(lambda satID: id_satName_dict[satID])))
        gdf.insert(1, 'sensor', list(gdf.sensorid.map(lambda senID: id_senName_dict[senID])))
        gdf['polygons'] = list(gdf.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

        self.dataframe = gdf[['satellite', 'sensor', 'filename', 'sceneid', 'acquisitiondate', 'geom', 'polygons']]

        return self

    def reset_job_progress(self):
        """Resets everything in the database entry that has been written during the last run of the job.
        """

        self.finishtime = None
        self.failed_sceneids = []
        self.progress = None
        self.status = 'pending'
        self.statistics = [len(self.sceneids)] + [0] * 8

        self.update_db_entry()

    def _get_dataframe(self, datadict):  # FIXME deprecated
        gdf = GeoDataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
        gdf.columns = ['satellite', 'sensor', 'filename']