Commit da02b2c1 authored by Daniel Scheffler's avatar Daniel Scheffler Committed by Mathias Peters
Browse files

Revised GMS_obj.GMS_identifier.

parent 7a391284
......@@ -492,7 +492,7 @@ class L1B_object(L1A_object):
# exclude cirrus/oxygen band of Landsat-8/Sentinel-2
shift_bbl, ref_bbl = [False] * len(shift_cwl), [False] * len(ref_cwl) # bad band lists
for GMS_obj, s_r, bbl in zip([self, ref_obj], ['shift', 'ref'], [shift_bbl, ref_bbl]):
GMS_obj.GMS_identifier['logger'] = None # set a dummy value in order to avoid Exception
GMS_obj.GMS_identifier.logger = None # set a dummy value in order to avoid Exception
sensorcode = get_GMS_sensorcode(GMS_obj.GMS_identifier)
if sensorcode in ['LDCM', 'S2A', 'S2B'] and '9' in GMS_obj.LayerBandsAssignment:
bbl[GMS_obj.LayerBandsAssignment.index('9')] = True
......
......@@ -690,7 +690,7 @@ class AtmCorr(object):
tgt_res = self.inObjs[0].ac_options['cld_mask']['target_resolution']
# check if input GMS objects provide a cloud mask
avail_cloud_masks = {inObj.GMS_identifier['Subsystem']: inObj.mask_clouds for inObj in self.inObjs}
avail_cloud_masks = {inObj.GMS_identifier.subsystem: inObj.mask_clouds for inObj in self.inObjs}
no_avail_CMs = list(set(avail_cloud_masks.values())) == [None]
# compute cloud mask if not already provided
......
......@@ -36,6 +36,7 @@ from ..misc.definition_dicts import datasetid_to_sat_sen, sat_sen_to_datasetid
from ..misc.exceptions import ClassifierNotAvailableError
from ..model.metadata import get_LayerBandsAssignment
from .L2A_P import L2A_object
from ..model.gms_object import GMS_identifier
__author__ = 'Daniel Scheffler'
......@@ -65,8 +66,8 @@ class L2B_object(L2A_object):
tgt_sat, tgt_sen = datasetid_to_sat_sen(CFG.datasetid_spectral_ref)
# NOTE: get target LBA at L2A, because spectral characteristics of target sensor do not change after AC
tgt_LBA = get_LayerBandsAssignment(
dict(Satellite=tgt_sat, Sensor=tgt_sen, Subsystem=None,
image_type='RSD', proc_level='L2A', dataset_ID=src_dsID, logger=None))
GMS_identifier(satellite=tgt_sat, sensor=tgt_sen, subsystem=None,
image_type='RSD', proc_level='L2A', dataset_ID=src_dsID, logger=None))
####################################################
# special cases where homogenization is not needed #
......@@ -936,15 +937,15 @@ class ReferenceCube_Generator(object):
return self._refcubes
def _get_tgt_GMS_identifier(self, tgt_sat, tgt_sen):
# type: (str, str) -> dict
# type: (str, str) -> GMS_identifier
"""Get a GMS identifier for the specified target sensor such that all possible bands are included (L1A)
:param tgt_sat: target satellite
:param tgt_sen: target sensor
:return:
"""
return dict(Satellite=tgt_sat, Sensor=tgt_sen, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger) # use L1A to have all bands available
return GMS_identifier(satellite=tgt_sat, sensor=tgt_sen, subsystem=None, image_type='RSD', dataset_ID=None,
proc_level='L1A', logger=self.logger) # use L1A to have all bands available
def _get_tgt_LayerBandsAssignment(self, tgt_sat, tgt_sen):
# type: (str, str) -> list
......@@ -1035,8 +1036,8 @@ class ReferenceCube_Generator(object):
# first, perform spectral resampling to Sentinel-2 to reduce dimensionality
if downsamp_sat and downsamp_sen:
tgt_srf = SRF(dict(Satellite=downsamp_sat, Sensor=downsamp_sen, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger))
tgt_srf = SRF(GMS_identifier(satellite=downsamp_sat, sensor=downsamp_sen, subsystem=None, image_type='RSD',
dataset_ID=None, proc_level='L1A', logger=self.logger))
im2clust = self.resample_image_spectrally(im2clust, tgt_srf, progress=progress)
# compute KMeans clusters for the spectrally resampled image
......@@ -1299,9 +1300,11 @@ class Classifier_Generator(object):
['1', '2', '3', '4', '5', '6', '7'],
['1', '2', '3', '4', '5', '6', '7'], ...]
"""
L1A_GMSid = dict(Satellite=satellite, Sensor=sensor, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=None)
L1C_GMSid = {**L1A_GMSid, **dict(proc_level='L1C')} # different numbers of bands after AC
L1A_GMSid = GMS_identifier(satellite=satellite, sensor=sensor, subsystem=None, image_type='RSD',
dataset_ID=None, proc_level='L1A', logger=None)
# different numbers of bands after AC
L1C_GMSid = GMS_identifier(satellite=satellite, sensor=sensor, subsystem=None, image_type='RSD',
dataset_ID=None, proc_level='L1C', logger=None)
return [get_LayerBandsAssignment(L1A_GMSid, no_pan=False, sort_by_cwl=True), # L1A_withPan_cwlSorted
get_LayerBandsAssignment(L1C_GMSid, no_pan=False, sort_by_cwl=True), # L1C_withPan_cwlSorted
......
......@@ -12,7 +12,7 @@ import zipfile
from tempfile import NamedTemporaryFile as tempFile
from logging import Logger
from matplotlib import pyplot as plt
from typing import Union, Dict, List, Tuple # noqa F401 # flake8 issue
from typing import Union, Dict, List, Tuple, TYPE_CHECKING # noqa F401 # flake8 issue
from datetime import datetime
import logging
......@@ -38,6 +38,9 @@ from ..misc.database_tools import get_overlapping_scenes_from_postgreSQLdb
from ..misc.path_generator import path_generator
from ..misc.spatial_index_mediator import SpatialIndexMediator
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
def read_ENVIfile(path, arr_shape, arr_pos, logger=None, return_meta=False, q=0):
hdr_path = os.path.splitext(path)[0] + '.hdr' if not os.path.splitext(path)[1] == '.hdr' else path
......@@ -172,11 +175,11 @@ def get_list_GMSfiles(dataset_list, target):
return GMS_list
def SRF_reader(GMS_identifier, no_thermal=None, no_pan=None, v=False):
# type: (dict, bool) -> collections.OrderedDict
def SRF_reader(GMS_id, no_thermal=None, no_pan=None, v=False):
# type: (GMS_identifier, bool) -> collections.OrderedDict
"""Read SRF for any sensor and return a dictionary containing band names as keys and SRF numpy arrays as values.
:param GMS_identifier:
:param GMS_id:
:param no_thermal: whether to exclude thermal bands from the returned bands list
(default: CFG.skip_thermal)
:param no_pan: whether to exclude panchromatic bands from the returned bands list
......@@ -188,37 +191,36 @@ def SRF_reader(GMS_identifier, no_thermal=None, no_pan=None, v=False):
no_thermal = no_thermal if no_thermal is not None else CFG.skip_thermal
no_pan = no_pan if no_pan is not None else CFG.skip_pan
satellite, sensor = GMS_identifier['Satellite'], GMS_identifier['Sensor']
logger = GMS_identifier['logger'] or Logger(__name__)
LayerBandsAssignment = META.get_LayerBandsAssignment(GMS_identifier, no_thermal=no_thermal, no_pan=no_pan)
logger = GMS_id.logger or Logger(__name__)
LayerBandsAssignment = META.get_LayerBandsAssignment(GMS_id, no_thermal=no_thermal, no_pan=no_pan)
SRF_dict = collections.OrderedDict()
SRF_dir = PG.get_path_srf_file(GMS_identifier)
SRF_dir = PG.get_path_srf_file(GMS_id)
if os.path.isdir(SRF_dir):
for bandname in LayerBandsAssignment:
SRF_path = PG.get_path_srf_file(GMS_identifier, bandname=bandname)
SRF_path = PG.get_path_srf_file(GMS_id, bandname=bandname)
try:
SRF_dict[bandname] = np.loadtxt(SRF_path, skiprows=1)
if v:
logger.info('Reading SRF for %s %s, %s...' % (satellite, sensor, bandname))
logger.info('Reading SRF for %s %s, %s...' % (GMS_id.satellite, GMS_id.sensor, bandname))
except FileNotFoundError:
logger.warning('No spectral response function found for %s %s %s at %s! >None< is returned.'
% (satellite, sensor, bandname, SRF_path))
% (GMS_id.satellite, GMS_id.sensor, bandname, SRF_path))
else:
warnings.warn('SRF database directory not available at %s.' % SRF_dir)
SRF_dict = {}
logger.warning("No spectral response functions available for '%s %s'. Preconfigured values are used for solar "
"irradiance and central wavelength instead." % (satellite, sensor))
"irradiance and central wavelength instead." % (GMS_id.satellite, GMS_id.sensor))
return SRF_dict
class SRF(object):
def __init__(self, GMS_identifier=None, wvl_unit='nanometers', specres_nm=1, format_bandnames=False,
def __init__(self, GMS_id=None, wvl_unit='nanometers', specres_nm=1, format_bandnames=False,
no_thermal=None, no_pan=None, v=False):
# type: (dict, str, float, bool, bool) -> None
# type: (GMS_identifier, str, float, bool, bool) -> None
"""SRF instance provides SRF functions, wavelength positions, etc..
:param GMS_identifier: dictionary as provided by GMS_object
:param GMS_id: GMS_identifier as provided by GMS_object
:param wvl_unit: the wavelengths unit to be used within SRF instance ('nanometers' or 'micrometers)
:param specres_nm: output spectral resolution of SRFs in nanometers
:param format_bandnames: whether to format default strings from LayerBandsAssignment as 'B01', 'B02' etc..
......@@ -244,18 +246,18 @@ class SRF(object):
self.wvl_unit = wvl_unit
self.specres_nm = specres_nm
self.format_bandnames = format_bandnames
self.satellite = GMS_identifier['Satellite'] if GMS_identifier else ''
self.sensor = GMS_identifier['Sensor'] if GMS_identifier else ''
self.satellite = GMS_id.satellite if GMS_id else ''
self.sensor = GMS_id.sensor if GMS_id else ''
self.conv = {}
self.no_thermal = no_thermal
self.no_pan = no_pan
self.v = v
if GMS_identifier:
self.from_GMS_identifier(GMS_identifier)
if GMS_id:
self.from_GMS_identifier(GMS_id)
def from_GMS_identifier(self, GMS_identifier):
srf_dict = SRF_reader(GMS_identifier, no_thermal=self.no_thermal, no_pan=self.no_pan,
def from_GMS_identifier(self, GMS_id):
srf_dict = SRF_reader(GMS_id, no_thermal=self.no_thermal, no_pan=self.no_pan,
v=self.v) # type: collections.OrderedDict # (ordered according to LBA)
return self.from_dict(srf_dict)
......
......@@ -4,9 +4,13 @@ import collections
import re
import numpy as np
from typing import TYPE_CHECKING # noqa F401 # flake8 issue
from ..options.config import GMS_config as CFG
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
__author__ = 'Daniel Scheffler'
dtype_lib_Python_IDL = {'bool_': 0, 'uint8': 1, 'int8': 1, 'int_': 1, 'int16': 2, 'uint16': 12, 'int32': 3,
......@@ -22,10 +26,10 @@ db_jobs_statistics_def = {'pending': 1, 'started': 2, None: 2, 'L1A': 3, 'L1B':
bandslist_all_errors = ['ac_errors', 'mask_clouds_confidence', 'spat_homo_errors', 'spec_homo_errors']
def get_GMS_sensorcode(GMS_identifier):
# type: (dict) -> str
def get_GMS_sensorcode(GMS_id):
# type: (GMS_identifier) -> str
Satellite, Sensor, Subsystem = (GMS_identifier['Satellite'], GMS_identifier['Sensor'], GMS_identifier['Subsystem'])
Satellite, Sensor, Subsystem = (GMS_id.satellite, GMS_id.sensor, GMS_id.subsystem)
Sensor = Sensor[:-1] if re.match('SPOT', Satellite, re.I) and Sensor[-1] not in ['1', '2'] else Sensor
meta_sensorcode = Satellite + '_' + Sensor + ('_' + Subsystem if Subsystem not in ["", None] else "")
sensorcode_dic = {
......@@ -142,16 +146,16 @@ def get_outFillZeroSaturated(dtype):
return dict_outFill[dtype], dict_outZero[dtype], dict_outSaturated[dtype]
def is_dataset_provided_as_fullScene(GMS_identifier):
# type: (dict) -> bool
def is_dataset_provided_as_fullScene(GMS_id):
# type: (GMS_identifier) -> bool
"""Returns True if the dataset belonging to the given GMS_identifier is provided as full scene and returns False if
it is provided as multiple tiles.
:param GMS_identifier:
:param GMS_id:
:return:
"""
sensorcode = get_GMS_sensorcode(GMS_identifier)
sensorcode = get_GMS_sensorcode(GMS_id)
dict_fullScene_or_tiles = {
'AVNIR-2': True,
'AST_full': True,
......
......@@ -6,10 +6,13 @@ import tempfile
import warnings
import uuid
from logging import Logger
from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue
from ..options.config import GMS_config as CFG
from .definition_dicts import get_GMS_sensorcode
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
# get_scene_and_dataset_infos_from_postgreSQLdb # inline in order to avoid circular dependencies
......@@ -202,14 +205,14 @@ def get_tempfile(ext=None, prefix=None, tgt_dir=None):
return path
def get_path_cloud_class_obj(GMS_identifier, get_all=False):
def get_path_cloud_class_obj(GMS_id, get_all=False):
"""Returns the absolute path of the the training data used by cloud classifier.
:param GMS_identifier:
:param GMS_id:
:param get_all:
"""
GMS_sensorcode = get_GMS_sensorcode(GMS_identifier)
satellite, sensor, logger = GMS_identifier['Satellite'], GMS_identifier['Sensor'], GMS_identifier['logger']
GMS_sensorcode = get_GMS_sensorcode(GMS_id)
satellite, sensor, logger = GMS_id.satellite, GMS_id.sensor, GMS_id.logger
path_cloud_classifier_objects = CFG.path_cloud_classif
obj_name_dic = {
......@@ -274,41 +277,41 @@ def get_path_cloud_class_obj(GMS_identifier, get_all=False):
return classifier_path
def get_path_srf_file(GMS_identifier, bandname=''):
# type: (dict, str) -> str
def get_path_srf_file(GMS_id, bandname=''):
# type: (GMS_identifier, str) -> str
"""Returns the absolute path of the spectral response function belonging to the given bandname. The bandname must
correspond to output of METADATA.get_LayerbandsAssignment().
:param GMS_identifier:
:param GMS_id:
:param bandname: if not given, only the root directory is returned
"""
satellite, sensor = GMS_identifier['Satellite'], GMS_identifier['Sensor']
satellite, sensor = GMS_id.satellite, GMS_id.sensor
satellite = 'RapidEye' if re.match('RapidEye', satellite, re.I) else satellite
sensor = sensor[:-1] if re.match('SPOT', satellite, re.I) and sensor[-1] not in ['1', '2'] else sensor
filename = 'band_%s' % bandname if bandname else ''
return os.path.join(CFG.path_SRFs, satellite, sensor, filename)
def get_path_snr_model(GMS_identifier):
# type: (dict) -> str
def get_path_snr_model(GMS_id):
# type: (GMS_identifier) -> str
"""Returns the absolute path of the SNR model for the given sensor.
:param GMS_identifier:
:param GMS_id:
"""
satellite, sensor = (GMS_identifier['Satellite'], GMS_identifier['Sensor'])
satellite, sensor = (GMS_id.satellite, GMS_id.sensor)
satellite = 'RapidEye' if re.match('RapidEye', satellite, re.I) else satellite
sensor = sensor[:-1] if re.match('SPOT', satellite, re.I) and sensor[-1] not in ['1', '2'] else sensor
return os.path.join(CFG.path_SNR_models, satellite, sensor, 'SNR_model.csv')
def get_path_ac_options(GMS_identifier):
# type: (dict)->any
def get_path_ac_options(GMS_id):
# type: (GMS_identifier)->Union[str, None]
"""Returns the path of the options json file needed for atmospheric correction.
"""
GMSid_ac = GMS_identifier
GMSid_ac = GMS_id
GMSid_ac['Subsystem'] = ''
sensorcode = get_GMS_sensorcode(GMSid_ac)
......@@ -335,9 +338,9 @@ def get_path_ac_options(GMS_identifier):
}
try:
fName_optFile = ac_options_file_dic[get_GMS_sensorcode(GMS_identifier)]
fName_optFile = ac_options_file_dic[get_GMS_sensorcode(GMS_id)]
except KeyError:
GMS_identifier['logger'].warning(
GMS_id.logger.warning(
"Sensorcode '%s' is not included in ac_options dictionary. "
"Thus atmospheric correction is not available for the current scene." % sensorcode)
fName_optFile = None
......@@ -347,7 +350,7 @@ def get_path_ac_options(GMS_identifier):
path_ac = os.path.join(os.path.dirname(options.__file__), fName_optFile)
# validate
logger = GMS_identifier['logger'] or Logger(__name__)
logger = GMS_id.logger or Logger(__name__)
if not os.path.exists(path_ac):
logger.warning('Could not locate options file for atmospheric correction at %s.' % path_ac)
......
......@@ -266,10 +266,8 @@ class GMS_object(object):
@property
def GMS_identifier(self):
return collections.OrderedDict(zip(
['image_type', 'Satellite', 'Sensor', 'Subsystem', 'proc_level', 'dataset_ID', 'logger'],
[self.image_type, self.satellite, self.sensor, self.subsystem, self.proc_level, self.dataset_ID,
self.logger]))
return GMS_identifier(self.image_type, self.satellite, self.sensor, self.subsystem, self.proc_level,
self.dataset_ID, self.logger)
@property
def MetaObj(self):
......@@ -1756,6 +1754,12 @@ class GMS_object(object):
if hasattr(v, 'handlers') and v.handlers[:]:
warnings.warn('Not properly closed logger at GMS_obj.logger pointing to %s.' % v.path_logfile)
dict2write[k] = 'not set'
elif isinstance(v, GMS_identifier):
if v.logger is not None and hasattr(v.logger, 'handlers') and v.logger.handlers[:]:
warnings.warn("Not properly closed logger at %s['logger'] pointing to %s."
% (k, dict2write[k]['logger'].path_logfile))
v.logger.close()
v.logger = 'not set'
elif isinstance(v, collections.OrderedDict) or isinstance(v, dict):
dict2write[k] = dict2write[k].copy()
if 'logger' in v:
......@@ -2213,6 +2217,31 @@ class GMS_object(object):
del self.accuracy_layers
class GMS_identifier(OrderedDict):
def __init__(self, image_type, satellite, sensor, subsystem, proc_level, dataset_ID, logger=None):
# type: (str, str, str, str, str, int, logging.Logger) -> None
self.image_type = image_type
self.satellite = satellite
self.sensor = sensor
self.subsystem = subsystem
self.proc_level = proc_level
self.dataset_ID = dataset_ID
self.logger = logger
super().__init__(zip(
['image_type', 'Satellite', 'Sensor', 'Subsystem', 'proc_level', 'dataset_ID', 'logger'],
[image_type, satellite, sensor, subsystem, proc_level, dataset_ID, logger]))
def __getstate__(self):
"""Defines how the attributes of MetaObj instances are pickled."""
try:
self.logger.close()
except AttributeError:
pass
return self.__dict__
class failed_GMS_object(GMS_object):
def delete_tempFiles(self):
pass
......
......@@ -13,7 +13,7 @@ import sys
import warnings
import iso8601
import xml.etree.ElementTree as ET
from typing import List # noqa F401 # flake8 issue
from typing import List, TYPE_CHECKING # noqa F401 # flake8 issue
import numpy as np
import pyproj
......@@ -22,37 +22,37 @@ from pyorbital import astronomy
from py_tools_ds.geo.map_info import geotransform2mapinfo
from py_tools_ds.geo.projection import WKT2EPSG
from sicor.options import get_options as get_ac_options
from gms_preprocessing.options.config import GMS_config as CFG
from gms_preprocessing.io.input_reader import open_specific_file_within_archive, Solar_Irradiance_reader, SRF_reader
from gms_preprocessing.io.output_writer import enviHdr_keyOrder
from gms_preprocessing.algorithms import geoprocessing as GEOP
from gms_preprocessing.misc import helper_functions as HLP_F
from gms_preprocessing.misc import database_tools as DB_T
from gms_preprocessing.misc.path_generator import path_generator, get_path_ac_options
from gms_preprocessing.misc.definition_dicts import get_GMS_sensorcode, is_dataset_provided_as_fullScene, \
datasetid_to_sat_sen
from gms_preprocessing.misc.exceptions import ACNotSupportedError
from ..options.config import GMS_config as CFG
from ..io.input_reader import open_specific_file_within_archive, Solar_Irradiance_reader, SRF_reader
from ..io.output_writer import enviHdr_keyOrder
from ..algorithms import geoprocessing as GEOP
from ..misc import helper_functions as HLP_F
from ..misc import database_tools as DB_T
from ..misc.path_generator import path_generator, get_path_ac_options
from ..misc.definition_dicts import get_GMS_sensorcode, is_dataset_provided_as_fullScene, datasetid_to_sat_sen
from ..misc.exceptions import ACNotSupportedError
from sicor.options import get_options as get_ac_options
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
__author__ = 'Daniel Scheffler', 'Robert Behling'
class METADATA(object):
def __init__(self, GMS_identifier):
def __init__(self, GMS_id):
# private attributes
self._AcqDateTime = None
# unpack GMS_identifier
self.GMS_identifier = GMS_identifier
self.image_type = GMS_identifier['image_type']
self.Satellite = GMS_identifier['Satellite']
self.Sensor = GMS_identifier['Sensor']
self.Subsystem = GMS_identifier['Subsystem']
self.proc_level = GMS_identifier['proc_level']
self.logger = GMS_identifier['logger']
self.GMS_identifier = GMS_id
self.image_type = GMS_id.image_type
self.Satellite = GMS_id.satellite
self.Sensor = GMS_id.sensor
self.Subsystem = GMS_id.subsystem
self.proc_level = GMS_id.proc_level
self.logger = GMS_id.logger
self.Dataname = ''
self.FolderOrArchive = ''
......@@ -1856,13 +1856,13 @@ map_odictKeys_objAttrnames = {
}
def get_LayerBandsAssignment(GMS_identifier, nBands=None, sort_by_cwl=None, no_thermal=None, no_pan=None,
def get_LayerBandsAssignment(GMS_id, nBands=None, sort_by_cwl=None, no_thermal=None, no_pan=None,
return_fullLBA=False):
# type: (dict, int, bool, bool) -> list
# type: (GMS_identifier, int, bool, bool) -> list
"""Returns LayerBandsAssignment corresponding to given satellite, sensor and subsystem and with respect to
CFG.sort_bands_by_cwl, CFG.skip_thermal and CFG.skip_pan.
:param GMS_identifier: <dict>, derived from self.get_GMS_identifier()
:param GMS_id: <dict>, derived from self.get_GMS_identifier()
NOTE: only if there is an additional key 'proc_level', the processing level will be
respected. This is needed to get the correct LBA after atm. correction
:param nBands: should be specified if number of bands differs from standard
......@@ -1882,8 +1882,8 @@ def get_LayerBandsAssignment(GMS_identifier, nBands=None, sort_by_cwl=None, no_t
no_thermal = no_thermal if no_thermal is not None else CFG.skip_thermal
no_pan = no_pan if no_pan is not None else CFG.skip_pan
if GMS_identifier['image_type'] == 'RSD':
GMS_sensorcode = get_GMS_sensorcode(GMS_identifier)
if GMS_id.image_type == 'RSD':
GMS_sensorcode = get_GMS_sensorcode(GMS_id)
assert GMS_sensorcode, 'Unable to get Layer Bands Assignment. No valid sensorcode privided (got >None<). '
if return_fullLBA:
......@@ -1937,7 +1937,7 @@ def get_LayerBandsAssignment(GMS_identifier, nBands=None, sort_by_cwl=None, no_t
LayerBandsAssignment = dict_cwlSorted_LayerBandsAssignment[GMS_sensorcode]
else: # special case SPOT MSI containing no PAN or SPOT PAN containing only PAN
assert re.match('SPOT', GMS_identifier['Satellite'], re.I) and \
assert re.match('SPOT', GMS_id.satellite, re.I) and \
nBands in [len(dict_LayerBandsAssignment[GMS_sensorcode]) - 1, 1], \
"Unable to get Layer Bands Assignment. Provided number of bands doesn´t match known layer band " \
"assignments."
......@@ -1945,28 +1945,28 @@ def get_LayerBandsAssignment(GMS_identifier, nBands=None, sort_by_cwl=None, no_t
else dict_LayerBandsAssignment[GMS_sensorcode][:-1]
if no_thermal:
LayerBandsAssignment = [i for i in LayerBandsAssignment if not isTHERMAL(GMS_identifier, i)]
LayerBandsAssignment = [i for i in LayerBandsAssignment if not isTHERMAL(GMS_id, i)]
if no_pan:
LayerBandsAssignment = [i for i in LayerBandsAssignment if not isPAN(GMS_identifier, i)]
LayerBandsAssignment = [i for i in LayerBandsAssignment if not isPAN(GMS_id, i)]
else:
LayerBandsAssignment = ['1']
# remove those bands that are excluded by atmospheric corrections if proc_level >= L1C
if GMS_identifier['proc_level'] not in [None, 'L1A', 'L1B']: # TODO replace with enum procL
if GMS_id.proc_level not in [None, 'L1A', 'L1B']: # TODO replace with enum procL
if CFG.target_radunit_optical == 'BOA_Ref':
# return LBA after AC
try:
bands_after_ac = get_bands_after_AC(GMS_identifier)
bands_after_ac = get_bands_after_AC(GMS_id)
LayerBandsAssignment = [i for i in LayerBandsAssignment if i in bands_after_ac]
except ACNotSupportedError:
# atmospheric correction is not yet supported -> LBA will be the same after L1C
pass
if GMS_identifier['proc_level'] in ['L2B', 'L2C']:
if GMS_id.proc_level in ['L2B', 'L2C']:
# handle different number of bands after spectral homogenization to target sensor
if GMS_identifier['dataset_ID'] == CFG.datasetid_spectral_ref:
if GMS_id['dataset_ID'] == CFG.datasetid_spectral_ref:
pass # return normal LBA from above
elif CFG.datasetid_spectral_ref is not None:
......@@ -1974,9 +1974,10 @@ def get_LayerBandsAssignment(GMS_identifier, nBands=None, sort_by_cwl=None, no_t
# => use the LBA of the target sensor after AC as the new LBA for the requested sensor
# find out how the spectral characteristics of this known target sensor look like after AC
from ..model.gms_object import GMS_identifier
tgt_sat, tgt_sen = datasetid_to_sat_sen(CFG.datasetid_spectral_ref)
tgt_GMSid = dict(image_type='RSD', Satellite=tgt_sat, Sensor=tgt_sen, Subsystem='', proc_level='L2A',
dataset_ID='', logger=None)
tgt_GMSid = GMS_identifier(image_type='RSD', satellite=tgt_sat, sensor=tgt_sen, subsystem='',
proc_level='L2A', dataset_ID='', logger=None)
try:
tgt_sen_LBA = get_bands_after_AC(tgt_GMSid)
......@@ -1995,18 +1996,18 @@ def get_LayerBandsAssignment(GMS_identifier, nBands=None, sort_by_cwl=None, no_t
return LayerBandsAssignment
def get_bands_after_AC(GMS_identifier):
# type: (dict) -> List[str]
def get_bands_after_AC(GMS_id):
# type: (GMS_identifier) -> List[str]
"""Returns a list of bands that are not removed by atmospheric correction.
:param GMS_identifier: <dict>, derived from self.get_GMS_identifier()
:param GMS_id: <dict>, derived from self.get_GMS_identifier()
:return: e.g. ['1', '2', '3', '4', '5', '6', '7', '9'] for Landsat-8
"""
path_ac_options = get_path_ac_options(GMS_identifier)
path_ac_options = get_path_ac_options(GMS_id)
if not path_ac_options or not os.path.exists(path_ac_options):
raise ACNotSupportedError('Atmospheric correction is not yet supported for %s %s.'
% (GMS_identifier['Satellite'], GMS_identifier['Sensor']))
% (GMS_id.satellite, GMS_id.sensor))
# FIXME this does not work for L7
# NOTE: don't validate because options contain pathes that do not exist on another server
......@@ -2016,29 +2017,29 @@ def get_bands_after_AC(GMS_identifier):
return ac_out_bands
def get_dict_LayerOptTherm(GMS_identifier, LayerBandsAssignment):
def get_dict_LayerOptTherm(GMS_id, LayerBandsAssignment):
dict_out = collections.OrderedDict()
[dict_out.update({lr: 'thermal' if isTHERMAL(GMS_identifier, lr) else 'optical'}) for lr in LayerBandsAssignment]
[dict_out.update({lr: 'thermal' if isTHERMAL(GMS_id, lr) else 'optical'}) for lr in LayerBandsAssignment]
return dict_out
def isPAN(GMS_identifier, LayerNr):
GMS_sensorcode = get_GMS_sensorcode(GMS_identifier)
def isPAN(GMS_id, LayerNr):
GMS_sensorcode = get_GMS_sensorcode(GMS_id)
dict_isPAN = {'TM7': ['8'], 'LDCM': ['8'],
'SPOT1a': ['4'], 'SPOT2a': ['4'], 'SPOT3a': ['4'], 'SPOT4a': ['5'], 'SPOT5a': ['5'],
'SPOT1b': ['4'], 'SPOT2b': ['4'], 'SPOT3b': ['4'], 'SPOT4b': ['5'], 'SPOT5b': ['5']}
return True if GMS_sensorcode in dict_isPAN and LayerNr in dict_isPAN[GMS_sensorcode] else False