Commit b91d164d authored by Daniel Scheffler
Browse files

Fixed deadlock within MemoryReserver. Added number of waiting processes to...

Fixed deadlock within MemoryReserver. Added number of waiting processes to redis. Fixed linting. Removed python-redis-lock from dependencies.
parent c8962cbb
...@@ -32,7 +32,7 @@ test_gms_preprocessing: ...@@ -32,7 +32,7 @@ test_gms_preprocessing:
# - python setup.py install # - python setup.py install
# - cd ../../ # - cd ../../
# make tests # make tests
- pip install nested_dict openpyxl timeout_decorator redis redis-semaphore python-redis-lock psutil # FIXME: remove as soon as runner is rebuilt - pip install nested_dict openpyxl timeout_decorator redis redis-semaphore psutil # FIXME: remove as soon as runner is rebuilt
- make nosetests - make nosetests
- make docs - make docs
artifacts: artifacts:
......
...@@ -122,6 +122,20 @@ class MemoryReserver(Semaphore): ...@@ -122,6 +122,20 @@ class MemoryReserver(Semaphore):
**kwargs) **kwargs)
self.logger = logger or GMS_logger("RedisLock: 'MemoryReserver'") self.logger = logger or GMS_logger("RedisLock: 'MemoryReserver'")
@property
def waiting(self):
    """Return whether this instance is currently flagged as waiting."""
    return self._waiting

@waiting.setter
def waiting(self, val):
    """Set the waiting flag and mirror every state change in redis.

    The counter stored under ``self.waiting_key`` is incremented when the
    flag switches to True and decremented when it switches to False.
    Assigning the state the instance already has leaves the counter alone.

    :param val: new waiting state; any truthy/falsy value is accepted and
                stored as a plain bool.
    """
    # Coerce first: an identity check ("is not") would treat a truthy
    # non-bool such as 1 as a state change on every assignment and count
    # this instance more than once.
    val = bool(val)
    if val != self._waiting:
        if val:
            self.client.incr(self.waiting_key, 1)
        else:
            self.client.decr(self.waiting_key, 1)
        self._waiting = val
@property @property
def mem_reserved_gb(self): def mem_reserved_gb(self):
return int(redis_conn.get('GMS_mem_reserved') or 0) return int(redis_conn.get('GMS_mem_reserved') or 0)
...@@ -147,29 +161,9 @@ class MemoryReserver(Semaphore): ...@@ -147,29 +161,9 @@ class MemoryReserver(Semaphore):
def acquisition_key(self): def acquisition_key(self):
return self._get_and_set_key('_acquisition_key', 'ACQUISITION_LOCK') return self._get_and_set_key('_acquisition_key', 'ACQUISITION_LOCK')
def acquire_old(self, timeout=0, target=None): @property
if not self.disabled: def waiting_key(self):
with MemoryReserverAcquisitionLock(): return self._get_and_set_key('_waiting_key', 'NUMBER_WAITING')
if self.usable_memory_gb >= self.mem2lock_gb:
for i in range(self.mem2lock_gb):
token = super(MemoryReserver, self).acquire(timeout=timeout)
self.client.hset(self.grabbed_key_jobID, token, self.current_time)
self.client.incr(self.reserved_key, self.mem2lock_gb)
self.client.incr(self.reserved_key_jobID, self.mem2lock_gb)
self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)
self._waiting = False
else:
if not self._waiting:
self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
% (self.usable_memory_gb, self.mem2lock_gb))
self._waiting = True
time.sleep(1)
self.acquire(timeout=timeout)
def acquire(self, timeout=0, target=None): def acquire(self, timeout=0, target=None):
if not self.disabled: if not self.disabled:
...@@ -177,7 +171,8 @@ class MemoryReserver(Semaphore): ...@@ -177,7 +171,8 @@ class MemoryReserver(Semaphore):
return self.client.getset(self.acquisition_key, self.exists_val) return self.client.getset(self.acquisition_key, self.exists_val)
while check_acquisition_key() is not None: while check_acquisition_key() is not None:
time.sleep(random.uniform(1, 5)) self.waiting = True
time.sleep(random.uniform(1, 5)) # avoids race conditions
self.client.expire(self.acquisition_key, 10) self.client.expire(self.acquisition_key, 10)
...@@ -191,13 +186,13 @@ class MemoryReserver(Semaphore): ...@@ -191,13 +186,13 @@ class MemoryReserver(Semaphore):
self.client.incr(self.reserved_key_jobID, self.mem2lock_gb) self.client.incr(self.reserved_key_jobID, self.mem2lock_gb)
self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb) self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)
self._waiting = False self.waiting = False
else: else:
if not self._waiting: if not self._waiting:
self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.' self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
% (self.usable_memory_gb, self.mem2lock_gb)) % (self.usable_memory_gb, self.mem2lock_gb))
self._waiting = True self.waiting = True
time.sleep(random.uniform(1, 2)) time.sleep(random.uniform(1, 2))
self.acquire(timeout=timeout) self.acquire(timeout=timeout)
...@@ -232,58 +227,18 @@ class MemoryReserver(Semaphore): ...@@ -232,58 +227,18 @@ class MemoryReserver(Semaphore):
self.client.delete(self.available_key) self.client.delete(self.available_key)
self.client.delete(self.grabbed_key) self.client.delete(self.grabbed_key)
self.waiting = False # decrements self.waiting_key
if int(self.client.get(self.waiting_key) or 0) == 0:
self.client.delete(self.waiting_key)
if self.mem_reserved_gb <= 0: if self.mem_reserved_gb <= 0:
self.client.delete(self.reserved_key) self.client.delete(self.reserved_key)
# Semaphore subclass created with count=1 in the redis namespace 'MemoryReserverAcquisitionLock'.
# In addition to the parent's bookkeeping it records every grabbed token in a hash that is specific
# to the current job (CFG.ID), so that the tokens of one job can be released together.
class MemoryReserverAcquisitionLock(Semaphore):
def __init__(self, **kwargs):
# The lock is disabled if there is no redis connection or if CFG.disable_memory_locks is set.
self.disabled = redis_conn is None or CFG.disable_memory_locks
if not self.disabled:
# The parent constructor is only called when the lock is enabled.
super(MemoryReserverAcquisitionLock, self).__init__(client=redis_conn, count=1,
namespace='MemoryReserverAcquisitionLock',
stale_client_timeout=10, **kwargs)
@property
def grabbed_key_jobID(self):
# Key obtained via _get_and_set_key() with the suffix 'GRABBED_BY_GMSJOB_<CFG.ID>'.
return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)
def acquire(self, timeout=0, target=None):
# Does nothing if the lock is disabled.
if not self.disabled:
token = super(MemoryReserverAcquisitionLock, self).acquire(timeout=timeout, target=target)
# Store the token together with the current time in the job-specific hash.
# NOTE(review): the token is not returned here - confirm whether callers expect it.
self.client.hset(self.grabbed_key_jobID, token, self.current_time)
def release_all_jobID_tokens(self):
# Signal every token listed in the job-specific hash, then delete that hash.
if not self.disabled:
for token in self.client.hkeys(self.grabbed_key_jobID):
self.signal(token)
self.client.delete(self.grabbed_key_jobID)
def signal(self, token):
# A token of None is ignored and None is returned.
if token is None:
return None
# Queue the commands on a pipeline (after multi()) and run them with a single execute():
# remove the token from both grabbed hashes and push it onto the list of available tokens.
with self.client.pipeline() as pipe:
pipe.multi()
pipe.hdel(self.grabbed_key, token)
pipe.hdel(self.grabbed_key_jobID, token) # only difference to Semaphore.signal()
pipe.lpush(self.available_key, token)
pipe.execute()
return token
def delete(self):
# Delete the four redis keys used by this lock; does nothing if the lock is disabled.
if not self.disabled:
self.client.delete(self.check_exists_key)
self.client.delete(self.available_key)
self.client.delete(self.grabbed_key)
self.client.delete(self.grabbed_key_jobID)
def acquire_process_lock(**processlock_kwargs): def acquire_process_lock(**processlock_kwargs):
"""Decorator function for ProcessLock. """Decorator function for ProcessLock.
:param processlock_kwargs: Keywourd arguments to be passed to ProcessLock class. :param processlock_kwargs: Keyword arguments to be passed to ProcessLock class.
""" """
def decorator(func): def decorator(func):
...@@ -334,10 +289,3 @@ def release_unclosed_locks(): ...@@ -334,10 +289,3 @@ def release_unclosed_locks():
# delete the complete redis namespace if no lock slot is acquired anymore # delete the complete redis namespace if no lock slot is acquired anymore
if MR.client.hlen(MR.grabbed_key) == 0: if MR.client.hlen(MR.grabbed_key) == 0:
MR.delete() MR.delete()
MRAL = MemoryReserverAcquisitionLock()
MRAL.release_all_jobID_tokens()
# delete the complete redis namespace if no lock slot is acquired anymore
if MRAL.client.hlen(MRAL.grabbed_key) == 0:
MRAL.delete()
...@@ -15,7 +15,7 @@ from ..algorithms import L2A_P ...@@ -15,7 +15,7 @@ from ..algorithms import L2A_P
from ..algorithms import L2B_P from ..algorithms import L2B_P
from ..algorithms import L2C_P from ..algorithms import L2C_P
from ..model.gms_object import \ from ..model.gms_object import \
failed_GMS_object, update_proc_status, return_GMS_objs_without_arrays, return_proc_reports_only, estimate_mem_usage failed_GMS_object, update_proc_status, return_proc_reports_only, estimate_mem_usage
from ..model.gms_object import GMS_object # noqa F401 # flake8 issue from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
from ..algorithms.geoprocessing import get_common_extent from ..algorithms.geoprocessing import get_common_extent
......
...@@ -19,5 +19,4 @@ openpyxl ...@@ -19,5 +19,4 @@ openpyxl
timeout_decorator timeout_decorator
redis redis
redis-semaphore redis-semaphore
python-redis-lock
psutil psutil
...@@ -19,7 +19,7 @@ requirements = [ ...@@ -19,7 +19,7 @@ requirements = [
'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'pytz', 'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'pytz',
'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy', 'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy',
'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.13', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus', 'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.13', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus',
'nested_dict', 'openpyxl', 'timeout_decorator', 'redis', 'redis-semaphore', 'python-redis-lock', 'psutil' 'nested_dict', 'openpyxl', 'timeout_decorator', 'redis', 'redis-semaphore', 'psutil'
# spectral<0.16 has some problems with writing signed integer 8bit data # spectral<0.16 has some problems with writing signed integer 8bit data
# fmask # conda install -c conda-forge python-fmask # fmask # conda install -c conda-forge python-fmask
# 'pyhdf', # conda install --yes -c conda-forge pyhdf # 'pyhdf', # conda install --yes -c conda-forge pyhdf
......
...@@ -76,7 +76,6 @@ dependencies: ...@@ -76,7 +76,6 @@ dependencies:
- timeout_decorator - timeout_decorator
- redis - redis
- redis-semaphore - redis-semaphore
- python-redis-lock
- psutil - psutil
- py_tools_ds>=0.12.4 - py_tools_ds>=0.12.4
- geoarray>=0.7.13 - geoarray>=0.7.13
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment