locks.py 11.3 KB
Newer Older
1
2
3
4
# -*- coding: utf-8 -*-
__author__ = 'Daniel Scheffler'

import time
5
from redis import StrictRedis
6
from redis_semaphore import Semaphore
7
from redis_lock import Lock
Daniel Scheffler's avatar
Daniel Scheffler committed
8
from redis.exceptions import ConnectionError as RedisConnectionError
9
import functools
10
from psutil import virtual_memory
Daniel Scheffler's avatar
Daniel Scheffler committed
11
import timeout_decorator
12
13
14
15

from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG

16
try:
    redis_conn = StrictRedis(host='localhost', db=0)
    # PING is a cheap connectivity probe that does not enumerate the database (unlike KEYS).
    # It may raise ConnectionError, e.g., if the redis server is not installed or not running.
    redis_conn.ping()
except RedisConnectionError:
    # No redis server reachable: keep a None marker that the rest of the module checks before using redis.
    redis_conn = None


23
24
25
class MultiSlotLock(Semaphore):
    """Redis semaphore with a configurable number of slots that logs waiting, acquiring and releasing.

    If no redis connection is available (module-level redis_conn is None), the lock is disabled:
    the parent constructor is not called and acquire(), release() and delete() do nothing.
    """

    def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
        """Create the lock.

        :param name:           name of the lock; used as semaphore namespace and within log messages
        :param allowed_slots:  number of slots; passed to the semaphore as 'count'
        :param logger:         logger to be used; defaults to a GMS_logger named after the lock
        :param kwargs:         further keyword arguments passed to Semaphore.__init__
        """
        self.disabled = redis_conn is None
        self.namespace = name
        self.allowed_slots = allowed_slots
        self.logger = logger or GMS_logger("RedisLock: '%s'" % name)

        if not self.disabled:
            super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)

    def _slot_suffix(self, token):
        """Return the tail of a log message: '.' for single-slot locks, else the 1-based slot number."""
        if self.allowed_slots == 1:
            return '.'
        return ', slot #%s.' % (int(token) + 1)

    def acquire(self, timeout=0, target=None):
        """Acquire a slot and log it.

        :param timeout:  passed through to Semaphore.acquire
        :param target:   passed through to Semaphore.acquire
        :return:         the token returned by Semaphore.acquire or None if the lock is disabled
        """
        if self.disabled:
            return None

        if self.available_count == 0:
            self.logger.info("Waiting for free lock '%s'." % self.namespace)

        token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)
        self.logger.info("Acquired lock '%s'" % self.namespace + self._slot_suffix(token))

        return token

    def release(self):
        """Release a slot and log it if the parent class reports a released token."""
        if self.disabled:
            return

        token = super(MultiSlotLock, self).release()
        if token:
            self.logger.info("Released lock '%s'" % self.namespace + self._slot_suffix(token))

    def delete(self):
        """Delete the redis keys of this lock (existence marker, available list and grabbed hash)."""
        if self.disabled:
            return

        for key in (self.check_exists_key, self.available_key, self.grabbed_key):
            self.client.delete(key)
57
58


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class SharedResourceLock(MultiSlotLock):
    """MultiSlotLock that additionally records every acquired token in a redis hash specific to CFG.ID.

    The extra hash allows releasing all tokens recorded under that ID at once
    (see release_all_jobID_tokens).
    """

    def acquire(self, timeout=0, target=None):
        """Acquire a slot and register its token in the job specific hash.

        :param timeout:  passed through to MultiSlotLock.acquire
        :param target:   passed through to MultiSlotLock.acquire
        :return:         the acquired token or None if the lock is disabled
        """
        if self.disabled:
            return None

        token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)
        self.client.hset(self.grabbed_key_jobID, token, self.current_time)

        # hand the token back to the caller like the parent class does
        return token

    def release_all_jobID_tokens(self):
        """Signal every token stored in the job specific hash and delete that hash afterwards."""
        if not self.disabled:
            for token in self.client.hkeys(self.grabbed_key_jobID):
                self.signal(token)

            self.client.delete(self.grabbed_key_jobID)

    @property
    def grabbed_key_jobID(self):
        """Return the namespaced redis key of the hash holding the tokens grabbed under CFG.ID."""
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def signal(self, token):
        """Put the given token back to the available list and remove it from both grabbed hashes.

        :param token:  token to be given back; None is ignored
        :return:       the token or None if no token was given
        """
        if token is None:
            return None
        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()
            return token

    def delete(self):
        """Delete the redis keys of the parent lock and the job specific hash."""
        if not self.disabled:
            super(SharedResourceLock, self).delete()
            self.client.delete(self.grabbed_key_jobID)


class IOLock(SharedResourceLock):
    """SharedResourceLock using the fixed lock name 'IOLock'."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """Create the lock.

        :param allowed_slots:  number of slots of the lock
        :param logger:         logger to be used (passed to the parent class)
        :param kwargs:         further keyword arguments passed to the parent class
        """
        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
96
97


98
class ProcessLock(SharedResourceLock):
    """SharedResourceLock using the fixed lock name 'ProcessLock'."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """Create the lock.

        :param allowed_slots:  number of slots of the lock
        :param logger:         logger to be used (passed to the parent class)
        :param kwargs:         further keyword arguments passed to the parent class
        """
        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
101
102


103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# class MemoryLock(SharedResourceLock):
#     def __init__(self, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
#         """
#
#         :param allowed_slots:
#         :param usage_th:        Memory usage threshold above which the memory lock is active (percent).
#         :param logger:
#         :param kwargs:
#         """
#         self.usage_th = usage_threshold
#         self.blocking_th = blocking_threshold
#         self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
#         self.mem_usage = virtual_memory().percent
#
#         self._acquired = None
#
#         self.check_system_overload()
#
#         if self.mem_usage >= usage_threshold:
#             self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
#             self.disabled = False
#         else:
#             self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
#             self.disabled = True
#
#         super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
#                                          disabled=self.disabled, **kwargs)
#
#     @timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
#                                exception_message='Memory lock could not be acquired after waiting 15 minutes '
#                                                  'due to memory overload.')
#     def check_system_overload(self):
#         logged = False
#         while self.mem_usage >= self.blocking_th:
#             if not logged:
#                 self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
#                                  % (self.mem_usage, self.blocking_th))
#                 logged = True
#             time.sleep(5)
#             self.mem_usage = virtual_memory().percent
#
#     def acquire(self, blocking=True, timeout=None):
#         if self.mem_usage > self.usage_th:
#             # self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
#             self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
#         else:
#             self._acquired = True
#
#         return self._acquired
#
#     def release(self):
#         if self._acquired and not self.disabled:
#             super(MemoryLock, self).release()


158
class MemoryLock(SharedResourceLock):
    """Lock that books an amount of memory (in gigabytes) in the shared redis counter 'GMS_mem_reserved'.

    Acquiring waits until the system memory usage is below the blocking threshold and until enough
    memory is usable, then books mem2lock_gb. A semaphore slot is only taken while the memory usage
    lies between the usage threshold and the blocking threshold.
    """

    def __init__(self, mem2lock_gb, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
        """Create the lock.

        :param mem2lock_gb:         Amount of memory to be reserved while the lock is acquired (gigabytes).
        :param usage_threshold:     Memory usage threshold above which the memory lock is active (percent).
        :param blocking_threshold:  Memory usage (percent) at or above which acquiring waits
                                    (see check_system_overload).
        :param logger:              logger to be used (passed to the parent class)
        :param kwargs:              further keyword arguments passed to the parent class
        """
        # These attributes must exist BEFORE the parent constructor is called because
        # usable_memory_gb (used for allowed_slots) reads self.usage_th.
        self.mem2lock_gb = mem2lock_gb
        self.usage_th = usage_threshold
        self.blocking_th = blocking_threshold
        self.mem_usage = virtual_memory().percent
        self._reserved_gb = 0  # gigabytes this instance has currently booked in 'GMS_mem_reserved'

        # Use at least one slot: usable_memory_gb can be zero or negative under memory pressure.
        # NOTE(review): presumably the semaphore rejects a non-positive count - confirm.
        super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=max(1, self.usable_memory_gb),
                                         logger=logger, **kwargs)

    @timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
                               exception_message='Memory lock could not be acquired after waiting 15 minutes '
                                                 'due to memory overload.')
    def check_system_overload(self):
        """Wait (polling every 5 seconds) until the memory usage is below the blocking threshold.

        :raises TimeoutError:  after 15 minutes (see the timeout decorator)
        """
        # refresh first, otherwise the loop would start with the value measured at construction time
        self.mem_usage = virtual_memory().percent

        logged = False
        while self.mem_usage >= self.blocking_th:
            if not logged:
                self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
                                 % (self.mem_usage, self.blocking_th))
                logged = True
            time.sleep(5)
            self.mem_usage = virtual_memory().percent

    @property
    def mem_reserved_gb(self):
        """Return the value of the redis counter 'GMS_mem_reserved' as integer (0 if unset or without redis)."""
        if redis_conn is None:
            return 0

        val = redis_conn.get('GMS_mem_reserved')
        return int(val) if val is not None else 0

    @property
    def usable_memory_gb(self):
        """Return the memory (gigabytes) usable below the usage threshold minus the already reserved memory."""
        vmem = virtual_memory()
        usable_bytes = vmem.total * self.usage_th / 100 - vmem.used
        return int(usable_bytes / 1024 ** 3) - self.mem_reserved_gb

    def _reserve_memory(self):
        """Wait until mem2lock_gb is usable, then book it. The caller must hold the 'GMS_mem_checker' lock."""
        # re-evaluate the usable memory on every iteration; wait as long as it is NOT sufficient
        while self.usable_memory_gb < self.mem2lock_gb:
            time.sleep(1)

        self.client.incr('GMS_mem_reserved', self.mem2lock_gb)
        self._reserved_gb += self.mem2lock_gb

    def _unreserve_memory(self):
        """Give back everything this instance has booked in 'GMS_mem_reserved'."""
        if self._reserved_gb:
            self.client.decr('GMS_mem_reserved', self._reserved_gb)
            self._reserved_gb = 0

    def ensure_enough_memory(self):
        """Wait until enough memory is usable and book mem2lock_gb (guarded by the 'GMS_mem_checker' lock)."""
        if not self.disabled:
            with Lock(self.client, 'GMS_mem_checker'):
                self._reserve_memory()

    def acquire(self, blocking=True, timeout=None):
        """Book mem2lock_gb and, if the memory usage is above the usage threshold, take a semaphore slot.

        :param blocking:  currently unused; kept for interface compatibility
        :param timeout:   timeout passed to the parent acquire(); None is passed as 0
        :return:          the acquired semaphore token or None if no slot was taken or the lock is disabled
        """
        # TODO maybe also implement this as a float redis value via increment/decrement + a lock with 1 slot
        if self.disabled:
            return None

        token = None
        with Lock(self.client, 'GMS_mem_checker'):
            self.check_system_overload()
            # call the helper directly: ensure_enough_memory() would request 'GMS_mem_checker' a second time
            self._reserve_memory()

            try:
                if self.usage_th < self.mem_usage < self.blocking_th:
                    # call the parent implementation (calling self.acquire here would never terminate)
                    token = super(MemoryLock, self).acquire(timeout=timeout or 0)
            except Exception:
                # do not leave memory booked if no slot could be taken
                self._unreserve_memory()
                raise

        return token

    def release(self):
        """Give back all semaphore slots taken by this instance and the memory it has booked."""
        if not self.disabled:
            # pop the tokens so that a repeated release() cannot hand the same slot back twice
            while self._local_tokens:
                self.signal(self._local_tokens.pop())

            self._unreserve_memory()
236
237


238
239
240
241
242
243
244
245
# def ensure_enough_memory(needed_mem_gb, usage_threshold=80):
#     if redis_conn:
#         usable_memory_gb = int((virtual_memory().total * usage_threshold / 100 - virtual_memory().used) / 1024**3) \
#                            - int(needed_mem_gb)
#
#         with Lock(redis_conn, 'GMS_mem_checker'):
#             if usable_memory_gb >= needed_mem_gb:
#                 redis_conn.
246
247
248



249
250
251
252
253
254
def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock.

    The decorated function is executed within a 'with ProcessLock(...)' context.

    :param processlock_kwargs:  Keyword arguments to be passed to ProcessLock class.
    :return:                    decorator that wraps a function into the lock context
    """

    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            # a new lock instance is created on every call of the decorated function
            with ProcessLock(**processlock_kwargs):
                return func(*args, **kwargs)

        return wrapped_func

    return decorator


def acquire_mem_lock(**memlock_kwargs):
    """Decorator function for MemoryLock.

    The decorated function is executed within a 'with MemoryLock(...)' context.

    :param memlock_kwargs:  Keyword arguments to be passed to MemoryLock class.
    :return:                decorator that wraps a function into the lock context
    """

    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            # a new lock instance is created on every call of the decorated function
            with MemoryLock(**memlock_kwargs):
                return func(*args, **kwargs)

        return wrapped_func

    return decorator


287
def release_unclosed_locks():
    """Release the job specific tokens of IOLock and ProcessLock and delete lock namespaces without grabbed slots.

    Does nothing if no redis connection is available.
    """
    if not redis_conn:
        return

    for lock_cls in (IOLock, ProcessLock):
        lock = lock_cls(allowed_slots=1)
        lock.release_all_jobID_tokens()

        # delete the complete redis namespace if no lock slot is acquired anymore
        if lock.client.hlen(lock.grabbed_key) == 0:
            lock.delete()
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311


class MemoryReserver(object):
    """Context manager stub for reserving memory; entering and leaving the context has no effect yet."""

    def __init__(self, reserved_mem=10):
        """Create the reserver.

        :param reserved_mem:    Amount of memory to be reserved during the lock is acquired (gigabytes).
        """
        # keep the requested amount instead of silently discarding the argument
        self.reserved_mem = reserved_mem

    def __enter__(self):
        """Enter the context and return the reserver itself."""
        # TODO: waiting for / reserving the memory is not implemented yet
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Leave the context; nothing is released and exceptions are not suppressed."""
        return None