locks.py 7.11 KB
Newer Older
1
2
3
4
# -*- coding: utf-8 -*-
__author__ = 'Daniel Scheffler'

import time
5
from redis_lock import StrictRedis, Lock, NotAcquired, AlreadyAcquired
Daniel Scheffler's avatar
Daniel Scheffler committed
6
from redis.exceptions import ConnectionError as RedisConnectionError
7
import logging
8
import functools
9
import re
10
import random
11
from psutil import virtual_memory
12
13
14
15

from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG

16
17

try:
18
    redis_conn = StrictRedis(host='localhost', db=0)
19
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
Daniel Scheffler's avatar
Daniel Scheffler committed
20
except RedisConnectionError:
21
22
23
    redis_conn = None


24
class MultiSlotLock(Lock):
25
    def __init__(self, name, allowed_slots=1, logger=None, **kwargs):
26
        self.conn = redis_conn
27
        self.name = name
28
        self.allowed_slots = allowed_slots or 0
29
30
        self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
        self.kwargs = kwargs
31

32
        self.allowed_slot_names = ['%s, slot #%s' % (self.name, i) for i in range(1, self.allowed_slots + 1)]
33
        self.final_name = ''
34
        self._acquired = None
35

36
        if allowed_slots and redis_conn:
37
38
            logged = False
            while True:
39
                time.sleep(random.uniform(0, 1.5))  # avoids race conditions in case multiple tasks are waiting
40
                name_free_slot = self.get_free_slot_name()
41

42
                if not name_free_slot:
43
                    if not logged:
44
                        self.logger.info("Waiting for free '%s' lock." % self.name)
45
46
47
                        logged = True
                else:
                    break
48

49
50
            self.final_name = 'GMS_%s__' % CFG.ID + name_free_slot
            super().__init__(self.conn, self.final_name, **kwargs)
51
52
53
        else:
            pass

54
55
    @property
    def existing_locks(self):
56
57
58
        names = [i.decode('utf8').split('lock:')[1] for i in self.conn.keys() if i.decode('utf8').startswith('lock:')]

        # split 'GMS_<jobid>' and return
59
        return list(set([re.search('GMS_[0-9]*__(.*)', n, re.I).group(1) for n in names if n.startswith('GMS_')]))
60
61

    def get_free_slot_name(self):
62
        free_slots = [sn for sn in self.allowed_slot_names if sn not in self.existing_locks]
63
64
65
        if free_slots:
            return free_slots[0]

66
    def acquire(self, blocking=True, timeout=None):
67
        if self.allowed_slots and self.conn:
68
69
70
71
72
            if self._acquired:
                raise AlreadyAcquired("Already acquired from this Lock instance.")

            while not self._acquired:
                try:
73
                    # print('Trying to acquire %s.' % self.final_name.split('GMS_%s__' % CFG.ID)[1])
74
                    self._acquired = super(MultiSlotLock, self).acquire(blocking=blocking, timeout=timeout)
75
                    # print("Acquired lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
76
77
78
79
                except AlreadyAcquired:
                    # this happens in case the lock has already been acquired by another instance of MultiSlotLock due
                    # to a race condition (time gap between finding the free slot and the call of self.acquire())
                    # -> in that case: re-initialize to get a new free slot
80
                    self.__init__(self.name, allowed_slots=self.allowed_slots, logger=self.logger,
81
82
83
                                  **self.kwargs)

                if self._acquired is False:  # and not None
84
                    self.__init__(self.name, allowed_slots=self.allowed_slots, logger=self.logger,
85
86
                                  **self.kwargs)

87
88
                # print(self.final_name.split('GMS_%s__' % CFG.ID)[1], self._acquired)

89
90
            if self._acquired:
                self.logger.info("Acquired lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
91
        else:
92
            self._acquired = True
93

94
        return self._acquired
95

96
    def release(self):
97
        if self.allowed_slots and self.conn:
98
99
            super(MultiSlotLock, self).release()
            self.logger.info("Released lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
100
101


102
class IOLock(MultiSlotLock):
103
104
    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
105
106


107
class ProcessLock(MultiSlotLock):
108
109
    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
110
111


112
113
114
class MEMLock(MultiSlotLock):
    def __init__(self, allowed_slots=1, usage_threshold=80, logger=None, **kwargs):
        """
115

116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
        :param allowed_slots:
        :param usage_th:        Memory usage threshold above which the memory lock is active (percent).
        :param logger:
        :param kwargs:
        """
        self.usage_th = usage_threshold
        self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
        self.mem_usage = virtual_memory()['percent']

        self._acquired = None

        if self.mem_usage >= usage_threshold:
            self.logger.info('Memory usage is %.1f percent.' % self.mem_usage)
            super(MEMLock, self).__init__(name='MEMLock', allowed_slots=allowed_slots, logger=self.logger, **kwargs)
        else:
            self.logger.info('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)

    def acquire(self, blocking=True, timeout=None):
        if self.mem_usage > self.usage_th:
            self._acquired = super(MEMLock, self).acquire(blocking=blocking, timeout=timeout)
        else:
            self._acquired = True

        return self._acquired

    def release(self):
        if self._acquired:
            super(MEMLock, self).release()


def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock.

    :param processlock_kwargs:  Keywourd arguments to be passed to ProcessLock class.
    """
    def decorator(func):

        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator


def acquire_mem_lock(**memlock_kwargs):
    """Decorator function for MEMLock.

    :param memlock_kwargs:  Keywourd arguments to be passed to MEMLock class.
    """
170
171
172
173
    def decorator(func):

        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
174
            with MEMLock(**memlock_kwargs):
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator


def release_unclosed_locks(logger=None):
    if redis_conn:
        logger = logger or GMS_logger('LockReseter')

        locks2release = [i.split(b'lock:')[1].decode('latin') for i in redis_conn.keys()
                         if i.decode('latin').startswith('lock:GMS_%s__' % CFG.ID)]
        if locks2release:
            logger.info("Releasing unclosed locks of job %s." % CFG.ID)

        for lockN in locks2release:
            lock = Lock(redis_conn, lockN)
            try:
                lock.release()
            except NotAcquired:
                lock.reset()