Commit b787ec3f authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Revised spectral resampling class -> now fully working.

Former-commit-id: a7fdc5f0
Former-commit-id: 124988ae
parent 59dfe723
......@@ -693,7 +693,7 @@ class AtmCorr(object):
# get legend
cm_legend = get_mask_classdefinition('mask_clouds', inObj2use.satellite)
# {'Clear': 10, 'Thick Clouds': 20, 'Thin Clouds': 30, 'Snow': 40} # FIXME hardcoded
# {'Clear': 10, 'Thick Clouds': 20, 'Thin Clouds': 30, 'Snow': 40} # FIXME hardcoded
# validate that xGSD equals yGSD
if cm_geoarray.xgsd != cm_geoarray.ygsd:
......@@ -714,9 +714,9 @@ class AtmCorr(object):
inObj.build_combined_masks_array()
break # appending it to one inObj is enough
return S2Mask(mask_array = cm_array,
mask_legend = cm_legend,
geo_coding = cm_geocoding)
return S2Mask(mask_array=cm_array,
mask_legend=cm_legend,
geo_coding=cm_geocoding)
def run_atmospheric_correction(self, dump_ac_input=False):
# type: (bool) -> list
......@@ -747,10 +747,10 @@ class AtmCorr(object):
rs_image = RSImage(**rs_data)
self.ac_input = dict(
rs_image = rs_image,
options = self.options,
logger = repr(self.logger), # only a string
script = script
rs_image=rs_image,
options=self.options,
logger=repr(self.logger), # only a string
script=script
)
# run AC
......
......@@ -7,9 +7,9 @@ import numpy as np
from scipy.interpolate import interp1d
import scipy as sp
import matplotlib.pyplot as plt
from pandas import DataFrame
from ..config import GMS_config as CFG
from ..io.Input_reader import SRF
from .L2A_P import L2A_object
__author__ = 'Daniel Scheffler'
......@@ -60,69 +60,101 @@ class L2B_object(L2A_object):
return outarr
from ..io.Input_reader import SRF
class SpectralResampler(object):
"""Class for spectral resampling of a single spectral signature (1D-array) or an image (3D-array)."""
class SpectralResampler1D(object):
def __init__(self, wvl_src, srf_tgt, wvl_unit='nanometers'):
# type: (np.ndarray, SRF, str) -> None
"""
"""Get an instance of the SpectralResampler1D class.
:param wvl_src:
:param srf_tgt:
:param wvl_src: center wavelength positions of the source spectrum
:param srf_tgt: spectral response of the target instrument as an instance of io.Input_reader.SRF.
:param wvl_unit: the wavelengths unit of the source wavelenth positions ('nanometers' or 'micrometers)
"""
# validate inputs
if wvl_unit not in ['micrometers', 'nanometers']:
raise ValueError('Unknown wavelength unit %s.' % wvl_unit)
assert srf_tgt.wvl_unit == wvl_unit, "The wavelength unit of the provided SRF object instande must equal the " \
"wavelength unit of the given source wavelength positions. Received " \
"srf_tgt.wvl_unit='%s' and wvl_unit='%s'." % (srf_tgt.wvl_unit, wvl_unit)
#wvl =
#self.wvl_src_nm = wvl if wvl_unit == 'nanometers' else wvl * 1000
self.wvl_src_nm = np.array(wvl_src, dtype=np.float).flatten()
wvl = np.array(wvl_src, dtype=np.float).flatten()
if srf_tgt.wvl_unit != 'nanometers':
srf_tgt.convert_wvl_unit()
self.wvl_src_nm = wvl if wvl_unit == 'nanometers' else wvl * 1000
self.srf_tgt = srf_tgt
self.wvl_unit = wvl_unit
def resample(self, spectrum):
# type: (np.ndarray) -> np.ndarray
"""
def resample_signature(self, spectrum, scale_factor=10000, v=False):
# type: (np.ndarray, int, bool) -> np.ndarray
"""Resample the given spectrum according to the spectral response functions of the target instument.
:param spectrum:
:return:
:param spectrum: spectral signature data
:param scale_factor: the scale factor to apply to the given spectrum when it is plotted (default: 10000)
:param v: enable verbose mode (shows a plot of the resampled spectrum) (default: False)
:return: resampled spectral signature
"""
if not spectrum.ndim == 1:
raise ValueError("The array of the given spectral signature must be 1-dimensional. "
"Received a %s-dimensional array." % spectrum.ndim)
spectrum = np.array(spectrum, dtype=np.float).flatten()
assert spectrum.size == self.wvl_src_nm.size
# resample input spectrum and wavelength to 1nm
wvl_1nm = np.arange(np.ceil(self.wvl_src_nm.min()), np.floor(self.wvl_src_nm.max()), 1)
spectrum_1nm = sp.interpolate.interp1d(self.wvl_src_nm, spectrum, kind='linear')(wvl_1nm)
plt.figure()
plt.plot(wvl_1nm, spectrum_1nm/10000, '.')
spectrum_1nm = interp1d(self.wvl_src_nm, spectrum,
bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
if v:
plt.figure()
plt.plot(wvl_1nm, spectrum_1nm/scale_factor, '.')
spectrum_rsp = []
for band, wvl_center in zip(self.srf_tgt.bands, self.srf_tgt.wvl):
# resample srf to 1 nm
wvl_srf_1nm = np.arange(np.ceil(self.srf_tgt.srfs_wvl.min()), np.floor(self.srf_tgt.srfs_wvl.max()), 1)
srf_1nm = sp.interpolate.interp1d(
self.srf_tgt.srfs_wvl, self.srf_tgt.srfs[band], kind='linear')(wvl_srf_1nm)
# get the spectral range where the given spectrum and the current SRF function overlap
df_overlap = DataFrame(dict(wvl=wvl_1nm, spectrum=spectrum_1nm))\
.merge(DataFrame(dict(wvl=wvl_srf_1nm, srf=srf_1nm)), on='wvl', how="outer", copy=False).dropna()
srf_1nm = interp1d(self.srf_tgt.srfs_wvl, self.srf_tgt.srfs[band],
bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
# compute the resampled spectral value (np.average computes the weighted mean value)
specval_resamp = np.average(df_overlap['spectrum'], weights=df_overlap['srf'])
specval_rsp = np.average(spectrum_1nm, weights=srf_1nm)
plt.plot(wvl_srf_1nm, srf_1nm)
plt.plot(wvl_center, specval_resamp/10000, 'x', color='r')
if v:
plt.plot(wvl_1nm, srf_1nm/max(srf_1nm))
plt.plot(wvl_center, specval_rsp/scale_factor, 'x', color='r')
spectrum_rsp.append(specval_rsp)
class SpectralResampler2D(object):
def __init__(self, wvl_src, srf_tgt=None, ):
self.wvl_src = wvl_src
self.srf_tgt = srf_tgt
return np.array(spectrum_rsp)
def resample_image(self, image_cube):
# type: (np.ndarray, int, bool) -> np.ndarray
"""Resample the given spectral image cube according to the spectral response functions of the target instument.
:param image_cube: image (3D array) containing the spectral information in the third dimension
:return: resampled spectral image cube
"""
# input validation
if not isinstance(image_cube, np.ndarray):
raise TypeError(image_cube)
if not image_cube.ndim == 3:
raise ValueError("The given image cube must be 3-dimensional. Received a %s-dimensional array."
% image_cube.ndim)
assert image_cube.shape[2] == self.wvl_src_nm.size
# resample input image and wavelength to 1nm
wvl_1nm = np.arange(np.ceil(self.wvl_src_nm.min()), np.floor(self.wvl_src_nm.max()), 1)
image_1nm = interp1d(self.wvl_src_nm, image_cube,
axis=2, bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
(R, C), B = image_cube.shape[:2], len(self.srf_tgt.bands)
image_rsp = np.zeros((R, C, B), dtype=image_cube.dtype)
for band_idx, (band, wvl_center) in enumerate(zip(self.srf_tgt.bands, self.srf_tgt.wvl)):
# resample srf to 1 nm
srf_1nm = sp.interpolate.interp1d(self.srf_tgt.srfs_wvl, self.srf_tgt.srfs[band],
bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
# compute the resampled image cube (np.average computes the weighted mean value)
image_rsp[:, :, band_idx] = np.average(image_1nm, weights=srf_1nm, axis=2)
def resample(self, spectrum):
# type: (np.ndarray) -> np.ndarray
assert spectrum.shape == self.wvl_src.shape
return image_rsp
......@@ -228,16 +228,19 @@ class SRF(object):
def _set_attrs(self, wvl, srfs):
for bN in srfs:
srf = np.array(srfs[bN], dtype=np.float)
srfs[bN] = srf / np.trapz(x=wvl, y=srf)
srfs[bN] = srf / np.trapz(x=wvl, y=srf) # TODO seems like we NEED nanometers here; BUT WHY??
self.srfs_wvl = wvl
self.srfs_wvl = np.array(wvl)
self.srfs = srfs
self.bands = sorted(list(self.srfs.keys()))
# FIXME this is not the GMS algorithm to calculalate center wavelengths
# FIXME this is not the GMS algorithm to calculate center wavelengths
# calculate center wavelengths
self.wvl = [np.trapz(x=self.srfs_wvl, y=self.srfs_wvl * self.srfs[band]) for band in self.bands]
self.wvl = self.wvl if self.wvl_unit == 'micrometers' else [int(i) for i in self.wvl]
self.wvl = np.array([np.trapz(x=self.srfs_wvl, y=self.srfs_wvl * self.srfs[band]) for band in self.bands]) # TODO seems like we NEED nanometers here; BUT WHY??
# self.wvl = self.wvl if self.wvl_unit == 'micrometers' else np.array([int(i) for i in self.wvl])
self.srfs_wvl = self.srfs_wvl if self.wvl_unit == 'nanometers' else self.srfs_wvl / 1000
self.wvl = self.wvl if self.wvl_unit == 'nanometers' else self.wvl / 1000
self.conv = {}
self.conv.update({key: value for key, value in zip(self.bands, self.wvl)})
......@@ -263,11 +266,11 @@ class SRF(object):
df = DataFrame(index=wvl)
for band in srf_dict:
bandname = band if not self.format_bandnames else ('B%s' % band if len(band) == 2 else 'B0%s' % band)
df = df.join(Series(srf_dict[band][:, 1],
index=np.array(srf_dict[band][:, 0] * scale_factor)
.astype(np.int), name=bandname))
srfs = srf_dict[band][:, 1]
wvls = np.array(srf_dict[band][:, 0] * scale_factor).astype(np.int)
df = df.join(Series(srfs, index=wvls, name=bandname))
df = df.fillna(0)
wvl = np.array(df.index.astype(np.float) if self.wvl_unit == 'nanometers' else df.index / 1000.)
wvl = np.array(df.index.astype(np.float))
self._set_attrs(wvl=wvl, srfs=df.to_dict(orient='series'))
return self
......@@ -280,6 +283,13 @@ class SRF(object):
'sol_irr': None
}
def convert_wvl_unit(self):
"""Convert the wavelength unit to nanometers if they are in micrometers or vice versa."""
factor = 1/1000 if self.wvl_unit == 'nanometers' else 1000
self.srfs_wvl = self.srfs_wvl * factor
self.wvl = self.wvl * factor
self.wvl_unit = 'nanometers' if self.wvl_unit == 'micrometers' else 'micrometers'
def __call__(self, band):
return self.srfs[band]
......
......@@ -15,25 +15,35 @@ from geoarray import GeoArray
from gms_preprocessing.io.Input_reader import SRF
from gms_preprocessing.config import set_config
from gms_preprocessing.algorithms.L2B_P import SpectralResampler1D as SR1D
from gms_preprocessing.algorithms.L2B_P import SpectralResampler as SR1D
class Test_SpectralResampler1D(unittest.TestCase):
class Test_SpectralResampler(unittest.TestCase):
"""Tests class for gms_preprocessing.algorithms.L2B_P.SpectralResampler1D"""
def setUp(self):
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
set_config(call_type='webapp', exec_mode='Python', job_ID=26186196, db_host='geoms', reset=True)
self.geoArr = GeoArray('/misc/gms2/segl/GMS/SPECHOM/ORIG_DATA/Bavaria_farmland_LMU_Hyspex.bsq')
cls.geoArr = GeoArray('/misc/gms2/segl/GMS/SPECHOM/ORIG_DATA/Bavaria_farmland_LMU_Hyspex.bsq') # FIXME
cls.geoArr.to_mem()
# get SRF instance for Landsat-8
self.srf_l8 = SRF(dict(Satellite='Landsat-8', Sensor='OLI_TIRS', Subsystem=None,
logger=None, image_type='RSD', proc_level='L1A'))
cls.srf_l8 = SRF(dict(Satellite='Landsat-8', Sensor='OLI_TIRS', Subsystem=None,
logger=None, image_type='RSD', proc_level='L1A'))
def test_resample_signature(self):
# Get a hyperspectral spectrum.
self.spectrum = np.array(self.geoArr.meta.loc['wavelength'], dtype=np.float).flatten()
self.spectrum_wvl = self.geoArr[0, 0, :].flatten()
spectrum_wvl = np.array(self.geoArr.meta.loc['wavelength'], dtype=np.float).flatten()
spectrum = self.geoArr[0, 0, :].flatten()
def test_signature_resampling(self):
sr1d = SR1D(self.spectrum_wvl, self.srf_l8)
sr1d.resample(self.spectrum)
sr1d = SR1D(spectrum_wvl, self.srf_l8)
sr1d.resample_signature(spectrum)
def test_resample_image(self):
# Get a hyperspectral spectrum.
image_wvl = np.array(self.geoArr.meta.loc['wavelength'], dtype=np.float).flatten()
image = self.geoArr[:]
sr1d = SR1D(image_wvl, self.srf_l8)
sr1d.resample_image(image)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment