database_tools.py 75.6 KB
Newer Older
1 2 3 4
# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
5
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
6 7 8 9 10 11
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
12
# the terms of the GNU General Public License as published by the Free Software
13 14
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
15 16 17
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
18 19 20 21 22 23 24 25 26
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

27
import collections
28
import glob
29
import itertools
30 31
import os
import re
32
import shutil
33
import sys
34
import traceback
35 36
import warnings
from datetime import datetime
37
from typing import Union, TYPE_CHECKING  # noqa F401  # flake8 issue
38
from pkg_resources import parse_version
Daniel Scheffler's avatar
GEOP:  
Daniel Scheffler committed
39

40 41
import numpy as np
import pandas as pd
42
from pandas.io.sql import pandasSQL_builder, SQLTable, DataFrame, Series
43
import psycopg2
Daniel Scheffler's avatar
Daniel Scheffler committed
44
from shapely.wkb import loads as wkb_loads
45
from geoalchemy2.types import Geometry as GEOMETRY
46
from geopandas import GeoDataFrame
47 48 49
from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine
50

51
from ..options.config import GMS_config as CFG
52
from . import path_generator as PG
53
from .definition_dicts import proc_chain
54

55 56 57
if TYPE_CHECKING:
    from ..model.gms_object import GMS_object  # noqa F401  # flake8 issue

58 59
# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)

60 61
__author__ = 'Daniel Scheffler'

62

63
def execute_pgSQL_query(cursor, query_command):
    """Executes a postgreSQL query and catches the full error message if there is one.

    :param cursor:         open psycopg2 cursor used to execute the query
    :param query_command:  <str> the SQL command to be executed
    :raises psycopg2.ProgrammingError:  re-raised with the failed command appended to the message
    """
    try:
        cursor.execute(query_command)
    except psycopg2.ProgrammingError as e:
        # e.pgerror is None for errors raised on the client side -> fall back to str(e)
        # to avoid a TypeError ('NoneType' + str) while building the message
        raise psycopg2.ProgrammingError((e.pgerror or str(e)) +
                                        'Query failed. Command was:\n%s' % query_command) from e


73
def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
    # type: (int) -> collections.OrderedDict
    """Creates an OrderedDict containing further information about a given scene ID by querying the pgSQL database.

    :param sceneid:   <int> the GMS scene ID to get information for
    :return:          <collections.OrderedDict> scene metadata; None if the scene ID is not in the database
    """
    def query(tablename, vals2return, cond_dict, records2fetch=0):
        return get_info_from_postgreSQLdb(CFG.conn_database, tablename, vals2return, cond_dict, records2fetch)

    resultset = query('scenes', ['datasetid', 'satelliteid', 'sensorid', 'subsystemid', 'acquisitiondate', 'entityid',
                                 'filename'], {'id': sceneid})
    if len(resultset) == 0:
        sys.stderr.write("Scene with id %s not found. Skipping.." % sceneid)
        # previously execution continued and crashed below with an IndexError on resultset[0]
        return None

    scenedata = resultset[0]
    ds = collections.OrderedDict()
    proc_level_tmp = query('scenes_proc', 'proc_level', {'sceneid': sceneid})
    # 'L0A' is the fallback processing level for scenes not yet listed in 'scenes_proc'
    ds.update({'proc_level': 'L0A' if proc_level_tmp == [] else proc_level_tmp[0][0]})
    ds.update({'scene_ID': sceneid})
    ds.update({'datasetid': scenedata[0]})
    ds.update({'image_type': query('datasets', 'image_type', {'id': scenedata[0]})[0][0]})
    ds.update({'satellite': query('satellites', 'name', {'id': scenedata[1]})[0][0]})
    ds.update({'sensor': query('sensors', 'name', {'id': scenedata[2]})[0][0]})
    ds.update({'subsystem': query('subsystems', 'name', {'id': scenedata[3]})[0][0] if scenedata[3] else None})
    ds.update({'acq_datetime': scenedata[4]})
    ds.update({'entity_ID': scenedata[5]})
    ds.update({'filename': scenedata[6]})
    return ds


103
def get_postgreSQL_value(value):
    # type: (any) -> str
    """Converts Python variable to a postgreSQL value respecting postgreSQL type casts.
    The resulting value can be directly inserted into a postgreSQL query."""

    assert type(value) in [int, float, bool, str, Polygon, datetime, list, tuple] or value is None, \
        "Unsupported value type within postgreSQL matching expression. Got %s." % type(value)

    if value is None:
        return 'NULL'
    if isinstance(value, (bool, int, float)):
        # numbers and booleans need no quoting or conversion
        return value
    if isinstance(value, str):
        # single quotes are stripped so the value cannot break the surrounding SQL quoting
        return "'%s'" % value.replace("'", "")
    if isinstance(value, Polygon):
        return "'%s'" % value.wkb_hex
    if isinstance(value, datetime):
        return "TIMESTAMP '%s'" % str(value)

    # remaining case: list or tuple
    if not value:  # empty list/tuple
        return 'NULL'

    elem_types = list(set(type(item) for item in value))
    assert len(elem_types) == 1, \
        'Mixed data types in postgreSQL matching expressions are not supported. Got %s.' % elem_types
    assert elem_types[0] in [int, str, float, np.int64, bool]

    needs_quotes = isinstance(value[0], str)
    joined = ",".join("'%s'" % item if needs_quotes else "%s" % item for item in value)
    return "'{%s}'" % joined


137
def get_postgreSQL_matchingExp(key, value):
    # type: (str,any) -> str
    """Converts a key/value pair to a postgreSQL matching expression in the form "column=value" respecting postgreSQL
    type casts. The resulting string can be directly inserted into a postgreSQL query.
    """
    pgVal = get_postgreSQL_value(value)

    looks_like_pg_array = isinstance(pgVal, str) and pgVal.startswith("'{") and pgVal.endswith("}'")
    if looks_like_pg_array:
        # turn a pgSQL array literal into an SQL "in" clause:  '{1,2,3}' => (1,2,3)
        return '%s in %s' % (key, pgVal.replace("'{", '(').replace("}'", ')'))
    if pgVal == 'NULL':
        return '%s is NULL' % key
    return '%s=%s' % (key, pgVal)
149

150

151
def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000):
    # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str]
    """Queries a postgreSQL database for the given parameters.

    :param conn_params:     <str> connection parameters as provided by CFG.conn_params
    :param tablename:       <str> name of the table within the database to be queried
    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                            HINT: <value> can also be a list or a tuple of elements to match, BUT note that the order
                                  of the list items is NOT respected!
    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
    :param timeout:         <int> allows to set a custom statement timeout (milliseconds)
    """
    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), "get_info_from_postgreSQLdb: Expected an integer for the argument " \
                                           "'records2return'. Got %s" % type(records2fetch)
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    try:
        # the finally-block guarantees the connection is closed even if the query fails
        cursor = connection.cursor()
        condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
            if cond_dict else ""
        cmd = "SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition
        execute_pgSQL_query(cursor, cmd)

        # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
        records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
            cursor.fetchmany(size=records2fetch)
        cursor.close()
    finally:
        connection.close()
    return records2return

187

188
def update_records_in_postgreSQLdb(conn_params, tablename, vals2update_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters and updates the given columns of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2update_dict:  <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    try:
        cursor = connection.cursor()
        condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
            if cond_dict else ""
        update_cond = "SET " + ', '.join(['%s=%s' % (k, get_postgreSQL_value(vals2update_dict[k]))
                                          for k in vals2update_dict.keys()])
        # check the TARGET table for matching records (this was hard-coded to 'scenes' before,
        # which made the existence check wrong for every other table)
        execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
        if cursor.fetchone()[0] == 0:
            warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
        else:
            execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + update_cond + " " + condition)
        connection.commit()
    finally:
        connection.close()
220 221


222
def append_item_to_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2append_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and appends the given value to the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2append_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """
    assert len(vals2append_dict) == 1, 'Values can be appended to only one column at once.'
    if type(list(vals2append_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Appending multiple values to one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    try:
        cursor = connection.cursor()
        condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
            if cond_dict else ""
        col2update = list(vals2append_dict.keys())[0]
        pgSQL_val = get_postgreSQL_value(vals2append_dict[col2update])
        # single values must be wrapped into a pgSQL array literal before array_cat
        pgSQL_val = pgSQL_val if type(vals2append_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
        append_cond = "SET %s = array_cat(%s, '%s')" % (col2update, col2update, pgSQL_val)
        # check the TARGET table for matching records (this was hard-coded to 'scenes' before,
        # which made the existence check wrong for every other table)
        execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
        if cursor.fetchone()[0] == 0:
            warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
        else:
            execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + append_cond + " " + condition + ';')
        connection.commit()
    finally:
        connection.close()
260 261 262


def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remove_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and removes the given value from the specified column of the query result.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2remove_dict:  <dict> a dictionary containing keys and value(s) to be set in the form
                              {'col_name':[<value>,<value>]}
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    """
    assert len(vals2remove_dict) == 1, 'Values can be removed from only one column at once.'
    if type(list(vals2remove_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Removing multiple values from one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    try:
        cursor = connection.cursor()
        condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
            if cond_dict else ""
        col2update = list(vals2remove_dict.keys())[0]
        pgSQL_val = get_postgreSQL_value(vals2remove_dict[col2update])
        # single values must be wrapped into a pgSQL array literal before array_remove
        pgSQL_val = pgSQL_val if type(vals2remove_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
        remove_cond = "SET %s = array_remove(%s, '%s')" % (col2update, col2update, pgSQL_val)
        # check the TARGET table for matching records (this was hard-coded to 'scenes' before,
        # which made the existence check wrong for every other table)
        execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
        if cursor.fetchone()[0] == 0:
            warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
        else:
            execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + remove_cond + " " + condition + ';')
        connection.commit()
    finally:
        connection.close()
300 301


302 303
def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None,
                                                 idx_val2increment=None, cond_dict=None, timeout=15000):
    # type: (str, str, str, int, int, dict, int) -> Union[None, str]
    """Updates an array column of a specific postgreSQL table in the form that it increments or decrements the elements
    at a given position. HINT: The column must have values like that: [52,0,27,10,8,0,0,0,0]

    :param conn_params:         <str> connection parameters as provided by CFG.conn_params
    :param tablename:           <str> name of the table within the database to be update
    :param col2update:          <str> column name of the column to be updated
    :param idx_val2decrement:   <int> the index of the array element to be decremented (starts with 1)
    :param idx_val2increment:   <int> the index of the array element to be incremented (starts with 1)
    :param cond_dict:           <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                                HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:             <int> allows to set a custom statement timeout (milliseconds)
    :return:
    """
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    try:
        cursor = connection.cursor()
        condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
            if cond_dict else ""

        dec_str = '' if idx_val2decrement is None else \
            "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement)
        inc_str = '' if idx_val2increment is None else \
            "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment)

        # join only NON-EMPTY assignments; the previous ','.join([dec_str, inc_str]) produced
        # invalid SQL like "SET ,col[1] = col[1]+1" when only one of the two indices was given
        assignments = [s for s in (dec_str, inc_str) if s]
        if assignments:
            execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, ','.join(assignments), condition))

        connection.commit()
    finally:
        connection.close()
341 342


343
def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000):
    # type: (str, str, dict, int) -> Union[int, str]
    """Creates a single new record in a postgreSQL database and populates its columns with the given values.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param vals2write_dict:   <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    :return:                  <int> the ID of the newly created record
    """
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    try:
        cursor = connection.cursor()

        keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in vals2write_dict.items()])

        # RETURNING id yields the ID of the row just inserted; the previous
        # "SELECT id ... ORDER BY id DESC LIMIT 1" could return a different row
        # under concurrent inserts
        execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s) RETURNING id;"
                            % (tablename, ','.join(keys), ','.join(vals)))
        newID = cursor.fetchone()[0]

        connection.commit()
    finally:
        connection.close()

    return newID


373
def delete_record_in_postgreSQLdb(conn_params, tablename, record_id, timeout=15000):
    # type: (str, str, int, int) -> str
    """Delete a single record in a postgreSQL database.

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param record_id:         <int> ID of the record to be deleted
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    :return:                  'success' if the record is gone afterwards, otherwise 'fail'
    """
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    try:
        cursor = connection.cursor()

        execute_pgSQL_query(cursor, "DELETE FROM %s WHERE id=%s;" % (tablename, record_id))
        # verify the deletion by re-querying the record
        execute_pgSQL_query(cursor, "SELECT id FROM %s WHERE id=%s" % (tablename,  record_id))

        res = cursor.fetchone()

        connection.commit()
    finally:
        connection.close()

    return 'success' if res is None else 'fail'


402
def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
                                    scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
    """Builds a postGIS query condition matching the given geometry against a geometry column.

    :param conn_params:              <str> connection parameters as provided by CFG.conn_params
    :param table2query:              <str> name of the table the condition will be applied to
    :param geomCol2use:              <str> name of the geometry column within table2query
    :param tgt_corners_lonlat:       <list> corner coordinates (lon/lat) defining the source geometry
                                     (needed if scene_ID is not provided)
    :param scene_ID:                 <int> scene ID whose 'bounds' geometry is read from the scenes table
                                     (needed if tgt_corners_lonlat is not provided)
    :param queryfunc:                <str> the postGIS function to use, e.g., 'ST_Intersects'
    :param crossing_dateline_check:  <bool> whether to shift negative longitudes by 360 degrees if the
                                     coordinates cross the 180 degrees meridian
    """
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    if tgt_corners_lonlat:
        # handle coordinates crossing the 180 degrees meridian (dateline)
        # FIXME in that case the polygon has to be split at the dateline. otherwise pgSQL may yield wrong results
        if crossing_dateline_check:
            xvals = [x for x, y in tgt_corners_lonlat]
            if max(xvals) - min(xvals) > 180:
                tgt_corners_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corners_lonlat]

        from .helper_functions import cornerLonLat_to_postgreSQL_poly
        pGSQL_poly = cornerLonLat_to_postgreSQL_poly(tgt_corners_lonlat)
        src_geom = "'SRID=4326;%s'::geometry" % pGSQL_poly  # source geometry is given
        # FIXME the scenes table stores "geography" geoinfos -> its index is not used when casting to "geometry":
        tgt_geom = "%s.%s::geometry" % (table2query, geomCol2use)
        geocond = "%s(%s, %s)" % (queryfunc, src_geom, tgt_geom)
    else:  # scene_ID is not None:
        connection = psycopg2.connect(conn_params)
        if connection is None:
            return 'database connection fault'
        cursor = connection.cursor()
        cmd = "SELECT ST_AsText(bounds) FROM scenes WHERE scenes.id = %s" % scene_ID
        execute_pgSQL_query(cursor, cmd)
        res = cursor.fetchone()
        cursor.close()
        connection.close()
        # fetchone() returns None if no row matched -> the former len(res) raised a TypeError then
        if res:
            # res is a 1-tuple; %-formatting with a 1-tuple substitutes its single element
            src_geom = "'SRID=4326;%s'::geometry" % res
        else:
            # the former message was missing the '%' formatting and printed a literal '%s'
            print('The scene with the ID %s does not exist in the scenes table.' % scene_ID)
            return []
        geocond = "%s(%s, %s.%s::geometry)" % (queryfunc, src_geom, table2query, geomCol2use)
    return geocond


439
def get_overlapping_scenes_from_postgreSQLdb(conn_params, table='scenes_proc', scene_ID=None,
                                             tgt_corners_lonlat=None, conditions=None, add_cmds='', timeout=15000):
    # type: (str, str, int, list, Union[list, str], str, int) -> Union[list, str]
    """Queries the postgreSQL database in order to find those scenes of a specified reference satellite (Landsat-8 or
    Sentinel-2) that have an overlap to the given corner coordinates AND that fulfill the given conditions.

    :param conn_params:             <str> connection parameters as provided by CFG.conn_params
    :param table:                   <str> name of the table within the database to be updated
    :param scene_ID:                <int> a sceneID to get the target geographical extent from
                                        (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat:      <list> a list of coordinates defining the target geographical extent
                                           (needed if scene_ID is not provided)
    :param conditions:              <list> a list of additional query conditions
    :param add_cmds:                <str> additional pgSQL commands to be added to the pgSQL query
    :param timeout:                 <int> allows to set a custom statement timeout (milliseconds)
    """
    conditions = [] if conditions is None else conditions if isinstance(conditions, list) else [conditions]
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'
    try:
        # a 'datasetid = <id>' condition overrides the default reference dataset
        datasetids = [int(d.split('=')[1].strip()) for d in conditions if d.startswith('datasetid')]
        datasetid = datasetids[0] if datasetids else 104  # Landsat-8
        # FIXME: use Landsat-8 instead of Sentinel-2 as long as S2 L1A_P is not working:
        datasetid = 104 if datasetid == 249 else datasetid

        if table != 'scenes_proc':
            assert datasetid, "filtdsId is needed if table is not 'scenes_proc'"
        if scene_ID is None:
            assert tgt_corners_lonlat, "Provide either scene_ID or tgt_corners_lonlat!"
        if tgt_corners_lonlat is None:
            assert scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

        val2get = "scenes.id" if table == 'scenes' else "%s.sceneid" % table
        refcond = ['scenes.datasetid = %s' % datasetid]

        geocond = [get_pgSQL_geospatial_query_cond(conn_params, table, tgt_corners_lonlat=tgt_corners_lonlat,
                                                   scene_ID=scene_ID, queryfunc='ST_Intersects',
                                                   crossing_dateline_check=True)]

        # join the scenes table when the sceneid column of another table must be matched against scenes.id
        join = "INNER JOIN scenes ON (%s.sceneid = scenes.id) " % table if table != 'scenes' and datasetids else ''
        conditions = [c for c in conditions if not c.startswith('datasetid')]
        where = "WHERE %s" % " AND ".join(geocond + refcond + conditions)
        usedtbls = "scenes" if table == 'scenes' else \
            "%s, scenes" % table if 'scenes.' in where and join == '' else table
        query = "SELECT %s FROM %s %s%s %s" % (val2get, usedtbls, join, where, add_cmds)
        cursor = connection.cursor()
        execute_pgSQL_query(cursor, query)
        records2return = cursor.fetchall()
        cursor.close()
    finally:
        # ensure the connection is released even if query construction/execution fails
        connection.close()
    return records2return

494

495
def get_overlapping_MGRS_tiles(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """Returns MGRS tiles sharing spatial area with the given geometry.

    In contrast to pgSQL 'Overlapping' here means that both geometries share some spatial area.
    So it combines ST_Overlaps and ST_Contains.

    :param conn_params:         <str> connection parameters as provided by CFG.conn_params
    :param scene_ID:            <int> a sceneID to get the target geographical extent from
    :param tgt_corners_lonlat:  <list> a list of coordinates defining the target geographical extent
    :param timeout:             <int> allows to set a custom statement timeout (milliseconds)
    :return:                    <GeoDataFrame> with the columns 'granuleid' and 'shapelyPoly_LonLat'
    """
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'
    try:
        vals2get = ['grid100k', 'grid1mil', 'geom']
        # NOTE: ST_Intersects covers the formerly combined ST_Overlaps/ST_Contains/ST_Within queries
        geocond = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom',
                                                  queryfunc='ST_Intersects',
                                                  tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID,
                                                  crossing_dateline_check=True)
        query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'mgrs_tiles', geocond)
        cursor = connection.cursor()
        execute_pgSQL_query(cursor, query)
        records = cursor.fetchall()
        cursor.close()
    finally:
        # ensure the connection is released even if the query fails
        connection.close()

    GDF = GeoDataFrame(records, columns=['grid100k', 'grid1mil', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))
    # the granule ID is the concatenation of the 1-million and 100k grid identifiers
    GDF['granuleid'] = GDF['grid1mil'].str.cat(GDF['grid100k'])
    return GDF[['granuleid', 'shapelyPoly_LonLat']]
Daniel Scheffler's avatar
GEOP:  
Daniel Scheffler committed
530 531


532 533
def get_overlapping_MGRS_tiles2(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """Return all Sentinel-2 MGRS granules intersecting the given scene or corner coordinates.

    Queries the 'footprints_sentinel2_granules' table of the postgreSQL database and returns a
    GeoDataFrame with the columns 'granuleid' and 'shapelyPoly_LonLat' (footprint polygons).

    :param conn_params:         <str> pgSQL database connection parameters
    :param scene_ID:            <int> the scene ID to get overlapping MGRS tiles for
    :param tgt_corners_lonlat:  <list> target corner coordinates in lon/lat
    :param timeout:             <int> pgSQL statement timeout in milliseconds
    """
    # FIX: corrected typo in the assertion message ('eihter' -> 'either')
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['granuleid', 'footprint_wgs84']
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'footprints_sentinel2_granules',
                                              geomCol2use='footprint_wgs84',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)

    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'footprints_sentinel2_granules', geocond)

    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['granuleid', 'wkb_hex'])

    # decode the WKB hex footprints into shapely polygon objects
    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

    return GDF[['granuleid', 'shapelyPoly_LonLat']]


def get_dict_satellite_name_id(conn_params):
    # type: (str) -> dict
    """Return a dictionary mapping satellite names (keys) to satellite IDs (values).

    The mapping is read from the 'satellites' table of the pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters
    """
    records = get_info_from_postgreSQLdb(conn_params, 'satellites', ['name', 'id'])
    assert len(records) > 0, 'Error getting satellite names from postgreSQL database.'

    # NOTE(review): np.array casts the IDs to the common (string) dtype of the names — presumably
    # intended, since callers convert the IDs back via float(); verify before changing
    name_id_arr = np.array(records)
    return dict(zip(name_id_arr[:, 0], name_id_arr[:, 1]))


def get_dict_sensor_name_id(conn_params):
    # type: (str) -> dict
    """Return a dictionary mapping sensor names (keys) to sensor IDs (values).

    The mapping is read from the 'sensors' table of the pgSQL database.

    :param conn_params:     <str> pgSQL database connection parameters """
    records = get_info_from_postgreSQLdb(conn_params, 'sensors', ['name', 'id'])
    assert len(records) > 0, 'Error getting sensor names from postgreSQL database.'

    # NOTE(review): np.array casts the IDs to the common (string) dtype of the names
    name_id_arr = np.array(records)
    return dict(zip(name_id_arr[:, 0], name_id_arr[:, 1]))


def get_entityIDs_from_filename(conn_DB, filename):
    # type: (str, str) -> list
    """Return the entity ID(s) belonging to the given archive filename.

    In case of Sentinel-2 there can be multiple entity IDs if multiple granules
    are saved in one .zip file.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param filename:    <str> the filename to get the corresponding entity ID(s) for
    """
    is_landsat_archive = filename[:2] in ['LE', 'LC', 'LO'] and filename.endswith('.tar.gz')

    if is_landsat_archive:
        # for Landsat the entity ID equals the archive basename
        return [filename.split('.tar.gz')[0]]

    print('Querying database in order to get entityIDs for %s...' % filename)
    res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['entityid'], {'filename': filename}, timeout=40000)
    return [record[0] for record in res] if len(res) > 0 else []


def get_filename_by_entityID(conn_DB, entityid, satellite):
    # type: (str,str,str) -> str
    """Return the archive filename belonging to the given entity ID.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityid:    <str> entity ID
    :param satellite:   <str> satellite name to which the entity ID is belonging
    :raises NotImplementedError:  for satellites other than Landsat and Sentinel-2
    """
    if re.search(r'Landsat', satellite, re.I):
        # Landsat archive names equal the entity ID
        return '%s.tar.gz' % entityid

    if re.search(r'Sentinel-2', satellite, re.I):
        # Sentinel-2 filenames have to be looked up in the database
        res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['filename'],
                                         {'entityid': entityid}, records2fetch=1)
        return res[0][0]

    raise NotImplementedError


def get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder):
    # type: (str,list,str,str,str) -> np.ndarray
    """Return scene IDs and target filenames for locally available but not yet ingested archives.

    Takes a list of entity IDs and extracts those records that have the corresponding archive
    file in the given source folder and that have the processing level 'METADATA' in the pgSQL
    database. Based on this subset a numpy array containing the corresponding scene IDs and the
    target filenames for the fileserver is returned.

    :param conn_DB:     <str> pgSQL database connection parameters
    :param entityIDs:   <list> a list of entity IDs
    :param satellite:   <str> the name of the satellite to restrict the query on
    :param sensor:      <str> the name of the sensor to restrict the query on
    :param src_folder:  <str> the source directory where archive files are saved
    """
    columns = ['id', 'entityid', 'satelliteid', 'sensorid', 'filename', 'proc_level']
    records = get_info_from_postgreSQLdb(conn_DB, 'scenes', columns, {'entityid': entityIDs})
    df = pd.DataFrame(records, columns=columns)

    sat_id = get_dict_satellite_name_id(conn_DB)[satellite]
    target_folder = os.path.join(CFG.path_archive, satellite, sensor)

    def _target_filename(entityid):
        # resolve the archive filename belonging to the entity ID
        return get_filename_by_entityID(conn_DB, entityid, satellite)

    df['tgt_fileName'] = [_target_filename(eid) for eid in df['entityid']]
    df['tgtFile_exists'] = [os.path.exists(os.path.join(target_folder, fn)) for fn in df['tgt_fileName']]
    df['srcFile_exists'] = [os.path.exists(os.path.join(src_folder, _target_filename(eid)))
                            for eid in df['entityid']]

    # keep only records of the requested satellite with proc_level 'METADATA'
    # whose archive file exists in the source folder
    subset = df[(df.satelliteid == float(sat_id)) & (df.proc_level == 'METADATA') & df.srcFile_exists]

    return subset[['id', 'tgt_fileName']].values


class GMS_JOB(object):
    """gms_preprocessing job manager"""

    def __init__(self, conn_db):
        # type: (str) -> None
        """
        :param conn_db: <str> the database connection parameters as given by CFG.conn_params
        """
        # privates
        self._virtualsensorid = None

        # defaults
        # database connection and per-scene data
        self.conn = conn_db
        self.dataframe = DataFrame()
        self.scene_counts = {}  # set by self.create()

        # job identity and lifecycle
        self.exists_in_db = False
        self.id = None  #: int
        self.creationtime = datetime.now()  # default, needed to create new job
        self.finishtime = None
        self.sceneids = []
        self.timerange_start = datetime.min
        self.timerange_end = datetime.max
        self.bounds = box(-180, -90, 180, 90)  # default, needed to create new job
        self.distribution_index = None
        self.progress = None
        self.feedback = None
        self.failed_sceneids = []
        self.ref_job_id = None
        self.datacube_mgrs_tiles_proc = []
        self.non_ref_datasetids = []
        self.max_cloudcover = None
        self.season_code = None  # type: int
        self.path_analysis_script = ''  # TODO
        self.job_mode = 'processing_only'  # FIXME download/processing/...
        # columns of the 'jobs' database table represented by this object (see db_entry property)
        self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start',
                                   'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback',
                                   'failed_sceneids', 'datasetid_spatial_ref',
                                   'virtualsensorid', 'ref_job_id', 'datacube_mgrs_tiles_proc', 'comment',
                                   'non_ref_datasetids', 'max_cloudcover', 'season_code', 'status',
                                   'path_analysis_script', 'analysis_parameter', 'statistics', 'job_mode']
        self.datasetid_spatial_ref = 249  # this is overwritten if existing job is read from DB but needed to create new
        self.datasetname_spatial_ref = 'SENTINEL-2A'  # same here
        self.status = None
        self.statistics = []
        self.comment = None
        self.epsg = None  # set by self._set_target_sensor_specs()
        self.ground_spatial_sampling = None  # set by self._set_target_sensor_specs()
        self.analysis_parameter = None

    def __repr__(self):
        """Return a printable summary of all database entry fields of this job."""
        summary = Series(self.db_entry).to_string()
        return 'GMS job:\n\n' + summary

    @property
    def virtualsensorid(self):
        """The ID of the target virtual sensor (from the 'virtual_sensors' database table)."""
        return self._virtualsensorid

    @virtualsensorid.setter
    def virtualsensorid(self, value):
        """Set the virtual sensor ID, ignoring the nodata value.

        NOTE: set by self._set_target_sensor_specs() and self.from_ID().
        """
        if value == -1:
            # -1 is the nodata value -> keep the current ID
            return
        self._virtualsensorid = value

    def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref):
        """Set the target (virtual) sensor specifications and the spatial reference dataset.

        Sets self.virtualsensorid, self.ground_spatial_sampling, self.epsg,
        self.datasetid_spatial_ref and self.datasetname_spatial_ref.

        :param virtual_sensor_id:      <int> a valid ID from the 'virtual_sensors' table of the database
        :param datasetid_spatial_ref:  <int> a valid ID from the 'datasets' table of the database
        :raises ValueError:  if datasetid_spatial_ref is not an integer
        """
        self.virtualsensorid = virtual_sensor_id

        if not isinstance(datasetid_spatial_ref, int):
            raise ValueError(datasetid_spatial_ref)

        res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution',
                                                                        "projection_epsg"], {'id': virtual_sensor_id})
        assert res, \
            "'virtual_sensor_id'=%s does not exist in the table 'virtual_sensors' of the database." % virtual_sensor_id
        target_gsd = res[0][0]
        self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd
        self.epsg = int(res[0][1])

        self.datasetid_spatial_ref = datasetid_spatial_ref
        res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref})
        assert res, \
            "'datasetid_spatial_ref'=%s does not exist in the table 'datasets' of the database." % datasetid_spatial_ref
        # FIX: unpack the queried name string (res is a list of tuples, e.g. [('SENTINEL-2A',)]);
        # previously the whole result list was assigned although a plain string is expected
        # (cf. the default 'SENTINEL-2A' set in __init__ and res[0][0] usage elsewhere)
        self.datasetname_spatial_ref = res[0][0]

    @property
    def db_entry(self):
        """Return an OrderedDict mapping the jobs-table column names to their current values."""
        entry = collections.OrderedDict()

        for colname in self.jobs_table_columns:
            value = getattr(self, colname)

            # the database stores -1 as nodata value for the virtual sensor ID
            entry[colname] = -1 if colname == 'virtualsensorid' and value is None else value

        return entry

    def from_dictlist(self, dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
763
        # type: (list, int, int, str) -> GMS_JOB
764
        """
765
        :param dictlist_data2process:  <list> a list of dictionaries containing the keys "satellite", "sensor" and
766 767 768 769 770 771
                                        "filenames",
                                        e.g. [{'satellite:'Landsat-8,'sensor':'OLI_TIRS','filenames':file.tar.gz},{...}]
        :param virtual_sensor_id :     <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref:  <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                        (from the 'datasets' table of the postgreSQL database)
                                        (default:249 - Sentinel-2A), 104=Landsat-8
772
        :param comment:                <str> a comment describing the job (e.g. 'Beta job')
773
        """
Daniel Scheffler's avatar
Daniel Scheffler committed
774

775
        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
776
        self.comment = comment
777 778 779

        dictlist_data2process = dictlist_data2process if dictlist_data2process else []

780
        for idx, datadict in enumerate(dictlist_data2process):
781
            assert isinstance(datadict, dict), "Expected only dictionaries within 'dictlist_data2process'. " \
782
                                               "Got %s in there." % type(datadict)
783
            assert False not in [i in datadict for i in ['satellite', 'sensor', 'filenames']]
784
            assert type(datadict['filenames']) in [list, str]
785

786
            if isinstance(datadict['filenames'], str):
787 788 789 790 791
                if datadict['filenames'].endswith('.csv'):
                    assert os.path.exists(datadict['filenames'])
                else:
                    datadict['filenames'] = [datadict['filenames']]

792 793
        # find all duplicates in input datadicts and build common dataframe
        all_dfs = []
794
        for datadict in dictlist_data2process:
795
            assert isinstance(datadict, dict)
796

797 798
            if isinstance(datadict['filenames'], str) and datadict['filenames'].endswith('.csv'):
                datadict['filenames'] = None  # TODO implement csv reader here
799
                raise NotImplementedError
800

801
            else:
802
                temp_df = DataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
803 804 805 806

                if re.search(r'Landsat-7', datadict['satellite'], re.I) and \
                   re.search(r'ETM+', datadict['sensor'], re.I):

807
                    from .helper_functions import Landsat_entityID_decrypter as LED
808

809 810 811
                    def