Commit 3419bed5 authored by Daniel Scheffler's avatar Daniel Scheffler Committed by Mathias Peters
Browse files

Revised DEM_Creator to fix TimeoutErrors during spatial query.

parent 1e9252a9
...@@ -2,6 +2,13 @@ before_script: ...@@ -2,6 +2,13 @@ before_script:
- git lfs pull - git lfs pull
# Advise GitLab that these environment vars should be loaded from the Variables config.
variables:
GMS_DB_HOST: SECURE
GMS_INDEX_HOST: SECURE
GMS_INDEX_PORT: SECURE
stages: stages:
- test - test
- deploy - deploy
...@@ -13,7 +20,9 @@ test_gms_preprocessing: ...@@ -13,7 +20,9 @@ test_gms_preprocessing:
- source /root/miniconda3/bin/activate - source /root/miniconda3/bin/activate
- export GDAL_DATA=/root/miniconda3/share/gdal - export GDAL_DATA=/root/miniconda3/share/gdal
- export PYTHONPATH=$PYTHONPATH:/root # /root <- directory needed later - export PYTHONPATH=$PYTHONPATH:/root # /root <- directory needed later
- export GMS_db_host=geoms - export GMS_db_host=${GMS_DB_HOST}
- export GMS_index_host=${GMS_INDEX_HOST}
- export GMS_index_port=${GMS_INDEX_PORT}
# update sicor # update sicor
# - conda install -y -q -c conda-forge basemap # - conda install -y -q -c conda-forge basemap
# - rm -rf context/sicor # - rm -rf context/sicor
......
...@@ -12,7 +12,7 @@ import zipfile ...@@ -12,7 +12,7 @@ import zipfile
from tempfile import NamedTemporaryFile as tempFile from tempfile import NamedTemporaryFile as tempFile
from logging import Logger from logging import Logger
from matplotlib import pyplot as plt from matplotlib import pyplot as plt
from typing import Union, Dict, List # noqa F401 # flake8 issue from typing import Union, Dict, List, Tuple # noqa F401 # flake8 issue
from datetime import datetime from datetime import datetime
import dill import dill
...@@ -469,23 +469,25 @@ class DEM_Creator(object): ...@@ -469,23 +469,25 @@ class DEM_Creator(object):
# transform to Longitude/Latitude coordinates # transform to Longitude/Latitude coordinates
tgt_corner_coord_lonlat = [transform_any_prj(prj, 4326, X, Y) for X, Y in cornerCoords_tgt] tgt_corner_coord_lonlat = [transform_any_prj(prj, 4326, X, Y) for X, Y in cornerCoords_tgt]
# handle coordinates crossing the 180 degress meridian # handle coordinates crossing the 180 degrees meridian
xvals = [x for x, y in tgt_corner_coord_lonlat] xvals = [x for x, y in tgt_corner_coord_lonlat]
if max(xvals) - min(xvals) > 180: if max(xvals) - min(xvals) > 180:
tgt_corner_coord_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corner_coord_lonlat] tgt_corner_coord_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corner_coord_lonlat]
return tgt_corner_coord_lonlat return tgt_corner_coord_lonlat
def _get_DEMPathes_to_include(self, tgt_corner_coord_lonlat): def get_overlapping_DEM_tiles(self, tgt_corner_coord_lonlat, timeout_sec=30, use_index_mediator=True):
"""Returns the pathes of DEM files to merge in order to generate a DEM covering the given area of interest. # type: (List[Tuple], int, bool) -> list
"""Get the overlapping DEM tiles for the given extent.
:param tgt_corner_coord_lonlat: list of longitude/latitude target coordinates [[X,Y], [X,Y], ...]] :param tgt_corner_coord_lonlat: list of longitude/latitude target coordinates [[X,Y], [X,Y], ...]]
:return: list of GDAL readable pathes :param timeout_sec: database query timeout (seconds)
:param use_index_mediator: whether to use or not to use the SpatialIndexMediator (default: True)
:return: list of matching DEM tile scene IDs
""" """
# get overlapping SRTM scene IDs from GMS database if use_index_mediator:
try:
SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port, SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
timeout=30, retries=10) timeout=timeout_sec, retries=10)
scenes = SpIM.getFullSceneDataForDataset(envelope=corner_coord_to_minmax(tgt_corner_coord_lonlat), scenes = SpIM.getFullSceneDataForDataset(envelope=corner_coord_to_minmax(tgt_corner_coord_lonlat),
timeStart=datetime(1970, 1, 1, 0, 0, 0), timeStart=datetime(1970, 1, 1, 0, 0, 0),
timeEnd=datetime(2100, 12, 31, 0, 0, 0), timeEnd=datetime(2100, 12, 31, 0, 0, 0),
...@@ -493,19 +495,41 @@ class DEM_Creator(object): ...@@ -493,19 +495,41 @@ class DEM_Creator(object):
datasetid=self.dsID_dic[self.dem_sensor]) datasetid=self.dsID_dic[self.dem_sensor])
sceneIDs_srtm = [scene.sceneid for scene in scenes] sceneIDs_srtm = [scene.sceneid for scene in scenes]
# except ConnectionRefusedError: else:
sceneIDs_srtm = get_overlapping_scenes_from_postgreSQLdb(self.db_conn,
table='scenes',
tgt_corners_lonlat=tgt_corner_coord_lonlat,
conditions=['datasetid=%s'
% self.dsID_dic[self.dem_sensor]],
timeout=timeout_sec*1000) # milliseconds
sceneIDs_srtm = [i[0] for i in sceneIDs_srtm]
return sceneIDs_srtm
def _get_DEMPathes_to_include(self, tgt_corner_coord_lonlat, timeout_sec=30):
# type: (List[Tuple], int) -> list
"""Return the paths of DEM files to merge in order to generate a DEM covering the given area of interest.
:param tgt_corner_coord_lonlat: list of longitude/latitude target coordinates [(X,Y), (X,Y), ...]]
:param timeout_sec: database query timeout (seconds)
:return: list of GDAL readable paths
"""
# get overlapping SRTM scene IDs from GMS database
try:
# try to use the SpatialIndexMediator
try:
sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, timeout_sec)
except ConnectionRefusedError:
# fallback to plain pgSQL
sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)
if not sceneIDs_srtm: if not sceneIDs_srtm:
dsID = self.dsID_dic[self.dem_sensor] # fallback to plain pgSQL
sceneIDs_srtm = get_overlapping_scenes_from_postgreSQLdb(self.db_conn, sceneIDs_srtm = self.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)
table='scenes',
tgt_corners_lonlat=tgt_corner_coord_lonlat,
conditions=['datasetid=%s' % dsID],
timeout=30000) # default (15sec) is not enough
sceneIDs_srtm = [i[0] for i in sceneIDs_srtm]
except TimeoutError: except TimeoutError:
raise TimeoutError('Spatial database query for %s DEM generation timed out after 30 seconds.' raise TimeoutError('Spatial database query for %s DEM generation timed out after %s seconds.'
% self.dem_sensor) % (self.dem_sensor, timeout_sec))
if not sceneIDs_srtm: if not sceneIDs_srtm:
raise RuntimeError('No matching %s scene IDs for DEM generation found.' % self.dem_sensor) raise RuntimeError('No matching %s scene IDs for DEM generation found.' % self.dem_sensor)
......
...@@ -52,6 +52,7 @@ path_options_default = os.path.join(path_gmslib, 'options', 'options_default.jso ...@@ -52,6 +52,7 @@ path_options_default = os.path.join(path_gmslib, 'options', 'options_default.jso
def set_config(job_ID, json_config='', inmem_serialization=False, parallelization_level='scenes', db_host='localhost', def set_config(job_ID, json_config='', inmem_serialization=False, parallelization_level='scenes', db_host='localhost',
spatial_index_server_host='localhost', spatial_index_server_port=8654,
reset_status=False, delete_old_output=False, exec_L1AP=None, reset_status=False, delete_old_output=False, exec_L1AP=None,
exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None, exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO', allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
...@@ -65,8 +66,10 @@ def set_config(job_ID, json_config='', inmem_serialization=False, parallelizatio ...@@ -65,8 +66,10 @@ def set_config(job_ID, json_config='', inmem_serialization=False, parallelizatio
:param inmem_serialization: False: write intermediate results to disk in order to save memory :param inmem_serialization: False: write intermediate results to disk in order to save memory
True: keep intermediate results in memory in order to save IO time True: keep intermediate results in memory in order to save IO time
:param parallelization_level: <str> choices: 'scenes' - parallelization on scene-level :param parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
'tiles' - parallelisation on tile-level 'tiles' - parallelization on tile-level
:param db_host: host name of the server that runs the postgreSQL database :param db_host: host name of the server that runs the postgreSQL database
:param spatial_index_server_host: host name of the server that runs the SpatialIndexMediator
:param spatial_index_server_port: port used for connecting to SpatialIndexMediator
:param reset_status: whether to reset the job status or not (default=False) :param reset_status: whether to reset the job status or not (default=False)
:param delete_old_output: <bool> whether to delete previously created output of the given job ID :param delete_old_output: <bool> whether to delete previously created output of the given job ID
before running the job (default = False) before running the job (default = False)
......
...@@ -6,3 +6,5 @@ import os ...@@ -6,3 +6,5 @@ import os
# set database host during tests # set database host during tests
db_host = 'localhost' if 'GMS_db_host' not in os.environ else os.environ['GMS_db_host'] db_host = 'localhost' if 'GMS_db_host' not in os.environ else os.environ['GMS_db_host']
index_host = 'localhost' if 'GMS_index_host' not in os.environ else os.environ['GMS_index_host']
index_port = 8654 if 'GMS_index_port' not in os.environ else os.environ['GMS_index_port']
...@@ -55,7 +55,7 @@ from gms_preprocessing.algorithms.L2B_P import L2B_object ...@@ -55,7 +55,7 @@ from gms_preprocessing.algorithms.L2B_P import L2B_object
from gms_preprocessing.misc.database_tools import get_info_from_postgreSQLdb from gms_preprocessing.misc.database_tools import get_info_from_postgreSQLdb
from gms_preprocessing.model.gms_object import GMS_object_2_dataset_dict from gms_preprocessing.model.gms_object import GMS_object_2_dataset_dict
from . import db_host from . import db_host, index_host
__author__ = 'Daniel Scheffler' # edited by Jessica Palka. __author__ = 'Daniel Scheffler' # edited by Jessica Palka.
...@@ -64,7 +64,8 @@ gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..') ...@@ -64,7 +64,8 @@ gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')
# Defining the configurations needed to start a job containing the different dataset scenes. # Defining the configurations needed to start a job containing the different dataset scenes.
# TODO Change the job-configurations for selected datasets. # TODO Change the job-configurations for selected datasets.
job_config_kwargs = dict(parallelization_level='scenes', db_host=db_host, delete_old_output=True, is_test=True) job_config_kwargs = dict(parallelization_level='scenes', db_host=db_host, spatial_index_server_host=index_host,
delete_old_output=True, is_test=True)
########################## ##########################
# Test case: BaseTestCases # Test case: BaseTestCases
...@@ -443,7 +444,7 @@ testdata.append('MultipleDatasetsInOneJob') ...@@ -443,7 +444,7 @@ testdata.append('MultipleDatasetsInOneJob')
################################################################################### ###################################################################################
# Parametrizing the test cases and creating a summary of the testresults. # Parameterizing the test cases and creating a summary of the test results.
summary_testResults, summary_errors, summary_failures, summary_skipped, jobstatus = [[] for _ in range(5)] summary_testResults, summary_errors, summary_failures, summary_skipped, jobstatus = [[] for _ in range(5)]
......
...@@ -10,13 +10,15 @@ Tests for gms_preprocessing.io.Input_Reader. ...@@ -10,13 +10,15 @@ Tests for gms_preprocessing.io.Input_Reader.
import unittest import unittest
import os import os
import warnings
from gms_preprocessing import __file__ from gms_preprocessing import __file__
from gms_preprocessing.io.input_reader import DEM_Creator from gms_preprocessing.io.input_reader import DEM_Creator
from gms_preprocessing import set_config from gms_preprocessing import set_config
from gms_preprocessing.misc.spatial_index_mediator import SpatialIndexMediator
from py_tools_ds.geo.projection import EPSG2WKT, WKT2EPSG from py_tools_ds.geo.projection import EPSG2WKT, WKT2EPSG
from .import db_host from .import db_host, index_host
gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
...@@ -24,7 +26,11 @@ gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..') ...@@ -24,7 +26,11 @@ gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')
class Test_DEM_Creator(unittest.TestCase): class Test_DEM_Creator(unittest.TestCase):
def setUp(self): def setUp(self):
# Testjob Landsat-8 # Testjob Landsat-8
self.config = set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True, self.config = set_config(job_ID=26186196, db_host=db_host, spatial_index_server_host=index_host,
reset_status=True, is_test=True,
path_archive=os.path.join(gmsRepo_rootpath, 'tests', 'data', 'archive_data'))
self.config = set_config(job_ID=26186196, db_host=db_host, spatial_index_server_host=index_host,
reset_status=True, is_test=True,
path_archive=os.path.join(gmsRepo_rootpath, 'tests', 'data', 'archive_data')) path_archive=os.path.join(gmsRepo_rootpath, 'tests', 'data', 'archive_data'))
self.boxMapXY = ((277365.0, 5546625.0), (292365.0, 5546625.0), (292365.0, 5531625.0), (277365.0, 5531625.0)) self.boxMapXY = ((277365.0, 5546625.0), (292365.0, 5546625.0), (292365.0, 5531625.0), (277365.0, 5531625.0))
...@@ -45,3 +51,21 @@ class Test_DEM_Creator(unittest.TestCase): ...@@ -45,3 +51,21 @@ class Test_DEM_Creator(unittest.TestCase):
DC = DEM_Creator(dem_sensor='ASTER') DC = DEM_Creator(dem_sensor='ASTER')
DEM = DC.from_extent(self.boxMapXY, self.prj, 30, 30) DEM = DC.from_extent(self.boxMapXY, self.prj, 30, 30)
self.validate_output(DEM) self.validate_output(DEM)
def test_index_mediator_query_equals_pgSQL_query(self):
    """Verify that the SpatialIndexMediator and the plain postgreSQL fallback return the same DEM tiles.

    The test is skipped (with a warning) if the SpatialIndexMediator server refuses the
    connection, since the comparison is then impossible.
    """
    try:
        # connection test that may raise ConnectionRefusedError
        SpatialIndexMediator(host=index_host)

        # actual test: both query paths must yield the same set of overlapping DEM tile scene IDs
        DC = DEM_Creator(dem_sensor='SRTM')
        tgt_corner_coord_lonlat = DC._get_corner_coords_lonlat(self.boxMapXY, self.prj)

        res_SpIM = DC.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat)
        res_pgSQL = DC.get_overlapping_DEM_tiles(tgt_corner_coord_lonlat, use_index_mediator=False)

        # compare order-independently; sorted() already returns a list
        self.assertEqual(sorted(res_SpIM), sorted(res_pgSQL))

    except ConnectionRefusedError:
        warnings.warn("test_index_mediator_query_equals_pgSQL_query() could not be run because "
                      "SpatialIndexMediator refused the connection.")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment