Commit 405ea0c5 authored by Daniel Scheffler, committed by Mathias Peters

Flink compatibility update / major revision of process controller

summary:
- revised config -> the database now has to be queried only once per machine
- all modules of the package are now directly importable
- the process controller is now a Python class providing several methods for running the job

all modules:
- revised import statements
- replaced the functions 'CFG.get_job()' and 'CFG.get_usecase()' with the properties 'CFG.job' and 'CFG.usecase' (migration example below)
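
A minimal before/after sketch of the new access pattern (the top-level package name 'GeoMultiSens' is assumed; the attribute names are taken from the diff below):

    # before (removed in this commit):
    #   from GeoMultiSens import unified_config as CFG
    #   n_cpus = CFG.get_job().CPUs
    # after:
    from GeoMultiSens.config import GMS_config as CFG
    n_cpus   = CFG.job.CPUs                # Job settings
    datasets = CFG.usecase.filt_datasets   # Usecase settings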

misc.helper_functions:
- moved MAP() to the new module processing.multiproc (sketched below)
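
MAP() itself is not part of this excerpt; a hedged sketch of what such a wrapper typically looks like (signature and behaviour assumed, not taken from the diff):

    # processing/multiproc.py -- hypothetical sketch of the relocated MAP() helper
    import multiprocessing

    def MAP(func, args, CPUs=None):
        """Apply func to all items of args, in parallel if more than one CPU is given."""
        CPUs = CPUs if CPUs is not None else multiprocessing.cpu_count()
        if CPUs > 1:
            with multiprocessing.Pool(CPUs) as pool:
                return pool.map(func, args)   # blocks until all items are processed
        return [func(arg) for arg in args]    # serial fallback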

misc.SpatialIndexMediator.SpatialIndexMediatorServer:
- bugfix: 'port' is now returned instead of 'process_id' (outlined below)
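
The fix itself is not shown in this excerpt; in outline (internal attribute names assumed), it amounts to:

    # hypothetical outline of the fix: the accessor handed back the
    # server's process ID where callers expected the TCP port
    class SpatialIndexMediatorServer(object):
        def __init__(self, _port, _process_id):
            self._port, self._process_id = _port, _process_id

        @property
        def port(self):
            return self._port   # was (bug): return self._process_id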

added new package 'processing'
- added __init__
- added module 'multiproc'
- added new module 'process_controller' containing the new class 'process_controller', which provides several methods for running the GMS job (skeleton below)
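
A rough skeleton of the new class (only the class name is taken from this commit; method names and signatures are assumptions):

    # processing/process_controller.py -- hedged skeleton
    class process_controller(object):
        """Drives a GMS job through all processing levels (sketch only)."""

        def __init__(self, job_ID, call_type='webapp'):
            self.job_ID    = job_ID
            self.call_type = call_type

        def get_data_list(self):
            """Query the database once and build the list of datasets to process."""

        def run_all_processors(self):
            """Run every processing level of the current job."""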

__init__:
- revised

config:
- added set_config(), replacing unified_config.set_config()
- added class GMS_configuration containing the properties 'job' and 'usecase', which replace unified_config.get_job() and unified_config.get_usecase() (sketched below)
- Job:
    - added attributes 'end_time' and 'computation_time'
    - revised class structure
- Usecase:
    - revised class structure
- removed deprecated code
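
A condensed sketch of this pattern (the names set_config, GMS_configuration, GMS_config, 'job' and 'usecase' appear in this commit; the builtins-based storage is an assumption suggested by the 'import builtins' statements in the diff):

    # config.py -- hedged sketch of the single-initialization access pattern
    import builtins

    def set_config(job_ID, call_type='webapp'):
        """Initialize Job and Usecase once per machine and publish them globally."""
        # Job / Usecase: the classes revised above, defined in the same module
        builtins.GMS_job     = Job(job_ID, call_type)     # queries the database once
        builtins.GMS_usecase = Usecase(builtins.GMS_job)

    class GMS_configuration(object):
        @property
        def job(self):
            assert hasattr(builtins, 'GMS_job'), 'Call set_config() first!'
            return builtins.GMS_job

        @property
        def usecase(self):
            assert hasattr(builtins, 'GMS_usecase'), 'Call set_config() first!'
            return builtins.GMS_usecase

    GMS_config = GMS_configuration()   # imported throughout the package as 'CFG'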

copied the old version of config to config_old (still used by the deprecated process_controller_for_testing)

added run_gms.py for running the GeoMultiSens process controller from the console (sketch below)
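
run_gms.py itself is not included in this excerpt; a minimal sketch of such an entry point (argument handling and the set_config signature are assumed):

    # run_gms.py -- hedged sketch of the console entry point
    import sys
    from GeoMultiSens import process_controller   # exported via the package __init__
    from GeoMultiSens.config import set_config    # top-level package name assumed

    if __name__ == '__main__':
        job_ID = int(sys.argv[1])          # e.g.: python run_gms.py 123456
        set_config(job_ID)                 # query the database once for this machine
        PC = process_controller(job_ID)
        PC.run_all_processors()            # method name assumed (see skeleton above)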

- updated __version__
parent cfadc485
@@ -10,10 +10,17 @@
from . import algorithms
from . import io
from . import misc
from . import processing
from . import config
from .processing.process_controller import process_controller
__all__=['algorithms',
'io',
'misc']
__version__ = '20160905.01'
__author__='Daniel Scheffler'
\ No newline at end of file
__version__ = '20161201.01'
__author__ = 'Daniel Scheffler'
__all__ = ['algorithms',
'io',
'misc',
'processing',
'config',
'process_controller',
]
\ No newline at end of file
@@ -15,19 +15,19 @@
# **** IMPORTING LIBRARIES ****************************************************
import collections
import datetime
import math
import os
import re
import subprocess
import sys
import time
import numpy as np
import re
import scipy
import scipy.ndimage
import datetime
from matplotlib import dates as mdates
import math
import subprocess
import builtins
import time
import collections
from scipy.interpolate import RegularGridInterpolator
# custom
@@ -48,16 +48,15 @@ import pyproj
from pyorbital import astronomy
import ephem
from shapely.geometry import MultiPoint
from numba import jit
from ..io import envifilehandling as ef
from ..misc import helper_functions as HLP_F
from py_tools_ds.ptds.geo.coord_grid import snap_bounds_to_pixGrid
from py_tools_ds.ptds.geo.coord_trafo import transform_utm_to_wgs84, transform_wgs84_to_utm, mapXY2imXY, imXY2mapXY
from py_tools_ds.ptds.geo.projection import get_UTMzone, EPSG2WKT, isProjectedOrGeographic
from py_tools_ds.ptds.geo.raster.reproject import warp_ndarray
from .. import unified_config as CFG
from ..config import GMS_config as CFG
from ..io import envifilehandling as ef
from ..misc import helper_functions as HLP_F
class GEOPROCESSING(object):
@@ -1025,7 +1024,7 @@ class GEOPROCESSING(object):
# %(dst_EPSG_code, in_nodataVal,out_nodataVal, translatedFile, warpedFile))
os.system('gdalwarp -of ENVI --config GDAL_CACHEMAX 2048 -wm 2048 -t_srs EPSG:%s -tps -r \
cubic -srcnodata %s -dstnodata %s -multi -overwrite -wo NUM_THREADS=%s -q %s %s' \
%(dst_EPSG_code,inFill,out_nodataVal,CFG.get_job().CPUs,translatedFile,warpedFile))
%(dst_EPSG_code,inFill,out_nodataVal,CFG.job.CPUs,translatedFile,warpedFile))
# import shutil
# shutil.copy(translatedFile, '//misc/hy5/scheffler/Skripte_Models/python/GeoMultiSens/testing/out/') ## only for bugfixing
# shutil.copy(translatedFile+'.hdr','//misc/hy5/scheffler/Skripte_Models/python/GeoMultiSens/testing/out/') ## only for bugfixing
@@ -1046,7 +1045,7 @@ class GEOPROCESSING(object):
# %(dst_EPSG_code,in_nodataVal,out_nodataVal,translatedFile,warpedFile))
os.system('gdalwarp -of VRT --config GDAL_CACHEMAX 2048 -wm 2048 -ot Int16 -t_srs EPSG:%s -tps -r \
cubic -srcnodata %s -dstnodata %s -overwrite -multi -wo NUM_THREADS=%s -q %s %s' \
% (dst_EPSG_code, inFill, out_nodataVal, CFG.get_job().CPUs, translatedFile, warpedFile))
% (dst_EPSG_code, inFill, out_nodataVal, CFG.job.CPUs, translatedFile, warpedFile))
# print('warped')
print('GDAL warping time',time.time()-t0)
@@ -13,32 +13,32 @@
###############################################################################
########################### Library import ####################################
import os
import re
import datetime
import glob
import builtins
import os
import re
from collections import OrderedDict
import psycopg2
import psycopg2.extras
from collections import OrderedDict
from ..io import Input_reader as INP_R
from ..misc import helper_functions as HLP_F
from ..config import GMS_config as CFG
from ..io import Input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from .METADATA import get_LayerBandsAssignment
from .. import unified_config as CFG
########################### core functions ####################################
def get_entity_IDs_within_AOI(): # called in console mode
data_list =[]
if re.search('ALOS', ','.join(CFG.get_usecase().filt_datasets)): # sensorname has to be in HLP_F.get_GMS_sensorcode
if re.search('ALOS', ','.join(CFG.usecase.filt_datasets)): # sensorname has to be in HLP_F.get_GMS_sensorcode
data_list.append({'image_type':'RSD','satellite':'ALOS', 'sensor':'AVNIR-2', 'subsystem':None, 'acquisition_date':'2009-07-02', 'entity_ID':'A1002553-001-P6100002-AODS-201007300008'}) # TAR-ID 1B1
data_list.append({'image_type':'RSD','satellite':'ALOS', 'sensor':'AVNIR-2', 'subsystem':None, 'acquisition_date':'2007-09-27', 'entity_ID':'20070927_L1B2_ALAV2A089152780'}) # extracted Folder 1B2
data_list.append({'image_type':'RSD','satellite':'ALOS', 'sensor':'AVNIR-2', 'subsystem':None, 'acquisition_date':'2009-07-19', 'entity_ID':'20090719_L1B2_ALAV2A185572780'}) # extracted Folder 1B2
data_list.append({'image_type':'RSD','satellite':'ALOS', 'sensor':'AVNIR-2', 'subsystem':None, 'acquisition_date':'2010-04-21', 'entity_ID':'20100421_L1B2_ALAV2A225832780'}) # extracted Folder 1B2
if re.search('Terra', ','.join(CFG.get_usecase().filt_datasets)):
if re.search('Terra', ','.join(CFG.usecase.filt_datasets)):
data_list.append({'image_type':'RSD','satellite':'Terra', 'sensor':'ASTER', 'subsystem':'VNIR1', 'acquisition_date':'2007-11-08', 'entity_ID':'AST_L1B_00308192007061017_20071108171717_32444'}) # HDF-ID
data_list.append({'image_type':'RSD','satellite':'Terra', 'sensor':'ASTER', 'subsystem':'VNIR2', 'acquisition_date':'2007-11-08', 'entity_ID':'AST_L1B_00308192007061017_20071108171717_32444'}) # HDF-ID
data_list.append({'image_type':'RSD','satellite':'Terra', 'sensor':'ASTER', 'subsystem':'SWIR', 'acquisition_date':'2007-11-08', 'entity_ID':'AST_L1B_00308192007061017_20071108171717_32444'}) # HDF-ID
@@ -47,23 +47,23 @@ def get_entity_IDs_within_AOI(): # called in console mode
data_list.append({'image_type':'RSD','satellite':'Terra', 'sensor':'ASTER', 'subsystem':'VNIR2', 'acquisition_date':'2002-06-08', 'entity_ID':'AST_L1A_003_05262002060543_06082002144959'}) # HDF-ID
data_list.append({'image_type':'RSD','satellite':'Terra', 'sensor':'ASTER', 'subsystem':'SWIR', 'acquisition_date':'2002-06-08', 'entity_ID':'AST_L1A_003_05262002060543_06082002144959'}) # HDF-ID
data_list.append({'image_type':'RSD','satellite':'Terra', 'sensor':'ASTER', 'subsystem':'TIR', 'acquisition_date':'2002-06-08', 'entity_ID':'AST_L1A_003_05262002060543_06082002144959'}) # HDF-ID
if re.search('Landsat', ','.join(CFG.get_usecase().filt_datasets)): # sensorname has to be in HLP_F.get_GMS_sensorcode
if re.search('Landsat', ','.join(CFG.usecase.filt_datasets)): # sensorname has to be in HLP_F.get_GMS_sensorcode
data_list.append({'image_type':'RSD','satellite':'Landsat-5', 'sensor':'TM', 'subsystem':None, 'acquisition_date':'1996-10-24', 'entity_ID':'LT51510321996298XXX01'}) # TAR-ID
# data_list.append({'image_type':'RSD','satellite':'Landsat-7', 'sensor':'ETM+', 'subsystem':None, 'acquisition_date':'2002-08-15', 'entity_ID':'LE70050152002227EDC00'}) # TAR-ID
# data_list.append({'image_type':'RSD','satellite':'Landsat-7', 'sensor':'ETM+', 'subsystem':None, 'acquisition_date':'2000-04-02', 'entity_ID':'LE71510322000093SGS00'}) # TAR-ID
data_list = data_list + LandsatID2dataset([os.path.basename(i).split('.tar.gz')[0] for i in glob.glob(os.path.join(CFG.get_job().path_archive,'Landsat-7/ETM+/*.tar.gz'))]) # TAR-ID
data_list = data_list + LandsatID2dataset([os.path.basename(i).split('.tar.gz')[0] for i in glob.glob(os.path.join(CFG.get_job().path_archive,'Landsat-8/OLI_TIRS/*.tar.gz'))]) # TAR-ID
data_list = data_list + LandsatID2dataset([os.path.basename(i).split('.tar.gz')[0] for i in glob.glob(os.path.join(CFG.job.path_archive,'Landsat-7/ETM+/*.tar.gz'))]) # TAR-ID
data_list = data_list + LandsatID2dataset([os.path.basename(i).split('.tar.gz')[0] for i in glob.glob(os.path.join(CFG.job.path_archive,'Landsat-8/OLI_TIRS/*.tar.gz'))]) # TAR-ID
# data_list.append({'image_type':'RSD','satellite':'Landsat-8', 'sensor':'OLI_TIRS','subsystem':None, 'acquisition_date':'2013-07-03', 'entity_ID':'LC81510322013184LGN00'}) # TAR-ID
# data_list.append({'image_type':'RSD','satellite':'Landsat-8', 'sensor':'OLI_TIRS','subsystem':None, 'acquisition_date':'2013-06-01', 'entity_ID':'LC81510322013152LGN00'}) # TAR-ID ~6% Cloud cover
if re.search('SPOT', ','.join(CFG.get_usecase().filt_datasets)):
if re.search('SPOT', ','.join(CFG.usecase.filt_datasets)):
data_list.append({'image_type':'RSD','satellite':'SPOT-1', 'sensor':'HRV1', 'subsystem':None, 'acquisition_date':'1986-07-17', 'entity_ID':'00197112001'})
data_list.append({'image_type':'RSD','satellite':'SPOT-5', 'sensor':'HRG2', 'subsystem':None, 'acquisition_date':'2010-04-21', 'entity_ID':'00197112009'})
data_list.append({'image_type':'RSD','satellite':'SPOT-5', 'sensor':'HRG2', 'subsystem':None, 'acquisition_date':'2010-04-21', 'entity_ID':'00197112010'})
if re.search('RapidEye', ','.join(CFG.get_usecase().filt_datasets)):
if re.search('RapidEye', ','.join(CFG.usecase.filt_datasets)):
data_list.append({'image_type':'RSD','satellite':'RapidEye-5','sensor':'MSI', 'subsystem':None, 'acquisition_date':'2014-04-23', 'entity_ID':'4357606_2014-04-23_RE5_3A_180259'})
if re.search('SRTM', ','.join(CFG.get_usecase().filt_datasets)):
if re.search('SRTM', ','.join(CFG.usecase.filt_datasets)):
data_list.append({'image_type':'DGM','satellite':'SRTM', 'sensor':'SRTM2', 'subsystem':None, 'acquisition_date':'unknown', 'entity_ID':'srtm-1arcsec-version2jan2015-39-42n-70-85'})
if re.search('ATM', ','.join(CFG.get_usecase().filt_datasets)):
if re.search('ATM', ','.join(CFG.usecase.filt_datasets)):
data_list.append({'image_type':'ATM','satellite':'ATM-data', 'sensor':'unknown', 'subsystem':None, 'acquisition_date':'unknown', 'entity_ID':'dummy_ID'})
for ds in data_list:
@@ -73,15 +73,14 @@ def get_entity_IDs_within_AOI(): # called in console mode
#ds['scene_ID'] = '_'.join([ds['satellite'],ds['sensor'],ds['subsystem'],ds['entity_ID']])
ds['scene_ID'] = ds['entity_ID']
ds['sensormode'] = get_sensormode(ds)
if CFG.get_usecase().skip_thermal:
if CFG.usecase.skip_thermal:
data_list = [ds for ds in data_list if not ds['subsystem'] == 'TIR'] # removes ASTER TIR in case of skip_thermal
if CFG.get_usecase().skip_pan:
if CFG.usecase.skip_pan:
data_list = [ds for ds in data_list if not ds['sensormode'] == 'P'] # removes e.g. SPOT PAN in case of skip_pan
return data_list
def get_data_list_of_current_jobID(): # called in webapp mode
job = CFG.get_job()
usecase = CFG.get_usecase()
job = CFG.job
data_list = []
with psycopg2.connect(job.conn_database) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
@@ -124,10 +123,10 @@ def get_data_list_of_current_jobID(): # called in webapp mode
ds["filename"] = row["filename"]
ds['sensor'] = 'ETM+' if re.search('ETM+', ds['sensor']) else ds['sensor']
if CFG.get_usecase().skip_thermal and ds['subsystem']=='TIR': continue # removes ASTER TIR in case of skip_thermal
if CFG.usecase.skip_thermal and ds['subsystem']=='TIR': continue # removes ASTER TIR in case of skip_thermal
ds['subsystem'] = '' if ds['subsystem'] is None else ds['subsystem']
ds['sensormode'] = get_sensormode(ds)
if CFG.get_usecase().skip_pan and ds['sensormode']=='P': continue # removes e.g. SPOT PAN in case of skip_pan
if CFG.usecase.skip_pan and ds['sensormode']=='P': continue # removes e.g. SPOT PAN in case of skip_pan
if re.search("Sentinel-2A",ds['satellite'],re.I):
for subsystem in ['S2A10','S2A20','S2A60']:
@@ -169,11 +168,11 @@ def get_sensormode(dataset):
return 'M'
def add_local_availability(dataset):
if CFG.get_job().call_type == 'webapp':
if CFG.job.call_type == 'webapp':
DB_match = DB_T.get_info_from_postgreSQLdb \
(CFG.get_job().conn_database,'scenes_proc',['proc_level','layer_bands_assignment'], {'sceneid':dataset['scene_ID']})
else: ## CFG.get_job().call_type == 'console'
DB_match = DB_T.get_info_from_SQLdb(CFG.get_job().path_database,'processed_data',['proc_level','LayerBandsAssignment'],
(CFG.job.conn_database,'scenes_proc',['proc_level','layer_bands_assignment'], {'sceneid':dataset['scene_ID']})
else: ## CFG.job.call_type == 'console'
DB_match = DB_T.get_info_from_SQLdb(CFG.job.path_database,'processed_data',['proc_level','LayerBandsAssignment'],
{'image_type':dataset['image_type'],'satellite':dataset['satellite'], 'sensor':dataset['sensor'],
'subsystem':dataset['subsystem'], 'sensormode':dataset['sensormode'], 'entity_ID':dataset['entity_ID']})
path_logfile = PG.path_generator(dataset).get_path_logfile()
@@ -210,7 +209,7 @@ def add_local_availability(dataset):
'logfile %s has been written successfully. Recreating missing database entry.' \
%(dataset['entity_ID'],ProcL))
DB_T.data_DB_updater(GMS_file_dict)
if CFG.get_job().call_type == 'console':
if CFG.job.call_type == 'console':
DB_T.SQL_DB_to_csv()
dataset['proc_level'] = ProcL
elif len(DB_match) == 1:
@@ -18,23 +18,22 @@
########################### Library import ####################################
#from __future__ import (division, print_function, unicode_literals,absolute_import)
import collections
import os
import re
import builtins
import collections
from ..misc import path_generator as PG
from ..config import GMS_config as CFG
from ..misc import path_generator as PG
from ..misc.logging import GMS_logger
from .. import unified_config as CFG
########################### core functions ####################################
class L0B_object(object):
def __init__(self, data_list_posX):
self.proc_level = 'L0B'
self.job_ID = CFG.get_job().ID
self.job_CPUs = CFG.get_job().CPUs
self.job_ID = CFG.job.ID
self.job_CPUs = CFG.job.CPUs
self.image_type = data_list_posX['image_type']
self.satellite = data_list_posX['satellite']
self.sensor = data_list_posX['sensor']
@@ -57,7 +56,7 @@ class L0B_object(object):
['image_type' ,'Satellite' ,'Sensor' ,'Subsystem' ,'logger' ],
[self.image_type,self.satellite,self.sensor,self.subsystem,self.logger]) )
self.path_cloud_class_obj = PG.get_path_cloud_class_obj(self.GMS_identifier,
get_all=True if CFG.get_job().bench_CLD_class else False)
get_all=True if CFG.job.bench_CLD_class else False)
if not os.path.isfile(self.path_archive) and not os.path.isdir(self.path_archive):
self.logger.info("The %s dataset '%s' has not been processed earlier and no corresponding raw data archive"
@@ -72,7 +71,7 @@ class L0B_object(object):
self.logger.info('Level 0B object for %s %s%s (data-ID %s) successfully created.' %(self.satellite,
self.sensor, (' '+self.subsystem) if self.subsystem not in [None,''] else '', self.entity_ID))
if CFG.get_job().exec_mode=='Python' and self.ExtractedFolder and not os.path.isdir(self.ExtractedFolder):
if CFG.job.exec_mode=='Python' and self.ExtractedFolder and not os.path.isdir(self.ExtractedFolder):
os.makedirs(self.ExtractedFolder)
# close loggers
@@ -15,7 +15,6 @@
###############################################################################
########################### Library import ####################################
import builtins
import collections
import json
import os
@@ -28,24 +27,22 @@ import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import box
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc import database_tools as DB_T
from .L1A_P import L1A_object
from ..misc.SpatialIndexMediator import SpatialIndexMediator
from ..misc.logging import GMS_logger
from .. import unified_config as CFG
#sys.path.append('/home/gfz-fe/')
from CoReg_Sat import COREG, DESHIFTER
from py_tools_ds.ptds import GeoArray
from py_tools_ds.ptds.geo.projection import prj_equal, EPSG2WKT
from py_tools_ds.ptds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.ptds.geo.coord_trafo import reproject_shapelyPoly, transform_any_prj
from py_tools_ds.ptds.geo.map_info import mapinfo2geotransform
from py_tools_ds.ptds.geo.projection import prj_equal, EPSG2WKT
from py_tools_ds.ptds.geo.vector.topology import get_overlap_polygon
from ..config import GMS_config as CFG
from .L1A_P import L1A_object
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.SpatialIndexMediator import SpatialIndexMediator
from ..misc.logging import GMS_logger
#if socket.gethostname() == 'geoms':
# sys.path.append('/usr/lib/otb/python/')
@@ -231,7 +228,7 @@ class Scene_finder(object):
SpIM = SpatialIndexMediator(timeout=timeout)
self.possib_ref_scenes = \
SpIM.getFullSceneDataForDataset(self.boundsLonLat , self.timeStart, self.timeEnd, self.min_cloudcov,
self.max_cloudcov, CFG.get_usecase().datasetid_spatial_ref,
self.max_cloudcov, CFG.usecase.datasetid_spatial_ref,
refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
break
except socket.timeout:
@@ -297,7 +294,7 @@ class Scene_finder(object):
if not GDF.empty:
# get processing level of reference scenes
query_procL = lambda sceneID: \
DB_T.get_info_from_postgreSQLdb(CFG.get_job().conn_database, 'scenes_proc', ['proc_level'], {'sceneid': sceneID})
DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes_proc', ['proc_level'], {'sceneid': sceneID})
GDF['temp_queryRes'] = [*GDF['sceneid'] .map(query_procL)]
GDF['proc_level'] = [*GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None)]
GDF.drop('temp_queryRes',axis=1,inplace=True)
@@ -325,7 +322,7 @@ class Scene_finder(object):
GDF = self.GDF_ref_scenes
if not GDF.empty:
# check if a proper entity ID can be gathered from database
query_eID = lambda sceneID: DB_T.get_info_from_postgreSQLdb(CFG.get_job().conn_database, 'scenes', ['entityid'],
query_eID = lambda sceneID: DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
{'id': sceneID}, records2fetch=1)
GDF['temp_queryRes'] = [*GDF['sceneid'] .map(query_eID)]
GDF['entityid'] = [*GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None)]
@@ -363,14 +360,14 @@ class Scene_finder(object):
plusminus_days = 30
AcqDate = self.im2shift_objDict['acquisition_date']
date_minmax = [AcqDate-timedelta(days=plusminus_days),AcqDate+timedelta(days=plusminus_days)]
dataset_cond = 'datasetid=%s' %CFG.get_usecase().datasetid_spatial_ref
dataset_cond = 'datasetid=%s' %CFG.usecase.datasetid_spatial_ref
cloudcov_cond = 'cloudcover < %s' %max_cloudcov # FIXME not yet available for all scenes at proc_level METADATA
dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
"BETWEEN (%s, %s) AND (%s, %s)" \
%(date_minmax[0].month, date_minmax[0].day, date_minmax[1].month, date_minmax[1].day)
# TODO add further criteria!
query_scenes = lambda condlist: DB_T.get_overlapping_scenes_from_postgreSQLdb(CFG.get_job().conn_database,
query_scenes = lambda condlist: DB_T.get_overlapping_scenes_from_postgreSQLdb(CFG.job.conn_database,
table='scenes', trueDataCornerLonLat=self.trueDataCornerLonLat,
conditions=condlist, add_cmds='ORDER BY scenes.cloudcover ASC', timeout=30000)
conds_descImportance = [dataset_cond, cloudcov_cond, dayrange_cond]
@@ -383,9 +380,9 @@ class Scene_finder(object):
if count==0:
# search within already processed scenes
res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
CFG.get_job().conn_database, trueDataCornerLonLat=self.trueDataCornerLonLat,
conditions=['datasetid=%s' %CFG.get_usecase().datasetid_spatial_ref],
add_cmds='ORDER BY scenes.cloudcover ASC',timeout=25000) # das ist nur Ergebnis aus scenes_proc -> dort liegt nur eine referenz, wenn die szene schon bei CFG.get_job()-Beginn in Datensatzliste drin war
CFG.job.conn_database, trueDataCornerLonLat=self.trueDataCornerLonLat,
conditions=['datasetid=%s' %CFG.usecase.datasetid_spatial_ref],
add_cmds='ORDER BY scenes.cloudcover ASC',timeout=25000) # this is only the result from scenes_proc -> a reference is only found there if the scene was already in the dataset list when CFG.job was started
filt_overlap_scenes = self.sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]],20.)
else:
@@ -423,7 +420,7 @@ class Scene_finder(object):
break
# start download of scene data not available and start L1A processing
dl_cmd = lambda scene_ID: print('%s %s %s' %(CFG.get_job().java_commands['keyword'].strip(), CFG.get_job().java_commands
dl_cmd = lambda scene_ID: print('%s %s %s' %(CFG.job.java_commands['keyword'].strip(), CFG.job.java_commands
["value_download"].strip(), scene_ID))
path = PG.path_generator(scene_ID = sc['scene_ID']).get_path_imagedata()
@@ -437,12 +434,12 @@
# check if scene is downloading
download_start_timeout = 5 # seconds
# set timeout for external processing ## DEPRECATED BECAUSE CREATION OF EXTERNAL CFG.get_JOB() WITHIN CFG.get_JOB() IS NOT ALLOWED
# set timeout for external processing ## DEPRECATED BECAUSE CREATION OF EXTERNAL CFG.job WITHIN CFG.job IS NOT ALLOWED
processing_timeout = 5 # seconds # FIXME increase timeout if processing is really started
proc_level = None
while True:
proc_level_chk = DB_T.get_info_from_postgreSQLdb(
CFG.get_job().conn_database,'scenes','proc_level',{'id':sc['scene_ID']})[0][0]
CFG.job.conn_database,'scenes','proc_level',{'id':sc['scene_ID']})[0][0]
if proc_level != proc_level_chk:
print('Reference scene %s, current processing level: %s' %(sc['scene_ID'], proc_level_chk))
proc_level = proc_level_chk
@@ -457,7 +454,7 @@
warnings.warn('L1A processing of reference scene %s (entity ID %s) timed out. '
'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
break
#DB_T.set_info_in_postgreSQLdb(CFG.get_job().conn_database,'scenes',
#DB_T.set_info_in_postgreSQLdb(CFG.job.conn_database,'scenes',
# {'proc_level':'METADATA'},{'id':sc['scene_ID']})
time.sleep(5)
@@ -474,7 +471,7 @@
self.overlap_percentage = sc['overlap percentage']
self.overlap_area = sc['overlap area']
query_res = DB_T.get_info_from_postgreSQLdb(CFG.get_job().conn_database,'scenes','entityid',
query_res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database,'scenes','entityid',
{'id':self.imref_scene_ID},records2fetch=1)
assert query_res != [], 'No entity-ID found for scene number %s' %self.imref_scene_ID
self.imref_entity_ID = query_res[0][0] # [('LC81510322013152LGN00',)]
@@ -686,8 +683,8 @@ class L1B_object(L1A_object):
self.coreg_info.update({'reference geotransform': None})
self.coreg_info.update({'reference projection' : self.meta['coordinate system string']}) # must be its own projection in order to avoid overwriting with a wrong EPSG
self.coreg_info.update({'reference extent' : {'rows':None, 'cols':None }})
self.coreg_info.update({'reference grid' : [list(CFG.get_usecase().spatial_ref_gridx),
list(CFG.get_usecase().spatial_ref_gridy)]})
self.coreg_info.update({'reference grid' : [list(CFG.usecase.spatial_ref_gridx),
list(CFG.usecase.spatial_ref_gridy)]})
self.coreg_info.update({'success' : True if not self.coreg_needed else False}) # False means spatRef not available
@@ -710,7 +707,7 @@ class L1B_object(L1A_object):
geoArr = GeoArray(getattr(self,attrname), tuple(gt), prj)
rspAlg = 'cubic' if attrname=='arr' else 'nearest'
DS = DESHIFTER(geoArr, self.coreg_info,
target_xyGrid=[CFG.get_usecase().spatial_ref_gridx, CFG.get_usecase().spatial_ref_gridy],
target_xyGrid=[CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy],
cliptoextent=True, clipextent=mapBounds, align_grids=True, resamp_alg=rspAlg, v=False)
DS.correct_shifts()
setattr(self,attrname, DS.arr_shifted)
@@ -22,20 +22,20 @@
########################### Library import ####################################
import numpy as np
import builtins
try:
from osgeo import osr
except ImportError:
import osr
from py_tools_ds.ptds.geo.map_info import mapinfo2geotransform
from ..config import GMS_config as CFG
from ..misc import helper_functions as HLP_F
from . import GEOPROCESSING as GEOP
from ..io import Input_reader as INP_R
from ..misc import path_generator as PG
from .L1B_P import L1B_object
from py_tools_ds.ptds.geo.map_info import mapinfo2geotransform
from .. import unified_config as CFG
########################### core functions ####################################
class L1C_object(L1B_object):
@@ -71,7 +71,7 @@ class L1C_object(L1B_object):
mapinfo2geotransform(self.meta['map info']),
self.meta['coordinate system string'], mask_1bit_temp, fillVal)[0]
if CFG.get_job().exec_mode == 'Flink' and subset[0]=='cube':
if CFG.job.exec_mode == 'Flink' and subset[0]=='cube':
self.lonlat_arr = lonlat_arr
else:
return {'desc': 'lonlat_arr', 'row_start': rS, 'row_end': rE, 'col_start': cS, 'col_end': cE,
@@ -110,14 +110,14 @@ class L1C_object(L1B_object):
self.meta['AcqTime'], self.trueDataCornerPos,
self.trueDataCornerLonLat, self.meta['overpass duraction sec'],
self.logger, mask_1bit_temp, fillVal,
accurracy=CFG.get_job().SZA_SAA_calculation_accurracy,
accurracy=CFG.job.SZA_SAA_calculation_accurracy,
lonlat_arr=self.lonlat_arr
if CFG.get_job().SZA_SAA_calculation_accurracy == 'fine' else None)
if CFG.job.SZA_SAA_calculation_accurracy == 'fine' else None)
if 'IncidenceAngle_arrProv' in self.meta and self.meta['IncidenceAngle_arrProv']:
pass # FIXME Sentinel-2's viewing azimuth array is currently not used (instead a static VAA is used)
RAA_arr = GEOP.calc_RAA_array(self.trueDataCornerLonLat, SAA_arr, self.VAA_mean, mask_1bit_temp, fillVal)
if CFG.get_job().exec_mode == 'Flink' and subset is None:
if CFG.job.exec_mode == 'Flink' and subset is None:
self.VZA_arr, self.SZA_arr, self.SAA_arr, self.RAA_arr = VZA_arr, SZA_arr, SAA_arr, RAA_arr
else:
return ({'desc': 'VZA_arr', 'row_start': rS, 'row_end': rE, 'col_start': cS, 'col_end': cE, 'data': VZA_arr},
@@ -11,25 +11,24 @@ import collections
import os
import time
import warnings
import builtins
import gdal
import numpy as np
import rasterio
from . import GEOPROCESSING as GEOP # FIXME import functions directly as soon as GEOPROCESSING is included in algorithms.__init__.__all__
from ..misc import path_generator as PG
from ..misc import helper_functions as HLP_F
from ..misc.logging import GMS_logger
from .L1C_P import L1C_object
from py_tools_ds.ptds.geo.map_info import mapinfo2geotransform, geotransform2mapinfo
from py_tools_ds.ptds.geo.projection import get_proj4info
from py_tools_ds.ptds.geo.coord_calc import corner_coord_to_minmax, get_corner_coordinates
from py_tools_ds.ptds.geo.coord_trafo import pixelToMapYX
from py_tools_ds.ptds.geo.map_info import mapinfo2geotransform, geotransform2mapinfo
from py_tools_ds.ptds.geo.projection import get_proj4info
from py_tools_ds.ptds.geo.raster.reproject import warp_ndarray
from py_tools_ds.ptds.numeric.vector import find_nearest
from .. import unified_config as CFG
from ..config import GMS_config as CFG
from . import GEOPROCESSING as GEOP # FIXME import functions directly as soon as GEOPROCESSING is included in algorithms.__init__.__all__
from .L1C_P import L1C_object
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.logging import GMS_logger
def get_DESHIFTER_configs(dicts_GMS_obj, attrnames2deshift, proc_bandwise=False, paramsFromUsecase=True, **kwargs):
@@ -69,9 +68,9 @@ def get_DESHIFTER_configs(dicts_GMS_obj, attrnames2deshift, proc_bandwise=False,
# get general kwargs
gen_kwargs = collections.OrderedDict()
if paramsFromUsecase:
gen_kwargs.update({'align_grids':CFG.get_usecase().align_coord_grids})
gen_kwargs.update({'out_gsd' :CFG.get_usecase().target_gsd})
gen_kwargs.update({'match_gsd' :CFG.get_usecase().match_gsd})
gen_kwargs.update({'align_grids':CFG.usecase.align_coord_grids})
gen_kwargs.update({'out_gsd' :CFG.usecase.target_gsd})
gen_kwargs.update({'match_gsd' :CFG.usecase.match_gsd})
else:
[gen_kwargs.update({kw:kwargs.get(kw)}) for kw in ['align_grids','out_gsd','match_gsd'] if kw in kwargs]
[gen_kwargs.update({kw:kwargs.get(kw)}) for kw in ['no_resamp','cliptoextent'] if kw in kwargs]
@@ -84,8 +83,8 @@ def get_DESHIFTER_configs(dicts_GMS_obj, attrnames2deshift, proc_bandwise=False,
if not obj['coreg_info']['reference geotransform']:
obj['coreg_info']['reference geotransform'] = mapinfo2geotransform(
obj['coreg_info']['original map info'])
obj['coreg_info']['reference geotransform'][1] = CFG.get_usecase().target_gsd[0]
obj['coreg_info']['reference geotransform'][5] = -abs(CFG.get_usecase().target_gsd[1])
obj['coreg_info']['reference geotransform'][1] = CFG.usecase.target_gsd[0]
obj['coreg_info']['reference geotransform'][5] = -abs(CFG.usecase.target_gsd[1])
item2add = [obj]
for attrname in attrnames2deshift:
@@ -195,7 +194,7 @@ class DESHIFTER(object):
# set the rest
self.shift_xgsd = abs(self.shift_gt[1])
self.shift_ygsd = abs(self.shift_gt[5])
self.ref_xgsd = abs(self.ref_gt[1]) # if ref_gt is not available after coreg: get_deshift_configs overwrites the empty ref gt with the original shift-gt plus the gsd of the CFG.usecase target grid
self.ref_xgsd = abs(self.ref_gt[1]) # falls ref_gt nach Coreg nicht verfügbar: get_deshift_configs überschreibt leere ref gt mit originaler shift-gt + gsd vom CFG.usecase target grid
self.ref_ygsd = abs(self.ref_gt[5])
if not self.deshift_needed:
@@ -7,14 +7,13 @@
###############################################################################
__author__='Daniel Scheffler'
import builtins
import numpy as np
from scipy.interpolate import interp1d
from ..io import Input_reader as INP_R
from .L2A_P import L2A_object
from ..config import GMS_config as CFG
from .L2A_P import L2A_object
from ..io import Input_reader as INP_R
from .. import unified_config as CFG
class L2B_object(L2A_object):
def __init__(self, L2A_obj):
@@ -24,7 +23,7 @@ class L2B_object(L2A_object):
def spectral_homogenization(self, subset=None, kind='linear'):
src_cwls = self.meta['wavelength']
tgt_cwls = CFG.get_usecase().target_CWL # FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
tgt_cwls = CFG.usecase.target_CWL # FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
if src_cwls!=tgt_cwls:
assert kind in ['linear',], "%s is not a supported kind of homogenization." %kind
self.log_for_fullArr_or_firstTile('Performing spectral homogenization (%s) with target wavelength '
@@ -13,33 +13,33 @@
# **** IMPORTING LIBRARIES ****************************************************
from __future__ import (division, print_function, unicode_literals,absolute_import)
import os
import sys
import re
import numpy as np
import scipy.interpolate
import collections
import datetime
import math
import pyproj
import glob
import builtins
import math
import os
import re
import sys
import warnings
import xml.etree.ElementTree as ET