Commit 15267b3f authored by Daniel Scheffler
Browse files

IOLock and ProcessLock are now working properly. Added redis-semaphore to dependencies.

parent f9a9c681
......@@ -2,19 +2,16 @@
__author__ = 'Daniel Scheffler'
import time
from redis_lock import StrictRedis, Lock, NotAcquired, AlreadyAcquired
from redis import StrictRedis
from redis_semaphore import Semaphore
from redis.exceptions import ConnectionError as RedisConnectionError
import functools
import re
import random
from psutil import virtual_memory
import timeout_decorator
from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG
try:
redis_conn = StrictRedis(host='localhost', db=0)
redis_conn.keys() # may raise ConnectionError, e.g., if redis server is not installed or not running
......@@ -22,132 +19,87 @@ except RedisConnectionError:
redis_conn = None
class MultiSlotLock(Lock):
    """Redis lock that offers a fixed number of named slots.

    Every slot is a separate redis lock called ``GMS_<CFG.ID>__<name>, slot #<i>``. The constructor waits until
    one of the allowed slot names is not among the existing redis locks and binds the instance to that slot.
    :meth:`acquire` then tries to take the slot and searches for another one if it was taken in the meantime.

    If ``disabled`` is set, ``allowed_slots`` is falsy or no redis connection is available, the instance is
    inactive: ``acquire`` returns True right away and ``release`` does nothing.
    """

    def __init__(self, name, allowed_slots=1, logger=None, disabled=False, **kwargs):
        """Reserve a free slot name (waits until one is free) and set up the underlying redis lock.

        :param name:            base name of the lock
        :param allowed_slots:   number of slots; None or 0 deactivates the lock
        :param logger:          logger to use; defaults to a GMS_logger named after the lock
        :param disabled:        if True, no slot is reserved and acquire/release become no-ops
        :param kwargs:          keyword arguments passed on to the constructor of the base Lock class
        """
        self.conn = redis_conn
        self.name = name
        self.allowed_slots = allowed_slots or 0
        self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
        self.kwargs = kwargs
        # remembered so that acquire()/release() never touch the base Lock if its constructor was skipped
        self._disabled = disabled

        self.allowed_slot_names = ['%s, slot #%s' % (self.name, i) for i in range(1, self.allowed_slots + 1)]
        self.final_name = ''
        self._acquired = None

        if self._is_active:
            logged = False
            while True:
                time.sleep(random.uniform(0, 1.5))  # avoids race conditions in case multiple tasks are waiting
                name_free_slot = self.get_free_slot_name()
                if name_free_slot:
                    break
                if not logged:
                    self.logger.info("Waiting for free '%s' lock." % self.name)
                    logged = True

            self.final_name = 'GMS_%s__' % CFG.ID + name_free_slot
            super(MultiSlotLock, self).__init__(self.conn, self.final_name, **kwargs)

    @property
    def _is_active(self):
        """Return True if the lock really talks to redis (not disabled, slots allowed, connection available)."""
        return bool(self.allowed_slots and self.conn and not self._disabled)

    def _reserve_new_slot(self):
        """Run the slot search again and bind this instance to another free slot."""
        # Call the constructor of THIS class explicitly. self.__init__ would dispatch to the constructor of a
        # subclass, whose signature may differ (e.g. IOLock takes no 'name'), which raises a TypeError.
        MultiSlotLock.__init__(self, self.name, allowed_slots=self.allowed_slots, logger=self.logger,
                               **self.kwargs)

    @property
    def existing_locks(self):
        """Return the names of all existing GMS redis locks with the 'GMS_<jobid>__' prefix removed."""
        slot_names = set()
        for key in self.conn.keys():
            key = key.decode('utf8')
            if not key.startswith('lock:'):
                continue

            lockname = key.split('lock:')[1]
            if not lockname.startswith('GMS_'):
                continue

            # split 'GMS_<jobid>' and keep the rest; keys that do not follow the pattern are ignored
            match = re.search('GMS_[0-9]*__(.*)', lockname, re.I)
            if match:
                slot_names.add(match.group(1))

        return list(slot_names)

    def get_free_slot_name(self):
        """Return the first allowed slot name that is not locked yet or None if all slots are taken."""
        taken = set(self.existing_locks)  # query redis only once, not once per slot name
        return next((sn for sn in self.allowed_slot_names if sn not in taken), None)

    def acquire(self, blocking=True, timeout=None):
        """Acquire the reserved slot; switch to another free slot if the reserved one was taken meanwhile.

        :param blocking:    passed to the acquire method of the base Lock class
        :param timeout:     passed to the acquire method of the base Lock class
        :return:            True as soon as a slot is acquired (always True if the lock is inactive)
        :raises AlreadyAcquired: if this instance has already acquired its slot
        """
        if self._is_active:
            if self._acquired:
                raise AlreadyAcquired("Already acquired from this Lock instance.")

            while not self._acquired:
                try:
                    self._acquired = super(MultiSlotLock, self).acquire(blocking=blocking, timeout=timeout)
                except AlreadyAcquired:
                    # this happens in case the lock has already been acquired by another instance of
                    # MultiSlotLock due to a race condition (time gap between finding the free slot and the
                    # call of self.acquire()) -> in that case: re-initialize to get a new free slot
                    self._reserve_new_slot()
                else:
                    if self._acquired is False:  # and not None
                        self._reserve_new_slot()

            if self._acquired:
                self.logger.info("Acquired lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
        else:
            self._acquired = True

        return self._acquired

    def release(self):
        """Release the slot held by this instance (no-op if the lock is inactive)."""
        if self._is_active:
            super(MultiSlotLock, self).release()
            self.logger.info("Released lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
class MultiSlotLock(Semaphore):
def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
self.disabled = redis_conn is None
self.name = name
self.redis_varname = 'GMS_%s__' % CFG.ID + name
self.namespace = name
self.allowed_slots = allowed_slots
self.logger = logger or GMS_logger("RedisLock: '%s'" % name)
if not self.disabled:
super().__init__(client=redis_conn, count=allowed_slots, namespace=self.redis_varname, **kwargs)
super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)
def acquire(self, timeout=0, target=None):
if not self.disabled:
if self.available_count == 0:
self.logger.info("Waiting for free lock '%s'." % self.name)
self.logger.info("Waiting for free lock '%s'." % self.namespace)
token = super().acquire(timeout=timeout, target=target)
token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)
self.logger.info("Acquired lock '%s'" % self.name +
self.logger.info("Acquired lock '%s'" % self.namespace +
('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))
return token
def release(self):
if not self.disabled:
token = super().release()
token = super(MultiSlotLock, self).release()
if token:
self.logger.info("Released lock '%s'" % self.name +
self.logger.info("Released lock '%s'" % self.namespace +
('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))
def delete(self):
if not self.disabled:
redis_conn.delete('%s:EXISTS' % self.redis_varname)
redis_conn.delete('%s:AVAILABLE' % self.redis_varname)
redis_conn.delete('%s:GRABBED' % self.redis_varname)
self.client.delete(self.check_exists_key)
self.client.delete(self.available_key)
self.client.delete(self.grabbed_key)
class SharedResourceLock(MultiSlotLock):
    """Multi-slot lock that additionally records which tokens were grabbed by the current GMS job.

    Each acquired token is written to a redis hash whose key contains the job ID (``CFG.ID``). This allows
    releasing all tokens of the job at once via :meth:`release_all_jobID_tokens`.
    """

    def acquire(self, timeout=0, target=None):
        """Acquire a slot and register the received token under the current job ID.

        :param timeout:     passed to MultiSlotLock.acquire
        :param target:      passed to MultiSlotLock.acquire
        :return:            the acquired token or None if the lock is disabled
        """
        if self.disabled:
            return None

        token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)
        self.client.hset(self.grabbed_key_jobID, token, self.current_time)

        # hand the token back to the caller like MultiSlotLock.acquire does
        return token

    def release_all_jobID_tokens(self):
        """Signal every token registered for the current job ID and remove the job-specific hash."""
        if not self.disabled:
            for token in self.client.hkeys(self.grabbed_key_jobID):
                self.signal(token)

            self.client.delete(self.grabbed_key_jobID)

    @property
    def grabbed_key_jobID(self):
        """Return the redis key of the hash that holds the tokens grabbed by the current job."""
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def signal(self, token):
        """Make the given token available again and drop it from both 'grabbed' hashes.

        :param token:   the token to give back; None is ignored
        :return:        the token or None if no token was given
        """
        if token is None:
            return None

        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()
            return token

    def delete(self):
        """Delete the redis keys of the lock including the job-specific hash."""
        if not self.disabled:
            super(SharedResourceLock, self).delete()
            self.client.delete(self.grabbed_key_jobID)
class IOLock(MultiSlotLock):
class IOLock(SharedResourceLock):
def __init__(self, allowed_slots=1, logger=None, **kwargs):
super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
class ProcessLock(MultiSlotLock):
class ProcessLock(SharedResourceLock):
def __init__(self, allowed_slots=1, logger=None, **kwargs):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
class MemoryLock(MultiSlotLock):
class MemoryLock(SharedResourceLock):
def __init__(self, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
"""
......@@ -158,7 +110,7 @@ class MemoryLock(MultiSlotLock):
"""
self.usage_th = usage_threshold
self.blocking_th = blocking_threshold
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
self.mem_usage = virtual_memory().percent
self._acquired = None
......@@ -175,7 +127,7 @@ class MemoryLock(MultiSlotLock):
super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
disabled=self.disabled, **kwargs)
@timeout_decorator.timeout(seconds=15*60, timeout_exception=TimeoutError,
@timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
exception_message='Memory lock could not be acquired after waiting 15 minutes '
'due to memory overload.')
def check_system_overload(self):
......@@ -202,7 +154,7 @@ class MemoryLock(MultiSlotLock):
super(MemoryLock, self).release()
class MemoryLock(MultiSlotLock):
class MemoryLock(SharedResourceLock):
def __init__(self, name='MemoryLock', allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None,
**kwargs):
"""
......@@ -214,7 +166,7 @@ class MemoryLock(MultiSlotLock):
"""
self.usage_th = usage_threshold
self.blocking_th = blocking_threshold
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
self.mem_usage = virtual_memory().percent
self._acquired = None
......@@ -231,7 +183,7 @@ class MemoryLock(MultiSlotLock):
super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
disabled=self.disabled, **kwargs)
@timeout_decorator.timeout(seconds=15*60, timeout_exception=TimeoutError,
@timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
exception_message='Memory lock could not be acquired after waiting 15 minutes '
'due to memory overload.')
def check_system_overload(self):
......@@ -258,14 +210,13 @@ class MemoryLock(MultiSlotLock):
super(MemoryLock, self).release()
def acquire_process_lock(**processlock_kwargs):
"""Decorator function for ProcessLock.
:param processlock_kwargs: Keyword arguments to be passed to ProcessLock class.
"""
def decorator(func):
def decorator(func):
@functools.wraps(func) # needed to avoid pickling errors
def wrapped_func(*args, **kwargs):
with ProcessLock(**processlock_kwargs):
......@@ -283,8 +234,8 @@ def acquire_mem_lock(**memlock_kwargs):
:param memlock_kwargs: Keyword arguments to be passed to MemoryLock class.
"""
def decorator(func):
def decorator(func):
@functools.wraps(func) # needed to avoid pickling errors
def wrapped_func(*args, **kwargs):
with MemoryLock(**memlock_kwargs):
......@@ -297,24 +248,15 @@ def acquire_mem_lock(**memlock_kwargs):
return decorator
def release_unclosed_locks(logger=None):
def release_unclosed_locks():
if redis_conn:
logger = logger or GMS_logger('LockReseter')
IOLock(allowed_slots=1).delete()
ProcessLock(allowed_slots=1).delete()
locks2release = [i.split(b'lock:')[1].decode('latin') for i in redis_conn.keys()
if i.decode('latin').startswith('lock:GMS_%s__' % CFG.ID)]
if locks2release:
logger.info("Releasing unclosed locks of job %s." % CFG.ID)
for lockN in locks2release:
lock = Lock(redis_conn, lockN)
try:
lock.release()
except NotAcquired:
lock.reset()
for L in [IOLock, ProcessLock]:
lock = L(allowed_slots=1)
lock.release_all_jobID_tokens()
# delete the complete redis namespace if no lock slot is acquired anymore
if lock.client.hlen(lock.grabbed_key) == 0:
lock.delete()
class MemoryReserver(object):
......
......@@ -129,7 +129,7 @@ def set_config(job_ID, json_config='', inmem_serialization=False, parallelizatio
# close unclosed locks from previous runs
from ..misc.locks import release_unclosed_locks
release_unclosed_locks(logger)
release_unclosed_locks()
builtins.GMS_EnvOK = True
......
......@@ -518,7 +518,7 @@ class process_controller(object):
"""Shutdown the process controller instance (loggers, remove temporary directories, ...)."""
# release unclosed locks
release_unclosed_locks(self.logger)
release_unclosed_locks()
# clear any temporary files
tempdir = os.path.join(self.config.path_tempdir)
......
......@@ -17,6 +17,7 @@ cerberus
nested_dict
openpyxl
timeout_decorator
python-redis-lock
redis
redis-semaphore
python-redis-lock
psutil
......@@ -19,7 +19,7 @@ requirements = [
'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'pytz',
'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy',
'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.12', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus',
'nested_dict', 'openpyxl', 'timeout_decorator', 'python-redis-lock', 'redis', 'psutil'
'nested_dict', 'openpyxl', 'timeout_decorator', 'redis', 'redis-semaphore', 'python-redis-lock', 'psutil'
# spectral<0.16 has some problems with writing signed integer 8bit data
# fmask # conda install -c conda-forge python-fmask
# 'pyhdf', # conda install --yes -c conda-forge pyhdf
......
......@@ -74,8 +74,9 @@ dependencies:
- nested_dict
- openpyxl
- timeout_decorator
- python-redis-lock
- redis
- redis-semaphore
- python-redis-lock
- psutil
- py_tools_ds>=0.12.4
- geoarray>=0.7.12
......
......@@ -42,16 +42,16 @@ class Test_ProcessLock(unittest.TestCase):
self.config = set_config(job_ID=26186196, db_host=db_host, reset_status=True)
def test_acquire_release(self):
msl = ProcessLock(allowed_slots=15)
msl.acquire()
pl = ProcessLock(allowed_slots=15)
pl.acquire()
# with self.assertRaises(AlreadyAcquired):
# msl.acquire()
msl.release()
pl.release()
if not msl.disabled:
self.failIf(True in [i.startswith('ProcessLock') for i in msl.existing_locks])
# if not msl.disabled:
# self.failIf(True in [i.startswith('ProcessLock') for i in msl.existing_locks])
def test_with_statement(self):
with ProcessLock(allowed_slots=15) as lock:
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment