Commit 0dd12593 authored by Mathias Peters

Fixed logging permission errors.

parent 0e9de131
......@@ -54,6 +54,10 @@ class MultiSlotLock(Semaphore):
self.client.delete(self.available_key)
self.client.delete(self.grabbed_key)
def __exit__(self, exc_type, exc_val, exc_tb):
self.logger.close()
return super(MultiSlotLock, self).__exit__(exc_type, exc_val, exc_tb)
class SharedResourceLock(MultiSlotLock):
def acquire(self, timeout=0, target=None):
......@@ -196,6 +200,10 @@ class MemoryReserver(Semaphore):
if self.mem_reserved_gb <= 0:
self.client.delete(self.reserved_key)
def __exit__(self, exc_type, exc_val, exc_tb):
self.logger.close()
return super(MemoryReserver, self).__exit__(exc_type, exc_val, exc_tb)
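Both lock classes now close their attached logger when their context exits, so the job logfile handle is released together with the lock. In practice that looks roughly like the following sketch (the keyword names follow the code above and the pipeline diff below; the concrete values and the bare constructor call are illustrative only, imports omitted):

logger = GMS_logger('log__job', log_level='INFO', append=True)  # illustrative arguments

with MemoryReserver(mem2lock_gb=15, logger=logger, max_usage=90), \
     ProcessLock(allowed_slots=4, logger=logger):
    logger.info('memory reserved and process slot acquired')
# leaving the block runs both __exit__ methods, which now also call logger.close()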
def acquire_process_lock(**processlock_kwargs):
"""Decorator function for ProcessLock.
......
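The decorator body itself is collapsed in this view. A sketch of the usual shape of such a decorator, under the assumption that it simply runs the wrapped call inside a ProcessLock context (this is not the project's actual implementation):

import functools

def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock (sketch only)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            # hold a process slot for the duration of the call
            with ProcessLock(**processlock_kwargs):
                return func(*args, **kwargs)
        return wrapped
    return decorator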
......@@ -5,11 +5,6 @@ import sys
import warnings
import logging
import os
import socketserver
import select
import pickle
import struct
from logging.handlers import SocketHandler, DEFAULT_TCP_LOGGING_PORT
from ..options.config import GMS_config as CFG
......@@ -66,27 +61,22 @@ class GMS_logger(logging.Logger):
fileHandler = None
# set fileHandler for job logfile
joblog_Handler = None
if False: # log_to_joblog:
# if log_to_joblog:
if log_to_joblog:
job_logfile = os.path.join(CFG.path_job_logs, '%s.log' % CFG.ID)
# create output directory
silentmkdir(job_logfile)
if False: # CFG.CPUs > 1:
# Logging messages from multiple workers to the same file in multiprocessing will cause
# PermissionErrors. That's why a SocketHandler is used in between, which then logs to a FileHandler.
# create SocketHandler
joblog_Handler = SocketHandler('localhost', DEFAULT_TCP_LOGGING_PORT) # FIXME static host
joblog_Handler.setFormatter(self.formatter_fileH)
joblog_Handler.setLevel(log_level)
else:
# create FileHandler
joblog_Handler = logging.FileHandler(job_logfile, mode='a' if append else 'w')
joblog_Handler.setFormatter(self.formatter_fileH)
joblog_Handler.setLevel(log_level)
# create FileHandler
joblog_Handler = logging.FileHandler(job_logfile, mode='a' if append else 'w')
joblog_Handler.setFormatter(self.formatter_fileH)
joblog_Handler.setLevel(log_level)
# create SocketHandler
# joblog_Handler = SocketHandler('localhost', DEFAULT_TCP_LOGGING_PORT) # FIXME static host
# joblog_Handler.setFormatter(self.formatter_fileH)
# joblog_Handler.setLevel(log_level)
else:
joblog_Handler = None
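For reference, the disabled branch above corresponds to the standard workaround for multiprocess logging: every worker sends its records over a socket, and a single receiver process (the LogReceiver below) is the only one that touches the logfile. A minimal sketch of the worker side using only stdlib calls (get_worker_logger is an illustrative helper, not project code):

import logging
from logging.handlers import SocketHandler, DEFAULT_TCP_LOGGING_PORT

def get_worker_logger(name, log_level=logging.INFO):
    # the worker only talks to the socket; the receiver process owns the single FileHandler
    logger = logging.getLogger(name)
    logger.setLevel(log_level)
    handler = SocketHandler('localhost', DEFAULT_TCP_LOGGING_PORT)
    handler.setLevel(log_level)
    logger.addHandler(handler)
    return logger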
......@@ -149,9 +139,6 @@ class GMS_logger(logging.Logger):
ObjDict = self.__dict__
return ObjDict
# def __del__(self):
# self.close()
@property
def captured_stream(self):
if not self._captured_stream:
......@@ -173,22 +160,6 @@ class GMS_logger(logging.Logger):
# print(dir(self.streamHandler))
# self.streamHandler = None
# for handler in self.handlers[:]:
# from time import time
# t0=time()
# while True:
# try:
# # if handler.get_name()=='StringIO handler':
# # self.streamObj.flush()
# # self.streamHandler.flush()
# self.removeHandler(handler) # if not called with '[:]' the StreamHandlers are left open
# handler.flush()
# handler.close()
# except PermissionError:
# if time()-t0 < 5:
# pass
# else:
# warnings.warn('Could not properly close logfile due to a PermissionError: %s' % sys.exc_info()[1])
for handler in self.handlers[:]:
try:
# if handler.get_name()=='StringIO handler':
......@@ -209,6 +180,13 @@ class GMS_logger(logging.Logger):
with open(self.path_logfile) as inF:
print(inF.read())
def __enter__(self):
return self  # return the logger itself so that 'with GMS_logger(...) as logger' binds the instance, not None
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return exc_type is None
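With the context-manager protocol in place the logger can be used in a with block, which guarantees that close() runs even if the wrapped code raises (argument values are illustrative; the signature follows the call in pipeline.py below):

with GMS_logger('log__1234', fmt_suffix='scene_1', log_level='INFO', append=True) as logger:
    logger.info('processing started')
    # exceptions raised in here still propagate, because __exit__ does not suppress them
# at this point close() has been called and the job logfile handle is released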
def close_logger(logger):
if logger and hasattr(logger, 'handlers'):
......@@ -237,153 +215,3 @@ class LessThanFilter(logging.Filter):
def filter(self, record):
# non-zero return means we log this message
return record.levelno < self.max_level
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handle incoming logs"""
def handle(self):
"""Deal with the incoming log data"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
struct_len = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(struct_len)
while len(chunk) < struct_len:
chunk = chunk + self.connection.recv(struct_len - len(chunk))
obj = pickle.loads(chunk)
record = logging.makeLogRecord(obj)
self.handle_log_record(record)
print('Server: %s' % record.msg)
import pkgutil
with open(os.path.abspath(os.path.join(os.path.dirname(pkgutil.get_loader("gms_preprocessing").path), '..',
'tests', 'data', 'testlog.log')), 'w') as outW:
outW.write(record.msg)
def handle_log_record(self, record):
"""Process incoming log record
:param record: The record to write
"""
name = self.server.logname if self.server.logname is not None else record.name
logging.getLogger(name).handle(record)
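The receive loop above mirrors the wire format of logging.handlers.SocketHandler: a 4-byte big-endian length prefix followed by a pickled dict of the record's attributes. A small illustration of the sending side of that framing (frame_record is illustrative; SocketHandler does the equivalent internally in makePickle()/send()):

import logging
import pickle
import struct

def frame_record(record):
    # pickle the record's attribute dict and prepend its length,
    # which the recv(4) / struct.unpack('>L', ...) above undoes
    payload = pickle.dumps(record.__dict__, 1)
    return struct.pack('>L', len(payload)) + payload

rec = logging.LogRecord('demo', logging.INFO, __file__, 0, 'hello from a worker', None, None)
framed = frame_record(rec)
assert struct.unpack('>L', framed[:4])[0] == len(framed) - 4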
# import daemon
# class LogReceiver(socketserver.ThreadingTCPServer):
# """Simple TCP socket-based logging receiver."""
#
# allow_reuse_address = True
#
# def __init__(self, host='localhost', port=DEFAULT_TCP_LOGGING_PORT, handler=LogRecordStreamHandler):
# """Initialize the log receiver
#
# :param host: The hostname to bind to
# :param port: The port to listen on
# :param handler: The handler to send received messages to
# """
# socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
# self.host = host
# self.port = port
# self.abort = 0
# self.timeout = 1
# self.logname = None
#
# def serve_until_stopped(self):
# """Run the server"""
# abort = 0
# started = 0
# while not abort:
# if not started:
# print("Starting LogReceiver on host '%s'..." % self.host)
# read_items, _, _ = select.select([self.socket.fileno()], [], [], self.timeout)
# if read_items:
# self.handle_request()
# abort = self.abort
# if not started:
# started = 1
# print('LogReceiver is listening on port %s.' % self.port)
#
# def stop(self):
# self.abort = True
#
# def __enter__(self):
# with daemon.DaemonContext():
# self.serve_until_stopped()
# return self
#
# def __exit__(self, exc_type=None, exc_value=None, traceback=None):
# self.stop()
import threading
class LogReceiver(socketserver.ThreadingTCPServer):
"""Simple TCP socket-based logging receiver."""
allow_reuse_address = True
def __init__(self, host='localhost', port=DEFAULT_TCP_LOGGING_PORT, handler=LogRecordStreamHandler):
"""Initialize the log receiver
:param host: The hostname to bind to
:param port: The port to listen on
:param handler: The handler to send received messages to
"""
self.abort = 1
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
# setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.host = host
self.port = port
self.abort = 0
self.timeout = 1
self.logname = None
self._server_thread = None # type: threading.Thread
def serve_until_stopped(self, stop):
"""Run the server until the given threading.Event is set."""
started = 0
while not stop.is_set():  # Event.wait() without a timeout would block until the event is set, so poll the flag instead
if not started:
print("Starting LogReceiver on host '%s'..." % self.host)
read_items, _, _ = select.select([self.socket.fileno()], [], [], self.timeout)
if read_items:
self.handle_request()
if not started:
started = 1
print('LogReceiver is listening on port %s.' % self.port)
def _start_server_thread(self):
self._server_stop = threading.Event()
self._server_thread = threading.Thread(
group=None,
target=self.serve_until_stopped,
kwargs=dict(stop=self._server_stop)
)
self._server_thread.daemon = True  # preferred over the legacy setDaemon()
self._server_thread.start()
def _stop_server_thread(self):
# self.abort = True
self._server_stop.set()
self._server_thread.join()
self._server_thread = None
@property
def is_alive(self):
if self._server_thread:
return self._server_thread.is_alive()
else:
return False
def __enter__(self):
print('starting logserver')
socketserver.ThreadingTCPServer.allow_reuse_address = True
self._start_server_thread()
return self
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
self._stop_server_thread()
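Entering the context now starts a daemon thread that serves requests until the stop event is set on exit. A usage sketch (the receiver process is assumed to hold the only FileHandler; do_work_in_subprocesses is a hypothetical placeholder):

import logging

logging.basicConfig(filename='received.log', level=logging.DEBUG)  # only this process writes the file

with LogReceiver(host='localhost') as receiver:
    # while the block is active, SocketHandler connections from the workers are accepted
    # in the background and every received record is re-emitted through the local logging tree
    do_work_in_subprocesses()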
......@@ -251,138 +251,139 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
:param list_dataset_dicts_per_scene:
:return:
"""
pipeline_logger = GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
log_level=CFG.log_level, append=True)
# set CPU and memory limits
cpulimit = CFG.CPUs_all_jobs
mem2reserve = 15
if redis_conn:
mem_estim = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
list_dataset_dicts_per_scene[0]['satellite'])
if mem_estim:
mem2reserve = mem_estim
else:
cpulimit = int((virtual_memory().total * .8 - virtual_memory().used) / 1024 ** 3 / mem2reserve)
pipeline_logger.info('No memory usage statistics from earlier jobs found for the current configuration. '
'Limiting processes to %s in order to collect memory statistics first.' % cpulimit)
with GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
log_level=CFG.log_level, append=True) as pipeline_logger:
# start processing
with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage),\
ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):
# set CPU and memory limits
cpulimit = CFG.CPUs_all_jobs
mem2reserve = 15
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
'Received %s.' % list_dataset_dicts_per_scene)
if redis_conn:
mem_estim = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
list_dataset_dicts_per_scene[0]['satellite'])
if mem_estim:
mem2reserve = mem_estim
else:
cpulimit = int((virtual_memory().total * .8 - virtual_memory().used) / 1024 ** 3 / mem2reserve)
pipeline_logger.info('No memory usage statistics from earlier jobs found for the current '
'configuration. Limiting processes to %s in order to collect memory statistics '
'first.' % cpulimit)
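As a rough worked example of that fallback (the numbers are invented; the byte-to-GiB conversion via / 1024 ** 3 is already applied here): with 64 GiB total RAM of which 10 GiB are in use and the default reservation of 15 GB per process, the limit evaluates to int((64 * 0.8 - 10) / 15) = 2 parallel processes.

total_gb, used_gb, mem2reserve = 64, 10, 15   # invented example values
cpulimit = int((total_gb * .8 - used_gb) / mem2reserve)
print(cpulimit)  # -> 2 parallel processes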
input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']
# start processing
with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage),\
ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):
##################
# L1A processing #
##################
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
'Received %s.' % list_dataset_dicts_per_scene)
L1A_objects = []
if CFG.exec_L1AP[0] and input_proc_level is None:
L1A_objects = \
[L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']
if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]):
return L1A_objects
##################
# L1A processing #
##################
##################
# L1B processing #
##################
L1A_objects = []
if CFG.exec_L1AP[0] and input_proc_level is None:
L1A_objects = \
[L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
# select subsystem with optimal band for co-registration
# L1B_obj_coreg = L1B_map(L1A_objects[0])
# copy coreg information to remaining subsets
if any([isinstance(obj, failed_GMS_object) for obj in L1A_objects]):
return L1A_objects
L1B_objects = L1A_objects
if CFG.exec_L1BP[0]:
# add earlier processed L1A data
if input_proc_level == 'L1A':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))
##################
# L1B processing #
##################
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
# select subsystem with optimal band for co-registration
# L1B_obj_coreg = L1B_map(L1A_objects[0])
# copy coreg information to remaining subsets
del L1A_objects
L1B_objects = L1A_objects
if CFG.exec_L1BP[0]:
# add earlier processed L1A data
if input_proc_level == 'L1A':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))
if any([isinstance(obj, failed_GMS_object) for obj in L1B_objects]):
return L1B_objects
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
##################
# L1C processing #
##################
del L1A_objects
L1C_objects = L1B_objects
if CFG.exec_L1CP[0]:
# add earlier processed L1B data
if input_proc_level == 'L1B':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))
if any([isinstance(obj, failed_GMS_object) for obj in L1B_objects]):
return L1B_objects
L1C_objects = L1C_map(L1B_objects)
##################
# L1C processing #
##################
del L1B_objects
L1C_objects = L1B_objects
if CFG.exec_L1CP[0]:
# add earlier processed L1B data
if input_proc_level == 'L1B':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))
if any([isinstance(obj, failed_GMS_object) for obj in L1C_objects]):
return L1C_objects
L1C_objects = L1C_map(L1B_objects)
if not CFG.exec_L2AP[0]:
return L1C_objects
del L1B_objects
##################
# L2A processing #
##################
if any([isinstance(obj, failed_GMS_object) for obj in L1C_objects]):
return L1C_objects
# add earlier processed L1C data
if input_proc_level == 'L1C':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))
if not CFG.exec_L2AP[0]:
return L1C_objects
##################
# L2A processing #
##################
# add earlier processed L1C data
if input_proc_level == 'L1C':
for ds in list_dataset_dicts_per_scene:
GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))
L2A_obj = L2A_map(L1C_objects, return_tiles=False)
L2A_obj = L2A_map(L1C_objects, return_tiles=False)
del L1C_objects
del L1C_objects
if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
return L2A_obj
if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
return L2A_obj
##################
# L2B processing #
##################
##################
# L2B processing #
##################
# add earlier processed L2A data
if input_proc_level == 'L2A':
assert len(list_dataset_dicts_per_scene) == 1, \
'Expected only a single L2A dataset since subsystems are merged.'
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])
# add earlier processed L2A data
if input_proc_level == 'L2A':
assert len(list_dataset_dicts_per_scene) == 1, \
'Expected only a single L2A dataset since subsystems are merged.'
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])
L2B_obj = L2B_map(L2A_obj)
L2B_obj = L2B_map(L2A_obj)
del L2A_obj
del L2A_obj
if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
return L2B_obj
if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
return L2B_obj
##################
# L2C processing #
##################
##################
# L2C processing #
##################
# add earlier processed L2B data
if input_proc_level == 'L2B':
assert len(list_dataset_dicts_per_scene) == 1, \
'Expected only a single L2B dataset since subsystems are merged.'
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])
# add earlier processed L2B data
if input_proc_level == 'L2B':
assert len(list_dataset_dicts_per_scene) == 1, \
'Expected only a single L2B dataset since subsystems are merged.'
GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])
L2C_obj = L2C_map(L2B_obj) # type: Union[GMS_object, failed_GMS_object, List]
L2C_obj = L2C_map(L2B_obj) # type: Union[GMS_object, failed_GMS_object, List]
del L2B_obj
del L2B_obj
return L2C_obj
return L2C_obj
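Stripped of the level-by-level processing, the restructured function body reduces to the following nesting; all names and keyword arguments are taken from the hunk above, only the body is elided:

scene_id = list_dataset_dicts_per_scene[0]['scene_ID']

with GMS_logger('log__%s' % CFG.ID, fmt_suffix=scene_id, log_level=CFG.log_level, append=True) as pipeline_logger:
    with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage), \
         ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):
        ...  # L1A -> L2C processing as shown above
# leaving the outer block closes pipeline_logger, so the worker leaves no open logfile handle behind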
......@@ -465,9 +465,6 @@ class process_controller(object):
# RUN PREPROCESSING
from .pipeline import run_complete_preprocessing
from ..misc.logging import LogReceiver
# with LogReceiver() as lr:
# alive = lr.is_alive
GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)
# separate results into successful and failed objects
......