Commit 880f9ef6 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Added config options 'spatial_index_server_host' and...

Added config options 'spatial_index_server_host' and 'spatial_index_server_port'. Caught ConnectionRefusedError during connection to the index server. Fixed a bug in SpatialIndexMediatorServer.status. Added tempdir deletion to controller shutdown.
parent f9d6f4ec
...@@ -87,7 +87,9 @@ class Scene_finder(object): ...@@ -87,7 +87,9 @@ class Scene_finder(object):
""" """
for i in range(10): for i in range(10):
try: try:
SpIM = SpatialIndexMediator(timeout=timeout) SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host,
port=CFG.spatial_index_server_port,
timeout=timeout)
self.possib_ref_scenes = \ self.possib_ref_scenes = \
SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov, SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
self.max_cloudcov, CFG.datasetid_spatial_ref, self.max_cloudcov, CFG.datasetid_spatial_ref,
......
...@@ -856,7 +856,7 @@ class GMS_JOB(object): ...@@ -856,7 +856,7 @@ class GMS_JOB(object):
count_no_match = len(list_entityids) - len(list_sceneIDs) count_no_match = len(list_entityids) - len(list_sceneIDs)
if count_no_match: if count_no_match:
warnings.warn('%s datasets could not be found the database. They cannot be processed.') warnings.warn('%s datasets could not be found the database. They cannot be processed.' % count_no_match)
return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id, return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
datasetid_spatial_ref=datasetid_spatial_ref, comment=comment) datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)
......
...@@ -12,7 +12,7 @@ except ImportError: ...@@ -12,7 +12,7 @@ except ImportError:
from logging import getLogger from logging import getLogger
from ..options.config import GMS_config as CFG from ..options.config import GMS_config as CFG
from .spatial_index_mediator import SpatialIndexMediatorServer from .spatial_index_mediator import SpatialIndexMediatorServer, Connection
from .exceptions import GMSEnvironmentError, MissingNonPipLibraryWarning from .exceptions import GMSEnvironmentError, MissingNonPipLibraryWarning
__author__ = 'Daniel Scheffler' __author__ = 'Daniel Scheffler'
...@@ -35,9 +35,13 @@ class GMSEnvironment(object): ...@@ -35,9 +35,13 @@ class GMSEnvironment(object):
if not SpatIdxSrv.is_running: if not SpatIdxSrv.is_running:
SpatIdxSrv.start() SpatIdxSrv.start()
# test connection
conn = Connection(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port, timeout=5.0)
conn.disconnect()
os.environ['GMS_SPAT_IDX_SRV_STATUS'] = 'available' os.environ['GMS_SPAT_IDX_SRV_STATUS'] = 'available'
except GMSEnvironmentError as e: except Exception as e:
self.logger.error(e, exc_info=False) self.logger.error(e, exc_info=False)
self.logger.warning('Coregistration will be skipped!') self.logger.warning('Coregistration will be skipped!')
os.environ['GMS_SPAT_IDX_SRV_STATUS'] = 'unavailable' os.environ['GMS_SPAT_IDX_SRV_STATUS'] = 'unavailable'
......
...@@ -44,10 +44,15 @@ class SpatialIndexMediatorServer: ...@@ -44,10 +44,15 @@ class SpatialIndexMediatorServer:
""" """
outputStr = self._communicate('status') outputStr = self._communicate('status')
# decrypt # decrypt status
running = 'is running' in outputStr running = 'is running' in outputStr
# get PID
_process_id = re.search('with pid ([\d]*)', outputStr) _process_id = re.search('with pid ([\d]*)', outputStr)
process_id = int(_process_id.group(1)) if _process_id else None if _process_id and _process_id.group(1):
process_id = int(_process_id.group(1))
else:
process_id = None
return {'running': running, 'process_id': process_id} return {'running': running, 'process_id': process_id}
...@@ -108,7 +113,7 @@ class SpatialIndexMediator: ...@@ -108,7 +113,7 @@ class SpatialIndexMediator:
""" message value for a full scene query message """ """ message value for a full scene query message """
# def __init__(self, host="geoms.gfz-potsdam.de", port=8654): # def __init__(self, host="geoms.gfz-potsdam.de", port=8654):
def __init__(self, host="localhost", port=8654, timeout=5.0): # FIXME could be a problem on other nodes def __init__(self, host="localhost", port=8654, timeout=5.0):
""" """
Establishes a connection to the spatial index mediator server. Establishes a connection to the spatial index mediator server.
...@@ -278,7 +283,10 @@ class Connection: ...@@ -278,7 +283,10 @@ class Connection:
def __init__(self, host, port, timeout): def __init__(self, host, port, timeout):
# connect to index mediator server # connect to index mediator server
self.socket = socket.create_connection((host, port), timeout) try:
self.socket = socket.create_connection((host, port), timeout)
except ConnectionRefusedError:
raise ConnectionRefusedError('The spatial index mediator server refused the connection!')
# send hello and confirm response # send hello and confirm response
if not self.__greet(): if not self.__greet():
......
...@@ -197,6 +197,10 @@ class JobConfig(object): ...@@ -197,6 +197,10 @@ class JobConfig(object):
gp('inmem_serialization', json_globts['inmem_serialization']) gp('inmem_serialization', json_globts['inmem_serialization'])
self.parallelization_level = \ self.parallelization_level = \
gp('parallelization_level', json_globts['parallelization_level']) gp('parallelization_level', json_globts['parallelization_level'])
self.spatial_index_server_host = \
gp('spatial_index_server_host', json_globts['spatial_index_server_host'])
self.spatial_index_server_port = \
gp('spatial_index_server_port', json_globts['spatial_index_server_port'])
self.CPUs = \ self.CPUs = \
gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count()) gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
self.delete_old_output = \ self.delete_old_output = \
......
...@@ -3,7 +3,10 @@ ...@@ -3,7 +3,10 @@
"inmem_serialization": false, /*If "true", all intermediate processing results are kept in memory. This avoids "inmem_serialization": false, /*If "true", all intermediate processing results are kept in memory. This avoids
disk I/O but requires a lot of RAM. Implemented for execution via Flink.*/ disk I/O but requires a lot of RAM. Implemented for execution via Flink.*/
"parallelization_level": "scenes", /*"scenes" or "tiles"*/ "parallelization_level": "scenes", /*"scenes" or "tiles"*/
"db_host": "localhost", "spatial_index_server_host": "localhost", /*name of the host that runs the spatial index mediator server
NOTE: The host that runs the GeoMultiSens database has to be
passed to the CLI frontend or to the set_config function directly!*/
"spatial_index_server_port": 8654, /*"port used for connecting to the spatial index mediator server"*/
"CPUs": "None", /*number of CPU cores to be used for processing (default: "None" -> use all available)*/ "CPUs": "None", /*number of CPU cores to be used for processing (default: "None" -> use all available)*/
"delete_old_output": false, /*whether to delete previously created output of the given job ID*/ "delete_old_output": false, /*whether to delete previously created output of the given job ID*/
"allow_subMultiprocessing": true, /*allow multiprocessing within multiprocessing workers*/ "allow_subMultiprocessing": true, /*allow multiprocessing within multiprocessing workers*/
......
...@@ -5,7 +5,8 @@ gms_schema_input = dict( ...@@ -5,7 +5,8 @@ gms_schema_input = dict(
schema=dict( schema=dict(
inmem_serialization=dict(type='boolean', required=False), inmem_serialization=dict(type='boolean', required=False),
parallelization_level=dict(type='string', required=False, allowed=['scenes', 'tiles']), parallelization_level=dict(type='string', required=False, allowed=['scenes', 'tiles']),
db_host=dict(type='string', required=False), spatial_index_server_host=dict(type='string', required=False),
spatial_index_server_port=dict(type='integer', required=False),
CPUs=dict(type='integer', required=False, nullable=True), CPUs=dict(type='integer', required=False, nullable=True),
delete_old_output=dict(type='boolean', required=False), delete_old_output=dict(type='boolean', required=False),
allow_subMultiprocessing=dict(type='boolean', required=False), allow_subMultiprocessing=dict(type='boolean', required=False),
......
...@@ -11,6 +11,7 @@ from itertools import chain ...@@ -11,6 +11,7 @@ from itertools import chain
import signal import signal
import re import re
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import shutil
from ..io import output_writer as OUT_W from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R from ..io import input_reader as INP_R
...@@ -504,6 +505,11 @@ class process_controller(object): ...@@ -504,6 +505,11 @@ class process_controller(object):
del self.logger del self.logger
shutdown_loggers() shutdown_loggers()
# clear any temporary files
tempdir = os.path.join(self.config.path_tempdir + 'GeoMultiSens_*')
self.logger.warning('Deleting temporary directory %s.' % tempdir)
shutil.rmtree(tempdir)
raise KeyboardInterrupt # terminate execution and show traceback raise KeyboardInterrupt # terminate execution and show traceback
def benchmark(self): def benchmark(self):
...@@ -719,7 +725,7 @@ class process_controller(object): ...@@ -719,7 +725,7 @@ class process_controller(object):
{'failed_sceneids': sceneids_failed, # update 'failed_sceneids' column {'failed_sceneids': sceneids_failed, # update 'failed_sceneids' column
'finishtime': self.config.end_time, # add job finish timestamp 'finishtime': self.config.end_time, # add job finish timestamp
'status': self.config.status}, # update 'job_status' column 'status': self.config.status}, # update 'job_status' column
{'id': self.config.ID}) {'id': self.config.ID}, timeout=30000)
def update_DB_job_statistics(self, usecase_datalist): def update_DB_job_statistics(self, usecase_datalist):
""" """
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment