Commit 8c6f55bd authored by Daniel Scheffler

Implemented process locks to avoid CPU/RAM overload in case multiple GMS jobs...

Implemented process locks to avoid CPU/RAM overload in case multiple GMS jobs are running on the same host.
parent ad0849e5
@@ -176,7 +176,7 @@ class L1A_object(GMS_object):
            rasObj = GEOP.GEOPROCESSING(paths_files2stack[0], self.logger, subset=subset)

            # perform layer stack
-           with MultiSlotLock('IO', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
+           with MultiSlotLock('IOLock', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
                if CFG.inmem_serialization and path_output is None:  # numpy array output
                    self.arr = rasObj.Layerstacking(paths_files2stack)
                    self.path_InFilePreprocessor = paths_files2stack[0]
@@ -197,7 +197,7 @@ class L1A_object(GMS_object):
            rasObj = GEOP.GEOPROCESSING(path_file2load, self.logger, subset=subset)

            # read a single file
-           with MultiSlotLock('IO', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
+           with MultiSlotLock('IOLock', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
                if CFG.inmem_serialization and path_output is None:  # numpy array output
                    self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
                        gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
...
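For orientation, a minimal usage sketch of the IO throttling applied above (the import path and the limit of 3 parallel accesses are assumptions, not part of this commit; a running local Redis server is required):

```python
from gms_preprocessing.misc.locks import MultiSlotLock  # assumed import path

def read_raster(path, logger=None):
    # Only `allowed_threads` callers may hold an 'IOLock' slot at a time;
    # additional callers poll every 0.2 s until a slot becomes free.
    with MultiSlotLock('IOLock', allowed_threads=3, logger=logger):
        with open(path, 'rb') as f:  # placeholder for the actual GDAL read
            return f.read()
```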
@@ -2,11 +2,16 @@
__author__ = 'Daniel Scheffler'

import time
-from redis_lock import StrictRedis, Lock
+from redis_lock import StrictRedis, Lock, NotAcquired
import logging
+import functools
+
+from ..misc.logging import GMS_logger
+from ..options.config import GMS_config as CFG

try:
-   redis_conn = StrictRedis(host='localhost')
+   redis_conn = StrictRedis(host='localhost', db=0)
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
except ConnectionError:
    redis_conn = None
@@ -16,31 +21,36 @@ class MultiSlotLock(Lock):
    def __init__(self, name, allowed_threads=1, logger=None, **kwargs):
        self.conn = redis_conn
        self.allowed_threads = allowed_threads
-       self.allowed_slot_names = ['%s, slot #%s' % (name, i) for i in range(1, allowed_threads + 1)]
+       self.logger = logger or GMS_logger("RedisLock: '%s'" % name)
+       self.allowed_slot_names = ['GMS_%s__%s, slot #%s' % (CFG.ID, name, i) for i in range(1, allowed_threads + 1)]

        if allowed_threads != 0 and redis_conn:
-           if allowed_threads > 1:
-               while True:
-                   name_free_slot = self.get_free_slot_name()
-                   if not name_free_slot:
-                       time.sleep(0.2)
-                   else:
-                       break
-               name = name_free_slot
+           logged = False
+           while True:
+               name_free_slot = self.get_free_slot_name()
+               if not name_free_slot:
+                   time.sleep(0.2)
+                   if not logged:
+                       self.logger.info("Waiting for free '%s' lock." % name)
+                       logged = True
+               else:
+                   break
+           name = name_free_slot

            super().__init__(self.conn, name, **kwargs)
        else:
            pass

        self.name = name
-       self.logger = logger or logging.getLogger("RedisLock: '%s'" % name)

-   def get_existing_locks(self):
+   @property
+   def existing_locks(self):
        return [i.decode('utf8').split('lock:')[1] for i in self.conn.keys()]

    def get_free_slot_name(self):
-       free_slots = [sn for sn in self.allowed_slot_names if sn not in self.get_existing_locks()]
+       free_slots = [sn for sn in self.allowed_slot_names if sn not in self.existing_locks]
        if free_slots:
            return free_slots[0]
@@ -57,3 +67,50 @@ class MultiSlotLock(Lock):
            self.logger.info("Released lock '%s'." % self.name)
        else:
            pass
+
+
+class ProcessLock(MultiSlotLock):
+    def __init__(self, processes=1, logger=None, **kwargs):
+        super(ProcessLock, self).__init__(name='ProcessLock', allowed_threads=processes, logger=logger, **kwargs)
+
+    def __enter__(self):
+        super(ProcessLock, self).__enter__()
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        super(ProcessLock, self).__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback)
+
+
+def acquire_process_lock(processes=None, logger=None):
+    if not logger:
+        logger = logging.getLogger('ProcessLock')
+        logger.setLevel('INFO')
+
+    def decorator(func):
+        @functools.wraps(func)  # needed to avoid pickling errors
+        def wrapped_func(*args, **kwargs):
+            with ProcessLock(processes=processes, logger=logger):
+                result = func(*args, **kwargs)
+            return result
+        return wrapped_func
+
+    return decorator
+
+
+def release_unclosed_locks(logger=None):
+    if redis_conn:
+        logger = logger or GMS_logger('LockReseter')
+
+        locks2release = [i.split(b'lock:')[1].decode('latin') for i in redis_conn.keys()
+                         if i.decode('latin').startswith('lock:GMS_%s__' % CFG.ID)]
+        if locks2release:
+            logger.info("Releasing unclosed locks of job %s." % CFG.ID)
+            for lockN in locks2release:
+                lock = Lock(redis_conn, lockN)
+                try:
+                    lock.release()
+                except NotAcquired:
+                    lock.reset()
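The slot mechanism above is essentially a counting semaphore built from N named Redis locks. A standalone sketch of the idea using python-redis-lock directly (the slot names, the limit of 4 and the polling interval are illustrative; a local Redis server is assumed):

```python
import time

from redis import StrictRedis
from redis_lock import Lock

conn = StrictRedis(host='localhost', db=0)

def acquire_slot(name, slots=4, poll_interval=0.2):
    """Block until one of the locks named '<name>, slot #i' could be acquired."""
    while True:
        for i in range(1, slots + 1):
            lock = Lock(conn, '%s, slot #%s' % (name, i))
            if lock.acquire(blocking=False):  # grab the first free slot
                return lock                   # caller must release() it
        time.sleep(poll_interval)

lock = acquire_slot('IOLock', slots=4)
try:
    pass  # throttled work goes here
finally:
    lock.release()
```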
@@ -1308,7 +1308,7 @@ class GMS_object(Dataset):
        # loop through all attributes to write and execute writer #
        ###########################################################

-       with MultiSlotLock('IO', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
+       with MultiSlotLock('IOLock', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
            for arrayname in attributes2write:
                descriptor = '%s_%s' % (image_type_dict[arrayname], self.proc_level)
...
@@ -126,6 +126,10 @@ def set_config(job_ID, json_config='', inmem_serialization=False, parallelizatio
        GMSEnv.ensure_properly_activated_GDAL()
        GMSEnv.check_ecmwf_api_creds()

+       # close unclosed locks from previous runs
+       from ..misc.locks import release_unclosed_locks
+       release_unclosed_locks(logger)
+
        builtins.GMS_EnvOK = True

    return getattr(builtins, 'GMS_JobConfig')
@@ -209,6 +213,8 @@ class JobConfig(object):
            gp('spatial_index_server_port', json_globts['spatial_index_server_port'])
        self.CPUs = \
            gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
+       self.CPUs_all_jobs = \
+           gp('CPUs_all_jobs', json_globts['CPUs_all_jobs'], fallback=multiprocessing.cpu_count())
        self.delete_old_output = \
            gp('delete_old_output', json_globts['delete_old_output'])
        self.max_parallel_reads_writes = \
...
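A small sketch of the fallback behaviour of the new parameter (mirrors the `fallback=multiprocessing.cpu_count()` argument above; the variable name is illustrative):

```python
import multiprocessing

# If 'CPUs_all_jobs' is neither set in the job record nor in the JSON options,
# it defaults to the number of CPU cores of the host.
cpus_all_jobs = None  # value as it would arrive from the job options
if cpus_all_jobs is None:
    cpus_all_jobs = multiprocessing.cpu_count()
```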
@@ -8,6 +8,9 @@
                            CLI frontend or to the set_config function directly!*/
        "spatial_index_server_port": 8654,  /*"port used for connecting to the spatial index mediator server"*/
        "CPUs": "None",  /*number of CPU cores to be used for processing (default: "None" -> use all available)*/
"CPUs_all_jobs": "None", /*total number of CPU cores to be used for all GMS jobs running on a single host
NOTE: This may be set to CPU load in case multiple GMS jobs are running on the same
host.*/
"delete_old_output": false, /*whether to delete previously created output of the given job ID*/ "delete_old_output": false, /*whether to delete previously created output of the given job ID*/
"max_parallel_reads_writes": 0, /*number of parallel disk read/write processes (integer). "max_parallel_reads_writes": 0, /*number of parallel disk read/write processes (integer).
0: no limit; 0: no limit;
......
@@ -8,6 +8,7 @@ gms_schema_input = dict(
        spatial_index_server_host=dict(type='string', required=False),
        spatial_index_server_port=dict(type='integer', required=False),
        CPUs=dict(type='integer', required=False, nullable=True),
+       CPUs_all_jobs=dict(type='integer', required=False, nullable=True),
        delete_old_output=dict(type='boolean', required=False),
        max_parallel_reads_writes=dict(type='integer', required=False, min=0),
        allow_subMultiprocessing=dict(type='boolean', required=False),
...
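A quick validation sketch for the new schema entry (assuming the string "None" from the JSON defaults is converted to None before validation, as for the other nullable integer options):

```python
from cerberus import Validator

schema = dict(CPUs_all_jobs=dict(type='integer', required=False, nullable=True))
v = Validator(schema)

print(v.validate({'CPUs_all_jobs': 16}))    # True
print(v.validate({'CPUs_all_jobs': None}))  # True  (nullable)
print(v.validate({'CPUs_all_jobs': 'x'}))   # False (wrong type)
print(v.validate({}))                       # True  (not required)
```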
@@ -5,6 +5,7 @@ from typing import List, Tuple, Generator, Iterable, Union  # noqa F401  # flake
from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
+from ..misc.locks import MultiSlotLock
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
@@ -222,109 +223,112 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise
    :param list_dataset_dicts_per_scene:
    :return:
    """
+   with MultiSlotLock('ProcessLock', allowed_threads=CFG.CPUs_all_jobs):
        if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
            raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
                             'Received %s.' % list_dataset_dicts_per_scene)
        input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']

        ##################
        # L1A processing #
        ##################

        L1A_objects = []
        if CFG.exec_L1AP[0] and input_proc_level is None:
            L1A_objects = [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]

            if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]):
                return L1A_objects

        ##################
        # L1B processing #
        ##################

        # select subsystem with optimal band for co-registration
        # L1B_obj_coreg = L1B_map(L1A_objects[0])
        # copy coreg information to remaining subsets

        L1B_objects = L1A_objects
        if CFG.exec_L1BP[0]:
            # add earlier processed L1A data
            if input_proc_level == 'L1A':
                for ds in list_dataset_dicts_per_scene:
                    GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
                    L1A_objects.append(L1A_P.L1A_object().from_disk([GMSfile, ['cube', None]]))

            L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
            del L1A_objects

            if any([isinstance(obj, failed_GMS_object) for obj in L1B_objects]):
                return L1B_objects

        ##################
        # L1C processing #
        ##################

        L1C_objects = L1B_objects
        if CFG.exec_L1CP[0]:
            # add earlier processed L1B data
            if input_proc_level == 'L1B':
                for ds in list_dataset_dicts_per_scene:
                    GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
                    L1B_objects.append(L1B_P.L1B_object().from_disk([GMSfile, ['cube', None]]))

            L1C_objects = L1C_map(L1B_objects)
            del L1B_objects

            if any([isinstance(obj, failed_GMS_object) for obj in L1C_objects]):
                return L1C_objects

        if not CFG.exec_L2AP[0]:
            return L1C_objects

        ##################
        # L2A processing #
        ##################

        # add earlier processed L1C data
        if input_proc_level == 'L1C':
            for ds in list_dataset_dicts_per_scene:
                GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
                L1C_objects.append(L1C_P.L1C_object().from_disk([GMSfile, ['cube', None]]))

        L2A_obj = L2A_map(L1C_objects, return_tiles=False)
        del L1C_objects

        if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
            return L2A_obj

        ##################
        # L2B processing #
        ##################

        # add earlier processed L2A data
        if input_proc_level == 'L2A':
-           assert len(list_dataset_dicts_per_scene) == 1, 'Expected only a single L2A dataset since subsystems are merged.'
+           assert len(list_dataset_dicts_per_scene) == 1, \
+               'Expected only a single L2A dataset since subsystems are merged.'
            GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
            L2A_obj = L2A_P.L2A_object().from_disk([GMSfile, ['cube', None]])

        L2B_obj = L2B_map(L2A_obj)
        del L2A_obj

        if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
            return L2B_obj

        ##################
        # L2C processing #
        ##################

        # add earlier processed L2B data
        if input_proc_level == 'L2B':
-           assert len(list_dataset_dicts_per_scene) == 1, 'Expected only a single L2B dataset since subsystems are merged.'
+           assert len(list_dataset_dicts_per_scene) == 1, \
+               'Expected only a single L2B dataset since subsystems are merged.'
            GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
            L2B_obj = L2B_P.L2B_object().from_disk([GMSfile, ['cube', None]])

        L2C_obj = L2C_map(L2B_obj)  # type: Union[GMS_object, failed_GMS_object, List]
        del L2B_obj

        return L2C_obj
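Besides wrapping the pipeline in a `with` block as above, the commit also adds the `acquire_process_lock` decorator, which could be used like this (the function name and the limit of 8 are illustrative; in the pipeline the limit comes from `CFG.CPUs_all_jobs`, and a running Redis server is assumed):

```python
from gms_preprocessing.misc.locks import acquire_process_lock  # assumed import path

@acquire_process_lock(processes=8)  # at most 8 decorated calls run concurrently
def run_scene(scene_dataset_dicts):
    """Hypothetical per-scene worker guarded by the 'ProcessLock' slots."""
    ...
```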
@@ -27,6 +27,7 @@ from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_ma
from ..options.config import set_config
from .multiproc import MAP, imap_unordered
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
+from ..misc.locks import release_unclosed_locks

from py_tools_ds.numeric.array import get_array_tilebounds
@@ -515,6 +516,9 @@ class process_controller(object):
        del self.logger
        shutdown_loggers()

+       # release unclosed locks
+       release_unclosed_locks(self.logger)
+
        # clear any temporary files
        tempdir = os.path.join(self.config.path_tempdir)
        self.logger.info('Deleting temporary directory %s.' % tempdir)
...
@@ -18,3 +18,4 @@ nested_dict
openpyxl
timeout_decorator
python-redis-lock
+psutil
@@ -15,7 +15,7 @@ requirements = [
    'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'pytz',
    'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy',
    'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.1', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus',
-   'nested_dict', 'openpyxl', 'timeout_decorator', 'python-redis-lock'
+   'nested_dict', 'openpyxl', 'timeout_decorator', 'python-redis-lock', 'psutil'
    # spectral<0.16 has some problems with writing signed integer 8bit data
    # fmask  # conda install -c conda-forge python-fmask
    # 'pyhdf',  # conda install --yes -c conda-forge pyhdf
...
@@ -75,6 +75,7 @@ dependencies:
    - openpyxl
    - timeout_decorator
    - python-redis-lock
+   - psutil
    - py_tools_ds>=0.12.4
    - geoarray>=0.7.0
    - arosics>=0.6.6
...
@@ -204,6 +204,8 @@ class BaseTestCases:
        [cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]

+       # cls.PC.config.CPUs = 2
+       # cls.PC.config.max_parallel_reads_writes = 2
        # cls.PC.config.ac_estimate_accuracy = True  # FIXME
        # cls.PC.config.spechomo_estimate_accuracy = True  # FIXME
...