Commit ce2e9a10 authored by Daniel Scheffler's avatar Daniel Scheffler Committed by Mathias Peters
Browse files

Fixed logging errors in multiprocessing

algorithms.gms_object.GMS_object:
- from_sensor_subsystems(): loggers of input objects are now properly closed
processing.pipeline:
- L2A_map(): now returns a list because generators cannot be pickled
processing.process_controller:
- L1C_processing(): set CPUs to 10 for testing
- updated __version__
Former-commit-id: 9a399df5
Former-commit-id: 5a80e7b3
parent 4673ef7f
......@@ -15,7 +15,7 @@ from . import config
from .processing.process_controller import process_controller
__version__ = '20170124.07'
__version__ = '20170125.01'
__author__ = 'Daniel Scheffler'
__all__ = ['algorithms',
'io',
......
......@@ -803,6 +803,7 @@ class GMS_object(object):
sensor subsystems (e.g. 3 GMS_objects for Sentinel-2 10m/20m/60m bands)
"""
# assertions
assert len(list_GMS_objs)>1, "'GMS_object.from_sensor_subsystems()' expects multiple input GMS objects. " \
"Got %d." %len(list_GMS_objs)
assert all([is_coord_grid_equal(list_GMS_objs[0].arr.gt, *obj.arr.xygrid_specs) for obj in list_GMS_objs[1:]]),\
......@@ -814,10 +815,9 @@ class GMS_object(object):
assert len(subsystems) == len(list(set(subsystems))), \
"The input 'list_GMS_objs' contains duplicates: %s" %subsystems
# log and close loggers
# log
list_GMS_objs[0].logger.info('Merging the subsystems %s to a single GMS object...'
%', '.join([GMS_obj.subsystem for GMS_obj in list_GMS_objs]))
[GMS_obj.close_GMS_loggers() for GMS_obj in list_GMS_objs]
# find the common extent. NOTE: boundsMap is expected in the order [xmin,xmax,ymin,ymax]
......@@ -876,6 +876,7 @@ class GMS_object(object):
setattr(self.MetaObj, attrN, val2set)
# merge logfiles (read all logs into DataFrame, sort it by the first column and write to new logfile
[GMS_obj.close_GMS_loggers() for GMS_obj in list_GMS_objs] # close the loggers of the input objects
paths_inLogs = [GMS_obj.pathGen.get_path_logfile() for GMS_obj in list_GMS_objs]
allLogs_df = DataFrame()
for log in paths_inLogs:
......
......@@ -183,7 +183,7 @@ def L2A_map(L1C_objs, block_size=None):
L2A_tiles = L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.job.tiling_block_size_XY)
return L2A_tiles
return list(L2A_tiles)
@EXC_H.log_uncaught_exceptions
......
......@@ -477,7 +477,7 @@ class process_controller(object):
# group by scene ID (all subsystems belonging to the same scene ID must be processed together)
grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')
L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True, CPUs=5) # FIXME CPUs set to 5 for testing
L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True, CPUs=10) # FIXME CPUs set to 10 for testing
else: # tiles
blocksize = (5000, 5000)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment