Commit ff539709 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fixed issue #50 (Invalid job progress statistics in case a subsystem fails...

Fixed issue #50 (Invalid job progress statistics in case a subsystem fails after another one of the same scene ID already succeeded in the same mapper) by adding class attribute GMS_object.proc_status_all_GMSobjs and GMS_object.proc_status() property. Added gms_object.update_proc_status() decorator.
Revised exception handler. Improved test_exception_handler module.
Fixed a severe bug that copied the same dataset list to all subsequent process controllers.
Former-commit-id: cd572222
Former-commit-id: 0cce191d
parent cad83ba5
......@@ -34,7 +34,7 @@ class L1A_object(GMS_object):
"""Features input reader and raster-/metadata homogenization."""
def __init__(self, image_type='', satellite='', sensor='', subsystem='', sensormode='', acq_datetime=None,
entity_ID='', scene_ID=-9999, filename='', dataset_ID=-9999, **kwargs):
entity_ID='', scene_ID=-9999, filename='', dataset_ID=-9999, proc_status='', **kwargs):
""":param : instance of gms_object.GMS_object or None
"""
# TODO docstring
......@@ -65,6 +65,12 @@ class L1A_object(GMS_object):
% (self.satellite, self.sensor,
(' ' + self.subsystem) if self.subsystem not in [None, ''] else '', self.entity_ID))
# (re)set the processing status
if self.scene_ID in self.proc_status_all_GMSobjs:
del self.proc_status_all_GMSobjs[self.scene_ID]
self.proc_status = proc_status or 'initialized' # if proc_status = 'running' is given by L1A_map
def import_rasterdata(self):
if re.search("ALOS", self.satellite, re.I):
'''First 22 lines are nodata: = maybe due to an issue of the GDAL CEOS driver.
......
......@@ -242,6 +242,7 @@ class L1B_object(L1A_object):
[setattr(self, key, value) for key, value in L1A_obj.__dict__.items()]
self.proc_level = 'L1B'
self.proc_status = 'initialized'
@property
def spatRef_available(self):
......
......@@ -48,6 +48,7 @@ class L1C_object(L1B_object):
self._lonlat_arr = None
self.proc_level = 'L1C'
self.proc_status = 'initialized'
@property
def lonlat_arr(self):
......
......@@ -15,3 +15,4 @@ class L2A_object(L1C_object):
[setattr(self, key, value) for key, value in L1C_obj.__dict__.items()]
self.proc_level = 'L2A'
self.proc_status = 'initialized'
......@@ -50,6 +50,7 @@ class L2B_object(L2A_object):
[setattr(self, key, value) for key, value in L2A_obj.__dict__.items()]
self.proc_level = 'L2B'
self.proc_status = 'initialized'
def spectral_homogenization(self):
"""Apply spectral homogenization, i.e., prediction of the spectral bands of the target sensor."""
......
......@@ -18,6 +18,7 @@ class L2C_object(L2B_object):
[setattr(self, key, value) for key, value in L2B_obj.__dict__.items()]
self.proc_level = 'L2C'
self.proc_status = 'initialized'
def calc_geometric_accurracy(self):
pass
......
......@@ -273,7 +273,7 @@ def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2upd
idx_val2increment=None, cond_dict=None, timeout=15000):
# type: (str, str, str, int, int, dict, int) -> Union[None, str]
"""Updates an array column of a specific postgreSQL table in the form that it increments or decrements the elements
at a given position. HINT: The column must have values like that: [0,1,0,3,1,0]
at a given position. HINT: The column must have values like that: [52,0,27,10,8,0,0,0,0]
:param conn_params: <str> connection parameters as provided by CFG.conn_params
:param tablename: <str> name of the table within the database to be update
......
......@@ -18,7 +18,7 @@ dtype_lib_GDAL_Python = {"uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "uint32
"float64": 7, "complex64": 10, "complex128": 11}
proc_chain = ['L1A', 'L1B', 'L1C', 'L2A', 'L2B', 'L2C']
db_jobs_statistics_def = {'downloaded': 1, 'started': 2, None: 2, 'L1A': 3, 'L1B': 4, 'L1C': 5, 'L2A': 6, 'L2B': 7,
'L2C': 8, 'FAILED': 9}
'L2C': 8, 'FAILED': 9} # NOTE: OrderedDicts passed to L1A_map have proc_level=None
def get_GMS_sensorcode(GMS_identifier):
......
......@@ -13,6 +13,7 @@ from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
from ..model.gms_object import failed_GMS_object
from ..options.config import GMS_config as CFG
from ..misc import database_tools as DB_T
from ..misc.helper_functions import is_proc_level_lower
from .definition_dicts import db_jobs_statistics_def, proc_chain
__author__ = 'Daniel Scheffler'
......@@ -76,6 +77,9 @@ class ExceptionHandler(object):
self.GMS_mapper_name = GMS_mapper.__name__
self.GMS_objs = GMS_objs
if not GMS_objs:
raise ValueError('Unexpected argument for %s. Received %s.' % (self.GMS_mapper_name, GMS_objs))
# noinspection PyBroadException
try:
# GMS_mapper inputs CONTAIN NO failed_GMS_objects -> run the mapper normally
......@@ -154,13 +158,22 @@ class ExceptionHandler(object):
# update statistics column ONLY in case of full cube or first subsystem
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
idx_val2decrement=db_jobs_statistics_def['started'] - 1,
idx_val2decrement=db_jobs_statistics_def['started'] - 1, # FIXME BUG?
idx_val2increment=db_jobs_statistics_def['started'])
def increment_progress(self):
"""update statistics column in jobs table of postgreSQL database"""
"""Update statistics column in jobs table of postgreSQL database.
NOTE: This function ONLY receives those GMS_objects that have been sucessfully processed by the GMS_mapper.
"""
# get a GMS object from which we get the new proc_level
GMS_obj = self.get_sample_GMS_obj(self.GMS_objs)
# TODO if another subsystem already suceeded -> decrement the higher proc level and increment failed
# validate proc_level
if GMS_obj.proc_level is None:
raise ValueError('Received GMS_object for %s %s without processing level after being processed by %s.'
% (GMS_obj.entity_ID, GMS_obj.subsystem, self.GMS_mapper_name))
# NOTE: in case GMS_obj represents a subsystem and another one has already been marked as FAILED the
# failed_sceneids column and the statistics column is NOT updated once more
......@@ -190,11 +203,16 @@ class ExceptionHandler(object):
idx_val2increment=db_jobs_statistics_def[GMS_obj.proc_level])
@staticmethod
def update_progress_failed(failed_Obj):
"""Update statistics column in jobs table of postgreSQL database."""
def update_progress_failed(failed_Obj, procL_failed=None):
"""Update statistics column in jobs table of postgreSQL database.
:param failed_Obj: instance of gms_object failed_GMS_object
:param procL_failed: processing level to be decremented. If not given, the one from failed_Obj is used.
:return:
"""
DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
idx_val2decrement=db_jobs_statistics_def[failed_Obj.proc_level],
idx_val2decrement=db_jobs_statistics_def[procL_failed or failed_Obj.proc_level],
idx_val2increment=db_jobs_statistics_def['FAILED'])
def handle_failed(self):
......@@ -213,16 +231,42 @@ class ExceptionHandler(object):
# add the scene ID to failed_sceneids column in jobs table of DB and update statistics column
# NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED the
# failed_sceneids column and the statistics column is NOT updated once more
res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'],
{'id': CFG.ID})
assert res, "Query delivered no result."
if res[0][0] is None or failed_Obj.scene_ID not in res[0][0]:
# if column is empty or scene ID is not in there
another_ss_failed = False
another_ss_succeeded = False
higher_procL = None
if failed_Obj.subsystem:
# check if another subsystem of the same scene ID has been marked as failed before
res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'], {'id': CFG.ID})
assert res, "Query delivered no result."
if res[0][0] is not None and failed_Obj.scene_ID in res[0][0]:
self.logger.debug("Found another failed subsystem of scene %s in the database.")
another_ss_failed = True
# check if another subsystem already reached a higher processing level
# NOTE: this fixes issue #50
# FIXME proc_status_all_GMSobjs is not available if other subsystems are processed by another
# FIXME multiprocessing worker or on another machine (cluster node)
procstatus_other_ss = {k: v for k, v in GMS_object.proc_status_all_GMSobjs[failed_Obj.scene_ID].items()
if k != failed_Obj.subsystem}
for ss, statusentry in procstatus_other_ss.items():
for procL in statusentry.keys():
if is_proc_level_lower(failed_Obj.proc_level, procL) and statusentry[procL] == 'finished':
higher_procL = procL
self.logger.debug("Found another subsystem that already reached a higher processing level.")
another_ss_succeeded = True
break
if not another_ss_failed: # applies also to full cubes
DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.conn_database, 'jobs',
{'failed_sceneids': failed_Obj.scene_ID}, {'id': CFG.ID})
self.update_progress_failed(failed_Obj)
if not another_ss_succeeded:
self.update_progress_failed(failed_Obj)
else:
self.update_progress_failed(failed_Obj, procL_failed=higher_procL)
return failed_Obj
......
......@@ -3,6 +3,7 @@
import collections
import copy
import datetime
import functools
import glob
import json
import os
......@@ -11,13 +12,16 @@ import shutil
import sys
import warnings
import logging
from collections import OrderedDict
from itertools import chain
from typing import Iterable, List, Union # noqa F401 # flake8 issue
import numpy as np
import spectral
from spectral.io import envi
from numba import jit
from pandas import DataFrame, read_csv
from nested_dict import nested_dict
try:
from osgeo import gdalnumeric
......@@ -49,6 +53,10 @@ __author__ = 'Daniel Scheffler'
class GMS_object(Dataset):
# class attributes
# NOTE: these attributes can be modified and seen by ALL GMS_object instances
proc_status_all_GMSobjs = nested_dict()
def __init__(self, pathImage=''):
# get all attributes of base class "Dataset"
super(GMS_object, self).__init__()
......@@ -150,6 +158,22 @@ class GMS_object(Dataset):
# self.log += self.logger.captured_stream
self._logger = logger
@property
def proc_status(self):
    # type: () -> str
    """
    Get the processing status of the current GMS_object (subclass) instance for the current processing level.

    Possible values: 'initialized', 'running', 'finished', 'failed'

    :return: the status string stored for the current scene ID / subsystem / processing level
    """
    # NOTE: self.proc_status_all_GMSobjs is a class attribute (visible and modifiable from any other subsystem)
    return self.proc_status_all_GMSobjs[self.scene_ID][self.subsystem][self.proc_level]
@proc_status.setter
def proc_status(self, new_status):
    # type: (str) -> None
    """Set the processing status for the current scene ID / subsystem / processing level.

    NOTE: This writes into the class attribute 'proc_status_all_GMSobjs', i.e., the status is shared
    between ALL GMS_object instances and is therefore visible across subsystems of the same scene.

    :param new_status:  the new status, e.g. 'initialized', 'running', 'finished' or 'failed'
    """
    self.proc_status_all_GMSobjs[self.scene_ID][self.subsystem][self.proc_level] = new_status
@property
def GMS_identifier(self):
return collections.OrderedDict(zip(
......@@ -1384,9 +1408,66 @@ class failed_GMS_object(GMS_object):
self.ExceptionType = exc_type.__name__
self.ExceptionValue = repr(exc_val)
self.ExceptionTraceback = exc_tb
self.proc_status = 'failed'
@property
def pandasRecord(self):
    """Return a single-row pandas DataFrame summarizing this failed GMS object.

    The columns cover scene identification, sensor information and the exception
    details captured when the mapper failed.

    :return: pandas.DataFrame with exactly one row and the 13 columns listed below
    """
    columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
               'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
    # NOTE: the attribute values must be wrapped into an inner list so that they form ONE ROW;
    # passing the flat 13-element list directly would be interpreted as 13 single-column rows
    # and raise a shape mismatch against the 13 column labels.
    return DataFrame([[getattr(self, k) for k in columns]], columns=columns)
def update_proc_status(GMS_mapper):
    """Decorator function for updating the processing status of each GMS_object (subclass) instance.

    :param GMS_mapper: A GMS mapper function that takes a GMS object, does some processing and returns it back.
    """
    @functools.wraps(GMS_mapper)  # needed to avoid pickling errors
    def wrapped_GMS_mapper(GMS_objs, **kwargs):
        # type: (Union[List[GMS_object], GMS_object, OrderedDict, failed_GMS_object], dict) -> any

        def set_status(objs, status):
            """Set the given processing status on a single object, an OrderedDict or an iterable of objects."""
            if isinstance(objs, OrderedDict):
                objs['proc_status'] = status
            elif isinstance(objs, GMS_object):
                objs.proc_status = status
            elif isinstance(objs, Iterable):
                for obj in objs:
                    obj.proc_status = status
            else:
                raise TypeError("Unexpected type of 'GMS_objs': %s" % type(objs))

        # noinspection PyBroadException
        try:
            # NOTE: failed_GMS_object is a SUBCLASS of GMS_object, so it must be checked BEFORE the generic
            # GMS_object case - otherwise an already failed object would be re-marked as 'running' and
            # passed to the mapper instead of being returned unchanged.
            if isinstance(GMS_objs, failed_GMS_object):
                assert GMS_objs.proc_status == 'failed'
                return GMS_objs

            # set processing status to 'running'
            set_status(GMS_objs, 'running')

            # RUN the GMS_mapper
            GMS_objs = GMS_mapper(GMS_objs, **kwargs)

            # set processing status to 'finished'
            # NOTE: a mapper never returns an OrderedDict, so only GMS_object / iterables are expected here
            if isinstance(GMS_objs, GMS_object):
                GMS_objs.proc_status = 'finished'
            elif isinstance(GMS_objs, Iterable):
                for GMS_obj in GMS_objs:
                    GMS_obj.proc_status = 'finished'
            else:
                raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs))

        except Exception:
            # set processing status to 'failed' and re-raise the original exception
            set_status(GMS_objs, 'failed')
            raise

        return GMS_objs  # type: Union[GMS_object, List[GMS_object]]

    return wrapped_GMS_mapper
......@@ -605,7 +605,7 @@ class JobConfig(object):
for row in cur.fetchall():
ds = OrderedDict()
ds["proc_level"] = row["proc_level"]
ds["proc_level"] = row["proc_level"] if not self.is_test else None
ds["scene_ID"] = row["sceneid"]
ds["dataset_ID"] = row["datasetid"]
ds["image_type"] = row["image_type"]
......
......@@ -11,13 +11,14 @@ from ..algorithms import L1C_P
from ..algorithms import L2A_P
from ..algorithms import L2B_P
from ..algorithms import L2C_P
from ..model.gms_object import failed_GMS_object
from ..model.gms_object import failed_GMS_object, update_proc_status
from ..algorithms.geoprocessing import get_common_extent
__author__ = 'Daniel Scheffler'
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map(dataset_dict): # map (scene-wise parallelization)
# type: (dict) -> L1A_P.L1A_object
......@@ -43,6 +44,7 @@ def L1A_map(dataset_dict): # map (scene-wise parallelization)
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization)
# type: (dict) -> List[L1A_P.L1A_object]
......@@ -59,6 +61,7 @@ def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_2(L1A_tile): # map (block-wise parallelization)
# type: (L1A_P.L1A_object) -> L1A_P.L1A_object
L1A_tile.calc_TOARadRefTemp()
......@@ -68,6 +71,7 @@ def L1A_map_2(L1A_tile): # map (block-wise parallelization)
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_3(L1A_obj): # map (scene-wise parallelization)
# type: (L1A_P.L1A_object) -> L1A_P.L1A_object
L1A_obj.calc_corner_positions() # requires mask_nodata
......@@ -84,6 +88,7 @@ def L1A_map_3(L1A_obj): # map (scene-wise parallelization)
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1B_map(L1A_obj):
# type: (L1A_P.L1A_object) -> L1B_P.L1B_object
"""L1A_obj enthält in Python- (im Gegensatz zur inmem_serialization-) Implementierung KEINE ARRAY-DATEN!,
......@@ -99,6 +104,7 @@ def L1B_map(L1A_obj):
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1C_map(L1B_objs):
# type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
"""Atmospheric correction.
......@@ -135,6 +141,7 @@ def L1C_map(L1B_objs):
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs, block_size=None, return_tiles=True):
# type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]]) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object]
"""Geometric homogenization.
......@@ -179,6 +186,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2B_map(L2A_obj):
# type: (L2A_P.L2A_object) -> L2B_P.L2B_object
L2B_obj = L2B_P.L2B_object(L2A_obj)
......@@ -191,6 +199,7 @@ def L2B_map(L2A_obj):
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj):
# type: (L2B_P.L2B_object) -> L2C_P.L2C_object
L2C_obj = L2C_P.L2C_object(L2B_obj)
......
......@@ -10,7 +10,7 @@ import time
from itertools import chain
import signal
import re
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING
from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R
......@@ -20,10 +20,10 @@ from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object
from ..model.gms_object import failed_GMS_object, GMS_object
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
L2A_map, L2B_map, L2C_map)
from ..options.config import set_config, GMS_config
from ..options.config import set_config
from .multiproc import MAP
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
......@@ -31,6 +31,8 @@ from py_tools_ds.numeric.array import get_array_tilebounds
if TYPE_CHECKING:
from collections import OrderedDict # noqa F401 # flake8 issue
from typing import List # noqa F401 # flake8 issue
from ..options.config import GMS_config # noqa F401 # flake8 issue
__author__ = 'Daniel Scheffler'
......@@ -50,8 +52,7 @@ class process_controller(object):
# set GMS configuration
config_kwargs.update(dict(reset_status=True))
set_config(job_ID, **config_kwargs)
self.config = GMS_config # type: GMS_config
self.config = set_config(job_ID, **config_kwargs) # type: GMS_config
# defaults
self._logger = None
......@@ -125,7 +126,7 @@ class process_controller(object):
# TODO revise this function
# query the database and get the last written processing level and LayerBandsAssignment
DB_match = DB_T.get_info_from_postgreSQLdb(
GMS_config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
self.config.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
dict(sceneid=dataset['scene_ID']))
# get the corresponding logfile
......@@ -419,6 +420,7 @@ class process_controller(object):
% self.config.ID)
self.DB_job_record.reset_job_progress() # updates attributes of DB_job_record and related DB entry
self.config.status = 'running'
GMS_object.proc_status_all_GMSobjs.clear() # reset
self.update_DB_job_record() # TODO implement that into config.status.setter
self.failed_objects = []
......
......@@ -9,12 +9,13 @@ Tests for gms_preprocessing.misc.exception_handler.ExceptionHandler
"""
import unittest
from collections import OrderedDict # noqa F401 # flake8 issue
from gms_preprocessing import process_controller
from gms_preprocessing.misc.exception_handler import log_uncaught_exceptions
from gms_preprocessing.algorithms.L1A_P import L1A_object
from gms_preprocessing.misc.database_tools import get_info_from_postgreSQLdb
from gms_preprocessing.model.gms_object import failed_GMS_object
from gms_preprocessing.model.gms_object import failed_GMS_object, GMS_object, update_proc_status
from . import db_host
......@@ -33,6 +34,7 @@ class BaseTest_ExceptionHandler:
self.PC.DB_job_record.reset_job_progress()
[ds.update({'proc_level': None}) for ds in self.PC.config.data_list]
GMS_object.proc_status_all_GMSobjs.clear() # reset
self.PC.config.status = 'running'
self.PC.update_DB_job_record() # TODO implement that into job.status.setter
......@@ -42,19 +44,34 @@ class BaseTest_ExceptionHandler:
@staticmethod
@log_uncaught_exceptions
@update_proc_status
def dummy_L1A_mapper_success(dummy_GMSobj):
    # type: (OrderedDict) -> L1A_object
    """Dummy GMS mapper that always succeeds: instantiates an L1A_object from the given dataset dictionary."""
    return L1A_object(**dummy_GMSobj)
@staticmethod
@log_uncaught_exceptions
@update_proc_status
def dummy_L1A_mapper_fail(dummy_GMSobj):
    """Dummy GMS mapper that always fails by raising a RuntimeError (simulates a failing mapper)."""
    if True:  # unconditional guard so the trailing 'return' stays syntactically present
        raise RuntimeError('TestException raised by dummy_gms_mapper_fail()')
    return dummy_GMSobj
@staticmethod
@log_uncaught_exceptions
@update_proc_status
def dummy_mapper_multiple_inputs(list_dummy_GMSobjs):
    """Dummy GMS mapper receiving a list of objects; marks the first one by overwriting its proc_level."""
    list_dummy_GMSobjs[0].proc_level = 'changed'
    return list_dummy_GMSobjs
@staticmethod
@log_uncaught_exceptions
@update_proc_status
def dummy_gms_mapper_fail(dummy_GMSobj):
raise RuntimeError('TestException raised by dummy_gms_mapper_fail()')
# type: (GMS_object) -> GMS_object
if True:
raise RuntimeError('TestException raised by dummy_gms_mapper_fail()')
return dummy_GMSobj
def get_current_progress_stats(self):
return get_info_from_postgreSQLdb(self.PC.config.conn_database, 'jobs', 'statistics',
......@@ -98,10 +115,14 @@ class Test_ExceptionHandler_Subsystems(BaseTest_ExceptionHandler.Test_ExceptionH
def setUp(self):
super().get_process_controller(26186268) # Sentinel-2
def test_L1A_mapper_success_progress_stats(self):
def test_L1A_mapper_success(self):
"""Check correctness of progress stats if all subsystems succeed."""
for subObj in self.PC.config.data_list:
self.dummy_L1A_mapper_success(subObj)
for subDs in self.PC.config.data_list:
outObj = self.dummy_L1A_mapper_success(subDs)
# validate output type and processing status
self.assertIsInstance(outObj, L1A_object)
self.assertEqual(outObj.proc_status, 'finished')
# validate that stats are only updated by first subsystem and keep the same value
self.assertEqual(self.get_current_progress_stats(), [0, 0, 1, 0, 0, 0, 0, 0, 0])
......@@ -109,11 +130,15 @@ class Test_ExceptionHandler_Subsystems(BaseTest_ExceptionHandler.Test_ExceptionH
# validate that stats only show ONE scene instead of the number of subsystems
self.assertEqual(self.get_current_progress_stats(), [0, 0, 1, 0, 0, 0, 0, 0, 0])
def test_gms_mapper_fail_firstSS_progress_stats(self):
def test_gms_mapper_fail_firstSS(self):
"""Check correctness of progress stats if the first subsystem first fails and another one succeeds."""
for i, subObj in enumerate(self.PC.config.data_list):
if subObj['subsystem'] == 'S2A10':
outObj = self.dummy_gms_mapper_fail(subObj)
for i, subDs in enumerate(self.PC.config.data_list):
if subDs['subsystem'] == 'S2A10':
outObj = self.dummy_gms_mapper_fail(subDs)
# validate output type and processing status
self.assertIsInstance(outObj, GMS_object)
self.assertEqual(outObj.proc_status, 'failed')
# check that the scene ID of the failed subsystem has been added to failed_sceneids db column
self.assertTrue(self.is_sceneid_in_failedIDs(outObj.scene_ID))
......@@ -121,29 +146,24 @@ class Test_ExceptionHandler_Subsystems(BaseTest_ExceptionHandler.Test_ExceptionH
# check that the scene has been marked as failed in progress stats
self.assertEqual(self.get_current_progress_stats(), [0, 0, 0, 0, 0, 0, 0, 0, 1])
else:
self.dummy_L1A_mapper_success(subObj)
self.dummy_L1A_mapper_success(subDs)
# check that the scene keeps marked as failed
self.assertEqual(self.get_current_progress_stats(), [0, 0, 0, 0, 0, 0, 0, 0, 1])
@unittest.SkipTest
def test_gms_mapper_fail_secondSS_progress_stats(self):
"""Check correctness of progress stats if a first subsystem succeeds and then another one fails.
NOTE: This happens quite rarely because if a mapper fails for a subsystem, it usualy fails for the first
subsystem it receives."""
# FIXME: This test currently fails because earlier subsystem updates stats from
# FIXME: [0, 1, 0, 0, 0, 0, 0, 0, 0] to [0, 0, 1, 0, 0, 0, 0, 0, 0] and later one from
# FIXME: [0, 0, 1, 0, 0, 0, 0, 0, 0] to [0, -1, 1, 0, 0, 0, 0, 0, 1]
# FIXME: This is known bug (issue # 50).
for i, subObj in enumerate(self.PC.config.data_list):
if subObj['subsystem'] == 'S2A10':
self.dummy_L1A_mapper_success(subObj)
# progress stats must be incremented
self.assertEqual(self.get_current_progress_stats(), [0, 0, 1, 0, 0, 0, 0, 0, 0])
for i, subDs in enumerate(self.PC.config.data_list):
if subDs['subsystem'] == 'S2A10':
self.dummy_L1A_mapper_success(subDs)
# progress stats must be incremented [0, 1, 0, 0, 0, 0, 0, 0, 0] (downloaded) to L1A
self.assertEqual(self.get_current_progress_stats(), [0, 0, 1, 0, 0, 0, 0, 0, 0]) # stats at L1A
else:
outObj = self.dummy_gms_mapper_fail(subObj)
outObj = self.dummy_gms_mapper_fail(subDs)
# scene must be added to failed_scenes column
self.assertTrue(self.is_sceneid_in_failedIDs(outObj.scene_ID))
......