Commit 6d7433a9 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fixed deadlock during acquisition of MemoryReserver.

parent 20083316
......@@ -4,7 +4,6 @@ __author__ = 'Daniel Scheffler'
import time
from redis import StrictRedis
from redis_semaphore import Semaphore
from redis_lock import Lock, NotAcquired
from redis.exceptions import ConnectionError as RedisConnectionError
import functools
from psutil import virtual_memory
......@@ -145,18 +144,7 @@ class MemoryReserver(Semaphore):
def acquire(self, timeout=0, target=None):
if not self.disabled:
# # due to, e.g., a MemoryError redis_lock.Lock may try to extend an already expired lock
# # (which does not exist anymore) -> for GMS, thats also
# class mocked_Lock(Lock):
# def extend(self, expire=None):
# try:
# super(mocked_Lock, self).extend(expire=expire)
# except NotAcquired:
# pass
with Lock(self.client, 'GMS_mem_acquire_lock'): # , expire=3, auto_renewal=True):
self.client.set('GMS_mem_acquire_lock:GRABBED_BY_GMSJOB', CFG.ID)
with MemoryReserverAcquisitionLock():
if self.usable_memory_gb >= self.mem2lock_gb:
for i in range(self.mem2lock_gb):
......@@ -178,8 +166,6 @@ class MemoryReserver(Semaphore):
time.sleep(1)
self.acquire(timeout=timeout)
self.client.delete('GMS_mem_acquire_lock:GRABBED_BY_GMSJOB')
def release(self):
if not self.disabled:
for token in self._local_tokens:
......@@ -211,6 +197,49 @@ class MemoryReserver(Semaphore):
self.client.delete(self.reserved_key)
class MemoryReserverAcquisitionLock(Semaphore):
    """Single-slot redis semaphore guarding the memory acquisition done by MemoryReserver.

    In addition to the bookkeeping of ``Semaphore``, every grabbed token is recorded in a
    job-specific redis hash (see ``grabbed_key_jobID``). This allows handing back all tokens
    of the current job at once via ``release_all_jobID_tokens()``.

    If there is no redis connection or memory locks are disabled in the configuration, the
    instance is flagged as ``disabled``: the parent constructor is not run and ``acquire``,
    ``release``, ``release_all_jobID_tokens`` and ``delete`` do nothing.
    """

    def __init__(self, **kwargs):
        """Set up the lock with exactly one slot.

        :param kwargs:  further keyword arguments passed through to ``Semaphore.__init__``
        """
        self.disabled = redis_conn is None or CFG.disable_memory_locks

        if not self.disabled:
            super(MemoryReserverAcquisitionLock, self).__init__(
                client=redis_conn, count=1, namespace='MemoryReserverAcquisitionLock', **kwargs)

    @property
    def grabbed_key_jobID(self):
        """Return the redis key of the hash listing the tokens grabbed by the current job (CFG.ID)."""
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def acquire(self, timeout=0, target=None):
        """Grab the lock slot and register the token for the current job.

        :param timeout: passed to ``Semaphore.acquire``
        :param target:  passed to ``Semaphore.acquire``
        :return:        the acquired token, or None if the lock is disabled
        """
        if self.disabled:
            return None

        token = super(MemoryReserverAcquisitionLock, self).acquire(timeout=timeout, target=target)
        # NOTE(review): if ``target`` is given, Semaphore.acquire presumably hands the token back
        #               before returning, which would leave a stale entry here - confirm.
        self.client.hset(self.grabbed_key_jobID, token, self.current_time)

        # hand the token to the caller like Semaphore.acquire() does (previously it was dropped)
        return token

    def release(self):
        """Release the lock slot held by this instance.

        A disabled instance never ran ``Semaphore.__init__``, so the inherited implementation must
        not be called in that case (e.g., when leaving a ``with`` block).

        :return:    False if the lock is disabled, otherwise the result of ``Semaphore.release``
        """
        if self.disabled:
            return False

        return super(MemoryReserverAcquisitionLock, self).release()

    def release_all_jobID_tokens(self):
        """Hand back every token registered for the current job and drop the job-specific hash."""
        if self.disabled:
            return

        for token in self.client.hkeys(self.grabbed_key_jobID):
            self.signal(token)

        self.client.delete(self.grabbed_key_jobID)

    def signal(self, token):
        """Return the given token to the pool of available slots.

        :param token:   the token to hand back (nothing is done if None)
        :return:        the given token, or None if no token was given
        """
        if token is None:
            return None

        # run all three updates as a single transaction
        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()
            return token

    def delete(self):
        """Remove all redis keys belonging to this lock (including the job-specific hash)."""
        if self.disabled:
            return

        for key in (self.check_exists_key, self.available_key, self.grabbed_key, self.grabbed_key_jobID):
            self.client.delete(key)
def acquire_process_lock(**processlock_kwargs):
"""Decorator function for ProcessLock.
......@@ -259,13 +288,16 @@ def release_unclosed_locks():
if lock.client.hlen(lock.grabbed_key) == 0:
lock.delete()
ML = MemoryReserver(1)
ML.release_all_jobID_tokens()
MR = MemoryReserver(1)
MR.release_all_jobID_tokens()
# delete the complete redis namespace if no lock slot is acquired anymore
if ML.client.hlen(ML.grabbed_key) == 0:
ML.delete()
if MR.client.hlen(MR.grabbed_key) == 0:
MR.delete()
if int(redis_conn.get('GMS_mem_acquire_lock:GRABBED_BY_GMSJOB') or -9999) == CFG.ID:
redis_conn.delete('GMS_mem_acquire_lock')
redis_conn.delete('GMS_mem_acquire_lock:GRABBED_BY_GMSJOB')
MRAL = MemoryReserverAcquisitionLock()
MRAL.release_all_jobID_tokens()
# delete the complete redis namespace if no lock slot is acquired anymore
if MRAL.client.hlen(MRAL.grabbed_key) == 0:
MRAL.delete()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment