Commit cb934d71 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Merge branch 'feature/add_locks' into dev

parents c6f4146e 9fa9e926
......@@ -32,7 +32,7 @@ test_gms_preprocessing:
# - python setup.py install
# - cd ../../
# make tests
- pip install nested_dict openpyxl timeout_decorator python-redis-lock redis psutil # FIXME: remove as soon as runner is rebuilt
- pip install nested_dict openpyxl timeout_decorator redis redis-semaphore python-redis-lock psutil # FIXME: remove as soon as runner is rebuilt
- make nosetests
- make docs
artifacts:
......
......@@ -12,9 +12,6 @@ from ..options.config import GMS_config as CFG
__author__ = 'Daniel Scheffler'
shared = {}
res = {}
class L2C_object(L2B_object):
def __init__(self, L2B_obj=None):
......@@ -36,6 +33,7 @@ class AccuracyCube(GeoArray):
self._layers = None
if self.layers:
GMS_obj.logger.info('Generating combined accuracy layers array..')
super(AccuracyCube, self).__init__(self.generate_array(),
geotransform=list(self.layers.values())[0].gt,
projection=list(self.layers.values())[0].prj,
......
......@@ -8,7 +8,8 @@ import sys
import traceback
import warnings
from datetime import datetime
from typing import Union # noqa F401 # flake8 issue
from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue
from pkg_resources import parse_version
import numpy as np
import pandas as pd
......@@ -25,6 +26,9 @@ from ..options.config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain
if TYPE_CHECKING:
from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
__author__ = 'Daniel Scheffler'
......@@ -1413,3 +1417,65 @@ def archive_exists_on_fileserver(conn_DB, entityID):
exists = False
return exists
def record_stats_memusage(conn_db, GMS_obj):
    # type: (str, GMS_object) -> bool
    """Push memory usage statistics of the given GMS_object to the 'stats_mem_usage_homo' database table.

    A new record is only written if less than 10 existing records match the current job configuration
    (considering only records with software version >= CFG.min_version_mem_usage_stats) and the current
    scene ID is not already among the matching records.

    :param conn_db:  database connection parameters string
    :param GMS_obj:  GMS_object instance whose memory usage statistics shall be recorded
    :return:         True if a new record was written to the database, else False
    """
    vals2write_dict = dict(
        creationtime=datetime.now(),
        software_version=CFG.version,
        datasetid=GMS_obj.dataset_ID,
        virtual_sensor_id=CFG.virtual_sensor_id,
        target_gsd=CFG.target_gsd[0],  # respects only xgsd
        target_nbands=len(CFG.target_CWL),
        inmem_serialization=CFG.inmem_serialization,
        target_radunit_optical=CFG.target_radunit_optical,
        skip_coreg=CFG.skip_coreg,
        ac_estimate_accuracy=CFG.ac_estimate_accuracy,
        ac_bandwise_accuracy=CFG.ac_bandwise_accuracy,
        spathomo_estimate_accuracy=CFG.spathomo_estimate_accuracy,
        spechomo_estimate_accuracy=CFG.spechomo_estimate_accuracy,
        spechomo_bandwise_accuracy=CFG.spechomo_bandwise_accuracy,
        parallelization_level=CFG.parallelization_level,
        skip_thermal=CFG.skip_thermal,
        skip_pan=CFG.skip_pan,
        mgrs_pixel_buffer=CFG.mgrs_pixel_buffer,
        cloud_masking_algorithm=CFG.cloud_masking_algorithm[GMS_obj.satellite],
        used_mem_l1a=GMS_obj.mem_usage['L1A'],
        used_mem_l1b=GMS_obj.mem_usage['L1B'],
        used_mem_l1c=GMS_obj.mem_usage['L1C'],
        used_mem_l2a=GMS_obj.mem_usage['L2A'],
        used_mem_l2b=GMS_obj.mem_usage['L2B'],
        used_mem_l2c=GMS_obj.mem_usage['L2C'],
        dims_x_l2a=GMS_obj.arr.cols,
        dims_y_l2a=GMS_obj.arr.rows,
        is_test=CFG.is_test,
        sceneid=GMS_obj.scene_ID
    )

    # get all existing database records matching the respective config
    # NOTE: those columns that do not belong to the config specification are ignored
    vals2get = list(vals2write_dict.keys())

    # per-scene result values must not be part of the query condition
    # FIX: the exclusion list previously contained the typo 'dims_y_l2b' (no such key),
    #      so 'dims_y_l2a' wrongly remained in the condition and no records could ever match
    non_config_keys = ['creationtime', 'used_mem_l1a', 'used_mem_l1b', 'used_mem_l1c',
                       'used_mem_l2a', 'used_mem_l2b', 'used_mem_l2c',
                       'dims_x_l2a', 'dims_y_l2a', 'sceneid']
    df_existing_recs = pd.DataFrame(
        get_info_from_postgreSQLdb(conn_db, 'stats_mem_usage_homo',
                                   vals2return=vals2get,
                                   cond_dict={k: v for k, v in vals2write_dict.items()
                                              if k not in non_config_keys}),
        columns=vals2get)

    # filter the existing records by gms_preprocessing software version number
    # (higher than CFG.min_version_mem_usage_stats)
    vers = list(df_existing_recs.software_version)
    vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(CFG.min_version_mem_usage_stats)]
    df_existing_recs_usable = df_existing_recs.loc[df_existing_recs.software_version.isin(vers_usable)]

    # add memory stats to database
    # (but skip if there are already 10 records matching the respective config and software version number
    #  or if the current scene ID is already among the matching records)
    if len(df_existing_recs_usable) < 10 and GMS_obj.scene_ID not in list(df_existing_recs_usable.sceneid):
        create_record_in_postgreSQLdb(conn_db, 'stats_mem_usage_homo', vals2write_dict=vals2write_dict)
        return True
    else:
        return False
......@@ -105,7 +105,7 @@ class GMSEnvironment(object):
@staticmethod
def check_read_write_permissions():
if not os.path.isdir(CFG.path_tempdir):
os.makedirs(CFG.path_tempdir)
os.makedirs(CFG.path_tempdir, exist_ok=True)
if not os.access(CFG.path_tempdir, os.R_OK):
raise PermissionError('No read-permissions at %s.' % CFG.path_tempdir)
......
......@@ -2,17 +2,16 @@
__author__ = 'Daniel Scheffler'
import time
from redis_lock import StrictRedis, Lock, NotAcquired, AlreadyAcquired
from redis import StrictRedis
from redis_semaphore import Semaphore
from redis_lock import Lock
from redis.exceptions import ConnectionError as RedisConnectionError
import logging
import functools
import re
import random
from psutil import virtual_memory
from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG
try:
redis_conn = StrictRedis(host='localhost', db=0)
redis_conn.keys() # may raise ConnectionError, e.g., if redis server is not installed or not running
......@@ -20,104 +19,213 @@ except RedisConnectionError:
redis_conn = None
class MultiSlotLock(Lock):
def __init__(self, name, allowed_slots=1, logger=None, **kwargs):
self.conn = redis_conn
self.name = name
self.allowed_threads = allowed_slots or 0
self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
self.kwargs = kwargs
self.allowed_slot_names = ['%s, slot #%s' % (self.name, i) for i in range(1, self.allowed_threads + 1)]
self.final_name = ''
self._acquired = None
if allowed_slots and redis_conn:
logged = False
while True:
time.sleep(random.uniform(0, 1.5)) # avoids race conditions in case multiple tasks are waiting
name_free_slot = self.get_free_slot_name()
if not name_free_slot:
if not logged:
self.logger.info("Waiting for free '%s' lock." % self.name)
logged = True
else:
break
class MultiSlotLock(Semaphore):
def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
self.disabled = redis_conn is None or allowed_slots in [None, False]
self.namespace = name
self.allowed_slots = allowed_slots
self.logger = logger or GMS_logger("RedisLock: '%s'" % name)
self.final_name = 'GMS_%s__' % CFG.ID + name_free_slot
super().__init__(self.conn, self.final_name, **kwargs)
else:
pass
if not self.disabled:
super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)
@property
def existing_locks(self):
names = [i.decode('utf8').split('lock:')[1] for i in self.conn.keys() if i.decode('utf8').startswith('lock:')]
# split 'GMS_<jobid>' and return
return list(set([re.search('GMS_[0-9]*__(.*)', n, re.I).group(1) for n in names if n.startswith('GMS_')]))
def get_free_slot_name(self):
free_slots = [sn for sn in self.allowed_slot_names if sn not in self.existing_locks]
if free_slots:
return free_slots[0]
def acquire(self, blocking=True, timeout=None):
if self.allowed_threads and self.conn:
if self._acquired:
raise AlreadyAcquired("Already acquired from this Lock instance.")
while not self._acquired:
try:
# print('Trying to acquire %s.' % self.final_name.split('GMS_%s__' % CFG.ID)[1])
self._acquired = super(MultiSlotLock, self).acquire(blocking=blocking, timeout=timeout)
# print("Acquired lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
except AlreadyAcquired:
# this happens in case the lock has already been acquired by another instance of MultiSlotLock due
# to a race condition (time gap between finding the free slot and the call of self.acquire())
# -> in that case: re-initialize to get a new free slot
self.__init__(self.name, allowed_slots=self.allowed_threads, logger=self.logger,
**self.kwargs)
if self._acquired is False: # and not None
self.__init__(self.name, allowed_slots=self.allowed_threads, logger=self.logger,
**self.kwargs)
# print(self.final_name.split('GMS_%s__' % CFG.ID)[1], self._acquired)
if self._acquired:
self.logger.info("Acquired lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
else:
self._acquired = True
return self._acquired
def acquire(self, timeout=0, target=None):
if not self.disabled:
if self.available_count == 0:
self.logger.info("Waiting for free lock '%s'." % self.namespace)
token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)
self.logger.info("Acquired lock '%s'" % self.namespace +
('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))
return token
def release(self):
if self.allowed_threads and self.conn:
super(MultiSlotLock, self).release()
self.logger.info("Released lock '%s'." % self.final_name.split('GMS_%s__' % CFG.ID)[1])
if not self.disabled:
token = super(MultiSlotLock, self).release()
if token:
self.logger.info("Released lock '%s'" % self.namespace +
('.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)))
def delete(self):
if not self.disabled:
self.client.delete(self.check_exists_key)
self.client.delete(self.available_key)
self.client.delete(self.grabbed_key)
class SharedResourceLock(MultiSlotLock):
def acquire(self, timeout=0, target=None):
if not self.disabled:
token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)
self.client.hset(self.grabbed_key_jobID, token, self.current_time)
class IOLock(MultiSlotLock):
def release_all_jobID_tokens(self):
if not self.disabled:
for token in self.client.hkeys(self.grabbed_key_jobID):
self.signal(token)
self.client.delete(self.grabbed_key_jobID)
@property
def grabbed_key_jobID(self):
return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)
def signal(self, token):
if token is None:
return None
with self.client.pipeline() as pipe:
pipe.multi()
pipe.hdel(self.grabbed_key, token)
pipe.hdel(self.grabbed_key_jobID, token) # only difference to Semaphore.signal()
pipe.lpush(self.available_key, token)
pipe.execute()
return token
def delete(self):
if not self.disabled:
super(SharedResourceLock, self).delete()
self.client.delete(self.grabbed_key_jobID)
class IOLock(SharedResourceLock):
def __init__(self, allowed_slots=1, logger=None, **kwargs):
super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
self.disabled = CFG.disable_IO_locks
if not self.disabled:
super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
class ProcessLock(MultiSlotLock):
class ProcessLock(SharedResourceLock):
def __init__(self, allowed_slots=1, logger=None, **kwargs):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
self.disabled = CFG.disable_CPU_locks
if not self.disabled:
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
class MemoryReserver(Semaphore):
def __init__(self, mem2lock_gb, max_usage=90, logger=None, **kwargs):
"""
:param mem2lock_gb:  Amount of memory to be reserved while the lock is acquired (gigabytes).
"""
self.disabled = redis_conn is None or CFG.disable_memory_locks or mem2lock_gb in [None, False]
self.mem2lock_gb = mem2lock_gb
self.max_usage = max_usage
self._waiting = False
def acquire_process_lock(allowed_slots=None, logger=None):
if not logger:
logger = logging.getLogger('ProcessLock')
logger.setLevel('INFO')
if not self.disabled:
mem_limit = int(virtual_memory().total * max_usage / 100 / 1024**3)
super(MemoryReserver, self).__init__(client=redis_conn, count=mem_limit, namespace='MemoryReserver',
**kwargs)
self.logger = logger or GMS_logger("RedisLock: 'MemoryReserver'")
@property
def mem_reserved_gb(self):
return int(redis_conn.get('GMS_mem_reserved') or 0)
@property
def usable_memory_gb(self):
return int((virtual_memory().total * self.max_usage / 100 - virtual_memory().used) / 1024**3) \
- int(self.mem_reserved_gb)
@property
def grabbed_key_jobID(self):
return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)
@property
def reserved_key(self):
return self._get_and_set_key('_reserved_key', 'MEM_RESERVED')
@property
def reserved_key_jobID(self):
return self._get_and_set_key('_reserved_key_jobID', 'MEM_RESERVED_BY_GMSJOB_%s' % CFG.ID)
def acquire(self, timeout=0, target=None):
if not self.disabled:
with Lock(self.client, 'GMS_mem_acquire_lock'):
if self.usable_memory_gb >= self.mem2lock_gb:
for i in range(self.mem2lock_gb):
token = super(MemoryReserver, self).acquire(timeout=timeout)
self.client.hset(self.grabbed_key_jobID, token, self.current_time)
self.client.incr(self.reserved_key, self.mem2lock_gb)
self.client.incr(self.reserved_key_jobID, self.mem2lock_gb)
self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)
self._waiting = False
else:
if not self._waiting:
self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
% (self.usable_memory_gb, self.mem2lock_gb))
self._waiting = True
time.sleep(1)
self.acquire(timeout=timeout)
def release(self):
if not self.disabled:
for token in self._local_tokens:
self.signal(token)
self.client.decr(self.reserved_key, self.mem2lock_gb)
self.client.decr(self.reserved_key_jobID, self.mem2lock_gb)
self.logger.info('Released %s GB of reserved memory.' % self.mem2lock_gb)
def release_all_jobID_tokens(self):
mem_reserved = int(redis_conn.get(self.reserved_key_jobID) or 0)
if mem_reserved:
redis_conn.decr(self.reserved_key, mem_reserved)
redis_conn.delete(self.reserved_key_jobID)
for token in self.client.hkeys(self.grabbed_key_jobID):
self.signal(token)
self.client.delete(self.grabbed_key_jobID)
def delete(self):
if not self.disabled:
self.client.delete(self.check_exists_key)
self.client.delete(self.available_key)
self.client.delete(self.grabbed_key)
if self.mem_reserved_gb <= 0:
self.client.delete(self.reserved_key)
def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock.

    Wraps the decorated function so that it only runs while holding a ProcessLock slot.

    :param processlock_kwargs:  Keyword arguments to be passed to the ProcessLock class.
    :return:                    the decorated function
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            # the lock is acquired before and released after the wrapped call (context manager protocol)
            with ProcessLock(**processlock_kwargs):
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator
def reserve_mem(**memlock_kwargs):
    """Decorator function for MemoryReserver.

    Wraps the decorated function so that the requested amount of memory is reserved
    before the call and released afterwards.

    :param memlock_kwargs:  Keyword arguments to be passed to the MemoryReserver class.
    :return:                the decorated function
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            # FIX: a stale line from the previous implementation referenced the undefined
            # names 'allowed_slots'/'logger' here; only the MemoryReserver context is used now
            with MemoryReserver(**memlock_kwargs):
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator
def release_unclosed_locks(logger=None):
def release_unclosed_locks():
if redis_conn:
logger = logger or GMS_logger('LockReseter')
locks2release = [i.split(b'lock:')[1].decode('latin') for i in redis_conn.keys()
if i.decode('latin').startswith('lock:GMS_%s__' % CFG.ID)]
if locks2release:
logger.info("Releasing unclosed locks of job %s." % CFG.ID)
for lockN in locks2release:
lock = Lock(redis_conn, lockN)
try:
lock.release()
except NotAcquired:
lock.reset()
for L in [IOLock, ProcessLock]:
lock = L(allowed_slots=1)
lock.release_all_jobID_tokens()
# delete the complete redis namespace if no lock slot is acquired anymore
if lock.client.hlen(lock.grabbed_key) == 0:
lock.delete()
ML = MemoryReserver(1)
ML.release_all_jobID_tokens()
# delete the complete redis namespace if no lock slot is acquired anymore
if ML.client.hlen(ML.grabbed_key) == 0:
ML.delete()
# -*- coding: utf-8 -*-
import collections
import copy
import datetime
import functools
......@@ -15,6 +14,9 @@ import logging
from collections import OrderedDict
from itertools import chain
from typing import Iterable, List, Union, TYPE_CHECKING # noqa F401 # flake8 issue
import psutil
import time
from pkg_resources import parse_version
import numpy as np
import spectral
......@@ -137,6 +139,8 @@ class GMS_object(object):
self.lonlat_arr = None # set by self.write_tiles_to_ENVIfile
self.trueDataCornerUTM = None # set by self.from_tiles
self.mem_usage = {}
# set pathes
self.path_cloud_class_obj = ''
......@@ -705,16 +709,20 @@ class GMS_object(object):
% self.proc_level)
return None
self.logger.info('Generating combined accuracy layers array..')
try:
from ..algorithms.L2C_P import AccuracyCube
self._accuracy_layers = AccuracyCube(self)
except ValueError as e:
if str(e) == 'The given GMS_object contains no accuracy layers for combination.':
if CFG.ac_estimate_accuracy or CFG.spechomo_estimate_accuracy:
self.logger.warning('The given GMS_object contains no accuracy layers although computation '
'of accurracy layers was enabled in job configuration.')
if any([CFG.ac_estimate_accuracy, CFG.spathomo_estimate_accuracy, CFG.spechomo_estimate_accuracy]):
if self.proc_level == 'L2C':
self.logger.warning('The given GMS_object contains no accuracy layers although computation '
'of accurracy layers was enabled in job configuration.')
else:
# Suppress the warning if accuracy_layers is just called by GMS_object.to_tiles() within
# L2A or L2B processing.
pass
else:
pass # self._accuracy_layers keeps None
else:
......@@ -996,7 +1004,7 @@ class GMS_object(object):
attr_val = getattr(GMS_obj.MetaObj, attrN)
if isinstance(attr_val, list):
attrDic_fullLBA.update(dict(zip(GMS_obj.LayerBandsAssignment, attr_val)))
elif isinstance(attr_val, (dict, collections.OrderedDict)):
elif isinstance(attr_val, (dict, OrderedDict)):
attrDic_fullLBA.update(attr_val)
else:
raise ValueError(attrN)
......@@ -1735,7 +1743,7 @@ class GMS_object(object):
elif isinstance(v, GMS_identifier):
dict2write[k] = v.to_odict(include_logger=False)
elif isinstance(v, collections.OrderedDict) or isinstance(v, dict):
elif isinstance(v, OrderedDict) or isinstance(v, dict):
dict2write[k] = dict2write[k].copy()
if 'logger' in v:
if hasattr(dict2write[k]['logger'], 'handlers') and dict2write[k]['logger'].handlers[:]:
......@@ -2083,6 +2091,38 @@ class GMS_object(object):
# close logger
self.close_loggers()
def record_mem_usage(self):
    """Record the memory usage of the current process ID for the current processing level (megabytes)."""
    # get resident set size of the current process ID, converted to megabytes
    mem_usage = round(psutil.Process(os.getpid()).memory_info().rss / 1024**2, 1)

    # update the value only if the new value is LARGER than the already recorded one,
    # because we want to record the maximum memory needed per processing level
    # FIX: the comparison was previously inverted ('>' instead of '<'), which recorded the minimum
    if self.proc_level not in self.mem_usage or self.mem_usage[self.proc_level] < mem_usage:
        self.mem_usage[self.proc_level] = mem_usage

    # push recorded memory stats to database once the final processing level is reached
    if self.proc_level == 'L2C':
        stats_pushed = DB_T.record_stats_memusage(CFG.conn_database, self)

        if stats_pushed:
            self.logger.info('Recorded memory usage statistics.')
def block_at_system_overload(self, max_usage=95, timeout=15*60):
    """Block until the system memory usage drops below the given threshold.

    :param max_usage:  maximum system memory usage in percent to tolerate before blocking (default: 95)
    :param timeout:    timeout in seconds after which waiting is aborted (default: 15 minutes)
    :raises TimeoutError:  if the memory usage is still above the threshold after the timeout
    """
    logged = False
    t_start = time.time()

    while psutil.virtual_memory().percent >= max_usage:
        if not logged:
            self.logger.warning('Memory usage is %.1f percent. Waiting until memory usage is below %s percent. '
                                'Timeout: %s minutes.' % (psutil.virtual_memory().percent, max_usage, timeout / 60))
            logged = True

        if time.time() - t_start > timeout:
            # FIX: previously this only logged (via logger.exception outside an except handler)
            #      and then kept looping forever; abort explicitly instead
            msg = 'Processing could not be continued due to memory overload after waiting ' \
                  '%s minutes.' % (timeout / 60)
            self.logger.error(msg)
            raise TimeoutError(msg)

        time.sleep(5)
def close_loggers(self):
if self._logger not in [None, 'not set']:
self.logger.close() # this runs misc.logging.GMS_logger.close()
......@@ -2196,7 +2236,7 @@ class GMS_object(object):
class GMS_identifier(object):
def __init__(self, image_type, satellite, sensor, subsystem, proc_level, dataset_ID, logger=None):
# type: (str, str, str, str, str, int, logging.Logger) -> None
# type: (str, str, str, str, str, int, DatasetLogger) -> None
self.image_type = image_type
self.satellite = satellite