Commit 2e9a54ba authored by Daniel Scheffler's avatar Daniel Scheffler


Added some options to options_default.json. Fixed a bug in database_tools.record_stats_memusage. Added a missing lib to the CI config.
parent 5c9644e0
@@ -32,7 +32,7 @@ test_gms_preprocessing:
# - python setup.py install
# - cd ../../
# make tests
- pip install nested_dict openpyxl timeout_decorator python-redis-lock redis psutil # FIXME: remove as soon as runner is rebuilt
- pip install nested_dict openpyxl timeout_decorator redis redis-semaphore python-redis-lock psutil # FIXME: remove as soon as runner is rebuilt
- make nosetests
- make docs
artifacts:
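For reference, the `redis-semaphore` package added above provides the Redis-backed counting semaphore that the lock classes further down build on. A minimal usage sketch, assuming the package's documented `Semaphore` API (host/port and slot count are placeholders):

```python
from redis import StrictRedis
from redis_semaphore import Semaphore

# placeholder connection settings; point this at the actual Redis instance
redis_conn = StrictRedis(host='localhost', port=6379, db=0)

# a semaphore with 4 slots, shared by all processes using the same namespace
sem = Semaphore(redis_conn, count=4, namespace='GMS_example')

sem.acquire()  # blocks until one of the 4 slots is free
try:
    pass  # IO- or CPU-bound work goes here
finally:
    sem.release()
```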
@@ -1419,7 +1419,7 @@ def archive_exists_on_fileserver(conn_DB, entityID):
return exists
def record_stats_memusage(conn_db, GMS_obj, min_version='0.13.16'):
def record_stats_memusage(conn_db, GMS_obj):
# type: (str, GMS_object) -> bool
vals2write_dict = dict(
creationtime=datetime.now(),
@@ -1453,7 +1453,8 @@ def record_stats_memusage(conn_db, GMS_obj, min_version='0.13.16'):
sceneid=GMS_obj.scene_ID
)
# get all existing database records matching the respective config)
# get all existing database records matching the respective config
# NOTE: those columns that do not belong to the config specification are ignored
vals2get = list(vals2write_dict.keys())
df_existing_recs = pd.DataFrame(
get_info_from_postgreSQLdb(conn_db, 'stats_mem_usage_homo',
@@ -1461,12 +1462,13 @@ def record_stats_memusage(conn_db, GMS_obj, min_version='0.13.16'):
cond_dict={k: v for k, v in vals2write_dict.items()
if k not in ['creationtime', 'used_mem_l1a', 'used_mem_l1b',
'used_mem_l1c', 'used_mem_l2a', 'used_mem_l2b',
'used_mem_l2c', 'dims_x_l2a', 'dims_x_l2b']}),
'used_mem_l2c', 'dims_x_l2a', 'dims_y_l2b', 'sceneid']}),
columns=vals2get)
# filter the existing records by gms_preprocessing software version number (higher than min_version)
# filter the existing records by gms_preprocessing software version number
# (higher than CFG.min_version_mem_usage_stats)
vers = list(df_existing_recs.software_version)
vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(min_version)]
vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(CFG.min_version_mem_usage_stats)]
df_existing_recs_usable = df_existing_recs.loc[df_existing_recs.software_version.isin(vers_usable)]
# add memory stats to database
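To illustrate the version filter used above in isolation: a minimal sketch of the comparison logic with made-up records, assuming `parse_version` comes from `pkg_resources` as in the surrounding code:

```python
import pandas as pd
from pkg_resources import parse_version

# made-up records standing in for rows of the stats_mem_usage_homo table
df_existing_recs = pd.DataFrame({'software_version': ['0.12.4', '0.13.16', '0.14.0'],
                                 'used_mem_l2c': [5.1, 4.2, 4.3]})

min_version = '0.13.16'  # stands in for CFG.min_version_mem_usage_stats

# keep only records written by gms_preprocessing >= min_version
vers_usable = [v for v in df_existing_recs.software_version
               if parse_version(v) >= parse_version(min_version)]
df_usable = df_existing_recs.loc[df_existing_recs.software_version.isin(vers_usable)]
print(df_usable)  # -> only the 0.13.16 and 0.14.0 rows remain
```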
@@ -91,11 +91,17 @@ class SharedResourceLock(MultiSlotLock):
class IOLock(SharedResourceLock):
def __init__(self, allowed_slots=1, logger=None, **kwargs):
self.disabled = CFG.disable_IO_locks
if not self.disabled:
super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
class ProcessLock(SharedResourceLock):
def __init__(self, allowed_slots=1, logger=None, **kwargs):
self.disabled = CFG.disable_CPU_locks
if not self.disabled:
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
@@ -105,7 +111,7 @@ class MemoryReserver(Semaphore):
:param reserved_mem: Amount of memory to be reserved while the lock is acquired (gigabytes).
"""
self.disabled = redis_conn is None
self.disabled = redis_conn is None or CFG.disable_memory_locks or mem2lock_gb in [None, False]
self.mem2lock_gb = mem2lock_gb
self.max_usage = max_usage
self._waiting = False
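The pattern added to `IOLock` and `ProcessLock` above only initializes the underlying semaphore when the respective config flag allows locking, so acquire/release can short-circuit on `self.disabled`. A minimal sketch of that pattern with a simplified, hypothetical stand-in for the Redis-backed base class:

```python
class SharedResourceLock:  # hypothetical stand-in for the redis-semaphore based class
    def __init__(self, name, allowed_slots):
        self.name = name
        self.allowed_slots = allowed_slots

    def acquire(self):
        if getattr(self, 'disabled', False):
            return  # no-op when locking is disabled via config
        ...  # acquire a slot of the shared semaphore here


class ProcessLock(SharedResourceLock):
    def __init__(self, allowed_slots=1, disable_CPU_locks=False):
        self.disabled = disable_CPU_locks  # stands in for CFG.disable_CPU_locks
        if not self.disabled:
            super().__init__(name='ProcessLock', allowed_slots=allowed_slots)
```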
@@ -2408,7 +2408,7 @@ def GMS_object_2_dataset_dict(GMS_obj):
])
def estimate_mem_usage(dataset_ID, satellite, min_version='0.13.16'):
def estimate_mem_usage(dataset_ID, satellite):
memcols = ['used_mem_l1a', 'used_mem_l1b', 'used_mem_l1c',
'used_mem_l2a', 'used_mem_l2b', 'used_mem_l2c']
@@ -2441,10 +2441,9 @@ def estimate_mem_usage(dataset_ID, satellite, min_version='0.13.16'):
if not df.empty:
df['used_mem_max'] = df[memcols].max(axis=1)
# get records from gms_preprocessing versions higher than min_version
# get records from gms_preprocessing versions higher than CFG.min_version_mem_usage_stats
vers = list(df.software_version)
# vers.sort(key=lambda s: list(map(int, s.split('.'))))
vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(min_version)]
vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(CFG.min_version_mem_usage_stats)]
df_sub = df.loc[df.software_version.isin(vers_usable)]
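The `used_mem_max` column computed in this hunk condenses the per-processing-level memory columns into a single peak value per record. A minimal sketch with made-up numbers:

```python
import pandas as pd

memcols = ['used_mem_l1a', 'used_mem_l1b', 'used_mem_l1c',
           'used_mem_l2a', 'used_mem_l2b', 'used_mem_l2c']

# made-up per-level memory usage (gigabytes) for a single scene
df = pd.DataFrame([[2.1, 3.4, 3.0, 4.8, 4.2, 3.9]], columns=memcols)
df['used_mem_max'] = df[memcols].max(axis=1)  # row-wise peak -> 4.8
print(df['used_mem_max'].iloc[0])
```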
@@ -218,14 +218,26 @@ class JobConfig(object):
gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
self.CPUs_all_jobs = \
gp('CPUs_all_jobs', json_globts['CPUs_all_jobs'])
self.delete_old_output = \
gp('delete_old_output', json_globts['delete_old_output'])
self.max_mem_usage = \
gp('max_mem_usage', json_globts['max_mem_usage'])
self.critical_mem_usage = \
gp('critical_mem_usage', json_globts['critical_mem_usage'])
self.max_parallel_reads_writes = \
gp('max_parallel_reads_writes', json_globts['max_parallel_reads_writes'])
self.allow_subMultiprocessing = \
gp('allow_subMultiprocessing', json_globts['allow_subMultiprocessing'])
self.delete_old_output = \
gp('delete_old_output', json_globts['delete_old_output'])
self.disable_exception_handler = \
gp('disable_exception_handler', json_globts['disable_exception_handler'])
self.disable_IO_locks = \
gp('disable_IO_locks', json_globts['disable_IO_locks'])
self.disable_CPU_locks = \
gp('disable_CPU_locks', json_globts['disable_CPU_locks'])
self.disable_memory_locks = \
gp('disable_memory_locks', json_globts['disable_memory_locks'])
self.min_version_mem_usage_stats = \
gp('min_version_mem_usage_stats', json_globts['min_version_mem_usage_stats'])
self.log_level = \
gp('log_level', json_globts['log_level'])
self.tiling_block_size_XY = \
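All the new attributes follow the same `gp(...)` pattern: take the explicitly given job parameter if present, fall back to the JSON options file, and finally to a hard-coded default. A minimal, hypothetical reimplementation of that resolution order (the real helper may differ):

```python
import multiprocessing


def gp(key, json_value, fallback=None, job_kwargs=None):
    """Hypothetical getter: job-specific kwargs win over the JSON options,
    which win over the hard-coded fallback."""
    job_kwargs = job_kwargs or {}
    if job_kwargs.get(key) is not None:
        return job_kwargs[key]
    if json_value is not None:
        return json_value
    return fallback


# e.g. CPUs: use the JSON value if set, otherwise all available cores
cpus = gp('CPUs', None, fallback=multiprocessing.cpu_count())
```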
@@ -12,7 +12,9 @@
NOTE: This may be set to avoid CPU/RAM overload in case multiple GMS jobs are
running on the same host (only usable in case the GMS is not executed via
Apache Flink).*/
"delete_old_output": false, /*whether to delete previously created output of the given job ID*/
"max_mem_usage": 90, /*maximum memory usage percentage
(only usable in case the GMS is not executed via Apache Flink)*/
"critical_mem_usage": 95, /*processes are blocked if memory usage percentage above this threshold (percent)*/
"max_parallel_reads_writes": 0, /*number of parallel disk read/write processes (integer).
0: no limit;
1: only 1 read/write process is allowed, all others are put into a queue.
@@ -20,7 +22,19 @@
storage mount point. On a single server hosting its own HDD storage,
setting a limit might be useful to overcome IO bottlenecks.*/
"allow_subMultiprocessing": true, /*allow multiprocessing within multiprocessing workers*/
"delete_old_output": false, /*whether to delete previously created output of the given job ID*/
"disable_exception_handler": false, /*enable/disable automatic handling of unexpected exceptions*/
"disable_IO_locks": false, /*disable limiting of parallel disk read/write processes*/
"disable_CPU_locks": false, /*disable system-wide limiting of CPUs to be used for GeoMultiSens
(adjustable via 'CPUs_all_jobs')*/
"disable_memory_locks": false, /*disable reservation of a certain amount of memory per process
NOTE: disabling this might lead to RAM overload*/
"min_version_mem_usage_stats": "0.13.16", /*If 'disable_memory_locks' is False, GeoMultiSens reserves a
certain amount of memory per process. By setting this version,
you may specify a minimal software version of gms_preprocessing
below which all recorded memory usage statistics are ignored. This
may be useful after a software update, that changes the memory
usage per process.*/
"log_level": "INFO", /*the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';*/
"tiling_block_size_XY": [
2048,
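Note that options_default.json uses `/*...*/` comments, which strict JSON parsers reject, so any consumer has to strip them before parsing. A minimal sketch of such a pre-parse step (a simplification; the project may handle this differently):

```python
import json
import re


def load_options(path):
    """Read a JSON options file that may contain /*...*/ comments."""
    with open(path) as f:
        raw = f.read()
    # naive comment stripping: does not guard against '/*' inside string values
    cleaned = re.sub(r'/\*.*?\*/', '', raw, flags=re.DOTALL)
    return json.loads(cleaned)


opts = load_options('options_default.json')
print(opts['critical_mem_usage'])  # -> 95
```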
@@ -9,10 +9,16 @@ gms_schema_input = dict(
spatial_index_server_port=dict(type='integer', required=False),
CPUs=dict(type='integer', required=False, nullable=True),
CPUs_all_jobs=dict(type='integer', required=False, nullable=True),
delete_old_output=dict(type='boolean', required=False),
max_mem_usage=dict(type='integer', required=False, min=0, max=100),
critical_mem_usage=dict(type='integer', required=False, min=0, max=100),
max_parallel_reads_writes=dict(type='integer', required=False, min=0),
allow_subMultiprocessing=dict(type='boolean', required=False),
delete_old_output=dict(type='boolean', required=False),
disable_exception_handler=dict(type='boolean', required=False),
disable_IO_locks=dict(type='boolean', required=False),
disable_CPU_locks=dict(type='boolean', required=False),
disable_memory_locks=dict(type='boolean', required=False),
min_version_mem_usage_stats=dict(type='string', required=False),
log_level=dict(type='string', required=False, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
tiling_block_size_XY=dict(type='list', required=False, schema=dict(type="integer"), minlength=2,
maxlength=2),
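The schema entries above use the cerberus validation dialect (`type`, `required`, `min`/`max`, `allowed`). Assuming cerberus is indeed the validator behind this schema, checking user-supplied options against an excerpt of it looks like this:

```python
from cerberus import Validator

# excerpt of the schema shown above
schema = dict(
    critical_mem_usage=dict(type='integer', required=False, min=0, max=100),
    disable_memory_locks=dict(type='boolean', required=False),
    min_version_mem_usage_stats=dict(type='string', required=False),
)

v = Validator(schema, allow_unknown=True)
print(v.validate({'critical_mem_usage': 95}))   # -> True
print(v.validate({'critical_mem_usage': 120}))  # -> False
print(v.errors)                                 # -> {'critical_mem_usage': ['max value is 100']}
```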
@@ -27,7 +27,7 @@ def L1A_map(dataset_dict): # map (scene-wise parallelization)
# type: (dict) -> L1A_P.L1A_object
L1A_obj = L1A_P.L1A_object(**dataset_dict)
L1A_obj.block_at_system_overload(max_usage=95)
L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L1A_obj.import_rasterdata()
L1A_obj.import_metadata()
L1A_obj.validate_GeoTransProj_GeoAlign() # sets self.GeoTransProj_ok and self.GeoAlign_ok
@@ -55,7 +55,7 @@ def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization
# type: (dict) -> List[L1A_P.L1A_object]
L1A_obj = L1A_P.L1A_object(**dataset_dict)
L1A_obj.block_at_system_overload(max_usage=95)
L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L1A_obj.import_rasterdata()
L1A_obj.import_metadata()
L1A_obj.validate_GeoTransProj_GeoAlign() # sets self.GeoTransProj_ok and self.GeoAlign_ok
@@ -102,7 +102,7 @@ def L1B_map(L1A_obj):
"""L1A_obj enthält in Python- (im Gegensatz zur inmem_serialization-) Implementierung KEINE ARRAY-DATEN!,
nur die für die ganze Szene gültigen Metadaten"""
L1A_obj.block_at_system_overload(max_usage=95)
L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L1B_obj = L1B_P.L1B_object(L1A_obj)
L1B_obj.compute_global_shifts()
@@ -124,7 +124,7 @@ def L1C_map(L1B_objs):
:param L1B_objs: list containing one or multiple L1B objects belonging to the same scene ID.
"""
list(L1B_objs)[0].block_at_system_overload(max_usage=95)
list(L1B_objs)[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)
# initialize L1C objects
L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]
@@ -169,7 +169,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
:param return_tiles: return computed L2A object in tiles
:return: list of L2A_object tiles
"""
L1C_objs[0].block_at_system_overload(max_usage=95)
L1C_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)
# initialize L2A objects
L2A_objs = [L2A_P.L2A_object(L1C_obj) for L1C_obj in L1C_objs]
@@ -215,7 +215,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
@update_proc_status
def L2B_map(L2A_obj):
# type: (L2A_P.L2A_object) -> L2B_P.L2B_object
L2A_obj.block_at_system_overload(max_usage=95)
L2A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L2B_obj = L2B_P.L2B_object(L2A_obj)
L2B_obj.spectral_homogenization()
if CFG.exec_L2BP[1]:
@@ -230,7 +230,7 @@ def L2B_map(L2A_obj):
@update_proc_status
def L2C_map(L2B_obj):
# type: (L2B_P.L2B_object) -> L2C_P.L2C_object
L2B_obj.block_at_system_overload(max_usage=95)
L2B_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
L2C_obj = L2C_P.L2C_object(L2B_obj)
if CFG.exec_L2CP[1]:
L2C_MRGS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
@@ -269,7 +269,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
'Limiting processes to %s in order to collect memory statistics first.' % cpulimit)
# start processing
with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger),\
with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage),\
ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
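All pipeline mappers now take the blocking threshold from `CFG.critical_mem_usage` instead of the hard-coded 95. For illustration, a blocking check like `block_at_system_overload` can be sketched with `psutil` (this is an illustrative reimplementation, not the project's actual method):

```python
import time

import psutil


def block_at_system_overload(max_usage=95, check_interval=5, logger=None):
    """Illustrative: sleep until system memory usage drops below max_usage percent."""
    while psutil.virtual_memory().percent >= max_usage:
        if logger is not None:
            logger.warning('Memory usage above %s%% - waiting ...' % max_usage)
        time.sleep(check_interval)
```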