Commit b4905b95 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fixed ExceptionHandler.handle_failed() not raising exceptions that occur during handle_failed().

Fixed buggy handling of acq_datetime when recreating GMS_object from disk.
Fixed the "'str' object has no attribute 'month'" error raised when AcqDateTime was set from a string.
parent 92a0f4e0
......@@ -43,6 +43,10 @@ class L1A_object(GMS_object):
# load default attribute values and methods
super(L1A_object, self).__init__()
if 'proc_level' in kwargs and kwargs['proc_level'] not in [None, 'L1A']:
raise RuntimeError("Tried to instanciate L1A_object although kwargs['proc_level'] is '%s'"
% kwargs['proc_level'])
# unpack kwargs
self.proc_level = 'L1A'
self.image_type = image_type # FIXME not needed anymore?
......
......@@ -16,7 +16,7 @@ import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import box
import pytz
from typing import Union # noqa F401 # flake8 issue
from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue
from arosics import COREG, DESHIFTER
from geoarray import GeoArray
......@@ -36,6 +36,10 @@ from ..misc.logging import GMS_logger
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated
if TYPE_CHECKING:
from shapely.geometry import Polygon
from logging import Logger
__author__ = 'Daniel Scheffler'
......@@ -44,6 +48,7 @@ class Scene_finder(object):
def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, sceneID_excluded=None,
min_overlap=20, min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10, logger=None):
# type: (list, datetime, str, Polygon, int, int, int, int, int, int, Logger) -> None
"""Initialize Scene_finder.
:param src_boundsLonLat:
......
......@@ -219,62 +219,67 @@ class ExceptionHandler(object):
timeout=30000)
def handle_failed(self):
    """Handle an exception raised during the execution of a GMS mapper.

    Creates a failed_GMS_object summarizing the failed scene, logs the original
    exception, marks the scene as failed in the jobs table of the database and
    updates the job statistics — unless another subsystem of the same scene has
    already been marked as failed or has already reached a higher processing
    level.

    :return: an instance of failed_GMS_object representing the failed scene
    """
    try:
        # exc_tb is the pre-formatted traceback string; exc_val the exception instance
        _, exc_val, exc_tb = self.exc_details

        # collect some information about the failed GMS object and summarize it in failed_GMS_object
        failed_Obj = failed_GMS_object(self.get_sample_GMS_obj(self.GMS_objs),
                                       self.GMS_mapper_name, *self.exc_details)

        # log the exception and raise warning
        failed_Obj.logger.error('\n' + exc_tb, exc_info=False)
        self.logger.warning("\nLogged an uncaught exception within %s during processing of scene ID %s "
                            "(entity ID %s):\n '%s'\n"
                            % (self.GMS_mapper_name, failed_Obj.scene_ID, failed_Obj.entity_ID, exc_val))

        # add the scene ID to failed_sceneids column in jobs table of DB and update statistics column
        # NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED the
        #       failed_sceneids column and the statistics column is NOT updated once more
        another_ss_failed = False
        another_ss_succeeded = False
        higher_procL = None

        if failed_Obj.subsystem:
            # check if another subsystem of the same scene ID has been marked as failed before
            res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'], {'id': CFG.ID})
            assert res, "Query delivered no result."

            if res[0][0] is not None and failed_Obj.scene_ID in res[0][0]:
                # FIX: the original debug message contained a '%s' placeholder without an argument
                self.logger.debug("Found another failed subsystem of scene %s in the database.",
                                  failed_Obj.scene_ID)
                another_ss_failed = True

            # check if another subsystem already reached a higher processing level
            # NOTE: this fixes issue #50
            # NOTE: This works not only for GMS_object instances but also for L1A inputs (OrderedDicts) because
            #       failed_GMS_object inherits from GMS_object and GMS_object.proc_status_all_GMS_objs has already
            #       been updated by the first subsystem (that earlier reached L1A)
            # FIXME proc_status_all_GMSobjs is not available if other subsystems are processed by another
            # FIXME multiprocessing worker or on another machine (cluster node)
            procstatus_other_ss = {k: v for k, v in GMS_object.proc_status_all_GMSobjs[failed_Obj.scene_ID].items()
                                   if k != failed_Obj.subsystem}

            for ss, statusentry in procstatus_other_ss.items():
                for procL in statusentry.keys():
                    if is_proc_level_lower(failed_Obj.proc_level, procL) and statusentry[procL] == 'finished':
                        higher_procL = procL
                        self.logger.debug("Found another subsystem that already reached a higher processing level.")
                        another_ss_succeeded = True
                        break

        if not another_ss_failed:  # applies also to full cubes
            DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.conn_database, 'jobs',
                                                         {'failed_sceneids': failed_Obj.scene_ID}, {'id': CFG.ID})

        if not another_ss_succeeded:
            self.update_progress_failed(failed_Obj)
        else:
            # report the failure at the highest level another subsystem already finished
            self.update_progress_failed(failed_Obj, procL_failed=higher_procL)

        return failed_Obj

    except Exception:
        # re-raise exceptions that occur during self.handle_failed() itself -> must be ProgrammingErrors
        raise
def log_uncaught_exceptions(GMS_mapper, logger=None):
......
......@@ -639,10 +639,13 @@ class GMS_object(Dataset):
for key, value in GMSfileDict.items():
if key in ['GMS_identifier', 'georef', 'dict_LayerOptTherm']:
continue # properties that should better be created on the fly
try:
setattr(GMS_obj, key, value)
except Exception:
raise AttributeError("Can't set attribute %s." % key)
elif key == 'acq_datetime':
GMS_obj.acq_datetime = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f%z')
else:
try:
setattr(GMS_obj, key, value)
except Exception:
raise AttributeError("Can't set attribute %s." % key)
GMS_obj.arr_shape, GMS_obj.arr_pos = tuple_GMS_subset[1]
......
......@@ -151,15 +151,22 @@ class METADATA(object):
@property
def AcqDateTime(self):
    """Return the acquisition timestamp as a timezone-aware datetime.datetime (UTC).

    Lazily assembled from self.AcqDate and self.AcqTime on first access and
    cached in self._AcqDateTime afterwards.
    """
    # serve the cached value, or give up early if date/time components are missing
    if self._AcqDateTime or not (self.AcqDate and self.AcqTime):
        return self._AcqDateTime

    combined = '%s %s%s' % (self.AcqDate, self.AcqTime, '.000000+0000')
    self._AcqDateTime = datetime.datetime.strptime(combined, '%Y-%m-%d %H:%M:%S.%f%z')

    return self._AcqDateTime
@AcqDateTime.setter
def AcqDateTime(self, DateTime):
    # type: (Union[str, datetime.datetime]) -> None
    """Set the acquisition timestamp and keep AcqDate / AcqTime in sync.

    :param DateTime: a datetime.datetime instance, or a string formatted like
                     '%Y-%m-%d %H:%M:%S.%f%z' (as written to disk by GMS)
    :raises TypeError: if DateTime is neither a string nor a datetime.datetime
    """
    if isinstance(DateTime, str):
        # GMS objects recreated from disk provide the timestamp as a string
        DateTime = datetime.datetime.strptime(DateTime, '%Y-%m-%d %H:%M:%S.%f%z')
    elif not isinstance(DateTime, datetime.datetime):
        raise TypeError("AcqDateTime expects a string or a datetime.datetime instance, got %s." % type(DateTime))

    self._AcqDateTime = DateTime

    # FIX: derive the date/time strings from the parsed datetime object;
    # calling strftime on the raw input raised AttributeError for string inputs
    self.AcqDate = DateTime.strftime('%Y-%m-%d')
    self.AcqTime = DateTime.strftime('%H:%M:%S')
@property
def overview(self):
......@@ -1639,9 +1646,6 @@ class METADATA(object):
# convert band specific metadata to dicts
setattr(self, attrN, dict(zip(odict['LayerBandsAssignment'], odict[odictKey])))
# for attr in self._bandspecific_attrs:
# if attr in odict:
# set the remaining attributes
if 'map info' in odict:
self.map_info = odict['map info']
......@@ -1742,7 +1746,7 @@ class METADATA(object):
sol_azs = 180 * sols_az_rad / math.pi
diff_az = np.abs(float(self.SunAzimuth) - sol_azs)
acq_datetime = time_stamps[np.where(diff_az == np.min(diff_az))][0]
AcqTime = datetime.datetime.strftime(acq_datetime, format='%H:%M:%S')
AcqTime = acq_datetime.strftime(format='%H:%M:%S')
logger.info('Center acquisition time has been calculated: %s' % AcqTime)
# update self.
......
......@@ -515,15 +515,17 @@ class Test_in_normal_mode(unittest.TestCase):
# self.job_id = 26187053 # GMS41: AC: The input 'list_GMS_objs' contains duplicates: ['', '']
# self.job_id = 26187750 # GEOMS: [AC]: RuntimeWarning: All-NaN slice encountered
# self.job_id = 26187760 # GEOMS: [L2C]: ValueError: 'axis' entry is out of bounds
self.job_id = 26187804 # GEOMS: Spatial homogenization leaves resampling artifacs at the image edges.
# self.job_id = 26187804 # GEOMS: Spatial homogenization leaves resampling artifacs at the image edges.
self.job_id = 26187922 # GEOMS: AssertionError (self.job_id = 26187922 # GEOMS: AssertionError)
self.PC = process_controller(self.job_id, **dict(is_test=False, parallelization_level='scenes', db_host=db_host,
delete_old_output=True, disable_exception_handler=True))
self.PC.config.spathomo_estimate_accuracy = True
self.PC.config.ac_estimate_accuracy = True # FIXME
self.PC.config.spechomo_estimate_accuracy = True # FIXME
# self.PC.config.spathomo_estimate_accuracy = True
# self.PC.config.ac_estimate_accuracy = True
# self.PC.config.spechomo_estimate_accuracy = True
self.PC.config.exec_L1CP = [1, 1, 0]
self.PC.config.exec_2ACP = [1, 1, 0]
self.PC.config.path_procdata_scenes = '/storage/gms/processed_scenes/20180227_MGRS33UUU_S2_L8_L7/'
def test(self):
    # Execute the complete processing chain for the job configured in setUp().
    self.PC.run_all_processors()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment