Commit 3897c345 authored by Daniel Scheffler, committed by Mathias Peters
Browse files

Replaced spatial query within DEMCreator by SpatialIndexMediator query.

Moved retry functionality of spatial query within L1B to SpatialIndexMediator.
parent 2b0e10ca
......@@ -8,7 +8,6 @@ Detection of global/local geometric displacements.
import collections
import os
import socket
import time
import warnings
from datetime import datetime, timedelta
......@@ -85,23 +84,16 @@ class Scene_finder(object):
:param timeout: maximum query duration allowed (seconds)
"""
for i in range(10):
try:
SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host,
port=CFG.spatial_index_server_port,
timeout=timeout)
self.possib_ref_scenes = \
SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
self.max_cloudcov, CFG.datasetid_spatial_ref,
refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
break
except socket.timeout:
if i < 9:
continue
else:
raise TimeoutError('Spatial query timed out 10 times!')
# TODO catch error when index server is not running:
# TODO ConnectionRefusedError: [Errno 111] Connection refused
SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
timeout=timeout, retries=10)
self.possib_ref_scenes = SpIM.getFullSceneDataForDataset(envelope=self.boundsLonLat,
timeStart=self.timeStart,
timeEnd=self.timeEnd,
minCloudCover=self.min_cloudcov,
maxCloudCover=self.max_cloudcov,
datasetid=CFG.datasetid_spatial_ref,
refDate=self.src_AcqDate,
maxDaysDelta=self.plusminus_days)
if self.possib_ref_scenes:
# fill GeoDataFrame with possible ref scene parameters
......
......@@ -13,6 +13,7 @@ from tempfile import NamedTemporaryFile as tempFile
from logging import Logger
from matplotlib import pyplot as plt
from typing import Union, Dict, List # noqa F401 # flake8 issue
from datetime import datetime
import dill
import numpy as np
......@@ -34,6 +35,7 @@ from ..misc import helper_functions as HLP_F
from ..misc.logging import GMS_logger
from ..misc.database_tools import get_overlapping_scenes_from_postgreSQLdb
from ..misc.path_generator import path_generator
from ..misc.spatial_index_mediator import SpatialIndexMediator
def read_ENVIfile(path, arr_shape, arr_pos, logger=None, return_meta=False, q=0):
......@@ -481,13 +483,25 @@ class DEM_Creator(object):
:return: list of GDAL readable paths
"""
# get overlapping SRTM scene IDs from GMS database
dsID = self.dsID_dic[self.dem_sensor]
sceneIDs_srtm = get_overlapping_scenes_from_postgreSQLdb(self.db_conn,
table='scenes',
tgt_corners_lonlat=tgt_corner_coord_lonlat,
conditions=['datasetid=%s' % dsID],
timeout=20000) # default timeout (15sec) is not enough
sceneIDs_srtm = [i[0] for i in sceneIDs_srtm]
try:
SpIM = SpatialIndexMediator(host=CFG.spatial_index_server_host, port=CFG.spatial_index_server_port,
timeout=20, retries=10)
scenes = SpIM.getFullSceneDataForDataset(envelope=corner_coord_to_minmax(tgt_corner_coord_lonlat),
timeStart=datetime(1970, 1, 1),
timeEnd=datetime(2100, 12, 31),
minCloudCover=0, maxCloudCover=100,
datasetid=self.dsID_dic[self.dem_sensor])
sceneIDs_srtm = [scene.sceneid for scene in scenes]
except ConnectionRefusedError:
dsID = self.dsID_dic[self.dem_sensor]
sceneIDs_srtm = get_overlapping_scenes_from_postgreSQLdb(self.db_conn,
table='scenes',
tgt_corners_lonlat=tgt_corner_coord_lonlat,
conditions=['datasetid=%s' % dsID],
timeout=20000) # default (15sec) is not enough
sceneIDs_srtm = [i[0] for i in sceneIDs_srtm]
if not sceneIDs_srtm:
raise RuntimeError('No matching %s scene IDs for DEM generation found.' % self.dem_sensor)
......
......@@ -9,6 +9,7 @@ from datetime import datetime, timedelta
from shapely.geometry import Polygon
import pytz
from logging import Logger
from typing import List # noqa F401 # flake8 issue
from ..misc.exceptions import GMSEnvironmentError
......@@ -113,17 +114,19 @@ class SpatialIndexMediator:
""" message value for a full scene query message """
def __init__(self, host="localhost", port=8654, timeout=5.0, retries=10):
    """
    Establishes a connection to the spatial index mediator server.

    :param host:    host address of the index mediator server (default "localhost")
    :param port:    port number of the index mediator server (default 8654)
    :param timeout: timeout as float in seconds (default 5.0 sec)
    :param retries: number of retries in case of timeout
    """
    self.host = host
    self.port = port
    self.timeout = timeout
    self.retries = retries
@staticmethod
def __deriveSeasonCode(refDate, maxDaysDelta):
......@@ -149,6 +152,7 @@ class SpatialIndexMediator:
def getFullSceneDataForDataset(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                               dayNight=0, refDate=None, maxDaysDelta=None):
    # type: (list, datetime, datetime, float, float, int, int, datetime, int) -> List[Scene]
    """
    Query the spatial index with the given parameters in order to get a list of matching scenes intersecting the
    given envelope.

    The whole query (request packing, network round trip and response parsing) is retried up to
    'self.retries' times in case of a socket timeout.

    :param envelope:      four lon/lat values describing the search envelope
    :param timeStart:     earliest allowed scene acquisition time
    :param timeEnd:       latest allowed scene acquisition time
    :param minCloudCover: minimum allowed cloud cover
    :param maxCloudCover: maximum allowed cloud cover
    :param datasetid:     ID of the dataset to be searched
    :param dayNight:      day/night flag (0 means no constraint)
    :param refDate:       reference acquisition date [optional]
    :param maxDaysDelta:  maximum allowed number of days the target scenes might be apart from the given refDate
                          [optional]
    :return: list of Scene objects matching the query
    :raises TimeoutError:     if the spatial query timed out 'self.retries' times in a row
    :raises EnvironmentError: if the index server violates the expected wire protocol
    """
    # filtering by refDate/maxDaysDelta is only possible when both are given
    filterTimerange = not (refDate is None or maxDaysDelta is None)
    scenes = []  # predefine so 'return scenes' is always bound

    for i in range(self.retries):
        try:
            # -- pack the request --------------------------------------------------------------
            # numbytes = 1 + 4*8 + 8 + 8 + 4 + 1 + 1 + 2 + 2 + 1
            b = bytearray(60)
            offset = 0
            # msg header and envelope
            struct.pack_into('> b 4d', b, offset, self.FULL_SCENE_QUERY_MSG, *envelope)
            offset += 33
            # start and end date (year as short, remaining fields as single bytes)
            struct.pack_into('> h 6b', b, offset, timeStart.year, timeStart.month, timeStart.day,
                             timeStart.hour, timeStart.minute, timeStart.second, 0)
            offset += 8
            struct.pack_into('> h 6b', b, offset, timeEnd.year, timeEnd.month, timeEnd.day, timeEnd.hour,
                             timeEnd.minute, timeEnd.second, 0)
            offset += 8
            # derive season code
            seasonCode = self.__deriveSeasonCode(refDate, maxDaysDelta)
            # pack the rest
            # TODO: send unconstrained min/max proclevel values
            struct.pack_into('> i 2b h 3b', b, offset, seasonCode, minCloudCover, maxCloudCover, datasetid,
                             0, 127, dayNight)

            # -- send the query and receive the raw response -----------------------------------
            # get connection and lock the channel
            con = Connection(self.host, self.port, self.timeout)
            con.socket.sendall(b)
            # first response byte indicates the response type - must match the full scene query msg
            if con.recvByte() != self.FULL_SCENE_QUERY_MSG:
                raise EnvironmentError('Bad Protocol')
            # now read the number of bytes that follow
            numBytes = con.recvInt()
            b = bytearray(numBytes)
            offset = 0
            # read all data from the channel and unlock it
            con.recvBuffer(b, numBytes)
            # we received the entire message - return the connection to the global pool
            con.disconnect()

            # -- interpret received data -------------------------------------------------------
            # extract datasetid and number of scenes
            dataset = struct.unpack_from('> h', b, offset)[0]
            offset += 2
            if dataset != datasetid:
                raise EnvironmentError('Bad Protocol')
            numScenes = struct.unpack_from('> i', b, offset)[0]
            offset += 4

            scenes = []
            for _x in range(numScenes):
                # per-scene record layout:
                # [0] id (4 byte)
                # [1] year (2 byte)
                # [2] month (1 byte)
                # [3] day (1 byte)
                # [4] hour (1 byte)
                # [5] minute (1 byte)
                # [6] second (1 byte)
                # [7] empty (1 byte)
                # [8] cloud cover (1 byte)
                # [9] proc_level (1 byte) caution: this gets not yet updated in the index
                # [10] day/night (1 byte)
                # [11] length of bounds array (1 byte)
                scenedata = struct.unpack_from('> i h 10b', b, offset)
                offset += 16
                timestamp = datetime(scenedata[1], scenedata[2], scenedata[3], scenedata[4], scenedata[5],
                                     scenedata[6])
                # read bounds
                numBounds = scenedata[11]
                fmt = "> {0}d".format(numBounds)
                bounds = struct.unpack_from(fmt, b, offset)
                offset += numBounds * 8

                # check ref date
                if filterTimerange:
                    if timestamp.month == 2 and timestamp.day == 29:
                        # deal with feb. 29th: map to march 1st of the reference year
                        timestampInRefYear = timestamp.replace(refDate.year, 3, 1).replace(tzinfo=pytz.UTC)
                    else:
                        timestampInRefYear = timestamp.replace(refDate.year).replace(tzinfo=pytz.UTC)
                    if abs(refDate - timestampInRefYear).days > maxDaysDelta:
                        # skip scene
                        continue

                # create scene
                scenes.append(Scene(scenedata[0], timestamp, scenedata[8], scenedata[9], scenedata[10], bounds))

            break  # query succeeded - leave the retry loop
        except socket.timeout:
            if i < self.retries - 1:
                continue
            # NOTE: the message previously hard-coded '10 times' although the retry count is configurable
            raise TimeoutError('Spatial query timed out %d times!' % self.retries)

    return scenes
......
......@@ -51,7 +51,7 @@ from gms_preprocessing.algorithms.L1B_P import L1B_object
from gms_preprocessing.algorithms.L1C_P import L1C_object
from gms_preprocessing.algorithms.L2A_P import L2A_object
from gms_preprocessing.algorithms.L2B_P import L2B_object
from gms_preprocessing.algorithms.L2C_P import L2C_object
# from gms_preprocessing.algorithms.L2C_P import L2C_object
from gms_preprocessing.misc.database_tools import get_info_from_postgreSQLdb
from gms_preprocessing.model.gms_object import GMS_object_2_dataset_dict
......@@ -453,7 +453,8 @@ class Test_in_normal_mode(unittest.TestCase):
def setUp(self):
    """Create the process_controller instance under test for a fixed job ID."""
    # previously used job IDs, kept for quick switching:
    # self.job_id = 26186740  # Testjob Landsat-8
    # self.job_id = 26186906  # Bug Input Validator
    # self.job_id = 26186925  # 1 Sentinel-2A, Bug NoneType' object has no attribute 'find'
    self.job_id = 26187051  # 1 Landsat, FileNotFoundError
    self.PC = process_controller(self.job_id, **dict(is_test=False, parallelization_level='scenes', db_host=db_host,
                                                     delete_old_output=True, disable_exception_handler=True))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment