Commit 59dfe723 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Added first version of SpectralResampler1D incl. test module...

Added first version of SpectralResampler1D incl. test module 'test_spectral_resampler'. Some PEP8 editing.
Former-commit-id: 35efca93
Former-commit-id: e781750d
parent 00490e67
This diff is collapsed.
......@@ -3,13 +3,16 @@
Level 2B Processor: Spectral homogenization
"""
__author__='Daniel Scheffler'
import numpy as np
from scipy.interpolate import interp1d
import scipy as sp
import matplotlib.pyplot as plt
from pandas import DataFrame
from ..config import GMS_config as CFG
from .L2A_P import L2A_object
from ..config import GMS_config as CFG
from .L2A_P import L2A_object
__author__ = 'Daniel Scheffler'
class L2B_object(L2A_object):
......@@ -25,18 +28,21 @@ class L2B_object(L2A_object):
def spectral_homogenization(self, subset=None, kind='linear'):
src_cwls = self.meta_odict['wavelength']
tgt_cwls = CFG.usecase.target_CWL # FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
if src_cwls!=tgt_cwls:
assert kind in ['linear',], "%s is not a supported kind of homogenization." %kind
self.log_for_fullArr_or_firstTile('Performing spectral homogenization (%s) with target wavelength '
'positions at %s nm.' %(kind,', '.join(np.array(tgt_cwls[:-1]).astype(str))+' and %s' %tgt_cwls[-1]))
self.LayerBandsAssignment = [] # TODO better band names for homogenized product -> include in get_LayerBandsAssignment
self.arr = self.interpolate_cube_linear(self.arr,src_cwls,tgt_cwls) if kind=='linear' else self.arr
# FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
tgt_cwls = CFG.usecase.target_CWL
if src_cwls != tgt_cwls:
assert kind in ['linear', ], "%s is not a supported kind of homogenization." % kind
self.log_for_fullArr_or_firstTile(
'Performing spectral homogenization (%s) with target wavelength positions at %s nm.'
% (kind, ', '.join(np.array(tgt_cwls[:-1]).astype(str))+' and %s' % tgt_cwls[-1]))
# TODO better band names for homogenized product -> include in get_LayerBandsAssignment
self.LayerBandsAssignment = []
self.arr = self.interpolate_cube_linear(self.arr,src_cwls,tgt_cwls) if kind == 'linear' else self.arr
self.meta_odict['wavelength'] = list(tgt_cwls)
self.meta_odict['bands'] = len(tgt_cwls)
if 'band names' in self.meta_odict: # FIXME bug workaround
if 'band names' in self.meta_odict: # FIXME bug workaround
del self.meta_odict['band names'] # TODO
else:
self.logger.info("Spectral homogenization has been skipped because the current spectral characteristics "
......@@ -46,7 +52,7 @@ class L2B_object(L2A_object):
def interpolate_cube_linear(arrcube, source_CWLs, target_CWLs):
# type: (np.ndarray,list,list) -> np.ndarray
assert arrcube is not None,\
'L2B_obj.interpolate_cube_linear expects a numpy array as input. Got %s.' %type(arrcube)
'L2B_obj.interpolate_cube_linear expects a numpy array as input. Got %s.' % type(arrcube)
orig_CWLs, target_CWLs = np.array(source_CWLs), np.array(target_CWLs)
outarr = interp1d(np.array(orig_CWLs), arrcube, axis=2, kind='linear', fill_value='extrapolate')(target_CWLs)
outarr = outarr.astype(np.int16)
......@@ -54,6 +60,63 @@ class L2B_object(L2A_object):
return outarr
from ..io.Input_reader import SRF
class SpectralResampler1D(object):
def __init__(self, wvl_src, srf_tgt, wvl_unit='nanometers'):
# type: (np.ndarray, SRF, str) -> None
"""
:param wvl_src:
:param srf_tgt:
:param wvl_unit: the wavelengths unit of the source wavelenth positions ('nanometers' or 'micrometers)
"""
# validate inputs
if wvl_unit not in ['micrometers', 'nanometers']:
raise ValueError('Unknown wavelength unit %s.' % wvl_unit)
assert srf_tgt.wvl_unit == wvl_unit, "The wavelength unit of the provided SRF object instande must equal the " \
"wavelength unit of the given source wavelength positions. Received " \
"srf_tgt.wvl_unit='%s' and wvl_unit='%s'." % (srf_tgt.wvl_unit, wvl_unit)
#wvl =
#self.wvl_src_nm = wvl if wvl_unit == 'nanometers' else wvl * 1000
self.wvl_src_nm = np.array(wvl_src, dtype=np.float).flatten()
self.srf_tgt = srf_tgt
def resample(self, spectrum):
# type: (np.ndarray) -> np.ndarray
"""
:param spectrum:
:return:
"""
spectrum = np.array(spectrum, dtype=np.float).flatten()
assert spectrum.size == self.wvl_src_nm.size
# resample input spectrum and wavelength to 1nm
wvl_1nm = np.arange(np.ceil(self.wvl_src_nm.min()), np.floor(self.wvl_src_nm.max()), 1)
spectrum_1nm = sp.interpolate.interp1d(self.wvl_src_nm, spectrum, kind='linear')(wvl_1nm)
plt.figure()
plt.plot(wvl_1nm, spectrum_1nm/10000, '.')
for band, wvl_center in zip(self.srf_tgt.bands, self.srf_tgt.wvl):
# resample srf to 1 nm
wvl_srf_1nm = np.arange(np.ceil(self.srf_tgt.srfs_wvl.min()), np.floor(self.srf_tgt.srfs_wvl.max()), 1)
srf_1nm = sp.interpolate.interp1d(
self.srf_tgt.srfs_wvl, self.srf_tgt.srfs[band], kind='linear')(wvl_srf_1nm)
# get the spectral range where the given spectrum and the current SRF function overlap
df_overlap = DataFrame(dict(wvl=wvl_1nm, spectrum=spectrum_1nm))\
.merge(DataFrame(dict(wvl=wvl_srf_1nm, srf=srf_1nm)), on='wvl', how="outer", copy=False).dropna()
# compute the resampled spectral value (np.average computes the weighted mean value)
specval_resamp = np.average(df_overlap['spectrum'], weights=df_overlap['srf'])
plt.plot(wvl_srf_1nm, srf_1nm)
plt.plot(wvl_center, specval_resamp/10000, 'x', color='r')
class SpectralResampler2D(object):
def __init__(self, wvl_src, srf_tgt=None, ):
self.wvl_src = wvl_src
......@@ -63,4 +126,3 @@ class SpectralResampler2D(object):
# type: (np.ndarray) -> np.ndarray
assert spectrum.shape == self.wvl_src.shape
This diff is collapsed.
......@@ -15,11 +15,11 @@ requirements = [
'pandas', 'numba', 'spectral', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy', 'psycopg2',
'py_tools_ds', 'geoarray', 'arosics'
# fmask # conda install -c conda-forge python-fmask
#'pyhdf', # conda install --yes -c conda-forge pyhdf
#'sicor', # pip install git+https://gitext.gfz-potsdam.de/hollstei/sicor.git
# 'pyhdf', # conda install --yes -c conda-forge pyhdf
# 'sicor', # pip install git+https://gitext.gfz-potsdam.de/hollstei/sicor.git
]
setup_requirements = [] # TODO(danschef): put setup requirements (distutils extensions, etc.) here
test_requirements = requirements+ ['coverage', 'nose', 'nose-htmloutput', 'rednose']
setup_requirements = [] # TODO(danschef): put setup requirements (distutils extensions, etc.) here
test_requirements = requirements + ['coverage', 'nose', 'nose-htmloutput', 'rednose']
setup(
name='gms_preprocessing',
......@@ -29,8 +29,8 @@ setup(
author="Daniel Scheffler",
author_email='daniel.scheffler@gfz-potsdam.de',
url='https://gitext.gfz-potsdam.de/geomultisens/gms_preprocessing',
packages=find_packages(), # searches for packages with an __init__.py and returns them as properly formatted list
package_dir={'gms_preprocessing':'gms_preprocessing'},
packages=find_packages(), # searches for packages with an __init__.py and returns them as properly formatted list
package_dir={'gms_preprocessing': 'gms_preprocessing'},
package_data={"database": ["gms_preprocessing/database/*"]},
include_package_data=True,
install_requires=requirements,
......@@ -62,4 +62,4 @@ try:
except ImportError:
warnings.warn('If you have not compiled GDAL with HDF4 support you need to install pyhdf manually '
'(see http://pysclint.sourceforge.net/pyhdf/install.html) for processing Terra ASTER data.'
'It is not automatically installed.') # TODO
'It is not automatically installed.') # TODO
......@@ -19,10 +19,11 @@ from py_tools_ds.geo.projection import EPSG2WKT, WKT2EPSG
gmsRepo_rootpath = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
class Test_DEM_Creator(unittest.TestCase):
class Test_DEM_Creator(unittest.TestCase):
def setUp(self):
set_config(call_type='webapp', exec_mode='Python', job_ID=26186196, db_host='geoms', reset=True, # Testjob Landsat-8
# Testjob Landsat-8
set_config(call_type='webapp', exec_mode='Python', job_ID=26186196, db_host='geoms', reset=True,
job_kwargs=dict(is_test=True,
path_archive=os.path.join(gmsRepo_rootpath, 'tests', 'data', 'archive_data')))
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_spectral_resampler
-----------------------
Tests for gms_preprocessing.algorithms.L2B_P.SpectralResampler1D
"""
import unittest
import numpy as np
from geoarray import GeoArray
from gms_preprocessing.io.Input_reader import SRF
from gms_preprocessing.config import set_config
from gms_preprocessing.algorithms.L2B_P import SpectralResampler1D as SR1D
class Test_SpectralResampler1D(unittest.TestCase):
"""Tests class for gms_preprocessing.algorithms.L2B_P.SpectralResampler1D"""
def setUp(self):
# Testjob Landsat-8
set_config(call_type='webapp', exec_mode='Python', job_ID=26186196, db_host='geoms', reset=True)
self.geoArr = GeoArray('/misc/gms2/segl/GMS/SPECHOM/ORIG_DATA/Bavaria_farmland_LMU_Hyspex.bsq')
# get SRF instance for Landsat-8
self.srf_l8 = SRF(dict(Satellite='Landsat-8', Sensor='OLI_TIRS', Subsystem=None,
logger=None, image_type='RSD', proc_level='L1A'))
# Get a hyperspectral spectrum.
self.spectrum = np.array(self.geoArr.meta.loc['wavelength'], dtype=np.float).flatten()
self.spectrum_wvl = self.geoArr[0, 0, :].flatten()
def test_signature_resampling(self):
sr1d = SR1D(self.spectrum_wvl, self.srf_l8)
sr1d.resample(self.spectrum)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment