Commit cd572222 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fixed issue #50 (Invalid job progress statistics in case a subsystem fails...

Fixed issue #50 (Invalid job progress statistics in case a subsystem fails after another one of the same scene ID already succeeded in the same mapper) by adding class attribute GMS_object.proc_status_all_GMSobjs and GMS_object.proc_status() property. Added gms_object.update_proc_status() decorator.

Revised exception handler. Improved test_exception_handler module.

Fixed a severe bug that copied the same dataset list to all subsequent process controllers.
parent 2ab4a564
Pipeline #1854 failed with stage
in 13 minutes and 23 seconds
...@@ -34,7 +34,7 @@ class L1A_object(GMS_object): ...@@ -34,7 +34,7 @@ class L1A_object(GMS_object):
"""Features input reader and raster-/metadata homogenization.""" """Features input reader and raster-/metadata homogenization."""
def __init__(self, image_type='', satellite='', sensor='', subsystem='', sensormode='', acq_datetime=None, def __init__(self, image_type='', satellite='', sensor='', subsystem='', sensormode='', acq_datetime=None,
entity_ID='', scene_ID=-9999, filename='', dataset_ID=-9999, **kwargs): entity_ID='', scene_ID=-9999, filename='', dataset_ID=-9999, proc_status='', **kwargs):
""":param : instance of gms_object.GMS_object or None """:param : instance of gms_object.GMS_object or None
""" """
# TODO docstring # TODO docstring
...@@ -65,6 +65,12 @@ class L1A_object(GMS_object): ...@@ -65,6 +65,12 @@ class L1A_object(GMS_object):
% (self.satellite, self.sensor, % (self.satellite, self.sensor,
(' ' + self.subsystem) if self.subsystem not in [None, ''] else '', self.entity_ID)) (' ' + self.subsystem) if self.subsystem not in [None, ''] else '', self.entity_ID))
# (re)set the processing status
if self.scene_ID in self.proc_status_all_GMSobjs:
del self.proc_status_all_GMSobjs[self.scene_ID]
self.proc_status = proc_status or 'initialized' # if proc_status = 'running' is given by L1A_map
def import_rasterdata(self): def import_rasterdata(self):
if re.search("ALOS", self.satellite, re.I): if re.search("ALOS", self.satellite, re.I):
'''First 22 lines are nodata: = maybe due to an issue of the GDAL CEOS driver. '''First 22 lines are nodata: = maybe due to an issue of the GDAL CEOS driver.
......
...@@ -242,6 +242,7 @@ class L1B_object(L1A_object): ...@@ -242,6 +242,7 @@ class L1B_object(L1A_object):
[setattr(self, key, value) for key, value in L1A_obj.__dict__.items()] [setattr(self, key, value) for key, value in L1A_obj.__dict__.items()]
self.proc_level = 'L1B' self.proc_level = 'L1B'
self.proc_status = 'initialized'
@property @property
def spatRef_available(self): def spatRef_available(self):
......
...@@ -48,6 +48,7 @@ class L1C_object(L1B_object): ...@@ -48,6 +48,7 @@ class L1C_object(L1B_object):
self._lonlat_arr = None self._lonlat_arr = None
self.proc_level = 'L1C' self.proc_level = 'L1C'
self.proc_status = 'initialized'
@property @property
def lonlat_arr(self): def lonlat_arr(self):
......
...@@ -15,3 +15,4 @@ class L2A_object(L1C_object): ...@@ -15,3 +15,4 @@ class L2A_object(L1C_object):
[setattr(self, key, value) for key, value in L1C_obj.__dict__.items()] [setattr(self, key, value) for key, value in L1C_obj.__dict__.items()]
self.proc_level = 'L2A' self.proc_level = 'L2A'
self.proc_status = 'initialized'
...@@ -50,6 +50,7 @@ class L2B_object(L2A_object): ...@@ -50,6 +50,7 @@ class L2B_object(L2A_object):
[setattr(self, key, value) for key, value in L2A_obj.__dict__.items()] [setattr(self, key, value) for key, value in L2A_obj.__dict__.items()]
self.proc_level = 'L2B' self.proc_level = 'L2B'
self.proc_status = 'initialized'
def spectral_homogenization(self): def spectral_homogenization(self):
"""Apply spectral homogenization, i.e., prediction of the spectral bands of the target sensor.""" """Apply spectral homogenization, i.e., prediction of the spectral bands of the target sensor."""
......
...@@ -18,6 +18,7 @@ class L2C_object(L2B_object): ...@@ -18,6 +18,7 @@ class L2C_object(L2B_object):
[setattr(self, key, value) for key, value in L2B_obj.__dict__.items()] [setattr(self, key, value) for key, value in L2B_obj.__dict__.items()]
self.proc_level = 'L2C' self.proc_level = 'L2C'
self.proc_status = 'initialized'
def calc_geometric_accurracy(self): def calc_geometric_accurracy(self):
pass pass
......
...@@ -273,7 +273,7 @@ def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2upd ...@@ -273,7 +273,7 @@ def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2upd
idx_val2increment=None, cond_dict=None, timeout=15000): idx_val2increment=None, cond_dict=None, timeout=15000):
# type: (str, str, str, int, int, dict, int) -> Union[None, str] # type: (str, str, str, int, int, dict, int) -> Union[None, str]
"""Updates an array column of a specific postgreSQL table in the form that it increments or decrements the elements """Updates an array column of a specific postgreSQL table in the form that it increments or decrements the elements
at a given position. HINT: The column must have values like that: [0,1,0,3,1,0] at a given position. HINT: The column must have values like that: [52,0,27,10,8,0,0,0,0]
:param conn_params: <str> connection parameters as provided by CFG.conn_params :param conn_params: <str> connection parameters as provided by CFG.conn_params
:param tablename: <str> name of the table within the database to be update :param tablename: <str> name of the table within the database to be update
......
...@@ -18,7 +18,7 @@ dtype_lib_GDAL_Python = {"uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "uint32 ...@@ -18,7 +18,7 @@ dtype_lib_GDAL_Python = {"uint8": 1, "int8": 1, "uint16": 2, "int16": 3, "uint32
"float64": 7, "complex64": 10, "complex128": 11} "float64": 7, "complex64": 10, "complex128": 11}
proc_chain = ['L1A', 'L1B', 'L1C', 'L2A', 'L2B', 'L2C'] proc_chain = ['L1A', 'L1B', 'L1C', 'L2A', 'L2B', 'L2C']
db_jobs_statistics_def = {'downloaded': 1, 'started': 2, None: 2, 'L1A': 3, 'L1B': 4, 'L1C': 5, 'L2A': 6, 'L2B': 7, db_jobs_statistics_def = {'downloaded': 1, 'started': 2, None: 2, 'L1A': 3, 'L1B': 4, 'L1C': 5, 'L2A': 6, 'L2B': 7,
'L2C': 8, 'FAILED': 9} 'L2C': 8, 'FAILED': 9} # NOTE: OrderedDicts passed to L1A_map have proc_level=None
def get_GMS_sensorcode(GMS_identifier): def get_GMS_sensorcode(GMS_identifier):
......
...@@ -13,6 +13,7 @@ from ..model.gms_object import GMS_object # noqa F401 # flake8 issue ...@@ -13,6 +13,7 @@ from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
from ..model.gms_object import failed_GMS_object from ..model.gms_object import failed_GMS_object
from ..options.config import GMS_config as CFG from ..options.config import GMS_config as CFG
from ..misc import database_tools as DB_T from ..misc import database_tools as DB_T
from ..misc.helper_functions import is_proc_level_lower
from .definition_dicts import db_jobs_statistics_def, proc_chain from .definition_dicts import db_jobs_statistics_def, proc_chain
__author__ = 'Daniel Scheffler' __author__ = 'Daniel Scheffler'
...@@ -76,6 +77,9 @@ class ExceptionHandler(object): ...@@ -76,6 +77,9 @@ class ExceptionHandler(object):
self.GMS_mapper_name = GMS_mapper.__name__ self.GMS_mapper_name = GMS_mapper.__name__
self.GMS_objs = GMS_objs self.GMS_objs = GMS_objs
if not GMS_objs:
raise ValueError('Unexpected argument for %s. Received %s.' % (self.GMS_mapper_name, GMS_objs))
# noinspection PyBroadException # noinspection PyBroadException
try: try:
# GMS_mapper inputs CONTAIN NO failed_GMS_objects -> run the mapper normally # GMS_mapper inputs CONTAIN NO failed_GMS_objects -> run the mapper normally
...@@ -154,13 +158,22 @@ class ExceptionHandler(object): ...@@ -154,13 +158,22 @@ class ExceptionHandler(object):
# update statistics column ONLY in case of full cube or first subsystem # update statistics column ONLY in case of full cube or first subsystem
DB_T.increment_decrement_arrayCol_in_postgreSQLdb( DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID}, CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
idx_val2decrement=db_jobs_statistics_def['started'] - 1, idx_val2decrement=db_jobs_statistics_def['started'] - 1, # FIXME BUG?
idx_val2increment=db_jobs_statistics_def['started']) idx_val2increment=db_jobs_statistics_def['started'])
def increment_progress(self): def increment_progress(self):
"""update statistics column in jobs table of postgreSQL database""" """Update statistics column in jobs table of postgreSQL database.
NOTE: This function ONLY receives those GMS_objects that have been sucessfully processed by the GMS_mapper.
"""
# get a GMS object from which we get the new proc_level # get a GMS object from which we get the new proc_level
GMS_obj = self.get_sample_GMS_obj(self.GMS_objs) GMS_obj = self.get_sample_GMS_obj(self.GMS_objs)
# TODO if another subsystem already suceeded -> decrement the higher proc level and increment failed
# validate proc_level
if GMS_obj.proc_level is None:
raise ValueError('Received GMS_object for %s %s without processing level after being processed by %s.'
% (GMS_obj.entity_ID, GMS_obj.subsystem, self.GMS_mapper_name))
# NOTE: in case GMS_obj represents a subsystem and another one has already been marked as FAILED the # NOTE: in case GMS_obj represents a subsystem and another one has already been marked as FAILED the
# failed_sceneids column and the statistics column is NOT updated once more # failed_sceneids column and the statistics column is NOT updated once more
...@@ -190,11 +203,16 @@ class ExceptionHandler(object): ...@@ -190,11 +203,16 @@ class ExceptionHandler(object):
idx_val2increment=db_jobs_statistics_def[GMS_obj.proc_level]) idx_val2increment=db_jobs_statistics_def[GMS_obj.proc_level])
@staticmethod @staticmethod
def update_progress_failed(failed_Obj): def update_progress_failed(failed_Obj, procL_failed=None):
"""Update statistics column in jobs table of postgreSQL database.""" """Update statistics column in jobs table of postgreSQL database.
:param failed_Obj: instance of gms_object failed_GMS_object
:param procL_failed: processing level to be decremented. If not given, the one from failed_Obj is used.
:return:
"""
DB_T.increment_decrement_arrayCol_in_postgreSQLdb( DB_T.increment_decrement_arrayCol_in_postgreSQLdb(
CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID}, CFG.conn_database, 'jobs', 'statistics', cond_dict={'id': CFG.ID},
idx_val2decrement=db_jobs_statistics_def[failed_Obj.proc_level], idx_val2decrement=db_jobs_statistics_def[procL_failed or failed_Obj.proc_level],
idx_val2increment=db_jobs_statistics_def['FAILED']) idx_val2increment=db_jobs_statistics_def['FAILED'])
def handle_failed(self): def handle_failed(self):
...@@ -213,16 +231,42 @@ class ExceptionHandler(object): ...@@ -213,16 +231,42 @@ class ExceptionHandler(object):
# add the scene ID to failed_sceneids column in jobs table of DB and update statistics column # add the scene ID to failed_sceneids column in jobs table of DB and update statistics column
# NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED the # NOTE: in case failed_Obj represents a subsystem and another one has already been marked as FAILED the
# failed_sceneids column and the statistics column is NOT updated once more # failed_sceneids column and the statistics column is NOT updated once more
res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'],
{'id': CFG.ID})
assert res, "Query delivered no result."
if res[0][0] is None or failed_Obj.scene_ID not in res[0][0]: another_ss_failed = False
# if column is empty or scene ID is not in there another_ss_succeeded = False
higher_procL = None
if failed_Obj.subsystem:
# check if another subsystem of the same scene ID has been marked as failed before
res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'jobs', ['failed_sceneids'], {'id': CFG.ID})
assert res, "Query delivered no result."
if res[0][0] is not None and failed_Obj.scene_ID in res[0][0]:
self.logger.debug("Found another failed subsystem of scene %s in the database.")
another_ss_failed = True
# check if another subsystem already reached a higher processing level
# NOTE: this fixes issue #50
# FIXME proc_status_all_GMSobjs is not available if other subsystems are processed by another
# FIXME multiprocessing worker or on another machine (cluster node)
procstatus_other_ss = {k: v for k, v in GMS_object.proc_status_all_GMSobjs[failed_Obj.scene_ID].items()
if k != failed_Obj.subsystem}
for ss, statusentry in procstatus_other_ss.items():
for procL in statusentry.keys():
if is_proc_level_lower(failed_Obj.proc_level, procL) and statusentry[procL] == 'finished':
higher_procL = procL
self.logger.debug("Found another subsystem that already reached a higher processing level.")
another_ss_succeeded = True
break
if not another_ss_failed: # applies also to full cubes
DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.conn_database, 'jobs', DB_T.append_item_to_arrayCol_in_postgreSQLdb(CFG.conn_database, 'jobs',
{'failed_sceneids': failed_Obj.scene_ID}, {'id': CFG.ID}) {'failed_sceneids': failed_Obj.scene_ID}, {'id': CFG.ID})
self.update_progress_failed(failed_Obj) if not another_ss_succeeded:
self.update_progress_failed(failed_Obj)
else:
self.update_progress_failed(failed_Obj, procL_failed=higher_procL)
return failed_Obj return failed_Obj
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
import collections import collections
import copy import copy
import datetime import datetime
import functools
import glob import glob
import json import json
import os import os
...@@ -11,13 +12,16 @@ import shutil ...@@ -11,13 +12,16 @@ import shutil
import sys import sys
import warnings import warnings
import logging import logging
from collections import OrderedDict
from itertools import chain from itertools import chain
from typing import Iterable, List, Union # noqa F401 # flake8 issue
import numpy as np import numpy as np
import spectral import spectral
from spectral.io import envi from spectral.io import envi
from numba import jit from numba import jit
from pandas import DataFrame, read_csv from pandas import DataFrame, read_csv
from nested_dict import nested_dict
try: try:
from osgeo import gdalnumeric from osgeo import gdalnumeric
...@@ -49,6 +53,10 @@ __author__ = 'Daniel Scheffler' ...@@ -49,6 +53,10 @@ __author__ = 'Daniel Scheffler'
class GMS_object(Dataset): class GMS_object(Dataset):
# class attributes
# NOTE: these attributes can be modified and seen by ALL GMS_object instances
proc_status_all_GMSobjs = nested_dict()
def __init__(self, pathImage=''): def __init__(self, pathImage=''):
# get all attributes of base class "Dataset" # get all attributes of base class "Dataset"
super(GMS_object, self).__init__() super(GMS_object, self).__init__()
...@@ -150,6 +158,22 @@ class GMS_object(Dataset): ...@@ -150,6 +158,22 @@ class GMS_object(Dataset):
# self.log += self.logger.captured_stream # self.log += self.logger.captured_stream
self._logger = logger self._logger = logger
@property
def proc_status(self):
# type: () -> str
"""
Get the processing status of the current GMS_object (subclass) instance for the current processing level.
Possible values: 'initialized', 'running', 'finished', 'failed'
"""
# NOTE: self.proc_status_all_GMSobjs is a class attribute (visible and modifyable from any other subsystem)
return self.proc_status_all_GMSobjs[self.scene_ID][self.subsystem][self.proc_level]
@proc_status.setter
def proc_status(self, new_status):
# type: (str) -> None
self.proc_status_all_GMSobjs[self.scene_ID][self.subsystem][self.proc_level] = new_status
@property @property
def GMS_identifier(self): def GMS_identifier(self):
return collections.OrderedDict(zip( return collections.OrderedDict(zip(
...@@ -1384,9 +1408,66 @@ class failed_GMS_object(GMS_object): ...@@ -1384,9 +1408,66 @@ class failed_GMS_object(GMS_object):
self.ExceptionType = exc_type.__name__ self.ExceptionType = exc_type.__name__
self.ExceptionValue = repr(exc_val) self.ExceptionValue = repr(exc_val)
self.ExceptionTraceback = exc_tb self.ExceptionTraceback = exc_tb
self.proc_status = 'failed'
@property @property
def pandasRecord(self): def pandasRecord(self):
columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level', columns = ['scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem', 'image_type', 'proc_level',
'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback'] 'arr_shape', 'arr_pos', 'failedMapper', 'ExceptionType', 'ExceptionValue', 'ExceptionTraceback']
return DataFrame([getattr(self, k) for k in columns], columns=columns) return DataFrame([getattr(self, k) for k in columns], columns=columns)
def update_proc_status(GMS_mapper):
"""Decorator function for updating the processing status of each GMS_object (subclass) instance.
:param GMS_mapper: A GMS mapper function that takes a GMS object, does some processing and returns it back.
"""
@functools.wraps(GMS_mapper) # needed to avoid pickling errors
def wrapped_GMS_mapper(GMS_objs, **kwargs):
# type: (Union[List[GMS_object], GMS_object, OrderedDict, failed_GMS_object], dict) -> any
# noinspection PyBroadException
try:
# set processing status to 'running'
if isinstance(GMS_objs, OrderedDict):
GMS_objs['proc_status'] = 'running'
elif isinstance(GMS_objs, GMS_object):
GMS_objs.proc_status = 'running'
elif isinstance(GMS_objs, Iterable):
for GMS_obj in GMS_objs:
GMS_obj.proc_status = 'running'
elif isinstance(GMS_objs, failed_GMS_object):
assert GMS_objs.proc_status == 'failed'
return GMS_objs
else:
raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs))
# RUN the GMS_mapper
GMS_objs = GMS_mapper(GMS_objs, **kwargs)
# set processing status to 'finished'
if isinstance(GMS_objs, GMS_object):
GMS_objs.proc_status = 'finished'
elif isinstance(GMS_objs, Iterable):
for GMS_obj in GMS_objs:
GMS_obj.proc_status = 'finished'
else:
raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs))
except Exception:
# set processing status to 'running'
if isinstance(GMS_objs, OrderedDict):
GMS_objs['proc_status'] = 'failed'
elif isinstance(GMS_objs, GMS_object):
GMS_objs.proc_status = 'failed'
elif isinstance(GMS_objs, Iterable):
for GMS_obj in GMS_objs:
GMS_obj.proc_status = 'failed'
else:
raise TypeError("Unexpected type of 'GMS_objs': %s" % type(GMS_objs))
raise
return GMS_objs # type: Union[GMS_object, List[GMS_object]]
return wrapped_GMS_mapper
...@@ -605,7 +605,7 @@ class JobConfig(object): ...@@ -605,7 +605,7 @@ class JobConfig(object):
for row in cur.fetchall(): for row in cur.fetchall():
ds = OrderedDict() ds = OrderedDict()
ds["proc_level"] = row["proc_level"] ds["proc_level"] = row["proc_level"] if not self.is_test else None
ds["scene_ID"] = row["sceneid"] ds["scene_ID"] = row["sceneid"]
ds["dataset_ID"] = row["datasetid"] ds["dataset_ID"] = row["datasetid"]
ds["image_type"] = row["image_type"] ds["image_type"] = row["image_type"]
......
...@@ -11,13 +11,14 @@ from ..algorithms import L1C_P ...@@ -11,13 +11,14 @@ from ..algorithms import L1C_P
from ..algorithms import L2A_P from ..algorithms import L2A_P
from ..algorithms import L2B_P from ..algorithms import L2B_P
from ..algorithms import L2C_P from ..algorithms import L2C_P
from ..model.gms_object import failed_GMS_object from ..model.gms_object import failed_GMS_object, update_proc_status
from ..algorithms.geoprocessing import get_common_extent from ..algorithms.geoprocessing import get_common_extent
__author__ = 'Daniel Scheffler' __author__ = 'Daniel Scheffler'
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map(dataset_dict): # map (scene-wise parallelization) def L1A_map(dataset_dict): # map (scene-wise parallelization)
# type: (dict) -> L1A_P.L1A_object # type: (dict) -> L1A_P.L1A_object
...@@ -43,6 +44,7 @@ def L1A_map(dataset_dict): # map (scene-wise parallelization) ...@@ -43,6 +44,7 @@ def L1A_map(dataset_dict): # map (scene-wise parallelization)
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization) def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization)
# type: (dict) -> List[L1A_P.L1A_object] # type: (dict) -> List[L1A_P.L1A_object]
...@@ -59,6 +61,7 @@ def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization ...@@ -59,6 +61,7 @@ def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_2(L1A_tile): # map (block-wise parallelization) def L1A_map_2(L1A_tile): # map (block-wise parallelization)
# type: (L1A_P.L1A_object) -> L1A_P.L1A_object # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
L1A_tile.calc_TOARadRefTemp() L1A_tile.calc_TOARadRefTemp()
...@@ -68,6 +71,7 @@ def L1A_map_2(L1A_tile): # map (block-wise parallelization) ...@@ -68,6 +71,7 @@ def L1A_map_2(L1A_tile): # map (block-wise parallelization)
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_3(L1A_obj): # map (scene-wise parallelization) def L1A_map_3(L1A_obj): # map (scene-wise parallelization)
# type: (L1A_P.L1A_object) -> L1A_P.L1A_object # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
L1A_obj.calc_corner_positions() # requires mask_nodata L1A_obj.calc_corner_positions() # requires mask_nodata
...@@ -84,6 +88,7 @@ def L1A_map_3(L1A_obj): # map (scene-wise parallelization) ...@@ -84,6 +88,7 @@ def L1A_map_3(L1A_obj): # map (scene-wise parallelization)
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L1B_map(L1A_obj): def L1B_map(L1A_obj):
# type: (L1A_P.L1A_object) -> L1B_P.L1B_object # type: (L1A_P.L1A_object) -> L1B_P.L1B_object
"""L1A_obj enthält in Python- (im Gegensatz zur inmem_serialization-) Implementierung KEINE ARRAY-DATEN!, """L1A_obj enthält in Python- (im Gegensatz zur inmem_serialization-) Implementierung KEINE ARRAY-DATEN!,
...@@ -99,6 +104,7 @@ def L1B_map(L1A_obj): ...@@ -99,6 +104,7 @@ def L1B_map(L1A_obj):
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L1C_map(L1B_objs): def L1C_map(L1B_objs):
# type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object] # type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
"""Atmospheric correction. """Atmospheric correction.
...@@ -135,6 +141,7 @@ def L1C_map(L1B_objs): ...@@ -135,6 +141,7 @@ def L1C_map(L1B_objs):
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs, block_size=None, return_tiles=True): def L2A_map(L1C_objs, block_size=None, return_tiles=True):
# type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]]) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object] # type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]]) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object]
"""Geometric homogenization. """Geometric homogenization.
...@@ -179,6 +186,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True): ...@@ -179,6 +186,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L2B_map(L2A_obj): def L2B_map(L2A_obj):
# type: (L2A_P.L2A_object) -> L2B_P.L2B_object # type: (L2A_P.L2A_object) -> L2B_P.L2B_object
L2B_obj = L2B_P.L2B_object(L2A_obj) L2B_obj = L2B_P.L2B_object(L2A_obj)
...@@ -191,6 +199,7 @@ def L2B_map(L2A_obj): ...@@ -191,6 +199,7 @@ def L2B_map(L2A_obj):
@EXC_H.log_uncaught_exceptions @EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj): def L2C_map(L2B_obj):
# type: (L2B_P.L2B_object) -> L2C_P.L2C_object # type: (L2B_P.L2B_object) -> L2C_P.L2C_object
L2C_obj = L2C_P.L2C_object(L2B_obj) L2C_obj = L2C_P.L2C_object(L2B_obj)
......
...@@ -10,7 +10,7 @@ import time ...@@ -10,7 +10,7 @@ import time
from itertools import chain from itertools import chain
import signal import signal
import re import re
from typing import TYPE_CHECKING, List from typing import TYPE_CHECKING
from ..io import output_writer as OUT_W from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R from ..io import input_reader as INP_R
...@@ -20,10 +20,10 @@ from ..misc.path_generator import path_generator ...@@ -20,10 +20,10 @@ from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger, shutdown_loggers from ..misc.logging import GMS_logger, shutdown_loggers
from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P from ..algorithms import L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
from ..model.metadata import get_LayerBandsAssignment from ..model.metadata import get_LayerBandsAssignment
from ..model.gms_object import failed_GMS_object from ..model.gms_object import failed_GMS_object, GMS_object
from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map, from .pipeline import (L1A_map, L1A_map_1, L1A_map_2, L1A_map_3, L1B_map, L1C_map,
L2A_map, L2B_map, L2C_map) L2A_map, L2B_map, L2C_map)
from ..options.config import set_config, GMS_config from ..options.config import set_config
from .multiproc import MAP from .multiproc import MAP
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def