locks.py 8.13 KB
Newer Older
1
2
3
4
# -*- coding: utf-8 -*-
__author__ = 'Daniel Scheffler'

import time
5
from redis_lock import StrictRedis, Lock, NotAcquired, AlreadyAcquired
Daniel Scheffler's avatar
Daniel Scheffler committed
6
from redis.exceptions import ConnectionError as RedisConnectionError
7
import functools
8
import re
9
import random
10
from psutil import virtual_memory
Daniel Scheffler's avatar
Daniel Scheffler committed
11
import timeout_decorator
12
13
14
15

from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG

16
17

try:
18
    redis_conn = StrictRedis(host='localhost', db=0)
19
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
Daniel Scheffler's avatar
Daniel Scheffler committed
20
except RedisConnectionError:
21
22
23
    redis_conn = None


24
class MultiSlotLock(Lock):
Daniel Scheffler's avatar
Daniel Scheffler committed
25
    def __init__(self, name, allowed_slots=1, logger=None, disabled=False, **kwargs):
26
        self.conn = redis_conn
27
        self.name = name
28
        self.allowed_slots = allowed_slots or 0
29
30
        self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
        self.kwargs = kwargs
31

32
        self.allowed_slot_names = ['%s, slot #%s' % (self.name, i) for i in range(1, self.allowed_slots + 1)]
33
        self.final_name = ''
34
        self._acquired = None
35

Daniel Scheffler's avatar
Daniel Scheffler committed
36
        if not disabled and allowed_slots and redis_conn:
37
38
            logged = False
            while True:
39
                time.sleep(random.uniform(0, 1.5))  # avoids race conditions in case multiple tasks are waiting
40
                name_free_slot = self.get_free_slot_name()
41

42
                if not name_free_slot:
43
                    if not logged:
44
                        self.logger.info("Waiting for free '%s' lock." % self.name)
45
46
47
                        logged = True
                else:
                    break
48

49
50
            self.final_name = 'GMS_%s__' % CFG.ID + name_free_slot
            super().__init__(self.conn, self.final_name, **kwargs)
51
52
53
        else:
            pass

54
55
    @property
    def existing_locks(self):
56
57
58
        names = [i.decode('utf8').split('lock:')[1] for i in self.conn.keys() if i.decode('utf8').startswith('lock:')]

        # split 'GMS_<jobid>' and return
59
        return list(set([re.search('GMS_[0-9]*__(.*)', n, re.I).group(1) for n in names if n.startswith('GMS_')]))
60
61

    def get_free_slot_name(self):
62
        free_slots = [sn for sn in self.allowed_slot_names if sn not in self.existing_locks]
63
64
65
        if free_slots:
            return free_slots[0]

66
    def acquire(self, blocking=True, timeout=None):
67
        if self.allowed_slots and self.conn:
68
69
70
71
72
            if self._acquired:
                raise AlreadyAcquired("Already acquired from this Lock instance.")

            while not self._acquired:
                try:
73
                    # print('Trying to acquire %s.' % self.final_name.split('GMS_%s__' % CFG.ID)[1])
74
                    self._acquired = super(MultiSlotLock, self).acquire(blocking=blocking, timeout=timeout)
75
                    # print("Acquired lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
76
77
78
79
                except AlreadyAcquired:
                    # this happens in case the lock has already been acquired by another instance of MultiSlotLock due
                    # to a race condition (time gap between finding the free slot and the call of self.acquire())
                    # -> in that case: re-initialize to get a new free slot
80
                    self.__init__(self.name, allowed_slots=self.allowed_slots, logger=self.logger,
81
82
83
                                  **self.kwargs)

                if self._acquired is False:  # and not None
84
                    self.__init__(self.name, allowed_slots=self.allowed_slots, logger=self.logger,
85
86
                                  **self.kwargs)

87
88
                # print(self.final_name.split('GMS_%s__' % CFG.ID)[1], self._acquired)

89
90
            if self._acquired:
                self.logger.info("Acquired lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
91
        else:
92
            self._acquired = True
93

94
        return self._acquired
95

96
    def release(self):
97
        if self.allowed_slots and self.conn:
98
99
            super(MultiSlotLock, self).release()
            self.logger.info("Released lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
100
101


102
class IOLock(MultiSlotLock):
103
104
    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
105
106


107
class ProcessLock(MultiSlotLock):
108
109
    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
110
111


Daniel Scheffler's avatar
Daniel Scheffler committed
112
113
class MemoryLock(MultiSlotLock):
    def __init__(self, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
114
        """
115

116
117
118
119
120
121
        :param allowed_slots:
        :param usage_th:        Memory usage threshold above which the memory lock is active (percent).
        :param logger:
        :param kwargs:
        """
        self.usage_th = usage_threshold
Daniel Scheffler's avatar
Daniel Scheffler committed
122
        self.blocking_th = blocking_threshold
123
        self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
Daniel Scheffler's avatar
Daniel Scheffler committed
124
        self.mem_usage = virtual_memory().percent
125
126
127

        self._acquired = None

Daniel Scheffler's avatar
Daniel Scheffler committed
128
129
        self.check_system_overload()

130
        if self.mem_usage >= usage_threshold:
Daniel Scheffler's avatar
Daniel Scheffler committed
131
132
            self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
            self.disabled = False
133
        else:
Daniel Scheffler's avatar
Daniel Scheffler committed
134
135
136
137
138
139
140
141
142
143
144
145
146
147
            self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
            self.disabled = True

        super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
                                         disabled=self.disabled, **kwargs)

    @timeout_decorator.timeout(seconds=15*60, timeout_exception=TimeoutError,
                               exception_message='Memory lock could not be acquired after waiting 15 minutes '
                                                 'due to memory overload.')
    def check_system_overload(self):
        logged = False
        while self.mem_usage >= self.blocking_th:
            if not logged:
                self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
148
                                 % (self.mem_usage, self.blocking_th))
Daniel Scheffler's avatar
Bugfix.    
Daniel Scheffler committed
149
                logged = True
Daniel Scheffler's avatar
Daniel Scheffler committed
150
151
            time.sleep(5)
            self.mem_usage = virtual_memory().percent
152
153
154

    def acquire(self, blocking=True, timeout=None):
        if self.mem_usage > self.usage_th:
Daniel Scheffler's avatar
Daniel Scheffler committed
155
            self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
156
157
158
159
160
161
        else:
            self._acquired = True

        return self._acquired

    def release(self):
Daniel Scheffler's avatar
Daniel Scheffler committed
162
163
        if self._acquired and not self.disabled:
            super(MemoryLock, self).release()
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187


def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock.

    :param processlock_kwargs:  Keywourd arguments to be passed to ProcessLock class.
    """
    def decorator(func):

        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator


def acquire_mem_lock(**memlock_kwargs):
    """Decorator function for MEMLock.

Daniel Scheffler's avatar
Daniel Scheffler committed
188
    :param memlock_kwargs:  Keywourd arguments to be passed to MemoryLock class.
189
    """
190
191
192
193
    def decorator(func):

        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
Daniel Scheffler's avatar
Daniel Scheffler committed
194
            with MemoryLock(**memlock_kwargs):
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator


def release_unclosed_locks(logger=None):
    if redis_conn:
        logger = logger or GMS_logger('LockReseter')

        locks2release = [i.split(b'lock:')[1].decode('latin') for i in redis_conn.keys()
                         if i.decode('latin').startswith('lock:GMS_%s__' % CFG.ID)]
        if locks2release:
            logger.info("Releasing unclosed locks of job %s." % CFG.ID)

        for lockN in locks2release:
            lock = Lock(redis_conn, lockN)
            try:
                lock.release()
            except NotAcquired:
                lock.reset()