Commit 7f7db3a2 authored by Daniel Scheffler

Replaced the redis dependency with python-redis-lock (imports as redis_lock).

Added the config parameter 'max_parallel_reads_writes' to limit the number of parallel disk reads/writes (values > 0) or to disable IO locking entirely (value 0).
parent e502df9a
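For intuition, the new parameter effectively sets the number of slots in a lock guarding disk access. A minimal single-process sketch of the intended semantics, using threading.BoundedSemaphore as a stand-in for the redis-backed MultiSlotLock changed in the diffs below (the io_lock helper is hypothetical, not GMS code):

```python
import threading
from contextlib import contextmanager

max_parallel_reads_writes = 2  # the new config parameter; 0 means 'no limit / locking disabled'

# One slot per allowed concurrent read/write process (unused when the limit is 0).
_io_slots = threading.BoundedSemaphore(max_parallel_reads_writes or 1)

@contextmanager
def io_lock():
    """Mimic the MultiSlotLock semantics: 0 disables locking, N limits concurrency to N."""
    if max_parallel_reads_writes == 0:
        yield  # locking disabled -> enter immediately
    else:
        with _io_slots:  # blocks while all N slots are taken
            yield

# At most `max_parallel_reads_writes` threads execute this block at once.
with io_lock():
    pass  # read or write a raster file here
```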
@@ -176,7 +176,7 @@ class L1A_object(GMS_object):
         rasObj = GEOP.GEOPROCESSING(paths_files2stack[0], self.logger, subset=subset)
 
         # perform layer stack
-        with MultiSlotLock('IO', allowed_threads=1, logger=self.logger):  # FIXME hardcoded
+        with MultiSlotLock('IO', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
             if CFG.inmem_serialization and path_output is None:  # numpy array output
                 self.arr = rasObj.Layerstacking(paths_files2stack)
                 self.path_InFilePreprocessor = paths_files2stack[0]
@@ -197,7 +197,7 @@ class L1A_object(GMS_object):
         rasObj = GEOP.GEOPROCESSING(path_file2load, self.logger, subset=subset)
 
         # read a single file
-        with MultiSlotLock('IO', allowed_threads=1, logger=self.logger):  # FIXME hardcoded
+        with MultiSlotLock('IO', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
             if CFG.inmem_serialization and path_output is None:  # numpy array output
                 self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
                     gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
@@ -49,9 +49,11 @@ class GMSEnvironment(object):
         os.environ['GMS_SPAT_IDX_SRV_STATUS'] = 'unavailable'
 
     def _check_redis_server(self):
-        if not redis_conn:
+        if CFG.max_parallel_reads_writes > 0 and not redis_conn:
             self.logger.warning("Unable to connect to redis server. Is the server installed and running? For "
-                                "installation on Ubuntu, use 'sudo apt install redis-server'.")
+                                "installation on Ubuntu, use 'sudo apt install redis-server'.\n"
+                                "NOTE: Without connection to redis server, any kind of locking (IO / RAM / database) "
+                                "is disabled!")
 
     def _check_nonpip_packages(self):
         """Check for not pip-installable packages."""
@@ -2,13 +2,12 @@
 __author__ = 'Daniel Scheffler'
 
 import time
-from redis import StrictRedis
-from redis.lock import Lock
+from redis_lock import StrictRedis, Lock
 import logging
 
 try:
     redis_conn = StrictRedis(host='localhost')
-    redis_conn.keys()  # may raise ConnectionError
+    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
 except ConnectionError:
     redis_conn = None
 
@@ -19,7 +18,7 @@ class MultiSlotLock(Lock):
         self.allowed_threads = allowed_threads
         self.allowed_slot_names = ['%s, slot #%s' % (name, i) for i in range(1, allowed_threads + 1)]
 
-        if redis_conn:
+        if allowed_threads != 0 and redis_conn:
             if allowed_threads > 1:
                 while True:
                     name_free_slot = self.get_free_slot_name()
@@ -46,14 +45,14 @@ class MultiSlotLock(Lock):
         return free_slots[0]
 
     def __enter__(self):
-        if self.conn:
+        if self.allowed_threads != 0 and self.conn:
             super().__enter__()
             self.logger.info("Acquired lock '%s'." % self.name)
         else:
             pass
 
     def __exit__(self, exc_type=None, exc_value=None, traceback=None):
-        if self.conn:
+        if self.allowed_threads != 0 and self.conn:
             super().__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback)
             self.logger.info("Released lock '%s'." % self.name)
         else:
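For context on the new dependency: python-redis-lock implements named locks on top of a redis connection, and the MultiSlotLock above subclasses its Lock class. A minimal standalone usage sketch, assuming a redis server is reachable on localhost (the slot-style lock names mirror the naming scheme above; this is illustration, not GMS code):

```python
from redis import StrictRedis  # low-level client from the 'redis' package
import redis_lock              # 'python-redis-lock' imports as 'redis_lock'

conn = StrictRedis(host='localhost')

# A named lock shared by every process/host connected to the same redis server.
lock = redis_lock.Lock(conn, 'IO, slot #1')

if lock.acquire(blocking=False):  # returns False instead of waiting if the lock is held
    try:
        pass  # critical IO work here
    finally:
        lock.release()

# ...or as a context manager, which blocks until the lock becomes free:
with redis_lock.Lock(conn, 'IO, slot #2'):
    pass
```

Because the lock state lives on the redis server, the same slot names serialize readers/writers across independent worker processes and hosts, which plain threading or multiprocessing locks cannot do.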
@@ -1308,7 +1308,7 @@ class GMS_object(Dataset):
         # loop through all attributes to write and execute writer #
         ###########################################################
 
-        with MultiSlotLock('IO', allowed_threads=1, logger=self.logger):
+        with MultiSlotLock('IO', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
             for arrayname in attributes2write:
                 descriptor = '%s_%s' % (image_type_dict[arrayname], self.proc_level)
@@ -18,6 +18,8 @@ from jsmin import jsmin
 from cerberus import Validator
 import pkgutil
 import logging
+import time
+import psutil
 from pprint import pformat
 from typing import TYPE_CHECKING
@@ -209,6 +211,8 @@ class JobConfig(object):
             gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
         self.delete_old_output = \
             gp('delete_old_output', json_globts['delete_old_output'])
+        self.max_parallel_reads_writes = \
+            gp('max_parallel_reads_writes', json_globts['max_parallel_reads_writes'])
         self.allow_subMultiprocessing = \
             gp('allow_subMultiprocessing', json_globts['allow_subMultiprocessing'])
         self.disable_exception_handler = \
@@ -433,6 +437,10 @@ class JobConfig(object):
         GMSValidator(allow_unknown=True, schema=gms_schema_config_output).validate(self.to_dict())
 
+        # check if parallel read/write processes have been limited on a storage mountpoint shared between multiple hosts
+        if self.max_parallel_reads_writes != 0:
+            self.check_no_read_write_limit_on_xtfs_mountpoint()
+
     @property
     def kwargs_defaults(self):
         if not self._kwargs_defaults:
@@ -687,7 +695,25 @@ class JobConfig(object):
                 raise ValueError('Execution mode must be provided as list of 3 elements containing only boolean '
                                  'values. Got %s for %s.' % (exec_lvl, i))
 
+    def check_no_read_write_limit_on_xtfs_mountpoint(self):
+        intensive_IO_paths = [self.path_fileserver, self.path_archive, self.path_benchmarks,
+                              self.path_dem_proc_srtm_90m, self.path_ECMWF_db, self.path_procdata_MGRS,
+                              self.path_procdata_scenes]
+
+        mount_points = {el.mountpoint: el for el in psutil.disk_partitions(all=True)}
+
+        for path in intensive_IO_paths:
+            for mp, mp_object in mount_points.items():
+                if path.startswith(mp) and mp_object.device.startswith('xtreemfs'):
+                    warnings.warn("Path %s appears to be on an XtreemFS mountpoint. It is highly recommended to set "
+                                  "the configuration parameter 'max_parallel_reads_writes' to 0 in that case! "
+                                  "Otherwise read/write processes might be slowed down! Continuing in 20 seconds..."
+                                  % path)
+                    time.sleep(20)
+                    break
+
     def to_dict(self):
+        """Generate a dictionary in the same structure as the one in options_default.json from the current config."""
         opts_default = get_options(path_options_default)
 
         # add all keys included in options_default.json
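The new check_no_read_write_limit_on_xtfs_mountpoint() relies on psutil's partition listing to map paths to mount points. A self-contained sketch of the same detection idea (is_on_xtreemfs and the sample path are made up for illustration; the 'xtreemfs' device prefix is the one tested in the diff):

```python
import psutil

# Map each mount point to its partition info (device, fstype, mount options, ...).
mount_points = {p.mountpoint: p for p in psutil.disk_partitions(all=True)}

def is_on_xtreemfs(path):
    """Return True if 'path' lies on a mount point whose device name starts with 'xtreemfs'."""
    return any(path.startswith(mp) and part.device.startswith('xtreemfs')
               for mp, part in mount_points.items())

print(is_on_xtreemfs('/mnt/data/archive'))  # hypothetical path; False on most systems
```

Note that a mount point of '/' prefix-matches every absolute path with this startswith test, mirroring the behavior of the committed check.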
@@ -9,6 +9,12 @@
         "spatial_index_server_port": 8654,  /*port used for connecting to the spatial index mediator server*/
         "CPUs": "None",  /*number of CPU cores to be used for processing (default: "None" -> use all available)*/
         "delete_old_output": false,  /*whether to delete previously created output of the given job ID*/
+        "max_parallel_reads_writes": 0,  /*number of parallel disk read/write processes (integer).
+                                           0: no limit;
+                                           1: only 1 read/write process is allowed, all others are put into a queue.
+                                           NOTE: Do NOT SET A LIMIT if GMS is executed on a cluster with a shared
+                                           storage mount point. On a single server hosting its own HDD storage,
+                                           setting a limit might be useful to overcome IO bottlenecks.*/
         "allow_subMultiprocessing": true,  /*allow multiprocessing within multiprocessing workers*/
         "disable_exception_handler": false,  /*enable/disable automatic handling of unexpected exceptions*/
         "log_level": "INFO",  /*the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';*/
@@ -9,6 +9,7 @@ gms_schema_input = dict(
     spatial_index_server_port=dict(type='integer', required=False),
     CPUs=dict(type='integer', required=False, nullable=True),
     delete_old_output=dict(type='boolean', required=False),
+    max_parallel_reads_writes=dict(type='integer', required=False, min=0),
     allow_subMultiprocessing=dict(type='boolean', required=False),
     disable_exception_handler=dict(type='boolean', required=False),
     log_level=dict(type='string', required=False, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
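To see what the added schema rule enforces, here is a small cerberus example with the entry copied from the diff (the sample documents are invented; exact error wording can vary between cerberus versions):

```python
from cerberus import Validator

schema = dict(
    max_parallel_reads_writes=dict(type='integer', required=False, min=0),
)

v = Validator(schema, allow_unknown=True)

print(v.validate({'max_parallel_reads_writes': 2}))   # True  -> a non-negative integer passes
print(v.validate({'max_parallel_reads_writes': -1}))  # False -> violates min=0
print(v.errors)  # e.g. {'max_parallel_reads_writes': ['min value is 0']}
```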
@@ -17,4 +17,4 @@ cerberus
 nested_dict
 openpyxl
 timeout_decorator
-redis
+python-redis-lock
@@ -15,7 +15,7 @@ requirements = [
     'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'pytz',
     'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy',
     'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.1', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus',
-    'nested_dict', 'openpyxl', 'timeout_decorator', 'redis'
+    'nested_dict', 'openpyxl', 'timeout_decorator', 'python-redis-lock'
     # spectral<0.16 has some problems with writing signed integer 8bit data
     # fmask  # conda install -c conda-forge python-fmask
     # 'pyhdf',  # conda install --yes -c conda-forge pyhdf
@@ -74,7 +74,7 @@ dependencies:
 - nested_dict
 - openpyxl
 - timeout_decorator
-- redis
+- python-redis-lock
 - py_tools_ds>=0.12.4
 - geoarray>=0.7.0
 - arosics>=0.6.6