Commit ed2ef3af authored by Mathias Peters

Merge remote-tracking branch 'origin/master'


Former-commit-id: 3a0dbe59
parents 32c982dc cdbb9bec
......@@ -15,8 +15,8 @@ from .processing.process_controller import process_controller # noqa: E402
__author__ = """Daniel Scheffler"""
__email__ = 'daniel.scheffler@gfz-potsdam.de'
__version__ = '0.12.2'
__versionalias__ = '20180117.01'
__version__ = '0.12.3'
__versionalias__ = '20180126.01'
__all__ = ['algorithms',
'io',
'misc',
......
......@@ -143,7 +143,7 @@ class L1A_object(GMS_object):
subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
rasObj = GEOP.GEOPROCESSING(paths_files2stack[0], self.logger, subset=subset)
if CFG.exec_mode == 'Flink' and path_output is None: # numpy array output
if CFG.inmem_serialization and path_output is None: # numpy array output
self.arr = rasObj.Layerstacking(paths_files2stack)
self.path_InFilePreprocessor = paths_files2stack[0]
else: # 'MEMORY' or physical output
......@@ -162,7 +162,7 @@ class L1A_object(GMS_object):
subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
rasObj = GEOP.GEOPROCESSING(path_file2load, self.logger, subset=subset)
if CFG.exec_mode == 'Flink' and path_output is None: # numpy array output
if CFG.inmem_serialization and path_output is None: # numpy array output
self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
self.path_InFilePreprocessor = path_file2load
......@@ -190,7 +190,7 @@ class L1A_object(GMS_object):
data_arr = np.empty(data.shape + (len(self.LayerBandsAssignment),), data.dtype)
data_arr[:, :, bidx] = data
if CFG.exec_mode == 'Flink' and path_output is None: # numpy array output
if CFG.inmem_serialization and path_output is None: # numpy array output
self.arr = data_arr
else:
GEOP.ndarray2gdal(data_arr, path_output, geotransform=ds.GetGeoTransform(),
......@@ -221,7 +221,7 @@ class L1A_object(GMS_object):
data_arr = np.empty(data.shape + (len(self.LayerBandsAssignment),), data.dtype)
data_arr[:, :, i] = data
if CFG.exec_mode == 'Flink' and path_output is None: # numpy array output
if CFG.inmem_serialization and path_output is None: # numpy array output
self.arr = data_arr
else:
GEOP.ndarray2gdal(data_arr, path_output, direction=3)
......@@ -452,12 +452,12 @@ class L1A_object(GMS_object):
dst_CS_datum='WGS84', mode='GDAL', use_workspace=True,
inFill=self.MetaObj.spec_vals['fill'])
if CFG.exec_mode == 'Python':
if not CFG.inmem_serialization:
path_warped = os.path.join(self.ExtractedFolder, self.baseN + '__' + self.arr_desc)
GEOP.ndarray2gdal(rasObj.tondarray(direction=3), path_warped, importFile=rasObj.desc, direction=3)
self.MetaObj.Dataname = path_warped
self.arr = path_warped
else: # CFG.exec_mode=='Flink':
else: # CFG.inmem_serialization is True
self.arr = rasObj.tondarray(direction=3)
self.shape_fullArr = [rasObj.rows, rasObj.cols, rasObj.bands]
......@@ -473,7 +473,7 @@ class L1A_object(GMS_object):
self.MetaObj.CornerTieP_UTM = rasObj.get_corner_coordinates('UTM')
self.meta_odict = self.MetaObj.to_odict() # important in order to keep geotransform/projection
if CFG.exec_mode == 'Flink':
if CFG.inmem_serialization:
self.delete_tempFiles() # these files are needed later in Python execution mode
self.MetaObj.Dataname = previous_dataname # /vsi.. pointing directly to raw data archive (which exists)
......@@ -485,7 +485,7 @@ class L1A_object(GMS_object):
rows, cols = self.shape_fullArr[:2]
CorPosXY = [(0, 0), (cols, 0), (0, rows), (cols, rows)]
gt = self.mask_nodata.geotransform
# using EPSG code ensures that excactly the same WKT code is used in Flink and Python mode
# using EPSG code ensures that exactly the same WKT code is used for both in-memory and disk serialization
prj = EPSG2WKT(self.mask_nodata.epsg)
CorLatLon = pixelToLatLon(CorPosXY, geotransform=gt, projection=prj)
self.corner_lonlat = [tuple(reversed(i)) for i in CorLatLon]
......
......@@ -607,8 +607,8 @@ class L1B_object(L1A_object):
mapBounds = box(xmin, ymin, xmax, ymax).bounds
# correct shifts and clip to extent
# ensure self.masks exists (does not exist in Flink mode because
# in that case self.fill_from_disk() is skipped)
# ensure self.masks exists (does not exist if CFG.inmem_serialization is True because
# then self.fill_from_disk() is skipped)
if not hasattr(self, 'masks') or self.masks is None:
self.build_combined_masks_array() # creates self.masks and self.masks_meta
......
......@@ -611,7 +611,7 @@ class GEOPROCESSING(object):
return stacked
elif path_output == 'MEMORY': # exec_mode == 'Flink'
elif path_output == 'MEMORY': # CFG.inmem_serialization is True
stack_in_mem = gdal.GetDriverByName("MEM").Create("stack.mem", cols, rows, bands)
for idx, layer in enumerate(layers_pathlist):
......@@ -635,7 +635,7 @@ class GEOPROCESSING(object):
self.bands = bands
self.inDs = stack_in_mem
else: # exec_mode == 'Python'
else: # CFG.inmem_serialization is False
self.logger.info('Adding the following bands to Layerstack:')
[self.logger.info(os.path.basename(i)) for i in layers_pathlist]
......
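For context, the `path_output == 'MEMORY'` branch above relies on GDAL's in-memory raster driver instead of a physical output file. A minimal sketch of that pattern, assuming the GDAL Python bindings are installed; dimensions, dtype and the zero-filled band data are illustrative only:

```python
from osgeo import gdal
import numpy as np

cols, rows, bands = 512, 512, 3  # illustrative dimensions
# create the layer stack as an in-memory dataset instead of a file on disk
mem_ds = gdal.GetDriverByName("MEM").Create("stack.mem", cols, rows, bands, gdal.GDT_Int16)
for b in range(bands):
    # in the real code each band would be filled from one file of layers_pathlist
    mem_ds.GetRasterBand(b + 1).WriteArray(np.zeros((rows, cols), dtype=np.int16))
```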
......@@ -84,7 +84,7 @@ class GMS_object(Dataset):
# delete arrays if their in-mem size is too big to be pickled
# => (avoids MaybeEncodingError: Error sending result: '[<gms_preprocessing.algorithms.L2C_P.L2C_object
# object at 0x7fc44f6399e8>]'. Reason: 'error("'i' format requires -2147483648 <= number <= 2147483647",)')
if self.proc_level == 'L2C' and CFG.exec_mode == 'Flink':
if self.proc_level == 'L2C' and CFG.inmem_serialization:
# FIXME check by bandname
if self.mask_nodata is not None and self.masks.bands > 1 and self.mask_clouds is not None:
del self.masks
......@@ -99,11 +99,11 @@ class GMS_object(Dataset):
self.pathGen = PG.path_generator(self.__dict__) # passes a logger in addition to previous attributes
self.path_archive = self.pathGen.get_local_archive_path_baseN()
if CFG.exec_mode == 'Python':
if not CFG.inmem_serialization:
self.path_InFilePreprocessor = os.path.join(self.ExtractedFolder, '%s%s_DN.bsq'
% (self.entity_ID,
('_%s' % self.subsystem if self.subsystem else '')))
else: # Flink
else: # keep data in memory
self.path_InFilePreprocessor = None # None: keeps all produced data in memory (numpy array attributes)
self.path_MetaPreprocessor = self.path_archive
......@@ -117,7 +117,7 @@ class GMS_object(Dataset):
else:
self.path_archive_valid = True
if CFG.exec_mode == 'Python' and self.ExtractedFolder and not os.path.isdir(self.ExtractedFolder):
if not CFG.inmem_serialization and self.ExtractedFolder and not os.path.isdir(self.ExtractedFolder):
os.makedirs(self.ExtractedFolder)
assert os.path.exists(self.path_archive), 'Invalid path to RAW data. File %s does not exist at %s.' \
......@@ -125,7 +125,7 @@ class GMS_object(Dataset):
os.path.dirname(self.path_archive))
assert isinstance(self.path_archive, str), 'Invalid path to RAW data. Got %s instead of string or unicode.' \
% type(self.path_archive)
if CFG.exec_mode == 'Python' and self.ExtractedFolder:
if not CFG.inmem_serialization and self.ExtractedFolder:
assert os.path.exists(self.path_archive), \
'Invalid path for temporary files. Directory %s does not exist.' % self.ExtractedFolder
......@@ -817,7 +817,7 @@ class GMS_object(Dataset):
def combine_tiles_to_ObjAttr(self, tiles, target_attr):
# type: (list,str) -> None
"""Combines tiles, e.g. produced by L1A_P.L1A_object.DN2TOARadRefTemp() to a single attribute.
If CFG.CFG.exec_mode == 'Python' the produced attribute is additionally written to disk.
If CFG.inmem_serialization is False, the produced attribute is additionally written to disk.
:param tiles: <list> a list of dictionaries with the keys 'desc', 'data', 'row_start','row_end',
'col_start' and 'col_end'
......@@ -843,7 +843,7 @@ class GMS_object(Dataset):
self.arr_desc = sampleTile['desc']
self.arr_shape = 'cube' if len(self.arr.shape) == 3 else 'band' if len(self.arr.shape) == 2 else 'unknown'
if CFG.exec_mode == 'Python': # and not 'Flink'
if not CFG.inmem_serialization:
path_radref_file = os.path.join(self.ExtractedFolder, self.baseN + '__' + self.arr_desc)
# path_radref_file = os.path.abspath('./testing/out/%s_TOA_Ref' % self.baseN)
while not os.path.isdir(os.path.dirname(path_radref_file)):
......@@ -918,7 +918,8 @@ class GMS_object(Dataset):
zip(GDF_MGRS_tiles['granuleid'], GDF_MGRS_tiles['map_bounds_MGRS'])}
firstTile_ID = dictIDxminymin[min(dictIDxminymin.keys())]
# ensure self.masks exists (does not exist in Flink mode because in that case self.fill_from_disk() is skipped)
# ensure self.masks exists (does not exist if CFG.inmem_serialization is True
# because in that case self.fill_from_disk() is skipped)
if not hasattr(self, 'masks') or self.masks is None:
self.build_combined_masks_array() # creates self.masks and self.masks_meta
......@@ -1043,7 +1044,7 @@ class GMS_object(Dataset):
# set self.arr from L1B path to L1A path in order to make to_ENVI copy L1A array (if .arr is not an array)
if self.proc_level == 'L1B' and not self.arr.is_inmem and os.path.isfile(self.arr.filePath):
# FIXME this could leed to check the wrong folder in Python exec_mode:
# FIXME this could lead to checking the wrong folder if CFG.inmem_serialization is False:
self.arr = PG.path_generator('L1A', self.image_type, self.satellite, self.sensor, self.subsystem,
self.acq_datetime, self.entity_ID, self.logger,
MGRS_info=MGRS_info).get_path_imagedata()
......@@ -1218,7 +1219,7 @@ class GMS_object(Dataset):
assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr
outpath_arr = os.path.splitext(outpath_hdr)[0] + '.%s' % self.outInterleave
if CFG.exec_mode == 'Python':
if not CFG.inmem_serialization:
setattr(self, arrayname, outpath_arr) # replace array by output path
if arrayname == 'masks':
setattr(self, 'mask_nodata', outpath_arr)
......
......@@ -51,7 +51,7 @@ path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')
def set_config(job_ID, json_config='', exec_mode='Python', parallelization_level='scenes', db_host='localhost',
def set_config(job_ID, json_config='', inmem_serialization=False, parallelization_level='scenes', db_host='localhost',
reset_status=False, delete_old_output=False, exec_L1AP=None,
exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
......@@ -62,8 +62,8 @@ def set_config(job_ID, json_config='', exec_mode='Python', parallelization_level
:param job_ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param json_config path to JSON file containing configuration parameters or a string in JSON format
:param exec_mode: 'Python': writes intermediate results to disk in order to save memory
'Flink': keeps intermediate results in memory in order to save IO time
:param inmem_serialization: False: write intermediate results to disk in order to save memory
True: keep intermediate results in memory in order to save IO time
:param parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
'tiles' - parallelisation on tile-level
:param db_host: host name of the server that runs the postgreSQL database
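A usage sketch of the renamed keyword, put together from the signature above and the test setup further below; the import path, job ID and database host are assumptions:

```python
from gms_preprocessing.config import set_config  # assumed import path

# job_ID and db_host are placeholders taken from the tests below
cfg = set_config(job_ID=26186196, db_host='localhost',
                 inmem_serialization=False,      # write intermediate results to disk
                 parallelization_level='scenes')
```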
......@@ -195,8 +195,8 @@ class JobConfig(object):
json_globts = json_opts['global_opts'] # type: dict
self.exec_mode = \
gp('exec_mode', json_globts['exec_mode'])
self.inmem_serialization = \
gp('inmem_serialization', json_globts['inmem_serialization'])
self.parallelization_level = \
gp('parallelization_level', json_globts['parallelization_level'])
self.CPUs = \
......@@ -660,8 +660,8 @@ class JobConfig(object):
execute, write, delete = exec_lvl
# written output cannot be turned off if inmem_serialization is False
if self.exec_mode == 'Python' and execute and not write:
warnings.warn("If job.exec_mode is set to 'Python' the output writer for %s has to be enabled "
if not self.inmem_serialization and execute and not write:
warnings.warn("If CFG.inmem_serialization is False the output writer for %s has to be enabled "
"because any operations on GMS_obj.arr read the intermediate results from disk. "
"Turning it on.." % i)
write = True
......
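With disk serialization, disabling a level's output writer while the level itself is enabled now triggers the warning above and the writer is forced back on. A simplified, self-contained illustration of that check (variable names shortened from the code above):

```python
import warnings

inmem_serialization = False                  # disk serialization
execute, write, delete = True, False, False  # user disabled the output writer

if not inmem_serialization and execute and not write:
    warnings.warn("If CFG.inmem_serialization is False the output writer has to be enabled "
                  "because any operations on GMS_obj.arr read the intermediate results "
                  "from disk. Turning it on..")
    write = True  # written output cannot be turned off in this mode
```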
{
"global_opts": {
"exec_mode": "Python", /*"Python" or "Flink"*/
"inmem_serialization": false, /*If "true", all intermediate processing results are kept in memory. This avoids
disk I/O but requires a lot of RAM. Implemented for execution via Flink.*/
"parallelization_level": "scenes", /*"scenes" or "tiles"*/
"db_host": "localhost",
"CPUs": "None", /*number of CPU cores to be used for processing (default: "None" -> use all available)*/
......
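The default above can be overridden per job through a JSON string (or file) passed as json_config; a sketch mirroring the JobConfig test further below, with an assumed import path and a placeholder job ID:

```python
from gms_preprocessing.config import JobConfig  # assumed import path

# overrides the default "inmem_serialization": false from options_default.json
cfg_json = '{"global_opts": {"inmem_serialization": true}}'
job_cfg = JobConfig(26186196, db_host='localhost', json_config=cfg_json)
```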
......@@ -3,7 +3,7 @@ gms_schema_input = dict(
global_opts=dict(
type='dict', required=False,
schema=dict(
exec_mode=dict(type='string', required=False, allowed=['Python', 'Flink']),
inmem_serialization=dict(type='boolean', required=False),
parallelization_level=dict(type='string', required=False, allowed=['scenes', 'tiles']),
db_host=dict(type='string', required=False),
CPUs=dict(type='integer', required=False, nullable=True),
......
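The schema dict above uses cerberus-style syntax, which is what makes the "badvalue" test further below raise a ValueError: a string does not satisfy type 'boolean'. A minimal validation sketch, assuming cerberus is the validator behind gms_schema_input:

```python
from cerberus import Validator

schema = dict(
    global_opts=dict(
        type='dict', required=False,
        schema=dict(inmem_serialization=dict(type='boolean', required=False)),
    )
)

v = Validator(schema)
print(v.validate({'global_opts': {'inmem_serialization': 'badvalue'}}))  # False -> schema violation
print(v.validate({'global_opts': {'inmem_serialization': True}}))        # True
```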
......@@ -60,7 +60,7 @@ def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization
def L1A_map_2(L1A_tile): # map (block-wise parallelization)
# type: (L1A_P.L1A_object) -> L1A_P.L1A_object
L1A_tile.calc_TOARadRefTemp()
if CFG.exec_mode == 'Python':
if not CFG.inmem_serialization:
L1A_tile.to_ENVI(is_tempfile=True)
return L1A_tile
......@@ -84,7 +84,7 @@ def L1A_map_3(L1A_obj): # map (scene-wise parallelization)
@EXC_H.log_uncaught_exceptions
def L1B_map(L1A_obj):
# type: (L1A_P.L1A_object) -> L1B_P.L1B_object
"""L1A_obj enthält in Python- (im Gegensatz zur Flink-) Implementierung KEINE ARRAY-DATEN!,
"""L1A_obj enthält in Python- (im Gegensatz zur inmem_serialization-) Implementierung KEINE ARRAY-DATEN!,
nur die für die ganze Szene gültigen Metadaten"""
L1B_obj = L1B_P.L1B_object(L1A_obj)
......
......@@ -589,7 +589,7 @@ class process_controller(object):
L2B_Instances = self.L2B_newObjects + L2B_DBObjects # combine newly and earlier processed L2A data
L2C_resObjects = MAP(L2C_map, L2B_Instances, CPUs=8) # FIXME 8 workers due to heavy IO
# FIXME in Flink mode results are too big to be back-pickled
# FIXME if CFG.inmem_serialization is True the results are too big to be back-pickled
self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, failed_GMS_object) and
obj.scene_ID not in self.sceneids_failed]
......
......@@ -26,7 +26,7 @@ test_requirements = requirements + ['coverage', 'nose', 'nose2', 'nose-htmloutpu
setup(
name='gms_preprocessing',
version='0.12.2',
version='0.12.3',
description="GeoMultiSens - Scalable Multi-Sensor Analysis of Remote Sensing Data",
long_description=readme + '\n\n' + history,
author="Daniel Scheffler",
......
......@@ -61,7 +61,7 @@ class Test_JobConfig(TestCase):
JobConfig(self.jobID, db_host=self.db_host, json_config=cfg)
def test_jsonconfig_str_schema_violation(self):
cfg = '{"global_opts": {"exec_mode": "badvalue"}}'
cfg = '{"global_opts": {"inmem_serialization": "badvalue"}}'
with self.assertRaises(ValueError):
JobConfig(self.jobID, db_host=self.db_host, json_config=cfg)
......
......@@ -24,7 +24,7 @@ gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')
class Test_DEM_Creator(unittest.TestCase):
def setUp(self):
# Testjob Landsat-8
set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True,
set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True,
path_archive=os.path.join(gmsRepo_rootpath, 'tests', 'data', 'archive_data'))
self.boxMapXY = ((277365.0, 5546625.0), (292365.0, 5546625.0), (292365.0, 5531625.0), (277365.0, 5531625.0))
......
......@@ -31,7 +31,7 @@ class Test_KMeansRSImage(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True)
set_config(job_ID=26186196, db_host=db_host, reset_status=True)
cls.geoArr = GeoArray(testdata)
cls.geoArr.to_mem()
cls.kmeans = KMeansRSImage(cls.geoArr, n_clusters=10)
......
......@@ -32,7 +32,7 @@ testdata = os.path.join(__path__[0], '../tests/data/hy_spec_data/Bavaria_farmlan
# @classmethod
# def setUpClass(cls):
# # Testjob Landsat-8
# set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
# set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
# cls.SHC = ReferenceCube_Generator_OLD([testdata, testdata, ], v=False)
#
# def test_generate_reference_cube_L8(self):
......@@ -67,7 +67,7 @@ class Test_ReferenceCube_Generator(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
cls.tmpOutdir = tempfile.TemporaryDirectory()
cls.testIms = [testdata, testdata, ]
cls.tgt_sat_sen_list = [
......@@ -136,7 +136,7 @@ class Test_SpectralHomogenizer(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
cfg = set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
cfg = set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
cls.SpH = SpectralHomogenizer(classifier_rootDir=cfg.path_spechomo_classif)
cls.testArr_L8 = GeoArray(np.random.randint(1, 10000, (50, 50, 7), dtype=np.int16)) # no band 9, no pan
......
......@@ -31,7 +31,7 @@ class Test_SpectralResampler(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
set_config(exec_mode='Python', job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
cls.geoArr = GeoArray(testdata)
cls.geoArr.to_mem()
......