Commit f78c26fb authored by Daniel Scheffler

Removed old version of class MemoryReserver. Added retools to dependencies.

parent e13a780f
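
locks.py imports Lock and LockTimeout from retools (the import line stays in place below), which is why retools is now declared in requirements.txt, setup.py, the conda environment and the CI pip install. A minimal sketch of the locking pattern this import provides, assuming a local Redis instance; the key name and connection details are placeholders, not taken from this commit:

import time
from redis import StrictRedis
from retools.lock import Lock, LockTimeout

redis_conn = StrictRedis(host='localhost', db=0)  # placeholder connection

try:
    # 'ACQUISITION_LOCK' mirrors the acquisition key used in locks.py;
    # the lock expires after 20 s and acquisition waits at most 20 s.
    with Lock('ACQUISITION_LOCK', expires=20, timeout=20, redis=redis_conn):
        pass  # critical section: update shared reservation counters in Redis
except LockTimeout:
    time.sleep(1)  # another worker held the lock for the whole timeout window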
...@@ -32,7 +32,7 @@ test_gms_preprocessing:
    # - python setup.py install
    # - cd ../../
    # make tests
    - pip install nested_dict openpyxl timeout_decorator redis redis-semaphore psutil  # FIXME: remove as soon as runner is rebuilt
    - pip install nested_dict openpyxl timeout_decorator redis retools redis-semaphore psutil  # FIXME: remove as soon as runner is rebuilt
    - make nosetests
    - make docs
  artifacts:
......
...@@ -8,7 +8,6 @@ from redis.exceptions import ConnectionError as RedisConnectionError
from retools.lock import Lock, LockTimeout
import functools
from psutil import virtual_memory
import random

from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG
...@@ -106,216 +105,6 @@ class ProcessLock(SharedResourceLock):
        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

class MemoryReserver(Semaphore):
    def __init__(self, mem2lock_gb, max_usage=90, logger=None, **kwargs):
        """
        :param reserved_mem: Amount of memory to be reserved during the lock is acquired (gigabytes).
        """
        self.disabled = redis_conn is None or CFG.disable_memory_locks or mem2lock_gb in [None, False]
        self.mem2lock_gb = mem2lock_gb
        self.max_usage = max_usage
        self._waiting = False

        if not self.disabled:
            mem_limit = int(virtual_memory().total * max_usage / 100 / 1024**3)

            super(MemoryReserver, self).__init__(client=redis_conn, count=mem_limit, namespace='MemoryReserver',
                                                 **kwargs)

            self.logger = logger or GMS_logger("RedisLock: 'MemoryReserver'")

    @property
    def waiting(self):
        return self._waiting

    @waiting.setter
    def waiting(self, val):
        if val is not self._waiting:
            if val:
                self.client.incr(self.waiting_key, 1)
                self.client.incr(self.waiting_key_jobID, 1)
            else:
                self.client.decr(self.waiting_key, 1)
                self.client.decr(self.waiting_key_jobID, 1)

        self._waiting = val

    @property
    def mem_reserved_gb(self):
        return int(self.client.get(self.reserved_key) or 0)

    @property
    def usable_memory_gb(self):
        return int((virtual_memory().total * self.max_usage / 100 - virtual_memory().used) / 1024**3) \
               - int(self.mem_reserved_gb)

    @property
    def grabbed_key_jobID(self):
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    @property
    def reserved_key(self):
        return self._get_and_set_key('_reserved_key', 'MEM_RESERVED')

    @property
    def reserved_key_jobID(self):
        return self._get_and_set_key('_reserved_key_jobID', 'MEM_RESERVED_BY_GMSJOB_%s' % CFG.ID)

    @property
    def acquisition_key(self):
        return self._get_and_set_key('_acquisition_key', 'ACQUISITION_LOCK')

    @property
    def acquisition_key_expiration(self):
        return self._get_and_set_key('_acquisition_key_expiration', 'ACQUISITION_LOCK_EXPIRATION_TIME')

    @property
    def waiting_key(self):
        return self._get_and_set_key('_waiting_key', 'NUMBER_WAITING')

    @property
    def waiting_key_jobID(self):
        return self._get_and_set_key('_waiting_key_jobID', 'NUMBER_WAITING_GMSJOB_%s' % CFG.ID)

    # def acquire(self, timeout=0, target=None):
    #     if not self.disabled:
    #         def check_acquisition_key():
    #             tok = self.client.getset(self.acquisition_key, CFG.ID)
    #
    #             expiration_time = self.client.get(self.acquisition_key_expiration)
    #             if expiration_time and float(expiration_time) < time.time():
    #                 self.client.delete(self.acquisition_key_expiration)
    #                 check_acquisition_key()
    #
    #             return tok
    #
    #         while check_acquisition_key() is not None:
    #             self.waiting = True
    #             time.sleep(random.uniform(1, 5))  # avoids race conditions
    #
    #         self.client.expire(self.acquisition_key, 10)
    #         self.client.set(self.acquisition_key_expiration, time.time() + 10)
    #
    #         try:
    #             if self.usable_memory_gb >= self.mem2lock_gb:
    #                 for i in range(self.mem2lock_gb):
    #                     token = super(MemoryReserver, self).acquire(timeout=timeout)
    #                     self.client.hset(self.grabbed_key_jobID, token, self.current_time)
    #
    #                 self.client.incr(self.reserved_key, self.mem2lock_gb)
    #                 self.client.incr(self.reserved_key_jobID, self.mem2lock_gb)
    #
    #                 self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)
    #                 self.waiting = False
    #
    #             else:
    #                 if not self._waiting:
    #                     self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
    #                                      % (self.usable_memory_gb, self.mem2lock_gb))
    #                 self.waiting = True
    #
    #                 time.sleep(random.uniform(1, 2))
    #                 self.acquire(timeout=timeout)
    #
    #         finally:
    #             self.client.delete(self.acquisition_key_expiration)
    #             self.client.delete(self.acquisition_key)

    def acquire(self, timeout=20, target=None):
        try:
            with Lock(self.acquisition_key, expires=20, timeout=timeout, redis=self.client):
                if self.usable_memory_gb >= self.mem2lock_gb:
                    t_start = time.time()

                    for i in range(self.mem2lock_gb):
                        token = super(MemoryReserver, self).acquire(timeout=0)
                        self.client.hset(self.grabbed_key_jobID, token, self.current_time)

                    self.client.incr(self.reserved_key, self.mem2lock_gb)
                    self.client.incr(self.reserved_key_jobID, self.mem2lock_gb)

                    self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)
                    self.waiting = False

                    if time.time() > t_start + timeout:
                        self.logger.warning('Reservation of memory took more time than expected. '
                                            'Possibly more memory than available has been reserved.')

                else:
                    if not self.waiting:
                        self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
                                         % (self.usable_memory_gb, self.mem2lock_gb))
                    self.waiting = True

        except LockTimeout:
            # self.client.delete(self.acquisition_key)
            self.acquire(timeout=timeout, target=target)

        if self.waiting:
            time.sleep(random.uniform(1, 2))
            self.acquire(timeout=timeout)

    # def acquire(self, timeout=0):
    #     with self.client.pipeline() as pipe:
    #         pipe.multi()
    #         pipe.delete(self.grabbed_key, self.available_key)
    #         pipe.rpush(self.available_key, *range(self.count))
    #         pipe.execute()
    #
    #     pair = self.client.blpop(self.available_key, timeout)
    #     if pair is None:
    #         raise NotAvailable
    #     token = pair[1]
    #     self._local_tokens.append(token)

    def release(self):
        if not self.disabled:
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                for token in self._local_tokens:
                    self.signal(token)

                self.client.decr(self.reserved_key, self.mem2lock_gb)
                self.client.decr(self.reserved_key_jobID, self.mem2lock_gb)

                self.logger.info('Released %s GB of reserved memory.' % self.mem2lock_gb)

    def release_all_jobID_tokens(self):
        if not self.disabled:
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                mem_reserved = int(self.client.get(self.reserved_key_jobID) or 0)
                if mem_reserved:
                    self.client.decr(self.reserved_key, mem_reserved)
                self.client.delete(self.reserved_key_jobID)

                for token in self.client.hkeys(self.grabbed_key_jobID):
                    self.signal(token)

                self.client.delete(self.grabbed_key_jobID)

    def delete(self):
        if not self.disabled:
            self.client.delete(self.check_exists_key)
            self.client.delete(self.available_key)
            self.client.delete(self.grabbed_key)

            # # remove acquisition key (if held by the current job ID)
            # if CFG.ID in [int(i) for i in self.client.hkeys(self.acquisition_key)]:
            #     self.client.delete(self.acquisition_key)
            # # self.client.delete(self.acquisition_key_expiration)  # might be dangerous (could be held by another job)

            # remove waiting keys
            self.waiting = False  # decrements self.waiting_key
            n_waiting_currJob = int(self.client.get(self.waiting_key_jobID) or 0)
            self.client.delete(self.waiting_key_jobID)

            if n_waiting_currJob:
                self.client.decr(self.waiting_key, n_waiting_currJob)
            if int(self.client.get(self.waiting_key) or 0) == 0:
                self.client.delete(self.waiting_key)

            # remove reserved key
            if self.mem_reserved_gb <= 0:
                self.client.delete(self.reserved_key)

class MemoryReserver(object):
    def __init__(self, mem2lock_gb, max_usage=90, logger=None):
        """
......
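
For orientation, the reservation workflow around the class shown above is acquire() before a memory-hungry step and release() afterwards. A minimal, hypothetical usage sketch; the import path, the 20 GB figure and process_scene() are assumptions, and the retained MemoryReserver may expose a different interface than the removed one:

from gms_preprocessing.misc.locks import MemoryReserver  # assumed import path

reserver = MemoryReserver(mem2lock_gb=20)  # hypothetical amount of memory to reserve
reserver.acquire()
try:
    process_scene()  # placeholder for the memory-intensive work being guarded
finally:
    reserver.release()  # give the reserved gigabytes back even on failure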
...@@ -18,5 +18,6 @@ nested_dict
openpyxl
timeout_decorator
redis
retools
redis-semaphore
psutil
...@@ -19,7 +19,7 @@ requirements = [
    'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'pytz',
    'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy',
    'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.13', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus',
    'nested_dict', 'openpyxl', 'timeout_decorator', 'redis', 'redis-semaphore', 'psutil'
    'nested_dict', 'openpyxl', 'timeout_decorator', 'redis', 'retools', 'redis-semaphore', 'psutil'
    # spectral<0.16 has some problems with writing signed integer 8bit data
    # fmask  # conda install -c conda-forge python-fmask
    # 'pyhdf',  # conda install --yes -c conda-forge pyhdf
......
...@@ -75,6 +75,7 @@ dependencies:
- openpyxl
- timeout_decorator
- redis
- retools
- redis-semaphore
- psutil
- py_tools_ds>=0.12.4
......