Commit 29590240 authored by Daniel Scheffler


Pipeline now returns processed GMS_objects without array data. Multiprocessing now uses pool.imap_unordered. Fixed job summaries.
Former-commit-id: 9747ba76
Former-commit-id: 16b356e9
parent 68a551b2
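The motivation for flushing array data before a pipeline function returns is serialization cost: every value a worker process returns is pickled and sent back to the multiprocessing master. A minimal standalone sketch of the effect (the `Result` class is illustrative only, not GMS code):

```python
import pickle

import numpy as np


class Result:
    """Stand-in for a processed GMS object (illustrative only)."""

    def __init__(self):
        self.meta = {'scene_ID': 26186268, 'proc_level': 'L2C'}
        self.arr = np.zeros((1000, 1000, 13), dtype=np.int16)  # ~26 MB of image data


res = Result()
print(len(pickle.dumps(res)))  # ~26 MB would cross the process boundary

res.arr = None  # flush the array before returning from the worker
print(len(pickle.dumps(res)))  # now only a few hundred bytes
```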
@@ -71,6 +71,10 @@ class L1C_object(L1B_object):
     def lonlat_arr(self, lonlat_arr):
         self._lonlat_arr = lonlat_arr
 
+    @lonlat_arr.deleter
+    def lonlat_arr(self):
+        self._lonlat_arr = None
+
     @property
     def VZA_arr(self):
         """Get viewing zenith angle.
@@ -100,6 +104,10 @@ class L1C_object(L1B_object):
     def VZA_arr(self, VZA_arr):
         self._VZA_arr = VZA_arr
 
+    @VZA_arr.deleter
+    def VZA_arr(self):
+        self._VZA_arr = None
+
     @property
     def VAA_arr(self):
         """Get viewing azimuth angle.
@@ -129,6 +137,10 @@ class L1C_object(L1B_object):
     def VAA_arr(self, VAA_arr):
         self._VAA_arr = VAA_arr
 
+    @VAA_arr.deleter
+    def VAA_arr(self):
+        self._VAA_arr = None
+
     @property
     def SZA_arr(self):
         """Get solar zenith angle.
@@ -157,6 +169,10 @@ class L1C_object(L1B_object):
     def SZA_arr(self, SZA_arr):
         self._SZA_arr = SZA_arr
 
+    @SZA_arr.deleter
+    def SZA_arr(self):
+        self._SZA_arr = None
+
     @property
     def SAA_arr(self):
         """Get solar azimuth angle.
@@ -172,6 +188,10 @@ class L1C_object(L1B_object):
     def SAA_arr(self, SAA_arr):
         self._SAA_arr = SAA_arr
 
+    @SAA_arr.deleter
+    def SAA_arr(self):
+        self._SAA_arr = None
+
     @property
     def RAA_arr(self):
         """Get relative azimuth angle.
@@ -188,12 +208,18 @@ class L1C_object(L1B_object):
     def RAA_arr(self, RAA_arr):
         self._RAA_arr = RAA_arr
 
+    @RAA_arr.deleter
+    def RAA_arr(self):
+        self._RAA_arr = None
+
     def delete_ac_input_arrays(self):
-        self.VZA_arr = None  # not needed anymore
-        self.SZA_arr = None  # not needed anymore
-        self.SAA_arr = None  # not needed anymore
-        self.RAA_arr = None  # not needed anymore
-        self.lonlat_arr = None  # not needed anymore
+        """Delete AC input arrays if they are not needed anymore."""
+        self.logger.info('Deleting input arrays for atmospheric correction...')
+        del self.VZA_arr
+        del self.SZA_arr
+        del self.SAA_arr
+        del self.RAA_arr
+        del self.lonlat_arr
 
         # use self.dem deleter
         # would have to be resampled when writing MGRS tiles
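The rewritten `delete_ac_input_arrays` relies on the new property deleters: `del self.VZA_arr` now calls the matching `@VZA_arr.deleter`, which resets the private attribute instead of removing it. A condensed, self-contained sketch of the pattern (stub class, illustrative only):

```python
class Angles:
    """Minimal sketch of the property/setter/deleter pattern used above."""

    def __init__(self):
        self._VZA_arr = None

    @property
    def VZA_arr(self):
        return self._VZA_arr

    @VZA_arr.setter
    def VZA_arr(self, value):
        self._VZA_arr = value

    @VZA_arr.deleter
    def VZA_arr(self):
        self._VZA_arr = None  # release the reference; keep the attribute readable


a = Angles()
a.VZA_arr = [[30.1, 30.2]]
del a.VZA_arr     # invokes the deleter, not attribute removal
print(a.VZA_arr)  # -> None, no AttributeError
```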
@@ -14,7 +14,7 @@ import warnings
 import logging
 from collections import OrderedDict
 from itertools import chain
-from typing import Iterable, List, Union  # noqa F401  # flake8 issue
+from typing import Iterable, List, Union, TYPE_CHECKING  # noqa F401  # flake8 issue
 
 import numpy as np
 import spectral
@@ -49,6 +49,9 @@ from ..io import output_writer as OUT_W
 from ..misc import helper_functions as HLP_F
 from ..misc import definition_dicts as DEF_D
 
+if TYPE_CHECKING:
+    from ..algorithms.L1C_P import L1C_object  # noqa F401  # flake8 issue
+
 __author__ = 'Daniel Scheffler'
@@ -1385,6 +1388,14 @@ class GMS_object(Dataset):
         self.logger.close()
         self.logger = None
 
+    def flush_array_data(self):
+        del self.arr
+        del self.mask_nodata
+        del self.mask_clouds
+        del self.masks
+        del self.mask_clouds_confidence
+        del self.ac_errors
+
 
 class failed_GMS_object(GMS_object):
     def delete_tempFiles(self):
@@ -1471,3 +1482,44 @@ def update_proc_status(GMS_mapper):
         return GMS_objs  # type: Union[GMS_object, List[GMS_object]]
 
     return wrapped_GMS_mapper
+
+
+def return_GMS_objs_without_arrays(GMS_pipeline):
+    """Decorator function for flushing any array attributes within the return value of a GMS pipeline function.
+
+    :param GMS_pipeline:    a GMS mapper function that takes a GMS object, does some processing and returns it back
+    """
+    @functools.wraps(GMS_pipeline)  # needed to avoid pickling errors
+    def wrapped_GMS_pipeline(*args, **kwargs):
+
+        ###################################################################
+        # prepare results to be back-serialized to multiprocessing master #
+        ###################################################################
+
+        def flush_arrays(GMS_obj):
+            # type: (Union[GMS_object, L1C_object]) -> GMS_object
+            GMS_obj.flush_array_data()
+            try:
+                GMS_obj.delete_ac_input_arrays()
+            except AttributeError:
+                pass
+
+            return GMS_obj
+
+        returnVal = None
+        try:
+            # NOTE: The GMS pipeline will never raise an exception as long as the exception handler is turned on.
+            returnVal = GMS_pipeline(*args, **kwargs)
+        finally:
+            # flush array data because they are too big to be kept in memory for many GMS_objects
+            if isinstance(returnVal, (GMS_object, failed_GMS_object)):
+                returnVal = flush_arrays(returnVal)
+            elif isinstance(returnVal, Iterable):
+                returnVal = [flush_arrays(obj) for obj in returnVal]
+            else:  # OrderedDict (dataset)
+                # the OrderedDict will not contain any arrays
+                pass
+
+        return returnVal
+
+    return wrapped_GMS_pipeline
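One subtlety worth noting in the `finally` block above: `OrderedDict` instances are themselves `Iterable`, so a dict-shaped return value would be caught by the `elif` branch and iterated over its keys rather than falling through to the `else`. If dict results ever need to pass through untouched, checking for mappings first would be safer. A runnable sketch of that stricter branch order (the `GMS_object` stub and `flush_arrays` helper below merely stand in for the names defined in the diff above):

```python
from collections import OrderedDict
from collections.abc import Iterable, Mapping


class GMS_object:  # stub for illustration only
    def flush_array_data(self):
        pass


def flush_arrays(obj):  # stands in for the helper defined in the diff above
    obj.flush_array_data()
    return obj


def _flush_return_value(returnVal):
    """Stricter branch order: handle mappings before generic iterables."""
    if isinstance(returnVal, GMS_object):
        return flush_arrays(returnVal)
    elif isinstance(returnVal, Mapping):
        return returnVal  # OrderedDict (dataset) - contains no arrays
    elif isinstance(returnVal, Iterable):
        return [flush_arrays(obj) for obj in returnVal]
    return returnVal


print(_flush_return_value(OrderedDict(a=1)))  # passes through unchanged
```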
@@ -17,14 +17,14 @@ def MAP(func, args, CPUs=None, flatten_output=False):
     :param args:            function arguments
     :param CPUs:            number of CPUs to use
     :param flatten_output:  whether to flatten output list,
-                            e.g. [ [ Tile1Scene1, Tile2Scene1], Tile1Scene2, Tile2Scene2] ] to
+                            e.g. [ [ Tile1Scene1, Tile2Scene1], Tile1Scene2, Tile2Scene2] to
                             [ Tile1Scene1, Tile2Scene1, Tile1Scene2, Tile2Scene2 ]
     """
-    CPUs = CPUs if CPUs is not None else CFG.CPUs
+    CPUs = CPUs or CFG.CPUs
     CPUs = CPUs if CPUs <= CFG.CPUs else CFG.CPUs  # treat CFG.CPUs as maximum number of CPUs
 
-    if CPUs is not None and CPUs > 1 and len(args) > 1:
+    if CPUs and CPUs > 1 and len(args) > 1:
         with Pool(CPUs) as pool:
             results = pool.map(func, args)  # always returns a list
     else:
@@ -39,3 +39,36 @@ def MAP(func, args, CPUs=None, flatten_output=False):
         return list(ch)
     else:
         return results
+
+
+def imap_unordered(func, args, CPUs=None, flatten_output=False):
+    # type: (any, list, int, bool) -> list
+    """Parallelize the execution of the given function.
+
+    NOTE: if Job.CPUs in config is 1, execution is not parallelized.
+
+    :param func:            function to parallelize
+    :param args:            function arguments
+    :param CPUs:            number of CPUs to use
+    :param flatten_output:  whether to flatten output list,
+                            e.g. [ [ Tile1Scene1, Tile2Scene1], Tile1Scene2, Tile2Scene2] to
+                            [ Tile1Scene1, Tile2Scene1, Tile1Scene2, Tile2Scene2 ]
+    """
+    CPUs = CPUs or CFG.CPUs
+    CPUs = CPUs if CPUs <= CFG.CPUs else CFG.CPUs  # treat CFG.CPUs as maximum number of CPUs
+
+    if CPUs and CPUs > 1 and len(args) > 1:
+        with Pool(CPUs) as pool:
+            results = list(pool.imap_unordered(func, args))  # returns an iterator
+    else:
+        results = [func(argset) for argset in args]  # generator does not always work properly here
+
+    if flatten_output:
+        try:
+            ch = chain.from_iterable(results)
+            return list(ch)
+        except TypeError:  # if elements of chain are not iterable
+            ch = chain.from_iterable([results])
+            return list(ch)
+    else:
+        return list(results)
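Unlike `Pool.map`, `Pool.imap_unordered` yields results in completion order rather than submission order, which is why the process controller below no longer assumes any positional correspondence between inputs and outputs. A standalone illustration of the difference:

```python
import time
from multiprocessing import Pool


def work(x):
    time.sleep(0.2 if x == 0 else 0.0)  # make the first submitted task finish last
    return x


if __name__ == '__main__':
    with Pool(2) as pool:
        print(list(pool.imap_unordered(work, range(4))))  # e.g. [1, 2, 3, 0]
        print(pool.map(work, range(4)))                   # always [0, 1, 2, 3]
```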
@@ -11,7 +11,7 @@ from ..algorithms import L1C_P
 from ..algorithms import L2A_P
 from ..algorithms import L2B_P
 from ..algorithms import L2C_P
-from ..model.gms_object import failed_GMS_object, update_proc_status
+from ..model.gms_object import GMS_object, failed_GMS_object, update_proc_status, return_GMS_objs_without_arrays
 from ..algorithms.geoprocessing import get_common_extent
 
 __author__ = 'Daniel Scheffler'
@@ -136,6 +136,7 @@ def L1C_map(L1B_objs):
         L1C_obj.to_ENVI()
         if L1C_obj.arr_shape == 'cube':
             L1C_obj.delete_tempFiles()
+        L1C_obj.delete_ac_input_arrays()
 
     return L1C_objs
@@ -210,6 +211,7 @@ def L2C_map(L2B_obj):
     return L2C_obj  # contains no array data in Python mode
 
 
+@return_GMS_objs_without_arrays
 def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise parallelization)
     # type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
     """
@@ -253,6 +255,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise
             L1A_objects.append(L1A_P.L1A_object().from_disk([GMSfile, ['cube', None]]))
 
     L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
+    del L1A_objects
 
     if any([isinstance(obj, failed_GMS_object) for obj in L1B_objects]):
         return L1B_objects
@@ -270,6 +273,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise
         L1B_objects.append(L1B_P.L1B_object().from_disk([GMSfile, ['cube', None]]))
 
     L1C_objects = L1C_map(L1B_objects)
+    del L1B_objects
 
     if any([isinstance(obj, failed_GMS_object) for obj in L1C_objects]):
         return L1C_objects
@@ -288,6 +292,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise
         L1C_objects.append(L1C_P.L1C_object().from_disk([GMSfile, ['cube', None]]))
 
     L2A_obj = L2A_map(L1C_objects, return_tiles=False)
+    del L1C_objects
 
     if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
         return L2A_obj
@@ -303,6 +308,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise
         L2A_obj = L2A_P.L2A_object().from_disk([GMSfile, ['cube', None]])
 
     L2B_obj = L2B_map(L2A_obj)
+    del L2A_obj
 
     if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
         return L2B_obj
@@ -317,6 +323,8 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise
         GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
         L2B_obj = L2B_P.L2B_object().from_disk([GMSfile, ['cube', None]])
 
-    L2C_obj = L2C_map(L2B_obj)
-    a=1
-    return []  # FIXME L2C_obj
+    L2C_obj = L2C_map(L2B_obj)  # type: Union[GMS_object, failed_GMS_object, List]
+    del L2B_obj
+
+    return L2C_obj
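The `del L1A_objects` / `del L1B_objects` / `del L1C_objects` / `del L2A_obj` / `del L2B_obj` statements added in the hunks above drop the local name for each intermediate processing level as soon as the next level has been derived; the underlying objects are only reclaimed once no other reference remains, which is exactly the situation inside this function. A tiny illustration of the principle:

```python
big = bytearray(10 ** 8)   # ~100 MB intermediate result
derived = len(big)         # keep only what the next step actually needs
del big                    # drop the last reference -> memory becomes reclaimable
print(derived)             # 100000000
```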
@@ -24,7 +24,7 @@ from ..model.gms_object import failed_GMS_object, GMS_object
 from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
                        L2A_map, L2B_map, L2C_map)
 from ..options.config import set_config
-from .multiproc import MAP
+from .multiproc import MAP, imap_unordered
 from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
 from py_tools_ds.numeric.array import get_array_tilebounds
@@ -63,6 +63,7 @@ class process_controller(object):
         self.L1A_newObjects = []
         self.L1B_newObjects = []
         self.L1C_newObjects = []
+        self.L2A_newObjects = []
         self.L2A_tiles = []
         self.L2B_newObjects = []
         self.L2C_newObjects = []
@@ -437,17 +438,25 @@
         dataset_groups = HLP_F.group_dicts_by_key(self.config.data_list, key='scene_ID')
 
         from .pipeline import run_complete_preprocessing
-        GMS_objs = MAP(run_complete_preprocessing, dataset_groups)
+        GMS_objs = imap_unordered(run_complete_preprocessing, dataset_groups)
 
         # separate results into successful and failed objects
-        self.L2C_newObjects = [obj for obj in GMS_objs if isinstance(obj, L2C_P.L2C_object)]
+        def assign_attr(tgt_procL):
+            return [obj for obj in GMS_objs if isinstance(obj, GMS_object) and obj.proc_level == tgt_procL]
+
+        self.L1A_newObjects = assign_attr('L1A')
+        self.L1B_newObjects = assign_attr('L1B')
+        self.L1C_newObjects = assign_attr('L1C')
+        self.L2A_newObjects = assign_attr('L2A')
+        self.L2B_newObjects = assign_attr('L2B')
+        self.L2C_newObjects = assign_attr('L2C')
         self.failed_objects = [obj for obj in GMS_objs if isinstance(obj, failed_GMS_object)]
 
         # create summary
         self.create_job_summary()
 
         self.logger.info('Execution finished.')
-        # TODO implement failed_with_warnings:
+        # TODO implement failed_with_warnings
         self.config.status = 'finished' if not self.failed_objects else 'finished_with_errors'
         self.config.end_time = datetime.datetime.now()
         self.config.computation_time = self.config.end_time - self.config.start_time
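Because `imap_unordered` returns objects in completion order, the results can no longer be split by position; `assign_attr` therefore groups them purely by their `proc_level` attribute. A standalone sketch of that grouping (stub objects, illustrative only):

```python
class Obj:  # stub standing in for a processed GMS object
    def __init__(self, proc_level):
        self.proc_level = proc_level


GMS_objs = [Obj('L2C'), Obj('L1B'), Obj('L2C')]  # arbitrary completion order


def assign_attr(tgt_procL):
    return [obj for obj in GMS_objs if obj.proc_level == tgt_procL]


print(len(assign_attr('L2C')))  # -> 2, independent of result ordering
```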
@@ -165,7 +165,6 @@ class BaseTestCases:
             cls.PC.logger.info('Execution of entire GeoMultiSens pre-processing chain started for job ID %s...'
                                % cls.PC.config.ID)
-            cls.PC.config.CPUs = 1  # FIXME
 
             [cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]
@@ -268,6 +267,11 @@ class Test_Sentinel2A_SingleGranuleFormat_CompletePipeline(BaseTestCases.TestCompleteP
         # job_config_kwargs['CPUs'] = 1
         cls.create_job(26186268, job_config_kwargs)
 
+    @classmethod
+    def tearDownClass(cls):
+        super().tearDownClass()
+        # PC = cls.PC
+
 
 class Test_Sentinel2A_MultiGranuleFormat(BaseTestCases.TestAll):
     """
@@ -308,6 +312,11 @@ class Test_MultipleDatasetsInOneJob_CompletePipeline(BaseTestCases.TestCompleteP
     def setUpClass(cls):
         cls.create_job(26186273, job_config_kwargs)
 
+    @classmethod
+    def tearDownClass(cls):
+        super().tearDownClass()
+        # PC = cls.PC
+
 
 ###################################################################################
 # Summarizing the information regarding the test datasets.