locks.py 9.83 KB
Newer Older
1
2
3
4
# -*- coding: utf-8 -*-
__author__ = 'Daniel Scheffler'

import time
5
from redis import StrictRedis
6
from redis_semaphore import Semaphore
Daniel Scheffler's avatar
Daniel Scheffler committed
7
from redis.exceptions import ConnectionError as RedisConnectionError
8
import functools
9
from psutil import virtual_memory
Daniel Scheffler's avatar
Daniel Scheffler committed
10
import timeout_decorator
11
12
13
14

from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG

15
try:
16
    redis_conn = StrictRedis(host='localhost', db=0)
17
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
Daniel Scheffler's avatar
Daniel Scheffler committed
18
except RedisConnectionError:
19
20
21
    redis_conn = None


22
23
24
class MultiSlotLock(Semaphore):
    def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
        self.disabled = redis_conn is None
25
        self.namespace = name
26
27
28
29
        self.allowed_slots = allowed_slots
        self.logger = logger or GMS_logger("RedisLock: '%s'" % name)

        if not self.disabled:
30
            super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)
31
32
33
34

    def acquire(self, timeout=0, target=None):
        if not self.disabled:
            if self.available_count == 0:
35
                self.logger.info("Waiting for free lock '%s'." % self.namespace)
36

37
            token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)
38

39
            self.logger.info("Acquired lock '%s'" % self.namespace +
40
41
42
43
44
45
                             ('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))

            return token

    def release(self):
        if not self.disabled:
46
            token = super(MultiSlotLock, self).release()
47
            if token:
48
                self.logger.info("Released lock '%s'" % self.namespace +
49
50
51
52
                                 ('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))

    def delete(self):
        if not self.disabled:
53
54
55
            self.client.delete(self.check_exists_key)
            self.client.delete(self.available_key)
            self.client.delete(self.grabbed_key)
56
57


58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class SharedResourceLock(MultiSlotLock):
    def acquire(self, timeout=0, target=None):
        if not self.disabled:
            token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)
            self.client.hset(self.grabbed_key_jobID, token, self.current_time)

    def release_all_jobID_tokens(self):
        if not self.disabled:
            for token in self.client.hkeys(self.grabbed_key_jobID):
                self.signal(token)

            self.client.delete(self.grabbed_key_jobID)

    @property
    def grabbed_key_jobID(self):
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def signal(self, token):
        if token is None:
            return None
        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()
            return token

    def delete(self):
        if not self.disabled:
            super(SharedResourceLock, self).delete()
            self.client.delete(self.grabbed_key_jobID)


class IOLock(SharedResourceLock):
93
94
    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
95
96


97
class ProcessLock(SharedResourceLock):
98
99
    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
100
101


102
class MemoryLock(SharedResourceLock):
Daniel Scheffler's avatar
Daniel Scheffler committed
103
    def __init__(self, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
104
        """
105

106
107
108
109
110
111
        :param allowed_slots:
        :param usage_th:        Memory usage threshold above which the memory lock is active (percent).
        :param logger:
        :param kwargs:
        """
        self.usage_th = usage_threshold
Daniel Scheffler's avatar
Daniel Scheffler committed
112
        self.blocking_th = blocking_threshold
113
        self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
Daniel Scheffler's avatar
Daniel Scheffler committed
114
        self.mem_usage = virtual_memory().percent
115
116
117

        self._acquired = None

Daniel Scheffler's avatar
Daniel Scheffler committed
118
119
        self.check_system_overload()

120
        if self.mem_usage >= usage_threshold:
Daniel Scheffler's avatar
Daniel Scheffler committed
121
122
            self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
            self.disabled = False
123
        else:
Daniel Scheffler's avatar
Daniel Scheffler committed
124
125
126
127
128
129
            self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
            self.disabled = True

        super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
                                         disabled=self.disabled, **kwargs)

130
    @timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
Daniel Scheffler's avatar
Daniel Scheffler committed
131
132
133
134
135
136
137
                               exception_message='Memory lock could not be acquired after waiting 15 minutes '
                                                 'due to memory overload.')
    def check_system_overload(self):
        logged = False
        while self.mem_usage >= self.blocking_th:
            if not logged:
                self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
138
                                 % (self.mem_usage, self.blocking_th))
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
139
                logged = True
Daniel Scheffler's avatar
Daniel Scheffler committed
140
141
            time.sleep(5)
            self.mem_usage = virtual_memory().percent
142
143
144

    def acquire(self, blocking=True, timeout=None):
        if self.mem_usage > self.usage_th:
145
146
            # self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
            self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
147
148
149
150
151
152
        else:
            self._acquired = True

        return self._acquired

    def release(self):
Daniel Scheffler's avatar
Daniel Scheffler committed
153
154
        if self._acquired and not self.disabled:
            super(MemoryLock, self).release()
155
156


157
class MemoryLock(SharedResourceLock):
158
159
160
161
162
163
164
165
166
167
168
    def __init__(self, name='MemoryLock', allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None,
                 **kwargs):
        """

        :param allowed_slots:
        :param usage_th:        Memory usage threshold above which the memory lock is active (percent).
        :param logger:
        :param kwargs:
        """
        self.usage_th = usage_threshold
        self.blocking_th = blocking_threshold
169
        self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
        self.mem_usage = virtual_memory().percent

        self._acquired = None

        self.check_system_overload()

        if self.mem_usage >= usage_threshold:
            self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
            self.disabled = False
        else:
            self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
            self.disabled = True

        super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
                                         disabled=self.disabled, **kwargs)

186
    @timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
                               exception_message='Memory lock could not be acquired after waiting 15 minutes '
                                                 'due to memory overload.')
    def check_system_overload(self):
        logged = False
        while self.mem_usage >= self.blocking_th:
            if not logged:
                self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
                                 % (self.mem_usage, self.blocking_th))
                logged = True
            time.sleep(5)
            self.mem_usage = virtual_memory().percent

    def acquire(self, blocking=True, timeout=None):
        if self.mem_usage > self.usage_th:
            # self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
            self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
        else:
            self._acquired = True

        return self._acquired

    def release(self):
        if self._acquired and not self.disabled:
            super(MemoryLock, self).release()


213
214
215
216
217
218
def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock.

    :param processlock_kwargs:  Keywourd arguments to be passed to ProcessLock class.
    """

219
    def decorator(func):
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator


def acquire_mem_lock(**memlock_kwargs):
    """Decorator function for MEMLock.

Daniel Scheffler's avatar
Daniel Scheffler committed
235
    :param memlock_kwargs:  Keywourd arguments to be passed to MemoryLock class.
236
    """
237

238
    def decorator(func):
239
240
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
Daniel Scheffler's avatar
Daniel Scheffler committed
241
            with MemoryLock(**memlock_kwargs):
242
243
244
245
246
247
248
249
250
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator


251
def release_unclosed_locks():
252
    if redis_conn:
253
254
255
256
257
258
259
        for L in [IOLock, ProcessLock]:
            lock = L(allowed_slots=1)
            lock.release_all_jobID_tokens()

            # delete the complete redis namespace if no lock slot is acquired anymore
            if lock.client.hlen(lock.grabbed_key) == 0:
                lock.delete()
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275


class MemoryReserver(object):
    def __init__(self, reserved_mem=10):
        """

        :param reserved_mem:    Amount of memory to be reserved during the lock is acquired (gigabytes).
        """

    def __enter__(self):
        # while

        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """"""