Commit 4fb112e5 authored by Daniel Scheffler's avatar Daniel Scheffler Committed by Mathias Peters
Browse files

Pipeline 'run_complete_preprocessing' now returns processing reports only (may...

Pipeline 'run_complete_preprocessing' now returns processing reports only (may fix deadlock after large reference jobs).
parent fa769ced
......@@ -2225,11 +2225,14 @@ class GMS_object(object):
del self.mask_nodata
del self.mask_clouds
del self.masks
del self.dem
del self.mask_clouds_confidence
del self.ac_errors
del self.spat_homo_errors
del self.spec_homo_errors
del self.accuracy_layers
self.MetaObj.ViewingAngle_arrProv = {}
self.MetaObj.IncidenceAngle_arrProv = {}
class GMS_identifier(object):
......@@ -2298,6 +2301,27 @@ class failed_GMS_object(GMS_object):
return DataFrame([getattr(self, k) for k in columns], columns=columns)
class finished_GMS_object(GMS_object):
def __init__(self, GMS_obj):
# type: (GMS_obj) -> None
super(finished_GMS_object, self).__init__()
self.proc_level = GMS_obj.proc_level
self.image_type = GMS_obj.image_type
self.scene_ID = GMS_obj.scene_ID
self.entity_ID = GMS_obj.entity_ID
self.satellite = GMS_obj.satellite
self.sensor = GMS_obj.sensor
self.subsystem = GMS_obj.subsystem
self.arr_shape = GMS_obj.arr_shape
self.proc_status = GMS_obj.proc_status
@property
def pandasRecord(self):
columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
'arr_shape', 'arr_pos']
return DataFrame([getattr(self, k) for k in columns], columns=columns)
def update_proc_status(GMS_mapper):
"""Decorator function for updating the processing status of each GMS_object (subclass) instance.
......@@ -2394,6 +2418,38 @@ def return_GMS_objs_without_arrays(GMS_pipeline):
return wrapped_GMS_pipeline
def return_proc_reports_only(GMS_pipeline):
"""Decorator function for flushing any array attributes within the return value of a GMS pipeline function.
:param GMS_pipeline: A GMS mapper function that takes a GMS object, does some processing and returns it back.
"""
@functools.wraps(GMS_pipeline) # needed to avoid pickling errors
def wrapped_GMS_pipeline(*args, **kwargs):
###################################################################
# prepare results to be back-serialized to multiprocessing master #
###################################################################
# NOTE: Exceptions within GMS pipeline will be forwarded to calling function.
# NOTE: Exceptions within GMS mappers are catched by exception handler (if enabled)
returnVal = GMS_pipeline(*args, **kwargs)
# flush array data because they are too big to be kept in memory for many GMS_objects
if isinstance(returnVal, failed_GMS_object):
pass
elif isinstance(returnVal, GMS_object):
returnVal = finished_GMS_object(returnVal)
elif isinstance(returnVal, Iterable):
returnVal = [obj if isinstance(obj, failed_GMS_object) else finished_GMS_object(obj) for obj in returnVal]
else: # OrderedDict (dataset)
# the OrderedDict will not contain any arrays
pass
return returnVal
return wrapped_GMS_pipeline
def GMS_object_2_dataset_dict(GMS_obj):
# type: (GMS_object) -> OrderedDict
return OrderedDict([
......
......@@ -14,7 +14,8 @@ from ..algorithms import L1C_P
from ..algorithms import L2A_P
from ..algorithms import L2B_P
from ..algorithms import L2C_P
from ..model.gms_object import failed_GMS_object, update_proc_status, return_GMS_objs_without_arrays, estimate_mem_usage
from ..model.gms_object import \
failed_GMS_object, update_proc_status, return_GMS_objs_without_arrays, return_proc_reports_only, estimate_mem_usage
from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
from ..algorithms.geoprocessing import get_common_extent
......@@ -241,6 +242,7 @@ def L2C_map(L2B_obj):
return L2C_obj # contains no array data in Python mode
@return_proc_reports_only
@return_GMS_objs_without_arrays
def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise parallelization)
# type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment