diff --git a/HISTORY.rst b/HISTORY.rst
index 06427a472ed010074940ab424968c2c66388de79..55515f6069dc198585de02b25daf39a1794b5a08 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -6,6 +6,8 @@ History
 ---------------------
 
 * Fixed missing log messages regarding released locks.
+* Fixed issue #103 ('struct.error: unpack requires a buffer of 4 bytes' within SpatialIndexMediator in case of multiple
+  parallel database accesses) by adding a database lock that blocks parallel database queries.
 
 
 0.18.11 (2020-11-03)
diff --git a/gms_preprocessing/algorithms/L1B_P.py b/gms_preprocessing/algorithms/L1B_P.py
index 0b50582357a7656bdae8bb3d8db12b356b0dbf65..46bd687496cf2e0482cfa6353943f8c1f1e684c5 100644
--- a/gms_preprocessing/algorithms/L1B_P.py
+++ b/gms_preprocessing/algorithms/L1B_P.py
@@ -59,6 +59,7 @@ from ..misc import database_tools as DB_T
 from ..misc import helper_functions as HLP_F
 from ..misc import path_generator as PG
 from ..misc.logging import GMS_logger, close_logger
+from ..misc.locks import DatabaseLock
 from ..misc.spatial_index_mediator import SpatialIndexMediator
 from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated
 
@@ -130,14 +131,15 @@ class Scene_finder(object):
         """
         SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
                                     timeout=timeout, retries=10)
-        self.possib_ref_scenes = SpIM.getFullSceneDataForDataset(envelope=self.boundsLonLat,
-                                                                 timeStart=self.timeStart,
-                                                                 timeEnd=self.timeEnd,
-                                                                 minCloudCover=self.min_cloudcov,
-                                                                 maxCloudCover=self.max_cloudcov,
-                                                                 datasetid=CFG.datasetid_spatial_ref,
-                                                                 refDate=self.src_AcqDate,
-                                                                 maxDaysDelta=self.plusminus_days)
+        with DatabaseLock(allowed_slots=1, logger=self.logger):
+            self.possib_ref_scenes = SpIM.getFullSceneDataForDataset(envelope=self.boundsLonLat,
+                                                                     timeStart=self.timeStart,
+                                                                     timeEnd=self.timeEnd,
+                                                                     minCloudCover=self.min_cloudcov,
+                                                                     maxCloudCover=self.max_cloudcov,
+                                                                     datasetid=CFG.datasetid_spatial_ref,
+                                                                     refDate=self.src_AcqDate,
+                                                                     maxDaysDelta=self.plusminus_days)
 
         if self.possib_ref_scenes:
             # fill GeoDataFrame with possible ref scene parameters
diff --git a/gms_preprocessing/io/input_reader.py b/gms_preprocessing/io/input_reader.py
index 3204be704fe38fd3c3eae4d4822aa2ae04d8b9a6..a99b9bef63db5eaa8b92f3e452ce33952c53f669 100644
--- a/gms_preprocessing/io/input_reader.py
+++ b/gms_preprocessing/io/input_reader.py
@@ -59,6 +59,7 @@ from ..misc.logging import GMS_logger, close_logger
 from ..misc.database_tools import get_overlapping_scenes_from_postgreSQLdb
 from ..misc.path_generator import path_generator
 from ..misc.spatial_index_mediator import SpatialIndexMediator
+from ..misc.locks import DatabaseLock
 
 if TYPE_CHECKING:
     from ..model.gms_object import GMS_identifier  # noqa F401  # flake8 issue
@@ -377,11 +378,12 @@ class DEM_Creator(object):
         if use_index_mediator:
             SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
                                         timeout=timeout_sec, retries=10)
-            scenes = SpIM.getFullSceneDataForDataset(envelope=corner_coord_to_minmax(tgt_corner_coord_lonlat),
-                                                     timeStart=datetime(1970, 1, 1, 0, 0, 0),
-                                                     timeEnd=datetime(2100, 12, 31, 0, 0, 0),
-                                                     minCloudCover=0, maxCloudCover=100,
-                                                     datasetid=self.dsID_dic[self.dem_sensor])
+            with DatabaseLock(allowed_slots=1, logger=self.logger):
+                scenes = SpIM.getFullSceneDataForDataset(envelope=corner_coord_to_minmax(tgt_corner_coord_lonlat),
+                                                         timeStart=datetime(1970, 1, 1, 0, 0, 0),
+                                                         timeEnd=datetime(2100, 12, 31, 0, 0, 0),
+                                                         minCloudCover=0, maxCloudCover=100,
+                                                         datasetid=self.dsID_dic[self.dem_sensor])
 
             sceneIDs_srtm = [scene.sceneid for scene in scenes]
         else:
@@ -406,12 +408,19 @@ class DEM_Creator(object):
         # get overlapping SRTM scene IDs from GMS database
         try:
             # try to use the SpatialIndexMediator
+            # noinspection PyBroadException
             try:
                 sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, timeout_sec)
             except ConnectionRefusedError:
                 # fallback to plain pgSQL
                 self.logger.warning('SpatialIndexMediator refused connection. Falling back to plain postgreSQL query.')
                 sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)
+            except Exception as err:
+                # fallback to plain pgSQL
+                self.logger.warning('Error while running SpatialIndexMediator database query. '
+                                    'Falling back to plain postgreSQL query. '
+                                    'Error message was: %s' % str(repr(err)))
+                sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)
 
         if not sceneIDs_srtm:
             # fallback to plain pgSQL
diff --git a/gms_preprocessing/misc/locks.py b/gms_preprocessing/misc/locks.py
index cbf012008d4f98c1df36683c4956be4358cb1622..f85f398a540705447ffec80c1ca215ab546ef47d 100644
--- a/gms_preprocessing/misc/locks.py
+++ b/gms_preprocessing/misc/locks.py
@@ -160,6 +160,18 @@ class ProcessLock(SharedResourceLock):
         return super(ProcessLock, self).__exit__(exc_type, exc_val, exc_tb)
 
 
+class DatabaseLock(SharedResourceLock):
+    def __init__(self, allowed_slots=1, logger=None, **kwargs):
+        self.disabled = CFG.disable_DB_locks
+
+        if not self.disabled:
+            super(DatabaseLock, self)\
+                .__init__(name='DatabaseLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        return super(DatabaseLock, self).__exit__(exc_type, exc_val, exc_tb)
+
+
 class MemoryReserver(object):
     def __init__(self, mem2lock_gb, max_usage=90, logger=None):
         """
diff --git a/gms_preprocessing/options/config.py b/gms_preprocessing/options/config.py
index 96ff536023d9a2c49ea996de1406026a5a999c09..36196fbb237059179f82ded38e8cf568229f96d9 100644
--- a/gms_preprocessing/options/config.py
+++ b/gms_preprocessing/options/config.py
@@ -243,6 +243,7 @@ class JobConfig(object):
         self.disable_exception_handler = gp('disable_exception_handler')
         self.disable_IO_locks = gp('disable_IO_locks')
         self.disable_CPU_locks = gp('disable_CPU_locks')
+        self.disable_DB_locks = gp('disable_DB_locks')
         self.disable_memory_locks = gp('disable_memory_locks')
         self.min_version_mem_usage_stats = gp('min_version_mem_usage_stats')
         self.log_level = gp('log_level')
diff --git a/gms_preprocessing/options/options_default.json b/gms_preprocessing/options/options_default.json
index e9396bd8cf5a0ecbb4925f3c4adfa905b9fd3335..3c4d4a788779fdb9141a4b44c9f96f5afa606109 100644
--- a/gms_preprocessing/options/options_default.json
+++ b/gms_preprocessing/options/options_default.json
@@ -27,6 +27,7 @@
         "disable_IO_locks": false,  /*disable limiting of parallel disk read/write processes*/
         "disable_CPU_locks": false,  /*disable system-wide limiting of CPUs to be used for GeoMultiSens
                                        (adjustable via 'CPUs_all_jobs')*/
+        "disable_DB_locks": false,  /*disable system-wide limiting of parallel database accesses*/
         "disable_memory_locks": false,  /*disable reservation of a certain amount of memory per process
                                           NOTE: disabling this might lead to RAM overload*/
         "min_version_mem_usage_stats": "0.13.16",  /*If 'disable_memory_locks' is False, GeoMultiSens reserves a
diff --git a/gms_preprocessing/options/options_schema.py b/gms_preprocessing/options/options_schema.py
index fbcd6e7c7e3085553180196377d267bd7136ab9a..80c05cd63b184a8e2a4c0ea7abc56c76dbbc1c3a 100644
--- a/gms_preprocessing/options/options_schema.py
+++ b/gms_preprocessing/options/options_schema.py
@@ -45,6 +45,7 @@ gms_schema_input = dict(
         disable_exception_handler=dict(type='boolean', required=False),
         disable_IO_locks=dict(type='boolean', required=False),
         disable_CPU_locks=dict(type='boolean', required=False),
+        disable_DB_locks=dict(type='boolean', required=False),
         disable_memory_locks=dict(type='boolean', required=False),
         min_version_mem_usage_stats=dict(type='string', required=False),
         log_level=dict(type='string', required=False, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
@@ -215,6 +216,7 @@ parameter_mapping = dict(
     disable_exception_handler=('global_opts', 'disable_exception_handler'),
     disable_IO_locks=('global_opts', 'disable_IO_locks'),
     disable_CPU_locks=('global_opts', 'disable_CPU_locks'),
+    disable_DB_locks=('global_opts', 'disable_DB_locks'),
     disable_memory_locks=('global_opts', 'disable_memory_locks'),
     min_version_mem_usage_stats=('global_opts', 'min_version_mem_usage_stats'),
    log_level=('global_opts', 'log_level'),
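
Note (not part of the patch): the pattern applied at both call sites above is sketched below for reference. It assumes a GMS job configuration has already been set up (DatabaseLock reads CFG.disable_DB_locks) and a reachable spatial index server; query_dem_scenes, its parameters, and the timeout default are illustrative names, not identifiers from the codebase.

    # Minimal sketch of the locking pattern introduced by this change.
    # Assumptions: a configured GMS job and a running spatial index server;
    # 'query_dem_scenes' is a hypothetical helper used only for illustration.
    from datetime import datetime

    from gms_preprocessing.misc.locks import DatabaseLock
    from gms_preprocessing.misc.spatial_index_mediator import SpatialIndexMediator


    def query_dem_scenes(host, port, envelope_lonlat, datasetid, timeout_sec=20, logger=None):
        """Query the spatial index while holding the database lock."""
        SpIM = SpatialIndexMediator(host=host, port=port, timeout=timeout_sec, retries=10)

        # allowed_slots=1 means only one process at a time runs a query against the index
        # server; this is the serialization the HISTORY entry refers to for issue #103.
        # The limiting can be switched off via 'disable_DB_locks' in the job options.
        with DatabaseLock(allowed_slots=1, logger=logger):
            scenes = SpIM.getFullSceneDataForDataset(envelope=envelope_lonlat,
                                                     timeStart=datetime(1970, 1, 1, 0, 0, 0),
                                                     timeEnd=datetime(2100, 12, 31, 0, 0, 0),
                                                     minCloudCover=0, maxCloudCover=100,
                                                     datasetid=datasetid)

        return [scene.sceneid for scene in scenes]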