Commit d9dfe64c authored by Daniel Scheffler, committed by Mathias Peters

Added (not yet working) TCPSocketServer for logging in multiprocessing.

parent 2574a5a3
......@@ -5,6 +5,11 @@ import sys
import warnings
import logging
import os
import socketserver
import select
import pickle
import struct
from logging.handlers import SocketHandler, DEFAULT_TCP_LOGGING_PORT
from ..options.config import GMS_config as CFG
......@@ -61,18 +66,28 @@ class GMS_logger(logging.Logger):
fileHandler = None
# set fileHandler for job logfile
joblog_Handler = None
if log_to_joblog:
job_logfile = os.path.join(CFG.path_job_logs, '%s.log' % CFG.ID)
# create output directory
silentmkdir(job_logfile)
# create FileHandler
joblog_fileHandler = logging.FileHandler(job_logfile, mode='a' if append else 'w')
joblog_fileHandler.setFormatter(self.formatter_fileH)
joblog_fileHandler.setLevel(log_level)
if False: # CFG.CPUs > 1:
                # Logging messages from multiple workers to the same file in multiprocessing will cause
                # PermissionErrors. That's why we use a SocketHandler in between, which then logs to a FileHandler.
# create SocketHandler
joblog_Handler = SocketHandler('localhost', DEFAULT_TCP_LOGGING_PORT) # FIXME static host
joblog_Handler.setFormatter(self.formatter_fileH)
joblog_Handler.setLevel(log_level)
else:
# create FileHandler
joblog_Handler = logging.FileHandler(job_logfile, mode='a' if append else 'w')
joblog_Handler.setFormatter(self.formatter_fileH)
joblog_Handler.setLevel(log_level)
else:
joblog_fileHandler = None
joblog_Handler = None
# create StreamHandler # TODO add a StringIO handler
# self.streamObj = StringIO()
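The disabled branch above forwards worker records through a SocketHandler to a central receiver, so that only one process ever writes the job logfile. Note that SocketHandler pickles the raw LogRecord and ignores any formatter set on it, so formatting has to happen on the receiving side. A minimal sketch of the worker side, assuming a receiver is listening on the default port (the helper name get_worker_logger is hypothetical, not part of this commit):

import logging
from logging.handlers import SocketHandler, DEFAULT_TCP_LOGGING_PORT

def get_worker_logger(name, log_level=logging.INFO):
    """Hypothetical worker-side logger that forwards records over TCP."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        # each record is pickled and sent to the receiver; the receiver's
        # FileHandler is then the only writer of the shared job logfile
        logger.addHandler(SocketHandler('localhost', DEFAULT_TCP_LOGGING_PORT))
        logger.setLevel(log_level)
    return logger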
......@@ -98,8 +113,8 @@ class GMS_logger(logging.Logger):
if not self.handlers:
if fileHandler:
self.addHandler(fileHandler)
if joblog_fileHandler:
self.addHandler(joblog_fileHandler)
if joblog_Handler:
self.addHandler(joblog_Handler)
# self.addHandler(self.streamHandler)
self.addHandler(consoleHandler_out)
self.addHandler(consoleHandler_err)
......@@ -133,8 +148,8 @@ class GMS_logger(logging.Logger):
ObjDict = self.__dict__
return ObjDict
def __del__(self):
self.close()
# def __del__(self):
# self.close()
@property
def captured_stream(self):
......@@ -204,3 +219,141 @@ class LessThanFilter(logging.Filter):
def filter(self, record):
# non-zero return means we log this message
        return record.levelno < self.max_level
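LessThanFilter is typically attached to the stdout handler so that records below a threshold go to stdout while WARNING and above go to stderr, matching the consoleHandler_out/consoleHandler_err pair used above. A minimal sketch:

import logging
import sys

demo_logger = logging.getLogger('demo')
demo_logger.setLevel(logging.DEBUG)

# stdout receives everything below WARNING ...
h_out = logging.StreamHandler(sys.stdout)
h_out.setLevel(logging.DEBUG)
h_out.addFilter(LessThanFilter(logging.WARNING))

# ... while stderr receives WARNING and above.
h_err = logging.StreamHandler(sys.stderr)
h_err.setLevel(logging.WARNING)

demo_logger.addHandler(h_out)
demo_logger.addHandler(h_err)
demo_logger.info('goes to stdout')
demo_logger.error('goes to stderr')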
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handle incoming logs"""
def handle(self):
"""Deal with the incoming log data"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
struct_len = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(struct_len)
while len(chunk) < struct_len:
chunk = chunk + self.connection.recv(struct_len - len(chunk))
obj = pickle.loads(chunk)
record = logging.makeLogRecord(obj)
self.handle_log_record(record)
            print('Server: %s' % record.msg)  # debug output while the receiver is under development
def handle_log_record(self, record):
"""Process incoming log record
:param record: The record to write
"""
name = self.server.logname if self.server.logname is not None else record.name
logging.getLogger(name).handle(record)
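The framing that handle() expects matches what logging.handlers.SocketHandler emits: a 4-byte big-endian length prefix ('>L') followed by a pickled dict of the LogRecord's attributes. A sketch of the sending side, approximating SocketHandler.makePickle, for illustration only:

import logging
import pickle
import struct

def frame_record(record):
    # pickle the record's attribute dict and prepend its length as a
    # 4-byte big-endian unsigned int, as SocketHandler.makePickle does
    data = pickle.dumps(record.__dict__, protocol=1)
    return struct.pack('>L', len(data)) + data

rec = logging.LogRecord('demo', logging.INFO, __file__, 0, 'hello', None, None)
payload = frame_record(rec)
# handle() above reverses this: unpack '>L', read that many bytes,
# pickle.loads() the payload and rebuild it via logging.makeLogRecord().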
# import daemon
# class LogReceiver(socketserver.ThreadingTCPServer):
# """Simple TCP socket-based logging receiver."""
#
# allow_reuse_address = True
#
# def __init__(self, host='localhost', port=DEFAULT_TCP_LOGGING_PORT, handler=LogRecordStreamHandler):
# """Initialize the log receiver
#
# :param host: The hostname to bind to
# :param port: The port to listen on
# :param handler: The handler to send received messages to
# """
# socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
# self.host = host
# self.port = port
# self.abort = 0
# self.timeout = 1
# self.logname = None
#
# def serve_until_stopped(self):
# """Run the server"""
# abort = 0
# started = 0
# while not abort:
# if not started:
# print("Starting LogReceiver on host '%s'..." % self.host)
# read_items, _, _ = select.select([self.socket.fileno()], [], [], self.timeout)
# if read_items:
# self.handle_request()
# abort = self.abort
# if not started:
# started = 1
# print('LogReceiver is listening on port %s.' % self.port)
#
# def stop(self):
# self.abort = True
#
# def __enter__(self):
# with daemon.DaemonContext():
# self.serve_until_stopped()
# return self
#
# def __exit__(self, exc_type=None, exc_value=None, traceback=None):
# self.stop()
import threading
class LogReceiver(socketserver.ThreadingTCPServer):
"""Simple TCP socket-based logging receiver."""
allow_reuse_address = True
def __init__(self, host='localhost', port=DEFAULT_TCP_LOGGING_PORT, handler=LogRecordStreamHandler):
"""Initialize the log receiver
:param host: The hostname to bind to
:param port: The port to listen on
:param handler: The handler to send received messages to
"""
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.host = host
        self.port = port
        self.timeout = 1
        self.logname = None
self._server_thread = None
    def serve_until_stopped(self, stop):
        """Run the server until the given threading.Event is set."""
        started = False
        while not stop.is_set():
            if not started:
                print("Starting LogReceiver on host '%s'..." % self.host)
            read_items, _, _ = select.select([self.socket.fileno()], [], [], self.timeout)
            if read_items:
                self.handle_request()
            if not started:
                started = True
                print('LogReceiver is listening on port %s.' % self.port)
def _start_server_thread(self):
self._server_stop = threading.Event()
self._server_thread = threading.Thread(
group=None,
target=self.serve_until_stopped,
kwargs=dict(stop=self._server_stop)
)
        self._server_thread.daemon = True
self._server_thread.start()
    def _stop_server_thread(self):
        self._server_stop.set()
        self._server_thread.join()
        self._server_thread = None
    def __enter__(self):
        print('Starting LogReceiver...')
        self._start_server_thread()
        return self
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
self._stop_server_thread()
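Intended usage, matching the (still commented-out) call in the process controller below: start the receiver in the parent process for the duration of the multiprocessing step, with a FileHandler attached to the receiving logger so every forwarded record ends up in the job logfile. A sketch under these assumptions (the logfile path and worker function are hypothetical):

import logging

# the parent process owns the single FileHandler for the job logfile
logging.getLogger().addHandler(logging.FileHandler('/tmp/job.log'))  # hypothetical path

with LogReceiver(host='localhost', port=DEFAULT_TCP_LOGGING_PORT):
    run_workers()  # hypothetical; workers log via SocketHandler as sketched above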
......@@ -148,6 +148,13 @@ class METADATA(object):
return self
def __getstate__(self):
"""Defines how the attributes of MetaObj instances are pickled."""
if self.logger:
self.logger.close()
return self.__dict__
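Closing the logger before pickling is necessary because its handlers hold open file handles, which are not picklable. A common variant, sketched here purely as an alternative, also strips the logger from the pickled state so the receiving process recreates it lazily:

def __getstate__(self):
    """Sketch: close AND exclude the unpicklable logger from the state."""
    state = self.__dict__.copy()
    if state.get('logger'):
        state['logger'].close()  # release file handles before pickling
        state['logger'] = None   # recreate lazily after unpickling
    return state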
@property
def AcqDateTime(self):
"""Returns a datetime.datetime object containing date, time and timezone (UTC time)."""
......
......@@ -81,13 +81,14 @@ class process_controller(object):
# called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0
path_job_logfile = os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID)
if os.path.exists(path_job_logfile):
HLP_F.silentremove(path_job_logfile)
self._path_job_logfile = os.path.join(self.config.path_job_logs, '%s.log' % self.config.ID)
if os.path.exists(self._path_job_logfile):
HLP_F.silentremove(self._path_job_logfile)
self.logger.info("Executing gms_preprocessing, version: %s (%s)" % (__version__, __versionalias__))
self.logger.info('Process Controller initialized for job ID %s (comment: %s).'
% (self.config.ID, self.DB_job_record.comment))
self.logger.info('Job logfile: %s' % self._path_job_logfile)
if self.config.delete_old_output:
self.logger.info('Deleting previously processed data...')
......@@ -99,7 +100,7 @@ class process_controller(object):
return self._logger
else:
self._logger = GMS_logger('log__%s' % self.config.ID, fmt_suffix='ProcessController',
log_level=self.config.log_level, append=True, log_to_joblog=True)
path_logfile=self._path_job_logfile, log_level=self.config.log_level, append=True)
return self._logger
@logger.setter
......@@ -109,8 +110,8 @@ class process_controller(object):
@logger.deleter
def logger(self):
if self._logger not in [None, 'not set']:
self.logger.close()
self.logger = None # FIXME _logger = None?
self._logger.close()
self._logger = None
@property
def DB_job_record(self):
......@@ -391,6 +392,8 @@ class process_controller(object):
self.create_job_summary()
self.logger.info('Execution finished.')
self.logger.info('The job logfile and the summary files have been saved here: \n'
'%s.*' % os.path.splitext(self.logger.path_logfile)[0])
# TODO implement failed_with_warnings:
self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
self.config.end_time = datetime.datetime.now()
......@@ -453,8 +456,13 @@ class process_controller(object):
# group dataset dicts by sceneid
dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')
        # close the logger to release the FileHandler of the job logfile (the workers will log into it)
del self.logger
# RUN PREPROCESSING
from .pipeline import run_complete_preprocessing
from ..misc.logging import LogReceiver
# with LogReceiver():
GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)
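Once the receiver works, the commented-out context manager above would presumably wrap the multiprocessing call like this (sketch):

with LogReceiver():
    GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups,
                              flatten_output=True)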
# separate results into successful and failed objects
......@@ -473,6 +481,8 @@ class process_controller(object):
self.create_job_summary()
self.logger.info('Execution finished.')
self.logger.info('The job logfile and the summary files have been saved here: \n'
'%s.*' % os.path.splitext(self.logger.path_logfile)[0])
# TODO implement failed_with_warnings
self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
self.config.end_time = datetime.datetime.now()
......