Commit 0c67304b authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Revised locks.MultiSlotLock. Added locks.IOLock.

Added test_locks.py.
parent 5f0ebea3
......@@ -2,7 +2,7 @@
__author__ = 'Daniel Scheffler'
import time
from redis_lock import StrictRedis, Lock, NotAcquired
from redis_lock import StrictRedis, Lock, NotAcquired, AlreadyAcquired
from redis.exceptions import ConnectionError as RedisConnectionError
import logging
import functools
......@@ -22,10 +22,14 @@ except RedisConnectionError:
class MultiSlotLock(Lock):
def __init__(self, name, allowed_threads=1, logger=None, **kwargs):
self.conn = redis_conn
self.name = name
self.allowed_threads = allowed_threads or 0
self.logger = logger or GMS_logger("RedisLock: '%s'" % name)
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
self.kwargs = kwargs
self.allowed_slot_names = ['%s, slot #%s' % (name, i) for i in range(1, self.allowed_threads + 1)]
self.allowed_slot_names = ['%s, slot #%s' % (self.name, i) for i in range(1, self.allowed_threads + 1)]
self.final_name = ''
self._acquired = False
if allowed_threads and redis_conn:
logged = False
......@@ -35,56 +39,64 @@ class MultiSlotLock(Lock):
time.sleep(0.2)
if not logged:
self.logger.info("Waiting for free '%s' lock." % name)
self.logger.info("Waiting for free '%s' lock." % self.name)
logged = True
else:
break
name = name_free_slot
super().__init__(self.conn, 'GMS_%s__' % CFG.ID + name_free_slot, **kwargs)
self.final_name = 'GMS_%s__' % CFG.ID + name_free_slot
super().__init__(self.conn, self.final_name, **kwargs)
else:
pass
self.name = name
@property
def existing_locks(self):
names = [i.decode('utf8').split('lock:')[1] for i in self.conn.keys() if i.decode('utf8').startswith('lock:')]
# split 'GMS_<jobid>' and return
return list(set([re.search('GMS_[0-9-]*__(.*)', n, re.I).group(1) for n in names if n.startswith('GMS_')]))
return list(set([re.search('GMS_[0-9]*__(.*)', n, re.I).group(1) for n in names if n.startswith('GMS_')]))
def get_free_slot_name(self):
free_slots = [sn for sn in self.allowed_slot_names if sn not in self.existing_locks]
if free_slots:
return free_slots[0]
def __enter__(self):
def acquire(self, blocking=True, timeout=None):
if self.allowed_threads and self.conn:
super().__enter__()
self.logger.info("Acquired lock '%s'." % self.name)
if self._acquired:
raise AlreadyAcquired("Already acquired from this Lock instance.")
while not self._acquired:
try:
self._acquired = super(MultiSlotLock, self).acquire(blocking=blocking, timeout=timeout)
except AlreadyAcquired:
# this happens in case the lock has already been acquired by another instance of MultiSlotLock due
# to a race condition (time gap between finding the free slot and the call of self.acquire())
# -> in that case: re-initialize to get a new free slot
self.__init__(self.name, allowed_threads=self.allowed_threads, logger=self.logger,
**self.kwargs)
if self._acquired:
self.logger.info("Acquired lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
else:
pass
self._acquired = True
return self
return self._acquired
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
def release(self):
if self.allowed_threads and self.conn:
super().__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback)
self.logger.info("Released lock '%s'." % self.name)
else:
pass
super(MultiSlotLock, self).release()
self.logger.info("Released lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
class ProcessLock(MultiSlotLock):
class IOLock(MultiSlotLock):
def __init__(self, processes=1, logger=None, **kwargs):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_threads=processes, logger=logger, **kwargs)
super(IOLock, self).__init__(name='IOLock', allowed_threads=processes, logger=logger, **kwargs)
def __enter__(self):
super(ProcessLock, self).__enter__()
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
super(ProcessLock, self).__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback)
class ProcessLock(MultiSlotLock):
def __init__(self, processes=1, logger=None, **kwargs):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_threads=processes, logger=logger, **kwargs)
def acquire_process_lock(processes=None, logger=None):
......
......@@ -5,7 +5,7 @@ from typing import List, Tuple, Generator, Iterable, Union # noqa F401 # flake
from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
from ..misc.locks import MultiSlotLock
from ..misc.locks import ProcessLock
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
......@@ -223,7 +223,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
:param list_dataset_dicts_per_scene:
:return:
"""
with MultiSlotLock('ProcessLock', allowed_threads=CFG.CPUs_all_jobs):
with ProcessLock(processes=CFG.CPUs_all_jobs):
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
'Received %s.' % list_dataset_dicts_per_scene)
......
......@@ -204,8 +204,8 @@ class BaseTestCases:
[cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]
cls.PC.config.CPUs_all_jobs = 2
cls.PC.config.max_parallel_reads_writes = 2
cls.PC.config.CPUs_all_jobs = 3
cls.PC.config.max_parallel_reads_writes = 3
# cls.PC.config.spathomo_estimate_accuracy = True
# cls.PC.config.ac_estimate_accuracy = True # FIXME
# cls.PC.config.spechomo_estimate_accuracy = True # FIXME
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_locks
----------
Tests for gms_preprocessing.misc.locks
"""
import unittest
from gms_preprocessing import set_config
from gms_preprocessing.misc.locks import MultiSlotLock, ProcessLock, AlreadyAcquired
from . import db_host
class Test_MultiSlotLock(unittest.TestCase):
def setUp(self):
self.config = set_config(job_ID=26186196, db_host=db_host, reset_status=True)
def test_acquire_release(self):
msl = MultiSlotLock('unittest', allowed_threads=15)
msl.acquire()
with self.assertRaises(AlreadyAcquired):
msl.acquire()
msl.release()
self.failIf(True in [i.startswith('unittest') for i in msl.existing_locks])
def test_with_statement(self):
with MultiSlotLock('unittest', allowed_threads=15) as lock:
self.assertNotEquals(lock, None)
class Test_ProcessLock(unittest.TestCase):
def setUp(self):
self.config = set_config(job_ID=26186196, db_host=db_host, reset_status=True)
def test_acquire_release(self):
msl = ProcessLock(processes=15)
msl.acquire()
with self.assertRaises(AlreadyAcquired):
msl.acquire()
msl.release()
self.failIf(True in [i.startswith('ProcessLock') for i in msl.existing_locks])
def test_with_statement(self):
with ProcessLock(processes=15) as lock:
self.assertNotEquals(lock, None)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment