# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

__author__ = 'Daniel Scheffler'

import time
30
from redis import Redis
31
from redis_semaphore import Semaphore
Daniel Scheffler's avatar
Daniel Scheffler committed
32
from redis.exceptions import ConnectionError as RedisConnectionError
33
from retools.lock import Lock, LockTimeout
34
import functools
35
from psutil import virtual_memory
36 37 38 39

from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG

40
try:
    redis_conn = Redis(host='localhost', db=0)
    # Creating the client does not yet talk to the server, so issue one cheap round trip to check reachability.
    # PING is O(1), whereas KEYS scans (and blocks) the complete keyspace of the redis server.
    redis_conn.ping()  # may raise ConnectionError, e.g., if redis server is not installed or not running
except RedisConnectionError:
    # without a redis server all locks of this module are disabled (they check for `redis_conn is None`)
    redis_conn = None


Daniel Scheffler's avatar
Daniel Scheffler committed
47 48 49 50
"""NOTE:

To get a list of all currently set redis keys, run:

    from redis import Redis
    conn = Redis('localhost', db=0)
    list(sorted(conn.keys()))

Then, to delete all currently set redis keys, run:

    for i in list(sorted(conn.keys())):
        k = i.decode('utf-8')
        conn.delete(k)

WARNING: This deletes EVERY key in redis database 0, including keys that were not created by this module.
"""


63 64
class MultiSlotLock(Semaphore):
    """Redis-based lock (semaphore) that may be held by up to `allowed_slots` holders at the same time.

    The lock is disabled (acquire/release/delete become no-ops) if no redis server is available or if
    `allowed_slots` is None, False or 0.
    """

    def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
        """Get an instance of MultiSlotLock.

        :param name:           redis namespace of the lock (also used in log messages)
        :param allowed_slots:  number of slots that can be acquired at the same time (None/False disables the lock)
        :param logger:         logger instance to use (a new GMS_logger is created if not given)
        :param kwargs:         further keyword arguments passed to redis_semaphore.Semaphore
        """
        self.disabled = redis_conn is None or allowed_slots in [None, False]
        self.namespace = name
        self.allowed_slots = allowed_slots
        self.logger = logger or GMS_logger("RedisLock: '%s'" % name)

        if not self.disabled:
            super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)

    def _slot_suffix(self, token):
        """Return the end of an acquire/release log message, naming the slot of `token` if there are several."""
        return '.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)

    def acquire(self, timeout=0, target=None):
        """Acquire a slot of the lock (blocks until one is free, depending on `timeout`).

        :return: the acquired token (None if the lock is disabled)
        """
        if not self.disabled:
            if self.available_count == 0:
                self.logger.info("Waiting for free lock '%s'." % self.namespace)

            token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)

            self.logger.info("Acquired lock '%s'" % self.namespace + self._slot_suffix(token))

            return token

    def release(self):
        """Release a slot held by this instance (no-op if the lock is disabled)."""
        if not self.disabled:
            token = super(MultiSlotLock, self).release()
            if token:
                self.logger.info("Released lock '%s'" % self.namespace + self._slot_suffix(token))

    def delete(self):
        """Delete the redis keys of this lock's namespace (no-op if the lock is disabled)."""
        if not self.disabled:
            self.client.delete(self.check_exists_key)
            self.client.delete(self.available_key)
            self.client.delete(self.grabbed_key)

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            return super(MultiSlotLock, self).__exit__(exc_type, exc_val, exc_tb)
        finally:
            # Close the logger even if releasing the lock raised. getattr() is used because subclasses may skip
            # MultiSlotLock.__init__() and thus never set self.logger.
            # NOTE(review): this also closes a logger that was passed in by the caller - confirm this is intended.
            logger = getattr(self, 'logger', None)
            if logger is not None:
                logger.close()
102

103

104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
class SharedResourceLock(MultiSlotLock):
    """MultiSlotLock that additionally records the grabbed tokens per GMS job (CFG.ID).

    This allows all slots still held by the current job to be released at once (see release_all_jobID_tokens()).
    """

    def acquire(self, timeout=0, target=None):
        """Acquire a slot and record it as grabbed by the current GMS job.

        :return: the acquired token (None if the lock is disabled)
        """
        if not self.disabled:
            token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)

            # If `target` is given, redis_semaphore's Semaphore.acquire() has already run target(token) and released
            # the slot again. Recording it here would leave a stale entry which release_all_jobID_tokens() would push
            # back to the available tokens a second time (i.e., create an additional slot).
            if target is None:
                self.client.hset(self.grabbed_key_jobID, token, self.current_time)

            return token

    def release_all_jobID_tokens(self):
        """Release all slots recorded as grabbed by the current GMS job and delete the job-specific hash."""
        if not self.disabled:
            for token in self.client.hkeys(self.grabbed_key_jobID):
                self.signal(token)

            self.client.delete(self.grabbed_key_jobID)

    @property
    def grabbed_key_jobID(self):
        """Redis key of the hash mapping the tokens grabbed by the current GMS job to their acquisition time."""
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def signal(self, token):
        """Give `token` back to the available tokens.

        Same as Semaphore.signal() but additionally removes the token from the job-specific hash.
        """
        if token is None:
            return None
        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()
            return token

    def delete(self):
        """Delete the redis keys of this lock's namespace including the job-specific hash."""
        if not self.disabled:
            super(SharedResourceLock, self).delete()
            self.client.delete(self.grabbed_key_jobID)

    def __exit__(self, exc_type, exc_val, exc_tb):
        return super(SharedResourceLock, self).__exit__(exc_type, exc_val, exc_tb)

140 141

class IOLock(SharedResourceLock):
    """SharedResourceLock in the redis namespace 'IOLock'; can be switched off via CFG.disable_IO_locks."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        # Always run the parent constructor. Otherwise attributes like `logger` (and `client`, if redis is
        # available) are missing when the lock is disabled via CFG: __exit__() closes self.logger and
        # release_unclosed_locks() accesses self.client.
        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

        if CFG.disable_IO_locks:
            self.disabled = True

    def __exit__(self, exc_type, exc_val, exc_tb):
        return super(IOLock, self).__exit__(exc_type, exc_val, exc_tb)

151

152
class ProcessLock(SharedResourceLock):
    """SharedResourceLock in the redis namespace 'ProcessLock'; can be switched off via CFG.disable_CPU_locks."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        # Always run the parent constructor. Otherwise attributes like `logger` (and `client`, if redis is
        # available) are missing when the lock is disabled via CFG: __exit__() closes self.logger and
        # release_unclosed_locks() accesses self.client.
        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

        if CFG.disable_CPU_locks:
            self.disabled = True

    def __exit__(self, exc_type, exc_val, exc_tb):
        return super(ProcessLock, self).__exit__(exc_type, exc_val, exc_tb)

162

163
class MemoryReserver(object):
    """Redis-based bookkeeping of reserved memory, shared by all processes connected to the same redis server.

    acquire() waits until the usable memory (max_usage percent of the total memory, minus the used memory, minus all
    current reservations) is at least `mem2lock_gb` GB and then reserves that amount; release() frees it again.
    Reservations and the number of waiting instances are additionally tracked per GMS job (CFG.ID), so that
    delete() can remove what the current job left behind.
    """

    def __init__(self, mem2lock_gb, max_usage=90, logger=None):
        """Get an instance of MemoryReserver.

        :param mem2lock_gb:    Amount of memory to be reserved during the lock is acquired (gigabytes).
                               None, False or 0 disables the reservation.
        :param max_usage:      Maximum percentage of the total memory that may be used.
        :param logger:         Logger instance to use (a new GMS_logger is created if not given).
        """
        self.disabled = redis_conn is None or CFG.disable_memory_locks or mem2lock_gb in [None, False]
        self.mem2lock_gb = mem2lock_gb
        self.max_usage = max_usage
        self.namespace = 'MemoryReserver'
        self.client = redis_conn

        self._waiting = False

        # memory limit in GB according to max_usage (not used within this class itself)
        self.mem_limit = int(virtual_memory().total * max_usage / 100 / 1024 ** 3)

        if not self.disabled:
            self.logger = logger or GMS_logger("RedisLock: 'MemoryReserver'")

    @property
    def mem_reserved_gb(self):
        """Memory (GB) currently reserved by all MemoryReserver instances."""
        return int(self.client.get(self.reserved_key) or 0)

    @property
    def usable_memory_gb(self):
        """Memory (GB) that can still be reserved (max. usable memory - used memory - reserved memory)."""
        vmem = virtual_memory()  # query the memory statistics only once
        return int((vmem.total * self.max_usage / 100 - vmem.used) / 1024 ** 3) - int(self.mem_reserved_gb)

    @property
    def acquisition_key(self):
        return "%s:ACQUISITION_LOCK" % self.namespace

    @property
    def reserved_key(self):
        return "%s:MEM_RESERVED" % self.namespace

    @property
    def reserved_key_jobID(self):
        return "%s:MEM_RESERVED_BY_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting_key(self):
        return "%s:NUMBER_WAITING" % self.namespace

    @property
    def waiting_key_jobID(self):
        return "%s:NUMBER_WAITING_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting(self):
        """Whether this instance is currently waiting for memory to become usable."""
        return self._waiting

    @waiting.setter
    def waiting(self, val):
        """Set self.waiting and update the global and job-specific counters of waiting instances accordingly.

        NOTE: This setter does not use a lock. Redis access must be locked by calling function.
        """
        if val is not self._waiting:
            # update both counters in one transaction
            with self.client.pipeline() as pipe:
                pipe.multi()
                if val:
                    pipe.incr(self.waiting_key, 1)
                    pipe.incr(self.waiting_key_jobID, 1)
                else:
                    pipe.decr(self.waiting_key, 1)
                    pipe.decr(self.waiting_key_jobID, 1)
                pipe.execute()

        self._waiting = val

    def acquire(self, timeout=20):
        """Block until `mem2lock_gb` GB of memory are usable and reserve them (no-op if disabled).

        :param timeout:  maximum time (seconds) to wait for the redis acquisition lock per attempt;
                         on timeout, the attempt is repeated
        """
        if self.disabled:
            return

        lock_expires = 20  # seconds after which the acquisition lock is considered expired

        # NOTE: Retries are done in a loop instead of recursively calling acquire(), so that long waiting times
        #       (many retries) cannot exhaust the recursion limit.
        while True:
            try:
                with Lock(self.acquisition_key, expires=lock_expires, timeout=timeout, redis=self.client):
                    t_start = time.time()

                    if self.usable_memory_gb >= self.mem2lock_gb:
                        self.waiting = False

                        with self.client.pipeline() as pipe:
                            pipe.multi()
                            pipe.incr(self.reserved_key, self.mem2lock_gb)
                            pipe.incr(self.reserved_key_jobID, self.mem2lock_gb)
                            pipe.execute()

                        self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)

                        # warn in case the lock has expired before incrementing reserved_key and reserved_key_jobID
                        if time.time() > t_start + lock_expires:
                            self.logger.warning('Reservation of memory took more time than expected. '
                                                'Possibly more memory than available has been reserved.')

                        return

                    if not self.waiting:
                        self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are '
                                         'usable.' % (self.usable_memory_gb, self.mem2lock_gb))
                    self.waiting = True

            except LockTimeout:
                # the acquisition lock is held by another instance -> try again
                continue

            # not enough memory: poll (without holding the acquisition lock) until enough memory seems usable
            while self.usable_memory_gb < self.mem2lock_gb:
                time.sleep(1)

    def release(self):
        """Release the memory reserved by this instance (no-op if disabled)."""
        if not self.disabled:
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                with self.client.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.reserved_key, self.mem2lock_gb)
                    pipe.decr(self.reserved_key_jobID, self.mem2lock_gb)
                    pipe.execute()

                self.logger.info('Released %s GB of reserved memory.' % self.mem2lock_gb)

    def delete(self):
        """Remove all reservations and waiting counts of the current GMS job from redis (no-op if disabled).

        The global keys are deleted as well once their value drops to 0.
        """
        if not self.disabled:
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                # handle reserved_key and reserved_key_jobID
                mem_reserved_currJob = int(self.client.get(self.reserved_key_jobID) or 0)
                with self.client.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.reserved_key_jobID, mem_reserved_currJob)
                    pipe.decr(self.reserved_key, mem_reserved_currJob)
                    pipe.delete(self.reserved_key_jobID)
                    pipe.execute()

                if int(self.client.get(self.reserved_key) or 0) == 0:
                    self.client.delete(self.reserved_key)

                # handle waiting_key and waiting_key_jobID
                n_waiting_currJob = int(self.client.get(self.waiting_key_jobID) or 0)
                with self.client.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.waiting_key_jobID, n_waiting_currJob)
                    pipe.decr(self.waiting_key, n_waiting_currJob)
                    pipe.delete(self.waiting_key_jobID)
                    pipe.execute()

                if int(self.client.get(self.waiting_key) or 0) == 0:
                    self.client.delete(self.waiting_key)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return exc_type is None


314 315 316
def acquire_process_lock(**processlock_kwargs):
    """Return a decorator that executes the decorated function while holding a ProcessLock.

    :param processlock_kwargs:  Keyword arguments to be passed to ProcessLock class.
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def locked_call(*args, **kwargs):
            lock = ProcessLock(**processlock_kwargs)
            with lock:
                retval = func(*args, **kwargs)
            return retval

        return locked_call

    return decorator


def reserve_mem(**memlock_kwargs):
    """Return a decorator that executes the decorated function while memory is reserved via MemoryReserver.

    :param memlock_kwargs:  Keyword arguments to be passed to MemoryReserver class.
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def reserved_call(*args, **kwargs):
            reserver = MemoryReserver(**memlock_kwargs)
            with reserver:
                retval = func(*args, **kwargs)
            return retval

        return reserved_call

    return decorator


def release_unclosed_locks():
    """Release all IOLock/ProcessLock slots and memory reservations still held by the current GMS job (CFG.ID).

    Redis namespaces of the locks are deleted completely if no slot is grabbed anymore afterwards.
    Does nothing if no redis server is available.
    """
    if not redis_conn:
        return

    for lock_cls in (IOLock, ProcessLock):
        lock = lock_cls(allowed_slots=1)

        # A disabled lock (e.g., switched off via CFG) has nothing to release. Depending on how it was initialized,
        # it may not even have a redis client attached, so skip it before touching lock.client.
        if lock.disabled:
            continue

        lock.release_all_jobID_tokens()

        # delete the complete redis namespace if no lock slot is acquired anymore
        if lock.client.hlen(lock.grabbed_key) == 0:
            lock.delete()

    MemoryReserver(1).delete()