import collections
import glob
import itertools
import os
import re
import shutil
import sqlite3
import sys
import traceback
import warnings
from datetime import datetime
from typing import Union  # noqa F401  # flake8 issue

import numpy as np
import pandas as pd
from pandas.io.sql import pandasSQL_builder, SQLTable
import psycopg2
from shapely.wkb import loads as wkb_loads
from geoalchemy2.types import Geometry as GEOMETRY
from geopandas import GeoDataFrame, GeoSeries
from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine

from ..config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain

# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
# + misc.path_generator.get_path_metaCSV: (left out here in order to avoid circular dependencies)

__author__ = 'Daniel Scheffler'


def execute_pgSQL_query(cursor, query_command):
    """Executes a postgreSQL query and catches the full error message if there is one."""

    try:
        cursor.execute(query_command)
    except psycopg2.ProgrammingError as e:
        raise psycopg2.ProgrammingError(e.pgerror + 'Query failed. Command was:\n%s' % query_command)


def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
    # type: (int) -> collections.OrderedDict
    """Creates an OrderedDict containing further information about a given scene ID by querying the pgSQL database.

    :param sceneid:   <int> the GMS scene ID to get information for
    """

    def query(tablename, vals2return, cond_dict, records2fetch=0):
        return get_info_from_postgreSQLdb(CFG.job.conn_database, tablename, vals2return, cond_dict, records2fetch)
    resultset = query('scenes', ['datasetid', 'satelliteid', 'sensorid', 'subsystemid', 'acquisitiondate', 'entityid',
                                 'filename'], {'id': sceneid})
    if len(resultset) == 0:
        sys.stderr.write("Scene with id %s not found. Skipping.." % sceneid)
        return None  # no matching scene -> nothing to return

    scenedata = resultset[0]
    ds = collections.OrderedDict()
    proc_level_tmp = query('scenes_proc', 'proc_level', {'sceneid': sceneid})
    ds.update({'proc_level': 'L0A' if proc_level_tmp == [] else proc_level_tmp[0][0]})
    ds.update({'scene_ID': sceneid})
    ds.update({'datasetid': scenedata[0]})
    ds.update({'image_type': query('datasets', 'image_type', {'id': scenedata[0]})[0][0]})
    ds.update({'satellite': query('satellites', 'name', {'id': scenedata[1]})[0][0]})
    ds.update({'sensor': query('sensors', 'name', {'id': scenedata[2]})[0][0]})
    ds.update({'subsystem': query('subsystems', 'name', {'id': scenedata[3]})[0][0] if scenedata[3] else None})
    ds.update({'acq_datetime': scenedata[4]})
    ds.update({'entity_ID': scenedata[5]})
    ds.update({'filename': scenedata[6]})
    return ds


def get_postgreSQL_value(value):
    # type: (any) -> str
    """Converts a Python variable to a postgreSQL value respecting postgreSQL type casts.
    The resulting value can be directly inserted into a postgreSQL query."""

    assert type(value) in [int, float, bool, str, Polygon, datetime, list, tuple] or value is None, \
        "Unsupported value type within postgreSQL matching expression. Got %s." % type(value)
    if isinstance(value, int):
        pgV = value
    elif isinstance(value, float):
        pgV = value
    elif isinstance(value, bool):
        pgV = value
    elif value is None:
        pgV = 'NULL'
    elif isinstance(value, str):
        pgV = "'%s'" % value.replace("'", "")
    elif isinstance(value, Polygon):
        pgV = "'%s'" % value.wkb_hex
    elif isinstance(value, datetime):
        pgV = "TIMESTAMP '%s'" % str(value)
    else:  # list or tuple in value
        if not value:  # empty list/tuple
            pgV = 'NULL'
        else:
            dTypes_in_value = list(set([type(i) for i in value]))
            assert len(dTypes_in_value) == 1, \
                'Mixed data types in postgreSQL matching expressions are not supported. Got %s.' % dTypes_in_value
            assert dTypes_in_value[0] in [int, str, float, np.int64]
            pgList = ",".join(["'%s'" % i if isinstance(value[0], str) else "%s" % i for i in value])
            pgV = "'{%s}'" % pgList
    return pgV
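
# A few example conversions (pure Python, no database required; editor-added illustration):
#   get_postgreSQL_value(42)                      -> 42
#   get_postgreSQL_value('LE71950282003121EDC00') -> "'LE71950282003121EDC00'"
#   get_postgreSQL_value(None)                    -> 'NULL'
#   get_postgreSQL_value([1, 2, 3])               -> "'{1,2,3}'"
#   get_postgreSQL_value(datetime(2017, 1, 1))    -> "TIMESTAMP '2017-01-01 00:00:00'"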


def get_postgreSQL_matchingExp(key, value):
    # type: (str,any) -> str
    """Converts a key/value pair to a postgreSQL matching expression in the form "column=value" respecting postgreSQL
    type casts. The resulting string can be directly inserted into a postgreSQL query.
    """
    pgVal = get_postgreSQL_value(value)
    if isinstance(pgVal, str) and pgVal.startswith("'{") and pgVal.endswith("}'"):
        return '%s in %s' % (key, pgVal.replace("'{", '(').replace("}'", ')'))  # '{1,2,3}' => (1,2,3)
    elif pgVal == 'NULL':
        return '%s is NULL' % key
    else:
        return '%s=%s' % (key, pgVal)
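
# Example matching expressions (pure Python, no database required; editor-added illustration):
#   get_postgreSQL_matchingExp('sceneid', 26781907)  -> "sceneid=26781907"
#   get_postgreSQL_matchingExp('proc_level', None)   -> "proc_level is NULL"
#   get_postgreSQL_matchingExp('id', [1, 2, 3])      -> "id in (1,2,3)"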


def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000):
    # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str]
    """Queries a postgreSQL database for the given parameters.

    :param conn_params:     <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                            HINT: <value> can also be a list or a tuple of elements to match, BUT note that the order
                                  of the list items is NOT respected!
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    :param timeout:         <int> allows to set a custom statement timeout (milliseconds)
    """

    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), "get_info_from_postgreSQLdb: Expected an integer for the argument " \
                                           "'records2fetch'. Got %s" % type(records2fetch)
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    cmd = "SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition
    execute_pgSQL_query(cursor, cmd)

    records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
        cursor.fetchmany(size=records2fetch)  # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
    cursor.close()
    connection.close()
    return records2return
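
# Usage sketch (hypothetical records; CFG.job.conn_database provides the connection string):
#   get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
#                              {'datasetid': 104}, records2fetch=2)
#   -> e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]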


def update_records_in_postgreSQLdb(conn_params, tablename, vals2update_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters and updates the given columns of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2update_dict:  <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    update_cond = "SET " + ', '.join(['%s=%s' % (k, get_postgreSQL_value(vals2update_dict[k]))
                                      for k in vals2update_dict.keys()])
    # check whether a record matching the condition exists in the target table (was hardcoded to 'scenes' before)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + update_cond + " " + condition)

    if 'connection' in locals():
        connection.commit()
        connection.close()
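
# Usage sketch (hypothetical values): set the processing level of scene 26781907 to 'L1A':
#   update_records_in_postgreSQLdb(CFG.job.conn_database, 'scenes_proc',
#                                  {'proc_level': 'L1A'}, {'sceneid': 26781907})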


def append_item_to_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2append_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and appends the given value to the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2append_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    assert len(vals2append_dict) == 1, 'Values can be appended to only one column at once.'
    if type(list(vals2append_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Appending multiple values to one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2append_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2append_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2append_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    append_cond = "SET %s = array_cat(%s, '%s')" % (col2update, col2update, pgSQL_val)
    # check whether a record matching the condition exists in the target table (was hardcoded to 'scenes' before)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + append_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()
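
# Usage sketch (hypothetical values): append scene 26781907 to the 'sceneids' array of job 123:
#   append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
#                                           {'sceneids': 26781907}, {'id': 123})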


def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remove_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and removes the given value from the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2remove_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    assert len(vals2remove_dict) == 1, 'Values can be removed from only one column at once.'
    if type(list(vals2remove_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Removing multiple values from one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2remove_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2remove_dict[col2update])
    pgSQL_val = pgSQL_val if type(vals2remove_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    remove_cond = "SET %s = array_remove(%s, '%s')" % (col2update, col2update, pgSQL_val)
    # check whether a record matching the condition exists in the target table (was hardcoded to 'scenes' before)
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + remove_cond + " " + condition + ';')
    if 'connection' in locals():
        connection.commit()
        connection.close()


def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None,
                                                 idx_val2increment=None, cond_dict=None, timeout=15000):
    # type: (str, str, str, int, int, dict, int) -> Union[None, str]
    """Updates an array column of a specific postgreSQL table by incrementing or decrementing the elements
    at a given position. HINT: The column must contain values like [0,1,0,3,1,0].

    :param conn_params:         <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:           <str> name of the table within the database to be updated
    :param col2update:          <str> column name of the column to be updated
    :param idx_val2decrement:   <int> the index of the array element to be decremented (starts with 1)
    :param idx_val2increment:   <int> the index of the array element to be incremented (starts with 1)
    :param cond_dict:           <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                                HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:             <int> allows to set a custom statement timeout (milliseconds)
    :return:
    """

    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""

    dec_str = '' if idx_val2decrement is None else \
        "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement)
    inc_str = '' if idx_val2increment is None else \
        "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment)

    if dec_str or inc_str:
        # skip empty fragments, otherwise the joined string contains a dangling comma and the query fails
        dec_inc_str = ','.join([s for s in [dec_str, inc_str] if s])
        execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, dec_inc_str, condition))

    if 'connection' in locals():
        connection.commit()
        connection.close()
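
# Usage sketch (hypothetical values): move one scene from position 1 to position 2 of the
# 'statistics' array of job 123 (postgreSQL array indices start at 1):
#   increment_decrement_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs', 'statistics',
#                                                idx_val2decrement=1, idx_val2increment=2,
#                                                cond_dict={'id': 123})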


def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000):
    # type: (str, str, dict, int) -> Union[int, str]
    """Creates a single new record in a postgreSQL database and populates its columns with the given values.

    :param conn_params:       <str> connection parameters as provided by CFG.job.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2write_dict:   <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in vals2write_dict.items()])

    execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s);" % (tablename, ','.join(keys), ','.join(vals)))
    execute_pgSQL_query(cursor, "SELECT id FROM %s ORDER BY id DESC LIMIT 1" % tablename)
    newID = cursor.fetchone()[0]

    if 'connection' in locals():
        connection.commit()
        connection.close()

    return newID
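
# Usage sketch (hypothetical values): create a new job record and receive its ID:
#   new_job_id = create_record_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
#                                              {'creationtime': datetime.now(), 'comment': 'test job'})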


def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
                                    scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    if tgt_corners_lonlat:
        # handle coordinates crossing the 180 degrees meridian (dateline)
        # FIXME in that case the polygon has to be split at the dateline, otherwise pgSQL may yield wrong results
        if crossing_dateline_check:
            xvals = [x for x, y in tgt_corners_lonlat]
            if max(xvals) - min(xvals) > 180:
                tgt_corners_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corners_lonlat]

        from .helper_functions import cornerLonLat_to_postgreSQL_poly
        pGSQL_poly = cornerLonLat_to_postgreSQL_poly(tgt_corners_lonlat)
        src_geom = "'SRID=4326;%s'::geometry" % pGSQL_poly  # source geometry is given
        # FIXME the scenes table stores 'geography' geoinfos -> its index is not used when casting to 'geometry':
        tgt_geom = "%s.%s::geometry" % (table2query, geomCol2use)
        geocond = "%s(%s, %s)" % (queryfunc, src_geom, tgt_geom)
    else:  # scene_ID is not None:
        connection = psycopg2.connect(conn_params)
        if connection is None:
            return 'database connection fault'
        cursor = connection.cursor()
        cmd = "SELECT ST_AsText(bounds) FROM scenes WHERE scenes.id = %s" % scene_ID
        execute_pgSQL_query(cursor, cmd)
        res = cursor.fetchone()
        cursor.close()
        connection.close()
        if len(res):
            src_geom = "'SRID=4326;%s'::geometry" % res
        else:
            print('The scene with the ID %s does not exist in the scenes table.' % scene_ID)
            return []
        geocond = "%s(%s, %s.%s::geometry)" % (queryfunc, src_geom, table2query, geomCol2use)
    return geocond
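
# Usage sketch (hypothetical corner coordinates): build a WHERE fragment matching everything that
# intersects the given lon/lat polygon:
#   cond = get_pgSQL_geospatial_query_cond(CFG.job.conn_database, 'scenes',
#                                          tgt_corners_lonlat=[(10, 50), (12, 50), (12, 52), (10, 52)])
#   -> "ST_Intersects('SRID=4326;POLYGON((...))'::geometry, scenes.bounds::geometry)"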


def get_overlapping_scenes_from_postgreSQLdb(conn_params, table='scenes_proc', scene_ID=None,
                                             tgt_corners_lonlat=None, conditions=None, add_cmds='', timeout=15000):
    # type: (str, str, int, list, Union[list, str], str, int) -> Union[list, str]
    """Queries the postgreSQL database in order to find those scenes of a specified reference satellite (Landsat-8 or
    Sentinel-2) that have an overlap to the given corner coordinates AND that fulfill the given conditions.

    :param conn_params:             <str> connection parameters as provided by CFG.job.conn_params
    :param table:                   <str> name of the table within the database to be updated
    :param scene_ID:                <int> a sceneID to get the target geographical extent from
                                        (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat:      <list> a list of coordinates defining the target geographical extent
                                           (needed if scene_ID is not provided)
    :param conditions:              <list> a list of additional query conditions
    :param add_cmds:                <str> additional pgSQL commands to be added to the pgSQL query
    :param timeout:                 <int> allows to set a custom statement timeout (milliseconds)
    """

    conditions = [] if conditions is None else conditions if isinstance(conditions, list) else [conditions]
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'
    datasetids = [int(d.split('=')[1].strip()) for d in conditions if d.startswith('datasetid')]
    datasetid = datasetids[0] if datasetids else 104  # Landsat-8
    # FIXME: use Landsat-8 instead of Sentinel-2 as long as S2 L1A_P is not working:
    datasetid = 104 if datasetid == 249 else datasetid

    if table != 'scenes_proc':
        assert datasetid, "datasetid is needed if table is not 'scenes_proc'"
    if scene_ID is None:
        assert tgt_corners_lonlat, "Provide either scene_ID or tgt_corners_lonlat!"
    if tgt_corners_lonlat is None:
        assert scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    val2get = "scenes.id" if table == 'scenes' else "%s.sceneid" % table
    # refcond  = ['scenes_proc.georef = True'] if not datasetids else ['scenes.datasetid = %s' %datasetid]
    refcond = ['scenes.datasetid = %s' % datasetid]

    geocond = [get_pgSQL_geospatial_query_cond(conn_params, table, tgt_corners_lonlat=tgt_corners_lonlat,
                                               scene_ID=scene_ID, queryfunc='ST_Intersects',
                                               crossing_dateline_check=True)]

    join = "INNER JOIN scenes ON (%s.sceneid = scenes.id) " % table if table != 'scenes' and datasetids else ''
    conditions = [c for c in conditions if not c.startswith('datasetid')]
    where = "WHERE %s" % " AND ".join(geocond + refcond + conditions)
    usedtbls = "scenes" if table == 'scenes' else "%s, scenes" % table if 'scenes.' in where and join == '' else table
    query = "SELECT %s FROM %s %s%s %s" % (val2get, usedtbls, join, where, add_cmds)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records2return = cursor.fetchall()
    cursor.close()
    connection.close()
    return records2return
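
# Usage sketch (hypothetical scene ID): find Landsat-8 reference scenes overlapping scene 26781907:
#   recs = get_overlapping_scenes_from_postgreSQLdb(CFG.job.conn_database, table='scenes',
#                                                   scene_ID=26781907, conditions=['datasetid = 104'])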


def get_overlapping_MGRS_tiles(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """In contrast to pgSQL, 'overlapping' here means that both geometries share some spatial area.
    So it combines ST_Overlaps and ST_Contains."""
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['grid100k', 'grid1mil', 'geom']
    # FIXME this is covered by ST_Intersects:
    # geocond1 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Overlaps',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond2 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Contains',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    # geocond3 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Within',
    #                                            tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Intersects',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID,
                                              crossing_dateline_check=True)
    # query = "SELECT %s FROM %s WHERE %s OR %s OR %s"
    #     % (', '.join(vals2get), 'mgrs_tiles', geocond1, geocond2, geocond3)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'mgrs_tiles', geocond)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['grid100k', 'grid1mil', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))
    GDF['granuleid'] = GDF['grid1mil'].str.cat(GDF['grid100k'])
    return GDF[['granuleid', 'shapelyPoly_LonLat']]
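
# Usage sketch (hypothetical scene ID): get the MGRS tiles intersecting scene 26781907:
#   tiles_gdf = get_overlapping_MGRS_tiles(CFG.job.conn_database, scene_ID=26781907)
#   -> GeoDataFrame with the columns ['granuleid', 'shapelyPoly_LonLat']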


def get_overlapping_MGRS_tiles2(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['granuleid', 'footprint_wgs84']
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'footprints_sentinel2_granules',
                                              geomCol2use='footprint_wgs84',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'footprints_sentinel2_granules', geocond)

    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['granuleid', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_dict_satellite_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with satellite names as keys and satellite IDs as values as read from pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """

    res = get_info_from_postgreSQLdb(conn_params, 'satellites', ['name', 'id'])
    assert len(res) > 0, 'Error getting satellite names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))
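
# Usage sketch: map satellite names to their database IDs (the IDs below are hypothetical):
#   sat_dict = get_dict_satellite_name_id(CFG.job.conn_database)
#   -> e.g. {'Landsat-8': 8, 'Sentinel-2A': 14, ...}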


def get_dict_sensor_name_id(conn_params):
    # type: (str) -> dict
    """Returns a dictionary with sensor names as keys and sensor IDs as values as read from pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """

    res = get_info_from_postgreSQLdb(conn_params, 'sensors', ['name', 'id'])
    assert len(res) > 0, 'Error getting sensor names from postgreSQL database.'
    arr = np.array(res)
    return dict(zip(list(arr[:, 0]), list(arr[:, 1])))


def get_entityIDs_from_filename(conn_DB, filename):
    # type: (str, str) -> list
    """Returns entityID(s) for the given filename. In case of Sentinel-2 there can be multiple entity IDs if
    multiple granules are saved in one .zip file.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param filename:    <str> the filename to get the corresponding entity ID(s) for
    """

    if filename[:2] in ['LE', 'LC', 'LO'] and filename.endswith('.tar.gz'):  # Landsat
        entityIDs = [filename.split('.tar.gz')[0]]
    else:
        print('Querying database in order to get entityIDs for %s...' % filename)
        res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['entityid'], {'filename': filename}, timeout=40000)
        entityIDs = [subres[0] for subres in res] if len(res) > 0 else []
    return entityIDs


def get_filename_by_entityID(conn_DB, entityid, satellite):
    # type: (str,str,str) -> str
    """Returns the filename for the given entity ID.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityid:    <str> entity ID
    :param satellite:   <str> name of the satellite to which the entity ID belongs
    """

    if re.search('Landsat', satellite, re.I):
        filename = '%s.tar.gz' % entityid
    elif re.search('Sentinel-2', satellite, re.I):
        filename = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['filename'],
                                              {'entityid': entityid}, records2fetch=1)[0][0]
    else:
        raise NotImplementedError
    return filename


def get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder):
    # type: (str,list,str,str,str) -> np.ndarray
    """Takes a list of entity IDs and extracts those records that have the corresponding archive file in the given
    source folder and that have the processing level 'METADATA' in the pgSQL database. Based on this subset a numpy
    array containing the corresponding scene IDs and the target filenames for the fileserver is returned.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityIDs:   <list> a list of entity IDs
    :param satellite:   <str> the name of the satellite to restrict the query on
    :param sensor:      <str> the name of the sensor to restrict the query on
    :param src_folder:  <str> the source directory where archive files are saved
    """

    columns = ['id', 'entityid', 'satelliteid', 'sensorid', 'filename', 'proc_level']
    result = get_info_from_postgreSQLdb(conn_DB, 'scenes', columns, {'entityid': entityIDs})
    df = pd.DataFrame(result, columns=columns)

    satNameID_dic = get_dict_satellite_name_id(conn_DB)
    satID = satNameID_dic[satellite]
    target_folder = os.path.join(CFG.job.path_archive, satellite, sensor)

    def get_fName(entityid): return get_filename_by_entityID(conn_DB, entityid, satellite)

    def tgt_exists(fileName): return os.path.exists(os.path.join(target_folder, fileName))

    def src_exists(entityid):
        return os.path.exists(os.path.join(src_folder, get_filename_by_entityID(conn_DB, entityid, satellite)))
    df['tgt_fileName'] = list(df['entityid'].map(get_fName))
    df['tgtFile_exists'] = list(df['tgt_fileName'].map(tgt_exists))
    df['srcFile_exists'] = list(df['entityid'].map(src_exists))
    tgt_satID = (df.satelliteid == float(satID))
    # isDL = (df.proc_level == 'DOWNLOADED')
    isMET = (df.proc_level == 'METADATA')
    # tgtE = (df.tgtFile_exists == True)
    srcE = df.srcFile_exists  # (df.srcFile_exists == True)
    # sceneIDs_notDL_tgtE = df[tgt_satID & (isDL == 0) & tgtE]  # maybe needed later
    # sceneIDs_DL_tgtNE = df[tgt_satID & isDL & (tgtE == 0)]  # maybe needed later
    # sceneIDs_DL_tgtE = df[tgt_satID & isDL & tgtE]  # maybe needed later
    sceneIDs_isMET_srcE = df[tgt_satID & isMET & srcE]
    return sceneIDs_isMET_srcE[['id', 'tgt_fileName']].values


class GMS_JOB(object):
    """gms_preprocessing job manager"""

    def __init__(self, conn_db):
        # type: (str) -> None
        """
        :param conn_db: <str> the database connection parameters as given by CFG.job.conn_params
        """

        self.conn = conn_db
        self.dataframe = GeoDataFrame()
        self.scene_counts = {}  # set by self.create()

        self.exists_in_db = False
        self.id = None  #: int
        self.creationtime = datetime.now()
        self.finishtime = None
        self.sceneids = []
        self.timerange_start = datetime.min
        self.timerange_end = datetime.max
        self.bounds = box(-180, -90, 180, 90)
        self.distribution_index = None
        self.progress = None
        self.feedback = None
        self.failed_sceneids = []
        self.ref_job_id = None
        self.datacube_mgrs_tiles_proc = []
        self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start',
                                   'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback',
                                   'failed_sceneids', 'datasetid_spatial_ref',
                                   'virtualsensorid', 'ref_job_id', 'datacube_mgrs_tiles_proc', 'comment',
                                   'status', 'statistics']
        self.virtualsensorid = None  # set by self._set_target_sensor_specs()
        # FIXME notnull but not gettable via virtual sensor id and not needed anymore in DB (gsd is given):
        self.datasetid_spatial_ref = 249
        self.datasetname_spatial_ref = 'SENTINEL-2A'
        self.status = None
        self.statistics = []
        self.comment = None
        self.epsg = None  # set by self._set_target_sensor_specs()
        self.ground_spatial_sampling = None  # set by self._set_target_sensor_specs()

    def __repr__(self):
        return 'GMS job:\n\n' + GeoSeries(self.db_entry).to_string()

    def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref):
        self.virtualsensorid = virtual_sensor_id
        res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution',
                                                                        "projection_epsg"], {'id': virtual_sensor_id})
        assert res, \
            "'virtual_sensor_id'=%s does not exist in the table 'virtual_sensors' of the database." % virtual_sensor_id
        target_gsd = res[0][0]
        self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd
        self.epsg = int(res[0][1])

        assert isinstance(datasetid_spatial_ref, int)
        self.datasetid_spatial_ref = datasetid_spatial_ref
        res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref})
        assert res, \
            "'datasetid_spatial_ref'=%s does not exist in the table 'datasets' of the database." % datasetid_spatial_ref
        self.datasetname_spatial_ref = res[0][0]  # unpack the single-record query result to the plain dataset name

    @property
    def db_entry(self):
        """Returns an OrderedDict containing keys and values of the database entry."""

        db_entry = collections.OrderedDict()
        for i in self.jobs_table_columns:
            db_entry[i] = getattr(self, i)
        return db_entry

    def from_dictlist(self, dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        :param dictlist_data2process:  <list> a list of dictionaries containing the keys "satellite", "sensor" and
                                        "filenames", e.g.
                                        [{'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS',
                                          'filenames': 'file.tar.gz'}, {...}]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default: 249 = Sentinel-2A; 104 = Landsat-8)
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        dictlist_data2process = dictlist_data2process if dictlist_data2process else []

        for idx, datadict in enumerate(dictlist_data2process):
            assert isinstance(datadict, dict), "Expected only dictionaries within 'dictlist_data2process'. " \
                                               "Got %s in there." % type(datadict)
            assert False not in [i in datadict for i in ['satellite', 'sensor', 'filenames']]
            assert type(datadict['filenames']) in [list, str]

            if isinstance(datadict['filenames'], str):
                if datadict['filenames'].endswith('.csv'):
                    assert os.path.exists(datadict['filenames'])
                else:
                    datadict['filenames'] = [datadict['filenames']]

        # find all duplicates in input datadicts and build common geodataframe
        all_gdfs = []
        for datadict in dictlist_data2process:
            assert isinstance(datadict, dict)

            if isinstance(datadict['filenames'], str) and datadict['filenames'].endswith('.csv'):
                datadict['filenames'] = None  # TODO implement csv reader here
                raise NotImplementedError
            else:
                temp_gdf = GeoDataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
                if re.search('Landsat-7', datadict['satellite'], re.I) and re.search('ETM+', datadict['sensor'], re.I):
                    from .helper_functions import Landsat_entityID_decrypter as LED

                    def get_L7_sensor(fN): return LED(fN.split('.tar.gz')[0]).sensorIncSLC
                    temp_gdf['sensor'] = list(temp_gdf['filenames'].map(get_L7_sensor))

                all_gdfs.append(temp_gdf)

        gdf = GeoDataFrame(pd.concat(all_gdfs)).drop_duplicates()
        gdf.columns = ['satellite', 'sensor', 'filename']

        # run self.from_dictlist
        sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

        # populate attributes
        self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self

    def from_sceneIDlist(self, list_sceneIDs, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        Create a GMS_JOB instance based on the given list of scene IDs.

        :param list_sceneIDs:          <list> of scene IDs, e.g. [26781907, 26781917, 26542650, 26542451, 26541679]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default: 249 = Sentinel-2A; 104 = Landsat-8)
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
        """

        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        list_sceneIDs = list(list_sceneIDs)

        # query 'satellite', 'sensor', 'filename' from database and summarize in GeoDataFrame
        with psycopg2.connect(self.conn) as conn:
            with conn.cursor() as cursor:
                execute_pgSQL_query(cursor,
                                    """SELECT scenes.id, satellites.name, sensors.name, scenes.filename FROM scenes
                                    LEFT JOIN satellites on scenes.satelliteid=satellites.id
                                    LEFT JOIN sensors on scenes.sensorid=sensors.id
                                    WHERE scenes.id in (%s)""" % ','.join([str(i) for i in list_sceneIDs]))
                gdf = GeoDataFrame(cursor.fetchall(), columns=['sceneid', 'satellite', 'sensor', 'filename'])

        # FIXME overwriting 'ETM+_SLC_OFF' with 'ETM+' causes _get_validated_sceneInfoGDFs() to fail because the
        # FIXME sensorid for ETM+_SLC_OFF cannot be retrieved
        # gdf['sensor'] = gdf['sensor'].apply(lambda senN: senN if senN != 'ETM+_SLC_OFF' else 'ETM+')
        gdf = gdf.drop_duplicates()

        if gdf.empty:
            raise ValueError('None of the given scene IDs could be found in the GeoMultiSens database. '
                             'Job creation failed.')
        else:
            missing_IDs = [i for i in list_sceneIDs if i not in gdf['sceneid'].values]
            if missing_IDs:
                warnings.warn('The following scene IDs could not be found in the GeoMultiSens database: \n%s'
                              % '\n'.join([str(i) for i in missing_IDs]))

            # run self.from_dictlist
            sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)

            # populate attributes
            self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)

        return self

    def from_entityIDlist(self, list_entityids, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of entity IDs.

        :param list_entityids:
        :param virtual_sensor_id:
        :param datasetid_spatial_ref:
        :param comment:
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'entityid': list_entityids})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given entity IDs.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_entityids) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.'
                          % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def from_filenames(self, list_filenames, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of provider archive filenames.

        :param list_filenames:
        :param virtual_sensor_id:
        :param datasetid_spatial_ref:
        :param comment:
        :return:
        """

        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'filename': list_filenames})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given filenames.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_filenames) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found in the database. They cannot be processed.'
                          % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def _get_validated_sceneInfoGDFs(self, GDF_SatSenFname):
        # type: (GeoDataFrame) -> GeoDataFrame
        """
        :param GDF_SatSenFname:
        :return:
        """

        gdf = GDF_SatSenFname

        # loop through all satellite-sensor combinations and get scene information from database
        all_gdf_recs, all_gdf_miss = [], []
        all_satellites, all_sensors = zip(
            *[i.split('__') for i in (np.unique(gdf['satellite'] + '__' + gdf['sensor']))])

        for satellite, sensor in zip(all_satellites, all_sensors):
            cur_gdf = gdf.loc[(gdf['satellite'] == satellite) & (gdf['sensor'] == sensor)]
            filenames = list(cur_gdf['filename'])

            satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': satellite})
            senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': sensor})
            assert len(satID_res), "No satellite named '%s' found in database." % satellite
            assert len(senID_res), "No sensor named '%s' found in database." % sensor

            # append sceneid and wkb_hex bounds
            if 'sceneid' in gdf.columns:
                sceneIDs = list(cur_gdf['sceneid'])
                conddict = dict(id=sceneIDs, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])
            else:
                conddict = dict(filename=filenames, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])

            records = get_info_from_postgreSQLdb(
                self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], conddict)
            records = GeoDataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
            if 'sceneid' in gdf.columns:
                del records['sceneid']

            cur_gdf = cur_gdf.merge(records, on='filename', how="outer", copy=False)

            # separate records with valid matches in database from invalid matches (filename not found in database)
            gdf_recs = cur_gdf[
                cur_gdf.sceneid.notnull()].copy()  # creates a copy (needed to be able to apply maps later)
            gdf_miss = cur_gdf[cur_gdf.sceneid.isnull()]  # creates a view

            # convert scene ids from floats to integers
            gdf_recs['sceneid'] = list(gdf_recs.sceneid.map(lambda sceneid: int(sceneid)))

            # wkb_hex bounds to shapely polygons
            gdf_recs['polygons'] = list(gdf_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

            all_gdf_recs.append(gdf_recs)
            all_gdf_miss.append(gdf_miss)

        # merge all dataframes of all satellite-sensor combinations
        gdf_recs_compl = GeoDataFrame(pd.concat(all_gdf_recs))
        gdf_miss_compl = GeoDataFrame(pd.concat(all_gdf_miss))

        # populate attributes
        if not gdf_miss_compl.empty:
            warnings.warn('The following scene filenames could not be found in the GeoMultiSens database: \n%s'
                          % '\n'.join(list(gdf_miss_compl['filename'])))

        return gdf_recs_compl

    def _populate_jobAttrs_from_sceneInfoGDF(self, sceneInfoGDF):
        # type: (GeoDataFrame) -> None
        """
        :param sceneInfoGDF:
        :return:
        """

        if not sceneInfoGDF.empty:
            self.dataframe = sceneInfoGDF
            self.sceneids = list(self.dataframe['sceneid'])
            self.statistics = [len(self.sceneids)] + [0] * 8
            self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
            self.timerange_start = self.dataframe.acquisitiondate.min().to_pydatetime()
            self.timerange_end = self.dataframe.acquisitiondate.max().to_pydatetime()

    def from_job_ID(self, job_ID):
        # type: (int) -> object
        """
        Create a GMS_JOB instance by querying the database for a specific job ID.

        :param job_ID:  <int> a valid id from the database table 'jobs'
        """

        res = get_info_from_postgreSQLdb(self.conn, 'jobs', self.jobs_table_columns, {'id': job_ID})
        if not res:
            raise ValueError("No job with ID %s found in 'jobs' table of the database." % job_ID)

        self.exists_in_db = True
        [setattr(self, attrName, res[0][i]) for i, attrName in enumerate(self.jobs_table_columns)]
        self.bounds = wkb_loads(self.bounds, hex=True)

        # fill self.dataframe
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['satelliteid', 'sensorid', 'filename',
                                                                   'id', 'acquisitiondate', 'bounds'],
                                             {'id': self.sceneids})
        gdf = GeoDataFrame(records,
                           columns=['satelliteid', 'sensorid', 'filename', 'sceneid', 'acquisitiondate', 'geom'])
        all_satIDs = gdf.satelliteid.unique().tolist()
        all_senIDs = gdf.sensorid.unique().tolist()
        satName_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['name'], {'id': all_satIDs})
        senName_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['name'], {'id': all_senIDs})
        all_satNames = [i[0] for i in satName_res]
        all_senNames = [i[0] for i in senName_res]
        id_satName_dict = dict(zip(all_satIDs, all_satNames))
        id_senName_dict = dict(zip(all_senIDs, all_senNames))
        gdf.insert(0, 'satellite', list(gdf.satelliteid.map(lambda satID: id_satName_dict[satID])))
        gdf.insert(1, 'sensor', list(gdf.sensorid.map(lambda senID: id_senName_dict[senID])))
        gdf['polygons'] = list(gdf.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

        self.dataframe = gdf[['satellite', 'sensor', 'filename', 'sceneid', 'acquisitiondate', 'geom', 'polygons']]

        return self

    def reset_job_progress(self):
        """Resets everything in the database entry that has been written during the last run of the job."""

        self.finishtime = None
        self.failed_sceneids = []
        self.progress = None
        self.status = 'pending'
        self.statistics = [len(self.sceneids)] + [0] * 8

        self.update_db_entry()

    def _get_dataframe(self, datadict):  # FIXME deprecated
        gdf = GeoDataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
        gdf.columns = ['satellite', 'sensor', 'filename']

        satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': datadict['satellite']})
        senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': datadict['sensor']})
        assert len(satID_res), "No satellite named '%s' found in database." % datadict['satellite']
        assert len(senID_res), "No sensor named '%s' found in database." % datadict['sensor']

        # append sceneid and wkb_hex bounds
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'],
                                             {'filename': datadict['filenames'],
                                              'satelliteid': satID_res[0][0], 'sensorid': senID_res[0][0]})
        records = GeoDataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
        gdf = gdf.merge(records, on='filename', how="outer")

        # separate records with valid matches in database from invalid matches (filename not found in database)
        gdf_recs = gdf[gdf.sceneid.notnull()].copy()  # creates a copy (needed to be able to apply maps later)
        gdf_miss = gdf[gdf.sceneid.isnull()]  # creates a view

        # convert scene ids from floats to integers
        gdf_recs['sceneid'] = list(gdf_recs.sceneid.map(lambda sceneid: int(sceneid)))

        # wkb_hex bounds to shapely polygons
        gdf_recs['polygons'] = list(gdf_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

        return gdf_recs, gdf_miss

    def create(self):
        # type: () -> int