Commit d6922d15 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

First (not yet working) implementation of memory locks.

parent e6a2a956
......@@ -8,6 +8,7 @@ import logging
import functools
import re
import random
from psutil import virtual_memory
from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG
......@@ -24,11 +25,11 @@ class MultiSlotLock(Lock):
def __init__(self, name, allowed_slots=1, logger=None, **kwargs):
    """Initialize a redis-backed lock with a configurable number of concurrent slots.

    :param name:           unique base name of the lock
    :param allowed_slots:  number of concurrent holders allowed; 0/None disables locking
    :param logger:         optional logger instance (a GMS_logger is created if omitted)
    :param kwargs:         further keyword arguments passed through to the underlying Lock
    """
    self.conn = redis_conn
    self.name = name
    # coerce None to 0 so truthiness checks in acquire()/release() stay simple
    self.allowed_slots = allowed_slots or 0
    self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
    self.kwargs = kwargs
    # one candidate redis key per slot; a free one is picked at acquire time
    self.allowed_slot_names = ['%s, slot #%s' % (self.name, i) for i in range(1, self.allowed_slots + 1)]
    self.final_name = ''
    # None = not yet attempted, True/False = result of the last acquire attempt
    self._acquired = None
......@@ -63,7 +64,7 @@ class MultiSlotLock(Lock):
return free_slots[0]
def acquire(self, blocking=True, timeout=None):
if self.allowed_threads and self.conn:
if self.allowed_slots and self.conn:
if self._acquired:
raise AlreadyAcquired("Already acquired from this Lock instance.")
......@@ -76,11 +77,11 @@ class MultiSlotLock(Lock):
# this happens in case the lock has already been acquired by another instance of MultiSlotLock due
# to a race condition (time gap between finding the free slot and the call of self.acquire())
# -> in that case: re-initialize to get a new free slot
self.__init__(self.name, allowed_slots=self.allowed_threads, logger=self.logger,
self.__init__(self.name, allowed_slots=self.allowed_slots, logger=self.logger,
**self.kwargs)
if self._acquired is False: # and not None
self.__init__(self.name, allowed_slots=self.allowed_threads, logger=self.logger,
self.__init__(self.name, allowed_slots=self.allowed_slots, logger=self.logger,
**self.kwargs)
# print(self.final_name.split('GMS_%s__' % CFG.ID)[1], self._acquired)
......@@ -93,7 +94,7 @@ class MultiSlotLock(Lock):
return self._acquired
def release(self):
    """Release the acquired slot of this lock.

    No-op unless locking is enabled (allowed_slots > 0) and a redis connection exists.
    """
    if self.allowed_slots and self.conn:
        super(MultiSlotLock, self).release()
        # strip the 'GMS_<job ID>__' prefix for a readable log message
        self.logger.info("Released lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
......@@ -108,16 +109,69 @@ class ProcessLock(MultiSlotLock):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
def acquire_process_lock(allowed_slots=None, logger=None):
if not logger:
logger = logging.getLogger('ProcessLock')
logger.setLevel('INFO')
class MEMLock(MultiSlotLock):
    """Lock that limits concurrency only while system memory usage exceeds a threshold.

    If memory usage at construction time is below the threshold, the underlying
    MultiSlotLock is never initialized and acquire()/release() become no-ops.
    """

    def __init__(self, allowed_slots=1, usage_threshold=80, logger=None, **kwargs):
        """
        :param allowed_slots:    number of concurrent holders allowed while the lock is active
        :param usage_threshold:  memory usage threshold above which the memory lock is active (percent)
        :param logger:           optional logger instance
        :param kwargs:           further keyword arguments passed to MultiSlotLock
        """
        self.usage_th = usage_threshold
        # NOTE: self.name is only set by MultiSlotLock.__init__ below, so it must not
        # be referenced here -- use the literal lock name for the fallback logger.
        self.logger = logger or GMS_logger("RedisLock: 'MEMLock'")
        # psutil.virtual_memory() returns a namedtuple -> attribute access, not item access
        self.mem_usage = virtual_memory().percent
        self._acquired = None
        # Decide ONCE whether the lock is needed so acquire()/release() stay consistent
        # with this check (the previous mix of '>=' here and '>' in acquire() could
        # release a lock that was never acquired when mem_usage == usage_threshold).
        self._lock_needed = self.mem_usage >= self.usage_th

        if self._lock_needed:
            self.logger.info('Memory usage is %.1f percent.' % self.mem_usage)
            super(MEMLock, self).__init__(name='MEMLock', allowed_slots=allowed_slots, logger=self.logger,
                                          **kwargs)
        else:
            self.logger.info('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)

    def acquire(self, blocking=True, timeout=None):
        """Acquire a slot if the lock is active; otherwise succeed immediately."""
        if self._lock_needed:
            self._acquired = super(MEMLock, self).acquire(blocking=blocking, timeout=timeout)
        else:
            # lock inactive -> behave as if acquired so 'with MEMLock(...)' works either way
            self._acquired = True
        return self._acquired

    def release(self):
        """Release the slot, but only if the underlying MultiSlotLock was really acquired."""
        if self._acquired and self._lock_needed:
            super(MEMLock, self).release()
def acquire_process_lock(**processlock_kwargs):
    """Decorator that runs the wrapped function while holding a ProcessLock.

    :param processlock_kwargs: Keyword arguments to be passed to the ProcessLock class.
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                return func(*args, **kwargs)

        return wrapped_func

    return decorator
def acquire_mem_lock(**memlock_kwargs):
    """Decorator that runs the wrapped function while holding a MEMLock.

    :param memlock_kwargs: Keyword arguments to be passed to the MEMLock class.
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            # NOTE: the stale 'with ProcessLock(allowed_threads=allowed_slots, logger=logger):'
            # line referenced names that do not exist in this scope; only MEMLock belongs here.
            with MEMLock(**memlock_kwargs):
                result = func(*args, **kwargs)
            return result

        return wrapped_func

    return decorator
......
......@@ -6,7 +6,7 @@ from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger
from ..misc.locks import ProcessLock
from ..misc.locks import ProcessLock, MEMLock
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
......@@ -234,9 +234,10 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
:param list_dataset_dicts_per_scene:
:return:
"""
with ProcessLock(allowed_slots=CFG.CPUs_all_jobs,
logger=GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
log_level=CFG.log_level, append=True)):
pipeline_logger = GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
log_level=CFG.log_level, append=True)
with ProcessLock(allowed_slots=CFG.CPUs_all_jobs, logger=pipeline_logger):
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
......@@ -250,7 +251,10 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
L1A_objects = []
if CFG.exec_L1AP[0] and input_proc_level is None:
L1A_objects = [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
L1A_objects = \
[L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]):
return L1A_objects
......@@ -271,7 +275,9 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
del L1A_objects
if any([isinstance(obj, failed_GMS_object) for obj in L1B_objects]):
......@@ -289,7 +295,9 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))
L1C_objects = L1C_map(L1B_objects)
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
L1C_objects = L1C_map(L1B_objects)
del L1B_objects
if any([isinstance(obj, failed_GMS_object) for obj in L1C_objects]):
......@@ -308,7 +316,9 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))
L2A_obj = L2A_map(L1C_objects, return_tiles=False)
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
L2A_obj = L2A_map(L1C_objects, return_tiles=False)
del L1C_objects
if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
......@@ -325,7 +335,9 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])
L2B_obj = L2B_map(L2A_obj)
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
L2B_obj = L2B_map(L2A_obj)
del L2A_obj
if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
......@@ -342,7 +354,9 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])
L2C_obj = L2C_map(L2B_obj) # type: Union[GMS_object, failed_GMS_object, List]
with MEMLock(allowed_slots=1, usage_threshold=80, logger=pipeline_logger):
L2C_obj = L2C_map(L2B_obj) # type: Union[GMS_object, failed_GMS_object, List]
del L2B_obj
return L2C_obj
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment