Commit b8d0dead authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fixed bug where the same lock was acquired by two different processes.

parent b510c29e
...@@ -6,6 +6,7 @@ from redis_lock import StrictRedis, Lock, NotAcquired ...@@ -6,6 +6,7 @@ from redis_lock import StrictRedis, Lock, NotAcquired
from redis.exceptions import ConnectionError as RedisConnectionError from redis.exceptions import ConnectionError as RedisConnectionError
import logging import logging
import functools import functools
import re
from ..misc.logging import GMS_logger from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG from ..options.config import GMS_config as CFG
...@@ -21,12 +22,12 @@ except RedisConnectionError: ...@@ -21,12 +22,12 @@ except RedisConnectionError:
class MultiSlotLock(Lock): class MultiSlotLock(Lock):
def __init__(self, name, allowed_threads=1, logger=None, **kwargs): def __init__(self, name, allowed_threads=1, logger=None, **kwargs):
self.conn = redis_conn self.conn = redis_conn
self.allowed_threads = allowed_threads self.allowed_threads = allowed_threads or 0
self.logger = logger or GMS_logger("RedisLock: '%s'" % name) self.logger = logger or GMS_logger("RedisLock: '%s'" % name)
self.allowed_slot_names = ['GMS_%s__%s, slot #%s' % (CFG.ID, name, i) for i in range(1, allowed_threads + 1)] self.allowed_slot_names = ['%s, slot #%s' % (name, i) for i in range(1, self.allowed_threads + 1)]
if allowed_threads != 0 and redis_conn: if allowed_threads and redis_conn:
logged = False logged = False
while True: while True:
name_free_slot = self.get_free_slot_name() name_free_slot = self.get_free_slot_name()
...@@ -40,7 +41,7 @@ class MultiSlotLock(Lock): ...@@ -40,7 +41,7 @@ class MultiSlotLock(Lock):
break break
name = name_free_slot name = name_free_slot
super().__init__(self.conn, name, **kwargs) super().__init__(self.conn, 'GMS_%s__' % CFG.ID + name_free_slot, **kwargs)
else: else:
pass pass
...@@ -48,7 +49,10 @@ class MultiSlotLock(Lock): ...@@ -48,7 +49,10 @@ class MultiSlotLock(Lock):
@property @property
def existing_locks(self): def existing_locks(self):
return [i.decode('utf8').split('lock:')[1] for i in self.conn.keys() if i.decode('utf8').startswith('lock:')] names = [i.decode('utf8').split('lock:')[1] for i in self.conn.keys() if i.decode('utf8').startswith('lock:')]
# split 'GMS_<jobid>' and return
return list(set([re.search('GMS_[0-9-]*__(.*)', n, re.I).group(1) for n in names if n.startswith('GMS_')]))
def get_free_slot_name(self): def get_free_slot_name(self):
free_slots = [sn for sn in self.allowed_slot_names if sn not in self.existing_locks] free_slots = [sn for sn in self.allowed_slot_names if sn not in self.existing_locks]
...@@ -56,14 +60,14 @@ class MultiSlotLock(Lock): ...@@ -56,14 +60,14 @@ class MultiSlotLock(Lock):
return free_slots[0] return free_slots[0]
def __enter__(self): def __enter__(self):
if self.allowed_threads != 0 and self.conn: if self.allowed_threads and self.conn:
super().__enter__() super().__enter__()
self.logger.info("Acquired lock '%s'." % self.name) self.logger.info("Acquired lock '%s'." % self.name)
else: else:
pass pass
def __exit__(self, exc_type=None, exc_value=None, traceback=None): def __exit__(self, exc_type=None, exc_value=None, traceback=None):
if self.allowed_threads != 0 and self.conn: if self.allowed_threads and self.conn:
super().__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback) super().__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback)
self.logger.info("Released lock '%s'." % self.name) self.logger.info("Released lock '%s'." % self.name)
else: else:
......
...@@ -214,7 +214,7 @@ class JobConfig(object): ...@@ -214,7 +214,7 @@ class JobConfig(object):
self.CPUs = \ self.CPUs = \
gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count()) gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
self.CPUs_all_jobs = \ self.CPUs_all_jobs = \
gp('CPUs_all_jobs', json_globts['CPUs_all_jobs'], fallback=multiprocessing.cpu_count()) gp('CPUs_all_jobs', json_globts['CPUs_all_jobs'])
self.delete_old_output = \ self.delete_old_output = \
gp('delete_old_output', json_globts['delete_old_output']) gp('delete_old_output', json_globts['delete_old_output'])
self.max_parallel_reads_writes = \ self.max_parallel_reads_writes = \
......
...@@ -204,8 +204,8 @@ class BaseTestCases: ...@@ -204,8 +204,8 @@ class BaseTestCases:
[cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list] [cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]
# cls.PC.config.CPUs = 2 cls.PC.config.CPUs_all_jobs = 2
# cls.PC.config.max_parallel_reads_writes = 2 cls.PC.config.max_parallel_reads_writes = 2
# cls.PC.config.ac_estimate_accuracy = True # FIXME # cls.PC.config.ac_estimate_accuracy = True # FIXME
# cls.PC.config.spechomo_estimate_accuracy = True # FIXME # cls.PC.config.spechomo_estimate_accuracy = True # FIXME
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment