Commit 474762cb authored by Daniel Scheffler

Refactored CFG.exec_mode to CFG.inmem_serialization.

Former-commit-id: cfe6c223
Former-commit-id: 48007c69
parent 87c0d2bb
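
The commit replaces the two-valued string option `CFG.exec_mode` ('Python'/'Flink') with the boolean `CFG.inmem_serialization`: every `CFG.exec_mode == 'Flink'` check becomes `CFG.inmem_serialization`, and every `CFG.exec_mode == 'Python'` check becomes `not CFG.inmem_serialization`. A minimal sketch of that mapping (the helper name is hypothetical, for illustration only, not part of the commit):

```python
def exec_mode_to_inmem_serialization(exec_mode):
    # type: (str) -> bool
    """Hypothetical helper mapping the old CFG.exec_mode string to the new boolean.

    'Flink'  -> True   (keep intermediate results in memory to save I/O time)
    'Python' -> False  (write intermediate results to disk to save memory)
    """
    if exec_mode not in ('Python', 'Flink'):
        raise ValueError("exec_mode must be 'Python' or 'Flink', got %r" % exec_mode)
    return exec_mode == 'Flink'
```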
@@ -143,7 +143,7 @@ class L1A_object(GMS_object):
             subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
         rasObj = GEOP.GEOPROCESSING(paths_files2stack[0], self.logger, subset=subset)
-        if CFG.exec_mode == 'Flink' and path_output is None:  # numpy array output
+        if CFG.inmem_serialization and path_output is None:  # numpy array output
             self.arr = rasObj.Layerstacking(paths_files2stack)
             self.path_InFilePreprocessor = paths_files2stack[0]
         else:  # 'MEMORY' or physical output
@@ -162,7 +162,7 @@ class L1A_object(GMS_object):
             subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
         rasObj = GEOP.GEOPROCESSING(path_file2load, self.logger, subset=subset)
-        if CFG.exec_mode == 'Flink' and path_output is None:  # numpy array output
+        if CFG.inmem_serialization and path_output is None:  # numpy array output
             self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
                 gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
             self.path_InFilePreprocessor = path_file2load
@@ -190,7 +190,7 @@ class L1A_object(GMS_object):
             data_arr = np.empty(data.shape + (len(self.LayerBandsAssignment),), data.dtype)
             data_arr[:, :, bidx] = data
-        if CFG.exec_mode == 'Flink' and path_output is None:  # numpy array output
+        if CFG.inmem_serialization and path_output is None:  # numpy array output
             self.arr = data_arr
         else:
             GEOP.ndarray2gdal(data_arr, path_output, geotransform=ds.GetGeoTransform(),
@@ -221,7 +221,7 @@ class L1A_object(GMS_object):
             data_arr = np.empty(data.shape + (len(self.LayerBandsAssignment),), data.dtype)
             data_arr[:, :, i] = data
-        if CFG.exec_mode == 'Flink' and path_output is None:  # numpy array output
+        if CFG.inmem_serialization and path_output is None:  # numpy array output
             self.arr = data_arr
         else:
             GEOP.ndarray2gdal(data_arr, path_output, direction=3)
@@ -452,12 +452,12 @@ class L1A_object(GMS_object):
                                  dst_CS_datum='WGS84', mode='GDAL', use_workspace=True,
                                  inFill=self.MetaObj.spec_vals['fill'])
-        if CFG.exec_mode == 'Python':
+        if not CFG.inmem_serialization:
             path_warped = os.path.join(self.ExtractedFolder, self.baseN + '__' + self.arr_desc)
             GEOP.ndarray2gdal(rasObj.tondarray(direction=3), path_warped, importFile=rasObj.desc, direction=3)
             self.MetaObj.Dataname = path_warped
             self.arr = path_warped
-        else:  # CFG.exec_mode=='Flink':
+        else:  # CFG.inmem_serialization is True
             self.arr = rasObj.tondarray(direction=3)
         self.shape_fullArr = [rasObj.rows, rasObj.cols, rasObj.bands]
@@ -473,7 +473,7 @@ class L1A_object(GMS_object):
             self.MetaObj.CornerTieP_UTM = rasObj.get_corner_coordinates('UTM')
         self.meta_odict = self.MetaObj.to_odict()  # important in order to keep geotransform/projection
-        if CFG.exec_mode == 'Flink':
+        if CFG.inmem_serialization:
             self.delete_tempFiles()  # these files are needed later in Python execution mode
             self.MetaObj.Dataname = previous_dataname  # /vsi.. pointing directly to raw data archive (which exists)
@@ -485,7 +485,7 @@ class L1A_object(GMS_object):
         rows, cols = self.shape_fullArr[:2]
         CorPosXY = [(0, 0), (cols, 0), (0, rows), (cols, rows)]
         gt = self.mask_nodata.geotransform
-        # using EPSG code ensures that excactly the same WKT code is used in Flink and Python mode
+        # using the EPSG code ensures that exactly the same WKT code is used for in-mem and disk serialization
         prj = EPSG2WKT(self.mask_nodata.epsg)
         CorLatLon = pixelToLatLon(CorPosXY, geotransform=gt, projection=prj)
         self.corner_lonlat = [tuple(reversed(i)) for i in CorLatLon]
...
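
The EPSG2WKT call above is what makes the corner-coordinate computation independent of the serialization mode. A sketch of what such a helper presumably does, using GDAL's osr module (the project's actual implementation may differ):

```python
from osgeo import osr

def epsg_to_wkt(epsg_code):
    # type: (int) -> str
    # Build a spatial reference from the EPSG code and export its WKT representation,
    # so the identical WKT string is obtained regardless of how the data was serialized.
    srs = osr.SpatialReference()
    srs.ImportFromEPSG(epsg_code)
    return srs.ExportToWkt()
```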
@@ -607,8 +607,8 @@ class L1B_object(L1A_object):
         mapBounds = box(xmin, ymin, xmax, ymax).bounds
         # correct shifts and clip to extent
-        # ensure self.masks exists (does not exist in Flink mode because
-        # in that case self.fill_from_disk() is skipped)
+        # ensure self.masks exists (does not exist if inmem_serialization is enabled because
+        # then self.fill_from_disk() is skipped)
         if not hasattr(self, 'masks') or self.masks is None:
             self.build_combined_masks_array()  # creates self.masks and self.masks_meta
...
@@ -611,7 +611,7 @@ class GEOPROCESSING(object):
             return stacked
-        elif path_output == 'MEMORY':  # exec_mode == 'Flink'
+        elif path_output == 'MEMORY':  # CFG.inmem_serialization is True
             stack_in_mem = gdal.GetDriverByName("MEM").Create("stack.mem", cols, rows, bands)
             for idx, layer in enumerate(layers_pathlist):
@@ -635,7 +635,7 @@ class GEOPROCESSING(object):
             self.bands = bands
             self.inDs = stack_in_mem
-        else:  # exec_mode == 'Python'
+        else:  # CFG.inmem_serialization is False
             self.logger.info('Adding the following bands to Layerstack:')
             [self.logger.info(os.path.basename(i)) for i in layers_pathlist]
...
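
The 'MEMORY' branch above stacks bands into a GDAL in-memory dataset instead of a file on disk. A minimal, self-contained sketch of that idea (array contents and sizes are made up for illustration; this is not the project's code):

```python
import numpy as np
from osgeo import gdal

# Create an in-memory GDAL dataset: no disk I/O, the "stack.mem" name is only a label.
cols, rows, bands = 100, 80, 3
mem_ds = gdal.GetDriverByName("MEM").Create("stack.mem", cols, rows, bands, gdal.GDT_Int16)

# Write one layer per band (GDAL band indices are 1-based).
for bidx in range(bands):
    mem_ds.GetRasterBand(bidx + 1).WriteArray(np.full((rows, cols), bidx, dtype=np.int16))

stacked = mem_ds.ReadAsArray()  # -> numpy array of shape (bands, rows, cols)
```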
@@ -84,7 +84,7 @@ class GMS_object(Dataset):
         # delete arrays if their in-mem size is too big to be pickled
         # => (avoids MaybeEncodingError: Error sending result: '[<gms_preprocessing.algorithms.L2C_P.L2C_object
         # object at 0x7fc44f6399e8>]'. Reason: 'error("'i' format requires -2147483648 <= number <= 2147483647",)')
-        if self.proc_level == 'L2C' and CFG.exec_mode == 'Flink':
+        if self.proc_level == 'L2C' and CFG.inmem_serialization:
             # FIXME check by bandname
             if self.mask_nodata is not None and self.masks.bands > 1 and self.mask_clouds is not None:
                 del self.masks
@@ -99,11 +99,11 @@ class GMS_object(Dataset):
         self.pathGen = PG.path_generator(self.__dict__)  # passes a logger in addition to previous attributes
         self.path_archive = self.pathGen.get_local_archive_path_baseN()
-        if CFG.exec_mode == 'Python':
+        if not CFG.inmem_serialization:
             self.path_InFilePreprocessor = os.path.join(self.ExtractedFolder, '%s%s_DN.bsq'
                                                         % (self.entity_ID,
                                                            ('_%s' % self.subsystem if self.subsystem else '')))
-        else:  # Flink
+        else:  # keep data in memory
             self.path_InFilePreprocessor = None  # None: keeps all produced data in memory (numpy array attributes)
         self.path_MetaPreprocessor = self.path_archive
@@ -117,7 +117,7 @@ class GMS_object(Dataset):
         else:
             self.path_archive_valid = True
-        if CFG.exec_mode == 'Python' and self.ExtractedFolder and not os.path.isdir(self.ExtractedFolder):
+        if not CFG.inmem_serialization and self.ExtractedFolder and not os.path.isdir(self.ExtractedFolder):
             os.makedirs(self.ExtractedFolder)
         assert os.path.exists(self.path_archive), 'Invalid path to RAW data. File %s does not exist at %s.' \
@@ -125,7 +125,7 @@ class GMS_object(Dataset):
                                                           os.path.dirname(self.path_archive))
         assert isinstance(self.path_archive, str), 'Invalid path to RAW data. Got %s instead of string or unicode.' \
                                                    % type(self.path_archive)
-        if CFG.exec_mode == 'Python' and self.ExtractedFolder:
+        if not CFG.inmem_serialization and self.ExtractedFolder:
             assert os.path.exists(self.path_archive), \
                 'Invalid path for temporary files. Directory %s does not exist.' % self.ExtractedFolder
@@ -817,7 +817,7 @@ class GMS_object(Dataset):
     def combine_tiles_to_ObjAttr(self, tiles, target_attr):
         # type: (list,str) -> None
         """Combines tiles, e.g. produced by L1A_P.L1A_object.DN2TOARadRefTemp() to a single attribute.
-        If CFG.CFG.exec_mode == 'Python' the produced attribute is additionally written to disk.
+        If CFG.inmem_serialization is False, the produced attribute is additionally written to disk.
         :param tiles:       <list> a list of dictionaries with the keys 'desc', 'data', 'row_start', 'row_end',
                             'col_start' and 'col_end'
@@ -843,7 +843,7 @@ class GMS_object(Dataset):
             self.arr_desc = sampleTile['desc']
             self.arr_shape = 'cube' if len(self.arr.shape) == 3 else 'band' if len(self.arr.shape) == 2 else 'unknown'
-            if CFG.exec_mode == 'Python':  # and not 'Flink'
+            if not CFG.inmem_serialization:
                 path_radref_file = os.path.join(self.ExtractedFolder, self.baseN + '__' + self.arr_desc)
                 # path_radref_file = os.path.abspath('./testing/out/%s_TOA_Ref' % self.baseN)
                 while not os.path.isdir(os.path.dirname(path_radref_file)):
@@ -918,7 +918,8 @@ class GMS_object(Dataset):
                            zip(GDF_MGRS_tiles['granuleid'], GDF_MGRS_tiles['map_bounds_MGRS'])}
         firstTile_ID = dictIDxminymin[min(dictIDxminymin.keys())]
-        # ensure self.masks exists (does not exist in Flink mode because in that case self.fill_from_disk() is skipped)
+        # ensure self.masks exists (does not exist if inmem_serialization is enabled
+        # because in that case self.fill_from_disk() is skipped)
         if not hasattr(self, 'masks') or self.masks is None:
             self.build_combined_masks_array()  # creates self.masks and self.masks_meta
@@ -1043,7 +1044,7 @@ class GMS_object(Dataset):
         # set self.arr from L1B path to L1A path in order to make to_ENVI copy L1A array (if .arr is not an array)
         if self.proc_level == 'L1B' and not self.arr.is_inmem and os.path.isfile(self.arr.filePath):
-            # FIXME this could leed to check the wrong folder in Python exec_mode:
+            # FIXME this could lead to checking the wrong folder if CFG.inmem_serialization is False:
             self.arr = PG.path_generator('L1A', self.image_type, self.satellite, self.sensor, self.subsystem,
                                          self.acq_datetime, self.entity_ID, self.logger,
                                          MGRS_info=MGRS_info).get_path_imagedata()
@@ -1218,7 +1219,7 @@ class GMS_object(Dataset):
             assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr
             outpath_arr = os.path.splitext(outpath_hdr)[0] + '.%s' % self.outInterleave
-            if CFG.exec_mode == 'Python':
+            if not CFG.inmem_serialization:
                 setattr(self, arrayname, outpath_arr)  # replace array by output path
                 if arrayname == 'masks':
                     setattr(self, 'mask_nodata', outpath_arr)
...
@@ -50,7 +50,7 @@ path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
 path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')
-def set_config(job_ID, json_config='', exec_mode='Python', parallelization_level='scenes', db_host='localhost',
+def set_config(job_ID, json_config='', inmem_serialization=False, parallelization_level='scenes', db_host='localhost',
                reset_status=False, delete_old_output=False, exec_L1AP=None,
                exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
                allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
@@ -61,8 +61,8 @@ def set_config(job_ID, json_config='', exec_mode='Python', parallelization_level
     :param job_ID:                  job ID of the job to be executed, e.g. 123456 (must be present in database)
     :param json_config:             path to a JSON file containing configuration parameters or a string in JSON format
-    :param exec_mode:               'Python': writes intermediate results to disk in order to save memory
-                                    'Flink': keeps intermediate results in memory in order to save IO time
+    :param inmem_serialization:     False: write intermediate results to disk in order to save memory
+                                    True: keep intermediate results in memory in order to save IO time
     :param parallelization_level:   <str> choices: 'scenes' - parallelization on scene level
                                                    'tiles' - parallelization on tile level
     :param db_host:                 host name of the server that runs the postgreSQL database
@@ -179,8 +179,8 @@ class JobConfig(object):
         json_globts = json_opts['global_opts']  # type: dict
-        self.exec_mode = \
-            gp('exec_mode', json_globts['exec_mode'])
+        self.inmem_serialization = \
+            gp('inmem_serialization', json_globts['inmem_serialization'])
         self.parallelization_level = \
             gp('parallelization_level', json_globts['parallelization_level'])
         self.CPUs = \
@@ -647,8 +647,8 @@ class JobConfig(object):
             execute, write, delete = exec_lvl
             # written output cannot be turned off in execution mode 'Python'
-            if self.exec_mode == 'Python' and execute and not write:
-                warnings.warn("If job.exec_mode is set to 'Python' the output writer for %s has to be enabled "
+            if not self.inmem_serialization and execute and not write:
+                warnings.warn("If CFG.inmem_serialization is False the output writer for %s has to be enabled "
                               "because any operations on GMS_obj.arr read the intermediate results from disk. "
                               "Turning it on.." % i)
                 write = True
...
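
Given the new signature, callers now pick the serialization strategy via the boolean keyword. A usage sketch (job ID and host are placeholders; the import path is an assumption based on the repository layout):

```python
from gms_preprocessing.options.config import set_config  # import path assumed

# Keep intermediate results in memory (previously exec_mode='Flink'):
CFG = set_config(job_ID=123456, db_host='localhost', inmem_serialization=True)

# Write intermediate results to disk (previously exec_mode='Python'; this is the default):
CFG = set_config(job_ID=123456, db_host='localhost', inmem_serialization=False)
```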
 {
     "global_opts": {
-        "exec_mode": "Python",  /*"Python" or "Flink"*/
+        "inmem_serialization": false,  /*If "true", all intermediate processing results are kept in memory. This
+                                         avoids disk I/O but requires a lot of RAM. Implemented for execution via Flink.*/
         "parallelization_level": "scenes",  /*"scenes" or "tiles"*/
         "db_host": "localhost",
         "CPUs": "None",  /*number of CPU cores to be used for processing (default: "None" -> use all available)*/
...
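
In Python terms, the new entry amounts to the following defaults (a sketch of the resulting global_opts mapping, not a literal copy of the file):

```python
# Sketch of the parsed defaults; the actual options file also carries /*...*/ comments,
# so it is not strict JSON and presumably gets comment-stripped before parsing.
global_opts_defaults = {
    "inmem_serialization": False,   # True: keep intermediate results in memory (fast, RAM-hungry)
    "parallelization_level": "scenes",  # or "tiles"
    "db_host": "localhost",
    "CPUs": None,                   # None -> use all available CPU cores
}
```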
@@ -3,7 +3,7 @@ gms_schema_input = dict(
     global_opts=dict(
         type='dict', required=False,
         schema=dict(
-            exec_mode=dict(type='string', required=False, allowed=['Python', 'Flink']),
+            inmem_serialization=dict(type='boolean', required=False),
             parallelization_level=dict(type='string', required=False, allowed=['scenes', 'tiles']),
             db_host=dict(type='string', required=False),
             CPUs=dict(type='integer', required=False, nullable=True),
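
The schema entries above follow Cerberus conventions; assuming that, validation of the new boolean can be sketched like this (not the project's actual validation code):

```python
from cerberus import Validator

schema = dict(
    global_opts=dict(
        type='dict', required=False,
        schema=dict(
            inmem_serialization=dict(type='boolean', required=False),
        ),
    ),
)

v = Validator(schema)
print(v.validate({'global_opts': {'inmem_serialization': False}}))       # True
print(v.validate({'global_opts': {'inmem_serialization': 'badvalue'}}))  # False -> schema violation
```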
@@ -60,7 +60,7 @@ def L1A_map_1(dataset_dict, block_size=None):  # map (scene-wise parallelization
 def L1A_map_2(L1A_tile):  # map (block-wise parallelization)
     # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
     L1A_tile.calc_TOARadRefTemp()
-    if CFG.exec_mode == 'Python':
+    if not CFG.inmem_serialization:
         L1A_tile.to_ENVI(is_tempfile=True)
     return L1A_tile
@@ -84,7 +84,7 @@ def L1A_map_3(L1A_obj):  # map (scene-wise parallelization)
 @EXC_H.log_uncaught_exceptions
 def L1B_map(L1A_obj):
     # type: (L1A_P.L1A_object) -> L1B_P.L1B_object
-    """In case of disk serialization (in contrast to the Flink implementation), L1A_obj contains NO ARRAY DATA,
+    """In case of disk serialization (in contrast to inmem_serialization), L1A_obj contains NO ARRAY DATA,
     only the metadata valid for the whole scene."""
     L1B_obj = L1B_P.L1B_object(L1A_obj)
...
@@ -597,7 +597,7 @@ class process_controller(object):
         L2B_Instances = self.L2B_newObjects + L2B_DBObjects  # combine newly and earlier processed L2A data
         L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8)  # FIXME 8 workers due to heavy IO
-        # FIXME in Flink mode results are too big to be back-pickled
+        # FIXME if inmem_serialization is enabled, the results are too big to be back-pickled
         self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
         self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
                                 obj.scene_ID not in self.sceneids_failed]
...
@@ -61,7 +61,7 @@ class Test_JobConfig(TestCase):
         JobConfig(self.jobID, db_host=self.db_host, json_config=cfg)
     def test_jsonconfig_str_schema_violation(self):
-        cfg = '{"global_opts": {"exec_mode": "badvalue"}}'
+        cfg = '{"global_opts": {"inmem_serialization": "badvalue"}}'
         with self.assertRaises(ValueError):
             JobConfig(self.jobID, db_host=self.db_host, json_config=cfg)
...
@@ -24,7 +24,7 @@ gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')
 class Test_DEM_Creator(unittest.TestCase):
     def setUp(self):
         # Testjob Landsat-8
-        set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True,
+        set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True,
                    path_archive=os.path.join(gmsRepo_rootpath, 'tests', 'data', 'archive_data'))
         self.boxMapXY = ((277365.0, 5546625.0), (292365.0, 5546625.0), (292365.0, 5531625.0), (277365.0, 5531625.0))
...
@@ -31,7 +31,7 @@ class Test_KMeansRSImage(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         # Testjob Landsat-8
-        set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True)
+        set_config(job_ID=26186196, db_host=db_host, reset_status=True)
         cls.geoArr = GeoArray(testdata)
         cls.geoArr.to_mem()
         cls.kmeans = KMeansRSImage(cls.geoArr, n_clusters=10)
...
@@ -32,7 +32,7 @@ testdata = os.path.join(__path__[0], '../tests/data/hy_spec_data/Bavaria_farmlan
 #     @classmethod
 #     def setUpClass(cls):
 #         # Testjob Landsat-8
-#         set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
+#         set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
 #         cls.SHC = ReferenceCube_Generator_OLD([testdata, testdata, ], v=False)
 #
 #     def test_generate_reference_cube_L8(self):
@@ -67,7 +67,7 @@ class Test_ReferenceCube_Generator(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         # Testjob Landsat-8
-        set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
+        set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
         cls.tmpOutdir = tempfile.TemporaryDirectory()
         cls.testIms = [testdata, testdata, ]
         cls.tgt_sat_sen_list = [
@@ -136,7 +136,7 @@ class Test_SpectralHomogenizer(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         # Testjob Landsat-8
-        cfg = set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
+        cfg = set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
         cls.SpH = SpectralHomogenizer(classifier_rootDir=cfg.path_spechomo_classif)
         cls.testArr_L8 = GeoArray(np.random.randint(1, 10000, (50, 50, 7), dtype=np.int16))  # no band 9, no pan
...
@@ -31,7 +31,7 @@ class Test_SpectralResampler(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         # Testjob Landsat-8
-        set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
+        set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
         cls.geoArr = GeoArray(testdata)
         cls.geoArr.to_mem()
...