Commit cd4c82d2 authored by Daniel Scheffler

revised config.Job

config:
- Job: now accepts keyword arguments for most of the available options

updated __version__
parent d7f01df7
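
For illustration, the revised Job signature can now be driven via keyword arguments. A minimal sketch of such a call (the import path, job ID and all option values below are hypothetical, and a reachable database is required for 'webapp' jobs):

    from GeoMultiSens.config import Job   # hypothetical import path

    job = Job('webapp', 123456,           # call_type, job ID (must exist in the database)
              exec_mode='Python',         # write intermediate results to disk
              db_host='localhost',
              exec_L2BP=[1, 1, 0],        # [run processor, write output, delete output]
              CPUs=8,                     # limit to 8 cores instead of all available
              blocksize=(1024, 1024))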
@@ -15,7 +15,7 @@ from . import config
 from .processing.process_controller import process_controller

-__version__ = '20170407.01'
+__version__ = '20170410.01'
 __author__ = 'Daniel Scheffler'
 __all__ = ['algorithms',
            'io',
...
@@ -602,7 +602,7 @@ class METADATA(object):
             result = DB_T.get_info_from_SQLdb(CFG.job.path_db_meta, tablename, ['sceneID', 'sensor'],
                                               {'acquisitionDate': self.AcqDate, 'path': self.WRS_path, 'row': self.WRS_row}, records2fetch=1)
         else:
-            result = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_db_meta, 'scenes', ['entityID'], {'id': self.SceneID})
+            result = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityID'], {'id': self.SceneID})
         if len(result) == 1:  # e.g. [('LE71950282003121EDC00',)]
             self.EntityID = result[0][0]
@@ -1176,7 +1176,7 @@ class METADATA(object):
         # self.default_attr()
         assert self.SceneID is not None and self.SceneID != -9999, "Read_Sentinel2A_xmls(): Missing scene ID. "
-        res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_db_meta, 'scenes', ['entityid'], {'id': self.SceneID})
+        res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'], {'id': self.SceneID})
         assert len(res) != 0, \
             "Invalid SceneID given - no corresponding scene with the ID=%s found in database.\n" % self.SceneID
         assert len(res) == 1, "Error in database. The sceneid %s exists more than once. \n" % self.SceneID
...
@@ -1368,7 +1368,7 @@ class GMS_object(object):
         self.logger.info('Cutting scene %s (entity ID %s) into MGRS tiles...' % (self.scene_ID, self.entity_ID))
         # get GeoDataFrame containing all overlapping MGRS tiles (MGRS geometries completely within nodata area are excluded)
-        GDF_MGRS_tiles = DB_T.get_overlapping_MGRS_tiles(CFG.job.conn_db_meta,
+        GDF_MGRS_tiles = DB_T.get_overlapping_MGRS_tiles(CFG.job.conn_database,
                                                          tgt_corners_lonlat=self.trueDataCornerLonLat)
         # calculate image coordinate bounds of the full GMS object for each MGRS tile within the GeoDataFrame
...
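
The three hunks above are call-site updates for the same rename: the former CFG.job.conn_db_meta alias is dropped in favour of the single CFG.job.conn_database connection string (exposed as a property further below). The lookup helpers keep their list-of-row-tuples return shape, which is why the call sites unwrap with [0][0]; a sketch of the pattern, reusing the example value from the inline comment above (scene_id is a hypothetical variable):

    # returns a list of row tuples, e.g. [('LE71950282003121EDC00',)]
    res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityID'], {'id': scene_id})
    entity_id = res[0][0] if len(res) == 1 else None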
@@ -59,52 +59,83 @@ GMS_config = GMS_configuration()
 class Job:
-    def __init__(self, call_type, ID, exec_mode='Python', db_host='localhost'):
-        absP, joinP = lambda r: os.path.join(os.path.dirname(__file__), os.path.relpath(r)), lambda *x: os.path.join(*x)
+    def __init__(self, call_type, ID, exec_mode='Python', db_host='localhost', exec_L1AP=None, exec_L1BP=None,
+                 exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None, sub_multiProc=True,
+                 exc_handler=True, blocksize=(2048,2048), profiling=False, bench_all=False, bench_cloudMask=False):
+        """Create a job configuration.
+
+        :param call_type:       'console' or 'webapp'
+        :param ID:              job ID of the job to be executed, e.g. 123456 (must be present in the database)
+        :param exec_mode:       'Python': write intermediate results to disk in order to save memory;
+                                'Flink': keep intermediate results in memory in order to save IO time
+        :param db_host:         host name of the server that runs the postgreSQL database
+        :param exec_L1AP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
+        :param exec_L1BP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
+        :param exec_L1CP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
+        :param exec_L2AP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
+        :param exec_L2BP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
+        :param exec_L2CP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
+        :param CPUs:            number of CPU cores to be used for processing (default: None -> use all available)
+        :param sub_multiProc:   allow multiprocessing within workers
+        :param exc_handler:     enable/disable automatic handling of unexpected exceptions (default: True -> enabled)
+        :param blocksize:       X/Y block size to be used for any tiling process (default: (2048, 2048))
+        :param profiling:       enable/disable code profiling (default: False)
+        :param bench_all:       enable/disable benchmark of the whole processing pipeline
+        :param bench_cloudMask: enable/disable benchmark of the cloud mask generator module
+        """
+        ## args
         self.ID = ID
         self.call_type = call_type
         self.exec_mode = exec_mode
         assert exec_mode in ['Flink', 'Python']
+        self.db_host = db_host
         assert isinstance(db_host, str), "db_host must be a string! Got %s." % type(db_host)
+
+        ## kwargs
+        # processor configuration: [run processor, write output, delete output if not needed anymore]
+        self.exec__L1AP = [1, 1, 1] if not exec_L1AP else exec_L1AP
+        self.exec__L1BP = [1, 1, 1] if not exec_L1BP else exec_L1BP
+        self.exec__L1CP = [1, 1, 1] if not exec_L1CP else exec_L1CP
+        self.exec__L2AP = [1, 1, 1] if not exec_L2AP else exec_L2AP
+        self.exec__L2BP = [1, 1, 0] if not exec_L2BP else exec_L2BP
+        self.exec__L2CP = [1, 1, 0] if not exec_L2CP else exec_L2CP
+        self.validate_exec_configs()
+
+        self.CPUs = CPUs if CPUs else multiprocessing.cpu_count()
+        self.allow_subMultiprocessing = sub_multiProc
+        self.disable_exception_handler = exc_handler is False
+        self.log_level = 'INFO'  # TODO implement log level
+        self.tiling_block_size_XY = blocksize
+        self.profiling = profiling
+        self.benchmark_global = bench_all
+        self.bench_CLD_class = bench_cloudMask
+        self.SZA_SAA_calculation_accurracy = 'coarse'  # hardcoded
+        self.export_VZA_SZA_SAA_RAA_stats = True  # hardcoded
+        self.export_L1C_obj_dumps = False  # hardcoded
+
+        ## fixed attributes
         self.status = 'pending'  # possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings', 'finished_with_errors', 'finished'
         self.start_time = datetime.datetime.now()
         self.end_time = None
         self.computation_time = None
         self.hostname = socket.gethostname()
-        self.CPUs = multiprocessing.cpu_count()
-        #self.CPUs = 1
-        self.allow_subMultiprocessing = True  # allows multiprocessing within workers
-        self.disable_exception_handler = False  # disables automatic handling of unexpected exceptions
-        self.profiling = False
-        # TODO add log level
-        self.benchmark_global = False
-        # bench_CLD_class = True
-        self.bench_CLD_class = False
-        self.tiling_block_size_XY = (2048,2048)
-        self.SZA_SAA_calculation_accurracy = 'coarse'
-        self.export_VZA_SZA_SAA_RAA_stats = True
-        self.export_L1C_obj_dumps = False
-        self.path_earthSunDist = absP('./database/earth_sun_distance/Earth_Sun_distances_per_day_edited.csv')
-        self.path_SRFs = absP('./database/srf/')
+        self._DB_job_record = None
+
+        ## set all the paths
+        absP, joinP = lambda r: os.path.join(os.path.dirname(__file__), os.path.relpath(r)), lambda *x: os.path.join(*x)
+        self.path_earthSunDist = absP('./database/earth_sun_distance/Earth_Sun_distances_per_day_edited.csv')
+        self.path_SRFs = absP('./database/srf/')
         self.path_cloud_classif = absP('./database/cloud_classifier/')
         self.path_solar_irr = absP('./database/solar_irradiance/SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')
         self.path_testing = absP('./testing/')
         self.path_benchmarks = absP('./benchmarks/')
         self.path_job_logs = absP('./logs/job_logs/')
-        # processor configuration: [run processor, write output, delete output if not needed anymore]
-        self.exec__L1AP = [1, 1, 1]
-        self.exec__L1BP = [1, 1, 1]
-        self.exec__L1CP = [1, 1, 1]
-        self.exec__L2AP = [1, 1, 1]
-        self.exec__L2BP = [1, 1, 0]
-        self.exec__L2CP = [1, 1, 0]
-        if call_type == 'console':
+
+        if self.call_type == 'console':
             """path_fileserver is to be replaced by Fileserver URL"""
             self.path_fileserver = '/misc/gms2/scheffler/GeoMultiSens/' if self.hostname != 'geoms' else absP('./')
             self.path_tempdir = '/dev/shm/GeoMultiSens/'
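
A caveat worth noting about the new keyword handling above: the fallback uses `if not exec_L1AP`, so any falsy value (None, but also an empty list) selects the built-in default triple. The per-level defaults are [1, 1, 1] except L2BP and L2CP, which default to not deleting their output ([1, 1, 0]). A standalone sketch of just that logic, detached from Job:

    for passed, default in [(None, [1, 1, 1]),        # omitted -> default kept
                            ([], [1, 1, 1]),          # empty list is falsy -> default kept too
                            ([1, 0, 0], [1, 1, 0])]:  # explicit triple wins
        exec_lvl = default if not passed else passed
        print(passed, '->', exec_lvl)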
@@ -122,21 +153,11 @@ class Job:
             # path_archive = absP('./database/sampledata/')
             # path_archive = '/srv/gms2/scheffler/GeoMultiSens/database/sampledata/'
             self.path_archive = joinP(self.path_fileserver, 'database/sampledata/')
-        elif call_type == 'webapp':
-            self.conn_database = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3"\
-                                 % db_host
-            # check if job ID exists in database
-            from .misc.database_tools import GMS_JOB
-            try:
-                self.DB_job_record = GMS_JOB(self.conn_database).from_job_ID(ID)
-            except ValueError:
-                raise
-            self.conn_db_meta = self.conn_database
+        elif self.call_type == 'webapp':
             from .misc.database_tools import get_info_from_postgreSQLdb
             query_cfg = lambda key: \
-                get_info_from_postgreSQLdb(self.conn_db_meta, 'config', ['value'], {'key': "%s" % key})[0][0]
+                get_info_from_postgreSQLdb(self.conn_database, 'config', ['value'], {'key': "%s" % key})[0][0]
             self.path_fileserver = query_cfg('path_data_root')
             self.path_tempdir = query_cfg('path_tempdir')
             self.path_procdata_scenes = joinP(self.path_fileserver, query_cfg('foldername_procdata_scenes'))
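
query_cfg is a thin wrapper around a single-value lookup in the config table: judging from the call, each invocation amounts to roughly `SELECT value FROM config WHERE key = <key>` and unwraps the first column of the first row via [0][0]. A sketch (the SQL is an assumption about what get_info_from_postgreSQLdb does internally):

    # equivalent in spirit to: SELECT value FROM config WHERE key = 'path_data_root'
    path_data_root = get_info_from_postgreSQLdb(self.conn_database, 'config',
                                                ['value'], {'key': 'path_data_root'})[0][0]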
@@ -159,16 +180,46 @@ class Job:
             self.java_commands["keyword"] = query_cfg('command_keyword')
             self.java_commands["value_download"] = query_cfg('command_value_download')

-        assert os.path.isdir(self.path_archive), "Given archive folder '%s' does not exist. Execution stopped" % self.path_archive
-        if not os.path.isdir(self.path_job_logs): os.makedirs(self.path_job_logs)
+        assert os.path.isdir(self.path_archive), \
+            "Given archive folder '%s' does not exist. Execution stopped" % self.path_archive
+        if not os.path.isdir(self.path_job_logs):
+            os.makedirs(self.path_job_logs)
+
+    @property
+    def conn_database(self):
+        return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" % self.db_host \
+            if self.call_type == 'webapp' else ''
+
+    @property
+    def DB_job_record(self):
+        if not self._DB_job_record:
+            # check if job ID exists in database
+            from .misc.database_tools import GMS_JOB
+            try:
+                self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)
+            except ValueError:
+                raise
+        return self._DB_job_record
+
+    def validate_exec_configs(self):
+        for i in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
+            exec_lvl = getattr(self, 'exec__%s' % i)
+
+            # check input format
+            if not all([len(exec_lvl) == 3,
+                        (np.array(exec_lvl) == np.array(np.array(exec_lvl, np.bool), np.int)).all()]):
+                raise ValueError('Execution mode must be provided as list of 3 elements containing only boolean '
+                                 'values. Got %s for %s.' % (exec_lvl, i))
-        if exec_mode == 'Python':  # written output cannot be turned off in execution mode 'Python'
-            for i in ['L1AP','L1BP','L1CP','L2AP','L2BP','L2CP']:
-                exec_lvl = getattr(self, 'exec__%s' % i)
-                if exec_lvl[1] == 0:
-                    warnings.warn("If job.exec_mode is set to 'Python' the output writer for %s has to be enabled because "
-                                  "any operations on GMS_obj.arr read the intermediate results from disk. Turning it on.." % i)
-                    exec_lvl[1] = 1
+
+            # written output cannot be turned off in execution mode 'Python'
+            if self.exec_mode == 'Python' and exec_lvl[1] == 0:
+                warnings.warn("If job.exec_mode is set to 'Python' the output writer for %s has to be enabled "
+                              "because any operations on GMS_obj.arr read the intermediate results from disk. "
+                              "Turning it on.." % i)
+                exec_lvl[1] = 1
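
The format check in validate_exec_configs accepts a triple only if every element survives a bool-then-int round trip unchanged, i.e. if all entries are 0/1 (or True/False). A standalone demonstration, using the builtin bool/int dtypes (equivalent to the np.bool/np.int aliases used in the commit, which newer numpy versions have removed):

    import numpy as np

    for exec_lvl in ([1, 1, 0], [True, False, True], [1, 2, 0], [1, 1]):
        ok = len(exec_lvl) == 3 and \
             bool((np.array(exec_lvl) == np.array(np.array(exec_lvl, bool), int)).all())
        print(exec_lvl, ok)   # True, True, False (2 is not boolean), False (wrong length)

Separately, note that DB_job_record is now a cached lazy property: the GMS_JOB record is fetched from the database on first access only and stored in _DB_job_record, which keeps __init__ free of a mandatory database round trip.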
@@ -178,9 +229,9 @@ class Usecase:
         from .misc.database_tools import get_info_from_postgreSQLdb
         query_cfg = lambda key: \
-            get_info_from_postgreSQLdb(_job.conn_db_meta, 'config', ['value'], {'key': "%s" % key})[0][0]
+            get_info_from_postgreSQLdb(_job.conn_database, 'config', ['value'], {'key': "%s" % key})[0][0]
         query_vir = lambda col, VSID: \
-            get_info_from_postgreSQLdb(_job.conn_db_meta, 'virtual_sensors', col, {'id': VSID})[0][0]
+            get_info_from_postgreSQLdb(_job.conn_database, 'virtual_sensors', col, {'id': VSID})[0][0]

         if _job.call_type == 'console':
             self.filt_coord = [None, None, None, None]
@@ -212,8 +263,8 @@ class Usecase:
             self.data_list = self.get_entity_IDs_within_AOI()

         elif _job.call_type == 'webapp':
-            query_job = lambda col: get_info_from_postgreSQLdb(_job.conn_db_meta, 'jobs', col, {'id': _job.ID})[0][0]
-            #skip_thermal = int(query_cfg(_job.conn_db_meta, 'skip_thermal'))
+            query_job = lambda col: get_info_from_postgreSQLdb(_job.conn_database, 'jobs', col, {'id': _job.ID})[0][0]
+            #skip_thermal = int(query_cfg(_job.conn_database, 'skip_thermal'))
             self.skip_thermal = True
             self.skip_pan = int(query_cfg('skip_pan'))
             self.sort_bands_by_cwl = int(query_cfg('sort_bands_by_cwl'))
...