Commit f9a9c681 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fixed wrong log message regarding accuracy layers. Revised locks (not yet working):

Added MultiSlotLock inheriting from redis_semaphore.Semaphore.
parent 357719f5
...@@ -36,6 +36,7 @@ class AccuracyCube(GeoArray): ...@@ -36,6 +36,7 @@ class AccuracyCube(GeoArray):
self._layers = None self._layers = None
if self.layers: if self.layers:
self.logger.info('Generating combined accuracy layers array..')
super(AccuracyCube, self).__init__(self.generate_array(), super(AccuracyCube, self).__init__(self.generate_array(),
geotransform=list(self.layers.values())[0].gt, geotransform=list(self.layers.values())[0].gt,
projection=list(self.layers.values())[0].prj, projection=list(self.layers.values())[0].prj,
......
...@@ -3,6 +3,7 @@ __author__ = 'Daniel Scheffler' ...@@ -3,6 +3,7 @@ __author__ = 'Daniel Scheffler'
import time import time
from redis_lock import StrictRedis, Lock, NotAcquired, AlreadyAcquired from redis_lock import StrictRedis, Lock, NotAcquired, AlreadyAcquired
from redis_semaphore import Semaphore
from redis.exceptions import ConnectionError as RedisConnectionError from redis.exceptions import ConnectionError as RedisConnectionError
import functools import functools
import re import re
...@@ -99,6 +100,43 @@ class MultiSlotLock(Lock): ...@@ -99,6 +100,43 @@ class MultiSlotLock(Lock):
self.logger.info("Released lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1]) self.logger.info("Released lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
class MultiSlotLock(Semaphore):
    """A redis-backed lock that allows a configurable number of parallel holders ("slots").

    Thin wrapper around :class:`redis_semaphore.Semaphore`.  If no redis connection is available
    (``redis_conn is None``), the lock degrades to a no-op so that processing still works without redis.
    """

    def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
        """
        :param name:           descriptive name of the lock (used for the redis key and log messages)
        :param allowed_slots:  number of processes that may hold the lock at the same time
        :param logger:         logger instance (a GMS_logger is created if not given)
        :param kwargs:         further keyword arguments passed to redis_semaphore.Semaphore
        """
        # no-op mode when redis is unreachable/unconfigured
        self.disabled = redis_conn is None
        self.name = name
        # redis key namespace, scoped to the current job via CFG.ID so concurrent jobs don't collide
        self.redis_varname = 'GMS_%s__' % CFG.ID + name
        self.allowed_slots = allowed_slots
        self.logger = logger or GMS_logger("RedisLock: '%s'" % name)

        if not self.disabled:
            # NOTE(review): Semaphore is only initialized when redis is available; any Semaphore
            # attribute access on a disabled instance would fail — all methods below guard on self.disabled.
            super().__init__(client=redis_conn, count=allowed_slots, namespace=self.redis_varname, **kwargs)

    def acquire(self, timeout=0, target=None):
        """Acquire a slot of the lock (blocking). Returns the acquired token, or None if disabled."""
        if not self.disabled:
            if self.available_count == 0:
                self.logger.info("Waiting for free lock '%s'." % self.name)

            token = super().acquire(timeout=timeout, target=target)

            # NOTE(review): assumes redis_semaphore tokens are numeric strings so that
            # int(token) yields a slot index — confirm against the installed redis_semaphore version
            self.logger.info("Acquired lock '%s'" % self.name +
                             ('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))

            return token

    def release(self):
        """Release the previously acquired slot (no-op if disabled)."""
        if not self.disabled:
            token = super().release()
            if token:
                self.logger.info("Released lock '%s'" % self.name +
                                 ('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))

    def delete(self):
        """Remove all redis keys belonging to this lock (EXISTS/AVAILABLE/GRABBED bookkeeping keys)."""
        if not self.disabled:
            redis_conn.delete('%s:EXISTS' % self.redis_varname)
            redis_conn.delete('%s:AVAILABLE' % self.redis_varname)
            redis_conn.delete('%s:GRABBED' % self.redis_varname)
class IOLock(MultiSlotLock):
    """A multi-slot lock dedicated to limiting parallel disk I/O operations."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """
        :param allowed_slots:  number of processes that may perform disk I/O at the same time
        :param logger:         logger instance passed through to MultiSlotLock
        :param kwargs:         further keyword arguments passed through to MultiSlotLock
        """
        # pure delegation — only the lock name is fixed here
        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
...@@ -152,7 +190,8 @@ class MemoryLock(MultiSlotLock): ...@@ -152,7 +190,8 @@ class MemoryLock(MultiSlotLock):
def acquire(self, blocking=True, timeout=None): def acquire(self, blocking=True, timeout=None):
if self.mem_usage > self.usage_th: if self.mem_usage > self.usage_th:
self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout) # self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
else: else:
self._acquired = True self._acquired = True
...@@ -163,6 +202,63 @@ class MemoryLock(MultiSlotLock): ...@@ -163,6 +202,63 @@ class MemoryLock(MultiSlotLock):
super(MemoryLock, self).release() super(MemoryLock, self).release()
class MemoryLock(MultiSlotLock):
    """A multi-slot lock that is only active while system memory usage exceeds a given threshold.

    Below ``usage_threshold`` the lock is disabled (acquire/release become no-ops); above
    ``blocking_threshold`` the constructor blocks (up to 15 minutes) until memory usage drops.
    """

    def __init__(self, name='MemoryLock', allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None,
                 **kwargs):
        """
        :param name:                name of the lock (used for the redis key and log messages)
        :param allowed_slots:       number of processes that may hold the lock at the same time
        :param usage_threshold:     Memory usage threshold above which the memory lock is active (percent).
        :param blocking_threshold:  Memory usage threshold above which all processing is blocked (percent).
        :param logger:              logger instance (a GMS_logger is created if not given)
        :param kwargs:              further keyword arguments passed through to MultiSlotLock
        """
        self.usage_th = usage_threshold
        self.blocking_th = blocking_threshold
        # bugfix: use the 'name' parameter here — 'self.name' is only set by the parent constructor
        # below, so referencing it at this point raised an AttributeError
        self.logger = logger or GMS_logger("RedisLock: '%s'" % name)
        self.mem_usage = virtual_memory().percent
        self._acquired = None

        # block (with timeout) while the system is memory-overloaded
        self.check_system_overload()

        if self.mem_usage >= usage_threshold:
            self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
            mem_lock_needed = True
        else:
            self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
            mem_lock_needed = False

        # bugfix: 'disabled' must not be forwarded to MultiSlotLock — it is not an accepted keyword and
        # would end up in Semaphore.__init__ via **kwargs (TypeError). The parent computes self.disabled
        # itself (True if redis is unavailable), so merge the memory-based state afterwards instead.
        # bugfix: pass the 'name' parameter through instead of hard-coding 'MemoryLock'.
        super(MemoryLock, self).__init__(name=name, allowed_slots=allowed_slots, logger=self.logger, **kwargs)
        self.disabled = self.disabled or not mem_lock_needed

    @timeout_decorator.timeout(seconds=15*60, timeout_exception=TimeoutError,
                               exception_message='Memory lock could not be acquired after waiting 15 minutes '
                                                 'due to memory overload.')
    def check_system_overload(self):
        """Block until memory usage falls below the blocking threshold (raises TimeoutError after 15 min)."""
        logged = False
        while self.mem_usage >= self.blocking_th:
            if not logged:
                self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
                                 % (self.mem_usage, self.blocking_th))
                logged = True
            time.sleep(5)
            self.mem_usage = virtual_memory().percent

    def acquire(self, blocking=True, timeout=None):
        """Acquire the lock if memory usage is above the usage threshold, otherwise succeed immediately.

        :param blocking:  kept for interface compatibility with the previous Lock-based implementation
                          (currently unused — redis_semaphore.Semaphore.acquire has no 'blocking' argument)
        :param timeout:   timeout passed through to MultiSlotLock.acquire
                          NOTE(review): the parent defaults to timeout=0; confirm that redis_semaphore
                          accepts timeout=None
        """
        if self.mem_usage > self.usage_th:
            self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
        else:
            self._acquired = True

        return self._acquired

    def release(self):
        """Release the lock if it was actually acquired and is not disabled."""
        if self._acquired and not self.disabled:
            super(MemoryLock, self).release()
def acquire_process_lock(**processlock_kwargs): def acquire_process_lock(**processlock_kwargs):
"""Decorator function for ProcessLock. """Decorator function for ProcessLock.
...@@ -205,6 +301,9 @@ def release_unclosed_locks(logger=None): ...@@ -205,6 +301,9 @@ def release_unclosed_locks(logger=None):
if redis_conn: if redis_conn:
logger = logger or GMS_logger('LockReseter') logger = logger or GMS_logger('LockReseter')
IOLock(allowed_slots=1).delete()
ProcessLock(allowed_slots=1).delete()
locks2release = [i.split(b'lock:')[1].decode('latin') for i in redis_conn.keys() locks2release = [i.split(b'lock:')[1].decode('latin') for i in redis_conn.keys()
if i.decode('latin').startswith('lock:GMS_%s__' % CFG.ID)] if i.decode('latin').startswith('lock:GMS_%s__' % CFG.ID)]
if locks2release: if locks2release:
...@@ -216,3 +315,19 @@ def release_unclosed_locks(logger=None): ...@@ -216,3 +315,19 @@ def release_unclosed_locks(logger=None):
lock.release() lock.release()
except NotAcquired: except NotAcquired:
lock.reset() lock.reset()
class MemoryReserver(object):
    """Context manager meant to reserve a fixed amount of memory while it is held.

    NOTE: work-in-progress stub — entering and exiting the context currently performs no
    actual reservation (see the '# while' placeholder left in the original draft).
    """

    def __init__(self, reserved_mem=10):
        """
        :param reserved_mem: Amount of memory to be reserved during the lock is acquired (gigabytes).
        """
        # bugfix: the parameter was previously silently discarded; keep it for the upcoming implementation
        self.reserved_mem = reserved_mem

    def __enter__(self):
        # TODO: wait until 'self.reserved_mem' gigabytes are available, then mark them as reserved
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Release the reservation (no-op in this stub)."""
...@@ -705,7 +705,6 @@ class GMS_object(object): ...@@ -705,7 +705,6 @@ class GMS_object(object):
% self.proc_level) % self.proc_level)
return None return None
self.logger.info('Generating combined accuracy layers array..')
try: try:
from ..algorithms.L2C_P import AccuracyCube from ..algorithms.L2C_P import AccuracyCube
self._accuracy_layers = AccuracyCube(self) self._accuracy_layers = AccuracyCube(self)
......
...@@ -251,9 +251,9 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise ...@@ -251,9 +251,9 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
L1A_objects = [] L1A_objects = []
if CFG.exec_L1AP[0] and input_proc_level is None: if CFG.exec_L1AP[0] and input_proc_level is None:
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger): # with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1A_objects = \ L1A_objects = \
[L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene] [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]): if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]):
return L1A_objects return L1A_objects
...@@ -274,8 +274,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise ...@@ -274,8 +274,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile() GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]])) L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger): # with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects] L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
del L1A_objects del L1A_objects
...@@ -294,8 +294,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise ...@@ -294,8 +294,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile() GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]])) L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger): # with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1C_objects = L1C_map(L1B_objects) L1C_objects = L1C_map(L1B_objects)
del L1B_objects del L1B_objects
...@@ -315,8 +315,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise ...@@ -315,8 +315,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile() GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]])) L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger): # with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L2A_obj = L2A_map(L1C_objects, return_tiles=False) L2A_obj = L2A_map(L1C_objects, return_tiles=False)
del L1C_objects del L1C_objects
...@@ -334,8 +334,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise ...@@ -334,8 +334,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile() GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]]) L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger): # with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L2B_obj = L2B_map(L2A_obj) L2B_obj = L2B_map(L2A_obj)
del L2A_obj del L2A_obj
...@@ -353,8 +353,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise ...@@ -353,8 +353,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile() GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]]) L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])
with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger): # with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L2C_obj = L2C_map(L2B_obj) # type: Union[GMS_object, failed_GMS_object, List] L2C_obj = L2C_map(L2B_obj) # type: Union[GMS_object, failed_GMS_object, List]
del L2B_obj del L2B_obj
......
...@@ -29,8 +29,8 @@ class Test_MultiSlotLock(unittest.TestCase): ...@@ -29,8 +29,8 @@ class Test_MultiSlotLock(unittest.TestCase):
msl.release() msl.release()
if msl.conn: # if not msl.disabled:
self.failIf(True in [i.startswith('unittest') for i in msl.existing_locks]) # self.failIf(True in [i.startswith('unittest') for i in msl.existing_locks])
def test_with_statement(self): def test_with_statement(self):
with MultiSlotLock('unittest', allowed_slots=15) as lock: with MultiSlotLock('unittest', allowed_slots=15) as lock:
...@@ -50,7 +50,7 @@ class Test_ProcessLock(unittest.TestCase): ...@@ -50,7 +50,7 @@ class Test_ProcessLock(unittest.TestCase):
msl.release() msl.release()
if msl.conn: if not msl.disabled:
self.failIf(True in [i.startswith('ProcessLock') for i in msl.existing_locks]) self.failIf(True in [i.startswith('ProcessLock') for i in msl.existing_locks])
def test_with_statement(self): def test_with_statement(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment