Commit 709747cc authored by Daniel Scheffler's avatar Daniel Scheffler Committed by Mathias Peters
Browse files

Further developed TCPSocketServer.

parent 97f42360
......@@ -74,7 +74,7 @@ class GMS_logger(logging.Logger):
# create output directory
silentmkdir(job_logfile)
if False: # CFG.CPUs > 1:
if True: # CFG.CPUs > 1:
# Logging messages from multiple workers to the same file in multiprocessing will cause
# PermissionErrors. Thats why use a SocketHandler in the middle which then logs to a FileHandler.
......@@ -241,6 +241,11 @@ class LogRecordStreamHandler(socketserver.StreamRequestHandler):
self.handle_log_record(record)
print('Server: %s' % record.msg)
import pkgutil
with open(os.path.abspath(os.path.join(os.path.dirname(pkgutil.get_loader("gms_preprocessing").path), '..',
'tests', 'data', 'testlog.log')), 'w') as outW:
outW.write(record.msg)
def handle_log_record(self, record):
"""Process incoming log record
:param record: The record to write
......@@ -318,7 +323,7 @@ class LogReceiver(socketserver.ThreadingTCPServer):
self.timeout = 1
self.logname = None
self._server_thread = None
self._server_thread = None # type: threading.Thread
def serve_until_stopped(self, stop):
"""Run the server"""
......@@ -351,6 +356,13 @@ class LogReceiver(socketserver.ThreadingTCPServer):
self._server_thread.join()
self._server_thread = None
@property
def is_alive(self):
if self._server_thread:
return self._server_thread.is_alive()
else:
return False
def __enter__(self):
print('starting logserver')
socketserver.ThreadingTCPServer.allow_reuse_address = True
......
......@@ -461,8 +461,9 @@ class process_controller(object):
# RUN PREPROCESSING
from .pipeline import run_complete_preprocessing
from ..misc.logging import LogReceiver
# with LogReceiver():
GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)
with LogReceiver() as lr:
alive = lr.is_alive
GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups, flatten_output=True)
# separate results into successful and failed objects
def assign_attr(tgt_procL):
......
......@@ -229,6 +229,17 @@ class Test_Landsat5_PreCollectionData(BaseTestCases.TestAll):
def setUpClass(cls):
cls.create_job(26186263, job_config_kwargs)
class Test_Landsat5_PreCollectionData_CompletePipeline(BaseTestCases.TestCompletePipeline):
"""
Parametrized testclass. Tests the level-processes on a Landsat-5 TM scene (pre-collection data).
More information on the dataset will be output after the tests-classes are executed.
"""
@classmethod
def setUpClass(cls):
cls.create_job(26186263, job_config_kwargs)
# class Test_Landsat5_CollectionData(BaseTestCases.TestAll):
# """
# Parametrized testclass. Tests the level-processes on a Landsat-5 TM scene (collection data).
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment