Commit fdd33707 authored by Daniel Scheffler


added handling of unexpected exceptions within all GMS mappers; added creation of a job success summary

L1A_P:
- removed deprecated attributes path_Outfile_L1A and path_Outfile_L1A_masks
- calc_cloud_mask(): bugfix - wrong arguments passed to log_for_fullArr_or_firstTile

L1B_P:
- revised get_reference_image_params()
     -> the spatial query now works, but it still does not handle the cases where no reference scene can be found or where the reference scene has a different projection

L1C_P:
- removed deprecated import

INP_R:
- get_list_GMSfiles(): bugfix: the given target processing level was ignored if job.call_type=='webapp'

OUT_W:
- Obj2ENVI(): bugfix: the correct output path was not assigned to the attribute 'mask_1bit' in case job.exec_mode=='Python'

DB_T:
- moved SpatialIndexMediator to a separate module within misc
- simplified datetime reference
- GMS_JOB: added a 'comment' keyword in order to allow short descriptions and comments for the job

HLP_F:
- added trace_unhandled_exceptions(): a dummy decorator function for handling unexpected exceptions
- added class 'failed_GMS_object': a class representing failed GMS objects (allows retrieving information from them even after processing has failed)
- added log_uncaught_exceptions(): a decorator function for logging unhandled exceptions within GMS mapper functions (see the sketch below)
- added get_job_summary(): a function that returns a detailed and a less detailed pandas DataFrame summarizing job processing success
- cut_GMS_obj_into_MGRS_tiles(): added logging
- get_mask_classdefinition(): added mask_1bit class definition
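
A minimal sketch of how the new exception-handling decorator is intended to work. The helper class 'FailedObjectSketch' and the function name 'log_uncaught_exceptions_sketch' are illustrative placeholders, not the actual implementation; the real decorator additionally writes the scene ID to the 'failed_sceneids' column of the jobs table:

import functools
import sys
import traceback

class FailedObjectSketch:
    """Hypothetical stand-in for failed_GMS_object: keeps scene info plus exception details."""
    def __init__(self, obj, failed_mapper, exc_type, exc_val, exc_tb):
        self.scene_ID = getattr(obj, 'scene_ID', None)
        self.failedMapper = failed_mapper
        self.ExceptionType = exc_type.__name__
        self.ExceptionValue = repr(exc_val)
        self.ExceptionTraceback = exc_tb

def log_uncaught_exceptions_sketch(gms_mapper):
    @functools.wraps(gms_mapper)  # keeps the wrapped mapper picklable for multiprocessing
    def wrapped(gms_obj):
        if isinstance(gms_obj, FailedObjectSketch):
            return gms_obj  # objects that already failed in an earlier mapper are passed through untouched
        try:
            return gms_mapper(gms_obj)
        except Exception:
            exc_type, exc_val, _ = sys.exc_info()
            # replace the object by a lightweight 'failed' representation instead of raising
            return FailedObjectSketch(gms_obj, gms_mapper.__name__, exc_type, exc_val,
                                      traceback.format_exc())
    return wrapped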

SpatialIndexMediator (SIM):
- added new module SpatialIndexMediator for performing fast spatial queries against the 'scenes' table of the postgreSQL database (usage sketch below)
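
A hedged usage sketch of the new module; the parameter values are illustrative examples (not taken from this commit) and the call requires network access to the index mediator server; dataset ID 104 corresponds to Landsat-8 according to the module's docstring:

from datetime import datetime
from misc.SpatialIndexMediator import SpatialIndexMediator

SIM = SpatialIndexMediator()  # defaults: host "geoms.gfz-potsdam.de", port 8654
scenes = SIM.getFullSceneDataForDataset(
    envelope=(10.0, 16.0, 50.0, 60.0),   # (min_lon, max_lon, min_lat, max_lat) in WGS84
    timeStart=datetime(2005, 1, 1),
    timeEnd=datetime(2015, 12, 31),
    minCloudCover=0, maxCloudCover=20,   # percent
    datasetid=104,                       # e.g. Landsat-8
    refDate=datetime(2010, 6, 15),       # optional: keep scenes within +/- maxDaysDelta around this date
    maxDaysDelta=30)
for scene in scenes:
    print(scene.sceneid, scene.acquisitiondate, scene.cloudcover)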

PC:
- added decorators to all GMS mapper functions that enable handling of unexpected exceptions
- added a new list 'failed_objects' in order to collect information about all GMS objects for which processing failed (see the sketch below)
- added job database entry updates regarding failed scenes and finish time
- added creation and logging of job summary
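
An illustrative sketch of how the decorated mappers and the 'failed_objects' collection could fit together; function and variable names here are assumptions, not the actual process controller code:

import pandas as pd

def run_mappers_sketch(gms_objects, decorated_mappers):
    """Apply each (already decorated) GMS mapper to every object in sequence."""
    results = gms_objects
    for mapper in decorated_mappers:
        results = [mapper(obj) for obj in results]
    # failed objects pass through all remaining mappers unchanged and are collected at the end
    failed_objects = [o for o in results if getattr(o, 'failedMapper', None) is not None]
    summary = pd.DataFrame(
        [(o.scene_ID, o.failedMapper, o.ExceptionType, o.ExceptionValue) for o in failed_objects],
        columns=['scene_ID', 'failedMapper', 'ExceptionType', 'ExceptionValue'])
    return results, failed_objects, summary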

VC:
- added cloud classifier objects for Landsat-5/7/8 to version control
parent 55474136
@@ -2,7 +2,6 @@
.idea/
BAK/
OLD/
database/cloud_classifier/
database/sampledata/
database/metadata/
database/processed_data/
@@ -97,8 +97,6 @@ class L1A_object(object):
self.arr_pos = None # <tuple> in the form ((row_start,row_end),(col_start,col_end))
self.tile_pos = None # <list>, filled by self.get_tilepos()
self.meta = {}
self.path_Outfile_L1A = None
self.path_Outfile_L1A_masks = None
self.MetaObj = None # set by self.get_MetaObj()
self.meta = None # set by self.MetaObj2ODict()
self.GeoTransProj_ok = True # set by self.validate_GeoTransProj_GeoAlign()
@@ -479,7 +477,7 @@ class L1A_object(object):
project_dir = os.path.abspath(os.curdir)
os.chdir(os.path.dirname(path_archive))
files_in_archive = gdal.ReadDirRecursive(gdal_path_archive) # needs ~12sek for Landsat-8
assert files_in_archive, 'No files in archive for scene %s. '%self.scene_ID
assert files_in_archive, 'No files in archive for scene %s (entity ID: %s). '%(self.scene_ID,self.entity_ID)
full_LayerBandsAssignment = META.get_LayerBandsAssignment(self.GMS_identifier, ignore_usecase=True)
image_files = []
@@ -865,8 +863,8 @@ class L1A_object(object):
(hasattr(self,'MetaObj') and self.MetaObj) else self.meta['Dataname'] # FIXME replace this with the path generator?
if not self.path_cloud_class_obj:
self.log_for_fullArr_or_firstTile(
self.logger, 'Cloud masking is not yet implemented for %s %s...' %(self.satellite,self.sensor),subset)
self.log_for_fullArr_or_firstTile('Cloud masking is not yet implemented for %s %s...'
%(self.satellite,self.sensor),subset)
mask_clouds = None
else:
self.log_for_fullArr_or_firstTile('Calculating cloud mask...', subset)
@@ -1045,7 +1043,6 @@ class L1A_object(object):
self.logger.info('Preparing extracted metadata to be written to disk...')
self.meta = self.MetaObj.Meta2ODict()
self.path_Outfile_L1A = self.MetaObj.Dataname
def apply_nodata_mask_to_ObjAttr(self, attrname, out_nodata_val=None):
@@ -58,6 +58,7 @@ from algorithms import GEOPROCESSING as GEOP
from misc import path_generator as PG
from misc import database_tools as DB_T
from algorithms.L1A_P import L1A_object
from misc.SpatialIndexMediator import SpatialIndexMediator
#from algorithms.L2A_P import get_DESHIFTER_configs, DESHIFTER
#sys.path.append('/home/gfz-fe/scheffler/python')
#from CoReg_Sat import COREG
@@ -603,20 +604,16 @@ class COREG(object):
plusminus_years = 10
boundsLonLat = GEOP.corner_coord_to_minmax(self.trueDataCornerLonLat)
AcqDate = self.im2shift_objDict['acquisition_date']
yeardelta = lambda date, years: datetime(date.year + years, date.month, date.day,
date.hour, date.minute, date.second)
timeStart = yeardelta(AcqDate,-plusminus_years)
timeEnd = yeardelta(AcqDate,+plusminus_years)
# connect to index
index = DB_T.SpatialIndexMediator()
add_years = lambda dt, years: dt.replace(dt.year + years) \
if not (dt.month==2 and dt.day==29) else dt.replace(dt.year+years,3,1)
timeStart = add_years(AcqDate,-plusminus_years)
timeEnd = add_years(AcqDate,+plusminus_years)
timeEnd = timeEnd if timeEnd <= datetime.now() else datetime.now()
# place query
scenes = index.getFullSceneDataForDataset(boundsLonLat, timeStart, timeEnd, min_cloudcov, max_cloudcov,
usecase.datasetid_spatial_ref,
refDate=AcqDate,maxDaysDelta=plusminus_days)
# close the connection to the server
index.disconnect()
scenes = SpatialIndexMediator().getFullSceneDataForDataset(boundsLonLat, timeStart, timeEnd, min_cloudcov,
max_cloudcov, usecase.datasetid_spatial_ref,
refDate=AcqDate,maxDaysDelta=plusminus_days)
GDF = GeoDataFrame(scenes, columns=['object'])
GDF['sceneid'] = [*GDF['object'].map(lambda obj: obj.sceneid)]
@@ -644,15 +641,18 @@ class COREG(object):
ref_available = True
ref_scene_record = GDF_res.iloc[0]
self.imref_scene_ID = int(ref_scene_record['sceneid'])
self.imref_footprint_poly = ref_scene_record['polyUTM']
self.overlap_poly = ref_scene_record['overlap poly']
self.overlap_percentage = ref_scene_record['overlap percentage']
self.overlap_area = ref_scene_record['overlap area']
self.imref_scene_ID = int(ref_scene_record['sceneid'])
self.imref_footprint_poly = ref_scene_record['polyUTM']
self.overlap_poly = ref_scene_record['overlap poly']
self.overlap_percentage = ref_scene_record['overlap percentage']
self.overlap_area = ref_scene_record['overlap area']
ref_procL = DB_T.get_info_from_postgreSQLdb(job.conn_database,'scenes_proc',['proc_level'],
{'sceneid':self.imref_scene_ID})
assert ref_procL
if not ref_procL:
raise RuntimeError('The requested spatial reference scene (ID %s) has not been processed yet.'
%self.imref_scene_ID)
assert ref_procL, '%s / %s' %(ref_procL, self.imref_scene_ID)
self.path_imref = PG.path_generator(scene_ID=self.imref_scene_ID,
proc_level=ref_procL[0][0]).get_path_imagedata()
query_res = DB_T.get_info_from_postgreSQLdb(job.conn_database, 'scenes', ['entityid'],
@@ -11,7 +11,6 @@ import builtins
import numpy as np
from scipy.interpolate import interp1d
from misc import helper_functions as HLP_F
from gms_io import Input_reader as INP_R
from algorithms.L2A_P import L2A_object
@@ -147,7 +147,8 @@ def get_list_GMSfiles(dataset_list__target__tuple):
dataset_list,target = dataset_list__target__tuple[0] if not isinstance(dataset_list__target__tuple[0],dict) \
else [dataset_list__target__tuple[0]],dataset_list__target__tuple[1]
if job.call_type == 'webapp':
GMS_list = [p for p in [PG.path_generator(ds).get_path_gmsfile() for ds in dataset_list] if os.path.exists(p)]
get_gmsP = lambda ds,tgt: PG.path_generator(ds,proc_level=tgt).get_path_gmsfile()
GMS_list = [p for p in [get_gmsP(ds,target) for ds in dataset_list] if os.path.exists(p)]
else: # job.call_type == 'console'
SQLquery = lambda ds: DB_T.get_info_from_SQLdb(job.path_database,'processed_data', ['path_procdata','baseN'],
{'image_type':ds['image_type'],'entity_ID':ds['entity_ID'],'subsystem':ds['subsystem'],'proc_level':target})
@@ -412,6 +412,8 @@ def Obj2ENVI(InObj, write_masks_as_ENVI_classification=True, is_tempfile=False,
assert check_header_not_empty(outpath_hdr), "HEADER EMPTY: %s" % outpath_hdr
setattr(InObj, arrayname, outpath_arr) # refresh arr/masks/mask_clouds attributes
if arrayname=='masks':
setattr(InObj, 'mask_1bit', outpath_arr)
else: # 'block' or 'MGRS_tile'
# data have to be read in subset and then be written
@@ -461,16 +463,13 @@ def Obj2ENVI(InObj, write_masks_as_ENVI_classification=True, is_tempfile=False,
outpath_arr = os.path.splitext(outpath_hdr)[0] + '.%s' %InObj.outInterleave
if job.exec_mode=='Python':
setattr(InObj, arrayname, outpath_arr) # replace array by output path
if arrayname=='masks':
setattr(InObj, 'mask_1bit', outpath_arr)
if compression:
raise NotImplementedError # FIXME implement working compression
HLP_F.ENVIfile_to_ENVIcompressed(outpath_hdr)
if arrayname=='arr':
InObj.path_Outfile_L1A = outpath_arr
elif arrayname=='masks':
InObj.path_Outfile_L1A_masks = outpath_arr
else:
if not is_tempfile:
InObj.log_for_fullArr_or_firstTile(
#!/usr/bin/env python
import socket
import struct
#import ctypes
#import multiprocessing
from datetime import datetime, timedelta
from shapely.geometry import Polygon
class SpatialIndexMediator:
FULL_SCENE_QUERY_MSG = 3
""" message value for a full scene query message """
# def __init__(self, host="geoms.gfz-potsdam.de", port=8654, useConnectionPool=True, maxConnections=1):
def __init__(self, host="geoms.gfz-potsdam.de", port=8654):
"""
Establishes a connection to the spatial index mediator server.
:param host: host address of the index mediator server (default "geoms.gfz-potsdam.de")
:param port: port number of the index mediator server (default 8654)
"""
self.host = host
self.port = port
# self.useConnectionPool = useConnectionPool
#
# if(useConnectionPool):
# self.maxConnections = maxConnections
# self.connectionLimitReached = False
# self.connectionCount = multiprocessing.Value(ctypes.c_uint32)
# self.connections = multiprocessing.Queue(maxConnections)
# def disconnect(self):
# if(self.useConnectionPool):
# with self.connectionCount.get_lock():
# while(self.connectionCount.value > 0):
# connection = self.connections.get()
# connection.disconnect()
# self.connectionCount.value -= 1
# def __getConnection(self):
# """ retrieves a connection from the connection pool or creates a new one if the connection limit is not yet reached and no free connection is available"""
# if(self.useConnectionPool):
# # are there connections available or can we create a new one
# if(self.connections.empty() and not self.connectionLimitReached):
# with self.connectionCount.get_lock():
# if(self.connectionCount.value < self.maxConnections):
# self.connectionCount.value += 1
# self.connectionLimitReached = self.connectionCount.value == self.maxConnections
#
# connection = Connection(self.host, self.port)
# return connection
#
# # wait for connection to become available
# return self.connections.get()
#
# else:
# return Connection(self.host, self.port)
#
# def __returnConnection(self, connection):
# """ Returns the given connection to the connection pool. In case the connection pool is not used the connection is closed instead
#
# :param connection: connection that is returned to the global connection pool
# """
#
# if(self.useConnectionPool):
# self.connections.put(connection)
# else:
# connection.disconnect()
# def __getConnection(self):
# with self.connectionsLock:
# print("[{0}] num connections: {1}".format(multiprocessing.current_process(), len(self.connections)))
#
# # TODO: continue here!
#
# if self.connections:
# # find the connection with the shortest query queue
# connectionWithShortestQueue = self.connections[0]
# minWaitingConnections = connectionWithShortestQueue.waitingQueries.counter
#
# # this one is idle - return it
# if(minWaitingConnections == 0): return connectionWithShortestQueue
#
# # find a free connection
# for con in self.connections:
# count = con.waitingQueries.counter
# # we found an idle connection - return it
# if(count == 0): return con
# if(minWaitingConnections > count):
# connectionWithShortestQueue = con
#
# # check grace len and number of max connections
# if(minWaitingConnections < self.PENDING_QUERY_QUEUE_GRACE_LEN) or (len(self.connections) >= self.maxConnections):
# return connectionWithShortestQueue
#
# # if we end up here - there is no idle/graced connection - create a new one
# connection = self.Connection(self.host, self.port)
# self.connections.append(connection)
# return connection
@staticmethod
def __deriveSeasonCode(refDate, maxDaysDelta):
if refDate is None or maxDaysDelta is None:
return 0
delta = timedelta(days=maxDaysDelta)
startMonth = (refDate - delta).month - 1
endMonth = (refDate + delta).month - 1
seasonCode = 0
for x in range(12):
month = (startMonth + x) % 12
seasonCode |= 1 << month
if month == endMonth: break
return seasonCode
def getFullSceneDataForDataset(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid, refDate=None, maxDaysDelta=None):
"""
Query the spatial index with the given parameters in order to get a list of matching scenes intersecting the
given envelope
:param envelope: list of left, right and low, up coordinates (in lat/lon wgs84) of the region of
interest in the form of (min_lon, max_lon, min_lat, max_lat),
e.g. envelope = (10.0, 16.0, 50.0, 60.0)
:param timeStart: start timestamp of the relevant timerange as datetime instance, e.g. datetime(2015, 1, 1)
:param timeEnd: end timestamp of the relevant timerange as datetime instance, e.g. datetime(2016, 6, 15)
:param minCloudCover: minimum cloudcover in percent, e.g. 12, will return scenes with cloudcover >= 12% only
:param maxCloudCover: maximum cloudcover in percent, e.g. 23, will return scenes with cloudcover <= 23% only
:param datasetid: datasetid of the dataset in question, e.g. 104 for Landsat-8
:param refDate: reference timestamp as datetime instance, e.g. datetime(2015, 1, 1) [optional]
:param maxDaysDelta: maximum allowed number of days the target scenes might be apart from the given refDate
[optional]
"""
filterTimerange = not (refDate is None or maxDaysDelta is None)
# prepare buffer
# numbytes = 1 + 4*8 + 8 + 8 + 4 + 1 + 1 + 2
b = bytearray(57)
# pack msg header and envelope
offset = 0
struct.pack_into('> b 4d', b, offset, self.FULL_SCENE_QUERY_MSG, *envelope)
offset += 33
# pack the dates
struct.pack_into('> h 6b', b, offset, timeStart.year, timeStart.month, timeStart.day, timeStart.hour, timeStart.minute, timeStart.second, 0)
offset += 8
struct.pack_into('> h 6b', b, offset, timeEnd.year, timeEnd.month, timeEnd.day, timeEnd.hour, timeEnd.minute, timeEnd.second, 0)
offset += 8
# derive season code
seasonCode = self.__deriveSeasonCode(refDate, maxDaysDelta)
# pack the rest
struct.pack_into('> i 2b h', b, offset, seasonCode, minCloudCover, maxCloudCover, datasetid)
# get connection and lock the channel
con = Connection(self.host, self.port)
# con.lockChannel()
# send the buffer
con.socket.sendall(b)
# receive the response
# read first byte, indicating the response type, must match full scene query msg
if con.recvByte() != self.FULL_SCENE_QUERY_MSG:
raise EnvironmentError('Bad Protocol')
# now read the number of bytes that follow
numBytes = con.recvInt()
b = bytearray(numBytes)#ctypes.create_string_buffer(numBytes)
offset = 0
# read all data from the channel and unlock it
con.recvBuffer(b, numBytes)
# we received the entire message - return the connection to the global pool
con.disconnect()
# con.unlockChannel()
# interpret received data
# extract datasetid and number of scenes
dataset = struct.unpack_from('> h', b, offset)[0]
offset += 2
if dataset != datasetid:
raise EnvironmentError('Bad Protocol')
numScenes = struct.unpack_from('> i', b, offset)[0]
offset += 4
scenes = []
# name = multiprocessing.current_process().name
for _x in range(numScenes):
scenedata = struct.unpack_from('> i h 8b', b, offset)
offset += 14
timestamp = datetime(scenedata[1], scenedata[2], scenedata[3], scenedata[4], scenedata[5], scenedata[6])
# read bounds
numBounds = scenedata[9]
fmt = "> {0}d".format(numBounds)
bounds = struct.unpack_from(fmt, b, offset)
offset += numBounds * 8
# check ref date
if filterTimerange:
if timestamp.month == 2 and timestamp.day == 29:
# deal with feb.29th
timestampInRefYear = timestamp.replace(refDate.year, 3, 1)
else:
timestampInRefYear = timestamp.replace(refDate.year)
if abs(refDate - timestampInRefYear).days > maxDaysDelta:
# print("{0}, scene({1}, {4}): {2} skipped ({3}), offset({5})".format(name, _x, timestamp, timestampInRefYear, scenedata[0], offset))
# skip scene
continue
#print("{0}, scene({1}, {4}): {2} kept ({3}), offset({5})".format(name, _x, timestamp, timestampInRefYear, scenedata[0], offset))
# create scene
scenes.append(Scene(scenedata[0], timestamp, scenedata[8], bounds))
return scenes
class Connection:
""" Connection to the spatial index mediator server """
HELLO_MSG = 1
""" message value for a "hello" message """
DISCONNECT_MSG = 6
""" message value for a disconnect message """
def __init__(self, host, port):
# get lock for locking the channel access
# self.channelLock = multiprocessing.Lock()
# self.waitingQueries = self.AtomicInteger()
# connect to index mediator server
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((host, port))
# send hello and confirm response
if not self.__greet():
raise EnvironmentError('Bad protocol')
#
# def lockChannel(self):
# self.waitingQueries.inc()
# self.channelLock.acquire()
#
# def unlockChannel(self):
# self.waitingQueries.dec()
# self.channelLock.release()
def __greet(self):
# lock the channel
# self.lockChannel()
# send hello byte
self.writeByte(self.HELLO_MSG)
# receive hello byte echo
response = self.recvByte()
# unlock the channel
# self.unlockChannel()
return response == self.HELLO_MSG
def writeByte(self, byte):
# send byte
self.socket.sendall(struct.pack('b', byte))
def recvByte(self):
return struct.unpack('b', self.socket.recv(1))[0]
def recvInt(self):
return struct.unpack('>i', self.socket.recv(4))[0]
def recvBuffer(self, buffer, numBytes):
toread = numBytes
view = memoryview(buffer)
while toread:
nbytes = self.socket.recv_into(view, toread)
view = view[nbytes:]
toread -= nbytes
def disconnect(self):
"""
Closes the connection to the index mediator server. No further communication, like placing queries will be possible.
"""
self.writeByte(self.DISCONNECT_MSG)
self.socket.close()
# class AtomicInteger():
# def __init__(self):
# self.lock = multiprocessing.Lock()
# self.counter = 0
#
# def inc(self):
# with self.lock:
# self.counter += 1
#
#
# def dec(self):
# with self.lock:
# self.counter -= 1
class Scene:
"""Scene Metadata class"""
def __init__(self, sceneid, acquisitiondate, cloudcover, bounds):
"""
:param sceneid: database sceneid, e.g. 26366229
:param acquisitiondate: acquisition date of the scene as datetime instance, e.g. 2016-03-25 10:15:26
:param cloudcover: cloudcover value of the scene, e.g. 11
:param bounds: scene bounds as list of lat/lon wgs84 coordinates (lon1, lat1, lon2, lat2, ...),
e.g. (10.00604, 49.19385, 7.45638, 49.64513, 8.13739, 51.3515, 10.77705, 50.89307)
"""
self.sceneid = sceneid
self.acquisitiondate = acquisitiondate
self.cloudcover = cloudcover
self.bounds = bounds
tempList = list(bounds) + [None] * 2
self.coordsLonLat = [tempList[n:n + 2] for n in range(0, len(bounds), 2)]
self.polyLonLat = Polygon(self.coordsLonLat)
@@ -10,7 +10,12 @@
########################### Library import ####################################
#from __future__ import (division, print_function, unicode_literals, absolute_import)
import logging,os,re,json,numpy as np
import logging
import sys
import os
import re
import json
import numpy as np
import itertools
import operator
import tarfile
@@ -26,7 +31,11 @@ import math
import shapely
import gzip
import copy
import traceback
import functools
from datetime import datetime
from pandas import DataFrame
try: from osgeo import ogr
except ImportError: import ogr
from numba import autojit
@@ -109,6 +118,106 @@ def setup_logger(name_logfile, path_logfile,append=1):
# logger.addHandler(consoleHandler)
return logger
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
result = None
try:
result = func(*args, **kwargs)
except Exception as e:
print ('Exception in '+func.__name__)
traceback.print_exc()
return result
return wrapped_func
class failed_GMS_object(L1A_object): # FIXME actually inherit from a separate GMS_object class
# """@DynamicAttrs"""
def __init__(self,GMS_object, failedMapper, exc_type, exc_val, exc_tb):
super().__init__(None)
[setattr(self,k,getattr(GMS_object,k)) for k in
['proc_level','image_type','scene_ID','entity_ID','satellite','sensor','subsystem','arr_shape','arr_pos']]
self.failedMapper = failedMapper
self.ExceptionType = exc_type.__name__
self.ExceptionValue = repr(exc_val)
self.ExceptionTraceback = exc_tb
@property
def pandasRecord(self):
columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
'arr_shape','arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
return DataFrame([getattr(self,k) for k in columns], columns=columns)
def log_uncaught_exceptions(GMS_mapper):
"""Decorator function for handling unexpected exceptions that occurr within GMS mapper functions. Traceback is
sent to logfile of the respective GMS object and the scene ID is added to the 'failed_sceneids' column within
the jobs table of the postgreSQL database.
:param GMS_mapper: A GMS mapper function that takes a GMS object, does some processing and returns it back."""
@functools.wraps(GMS_mapper) # needed to avoid pickling errors
def wrapped_GMS_mapper(GMS_obj):
try:
if isinstance(GMS_obj, failed_GMS_object):
print("Scene %s (entity ID %s) skipped %s due to an unexpected exception in %s."
% (GMS_obj.scene_ID, GMS_obj.entity_ID, GMS_mapper.__name__, GMS_obj.failedMapper))
return GMS_obj
else:
return GMS_mapper(GMS_obj)
except Exception as e:
# get Exception details
type_, value_ = sys.exc_info()[:2]
traceback_ = traceback.format_exc()
# log the exception and raise warning
GMS_obj.logger.error(e, exc_info=True)
warnings.warn("Logged uncaught exception '%s' within %s during processing of scene ID %s (entity ID %s)!"
% (value_, GMS_mapper.__name__, GMS_obj.scene_ID, GMS_obj.entity_ID))
# add the scene ID to failed_sceneids column in jobs table of postgreSQL database
res = DB_T.get_info_from_postgreSQLdb(config.job.conn_database, 'jobs', ['failed_sceneids'],
{'id': config.job.ID})
assert res, "Query delivered no result."
if res[0][0] is None or GMS_obj.scene_ID not in res[0][0]: # if column is empty or scene ID is not in there
DB_T.append_item_to_arrayCol_in_postgreSQLdb(config.job.conn_database, 'jobs',
{'failed_sceneids':GMS_obj.scene_ID}, {'id':config.job.ID})