Commit f78b5261 authored by Daniel Scheffler's avatar Daniel Scheffler

Merge branch 'master' into enhancement/improve_docs

parents 2ed4dfdf 0c9ece4a
......@@ -2,6 +2,27 @@
History
=======
0.18.12 (2020-11-17)
--------------------
* The clean-test rule no longer needs coverage as a requirement since this is a test requirement.
* Fixed error while dumping AC inputs (due to logger).
* Fixed FutureWarning regarding the use of GeoDataFrame and GeoSeries for data without geopandas geometry
(switched to plain pandas classes).
* Fixed missing log messages regarding released locks.
* Fixed issue #103 ('struct.error: unpack requires a buffer of 4 bytes' within SpatialIndexMediator in case of various
parallel database accesses) by adding a database lock that blocks parallel database queries.
* Fixed exception while closing AtmCorr.logger.
* Fixed issue #104 (FutureWarning: arrays to stack must be passed as a "sequence" type such as list or tuple.).
* Fixed some type hinting issues.
* GMS_object.get_subset_obj() now returns None in case the subset contains no data at all.
* Added missing MemoryReserver.logger.close() call.
* Fixed an issue causing the L2B/L2C output to contain wrong spectral bands in case the spectral homogenization is
  executed and sort_by_cwl is set to True (due to a wrong wavelength order if no_pan=False).
* SpatialIndexMediator.getFullSceneDataForDataset() now retries the query 10 times in case of a struct.error
(relates to issue #103).
0.18.11 (2020-11-03)
--------------------
......
......@@ -577,10 +577,10 @@ class L1A_object(GMS_object):
# if re.search(r'ETM+', self.sensor) and self.acq_datetime > datetime.datetime(year=2003, month=5, day=31,
# tzinfo=datetime.timezone.utc):
if is_dataset_provided_as_fullScene(self.GMS_identifier):
self.trueDataCornerPos = calc_FullDataset_corner_positions(self.mask_nodata, algorithm='numpy',
assert_four_corners=True)
kw = dict(algorithm='numpy', assert_four_corners=True)
else:
self.trueDataCornerPos = calc_FullDataset_corner_positions(self.mask_nodata, assert_four_corners=False)
kw = dict(algorithm='shapely', assert_four_corners=False)
self.trueDataCornerPos = calc_FullDataset_corner_positions(self.mask_nodata, **kw)
# set true data corner positions (lon/lat coordinates)
trueCorPosXY = [tuple(reversed(i)) for i in self.trueDataCornerPos]
......
......@@ -59,6 +59,7 @@ from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.logging import GMS_logger, close_logger
from ..misc.locks import DatabaseLock
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated
......@@ -130,6 +131,7 @@ class Scene_finder(object):
"""
SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
timeout=timeout, retries=10)
with DatabaseLock(allowed_slots=1, logger=self.logger):
self.possib_ref_scenes = SpIM.getFullSceneDataForDataset(envelope=self.boundsLonLat,
timeStart=self.timeStart,
timeEnd=self.timeEnd,
......
......@@ -929,6 +929,7 @@ class AtmCorr(object):
finally:
# rs_image.logger must be closed properly in any case
if rs_image.logger is not None:
rs_image.logger.close()
# get processing infos
......
......@@ -66,19 +66,25 @@ class L2B_object(L2A_object):
# FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
tgt_sat, tgt_sen = datasetid_to_sat_sen(CFG.datasetid_spectral_ref)
# NOTE: get target LBA at L2A, because spectral characteristics of target sensor do not change after AC
tgt_LBA = get_LayerBandsAssignment(
GMS_identifier(satellite=tgt_sat, sensor=tgt_sen, subsystem='',
image_type='RSD', proc_level='L2A', dataset_ID=src_dsID, logger=None))
tgt_gmsid_kw = dict(satellite=tgt_sat,
sensor=tgt_sen,
subsystem='',
image_type='RSD',
dataset_ID=src_dsID,
logger=None)
tgt_LBA = get_LayerBandsAssignment(GMS_identifier(proc_level='L2A', **tgt_gmsid_kw))
if CFG.datasetid_spectral_ref is None:
tgt_cwl = CFG.target_CWL
tgt_fwhm = CFG.target_FWHM
else:
# exclude those bands from CFG.target_CWL and CFG.target_FWHM that have been removed after AC
full_LBA = get_LayerBandsAssignment(
GMS_identifier(satellite=tgt_sat, sensor=tgt_sen, subsystem='',
image_type='RSD', proc_level='L1A', dataset_ID=src_dsID, logger=None),
no_thermal=True, no_pan=False, return_fullLBA=True, sort_by_cwl=True, proc_level='L1A')
full_LBA = get_LayerBandsAssignment(GMS_identifier(proc_level='L1A', **tgt_gmsid_kw),
no_thermal=True,
no_pan=False,
return_fullLBA=True,
sort_by_cwl=True,
proc_level='L1A')
tgt_cwl = [dict(zip(full_LBA, CFG.target_CWL))[bN] for bN in tgt_LBA]
tgt_fwhm = [dict(zip(full_LBA, CFG.target_FWHM))[bN] for bN in tgt_LBA]
......@@ -88,7 +94,7 @@ class L2B_object(L2A_object):
if self.dataset_ID == CFG.datasetid_spectral_ref:
self.logger.info("Spectral homogenization has been skipped because the dataset id equals the dataset id of "
"the spectral refernce sensor.")
"the spectral reference sensor.")
return
if src_cwls == CFG.target_CWL or (self.satellite == tgt_sat and self.sensor == tgt_sen):
......
......@@ -118,7 +118,7 @@ class AccuracyCube(GeoArray):
"CFG.spechomo_bandwise_accuracy is False."
# stack all accuracy layers together
accArr = np.dstack(err_layers.values()).astype('int16')
accArr = np.dstack(list(err_layers.values())).astype('int16')
# apply int16 nodata value
accArr[self._GMS_obj.arr.mask_nodata.astype(np.int8) == 0] = get_outFillZeroSaturated('int16')[0]
......
......@@ -59,6 +59,7 @@ from ..misc.logging import GMS_logger, close_logger
from ..misc.database_tools import get_overlapping_scenes_from_postgreSQLdb
from ..misc.path_generator import path_generator
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.locks import DatabaseLock
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
......@@ -377,6 +378,7 @@ class DEM_Creator(object):
if use_index_mediator:
SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
timeout=timeout_sec, retries=10)
with DatabaseLock(allowed_slots=1, logger=self.logger):
scenes = SpIM.getFullSceneDataForDataset(envelope=corner_coord_to_minmax(tgt_corner_coord_lonlat),
timeStart=datetime(1970, 1, 1, 0, 0, 0),
timeEnd=datetime(2100, 12, 31, 0, 0, 0),
......@@ -406,12 +408,19 @@ class DEM_Creator(object):
# get overlapping SRTM scene IDs from GMS database
try:
# try to use the SpatialIndexMediator
# noinspection PyBroadException
try:
sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, timeout_sec)
except ConnectionRefusedError:
# fallback to plain pgSQL
self.logger.warning('SpatialIndexMediator refused connection. Falling back to plain postgreSQL query.')
sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)
except Exception as err:
# fallback to plain pgSQL
self.logger.warning('Error while running SpatialIndexMediator database query. '
'Falling back to plain postgreSQL query. '
'Error message was: %s' % str(repr(err)))
sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)
if not sceneIDs_srtm:
# fallback to plain pgSQL
......
......@@ -96,8 +96,9 @@ class MultiSlotLock(Semaphore):
self.client.delete(self.grabbed_key)
def __exit__(self, exc_type, exc_val, exc_tb):
exitcode = super(MultiSlotLock, self).__exit__(exc_type, exc_val, exc_tb)
self.logger.close()
return super(MultiSlotLock, self).__exit__(exc_type, exc_val, exc_tb)
return exitcode
class SharedResourceLock(MultiSlotLock):
......@@ -134,7 +135,6 @@ class SharedResourceLock(MultiSlotLock):
self.client.delete(self.grabbed_key_jobID)
def __exit__(self, exc_type, exc_val, exc_tb):
self.logger.close()
return super(SharedResourceLock, self).__exit__(exc_type, exc_val, exc_tb)
......@@ -146,7 +146,6 @@ class IOLock(SharedResourceLock):
super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
def __exit__(self, exc_type, exc_val, exc_tb):
self.logger.close()
return super(IOLock, self).__exit__(exc_type, exc_val, exc_tb)
......@@ -158,10 +157,21 @@ class ProcessLock(SharedResourceLock):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
def __exit__(self, exc_type, exc_val, exc_tb):
self.logger.close()
return super(ProcessLock, self).__exit__(exc_type, exc_val, exc_tb)
class DatabaseLock(SharedResourceLock):
def __init__(self, allowed_slots=1, logger=None, **kwargs):
self.disabled = CFG.disable_DB_locks
if not self.disabled:
super(DatabaseLock, self)\
.__init__(name='DatabaseLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
def __exit__(self, exc_type, exc_val, exc_tb):
return super(DatabaseLock, self).__exit__(exc_type, exc_val, exc_tb)
class MemoryReserver(object):
def __init__(self, mem2lock_gb, max_usage=90, logger=None):
"""
......@@ -310,6 +320,7 @@ class MemoryReserver(object):
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
self.logger.close()
return True if exc_type is None else False
......
......@@ -321,6 +321,12 @@ class SpatialIndexMediator:
else:
raise TimeoutError('Spatial query timed out 10 times!')
except struct.error:
if i < self.retries - 1:
continue
else:
raise
return scenes
......
......@@ -37,7 +37,7 @@ import warnings
import logging
from collections import OrderedDict
from itertools import chain
from typing import Iterable, List, Union, TYPE_CHECKING # noqa F401 # flake8 issue
from typing import Iterable, List, Union, Generator, TYPE_CHECKING # noqa F401 # flake8 issue
import psutil
import time
from pkg_resources import parse_version
......@@ -1277,9 +1277,11 @@ class GMS_object(object):
def get_subset_obj(self, imBounds=None, mapBounds=None, mapBounds_prj=None, out_prj=None, logmsg=None,
progress=False, v=False):
# type: (tuple, tuple, str, str, str, bool, bool) -> GMS_object
"""Returns a subset of the given GMS object, based on the given bounds coordinates.
Array attributes are clipped and relevant metadata keys are updated according to new extent.
# type: (tuple, tuple, str, str, str, bool, bool) -> Union[GMS_object, None]
"""Return a subset of the given GMS object, based on the given bounds coordinates.
Array attributes are clipped and relevant metadata keys are updated according to new extent. In case the subset
does not contain any data but only no-data values, None is returned.
:param imBounds: <tuple> tuple of image coordinates in the form (xmin,xmax,ymin,ymax)
:param mapBounds: <tuple> tuple of map coordinates in the form (xmin,xmax,ymin,ymax)
......@@ -1347,6 +1349,12 @@ class GMS_object(object):
# update array-related attributes of sub_GMS_obj
if arrname == 'arr':
# return None in case the subset object contains only nodata
if subArr.min() == subArr.max() and \
np.std(subArr) == 0 and \
np.unique(subArr) == subArr.nodata:
return None
sub_GMS_obj.MetaObj.map_info = geotransform2mapinfo(subArr.gt, subArr.prj)
sub_GMS_obj.MetaObj.projection = subArr.prj
sub_GMS_obj.MetaObj.rows, sub_GMS_obj.MetaObj.cols = subArr.arr.shape[:2]
......@@ -1647,7 +1655,7 @@ class GMS_object(object):
self.MetaObj.to_odict(), overwrite=overwrite)
def to_tiles(self, blocksize=(2048, 2048)):
# type: (tuple) -> GMS_object
# type: (tuple) -> Generator[GMS_object]
"""Returns a generator object where items represent tiles of the given block size for the GMS object.
# NOTE: it's better to call get_subset_obj (also takes care of tile map infos)
......@@ -1667,7 +1675,7 @@ class GMS_object(object):
yield tileObj
def to_MGRS_tiles(self, pixbuffer=10, v=False):
# type: (int, bool) -> GMS_object
# type: (int, bool) -> Generator[GMS_object]
"""Returns a generator object where items represent the MGRS tiles for the GMS object.
:param pixbuffer: <int> a buffer in pixel values used to generate an overlap between the returned MGRS tiles
......@@ -1724,7 +1732,8 @@ class GMS_object(object):
# validate that the MGRS tile truly contains data
# -> this may not be the case if get_overlapping_MGRS_tiles() yielded invalid tiles due to inaccurate
# self.trueDataCornerLonLat
if True not in list(np.unique(tileObj.arr.mask_nodata)):
if tileObj is None or \
True not in list(np.unique(tileObj.arr.mask_nodata)):
self.logger.info("MGRS tile '%s' has not been skipped because it contains only no data values."
% MGRS_tileID)
continue
......
......@@ -2203,8 +2203,9 @@ def get_LayerBandsAssignment(GMS_id, nBands=None, sort_by_cwl=None, no_thermal=N
dict_cwlSorted_LayerBandsAssignment = {
'TM4': ['1', '2', '3', '4', '5', '7', '6'],
'TM5': ['1', '2', '3', '4', '5', '7', '6'],
'TM7': ['1', '2', '3', '4', '5', '7', '8', '6L', '6H'],
'LDCM': ['1', '2', '3', '4', '5', '9', '6', '7', '8', '10', '11'], }
'TM7': ['1', '2', '3', '8', '4', '5', '7', '6L', '6H'],
'LDCM': ['1', '2', '3', '8', '4', '5', '9', '6', '7', '10', '11'],
}
if nBands is None or nBands == len(dict_LayerBandsAssignment[GMS_sensorcode]):
assert GMS_sensorcode in dict_LayerBandsAssignment, \
......
......@@ -243,6 +243,7 @@ class JobConfig(object):
self.disable_exception_handler = gp('disable_exception_handler')
self.disable_IO_locks = gp('disable_IO_locks')
self.disable_CPU_locks = gp('disable_CPU_locks')
self.disable_DB_locks = gp('disable_DB_locks')
self.disable_memory_locks = gp('disable_memory_locks')
self.min_version_mem_usage_stats = gp('min_version_mem_usage_stats')
self.log_level = gp('log_level')
......@@ -279,7 +280,7 @@ class JobConfig(object):
self.path_archive = gp('path_archive', fallback=get_dbpath('foldername_download'))
self.path_procdata_scenes = gp('path_procdata_scenes', fallback=get_dbpath('foldername_procdata_scenes'))
self.path_procdata_MGRS = gp('path_procdata_MGRS', fallback=get_dbpath('foldername_procdata_MGRS'))
self.path_ECMWF_db = self.absP(self.DB_config_table['path_ECMWF_db'])
self.path_ECMWF_db = gp('path_ECMWF_db', fallback=self.absP(self.DB_config_table['path_ECMWF_db']))
self.path_benchmarks = gp('path_benchmarks', fallback=self.absP(self.DB_config_table['path_benchmarks']))
self.path_job_logs = gp('path_job_logs', fallback=get_dbpath('foldername_job_logs'))
......
......@@ -27,6 +27,7 @@
"disable_IO_locks": false, /*disable limiting of parallel disk read/write processes*/
"disable_CPU_locks": false, /*disable system-wide limiting of CPUs to be used for GeoMultiSens
(adjustable via 'CPUs_all_jobs')*/
"disable_DB_locks": false, /*disable system-wide limiting of parallel database accesses*/
"disable_memory_locks": false, /*disable reservation of a certain amount of memory per process
NOTE: disabling this might lead to RAM overload*/
"min_version_mem_usage_stats": "0.13.16", /*If 'disable_memory_locks' is False, GeoMultiSens reserves a
......
......@@ -45,6 +45,7 @@ gms_schema_input = dict(
disable_exception_handler=dict(type='boolean', required=False),
disable_IO_locks=dict(type='boolean', required=False),
disable_CPU_locks=dict(type='boolean', required=False),
disable_DB_locks=dict(type='boolean', required=False),
disable_memory_locks=dict(type='boolean', required=False),
min_version_mem_usage_stats=dict(type='string', required=False),
log_level=dict(type='string', required=False, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
......@@ -215,6 +216,7 @@ parameter_mapping = dict(
disable_exception_handler=('global_opts', 'disable_exception_handler'),
disable_IO_locks=('global_opts', 'disable_IO_locks'),
disable_CPU_locks=('global_opts', 'disable_CPU_locks'),
disable_DB_locks=('global_opts', 'disable_DB_locks'),
disable_memory_locks=('global_opts', 'disable_memory_locks'),
min_version_mem_usage_stats=('global_opts', 'min_version_mem_usage_stats'),
log_level=('global_opts', 'log_level'),
......
......@@ -77,7 +77,7 @@ def L1A_map(dataset_dict): # map (scene-wise parallelization)
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization)
# type: (dict) -> List[L1A_P.L1A_object]
# type: (dict, tuple) -> List[L1A_P.L1A_object]
L1A_obj = L1A_P.L1A_object(**dataset_dict)
L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)
......@@ -87,8 +87,10 @@ def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization
L1A_obj.apply_nodata_mask_to_ObjAttr('arr') # nodata mask is automatically calculated
L1A_obj.add_rasterInfo_to_MetaObj()
L1A_obj.reference_data('UTM')
tiles = list(L1A_obj.to_tiles(
block_size if block_size else CFG.tiling_block_size_XY)) # cut (block-wise parallelization)
tiles = [tile for tile in
# cut (block-wise parallelization)
L1A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY)
if tile is not None] # None is returned in case the tile contains no data at all
return tiles
......@@ -182,7 +184,7 @@ def L1C_map(L1B_objs):
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs, block_size=None, return_tiles=True):
# type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]]) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object]
# type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]], tuple, bool) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object] # noqa
"""Geometric homogenization.
Performs correction of geometric displacements, resampling to target grid of the usecase and merges multiple
......@@ -229,6 +231,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
if return_tiles:
L2A_tiles = list(L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY))
L2A_tiles = [i for i in L2A_tiles if i is not None] # None is returned in case the tile contains no data at all
[L2A_tile.record_mem_usage() for L2A_tile in L2A_tiles]
return L2A_tiles
else:
......
......@@ -216,13 +216,15 @@ class ProcessController(object):
if os.path.isfile(assumed_path_GMS_file):
GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
target_LayerBandsAssignment = \
get_LayerBandsAssignment(GMS_identifier(
get_LayerBandsAssignment(
GMS_identifier(
image_type=dataset['image_type'],
satellite=dataset['satellite'],
sensor=dataset['sensor'],
subsystem=dataset['subsystem'] if path_logfile != path_logfile_merged_ss else '',
proc_level=ProcL, # must be respected because LBA changes after atm. Corr.
dataset_ID=dataset['dataset_ID']), nBands=(1 if dataset['sensormode'] == 'P' else None))
dataset_ID=dataset['dataset_ID']),
nBands=(1 if dataset['sensormode'] == 'P' else None))
# check if the LayerBandsAssignment of the written dataset on disk equals the
# desired LayerBandsAssignment
......
......@@ -24,5 +24,5 @@
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
__version__ = '0.18.11'
__versionalias__ = '20201103.01'
__version__ = '0.18.12'
__versionalias__ = '20201127.01'
......@@ -21,7 +21,7 @@ pyinstrument
pyorbital
pyproj
pyrsr>=0.3.1
py_tools_ds>=0.12.4
py_tools_ds>=0.16.2
psutil
pytz
redis
......
......@@ -62,7 +62,7 @@ req = [
'pyorbital',
'pyproj',
'pyrsr>=0.3.1',
'py_tools_ds>=0.12.4',
'py_tools_ds>=0.16.2',
'pytz',
'redis', # named redis on PyPI and redis-py on conda-forge
'redis-semaphore',
......
......@@ -27,7 +27,6 @@
"""Unit test package for testproject."""
import os
import sklearn # noqa # avoids a static TLS ImportError during runtime of SICOR (when importing sklearn there)
# set database host during tests
db_host = 'localhost' if 'GMS_db_host' not in os.environ else os.environ['GMS_db_host']
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment