Commit 2636cfcb authored by Daniel Scheffler's avatar Daniel Scheffler Committed by Mathias Peters
Browse files

Fixed pickling issues after failed AC.

algorithms.L1C_P.AtmCorr:
- run_atmospheric_correction(): AC input arrays are now deleted before object is passed to exception handler
processing.pipeline:
- L1C_map(): disabled auto-dumping of AC inputs in case of error
processing-process_controller:
- L1C_processing(): set CPUs to 5 for testing
- updated __version__
Former-commit-id: 2a997e23
Former-commit-id: 064a13b1
parent a556dfbd
......@@ -15,7 +15,7 @@ from . import config
from .processing.process_controller import process_controller
__version__ = '20170124.04'
__version__ = '20170124.05'
__author__ = 'Daniel Scheffler'
__all__ = ['algorithms',
'io',
......
......@@ -698,6 +698,11 @@ class AtmCorr(object):
self.logger.error('An error occurred during atmospheric correction. Inputs have been dumped to %s.'
%path_dump)
# delete AC input arrays
for inObj in self.inObjs:
inObj.delete_ac_input_arrays()
raise
# get processing infos
......
......@@ -100,6 +100,7 @@ def log_uncaught_exceptions(GMS_mapper):
# if column is empty or scene ID is not in there
DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.job.conn_database, 'jobs',
{'failed_sceneids':failed_Obj.scene_ID}, {'id':CFG.job.ID})
return failed_Obj
return wrapped_GMS_mapper
......
......@@ -2,8 +2,7 @@
__author__='Daniel Scheffler'
from ..config import GMS_config as CFG
from ..misc import helper_functions as HLP_F # Helper functions
from ..misc import exception_handler as EXC_H # Helper functions
from ..misc import exception_handler as EXC_H # Exception handler
from ..algorithms import L1A_P # Level 1A Processor
from ..algorithms import L1B_P # Level 1B Processor
from ..algorithms import L1C_P # Level 1C Processor
......@@ -130,7 +129,7 @@ def L1C_map(L1B_objs):
# atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
if L1C_objs[0].ac_options:
# perform atmospheric correction
L1C_objs = L1C_P.AtmCorr(*L1C_objs).run_atmospheric_correction(dump_ac_input=True)
L1C_objs = L1C_P.AtmCorr(*L1C_objs).run_atmospheric_correction(dump_ac_input=False)
else:
[L1C_obj.logger.warning('Atmospheric correction is not yet supported for %s %s and has been skipped.'
%(L1C_obj.satellite, L1C_obj.sensor) ) for L1C_obj in L1C_objs]
......
......@@ -477,7 +477,7 @@ class process_controller(object):
# group by scene ID (all subsystems belonging to the same scene ID must be processed together)
grouped_L1B_Instances = HLP_F.group_objects_by_attributes(L1B_Instances, 'scene_ID')
L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True)
L1C_resObjects = MAP(L1C_map, grouped_L1B_Instances, flatten_output=True, CPUs=5) # FIXME CPUs set to 5 for testing
else: # tiles
blocksize = (5000, 5000)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment