locks.py 10.8 KB
Newer Older
1
2
3
4
# -*- coding: utf-8 -*-
__author__ = 'Daniel Scheffler'

import time
5
from redis import StrictRedis
6
from redis_semaphore import Semaphore
Daniel Scheffler's avatar
Daniel Scheffler committed
7
from redis.exceptions import ConnectionError as RedisConnectionError
8
from retools.lock import Lock, LockTimeout
9
import functools
10
from psutil import virtual_memory
11
12
13
14

from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG

15
try:
16
    redis_conn = StrictRedis(host='localhost', db=0)
17
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
Daniel Scheffler's avatar
Daniel Scheffler committed
18
except RedisConnectionError:
19
20
21
    redis_conn = None


22
23
class MultiSlotLock(Semaphore):
    def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
24
        self.disabled = redis_conn is None or allowed_slots in [None, False]
25
        self.namespace = name
26
27
28
29
        self.allowed_slots = allowed_slots
        self.logger = logger or GMS_logger("RedisLock: '%s'" % name)

        if not self.disabled:
30
            super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)
31
32
33
34

    def acquire(self, timeout=0, target=None):
        if not self.disabled:
            if self.available_count == 0:
35
                self.logger.info("Waiting for free lock '%s'." % self.namespace)
36

37
            token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)
38

39
            self.logger.info("Acquired lock '%s'" % self.namespace +
40
41
42
43
44
45
                             ('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))

            return token

    def release(self):
        if not self.disabled:
46
            token = super(MultiSlotLock, self).release()
47
            if token:
48
                self.logger.info("Released lock '%s'" % self.namespace +
49
50
51
52
                                 ('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))

    def delete(self):
        if not self.disabled:
53
54
55
            self.client.delete(self.check_exists_key)
            self.client.delete(self.available_key)
            self.client.delete(self.grabbed_key)
56
57


58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class SharedResourceLock(MultiSlotLock):
    def acquire(self, timeout=0, target=None):
        if not self.disabled:
            token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)
            self.client.hset(self.grabbed_key_jobID, token, self.current_time)

    def release_all_jobID_tokens(self):
        if not self.disabled:
            for token in self.client.hkeys(self.grabbed_key_jobID):
                self.signal(token)

            self.client.delete(self.grabbed_key_jobID)

    @property
    def grabbed_key_jobID(self):
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def signal(self, token):
        if token is None:
            return None
        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()
            return token

    def delete(self):
        if not self.disabled:
            super(SharedResourceLock, self).delete()
            self.client.delete(self.grabbed_key_jobID)


class IOLock(SharedResourceLock):
93
    def __init__(self, allowed_slots=1, logger=None, **kwargs):
94
95
96
97
        self.disabled = CFG.disable_IO_locks

        if not self.disabled:
            super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
98
99


100
class ProcessLock(SharedResourceLock):
101
    def __init__(self, allowed_slots=1, logger=None, **kwargs):
102
103
104
105
        self.disabled = CFG.disable_CPU_locks

        if not self.disabled:
            super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
106
107


108
class MemoryReserver(object):
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
    def __init__(self, mem2lock_gb, max_usage=90, logger=None):
        """

        :param mem2lock_gb:    Amount of memory to be reserved during the lock is acquired (gigabytes).
        """
        self.disabled = redis_conn is None or CFG.disable_memory_locks or mem2lock_gb in [None, False]
        self.mem2lock_gb = mem2lock_gb
        self.max_usage = max_usage
        self.namespace = 'MemoryReserver'
        self.client = redis_conn

        self._waiting = False

        self.mem_limit = int(virtual_memory().total * max_usage / 100 / 1024 ** 3)

        if not self.disabled:
            self.logger = logger or GMS_logger("RedisLock: 'MemoryReserver'")

    @property
    def mem_reserved_gb(self):
        return int(self.client.get(self.reserved_key) or 0)

    @property
    def usable_memory_gb(self):
        return int((virtual_memory().total * self.max_usage / 100 - virtual_memory().used) / 1024 ** 3) \
               - int(self.mem_reserved_gb)

    @property
    def acquisition_key(self):
        return "%s:ACQUISITION_LOCK" % self.namespace

    @property
    def reserved_key(self):
        return "%s:MEM_RESERVED" % self.namespace

    @property
    def reserved_key_jobID(self):
        return "%s:MEM_RESERVED_BY_GMSJOB_%s" % (self.namespace, CFG.ID)

148
149
150
151
152
153
154
155
156
157
158
159
160
161
    @property
    def waiting_key(self):
        return "%s:NUMBER_WAITING" % self.namespace

    @property
    def waiting_key_jobID(self):
        return "%s:NUMBER_WAITING_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting(self):
        return self._waiting

    @waiting.setter
    def waiting(self, val):
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
162
163
164
165
        """Set self.waiting.

        NOTE: This setter does not use a lock. Redis access must be locked by calling function.
        """
166
        if val is not self._waiting:
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
167
168
169
170
171
172
            if val:
                self.client.incr(self.waiting_key, 1)
                self.client.incr(self.waiting_key_jobID, 1)
            else:
                self.client.decr(self.waiting_key, 1)
                self.client.decr(self.waiting_key_jobID, 1)
173
174
175

        self._waiting = val

176
177
178
179
180
    def acquire(self, timeout=20):
        if not self.disabled:
            try:
                with Lock(self.acquisition_key, expires=20, timeout=timeout, redis=self.client):
                    if self.usable_memory_gb >= self.mem2lock_gb:
181
182
                        t_start = time.time()
                        self.waiting = False
183

184
185
186
187
188
189
190
191
                        with self.client.pipeline() as pipe:
                            pipe.multi()
                            pipe.incr(self.reserved_key, self.mem2lock_gb)
                            pipe.incr(self.reserved_key_jobID, self.mem2lock_gb)
                            pipe.execute()

                        self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)

192
193
194
195
196
                        # warn in case the lock has expired before incrementing reserved_key and reserved_key_jobID
                        if time.time() > t_start + timeout:
                            self.logger.warning('Reservation of memory took more time than expected. '
                                                'Possibly more memory than available has been reserved.')

197
                    else:
198
                        if not self.waiting:
199
200
                            self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are '
                                             'usable.' % (self.usable_memory_gb, self.mem2lock_gb))
201
                        self.waiting = True
202

203
204
            except LockTimeout:
                self.acquire(timeout=timeout)
205

206
            if self.waiting:
207
208
                while self.usable_memory_gb < self.mem2lock_gb:
                    time.sleep(1)
209
210
211
212
213

                self.acquire(timeout=timeout)

    def release(self):
        if not self.disabled:
Daniel Scheffler's avatar
Daniel Scheffler committed
214
215
216
217
218
219
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.reserved_key, self.mem2lock_gb)
                    pipe.decr(self.reserved_key_jobID, self.mem2lock_gb)
                    pipe.execute()
220

Daniel Scheffler's avatar
Daniel Scheffler committed
221
                self.logger.info('Released %s GB of reserved memory.' % self.mem2lock_gb)
222
223
224

    def delete(self):
        if not self.disabled:
Daniel Scheffler's avatar
Daniel Scheffler committed
225
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
226
                # handle reserved_key and reserved_key_jobID
Daniel Scheffler's avatar
Daniel Scheffler committed
227
228
229
230
231
232
233
234
                mem_reserved_currJob = int(self.client.get(self.reserved_key_jobID) or 0)
                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.reserved_key_jobID, mem_reserved_currJob)
                    pipe.decr(self.reserved_key, mem_reserved_currJob)
                    pipe.delete(self.reserved_key_jobID)
                    pipe.execute()

Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
235
236
237
238
                if int(self.client.get(self.reserved_key) or 0) == 0:
                    self.client.delete(self.reserved_key)

                # handle waiting_key and waiting_key_jobID
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
239
240
241
242
                n_waiting_currJob = int(self.client.get(self.waiting_key_jobID) or 0)
                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.waiting_key_jobID, n_waiting_currJob)
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
243
                    pipe.decr(self.waiting_key, n_waiting_currJob)
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
244
245
246
                    pipe.delete(self.waiting_key_jobID)
                    pipe.execute()

Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
247
248
                if int(self.client.get(self.waiting_key) or 0) == 0:
                    self.client.delete(self.waiting_key)
249
250
251
252
253
254
255
256
257
258

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return True if exc_type is None else False


259
260
261
def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock.

262
    :param processlock_kwargs:  Keyword arguments to be passed to ProcessLock class.
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
    """

    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator


def reserve_mem(**memlock_kwargs):
    """Decorator function for MemoryReserver.

    :param memlock_kwargs:  Keyword arguments to be passed to MemoryReserver class.
    """

    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with MemoryReserver(**memlock_kwargs):
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator


def release_unclosed_locks():
    if redis_conn:
        for L in [IOLock, ProcessLock]:
            lock = L(allowed_slots=1)
            lock.release_all_jobID_tokens()

            # delete the complete redis namespace if no lock slot is acquired anymore
            if lock.client.hlen(lock.grabbed_key) == 0:
                lock.delete()

307
        MemoryReserver(1).delete()