Commit 1831949c authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Revised memory locks.

parent d6922d15
......@@ -4,11 +4,11 @@ __author__ = 'Daniel Scheffler'
import time
from redis_lock import StrictRedis, Lock, NotAcquired, AlreadyAcquired
from redis.exceptions import ConnectionError as RedisConnectionError
import logging
import functools
import re
import random
from psutil import virtual_memory
import timeout_decorator
from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG
......@@ -22,7 +22,7 @@ except RedisConnectionError:
class MultiSlotLock(Lock):
def __init__(self, name, allowed_slots=1, logger=None, **kwargs):
def __init__(self, name, allowed_slots=1, logger=None, disabled=False, **kwargs):
self.conn = redis_conn
self.name = name
self.allowed_slots = allowed_slots or 0
......@@ -33,7 +33,7 @@ class MultiSlotLock(Lock):
self.final_name = ''
self._acquired = None
if allowed_slots and redis_conn:
if not disabled and allowed_slots and redis_conn:
logged = False
while True:
time.sleep(random.uniform(0, 1.5)) # avoids race conditions in case multiple tasks are waiting
......@@ -109,8 +109,8 @@ class ProcessLock(MultiSlotLock):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
class MEMLock(MultiSlotLock):
def __init__(self, allowed_slots=1, usage_threshold=80, logger=None, **kwargs):
class MemoryLock(MultiSlotLock):
def __init__(self, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
"""
:param allowed_slots:
......@@ -119,28 +119,47 @@ class MEMLock(MultiSlotLock):
:param kwargs:
"""
self.usage_th = usage_threshold
self.blocking_th = blocking_threshold
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
self.mem_usage = virtual_memory()['percent']
self.mem_usage = virtual_memory().percent
self._acquired = None
self.check_system_overload()
if self.mem_usage >= usage_threshold:
self.logger.info('Memory usage is %.1f percent.' % self.mem_usage)
super(MEMLock, self).__init__(name='MEMLock', allowed_slots=allowed_slots, logger=self.logger, **kwargs)
self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
self.disabled = False
else:
self.logger.info('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
self.disabled = True
super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
disabled=self.disabled, **kwargs)
@timeout_decorator.timeout(seconds=15*60, timeout_exception=TimeoutError,
exception_message='Memory lock could not be acquired after waiting 15 minutes '
'due to memory overload.')
def check_system_overload(self):
logged = False
while self.mem_usage >= self.blocking_th:
if not logged:
self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
% (self.mem_usage, self.usage_th))
time.sleep(5)
self.mem_usage = virtual_memory().percent
def acquire(self, blocking=True, timeout=None):
if self.mem_usage > self.usage_th:
self._acquired = super(MEMLock, self).acquire(blocking=blocking, timeout=timeout)
self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
else:
self._acquired = True
return self._acquired
def release(self):
if self._acquired:
super(MEMLock, self).release()
if self._acquired and not self.disabled:
super(MemoryLock, self).release()
def acquire_process_lock(**processlock_kwargs):
......@@ -165,13 +184,13 @@ def acquire_process_lock(**processlock_kwargs):
def acquire_mem_lock(**memlock_kwargs):
"""Decorator function for MEMLock.
:param memlock_kwargs: Keywourd arguments to be passed to MEMLock class.
:param memlock_kwargs: Keywourd arguments to be passed to MemoryLock class.
"""
def decorator(func):
@functools.wraps(func) # needed to avoid pickling errors
def wrapped_func(*args, **kwargs):
with MEMLock(**memlock_kwargs):
with MemoryLock(**memlock_kwargs):
result = func(*args, **kwargs)
return result
......
......@@ -6,7 +6,7 @@ from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger
from ..misc.locks import ProcessLock, MEMLock
from ..misc.locks import ProcessLock, MemoryLock
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
......@@ -251,8 +251,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
L1A_objects = []
if CFG.exec_L1AP[0] and input_proc_level is None:
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1A_objects = \
[L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
......@@ -275,7 +274,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
del L1A_objects
......@@ -295,7 +294,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1C_objects = L1C_map(L1B_objects)
del L1B_objects
......@@ -316,7 +315,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L2A_obj = L2A_map(L1C_objects, return_tiles=False)
del L1C_objects
......@@ -335,7 +334,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L2B_obj = L2B_map(L2A_obj)
del L2A_obj
......@@ -354,7 +353,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L2C_obj = L2C_map(L2B_obj) # type: Union[GMS_object, failed_GMS_object, List]
del L2B_obj
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment