Commit ed0cb14b authored by Daniel Scheffler

Removed deprecated config.Job.call_type parameter.

Former-commit-id: 74cad5e3
Former-commit-id: 068e26cb
parent 2c67f4fb
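For reference, the updated entry point no longer takes call_type as its first argument. A minimal usage sketch of the new call, assuming the module path gms_preprocessing.config and reusing the placeholder job ID 123456 from the docstring (the ID must exist in the 'jobs' table of the running GeoMultiSens database):

# Minimal usage sketch after the removal of call_type.
# The import path is assumed; 123456 is the docstring's placeholder job ID.
from gms_preprocessing.config import set_config

# before: set_config('webapp', 123456, exec_mode='Python', db_host='localhost')
set_config(job_ID=123456, exec_mode='Python', db_host='localhost')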
......@@ -6,9 +6,7 @@ import os
import warnings
import numpy as np
import builtins
import glob
import re
import sys
import psycopg2
import psycopg2.extras
from collections import OrderedDict
......@@ -21,12 +19,11 @@ from cerberus import Validator
__author__ = 'Daniel Scheffler'
def set_config(call_type, job_ID, exec_mode='Python', db_host='localhost', reset=False, job_kwargs=None):
# type: (str, int, str, str, bool, dict) -> None
def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, job_kwargs=None):
# type: (int, str, str, bool, dict) -> None
"""Set up a configuration for a new gms_preprocessing job!
:param call_type: 'console' or 'webapp'
:param job_ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param exec_mode: 'Python': writes intermediate results to disk in order to save memory
'Flink': keeps intermediate results in memory in order to save IO time
......@@ -37,7 +34,7 @@ def set_config(call_type, job_ID, exec_mode='Python', db_host='localhost', reset
if not hasattr(builtins, 'GMS_job') or not hasattr(builtins, 'GMS_usecase') or reset:
job_kwargs = job_kwargs if job_kwargs else {}
builtins.GMS_job = Job(call_type, job_ID, exec_mode=exec_mode, db_host=db_host, **job_kwargs)
builtins.GMS_job = Job(job_ID, exec_mode=exec_mode, db_host=db_host, **job_kwargs)
builtins.GMS_usecase = Usecase(getattr(builtins, 'GMS_job'))
......@@ -73,7 +70,7 @@ GMS_config = GMS_configuration()
class Job(object):
def __init__(self, call_type, ID, exec_mode='Python', db_host='localhost', exec_L1AP=None, exec_L1BP=None,
def __init__(self, ID, exec_mode='Python', db_host='localhost', exec_L1AP=None, exec_L1BP=None,
exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
......@@ -81,7 +78,6 @@ class Job(object):
"""Create a job configuration
:param call_type: 'console' or 'webapp'
:param ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param exec_mode: 'Python': writes intermediate results to disk in order to save memory
'Flink': keeps intermediate results in memory in order to save IO time
......@@ -120,7 +116,6 @@ class Job(object):
# args
self.ID = ID
self.call_type = call_type # FIXME deprecated
self.exec_mode = exec_mode
assert exec_mode in ['Flink', 'Python']
self.db_host = db_host
......@@ -221,8 +216,7 @@ class Job(object):
@property
def conn_database(self):
return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" % self.db_host \
if self.call_type == 'webapp' else ''
return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" % self.db_host
def get_init_argskwargs(self, ignore=("logger",)):
"""
......@@ -304,69 +298,39 @@ class Usecase:
def query_vir(col, VSID):
return get_info_from_postgreSQLdb(_job.conn_database, 'virtual_sensors', col, {'id': VSID})[0][0]
if _job.call_type == 'console':
self.filt_coord = [None, None, None, None]
# filt_datasets = ['ALOS', 'Terra', 'Landsat', 'SPOT', 'RapidEye', 'SRTM', 'ATM']
# filt_datasets = ['ALOS', 'Terra', 'SPOT', 'RapidEye', 'SRTM', 'ATM']
self.filt_datasets = ['ALOS', 'Terra', 'Landsat', 'SPOT', 'RapidEye']
# filt_datasets = ['Terra']
# filt_datasets = ['Landsat']
# filt_datasets = ['ALOS']
# filt_datasets = ['SPOT']
# filt_datasets = ['RapidEye','ALOS']
# filt_datasets = ['RapidEye']
# filt_datasets = ['Landsat','SPOT','RapidEye']
# filt_datasets = ['Landsat','SPOT']
self.filt_date = [2000, 2015]
# filt_date = [2012,2015]
self.skip_thermal = True
self.skip_pan = True
self.sort_bands_by_cwl = True
self.conversion_type_optical = 'BOA_Ref' # 'Rad' / 'TOA_Ref' / 'BOA_Ref'
self.conversion_type_thermal = 'Rad' # 'Rad' / 'Temp'
self.scale_factor_TOARef = 10000
self.scale_factor_BOARef = 10000
self.scale_factor_errors_ac = 255
# self.virtual_sensor_id = 10 # Sentinel-2A 10m
self.virtual_sensor_id = 1 # Landsat-8
self.datasetid_spectral_ref = 249 # Sentinel-2A
self.target_CWL = []
self.target_FWHM = []
self.data_list = self.get_entity_IDs_within_AOI()
elif _job.call_type == 'webapp':
def query_job(col):
return get_info_from_postgreSQLdb(_job.conn_database, 'jobs', col, {'id': _job.ID})[0][0]
# skip_thermal = int(query_cfg(_job.conn_database, 'skip_thermal'))
self.skip_thermal = True
self.skip_pan = int(query_cfg('skip_pan'))
self.sort_bands_by_cwl = int(query_cfg('sort_bands_by_cwl'))
self.conversion_type_optical = query_cfg('conversion_type_optical')
self.conversion_type_thermal = query_cfg('conversion_type_thermal')
self.virtual_sensor_id = query_job('virtualsensorid')
self.virtual_sensor_id = self.virtual_sensor_id if self.virtual_sensor_id != -1 else 10 # Sentinel-2A 10m
self.virtual_sensor_name = query_vir('name', self.virtual_sensor_id)
self.datasetid_spatial_ref = query_job('datasetid_spatial_ref')
# self.datasetid_spatial_ref = 104
self.datasetid_spectral_ref = query_vir('spectral_characteristics_datasetid', self.virtual_sensor_id)
# FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
self.target_CWL = query_vir('wavelengths_pos', self.virtual_sensor_id)
# FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
self.target_FWHM = query_vir('band_width', self.virtual_sensor_id)
# FIXME target GSD setting is a duplicate of datasetid_spatial_ref!
self.target_gsd = query_vir('spatial_resolution',
self.virtual_sensor_id) # table features only 1 value for X/Y-dims
self.target_gsd = xgsd, ygsd = \
[self.target_gsd]*2 if isinstance(self.target_gsd, (int, float)) else self.target_gsd
self.EPSG = query_vir('projection_epsg', self.virtual_sensor_id)
self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd) # e.g. [15, 45]
self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd)
self.scale_factor_TOARef = int(query_cfg('scale_factor_TOARef'))
self.scale_factor_BOARef = int(query_cfg('scale_factor_BOARef'))
self.scale_factor_errors_ac = int(query_cfg('scale_factor_errors_ac'))
self.data_list = self.get_data_list_of_current_jobID()
def query_job(col):
return get_info_from_postgreSQLdb(_job.conn_database, 'jobs', col, {'id': _job.ID})[0][0]
# skip_thermal = int(query_cfg(_job.conn_database, 'skip_thermal'))
self.skip_thermal = True
self.skip_pan = int(query_cfg('skip_pan'))
self.sort_bands_by_cwl = int(query_cfg('sort_bands_by_cwl'))
self.conversion_type_optical = query_cfg('conversion_type_optical')
self.conversion_type_thermal = query_cfg('conversion_type_thermal')
self.virtual_sensor_id = query_job('virtualsensorid')
self.virtual_sensor_id = self.virtual_sensor_id if self.virtual_sensor_id != -1 else 10 # Sentinel-2A 10m
self.virtual_sensor_name = query_vir('name', self.virtual_sensor_id)
self.datasetid_spatial_ref = query_job('datasetid_spatial_ref')
# self.datasetid_spatial_ref = 104
self.datasetid_spectral_ref = query_vir('spectral_characteristics_datasetid', self.virtual_sensor_id)
# FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
self.target_CWL = query_vir('wavelengths_pos', self.virtual_sensor_id)
# FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
self.target_FWHM = query_vir('band_width', self.virtual_sensor_id)
# FIXME target GSD setting is a duplicate of datasetid_spatial_ref!
self.target_gsd = query_vir('spatial_resolution',
self.virtual_sensor_id) # table features only 1 value for X/Y-dims
self.target_gsd = xgsd, ygsd = \
[self.target_gsd]*2 if isinstance(self.target_gsd, (int, float)) else self.target_gsd
self.EPSG = query_vir('projection_epsg', self.virtual_sensor_id)
self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd) # e.g. [15, 45]
self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd)
self.scale_factor_TOARef = int(query_cfg('scale_factor_TOARef'))
self.scale_factor_BOARef = int(query_cfg('scale_factor_BOARef'))
self.scale_factor_errors_ac = int(query_cfg('scale_factor_errors_ac'))
self.data_list = self.get_data_list_of_current_jobID()
self.align_coord_grids = True # ONLY TO FORCE DEACTIVATION OF IMAGE RESAMPLING
self.match_gsd = True
......@@ -376,110 +340,7 @@ class Usecase:
assert self.conversion_type_optical in ['Rad', 'TOA_Ref', 'BOA_Ref', 'Temp'], \
'Unsupported conversion type: %s' % self.conversion_type_optical
@staticmethod
def get_usecase_coord_grid():
"""consider projections of images with status georef = master"""
geotransform = (0, 1, 0, 0, 0, -1) # FIXME
EPSG = 'EPSG:4326' # FIXME
GSD_meters = 30 # default
return geotransform, EPSG, GSD_meters
def get_entity_IDs_within_AOI(self): # called in console mode
from .model.metadata import LandsatID2dataset, get_sensormode
# parse cli arguments
sys.stderr.write("No scene ids from CLI received. Using old data_list.\n")
data_list = []
if re.search('ALOS', ','.join(self.filt_datasets)): # sensorname has to be in HLP_F.get_GMS_sensorcode
data_list.append({'image_type': 'RSD', 'satellite': 'ALOS', 'sensor': 'AVNIR-2', 'subsystem': None,
'acq_datetime': '2009-07-02',
'entity_ID': 'A1002553-001-P6100002-AODS-201007300008'}) # TAR-ID 1B1
data_list.append({'image_type': 'RSD', 'satellite': 'ALOS', 'sensor': 'AVNIR-2', 'subsystem': None,
'acq_datetime': '2007-09-27',
'entity_ID': '20070927_L1B2_ALAV2A089152780'}) # extracted Folder 1B2
data_list.append({'image_type': 'RSD', 'satellite': 'ALOS', 'sensor': 'AVNIR-2', 'subsystem': None,
'acq_datetime': '2009-07-19',
'entity_ID': '20090719_L1B2_ALAV2A185572780'}) # extracted Folder 1B2
data_list.append({'image_type': 'RSD', 'satellite': 'ALOS', 'sensor': 'AVNIR-2', 'subsystem': None,
'acq_datetime': '2010-04-21',
'entity_ID': '20100421_L1B2_ALAV2A225832780'}) # extracted Folder 1B2
if re.search('Terra', ','.join(self.filt_datasets)):
data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'VNIR1',
'acq_datetime': '2007-11-08',
'entity_ID': 'AST_L1B_00308192007061017_20071108171717_32444'}) # HDF-ID
data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'VNIR2',
'acq_datetime': '2007-11-08',
'entity_ID': 'AST_L1B_00308192007061017_20071108171717_32444'}) # HDF-ID
data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'SWIR',
'acq_datetime': '2007-11-08',
'entity_ID': 'AST_L1B_00308192007061017_20071108171717_32444'}) # HDF-ID
data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'TIR',
'acq_datetime': '2007-11-08',
'entity_ID': 'AST_L1B_00308192007061017_20071108171717_32444'}) # HDF-ID
data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'VNIR1',
'acq_datetime': '2002-06-08',
'entity_ID': 'AST_L1A_003_05262002060543_06082002144959'}) # HDF-ID
data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'VNIR2',
'acq_datetime': '2002-06-08',
'entity_ID': 'AST_L1A_003_05262002060543_06082002144959'}) # HDF-ID
data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'SWIR',
'acq_datetime': '2002-06-08',
'entity_ID': 'AST_L1A_003_05262002060543_06082002144959'}) # HDF-ID
data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'TIR',
'acq_datetime': '2002-06-08',
'entity_ID': 'AST_L1A_003_05262002060543_06082002144959'}) # HDF-ID
if re.search('Landsat',
','.join(self.filt_datasets)): # sensorname has to be in HLP_F.get_GMS_sensorcode
data_list.append({'image_type': 'RSD', 'satellite': 'Landsat-5', 'sensor': 'TM', 'subsystem': None,
'acq_datetime': '1996-10-24', 'entity_ID': 'LT51510321996298XXX01'}) # TAR-ID
# data_list.append({'image_type':'RSD','satellite':'Landsat-7', 'sensor':'ETM+', 'subsystem':None,
# 'acq_datetime':'2002-08-15', 'entity_ID':'LE70050152002227EDC00'}) # TAR-ID
# data_list.append({'image_type':'RSD','satellite':'Landsat-7', 'sensor':'ETM+', 'subsystem':None,
# 'acq_datetime':'2000-04-02', 'entity_ID':'LE71510322000093SGS00'}) # TAR-ID
data_list = data_list + LandsatID2dataset([os.path.basename(i).split('.tar.gz')[0] for i in glob.glob(
os.path.join(self._job.path_archive, 'Landsat-7/ETM+/*.tar.gz'))]) # TAR-ID
data_list = data_list + LandsatID2dataset([os.path.basename(i).split('.tar.gz')[0] for i in glob.glob(
os.path.join(self._job.path_archive, 'Landsat-8/OLI_TIRS/*.tar.gz'))]) # TAR-ID
# data_list.append({'image_type':'RSD','satellite':'Landsat-8', 'sensor':'OLI_TIRS','subsystem':None,
# 'acq_datetime':'2013-07-03', 'entity_ID':'LC81510322013184LGN00'}) # TAR-ID
# data_list.append({'image_type':'RSD','satellite':'Landsat-8', 'sensor':'OLI_TIRS','subsystem':None,
# 'acq_datetime':'2013-06-01', 'entity_ID':'LC81510322013152LGN00'}) # TAR-ID ~6% Cloudcov
if re.search('SPOT', ','.join(self.filt_datasets)):
data_list.append({'image_type': 'RSD', 'satellite': 'SPOT-1', 'sensor': 'HRV1', 'subsystem': None,
'acq_datetime': '1986-07-17', 'entity_ID': '00197112001'})
data_list.append({'image_type': 'RSD', 'satellite': 'SPOT-5', 'sensor': 'HRG2', 'subsystem': None,
'acq_datetime': '2010-04-21', 'entity_ID': '00197112009'})
data_list.append({'image_type': 'RSD', 'satellite': 'SPOT-5', 'sensor': 'HRG2', 'subsystem': None,
'acq_datetime': '2010-04-21', 'entity_ID': '00197112010'})
if re.search('RapidEye', ','.join(self.filt_datasets)):
data_list.append({'image_type': 'RSD', 'satellite': 'RapidEye-5', 'sensor': 'MSI', 'subsystem': None,
'acq_datetime': '2014-04-23', 'entity_ID': '4357606_2014-04-23_RE5_3A_180259'})
if re.search('SRTM', ','.join(self.filt_datasets)):
data_list.append({'image_type': 'DGM', 'satellite': 'SRTM', 'sensor': 'SRTM2', 'subsystem': None,
'acq_datetime': 'unknown', 'entity_ID': 'srtm-1arcsec-version2jan2015-39-42n-70-85'})
if re.search('ATM', ','.join(self.filt_datasets)):
data_list.append({'image_type': 'ATM', 'satellite': 'ATM-data', 'sensor': 'unknown', 'subsystem': None,
'acq_datetime': 'unknown', 'entity_ID': 'dummy_ID'})
data_list = [dict(i) for i in data_list]
for ds in data_list:
ds['proc_level'] = 'L0A'
ds['acq_datetime'] = datetime.datetime.strptime(ds['acq_datetime'], '%Y-%m-%d')
ds['subsystem'] = '' if ds['subsystem'] is None else ds['subsystem']
# ds['scene_ID'] = '_'.join([ds['satellite'],ds['sensor'],ds['subsystem'],ds['entity_ID']])
ds['scene_ID'] = ds['entity_ID']
ds['sensormode'] = get_sensormode(ds)
if self.skip_thermal:
data_list = [ds for ds in data_list if
not ds['subsystem'] == 'TIR'] # removes ASTER TIR in case of skip_thermal
if self.skip_pan:
data_list = [ds for ds in data_list if
not ds['sensormode'] == 'P'] # removes e.g. SPOT PAN in case of skip_pan
self.data_list = data_list
return self.data_list
def get_data_list_of_current_jobID(self): # called in webapp mode
def get_data_list_of_current_jobID(self):
"""
Get a list of datasets to be processed from database and return it together with some metadata.
......
......@@ -161,20 +161,12 @@ def get_list_GMSfiles(dataset_list, target):
:return: [/path/to/gms_file1.gms, /path/to/gms_file2.gms]
"""
dataset_list = [dataset_list] if not isinstance(dataset_list, list) else dataset_list
if CFG.job.call_type == 'webapp':
def get_gmsP(ds, tgt): return PG.path_generator(ds, proc_level=tgt).get_path_gmsfile()
GMS_list = [p for p in [get_gmsP(ds, target) for ds in dataset_list] if os.path.exists(p)]
else: # CFG.job.call_type == 'console'
def SQLquery(ds): return DB_T.get_info_from_SQLdb(
CFG.job.path_database, 'processed_data', ['path_procdata', 'baseN'],
dict(image_type=ds['image_type'], entity_ID=ds['entity_ID'], subsystem=ds['subsystem'], proc_level=target))
returned_tuples = [SQLquery(ds) for ds in dataset_list]
all_paths, all_baseN = [rt[0] for rt in returned_tuples], [rt[1] for rt in returned_tuples]
def get_gmsP(ds, tgt):
return PG.path_generator(ds, proc_level=tgt).get_path_gmsfile()
def get_gmsP(dr, bN): return os.path.join(dr, '%s_%s.gms' % (bN, target))
GMS_list = [p for p in [get_gmsP(ds, target) for ds in dataset_list] if os.path.exists(p)]
GMS_list = [p for p in [get_gmsP(p, bN) for p, bN in [all_paths, all_baseN]] if os.path.exists(p)]
return GMS_list
......
import collections
import csv
import glob
import itertools
import os
......@@ -12,7 +11,6 @@ import warnings
from datetime import datetime
from typing import Union # noqa F401 # flake8 issue
from six import PY3
import numpy as np
import pandas as pd
from pandas.io.sql import pandasSQL_builder, SQLTable
......@@ -74,34 +72,6 @@ def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
return ds
def get_info_from_SQLdb(path_db, tablename, vals2return, cond_dict, records2fetch=0):
# type: (str,str,list,dict,int) -> Union[list, str]
"""Queries an SQL database for the given parameters.
:param path_db: <str> the physical path of the SQL database on disk
:param tablename: <str> name of the table within the database to be queried
:param vals2return: <list or str> a list of strings containing the column titles of the values to be returned
:param cond_dict: <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
:param records2fetch: <int> number of records to be fetched (default=0: fetch unlimited records)
"""
if not isinstance(vals2return, list):
vals2return = [vals2return]
assert isinstance(records2fetch, int), \
"get_info_from_SQLdb: Expected an integer for the argument 'records2return'. Got %s" % type(records2fetch)
if not os.path.isfile(path_db):
return 'database connection fault'
connection = sqlite3.connect(path_db)
cursor = connection.cursor()
condition = "WHERE " + " AND ".join(["%s=?" % (list(cond_dict.keys())[i]) for i in range(len(cond_dict))])
cursor.execute("SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition, list(cond_dict.values()))
records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
cursor.fetchmany(size=records2fetch) # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
cursor.close()
connection.close()
return records2return
def get_postgreSQL_value(value):
# type: (any) -> str
"""Converts Python variable to a postgreSQL value respecting postgreSQL type casts.
......@@ -1287,7 +1257,7 @@ def import_shapefile_into_postgreSQL_database(path_shp, tablename, cols2import=N
def data_DB_updater(obj_dict):
# type: (dict) -> None
"""Updates the table "scenes_proc" or "mgrs_tiles_proc within a postgreSQL or an SQL database
"""Updates the table "scenes_proc" or "mgrs_tiles_proc within the postgreSQL database
according to the given dictionary of a GMS object.
:param obj_dict: <dict> a copy of the dictionary of the respective GMS object
......@@ -1297,111 +1267,61 @@ def data_DB_updater(obj_dict):
def list2str(list2convert): return ''.join([str(val) for val in list2convert])
if CFG.job.call_type == 'console':
if not os.path.isfile(CFG.job.path_database):
print('No internal database found. Creating a new one...')
connection = sqlite3.connect(CFG.job.path_database)
connection = psycopg2.connect(CFG.job.conn_database)
if connection is None:
print('Database connection could not be established. Database entry could not be created or updated.')
else:
if obj_dict['arr_shape'] != 'MGRS_tile':
table2update = 'scenes_proc'
dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'],
'georef': True if obj_dict['georef'] else False,
'proc_level': obj_dict['proc_level'],
'layer_bands_assignment': ''.join(obj_dict['LayerBandsAssignment']),
'bounds': Polygon(obj_dict['trueDataCornerLonLat'])}
matchExp = 'WHERE ' + get_postgreSQL_matchingExp('sceneid', dict_dbkey_objkey['sceneid'])
keys2update = ['georef', 'proc_level', 'layer_bands_assignment', 'bounds']
else: # MGRS_tile
table2update = 'mgrs_tiles_proc'
def get_tile_bounds_box(bnds): return box(bnds[0], bnds[2], bnds[1], bnds[3])
dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'],
'scenes_proc_id': obj_dict['scenes_proc_ID'],
'mgrs_code': obj_dict['MGRS_info']['tile_ID'],
'virtual_sensor_id': CFG.usecase.virtual_sensor_id,
'proc_level': obj_dict['proc_level'],
'coreg_success': obj_dict['coreg_info']['success'],
'tile_bounds': get_tile_bounds_box(obj_dict['bounds_LonLat']),
'data_corners': Polygon(obj_dict['trueDataCornerLonLat'])}
matchExp = 'WHERE ' + ' AND '.join([get_postgreSQL_matchingExp(k, dict_dbkey_objkey[k])
for k in ['sceneid', 'mgrs_code', 'virtual_sensor_id']])
keys2update = ['scenes_proc_id', 'proc_level', 'coreg_success', 'tile_bounds', 'data_corners']
if obj_dict['scenes_proc_ID'] is None:
keys2update.remove('scenes_proc_id')
cursor = connection.cursor()
fullColumnList = ['job_ID', 'job_CPUs', 'image_type', 'satellite', 'sensor', 'subsystem', 'sensormode',
'acquisition_date', 'entity_ID', 'georef', 'proc_level', 'LayerBandsAssignment',
'path_procdata']
cursor.execute('''CREATE TABLE IF NOT EXISTS processed_data (%s)''' % ', '.join(fullColumnList))
currentColumnList = [i[1] for i in cursor.execute("PRAGMA table_info('processed_data')").fetchall()]
missingColumns = [col for col in fullColumnList if col not in currentColumnList]
if missingColumns: # automatic adding of missing columns
cursor.execute('''CREATE TABLE IF NOT EXISTS processed_data_temp (%s)''' % ', '.join(fullColumnList))
cursor.execute("SELECT " + ','.join(currentColumnList) + " FROM processed_data")
[cursor.execute("INSERT INTO processed_data_temp (%(cols)s) VALUES (%(vals)s)" % {'cols': ','.join(
currentColumnList), 'vals': ','.join(['?'] * len(currentColumnList))}, row) for row in
cursor.fetchall()]
cursor.execute("DROP TABLE processed_data")
cursor.execute("ALTER TABLE processed_data_temp RENAME TO processed_data")
cursor.execute("SELECT EXISTS(SELECT 1 FROM processed_data WHERE entity_ID=? AND sensor=? AND subsystem=?)",
[obj_dict['entity_ID'], obj_dict['sensor'], obj_dict['subsystem']])
if cursor.fetchone()[0] == 0: # create new entry
new_record = [obj_dict[key] for key in fullColumnList]
new_record = [(''.join([str(val[li]) for li in range(len(val))])) if isinstance(val, list) else val
for val in new_record] # e.g. converts list of LayerBandsAssignment to string
cursor.execute("INSERT INTO processed_data VALUES (%s)" % ','.join(['?'] * len(new_record)), new_record)
else:  # update existing entry
values2update = [obj_dict[key] for key in
['job_ID', 'job_CPUs', 'proc_level', 'path_procdata', 'LayerBandsAssignment']]
values2update = [(''.join([str(val[li]) for li in range(len(val))])) if isinstance(val, list) else val
for val in values2update] # e.g. converts list of LayerBandsAssignment to string
connection.execute("UPDATE processed_data set job_ID=?, job_CPUs=?, proc_level=?,path_procdata=?, \
LayerBandsAssignment=? WHERE entity_ID=? AND sensor=? AND subsystem=?",
values2update + [obj_dict['entity_ID']] + [obj_dict['sensor'], obj_dict['subsystem']])
else: # call_type == 'webapp'
connection = psycopg2.connect(CFG.job.conn_database)
if connection is None:
print('Database connection could not be established. Database entry could not be created or updated.')
# check if record exists
execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s)" % (table2update, matchExp))
# create new entry
if cursor.fetchone()[0] == 0:
keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in dict_dbkey_objkey.items()])
execute_pgSQL_query(cursor,
"INSERT INTO %s (%s) VALUES (%s);" % (table2update, ','.join(keys), ','.join(vals)))
# or update existing entry
else:
if obj_dict['arr_shape'] != 'MGRS_tile':
table2update = 'scenes_proc'
dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'],
'georef': True if obj_dict['georef'] else False,
'proc_level': obj_dict['proc_level'],
'layer_bands_assignment': ''.join(obj_dict['LayerBandsAssignment']),
'bounds': Polygon(obj_dict['trueDataCornerLonLat'])}
matchExp = 'WHERE ' + get_postgreSQL_matchingExp('sceneid', dict_dbkey_objkey['sceneid'])
keys2update = ['georef', 'proc_level', 'layer_bands_assignment', 'bounds']
else: # MGRS_tile
table2update = 'mgrs_tiles_proc'
def get_tile_bounds_box(bnds): return box(bnds[0], bnds[2], bnds[1], bnds[3])
dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'],
'scenes_proc_id': obj_dict['scenes_proc_ID'],
'mgrs_code': obj_dict['MGRS_info']['tile_ID'],
'virtual_sensor_id': CFG.usecase.virtual_sensor_id,
'proc_level': obj_dict['proc_level'],
'coreg_success': obj_dict['coreg_info']['success'],
'tile_bounds': get_tile_bounds_box(obj_dict['bounds_LonLat']),
'data_corners': Polygon(obj_dict['trueDataCornerLonLat'])}
matchExp = 'WHERE ' + ' AND '.join([get_postgreSQL_matchingExp(k, dict_dbkey_objkey[k])
for k in ['sceneid', 'mgrs_code', 'virtual_sensor_id']])
keys2update = ['scenes_proc_id', 'proc_level', 'coreg_success', 'tile_bounds', 'data_corners']
if obj_dict['scenes_proc_ID'] is None:
keys2update.remove('scenes_proc_id')
cursor = connection.cursor()
# check if record exists
execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s)" % (table2update, matchExp))
# create new entry
if cursor.fetchone()[0] == 0:
keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in dict_dbkey_objkey.items()])
execute_pgSQL_query(cursor,
"INSERT INTO %s (%s) VALUES (%s);" % (table2update, ','.join(keys), ','.join(vals)))
# or update existing entry
else:
setExp = 'SET ' + ','.join(
['%s=%s' % (k, get_postgreSQL_value(dict_dbkey_objkey[k])) for k in keys2update])
execute_pgSQL_query(cursor, "UPDATE %s %s %s;" % (table2update, setExp, matchExp))
setExp = 'SET ' + ','.join(
['%s=%s' % (k, get_postgreSQL_value(dict_dbkey_objkey[k])) for k in keys2update])
execute_pgSQL_query(cursor, "UPDATE %s %s %s;" % (table2update, setExp, matchExp))
if 'connection' in locals():
connection.commit()
connection.close()
def SQL_DB_to_csv():
if not os.path.exists(CFG.job.path_database) or not os.path.getsize(CFG.job.path_database) > 0:
print('No database conversion to CSV performed, because DB does not exist or DB is empty.')
else:
connection = sqlite3.connect(CFG.job.path_database)
cursor = connection.cursor()
cursor.execute("SELECT * FROM processed_data")
with open(os.path.join(os.path.dirname(CFG.job.path_database), 'data_DB.csv'), 'w' if PY3 else 'wb') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow([i[0] for i in cursor.description])
csvwriter.writerows(cursor)
def postgreSQL_table_to_csv(conn_db, path_csv, tablename):
# GeoDataFrame.to_csv(path_csv, index_label='id')
raise NotImplementedError # TODO
......
......@@ -558,18 +558,7 @@ class METADATA(object):
if self.EntityID == '':
self.logger.info('Scene-ID could not be extracted and has to be retrieved from %s metadata database...'
% self.Satellite)
if CFG.job.call_type == 'console':
DB_T.update_metaDB_if_needed(self.Satellite, self.Sensor, self.Subsystem, self.AcqDate)
tablename = '%s_%s_%s' % (self.Satellite.replace('-', ''), self.Sensor.replace('+', ''),
self.Subsystem) if self.Subsystem != '' else \
'%s_%s' % (self.Satellite.replace('-', ''), self.Sensor.replace('+', ''))
tablename = tablename if tablename not in ['Landsat4_TM', 'Landsat5_TM'] else 'Landsat45_TM'
result = DB_T.get_info_from_SQLdb(CFG.job.path_db_meta, tablename, ['sceneID', 'sensor'],
{'acquisitionDate': self.AcqDate, 'path': self.WRS_path,
'row': self.WRS_row}, records2fetch=1)
else:
result = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes',
['entityID'], {'id': self.SceneID})
result = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityID'], {'id': self.SceneID})
if len(result) == 1: # e.g. [('LE71950282003121EDC00',)]
self.EntityID = result[0][0]
......
......@@ -38,14 +38,13 @@ __author__ = 'Daniel Scheffler'
class process_controller(object):
def __init__(self, job_ID, call_type='webapp', exec_mode='Python', db_host='localhost',
def __init__(self, job_ID, exec_mode='Python', db_host='localhost',
parallelization_level='scenes', delete_old_output=False, job_config_kwargs=None):
# type: (int, str, str, str, str, bool) -> None
# type: (int, str, str, str, bool, dict) -> None
"""gms_preprocessing process controller
:param job_ID: <int> a job ID belonging to a valid database record within table 'jobs'
:param call_type: <str> choices: 'webapp' and 'console'
:param exec_mode: <str> choices: 'Python' - writes all intermediate data to disk
'Flink' - keeps all intermediate data in memory
:param db_host: <str> hostname of the host where database is hosted
......@@ -58,14 +57,11 @@ class process_controller(object):
# assertions
if not isinstance(job_ID, int):
raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))
if call_type not in ['webapp', 'console']:
raise ValueError("Unexpected call_type '%s'!" % call_type)
if exec_mode not in ['Python', 'Flink']:
raise ValueError("Unexpected exec_mode '%s'!" % exec_mode)
if parallelization_level not in ['scenes', 'tiles']:
raise ValueError("Unexpected parallelization_level '%s'!" % parallelization_level)