Commit 663d4555 authored by Daniel Scheffler

Replaced some calls to MultiSlotLock with IOLock.

Added option 'write_ENVIclassif_cloudmask' (fixes issue #72).
parent 03f6a463
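For context, a minimal usage sketch combining the two changes (not part of the diff; the absolute import path is assumed from the relative imports below, and `CFG` stands for the job configuration object used throughout the code):

```python
from gms_preprocessing.misc.locks import IOLock  # path assumed from '..misc.locks' below

def write_outputs(gms_obj, CFG, logger=None):
    """Sketch: serialize a GMS object with bounded I/O concurrency."""
    # at most CFG.max_parallel_reads_writes tasks may perform disk I/O at once
    with IOLock(allowed_slots=CFG.max_parallel_reads_writes, logger=logger):
        # the new flag toggles the extra ENVI classification file for cloud masks
        gms_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
```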
@@ -24,7 +24,7 @@ from . import geoprocessing as GEOP
 from ..io import output_writer as OUT_W
 from ..misc import helper_functions as HLP_F
 from ..misc.definition_dicts import get_outFillZeroSaturated, is_dataset_provided_as_fullScene
-from ..misc.locks import MultiSlotLock
+from ..misc.locks import IOLock
 from ..model.gms_object import GMS_object
 from ..model import metadata as META
@@ -176,7 +176,7 @@ class L1A_object(GMS_object):
         rasObj = GEOP.GEOPROCESSING(paths_files2stack[0], self.logger, subset=subset)

         # perform layer stack
-        with MultiSlotLock('IOLock', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
+        with IOLock(allowed_slots=CFG.max_parallel_reads_writes, logger=self.logger):
             if CFG.inmem_serialization and path_output is None:  # numpy array output
                 self.arr = rasObj.Layerstacking(paths_files2stack)
                 self.path_InFilePreprocessor = paths_files2stack[0]
@@ -197,7 +197,7 @@ class L1A_object(GMS_object):
         rasObj = GEOP.GEOPROCESSING(path_file2load, self.logger, subset=subset)

         # read a single file
-        with MultiSlotLock('IOLock', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
+        with IOLock(allowed_slots=CFG.max_parallel_reads_writes, logger=self.logger):
             if CFG.inmem_serialization and path_output is None:  # numpy array output
                 self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
                     gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
......
@@ -21,10 +21,10 @@ except RedisConnectionError:


 class MultiSlotLock(Lock):
-    def __init__(self, name, allowed_threads=1, logger=None, **kwargs):
+    def __init__(self, name, allowed_slots=1, logger=None, **kwargs):
         self.conn = redis_conn
         self.name = name
-        self.allowed_threads = allowed_threads or 0
+        self.allowed_threads = allowed_slots or 0
         self.logger = logger or GMS_logger("RedisLock: '%s'" % self.name)
         self.kwargs = kwargs
@@ -32,7 +32,7 @@ class MultiSlotLock(Lock):
         self.final_name = ''
         self._acquired = None

-        if allowed_threads and redis_conn:
+        if allowed_slots and redis_conn:
             logged = False
             while True:
                 time.sleep(random.uniform(0, 1.5))  # avoids race conditions in case multiple tasks are waiting
@@ -76,11 +76,11 @@ class MultiSlotLock(Lock):
                 # this happens in case the lock has already been acquired by another instance of MultiSlotLock due
                 # to a race condition (time gap between finding the free slot and the call of self.acquire())
                 # -> in that case: re-initialize to get a new free slot
-                self.__init__(self.name, allowed_threads=self.allowed_threads, logger=self.logger,
+                self.__init__(self.name, allowed_slots=self.allowed_threads, logger=self.logger,
                               **self.kwargs)

         if self._acquired is False:  # and not None
-            self.__init__(self.name, allowed_threads=self.allowed_threads, logger=self.logger,
+            self.__init__(self.name, allowed_slots=self.allowed_threads, logger=self.logger,
                           **self.kwargs)

         # print(self.final_name.split('GMS_%s__' % CFG.ID)[1], self._acquired)
@@ -99,16 +99,16 @@ class MultiSlotLock(Lock):


 class IOLock(MultiSlotLock):
-    def __init__(self, processes=1, logger=None, **kwargs):
-        super(IOLock, self).__init__(name='IOLock', allowed_threads=processes, logger=logger, **kwargs)
+    def __init__(self, allowed_slots=1, logger=None, **kwargs):
+        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)


 class ProcessLock(MultiSlotLock):
-    def __init__(self, processes=1, logger=None, **kwargs):
-        super(ProcessLock, self).__init__(name='ProcessLock', allowed_threads=processes, logger=logger, **kwargs)
+    def __init__(self, allowed_slots=1, logger=None, **kwargs):
+        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)


-def acquire_process_lock(processes=None, logger=None):
+def acquire_process_lock(allowed_slots=None, logger=None):
     if not logger:
         logger = logging.getLogger('ProcessLock')
         logger.setLevel('INFO')
@@ -117,7 +117,7 @@ def acquire_process_lock(processes=None, logger=None):
         @functools.wraps(func)  # needed to avoid pickling errors
         def wrapped_func(*args, **kwargs):
-            with ProcessLock(allowed_threads=processes, logger=logger):
+            with ProcessLock(allowed_slots=allowed_slots, logger=logger):
                 result = func(*args, **kwargs)
             return result
......
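Since `acquire_process_lock` is a decorator factory that wraps the decorated function in a `ProcessLock`, usage could look like the following sketch (`run_scene` is a hypothetical function, not part of this commit; the import path is assumed):

```python
from gms_preprocessing.misc.locks import acquire_process_lock  # path assumed

@acquire_process_lock(allowed_slots=4)  # at most 4 concurrent executions across all workers
def run_scene(scene_id):
    # heavy per-scene processing goes here; the ProcessLock is acquired
    # before the call and released afterwards
    return scene_id
```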
@@ -48,7 +48,7 @@ from ..io import input_reader as INP_R
 from ..io import output_writer as OUT_W
 from ..misc import helper_functions as HLP_F
 from ..misc import definition_dicts as DEF_D
-from ..misc.locks import MultiSlotLock
+from ..misc.locks import IOLock

 if TYPE_CHECKING:
     from ..algorithms.L1C_P import L1C_object  # noqa F401  # flake8 issue
@@ -1397,7 +1397,7 @@ class GMS_object(Dataset):
         ###########################################################
         # loop through all attributes to write and execute writer #
         ###########################################################
-        with MultiSlotLock('IOLock', allowed_threads=CFG.max_parallel_reads_writes, logger=self.logger):
+        with IOLock(allowed_slots=CFG.max_parallel_reads_writes, logger=self.logger):
             for arrayname in attributes2write:
                 descriptor = '%s_%s' % (image_type_dict[arrayname], self.proc_level)
......
@@ -316,6 +316,9 @@ class JobConfig(object):
             gp('mgrs_pixel_buffer', json_processors['general_opts']['mgrs_pixel_buffer'])
         self.output_data_compression = \
             gp('output_data_compression', json_processors['general_opts']['output_data_compression'])
+        self.write_ENVIclassif_cloudmask = \
+            gp('write_ENVIclassif_cloudmask', json_processors['general_opts']['write_ENVIclassif_cloudmask'])

         # processor specific opts

         # L1A
......
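The `gp()` helper itself is not shown in this diff; judging from the call sites, it resolves each option from the job-specific parameters and falls back to the default taken from the options JSON. A hedged sketch of that pattern (all names hypothetical):

```python
def gp(param_name, default_value, job_params=None):
    """Hypothetical stand-in for the gp() helper above: prefer a job-specific
    override, otherwise fall back to the default from the options JSON."""
    return (job_params or {}).get(param_name, default_value)

# resolving the new option added in this commit:
json_general_opts = {'write_ENVIclassif_cloudmask': True}  # excerpt of the parsed options JSON
value = gp('write_ENVIclassif_cloudmask', json_general_opts['write_ENVIclassif_cloudmask'])
```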
@@ -65,7 +65,9 @@
         "scale_factor_TOARef": 10000,
         "scale_factor_BOARef": 10000,
         "mgrs_pixel_buffer": 10, /*pixel buffer to apply to each mgrs tile polygon when clipping output data*/
-        "output_data_compression": false /*compress output data*/
+        "output_data_compression": false, /*compress output data*/
+        "write_ENVIclassif_cloudmask": true /*whether to write an additional ENVI classification file for cloud masks
+                                              (useful for visualizing colored cloud masks)*/
     },

     "L1A": { /*Level 1A processing: Data import and metadata homogenization*/
......
@@ -54,6 +54,7 @@ gms_schema_input = dict(
         scale_factor_BOARef=dict(type='integer', required=False),
         mgrs_pixel_buffer=dict(type='integer', required=False),
         output_data_compression=dict(type='boolean', required=False),
+        write_ENVIclassif_cloudmask=dict(type='boolean', required=False),
     )),
     L1A=dict(type='dict', required=False, schema=dict(
         run_processor=dict(type='boolean', required=False),
......
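The schema dicts follow the cerberus convention, so the new key can be validated like this (a sketch assuming cerberus is the validator behind `gms_schema_input`):

```python
from cerberus import Validator

# minimal excerpt of gms_schema_input covering only the new option
schema_excerpt = dict(
    general_opts=dict(type='dict', required=False, schema=dict(
        write_ENVIclassif_cloudmask=dict(type='boolean', required=False),
    )),
)

v = Validator(schema_excerpt)
assert v.validate({'general_opts': {'write_ENVIclassif_cloudmask': True}})
assert not v.validate({'general_opts': {'write_ENVIclassif_cloudmask': 'yes'}})  # wrong type
```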
@@ -40,7 +40,7 @@ def L1A_map(dataset_dict):  # map (scene-wise parallelization)
     L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

     if CFG.exec_L1AP[1]:
-        L1A_obj.to_ENVI()
+        L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
     L1A_obj.delete_tempFiles()

     return L1A_obj
@@ -69,7 +69,7 @@ def L1A_map_2(L1A_tile):  # map (block-wise parallelization)
     # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
     L1A_tile.calc_TOARadRefTemp()
     if not CFG.inmem_serialization:
-        L1A_tile.to_ENVI(is_tempfile=True)
+        L1A_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask, is_tempfile=True)
     return L1A_tile
@@ -83,7 +83,7 @@ def L1A_map_3(L1A_obj):  # map (scene-wise parallelization)
     L1A_obj.calc_orbit_overpassParams()  # requires corner positions
     L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)
     if CFG.exec_L1AP[1]:
-        L1A_obj.to_ENVI()
+        L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
         L1A_obj.delete_tempFiles()
     else:
         L1A_obj.delete_tempFiles()
@@ -101,7 +101,7 @@ def L1B_map(L1A_obj):
     L1B_obj.compute_global_shifts()

     if CFG.exec_L1BP[1]:
-        L1B_obj.to_ENVI()
+        L1B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
     L1B_obj.delete_tempFiles()

     return L1B_obj
@@ -136,7 +136,7 @@ def L1C_map(L1B_objs):
     # write outputs and delete temporary data
     for L1C_obj in L1C_objs:
         if CFG.exec_L1CP[1]:
-            L1C_obj.to_ENVI()
+            L1C_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
         if L1C_obj.arr_shape == 'cube':
             L1C_obj.delete_tempFiles()
         L1C_obj.delete_ac_input_arrays()
@@ -186,7 +186,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
     # write output
     if CFG.exec_L2AP[1]:
-        L2A_obj.to_ENVI()
+        L2A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

     # delete tempfiles of separate subsystem GMS objects
     [L2A_obj.delete_tempFiles() for L2A_obj in L2A_objs]
@@ -205,7 +205,7 @@ def L2B_map(L2A_obj):
     L2B_obj = L2B_P.L2B_object(L2A_obj)
     L2B_obj.spectral_homogenization()
     if CFG.exec_L2BP[1]:
-        L2B_obj.to_ENVI()
+        L2B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
     if L2B_obj.arr_shape == 'cube':
         L2B_obj.delete_tempFiles()
     return L2B_obj
@@ -218,7 +218,8 @@ def L2C_map(L2B_obj):
     L2C_obj = L2C_P.L2C_object(L2B_obj)
     if CFG.exec_L2CP[1]:
         L2C_MRGS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
-        [MGRS_tile.to_ENVI(compression=CFG.output_data_compression) for MGRS_tile in L2C_MRGS_tiles]
+        [MGRS_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask,
+                           compression=CFG.output_data_compression) for MGRS_tile in L2C_MRGS_tiles]
     L2C_obj.delete_tempFiles()
     return L2C_obj  # contains no array data in Python mode
@@ -233,7 +234,7 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise
     :param list_dataset_dicts_per_scene:
     :return:
     """
-    with ProcessLock(processes=CFG.CPUs_all_jobs,
+    with ProcessLock(allowed_slots=CFG.CPUs_all_jobs,
                      logger=GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
                                        log_level=CFG.log_level, append=True)):
......
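The call sites above pass the new flag as the first positional argument of `to_ENVI`, which suggests a signature along these lines (an inference from the calls, since the method body is not part of this diff):

```python
def to_ENVI(self, write_ENVIclassif_cloudmask=True, is_tempfile=False, compression=False):
    """Assumed shape of GMS_object.to_ENVI after this change, inferred from the
    call sites above (to_ENVI(flag), to_ENVI(flag, is_tempfile=True),
    to_ENVI(flag, compression=...)); the actual implementation is not shown here."""
```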
@@ -21,7 +21,7 @@ class Test_MultiSlotLock(unittest.TestCase):
         self.config = set_config(job_ID=26186196, db_host=db_host, reset_status=True)

     def test_acquire_release(self):
-        msl = MultiSlotLock('unittest', allowed_threads=15)
+        msl = MultiSlotLock('unittest', allowed_slots=15)
         msl.acquire()

         # with self.assertRaises(AlreadyAcquired):  # FIXME currently does not work because acquire blocks
@@ -33,7 +33,7 @@ class Test_MultiSlotLock(unittest.TestCase):
         self.failIf(True in [i.startswith('unittest') for i in msl.existing_locks])

     def test_with_statement(self):
-        with MultiSlotLock('unittest', allowed_threads=15) as lock:
+        with MultiSlotLock('unittest', allowed_slots=15) as lock:
             self.assertNotEquals(lock, None)
@@ -42,7 +42,7 @@ class Test_ProcessLock(unittest.TestCase):
         self.config = set_config(job_ID=26186196, db_host=db_host, reset_status=True)

     def test_acquire_release(self):
-        msl = ProcessLock(processes=15)
+        msl = ProcessLock(allowed_slots=15)
         msl.acquire()

         # with self.assertRaises(AlreadyAcquired):
@@ -54,5 +54,5 @@ class Test_ProcessLock(unittest.TestCase):
         self.failIf(True in [i.startswith('ProcessLock') for i in msl.existing_locks])

     def test_with_statement(self):
-        with ProcessLock(processes=15) as lock:
+        with ProcessLock(allowed_slots=15) as lock:
             self.assertNotEquals(lock, None)