# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

import socket
import struct
29 30 31
import os
import re
import warnings
32
from datetime import datetime, timedelta
33
from shapely.geometry import Polygon
34
import pytz
35
from logging import getLogger
36
from typing import List  # noqa F401  # flake8 issue
37 38

from ..misc.exceptions import GMSEnvironmentError
39
from ..misc.logging import close_logger
40

41

42
class SpatialIndexMediatorServer:
    """Controller for the index mediator server start script found in a given root directory.

    Every control command ('status', 'start', 'stop', 'restart') is forwarded to the script as
    ``bash <script> <command>`` with the working directory temporarily set to the root directory.
    """

    def __init__(self, rootDir, logger=None):
        """Locate the server script below rootDir.

        :param rootDir: directory that contains 'index-mediator-server.sh' or 'index-mediator-server'
        :param logger:  logger to be used (default: a logger named 'SpatialIndexMediatorServer')
        :raises GMSEnvironmentError: if neither of the two script files exists
        """
        self.rootDir = rootDir
        self.path_idxMedSrv = os.path.join(rootDir, 'index-mediator-server.sh')
        self.logger = logger or getLogger('SpatialIndexMediatorServer')

        # validate
        if not os.path.isfile(self.path_idxMedSrv):
            # fall back to the script name without the '.sh' extension
            self.path_idxMedSrv = os.path.join(rootDir, 'index-mediator-server')

            if not os.path.isfile(self.path_idxMedSrv):
                raise GMSEnvironmentError('File path of index mediator server does not exist at %s.'
                                          % self.path_idxMedSrv)

    def __getstate__(self):
        """Defines how the attributes of SpatialIndexMediatorServer are pickled.

        NOTE: The logger of this (live) instance is closed and set to None as a side effect.
        """

        if self.logger not in [None, 'not set']:
            close_logger(self.logger)
            self.logger = None
        return self.__dict__

    def __del__(self):
        # 'logger' does not exist if __init__ failed before assigning it (e.g., invalid rootDir);
        # the guard mirrors the one used in __getstate__
        logger = getattr(self, 'logger', None)
        if logger not in [None, 'not set']:
            close_logger(logger)
        self.logger = None

    @property
    def is_running(self):
        """Return True if the status output of the server script reports a running server."""
        return self.status['running']

    @property
    def process_id(self):
        """Return the process ID parsed from the status output or None if there is none."""
        return self.status['process_id']

    @property
    def status(self):
        """Check server status (runs the 'status' command of the server script on each access).

        :return: dictionary with the keys
                 'running' (bool):         True if the status output contains 'is running'
                 'process_id' (int, None): number following 'with pid ' in the status output, else None
        """
        outputStr = self._communicate('status')

        # decrypt status
        running = 'is running' in outputStr

        # get PID
        _process_id = re.search(r'with pid ([\d]*)', outputStr)
        if _process_id and _process_id.group(1):
            process_id = int(_process_id.group(1))
        else:
            process_id = None

        return {'running': running, 'process_id': process_id}

    def start(self):  # FIXME can be executed twice without a message that server is already running
        """Start the server.

        :return: 'started' on success, otherwise None (a warning is logged in that case)
        """
        outputStr = self._communicate('start')
        if outputStr == 'success' and self.is_running:
            self.logger.info('Spatial Index Mediator Server started successfully.')
            return 'started'
        else:
            if outputStr != 'success':
                self.logger.warning("\nAttempt to start Spatial Index Mediator Server failed with message '%s'!"
                                    % outputStr.replace('\n', ''))
            else:
                self.logger.warning("\nCommunication to Spatial Index Mediator Server was successful but "
                                    "the server is still not running. Returned message was: %s"
                                    % outputStr.replace('\n', ''))

    def stop(self):
        """Stop the server.

        :return: 'stopped' on success, otherwise None (a warning is issued in that case)
        """
        outputStr = self._communicate('stop')

        if outputStr == 'success' or re.search(r'index-mediator-server stopped', outputStr, re.I):
            return 'stopped'
        else:
            warnings.warn("\nStopping Spatial Index Mediator Server failed with message '%s'!"
                          % outputStr.replace('\n', ''))

    def restart(self):
        """Restart the server.

        :return: 'restarted' on success, otherwise None (a warning is issued in that case)
        """
        outputStr = self._communicate('restart')
        if outputStr == 'success' and self.is_running:
            return 'restarted'
        else:
            warnings.warn("\nRestarting Spatial Index Mediator Server failed with message '%s'!"
                          % outputStr.replace('\n', ''))

    def _communicate(self, controller_cmd):
        """Run the server script with the given command and return its decoded output.

        :param controller_cmd: command passed to the server script, e.g. 'status', 'start', 'stop', 'restart'
        :return: output of the script decoded as UTF-8 or 'success' if the script produced no output
        :raises Exception: if the script call returns a non-zero exit code
        """
        curdir = os.path.abspath(os.curdir)
        os.chdir(self.rootDir)
        try:
            # FIXME workaround: otherwise subcall_with_output hangs at proc.communicate (waiting for EOF forever)
            no_stdout = no_stderr = controller_cmd in ['start', 'restart']
            # no_stdout = no_stderr = None, None
            from ..misc.helper_functions import subcall_with_output
            output, exitcode, err = subcall_with_output('bash %s %s' % (self.path_idxMedSrv,
                                                                        controller_cmd), no_stdout, no_stderr)
        finally:
            # always restore the previous working directory, also if the call above raises
            os.chdir(curdir)

        if exitcode:
            raise Exception(err)
        else:
            if output:
                return output.decode('UTF-8')
            else:
                # FIXME actually there should be always an output (also with controller commands 'start' and 'restart'
                return 'success'


149 150 151 152
class SpatialIndexMediator:
    """Client for full scene queries against the spatial index mediator server."""

    # message value for a full scene query message
    FULL_SCENE_QUERY_MSG = 3

    def __init__(self, host="localhost", port=8654, timeout=5.0, retries=10):
        """
        Stores the settings used to reach the spatial index mediator server.

        No connection is opened here; each query attempt opens its own connection.

        :param host:    host address of the index mediator server (default "localhost")
        :param port:    port number of the index mediator server (default 8654)
        :param timeout: timeout as float in seconds (default 5.0 sec)
        :param retries: number of query attempts before a timeout / unpacking error is raised
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.retries = retries

    @staticmethod
    def __deriveSeasonCode(refDate, maxDaysDelta):
        """Return a month bit mask covering the months from refDate - maxDaysDelta to refDate + maxDaysDelta.

        Bit 0 represents January, bit 11 represents December. The range wraps around the end of the year.

        :param refDate:      reference timestamp as datetime instance or None
        :param maxDaysDelta: number of days around refDate or None
        :return:             the bit mask as integer, 0 if refDate or maxDaysDelta is None
        """
        if refDate is None or maxDaysDelta is None:
            return 0

        delta = timedelta(days=maxDaysDelta)

        startMonth = (refDate - delta).month - 1
        endMonth = (refDate + delta).month - 1

        seasonCode = 0

        # walk forward from the start month (modulo 12) until the end month has been added
        for x in range(12):
            month = (startMonth + x) % 12

            seasonCode |= 1 << month

            if month == endMonth:
                break

        return seasonCode

    @staticmethod
    def _seasonalDayDistance(timestamp, refDate, tzinfo):
        """Return the number of whole days between refDate and the calendar day of timestamp, ignoring the year.

        The timestamp is moved into the year of refDate and into both neighbouring years; the smallest distance
        is returned. Without the neighbouring years, dates on opposite sides of the turn of the year
        (e.g., Dec. 30th and Jan. 5th) would be regarded as almost one year apart.

        :param timestamp: naive datetime instance of the scene acquisition
        :param refDate:   timezone-aware reference datetime instance
        :param tzinfo:    tzinfo instance to be attached to the shifted timestamp
        :return:          smallest distance in whole days (int)
        """
        if timestamp.month == 2 and timestamp.day == 29:
            # deal with feb.29th (does not exist in every year)
            month, day = 3, 1
        else:
            month, day = timestamp.month, timestamp.day

        distances = []
        for year in (refDate.year - 1, refDate.year, refDate.year + 1):
            if datetime.min.year <= year <= datetime.max.year:
                shifted = timestamp.replace(year=year, month=month, day=day, tzinfo=tzinfo)
                distances.append(abs(refDate - shifted).days)

        return min(distances)

    def _packQuery(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid, dayNight,
                   seasonCode):
        """Return the 60 byte full scene query message (big-endian) as bytearray."""
        # numbytes = 1 + 4*8 + 8 + 8 + 4 + 1 + 1 + 2 + 2 + 1
        msg = bytearray(60)

        # pack msg header and envelope
        offset = 0
        struct.pack_into('> b 4d', msg, offset, self.FULL_SCENE_QUERY_MSG, *envelope)
        offset += 33

        # pack the dates (the trailing 0 fills the unused 8th byte of each date)
        for stamp in (timeStart, timeEnd):
            struct.pack_into('> h 6b', msg, offset, stamp.year, stamp.month, stamp.day, stamp.hour,
                             stamp.minute, stamp.second, 0)
            offset += 8

        # pack the rest
        #  TODO: send unconstraint min/max proclevel values
        # NOTE(review): 0 and 127 presumably are the min/max processing level bounds - confirm against the server
        struct.pack_into('> i 2b h 3b', msg, offset, seasonCode, minCloudCover, maxCloudCover, datasetid, 0, 127,
                         dayNight)

        return msg

    def _requestPayload(self, msg):
        """Send the query message through a new connection and return the response payload as bytearray.

        :raises EnvironmentError: if the response does not start with the full scene query message value
        """
        con = Connection(self.host, self.port, self.timeout)

        try:
            # send the buffer
            con.socket.sendall(msg)

            # read first byte, indicating the response type, must match full scene query msg
            if con.recvByte() != self.FULL_SCENE_QUERY_MSG:
                raise EnvironmentError('Bad Protocol')

            # now read the number of bytes that follow
            numBytes = con.recvInt()
            payload = bytearray(numBytes)

            # read all data from the channel
            con.recvBuffer(payload, numBytes)

            # we received the entire message
            con.disconnect()
        finally:
            # make sure the socket does not stay open if anything above failed
            # (closing an already closed socket has no effect)
            con.socket.close()

        return payload

    def _parseResponse(self, payload, datasetid, refDate, maxDaysDelta):
        """Convert the response payload into a list of Scene instances.

        :raises EnvironmentError: if the dataset ID within the payload differs from datasetid
        :raises struct.error:     if the payload is shorter than announced by its content
        """
        filterTimerange = not (refDate is None or maxDaysDelta is None)
        offset = 0

        # extract datasetid and number of scenes
        dataset = struct.unpack_from('> h', payload, offset)[0]
        offset += 2
        if dataset != datasetid:
            raise EnvironmentError('Bad Protocol')

        numScenes = struct.unpack_from('> i', payload, offset)[0]
        offset += 4

        scenes = []

        for _x in range(numScenes):
            # [0] id (4 byte)
            # [1] year (2 byte)
            # [2] month (1 byte)
            # [3] day (1 byte)
            # [4] hour (1 byte)
            # [5] minute (1 byte)
            # [6] second (1 byte)
            # [7] empty (1 byte)
            # [8] cloud cover (1 byte)
            # [9] proc_level (1 byte) caution: this gets not yet updated in the index
            # [10] day/night (1 byte)
            # [11] length of bounds array (1 byte)
            scenedata = struct.unpack_from('> i h 10b', payload, offset)
            offset += 16

            timestamp = datetime(scenedata[1], scenedata[2], scenedata[3], scenedata[4], scenedata[5],
                                 scenedata[6])

            # read bounds (must be consumed also for skipped scenes to keep the offset in sync)
            numBounds = scenedata[11]
            bounds = struct.unpack_from("> {0}d".format(numBounds), payload, offset)
            offset += numBounds * 8

            # check ref date
            if filterTimerange and self._seasonalDayDistance(timestamp, refDate, pytz.UTC) > maxDaysDelta:
                # skip scene
                continue

            # create scene
            scenes.append(Scene(scenedata[0], timestamp, scenedata[8], scenedata[9], scenedata[10], bounds))

        return scenes

    def getFullSceneDataForDataset(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                                   dayNight=0, refDate=None, maxDaysDelta=None):
        # type: (list, datetime, datetime, float, float, int, int, datetime, int) -> List[Scene]
        """
        Query the spatial index with the given parameters in order to get a list of matching scenes intersecting the
        given envelope

        :param envelope:        list of left, right and low, up coordinates (in lat/lon wgs84) of the region of
                                interest in the form of (min_lon, max_lon, min_lat, max_lat),
                                e.g. envelope = (10.0, 16.0, 50.0, 60.0)
        :param timeStart:       start timestamp of the relevant timerange as datetime instance,
                                e.g., datetime(2015, 1, 1)
        :param timeEnd:         end timestamp of the relevant timerange as datetime instance, e.g. datetime(2016, 6, 15)
        :param minCloudCover:   minimum cloudcover in percent, e.g. 12, will return scenes with cloudcover >= 12% only
        :param maxCloudCover:   maximum cloudcover in percent, e.g. 23, will return scenes with cloudcover <= 23% only
        :param datasetid:       datasetid of the dataset in question, e.g. 104 for Landsat-8
        :param dayNight:        day/night indicator, with (0 = both, 1 = day, 2 = night)
        :param refDate:         reference timestamp as timezone-aware datetime instance [optional]
        :param maxDaysDelta:    maximum allowed number of days the target scenes might be apart from the given refDate
                                (compared by calendar day, i.e., independent of the year) [optional]
        :return:                list of Scene instances (empty if nothing matched or if self.retries is 0)
        :raises TimeoutError:   if each of the self.retries attempts timed out
        :raises struct.error:   if (un)packing still fails at the last attempt
        """
        scenes = []

        for i in range(self.retries):
            isLastAttempt = i >= self.retries - 1

            try:
                msg = self._packQuery(envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                                      dayNight, self.__deriveSeasonCode(refDate, maxDaysDelta))
                payload = self._requestPayload(msg)
                scenes = self._parseResponse(payload, datasetid, refDate, maxDaysDelta)
                break

            except socket.timeout:
                if isLastAttempt:
                    # report the configured number of attempts
                    raise TimeoutError('Spatial query timed out %s times!' % self.retries)

            except struct.error:
                if isLastAttempt:
                    raise

        return scenes


class Connection:
    """ Connection to the spatial index mediator server """

    # message value for a "hello" message
    HELLO_MSG = 1

    # message value for a disconnect message
    DISCONNECT_MSG = 6

    def __init__(self, host, port, timeout):
        """Connect to the index mediator server and exchange the hello message.

        :param host:    host address of the index mediator server
        :param port:    port number of the index mediator server
        :param timeout: timeout in seconds, passed to socket.create_connection
        :raises ConnectionRefusedError: if the server refuses the connection
        :raises EnvironmentError:       if the server does not echo the hello message
        """
        # connect to index mediator server
        try:
            self.socket = socket.create_connection((host, port), timeout)
        except ConnectionRefusedError:
            raise ConnectionRefusedError('The spatial index mediator server refused the connection!')

        # send hello and confirm response; the socket must not stay open if that fails
        try:
            greeted = self.__greet()
        except Exception:
            self.socket.close()
            raise

        if not greeted:
            self.socket.close()
            raise EnvironmentError('Bad protocol')

    def __greet(self):
        """Send the hello byte and return True if the server echoes it."""
        # send hello byte
        self.writeByte(self.HELLO_MSG)

        # receive hello byte echo
        response = self.recvByte()

        return response == self.HELLO_MSG

    def _recvExact(self, numBytes):
        """Read up to numBytes bytes, continuing after partial reads.

        :return: the received bytes; fewer than numBytes only if the peer closed the connection
        """
        chunks = []
        remaining = numBytes
        while remaining:
            chunk = self.socket.recv(remaining)
            if not chunk:
                # peer closed the connection
                break
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def writeByte(self, byte):
        """Send the given integer as a single signed byte."""
        # send byte
        self.socket.sendall(struct.pack('b', byte))

    def recvByte(self):
        """Receive a single signed byte.

        :raises struct.error: if the connection was closed before the byte arrived
        """
        return struct.unpack('b', self._recvExact(1))[0]

    def recvInt(self):
        """Receive a big-endian 4 byte signed integer.

        :raises struct.error: if the connection was closed before all 4 bytes arrived
        """
        # a single recv(4) may return less than 4 bytes, hence the exact read
        return struct.unpack('>i', self._recvExact(4))[0]

    def recvBuffer(self, buffer, numBytes):
        """Fill the given writable buffer with exactly numBytes bytes received from the server.

        :param buffer:   writable bytes-like object (e.g., a bytearray) holding at least numBytes bytes
        :param numBytes: number of bytes to be received
        :raises ConnectionError: if the connection is closed before numBytes bytes were received
        """
        toread = numBytes
        view = memoryview(buffer)
        while toread:
            nbytes = self.socket.recv_into(view, toread)
            if nbytes == 0:
                # recv_into returns 0 if the peer closed the connection; without this check the loop never ends
                raise ConnectionError('The spatial index mediator server closed the connection with %s of %s '
                                      'bytes still missing.' % (toread, numBytes))
            view = view[nbytes:]
            toread -= nbytes

    def disconnect(self):
        """Closes the connection to the index mediator server.

        No further communication, like placing queries will be possible.
        """
        try:
            self.writeByte(self.DISCONNECT_MSG)
        finally:
            # close the socket even if the disconnect message could not be sent
            self.socket.close()
387

388

389 390 391
class Scene:
    """Metadata of a single scene together with its footprint polygon."""

    def __init__(self, sceneid, acquisitiondate, cloudcover, proclevel, daynight, bounds):
        """
        :param sceneid:         database sceneid, e.g. 26366229
        :param acquisitiondate: acquisition date of the scene as datetime instance, e.g. 2016-03-25 10:15:26
        :param cloudcover:      cloudcover value of the scene, e.g. 11
        :param proclevel:       processing level value of the scene
        :param daynight:        day/night indicator (0=unknown, 1=day, 2=night)
        :param bounds:          scene bounds as list of lat/lon wgs84 coordinates (lon1, lat1, lon2, lat2, ...),
                                e.g. (10.00604, 49.19385, 7.45638, 49.64513, 8.13739, 51.3515, 10.77705, 50.89307)
        """
        self.sceneid = sceneid
        self.acquisitiondate = acquisitiondate
        self.cloudcover = cloudcover
        self.proclevel = proclevel
        self.daynight = daynight
        self.bounds = bounds

        # group the flat bounds sequence into [lon, lat] pairs; the two appended None values make sure
        # that a trailing single value still yields a complete pair
        padded = [*bounds, None, None]
        self.coordsLonLat = [[padded[idx], padded[idx + 1]] for idx in range(0, len(bounds), 2)]

        # set validated (!) polygon
        footprint = Polygon(self.coordsLonLat)
        if footprint.is_valid:
            self.polyLonLat = footprint
        else:
            # try to repair the invalid geometry with a zero-width buffer
            repaired = footprint.buffer(0)
            assert repaired.is_valid
            self.polyLonLat = repaired