Commit dbbed264 authored by Daniel Scheffler

Fixed issue #103 ('struct.error: unpack requires a buffer of 4 bytes' within SpatialIndexMediator in case of multiple parallel database accesses) by adding a database lock that blocks parallel database queries.
Signed-off-by: Daniel Scheffler <danschef@gfz-potsdam.de>
parent 59570fdc
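
For context, a minimal toy sketch (not part of the commit) of why serializing the queries helps: when only one worker at a time may talk to the spatial index server, responses can no longer be interleaved between processes, which is what produced the truncated 4-byte reads behind issue #103. The semaphore below merely stands in for the project's cross-process DatabaseLock; names and values are illustrative.

import threading
import time

# Stand-in for DatabaseLock(allowed_slots=1): at most one "query" at a time.
db_slot = threading.BoundedSemaphore(value=1)

def run_query(worker_id):
    with db_slot:
        # Critical section: only one worker reads its response at a time,
        # so replies from the server cannot get interleaved or truncated.
        time.sleep(0.1)
        print('worker %d finished its query' % worker_id)

threads = [threading.Thread(target=run_query, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()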
@@ -6,6 +6,8 @@ History
---------------------
* Fixed missing log messages regarding released locks.
* Fixed issue #103 ('struct.error: unpack requires a buffer of 4 bytes' within SpatialIndexMediator in case of multiple
  parallel database accesses) by adding a database lock that blocks parallel database queries.
0.18.11 (2020-11-03)
......
@@ -59,6 +59,7 @@ from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import path_generator as PG
from ..misc.logging import GMS_logger, close_logger
from ..misc.locks import DatabaseLock
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated
@@ -130,14 +131,15 @@ class Scene_finder(object):
"""
SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
timeout=timeout, retries=10)
self.possib_ref_scenes = SpIM.getFullSceneDataForDataset(envelope=self.boundsLonLat,
timeStart=self.timeStart,
timeEnd=self.timeEnd,
minCloudCover=self.min_cloudcov,
maxCloudCover=self.max_cloudcov,
datasetid=CFG.datasetid_spatial_ref,
refDate=self.src_AcqDate,
maxDaysDelta=self.plusminus_days)
with DatabaseLock(allowed_slots=1, logger=self.logger):
self.possib_ref_scenes = SpIM.getFullSceneDataForDataset(envelope=self.boundsLonLat,
timeStart=self.timeStart,
timeEnd=self.timeEnd,
minCloudCover=self.min_cloudcov,
maxCloudCover=self.max_cloudcov,
datasetid=CFG.datasetid_spatial_ref,
refDate=self.src_AcqDate,
maxDaysDelta=self.plusminus_days)
if self.possib_ref_scenes:
# fill GeoDataFrame with possible ref scene parameters
......
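
One property worth noting about the pattern above: because the query runs inside a with block, the lock slot is released even if getFullSceneDataForDataset raises. A minimal, self-contained illustration of that context-manager guarantee (DemoLock is a hypothetical stand-in, not the project's class):

class DemoLock(object):
    """Hypothetical stand-in that only logs acquire/release."""
    def __enter__(self):
        print('lock acquired')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('lock released')
        return False  # do not swallow exceptions

try:
    with DemoLock():
        raise RuntimeError('query failed')
except RuntimeError:
    pass  # 'lock released' was printed before the exception propagated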
@@ -59,6 +59,7 @@ from ..misc.logging import GMS_logger, close_logger
from ..misc.database_tools import get_overlapping_scenes_from_postgreSQLdb
from ..misc.path_generator import path_generator
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.locks import DatabaseLock
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
@@ -377,11 +378,12 @@ class DEM_Creator(object):
if use_index_mediator:
SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
timeout=timeout_sec, retries=10)
scenes = SpIM.getFullSceneDataForDataset(envelope=corner_coord_to_minmax(tgt_corner_coord_lonlat),
timeStart=datetime(1970, 1, 1, 0, 0, 0),
timeEnd=datetime(2100, 12, 31, 0, 0, 0),
minCloudCover=0, maxCloudCover=100,
datasetid=self.dsID_dic[self.dem_sensor])
with DatabaseLock(allowed_slots=1, logger=self.logger):
scenes = SpIM.getFullSceneDataForDataset(envelope=corner_coord_to_minmax(tgt_corner_coord_lonlat),
timeStart=datetime(1970, 1, 1, 0, 0, 0),
timeEnd=datetime(2100, 12, 31, 0, 0, 0),
minCloudCover=0, maxCloudCover=100,
datasetid=self.dsID_dic[self.dem_sensor])
sceneIDs_srtm = [scene.sceneid for scene in scenes]
else:
@@ -406,12 +408,19 @@
# get overlapping SRTM scene IDs from GMS database
try:
# try to use the SpatialIndexMediator
# noinspection PyBroadException
try:
sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, timeout_sec)
except ConnectionRefusedError:
# fallback to plain pgSQL
self.logger.warning('SpatialIndexMediator refused connection. Falling back to plain postgreSQL query.')
sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)
except Exception as err:
# fallback to plain pgSQL
self.logger.warning('Error while running SpatialIndexMediator database query. '
'Falling back to plain postgreSQL query. '
'Error message was: %s' % str(repr(err)))
sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)
if not sceneIDs_srtm:
# fallback to plain pgSQL
......
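
The hunk above broadens the fallback: any error raised by the SpatialIndexMediator query, not only a refused connection, now triggers the plain postgreSQL path. A condensed sketch of that layered pattern with hypothetical callables (names are illustrative, not taken from the codebase):

import logging

def get_tiles_with_fallback(query_via_mediator, query_via_postgresql, logger):
    # try the fast SpatialIndexMediator path first
    try:
        return query_via_mediator()
    except ConnectionRefusedError:
        logger.warning('SpatialIndexMediator refused connection. '
                       'Falling back to plain postgreSQL query.')
    except Exception as err:
        logger.warning('Error while running SpatialIndexMediator database query. '
                       'Falling back to plain postgreSQL query. '
                       'Error message was: %s' % repr(err))
    # fallback to plain pgSQL
    return query_via_postgresql()

logging.basicConfig(level=logging.WARNING)
# the failing first callable forces the broad-exception fallback
print(get_tiles_with_fallback(lambda: 1 / 0, lambda: [123456], logging.getLogger('demo')))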
@@ -160,6 +160,18 @@ class ProcessLock(SharedResourceLock):
return super(ProcessLock, self).__exit__(exc_type, exc_val, exc_tb)
class DatabaseLock(SharedResourceLock):
def __init__(self, allowed_slots=1, logger=None, **kwargs):
self.disabled = CFG.disable_DB_locks
if not self.disabled:
super(DatabaseLock, self)\
.__init__(name='DatabaseLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
def __exit__(self, exc_type, exc_val, exc_tb):
return super(DatabaseLock, self).__exit__(exc_type, exc_val, exc_tb)
class MemoryReserver(object):
def __init__(self, mem2lock_gb, max_usage=90, logger=None):
"""
......
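
SharedResourceLock itself is not part of this diff. Purely as an assumption about its shape, here is a minimal in-process sketch of a slot-limited lock with a disable switch, mirroring the allowed_slots and disabled attributes used by DatabaseLock above (the real class is shared across processes, which this toy version is not):

import threading

class ToySharedResourceLock(object):
    """Assumed shape only: limits concurrent holders to allowed_slots."""
    def __init__(self, name='Lock', allowed_slots=1, logger=None, disabled=False):
        self.name = name
        self.disabled = disabled
        self.logger = logger
        self._sem = threading.BoundedSemaphore(allowed_slots)

    def __enter__(self):
        if not self.disabled:
            self._sem.acquire()
            if self.logger:
                self.logger.info('%s acquired.' % self.name)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.disabled:
            self._sem.release()
            if self.logger:
                self.logger.info('%s released.' % self.name)
        return False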
@@ -243,6 +243,7 @@ class JobConfig(object):
self.disable_exception_handler = gp('disable_exception_handler')
self.disable_IO_locks = gp('disable_IO_locks')
self.disable_CPU_locks = gp('disable_CPU_locks')
self.disable_DB_locks = gp('disable_DB_locks')
self.disable_memory_locks = gp('disable_memory_locks')
self.min_version_mem_usage_stats = gp('min_version_mem_usage_stats')
self.log_level = gp('log_level')
......
@@ -27,6 +27,7 @@
"disable_IO_locks": false, /*disable limiting of parallel disk read/write processes*/
"disable_CPU_locks": false, /*disable system-wide limiting of CPUs to be used for GeoMultiSens
(adjustable via 'CPUs_all_jobs')*/
"disable_DB_locks": false, /*disable system-wide limiting of parallel database accesses*/
"disable_memory_locks": false, /*disable reservation of a certain amount of memory per process
NOTE: disabling this might lead to RAM overload*/
"min_version_mem_usage_stats": "0.13.16", /*If 'disable_memory_locks' is False, GeoMultiSens reserves a
......
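
The options file above uses /* ... */ comments, which strict JSON parsers reject; presumably the project strips them before parsing (an assumption, since the options reader is not part of this diff). A minimal sketch of that idea:

import json
import re

def load_commented_json(text):
    # Strip /* ... */ comments before handing the text to json.loads
    # (naive: does not handle '/*' occurring inside string values).
    return json.loads(re.sub(r'/\*.*?\*/', '', text, flags=re.DOTALL))

snippet = '{"global_opts": {"disable_DB_locks": false /*disable parallel DB access limiting*/}}'
print(load_commented_json(snippet))  # {'global_opts': {'disable_DB_locks': False}}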
@@ -45,6 +45,7 @@ gms_schema_input = dict(
disable_exception_handler=dict(type='boolean', required=False),
disable_IO_locks=dict(type='boolean', required=False),
disable_CPU_locks=dict(type='boolean', required=False),
disable_DB_locks=dict(type='boolean', required=False),
disable_memory_locks=dict(type='boolean', required=False),
min_version_mem_usage_stats=dict(type='string', required=False),
log_level=dict(type='string', required=False, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
@@ -215,6 +216,7 @@ parameter_mapping = dict(
disable_exception_handler=('global_opts', 'disable_exception_handler'),
disable_IO_locks=('global_opts', 'disable_IO_locks'),
disable_CPU_locks=('global_opts', 'disable_CPU_locks'),
disable_DB_locks=('global_opts', 'disable_DB_locks'),
disable_memory_locks=('global_opts', 'disable_memory_locks'),
min_version_mem_usage_stats=('global_opts', 'min_version_mem_usage_stats'),
log_level=('global_opts', 'log_level'),
......
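
The schema entries above follow a cerberus-style layout (type/required/allowed keys). Assuming a cerberus Validator is what consumes them (not confirmed by this diff), the new flag would be checked roughly like this:

from cerberus import Validator

# Reduced, hypothetical excerpt of the schema shown above.
schema = {
    'global_opts': {
        'type': 'dict',
        'required': False,
        'schema': {
            'disable_DB_locks': {'type': 'boolean', 'required': False},
        },
    },
}

v = Validator(schema)
print(v.validate({'global_opts': {'disable_DB_locks': False}}))  # True
print(v.validate({'global_opts': {'disable_DB_locks': 'no'}}))   # False
print(v.errors)  # points at the non-boolean value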