Commit 20083316 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Simplified process controller. GMS jobs now delete their own GMS_mem_acquire_lock during shutdown.

parent ce09c4d8
...@@ -4,7 +4,7 @@ __author__ = 'Daniel Scheffler' ...@@ -4,7 +4,7 @@ __author__ = 'Daniel Scheffler'
import time import time
from redis import StrictRedis from redis import StrictRedis
from redis_semaphore import Semaphore from redis_semaphore import Semaphore
from redis_lock import Lock from redis_lock import Lock, NotAcquired
from redis.exceptions import ConnectionError as RedisConnectionError from redis.exceptions import ConnectionError as RedisConnectionError
import functools import functools
from psutil import virtual_memory from psutil import virtual_memory
...@@ -145,7 +145,19 @@ class MemoryReserver(Semaphore): ...@@ -145,7 +145,19 @@ class MemoryReserver(Semaphore):
def acquire(self, timeout=0, target=None): def acquire(self, timeout=0, target=None):
if not self.disabled: if not self.disabled:
with Lock(self.client, 'GMS_mem_acquire_lock', expire=5, auto_renewal=True):
# # due to, e.g., a MemoryError redis_lock.Lock may try to extend an already expired lock
# # (which does not exist anymore) -> for GMS, thats also
# class mocked_Lock(Lock):
# def extend(self, expire=None):
# try:
# super(mocked_Lock, self).extend(expire=expire)
# except NotAcquired:
# pass
with Lock(self.client, 'GMS_mem_acquire_lock'): # , expire=3, auto_renewal=True):
self.client.set('GMS_mem_acquire_lock:GRABBED_BY_GMSJOB', CFG.ID)
if self.usable_memory_gb >= self.mem2lock_gb: if self.usable_memory_gb >= self.mem2lock_gb:
for i in range(self.mem2lock_gb): for i in range(self.mem2lock_gb):
token = super(MemoryReserver, self).acquire(timeout=timeout) token = super(MemoryReserver, self).acquire(timeout=timeout)
...@@ -166,6 +178,8 @@ class MemoryReserver(Semaphore): ...@@ -166,6 +178,8 @@ class MemoryReserver(Semaphore):
time.sleep(1) time.sleep(1)
self.acquire(timeout=timeout) self.acquire(timeout=timeout)
self.client.delete('GMS_mem_acquire_lock:GRABBED_BY_GMSJOB')
def release(self): def release(self):
if not self.disabled: if not self.disabled:
for token in self._local_tokens: for token in self._local_tokens:
...@@ -251,3 +265,7 @@ def release_unclosed_locks(): ...@@ -251,3 +265,7 @@ def release_unclosed_locks():
# delete the complete redis namespace if no lock slot is acquired anymore # delete the complete redis namespace if no lock slot is acquired anymore
if ML.client.hlen(ML.grabbed_key) == 0: if ML.client.hlen(ML.grabbed_key) == 0:
ML.delete() ML.delete()
if int(redis_conn.get('GMS_mem_acquire_lock:GRABBED_BY_GMSJOB') or -9999) == CFG.ID:
redis_conn.delete('GMS_mem_acquire_lock')
redis_conn.delete('GMS_mem_acquire_lock:GRABBED_BY_GMSJOB')
...@@ -399,31 +399,25 @@ class process_controller(object): ...@@ -399,31 +399,25 @@ class process_controller(object):
self.config.computation_time = self.config.end_time - self.config.start_time self.config.computation_time = self.config.end_time - self.config.start_time
self.logger.info('Time for execution: %s' % self.config.computation_time) self.logger.info('Time for execution: %s' % self.config.computation_time)
# update database entry of current job
self.update_DB_job_record()
if self.config.profiling:
self.profiler.stop()
print(self.profiler.output_text(unicode=True, color=True))
self.shutdown()
except Exception: # noqa E722 # bare except except Exception: # noqa E722 # bare except
if self.config.profiling:
self.profiler.stop()
print(self.profiler.output_text(unicode=True, color=True))
self.config.status = 'failed' self.config.status = 'failed'
self.update_DB_job_record()
if not self.config.disable_exception_handler: if not self.config.disable_exception_handler:
self.logger.error('Execution failed with an error:', exc_info=True) self.logger.error('Execution failed with an error:', exc_info=True)
self.shutdown()
else: else:
self.logger.error('Execution failed with an error:') self.logger.error('Execution failed with an error:')
self.shutdown()
raise raise
finally:
# update database entry of current job
self.update_DB_job_record()
if self.config.profiling:
self.profiler.stop()
print(self.profiler.output_text(unicode=True, color=True))
self.shutdown()
def run_all_processors(self, custom_data_list=None): def run_all_processors(self, custom_data_list=None):
# enable clean shutdown possibility # enable clean shutdown possibility
# NOTE: a signal.SIGKILL (kill -9 ...) forces to kill the process and cannot be catched or handled # NOTE: a signal.SIGKILL (kill -9 ...) forces to kill the process and cannot be catched or handled
...@@ -484,31 +478,25 @@ class process_controller(object): ...@@ -484,31 +478,25 @@ class process_controller(object):
self.config.computation_time = self.config.end_time - self.config.start_time self.config.computation_time = self.config.end_time - self.config.start_time
self.logger.info('Time for execution: %s' % self.config.computation_time) self.logger.info('Time for execution: %s' % self.config.computation_time)
# update database entry of current job
self.update_DB_job_record()
if self.config.profiling:
self.profiler.stop()
print(self.profiler.output_text(unicode=True, color=True))
self.shutdown()
except Exception: # noqa E722 # bare except except Exception: # noqa E722 # bare except
if self.config.profiling:
self.profiler.stop()
print(self.profiler.output_text(unicode=True, color=True))
self.config.status = 'failed' self.config.status = 'failed'
self.update_DB_job_record()
if not self.config.disable_exception_handler: if not self.config.disable_exception_handler:
self.logger.error('Execution failed with an error:', exc_info=True) self.logger.error('Execution failed with an error:', exc_info=True)
self.shutdown()
else: else:
self.logger.error('Execution failed with an error:') self.logger.error('Execution failed with an error:')
self.shutdown()
raise raise
finally:
# update database entry of current job
self.update_DB_job_record()
if self.config.profiling:
self.profiler.stop()
print(self.profiler.output_text(unicode=True, color=True))
self.shutdown()
def stop(self, signum, frame): def stop(self, signum, frame):
"""Interrupt the running process controller gracefully.""" """Interrupt the running process controller gracefully."""
...@@ -536,7 +524,7 @@ class process_controller(object): ...@@ -536,7 +524,7 @@ class process_controller(object):
tempdir = os.path.join(self.config.path_tempdir) tempdir = os.path.join(self.config.path_tempdir)
self.logger.info('Deleting temporary directory %s.' % tempdir) self.logger.info('Deleting temporary directory %s.' % tempdir)
if os.path.exists(tempdir): if os.path.exists(tempdir):
shutil.rmtree(tempdir) shutil.rmtree(tempdir, ignore_errors=True)
del self.logger del self.logger
shutdown_loggers() shutdown_loggers()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment