locks.py 3.66 KB
Newer Older
1
2
3
4
# -*- coding: utf-8 -*-
__author__ = 'Daniel Scheffler'

import time
5
from redis_lock import StrictRedis, Lock, NotAcquired
6
import logging
7
8
9
10
11
import functools

from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG

12
13

# redis-py signals an unreachable server with its own ConnectionError class (derived from RedisError),
# which is NOT a subclass of the builtin ConnectionError. Both are caught so that a missing/stopped
# redis server disables locking instead of crashing the import of this module.
from redis.exceptions import ConnectionError as RedisConnectionError

try:
    redis_conn = StrictRedis(host='localhost', db=0)
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
except (RedisConnectionError, ConnectionError):
    # no usable redis server -> all locks defined below degrade to no-ops
    redis_conn = None


20
class MultiSlotLock(Lock):
    """Redis lock that offers a fixed number of named slots instead of a single one.

    For a given lock name, `allowed_threads` slot names are derived. On construction the instance waits
    until one of these slot names is not present among the lock keys in redis and binds itself to it.
    If `allowed_threads` is 0 or no redis connection is available, the instance is a no-op context manager.
    """

    def __init__(self, name, allowed_threads=1, logger=None, **kwargs):
        """Select a free slot for the given lock name (blocks until one is free).

        :param name:            base name of the lock
        :param allowed_threads: number of slots, i.e. how many holders may exist at the same time;
                                0 disables locking
        :param logger:          logger to use; defaults to a GMS_logger named after the lock
        :param kwargs:          passed on to redis_lock.Lock.__init__
        """
        self.conn = redis_conn
        self.allowed_threads = allowed_threads
        self.logger = logger or GMS_logger("RedisLock: '%s'" % name)

        self.allowed_slot_names = ['GMS_%s__%s, slot #%s' % (CFG.ID, name, i) for i in range(1, allowed_threads + 1)]

        if allowed_threads != 0 and redis_conn:
            logged = False
            while True:
                name_free_slot = self.get_free_slot_name()
                if name_free_slot:
                    break

                # all slots are taken -> poll again shortly and announce the wait only once
                time.sleep(0.2)
                if not logged:
                    self.logger.info("Waiting for free '%s' lock." % name)
                    logged = True

            name = name_free_slot
            super().__init__(self.conn, name, **kwargs)

        # slot name if a slot was selected, otherwise the unchanged base name
        self.name = name

    @property
    def existing_locks(self):
        """Return the names of all locks currently stored in redis (key prefix 'lock:' removed).

        Keys that do not start with 'lock:' (foreign keys, or helper keys such as redis_lock's signal
        keys) are skipped; indexing the result of split('lock:') on them would raise an IndexError.
        """
        prefix = b'lock:'
        return [key[len(prefix):].decode('utf8') for key in self.conn.keys() if key.startswith(prefix)]

    def get_free_slot_name(self):
        """Return the first slot name that is not locked at the moment or None if all slots are taken."""
        # query redis once per call instead of once per slot name
        taken = set(self.existing_locks)

        for slot_name in self.allowed_slot_names:
            if slot_name not in taken:
                return slot_name

        return None

    def __enter__(self):
        """Acquire the selected slot (no-op if locking is disabled) and return the lock instance."""
        if self.allowed_threads != 0 and self.conn:
            super().__enter__()
            self.logger.info("Acquired lock '%s'." % self.name)

        # returning self makes 'with MultiSlotLock(...) as lock' usable
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Release the selected slot (no-op if locking is disabled)."""
        if self.allowed_threads != 0 and self.conn:
            super().__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback)
            self.logger.info("Released lock '%s'." % self.name)
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116


class ProcessLock(MultiSlotLock):
    """MultiSlotLock with the fixed lock name 'ProcessLock'."""

    def __init__(self, processes=1, logger=None, **kwargs):
        """Create the lock.

        :param processes:   number of slots, passed to MultiSlotLock as `allowed_threads`
        :param logger:      logger passed to MultiSlotLock
        :param kwargs:      further keyword arguments passed to MultiSlotLock
                            (must not contain `name` or `allowed_threads`, which are set here)
        """
        super(ProcessLock, self).__init__(name='ProcessLock', allowed_threads=processes, logger=logger, **kwargs)

    def __enter__(self):
        """Acquire the lock and return the lock instance, so that 'with ProcessLock() as lock' works."""
        super(ProcessLock, self).__enter__()
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Release the lock."""
        super(ProcessLock, self).__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback)


def acquire_process_lock(processes=None, logger=None):
    """Return a decorator that executes the decorated function while holding a ProcessLock.

    :param processes:   number of slots of the ProcessLock; None keeps the default of ProcessLock
    :param logger:      logger passed to the lock; defaults to the logger 'ProcessLock' set to level INFO
    :return:            decorator function
    """
    if not logger:
        logger = logging.getLogger('ProcessLock')
        logger.setLevel('INFO')

    # ProcessLock calls its slot count 'processes'. Handing it over as 'allowed_threads' ends up in
    # **kwargs and collides with the 'allowed_threads' keyword ProcessLock itself passes to its parent
    # (TypeError: multiple values for keyword argument).
    lock_kwargs = dict(logger=logger)
    if processes is not None:
        lock_kwargs['processes'] = processes

    def decorator(func):

        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            # a new lock is created for each call
            with ProcessLock(**lock_kwargs):
                return func(*args, **kwargs)

        return wrapped_func

    return decorator


def release_unclosed_locks(logger=None):
    """Remove all locks of the current job (key prefix 'lock:GMS_<CFG.ID>__') that are still stored in redis.

    Does nothing if no redis connection is available.

    :param logger:  logger to use; defaults to a GMS_logger named 'LockReseter'
    """
    if not redis_conn:
        return

    logger = logger or GMS_logger('LockReseter')

    # collect the names (without the 'lock:' prefix) of all lock keys belonging to this job
    leftover_names = []
    for raw_key in redis_conn.keys():
        if raw_key.decode('latin').startswith('lock:GMS_%s__' % CFG.ID):
            leftover_names.append(raw_key.split(b'lock:')[1].decode('latin'))

    if leftover_names:
        logger.info("Releasing unclosed locks of job %s." % CFG.ID)

    for lock_name in leftover_names:
        stale_lock = Lock(redis_conn, lock_name)
        try:
            stale_lock.release()
        except NotAcquired:
            # release() refused -> fall back to reset()
            stale_lock.reset()