Commit 30ee85fa authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

First working version of MemoryReserver.

parent 25c10a27
......@@ -156,21 +156,32 @@ class ProcessLock(SharedResourceLock):
class MemoryLock(SharedResourceLock):
def __init__(self, mem2lock_gb, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
def __init__(self, mem2lock_gb, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
"""
:param usage_th: Memory usage threshold above which the memory lock is active (percent).
:param logger:
:param kwargs:
"""
super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=self.usable_memory_gb, logger=logger,
**kwargs)
super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
self.mem2lock_gb = mem2lock_gb
self.usage_th = usage_threshold
self.blocking_th = blocking_threshold
self.mem_usage = virtual_memory().percent
@property
def mem_reserved_gb(self):
val = redis_conn.get('GMS_mem_reserved')
if val is not None:
return val
return 0
@property
def usable_memory_gb(self):
return int((virtual_memory().total * self.usage_th / 100 - virtual_memory().used) / 1024**3) \
- int(self.mem_reserved_gb)
@timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
exception_message='Memory lock could not be acquired after waiting 15 minutes '
'due to memory overload.')
......@@ -184,25 +195,19 @@ class MemoryLock(SharedResourceLock):
time.sleep(5)
self.mem_usage = virtual_memory().percent
@property
def mem_reserved_gb(self):
val = redis_conn.get('GMS_mem_reserved')
if val is not None:
return val
return 0
@property
def usable_memory_gb(self):
return int((virtual_memory().total * self.usage_th / 100 - virtual_memory().used) / 1024**3) \
- int(self.mem_reserved_gb)
@timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
exception_message='Memory lock could not be acquired after waiting 15 minutes '
'because not enough memory could reserved.')
def ensure_enough_memory(self):
if redis_conn:
usable_memory_gb = int((virtual_memory().total * self.usage_th / 100 - virtual_memory().used) / 1024 ** 3) \
- int(self.mem_reserved_gb)
if not self.disabled:
logged = False
with Lock(self.client, 'GMS_mem_checker'):
while self.mem2lock_gb < usable_memory_gb:
while self.mem2lock_gb < self.usable_memory_gb:
if not logged:
self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
% (self.usable_memory_gb, self.mem2lock_gb))
logged = True
time.sleep(1)
self.client.incr('GMS_mem_reserved', self.mem2lock_gb)
......@@ -230,8 +235,9 @@ class MemoryLock(SharedResourceLock):
def release(self):
if not self.disabled:
for token in self._local_tokens:
self.signal(token)
# for token in self._local_tokens:
# self.signal(token)
super(MemoryLock, self).release()
self.client.decr('GMS_mem_reserved', self.mem2lock_gb)
......@@ -245,7 +251,6 @@ class MemoryLock(SharedResourceLock):
# redis_conn.
def acquire_process_lock(**processlock_kwargs):
"""Decorator function for ProcessLock.
......@@ -294,18 +299,106 @@ def release_unclosed_locks():
if lock.client.hlen(lock.grabbed_key) == 0:
lock.delete()
ML = MemoryReserver(1)
ML.release_all_jobID_tokens()
class MemoryReserver(object):
def __init__(self, reserved_mem=10):
# delete the complete redis namespace if no lock slot is acquired anymore
if ML.client.hlen(ML.grabbed_key) == 0:
ML.delete()
# redis_conn.delete('GMS_mem_checker')
class MemoryReserver(Semaphore):
def __init__(self, mem2lock_gb, max_usage=90, logger=None, **kwargs):
"""
:param reserved_mem: Amount of memory to be reserved during the lock is acquired (gigabytes).
"""
self.disabled = redis_conn is None
self.mem2lock_gb = mem2lock_gb
self.max_usage = max_usage
self._waiting = False
if not self.disabled:
mem_limit = int(virtual_memory().total * max_usage / 100 / 1024**3)
super(MemoryReserver, self).__init__(client=redis_conn, count=mem_limit, namespace='MemoryReserver',
**kwargs)
self.logger = logger or GMS_logger("RedisLock: 'MemoryReserver'")
@property
def mem_reserved_gb(self):
val = redis_conn.get('GMS_mem_reserved')
if val is not None:
return int(val)
return 0
@property
def usable_memory_gb(self):
return int((virtual_memory().total * self.max_usage / 100 - virtual_memory().used) / 1024**3) \
- int(self.mem_reserved_gb)
@property
def grabbed_key_jobID(self):
return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)
@property
def reserved_key(self):
return self._get_and_set_key('_reserved_key', 'MEM_RESERVED')
@property
def reserved_key_jobID(self):
return self._get_and_set_key('_reserved_key_jobID', 'MEM_RESERVED_BY_GMSJOB_%s' % CFG.ID)
def acquire(self, timeout=0, target=None):
if not self.disabled:
with Lock(self.client, 'GMS_mem_acquire_lock'):
if self.usable_memory_gb >= self.mem2lock_gb:
for i in range(self.mem2lock_gb):
token = super(MemoryReserver, self).acquire(timeout=timeout)
self.client.hset(self.grabbed_key_jobID, token, self.current_time)
self.client.incr(self.reserved_key, self.mem2lock_gb)
self.client.incr(self.reserved_key_jobID, self.mem2lock_gb)
def __enter__(self):
# while
self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)
self._waiting = False
return self
else:
if not self._waiting:
self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
% (self.usable_memory_gb, self.mem2lock_gb))
self._waiting = True
time.sleep(1)
self.acquire(timeout=timeout)
def release(self):
if not self.disabled:
for token in self._local_tokens:
self.signal(token)
self.client.decr(self.reserved_key, self.mem2lock_gb)
self.client.decr(self.reserved_key_jobID, self.mem2lock_gb)
self.logger.info('Released %s GB of reserved memory.' % self.mem2lock_gb)
def release_all_jobID_tokens(self):
mem_reserved = int(redis_conn.get(self.reserved_key_jobID) or 0)
if mem_reserved:
redis_conn.decr(self.reserved_key, mem_reserved)
redis_conn.delete(self.reserved_key_jobID)
for token in self.client.hkeys(self.grabbed_key_jobID):
self.signal(token)
self.client.delete(self.grabbed_key_jobID)
def delete(self):
if not self.disabled:
self.client.delete(self.check_exists_key)
self.client.delete(self.available_key)
self.client.delete(self.grabbed_key)
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
""""""
if self.mem_reserved_gb <= 0:
self.client.delete(self.reserved_key)
......@@ -6,7 +6,7 @@ from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger
from ..misc.locks import ProcessLock, MemoryLock
from ..misc.locks import ProcessLock, MemoryReserver
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
......@@ -237,7 +237,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
pipeline_logger = GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
log_level=CFG.log_level, append=True)
with ProcessLock(allowed_slots=CFG.CPUs_all_jobs, logger=pipeline_logger):
with MemoryReserver(mem2lock_gb=15, logger=pipeline_logger),\
ProcessLock(allowed_slots=CFG.CPUs_all_jobs, logger=pipeline_logger):
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
......
......@@ -11,7 +11,7 @@ Tests for gms_preprocessing.misc.locks
import unittest
from gms_preprocessing import set_config
from gms_preprocessing.misc.locks import MultiSlotLock, ProcessLock # , AlreadyAcquired
from gms_preprocessing.misc.locks import MultiSlotLock, ProcessLock, MemoryLock # , AlreadyAcquired
from . import db_host
......@@ -56,3 +56,24 @@ class Test_ProcessLock(unittest.TestCase):
def test_with_statement(self):
with ProcessLock(allowed_slots=15) as lock:
self.assertNotEquals(lock, None)
class Test_MemoryReserver(unittest.TestCase):
def setUp(self):
self.config = set_config(job_ID=26186196, db_host=db_host, reset_status=True)
def test_acquire_release(self):
ml = Test_MemoryReserver(mem2lock_gb=20)
ml.acquire()
# with self.assertRaises(AlreadyAcquired):
# msl.acquire()
ml.release()
# if not msl.disabled:
# self.failIf(True in [i.startswith('ProcessLock') for i in msl.existing_locks])
def test_with_statement(self):
with Test_MemoryReserver(mem2lock_gb=20) as lock:
self.assertNotEquals(lock, None)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment