Commit be56836c authored by Daniel Scheffler

Added IO locks for array reader and writer.

parent 00c74adf
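In short: raster reads and writes are now funneled through a named redis lock so that only a limited number of workers hit the disk at once. A minimal sketch of the underlying pattern (hypothetical standalone usage, assuming a local redis server and the python-redis-lock package):

    from redis import StrictRedis
    import redis_lock

    conn = StrictRedis(host='localhost')
    with redis_lock.Lock(conn, 'IO'):       # blocks until the named lock is free
        do_raster_io('/path/to/scene.bsq')  # hypothetical I/O-heavy call

The diff below wraps the existing reader/writer code in a thin subclass of redis_lock.Lock that adds slot-based multi-holder support and degrades to a no-op when no redis server is available.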
@@ -24,6 +24,7 @@ from . import geoprocessing as GEOP
from ..io import output_writer as OUT_W
from ..misc import helper_functions as HLP_F
from ..misc.definition_dicts import get_outFillZeroSaturated, is_dataset_provided_as_fullScene
from ..misc.locks import Lock
from ..model.gms_object import GMS_object
from ..model import metadata as META
@@ -175,12 +176,13 @@ class L1A_object(GMS_object):
rasObj = GEOP.GEOPROCESSING(paths_files2stack[0], self.logger, subset=subset)
# perform layer stack
if CFG.inmem_serialization and path_output is None: # numpy array output
self.arr = rasObj.Layerstacking(paths_files2stack)
self.path_InFilePreprocessor = paths_files2stack[0]
else: # 'MEMORY' or physical output
rasObj.Layerstacking(paths_files2stack, path_output=path_output) # this writes an output (gdal_merge)
self.arr = path_output
with Lock('IO', allowed_threads=1, logger=self.logger): # FIXME hardcoded
if CFG.inmem_serialization and path_output is None: # numpy array output
self.arr = rasObj.Layerstacking(paths_files2stack)
self.path_InFilePreprocessor = paths_files2stack[0]
else: # 'MEMORY' or physical output
rasObj.Layerstacking(paths_files2stack, path_output=path_output) # writes an output (gdal_merge)
self.arr = path_output
else:
assert image_files != [], 'No image files found within the archive matching the expected naming scheme.'
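The '# FIXME hardcoded' comment flags the fixed allowed_threads=1. A configurable variant (hypothetical; 'max_parallel_io' is an assumed CFG option, not part of this commit) might read:

    n_io = getattr(CFG, 'max_parallel_io', 1)  # hypothetical config option
    with Lock('IO', allowed_threads=n_io, logger=self.logger):
        self.arr = rasObj.Layerstacking(paths_files2stack)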
@@ -194,14 +196,16 @@ class L1A_object(GMS_object):
subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
rasObj = GEOP.GEOPROCESSING(path_file2load, self.logger, subset=subset)
if CFG.inmem_serialization and path_output is None: # numpy array output
self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
self.path_InFilePreprocessor = path_file2load
else: # 'MEMORY' or physical output
GEOP.ndarray2gdal(rasObj.tondarray(), path_output,
geotransform=rasObj.geotransform, projection=rasObj.projection)
self.arr = path_output
# read a single file
with Lock('IO', allowed_threads=1, logger=self.logger): # FIXME hardcoded
if CFG.inmem_serialization and path_output is None: # numpy array output
self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
self.path_InFilePreprocessor = path_file2load
else: # 'MEMORY' or physical output
GEOP.ndarray2gdal(rasObj.tondarray(), path_output,
geotransform=rasObj.geotransform, projection=rasObj.projection)
self.arr = path_output
os.chdir(project_dir)
@@ -15,6 +15,7 @@ from ecmwfapi.api import APIKeyFetchError, get_apikey_values
from ..options.config import GMS_config as CFG
from .spatial_index_mediator import SpatialIndexMediatorServer, Connection
from .exceptions import GMSEnvironmentError, MissingNonPipLibraryWarning
from ..misc.locks import redis_conn
__author__ = 'Daniel Scheffler'
@@ -47,6 +48,11 @@ class GMSEnvironment(object):
self.logger.warning('Coregistration will be skipped!')
os.environ['GMS_SPAT_IDX_SRV_STATUS'] = 'unavailable'
def _check_redis_server(self):
if not redis_conn:
self.logger.warning("Unable to connect to redis server. Is the server installed and running? For "
"installation on Ubuntu, use 'sudo apt install redis-server'.")
def _check_nonpip_packages(self):
"""Check for not pip-installable packages."""
@@ -84,6 +90,7 @@ class GMSEnvironment(object):
# javaLibs = []
self._check_spatial_index_mediator_server()
self._check_redis_server()
self._check_nonpip_packages()
def check_ports(self):
# -*- coding: utf-8 -*-
__author__ = 'Daniel Scheffler'

import time
import logging

import redis_lock
from redis import StrictRedis
from redis.exceptions import ConnectionError as RedisConnectionError

try:
    redis_conn = StrictRedis(host='localhost')
    redis_conn.keys()  # may raise a connection error if no local redis server is running
except RedisConnectionError:
    redis_conn = None


class Lock(redis_lock.Lock):
    def __init__(self, name, allowed_threads=1, logger=None, **kwargs):
        self.conn = redis_conn
        self.allowed_threads = allowed_threads
        self.allowed_slot_names = ['%s, slot #%s' % (name, i) for i in range(1, allowed_threads + 1)]

        if redis_conn:
            if allowed_threads > 1:
                # poll until one of the allowed slots is free, then acquire the lock under that slot name
                while True:
                    name_free_slot = self.get_free_slot_name()
                    if not name_free_slot:
                        time.sleep(0.2)
                    else:
                        break
                name = name_free_slot

            super().__init__(self.conn, name, **kwargs)

        self.name = name
        self.logger = logger or logging.getLogger("RedisLock: '%s'" % name)

    def get_existing_locks(self):
        # redis_lock prefixes its keys with 'lock:'; restrict the query to those keys
        return [k.decode('utf8').split('lock:', 1)[1] for k in self.conn.keys('lock:*')]

    def get_free_slot_name(self):
        free_slots = [sn for sn in self.allowed_slot_names if sn not in self.get_existing_locks()]
        if free_slots:
            return free_slots[0]

    def __enter__(self):
        if self.conn:
            super().__enter__()
            self.logger.info("Acquired lock '%s'." % self.name)
        return self  # no-op (but still usable as a context manager) if no redis server is available

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        if self.conn:
            super().__exit__(exc_type=exc_type, exc_value=exc_value, traceback=traceback)
            self.logger.info("Released lock '%s'." % self.name)
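Usage sketch for the new class (hypothetical caller): with allowed_threads=2, up to two processes may hold an 'IO' slot at the same time; further callers poll every 0.2 s until a slot key disappears from redis.

    import logging

    with Lock('IO', allowed_threads=2, logger=logging.getLogger('demo')):
        write_envi_output()  # hypothetical I/O-heavy call

Design note: slot acquisition is poll-then-grab rather than atomic, so two waiting processes can pick the same freed slot; the loser then simply blocks inside redis_lock's own acquire until that slot is released again.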
@@ -48,6 +48,7 @@ from ..io import input_reader as INP_R
from ..io import output_writer as OUT_W
from ..misc import helper_functions as HLP_F
from ..misc import definition_dicts as DEF_D
from ..misc.locks import Lock
if TYPE_CHECKING:
from ..algorithms.L1C_P import L1C_object # noqa F401 # flake8 issue
@@ -1307,196 +1308,198 @@ class GMS_object(Dataset):
# loop through all attributes to write and execute writer #
###########################################################
for arrayname in attributes2write:
descriptor = '%s_%s' % (image_type_dict[arrayname], self.proc_level)
with Lock('IO', allowed_threads=1, logger=self.logger):
for arrayname in attributes2write:
descriptor = '%s_%s' % (image_type_dict[arrayname], self.proc_level)
if hasattr(self, arrayname) and getattr(self, arrayname) is not None:
arrayval = getattr(self, arrayname) # can be a GeoArray (in mem / not in mem) or a numpy.ndarray
if hasattr(self, arrayname) and getattr(self, arrayname) is not None:
arrayval = getattr(self, arrayname) # can be a GeoArray (in mem / not in mem) or a numpy.ndarray
# initial assertions
assert arrayname in metaAttr_dict, "GMS_object.to_ENVI cannot yet write %s attribute." % arrayname
assert isinstance(arrayval, (GeoArray, np.ndarray)), "Expected a GeoArray instance or a numpy array " \
"for object attribute %s. Got %s." % (
arrayname, type(arrayval))
# initial assertions
assert arrayname in metaAttr_dict, "GMS_object.to_ENVI cannot yet write %s attribute." % arrayname
assert isinstance(arrayval, (GeoArray, np.ndarray)), "Expected a GeoArray instance or a numpy " \
"array for object attribute %s. Got %s." \
% (arrayname, type(arrayval))
outpath_hdr = self.pathGen.get_outPath_hdr(arrayname)
outpath_hdr = os.path.splitext(outpath_hdr)[0] + '__TEMPFILE.hdr' if is_tempfile else outpath_hdr
if not os.path.exists(os.path.dirname(outpath_hdr)):
os.makedirs(os.path.dirname(outpath_hdr))
out_dtype = out_dtype_dict[arrayname]
meta_attr = metaAttr_dict[arrayname]
outpath_hdr = self.pathGen.get_outPath_hdr(arrayname)
outpath_hdr = os.path.splitext(outpath_hdr)[0] + '__TEMPFILE.hdr' if is_tempfile else outpath_hdr
if not os.path.exists(os.path.dirname(outpath_hdr)):
os.makedirs(os.path.dirname(outpath_hdr))
out_dtype = out_dtype_dict[arrayname]
meta_attr = metaAttr_dict[arrayname]
if not is_tempfile:
self.log_for_fullArr_or_firstTile('Writing %s %s.' % (self.proc_level, print_dict[descriptor]))
#########################
# GeoArray in disk mode #
#########################
if isinstance(arrayval, GeoArray) and not arrayval.is_inmem:
# object attribute contains GeoArray in disk mode. This is usually the case if the attribute has
# been read in Python exec mode from the previous processing level and has NOT been modified during
# processing.
assert os.path.isfile(arrayval.filePath), "The object attribute '%s' contains a non-existent " \
"file path: %s" % (arrayname, arrayval.filePath)
path_to_array = arrayval.filePath
#############
# full cube #
#############
if self.arr_shape == 'cube':
# image data can just be copied
outpath_arr = os.path.splitext(outpath_hdr)[0] + (os.path.splitext(path_to_array)[1]
if os.path.splitext(path_to_array)[
1] else '.%s' % self.outInterleave)
hdr2readMeta = os.path.splitext(path_to_array)[0] + '.hdr'
meta2write = INP_R.read_ENVIhdr_to_dict(hdr2readMeta, self.logger) \
if arrayname in ['mask_clouds', ] else getattr(self, meta_attr)
meta2write.update({'interleave': self.outInterleave,
'byte order': 0,
'header offset': 0,
'data type': DEF_D.dtype_lib_Python_IDL[out_dtype],
'lines': self.shape_fullArr[0],
'samples': self.shape_fullArr[1]})
meta2write = metaDict_to_metaODict(meta2write, self.logger)
if '__TEMPFILE' in path_to_array:
os.rename(path_to_array, outpath_arr)
envi.write_envi_header(outpath_hdr, meta2write)
HLP_F.silentremove(path_to_array)
HLP_F.silentremove(os.path.splitext(path_to_array)[0] + '.hdr')
else:
try:
shutil.copy(path_to_array, outpath_arr) # copies file + permissions
except PermissionError:
# prevents permission error if outputfile already exists and is owned by another user
HLP_F.silentremove(outpath_arr)
shutil.copy(path_to_array, outpath_arr)
envi.write_envi_header(outpath_hdr, meta2write)
assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr
setattr(self, arrayname, outpath_arr) # refresh arr/masks/mask_clouds attributes
if arrayname == 'masks':
setattr(self, 'mask_nodata', outpath_arr)
if not is_tempfile:
self.log_for_fullArr_or_firstTile('Writing %s %s.' % (self.proc_level, print_dict[descriptor]))
#########################
# 'block' or 'MGRS_tile' #
# GeoArray in disk mode #
#########################
else:
# data have to be read in subset and then be written
if self.arr_pos:
(rS, rE), (cS, cE) = self.arr_pos
cols, rows = cE - cS + 1, rE - rS + 1
else:
cS, rS, cols, rows = [None] * 4
if '__TEMPFILE' in path_to_array:
raise NotImplementedError
if arrayname not in ['mask_clouds', 'mask_nodata']:
# read image data in subset
tempArr = gdalnumeric.LoadFile(path_to_array, cS, rS, cols,
rows) # bands, rows, columns OR rows, columns
arr2write = tempArr if len(tempArr.shape) == 2 else \
np.swapaxes(np.swapaxes(tempArr, 0, 2), 0, 1) # rows, columns, (bands)
if isinstance(arrayval, GeoArray) and not arrayval.is_inmem:
# object attribute contains GeoArray in disk mode. This is usually the case if the attribute has
# been read in Python exec mode from the previous processing level and has NOT been modified during
# processing.
assert os.path.isfile(arrayval.filePath), "The object attribute '%s' contains a non-existent " \
"file path: %s" % (arrayname, arrayval.filePath)
path_to_array = arrayval.filePath
#############
# full cube #
#############
if self.arr_shape == 'cube':
# image data can just be copied
outpath_arr = os.path.splitext(outpath_hdr)[0] + (os.path.splitext(path_to_array)[1]
if os.path.splitext(path_to_array)[
1] else '.%s' % self.outInterleave)
hdr2readMeta = os.path.splitext(path_to_array)[0] + '.hdr'
meta2write = INP_R.read_ENVIhdr_to_dict(hdr2readMeta, self.logger) \
if arrayname in ['mask_clouds', ] else getattr(self, meta_attr)
meta2write.update({'interleave': self.outInterleave,
'byte order': 0,
'header offset': 0,
'data type': DEF_D.dtype_lib_Python_IDL[out_dtype],
'lines': self.shape_fullArr[0],
'samples': self.shape_fullArr[1]})
meta2write = metaDict_to_metaODict(meta2write, self.logger)
if '__TEMPFILE' in path_to_array:
os.rename(path_to_array, outpath_arr)
envi.write_envi_header(outpath_hdr, meta2write)
HLP_F.silentremove(path_to_array)
HLP_F.silentremove(os.path.splitext(path_to_array)[0] + '.hdr')
else:
try:
shutil.copy(path_to_array, outpath_arr) # copies file + permissions
except PermissionError:
# prevents permission error if outfile already exists and is owned by another user
HLP_F.silentremove(outpath_arr)
shutil.copy(path_to_array, outpath_arr)
envi.write_envi_header(outpath_hdr, meta2write)
assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr
setattr(self, arrayname, outpath_arr) # refresh arr/masks/mask_clouds attributes
if arrayname == 'masks':
setattr(self, 'mask_nodata', outpath_arr)
#########################
# 'block' or 'MGRS_tile' #
#########################
else:
# read mask data in subset
previous_procL = DEF_D.proc_chain[DEF_D.proc_chain.index(self.proc_level) - 1]
PG_obj = PG.path_generator(self.__dict__, proc_level=previous_procL)
path_masks = PG_obj.get_path_maskdata()
arr2write = INP_R.read_mask_subset(path_masks, arrayname, self.logger,
(self.arr_shape, self.arr_pos))
setattr(self, arrayname, arr2write)
arrayval = getattr(self, arrayname) # can be a GeoArray (in mem / not in mem) or a numpy.ndarray
####################################
# np.ndarray or GeoArray in memory #
####################################
if isinstance(arrayval, np.ndarray) or isinstance(arrayval, GeoArray) and arrayval.is_inmem:
# must be an if-condition because arrayval can change attribute type from not-inmem-GeoArray
# to np.ndarray
'''object attribute contains array'''
# convert the mask_clouds array and its metadata into data ready for an ENVI classification file
arr2write, meta2write = OUT_W.mask_to_ENVI_Classification(self, arrayname) \
if arrayname in ['mask_clouds', ] else (arrayval, getattr(self, meta_attr))
arr2write = arr2write.arr if isinstance(arr2write, GeoArray) else arr2write
assert isinstance(arr2write, np.ndarray), 'Expected a numpy ndarray. Got %s.' % type(arr2write)
##########################
# full cube or MGRS_tile #
##########################
if self.arr_shape in ['cube', 'MGRS_tile']:
# TODO write a function that implements the algorithm from Tiles_Writer for writing cubes
# TODO -> no need for Spectral Python
# write cube-like attributes
meta2write = metaDict_to_metaODict(meta2write, self.logger)
success = 1
if arrayname not in ['mask_clouds', ]:
if compression:
success = OUT_W.write_ENVI_compressed(outpath_hdr, arr2write, meta2write)
if not success:
warnings.warn('The written compressed ENVI file is not readable by GDAL! '
'Writing uncompressed file.')
if not compression or not success:
envi.save_image(outpath_hdr, arr2write, metadata=meta2write, dtype=out_dtype,
interleave=self.outInterleave, ext=self.outInterleave, force=True)
# data have to be read in subset and then be written
if self.arr_pos:
(rS, rE), (cS, cE) = self.arr_pos
cols, rows = cE - cS + 1, rE - rS + 1
else:
cS, rS, cols, rows = [None] * 4
if '__TEMPFILE' in path_to_array:
raise NotImplementedError
if arrayname not in ['mask_clouds', 'mask_nodata']:
# read image data in subset
tempArr = gdalnumeric.LoadFile(path_to_array, cS, rS, cols,
rows) # bands, rows, columns OR rows, columns
arr2write = tempArr if len(tempArr.shape) == 2 else \
np.swapaxes(np.swapaxes(tempArr, 0, 2), 0, 1) # rows, columns, (bands)
else:
# read mask data in subset
previous_procL = DEF_D.proc_chain[DEF_D.proc_chain.index(self.proc_level) - 1]
PG_obj = PG.path_generator(self.__dict__, proc_level=previous_procL)
path_masks = PG_obj.get_path_maskdata()
arr2write = INP_R.read_mask_subset(path_masks, arrayname, self.logger,
(self.arr_shape, self.arr_pos))
setattr(self, arrayname, arr2write)
arrayval = getattr(self, arrayname) # can be a GeoArray (in mem / not in mem) or a numpy.ndarray
####################################
# np.ndarray or GeoArray in memory #
####################################
if isinstance(arrayval, np.ndarray) or isinstance(arrayval, GeoArray) and arrayval.is_inmem:
# must be an if-condition because arrayval can change attribute type from not-inmem-GeoArray
# to np.ndarray
'''object attribute contains array'''
# convert the mask_clouds array and its metadata into data ready for an ENVI classification file
arr2write, meta2write = OUT_W.mask_to_ENVI_Classification(self, arrayname) \
if arrayname in ['mask_clouds', ] else (arrayval, getattr(self, meta_attr))
arr2write = arr2write.arr if isinstance(arr2write, GeoArray) else arr2write
assert isinstance(arr2write, np.ndarray), 'Expected a numpy ndarray. Got %s.' % type(arr2write)
##########################
# full cube or MGRS_tile #
##########################
if self.arr_shape in ['cube', 'MGRS_tile']:
# TODO write a function that implements the algorithm from Tiles_Writer for writing cubes
# TODO -> no need for Spectral Python
# write cube-like attributes
meta2write = metaDict_to_metaODict(meta2write, self.logger)
success = 1
if arrayname not in ['mask_clouds', ]:
if compression:
success = OUT_W.write_ENVI_compressed(outpath_hdr, arr2write, meta2write)
if not success:
warnings.warn('The written compressed ENVI file is not readable by GDAL! '
'Writing uncompressed file.')
if not compression or not success:
envi.save_image(outpath_hdr, arr2write, metadata=meta2write, dtype=out_dtype,
interleave=self.outInterleave, ext=self.outInterleave, force=True)
else:
if compression:
success = OUT_W.write_ENVI_compressed(outpath_hdr, arr2write, meta2write)
if not success:
self.logger.warning('The written compressed ENVI file is not readable by GDAL! '
'Writing uncompressed file.')
if not compression or not success:
class_names = meta2write['class names']
class_colors = meta2write['class lookup']
envi.save_classification(outpath_hdr, arr2write, metadata=meta2write,
dtype=out_dtype, interleave=self.outInterleave,
ext=self.outInterleave, force=True,
class_names=class_names, class_colors=class_colors)
if os.path.exists(outpath_hdr):
OUT_W.reorder_ENVI_header(outpath_hdr, OUT_W.enviHdr_keyOrder)
#########################
# block-like attributes #
#########################
else:
if compression:
success = OUT_W.write_ENVI_compressed(outpath_hdr, arr2write, meta2write)
if not success:
self.logger.warning('The written compressed ENVI file is not readable by GDAL! '
'Writing uncompressed file.')
if not compression or not success:
class_names = meta2write['class names']
class_colors = meta2write['class lookup']
envi.save_classification(outpath_hdr, arr2write, metadata=meta2write, dtype=out_dtype,
interleave=self.outInterleave, ext=self.outInterleave,
force=True, class_names=class_names, class_colors=class_colors)
if os.path.exists(outpath_hdr):
OUT_W.reorder_ENVI_header(outpath_hdr, OUT_W.enviHdr_keyOrder)
#########################
# block-like attributes #
#########################
else:
if compression: # FIXME
warnings.warn(
'Compression is not yet supported for GMS object tiles. Writing uncompressed data.')
# write block-like attributes
bands = arr2write.shape[2] if len(arr2write.shape) == 3 else 1
out_shape = tuple(self.shape_fullArr[:2]) + (bands,)
OUT_W.Tiles_Writer(arr2write, outpath_hdr, out_shape, out_dtype, self.outInterleave,
out_meta=meta2write, arr_pos=self.arr_pos, overwrite=False)
assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr
outpath_arr = os.path.splitext(outpath_hdr)[0] + '.%s' % self.outInterleave
if not CFG.inmem_serialization:
setattr(self, arrayname, outpath_arr) # replace array by output path
if arrayname == 'masks':
setattr(self, 'mask_nodata', outpath_arr)
# if compression:
# raise NotImplementedError # FIXME implement working compression
# HLP_F.ENVIfile_to_ENVIcompressed(outpath_hdr)
if compression: # FIXME
warnings.warn(
'Compression is not yet supported for GMS object tiles. Writing uncompressed data.')
# write block-like attributes
bands = arr2write.shape[2] if len(arr2write.shape) == 3 else 1
out_shape = tuple(self.shape_fullArr[:2]) + (bands,)
OUT_W.Tiles_Writer(arr2write, outpath_hdr, out_shape, out_dtype, self.outInterleave,
out_meta=meta2write, arr_pos=self.arr_pos, overwrite=False)
assert OUT_W.check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr
outpath_arr = os.path.splitext(outpath_hdr)[0] + '.%s' % self.outInterleave
if not CFG.inmem_serialization:
setattr(self, arrayname, outpath_arr) # replace array by output path
if arrayname == 'masks':
setattr(self, 'mask_nodata', outpath_arr)
# if compression:
# raise NotImplementedError # FIXME implement working compression
# HLP_F.ENVIfile_to_ENVIcompressed(outpath_hdr)
else:
if not is_tempfile:
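Writer side, schematically: the entire attribute-writing loop of GMS_object.to_ENVI now runs under a single IO lock, mirroring the reader changes above (a condensed sketch, not the literal diff):

    with Lock('IO', allowed_threads=1, logger=self.logger):  # FIXME hardcoded, as in the readers
        for arrayname in attributes2write:
            write_single_attribute(arrayname)  # hypothetical condensation of the ~190-line loop body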
@@ -17,3 +17,4 @@ cerberus
nested_dict
openpyxl
timeout_decorator
python-redis-lock  # imported as 'redis_lock'
@@ -15,7 +15,7 @@ requirements = [
'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'pytz',
'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy',
'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.1', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus',
'nested_dict', 'openpyxl', 'timeout_decorator'
'nested_dict', 'openpyxl', 'timeout_decorator', 'python-redis-lock'  # imported as 'redis_lock'
# spectral<0.16 has some problems with writing signed integer 8bit data
# fmask # conda install -c conda-forge python-fmask
# 'pyhdf', # conda install --yes -c conda-forge pyhdf
@@ -74,6 +74,7 @@ dependencies:
- nested_dict
- openpyxl
- timeout_decorator
- python-redis-lock  # imported as 'redis_lock'
- py_tools_ds>=0.12.4
- geoarray>=0.7.0
- arosics>=0.6.6