Commit e8ffb936 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Further developed MemoryLock (not yet working).

parent b2f8fbba
......@@ -4,6 +4,7 @@ __author__ = 'Daniel Scheffler'
import time
from redis import StrictRedis
from redis_semaphore import Semaphore
from redis_lock import Lock
from redis.exceptions import ConnectionError as RedisConnectionError
import functools
from psutil import virtual_memory
......@@ -99,34 +100,77 @@ class ProcessLock(SharedResourceLock):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
# class MemoryLock(SharedResourceLock):
# def __init__(self, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
# """
#
# :param allowed_slots:
# :param usage_th: Memory usage threshold above which the memory lock is active (percent).
# :param logger:
# :param kwargs:
# """
# self.usage_th = usage_threshold
# self.blocking_th = blocking_threshold
# self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
# self.mem_usage = virtual_memory().percent
#
# self._acquired = None
#
# self.check_system_overload()
#
# if self.mem_usage >= usage_threshold:
# self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
# self.disabled = False
# else:
# self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
# self.disabled = True
#
# super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
# disabled=self.disabled, **kwargs)
#
# @timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
# exception_message='Memory lock could not be acquired after waiting 15 minutes '
# 'due to memory overload.')
# def check_system_overload(self):
# logged = False
# while self.mem_usage >= self.blocking_th:
# if not logged:
# self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
# % (self.mem_usage, self.blocking_th))
# logged = True
# time.sleep(5)
# self.mem_usage = virtual_memory().percent
#
# def acquire(self, blocking=True, timeout=None):
# if self.mem_usage > self.usage_th:
# # self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
# self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
# else:
# self._acquired = True
#
# return self._acquired
#
# def release(self):
# if self._acquired and not self.disabled:
# super(MemoryLock, self).release()
class MemoryLock(SharedResourceLock):
def __init__(self, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
def __init__(self, mem2lock_gb, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
"""
:param allowed_slots:
:param usage_th: Memory usage threshold above which the memory lock is active (percent).
:param logger:
:param kwargs:
"""
super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=self.usable_memory_gb, logger=logger,
**kwargs)
self.mem2lock_gb = mem2lock_gb
self.usage_th = usage_threshold
self.blocking_th = blocking_threshold
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
self.mem_usage = virtual_memory().percent
self._acquired = None
self.check_system_overload()
if self.mem_usage >= usage_threshold:
self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
self.disabled = False
else:
self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
self.disabled = True
super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
disabled=self.disabled, **kwargs)
@timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
exception_message='Memory lock could not be acquired after waiting 15 minutes '
'due to memory overload.')
......@@ -140,74 +184,66 @@ class MemoryLock(SharedResourceLock):
time.sleep(5)
self.mem_usage = virtual_memory().percent
def acquire(self, blocking=True, timeout=None):
if self.mem_usage > self.usage_th:
# self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
else:
self._acquired = True
return self._acquired
def release(self):
if self._acquired and not self.disabled:
super(MemoryLock, self).release()
class MemoryLock(SharedResourceLock):
def __init__(self, name='MemoryLock', allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None,
**kwargs):
"""
@property
def mem_reserved_gb(self):
val = redis_conn.get('GMS_mem_reserved')
if val is not None:
return val
return 0
:param allowed_slots:
:param usage_th: Memory usage threshold above which the memory lock is active (percent).
:param logger:
:param kwargs:
"""
self.usage_th = usage_threshold
self.blocking_th = blocking_threshold
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
self.mem_usage = virtual_memory().percent
@property
def usable_memory_gb(self):
return int((virtual_memory().total * self.usage_th / 100 - virtual_memory().used) / 1024**3) \
- int(self.mem_reserved_gb)
self._acquired = None
def ensure_enough_memory(self):
if redis_conn:
usable_memory_gb = int((virtual_memory().total * self.usage_th / 100 - virtual_memory().used) / 1024 ** 3) \
- int(self.mem_reserved_gb)
self.check_system_overload()
with Lock(self.client, 'GMS_mem_checker'):
while self.mem2lock_gb < usable_memory_gb:
time.sleep(1)
if self.mem_usage >= usage_threshold:
self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
self.disabled = False
else:
self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
self.disabled = True
self.client.incr('GMS_mem_reserved', self.mem2lock_gb)
super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
disabled=self.disabled, **kwargs)
def acquire(self, blocking=True, timeout=None):
# TODO vielleicht auch als float redis value per increment decrement + lock mit 1 slot
if not self.disabled:
with Lock(self.client, 'GMS_mem_checker'):
self.check_system_overload()
self.ensure_enough_memory()
if self.usage_th < self.mem_usage < self.blocking_th:
self.acquire(blocking=blocking, timeout=timeout)
# # TODO add an acquire lock here
# with MultiSlotLock(name='GMS_mem_acquire_key', allowed_slots=1):
# if self.usable_memory_gb >= self.mem2lock_gb:
# for i in range(self.mem2lock_gb):
# super(MemoryLock, self).acquire(timeout=timeout)
# self.client.incr('GMS_mem_reserved', self.mem2lock_gb)
#
# else:
# time.sleep(1)
# self.acquire(blocking=blocking, timeout=timeout)
@timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
exception_message='Memory lock could not be acquired after waiting 15 minutes '
'due to memory overload.')
def check_system_overload(self):
logged = False
while self.mem_usage >= self.blocking_th:
if not logged:
self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
% (self.mem_usage, self.blocking_th))
logged = True
time.sleep(5)
self.mem_usage = virtual_memory().percent
def release(self):
if not self.disabled:
for token in self._local_tokens:
self.signal(token)
self.client.decr('GMS_mem_reserved', self.mem2lock_gb)
def acquire(self, blocking=True, timeout=None):
if self.mem_usage > self.usage_th:
# self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
else:
self._acquired = True
return self._acquired
# def ensure_enough_memory(needed_mem_gb, usage_threshold=80):
# if redis_conn:
# usable_memory_gb = int((virtual_memory().total * usage_threshold / 100 - virtual_memory().used) / 1024**3) \
# - int(needed_mem_gb)
#
# with Lock(redis_conn, 'GMS_mem_checker'):
# if usable_memory_gb >= needed_mem_gb:
# redis_conn.
def release(self):
if self._acquired and not self.disabled:
super(MemoryLock, self).release()
def acquire_process_lock(**processlock_kwargs):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment