Commit f01e4ff5 authored by Daniel Scheffler's avatar Daniel Scheffler

Added L2B_P.SpecHomo_Classifier + test_spechomo_classifier.py.

parent 25aea6ba
Pipeline #1429 failed in 8 minutes and 2 seconds
@@ -3,11 +3,12 @@
Level 2B Processor: Spectral homogenization
"""
import os
import numpy as np
import logging
from scipy.interpolate import interp1d
import scipy as sp
import matplotlib.pyplot as plt
from logging import Logger
from sklearn.cluster import KMeans, k_means_
from pandas import DataFrame
@@ -68,7 +69,7 @@ class L2B_object(L2A_object):
class SpectralResampler(object):
"""Class for spectral resampling of a single spectral signature (1D-array) or an image (3D-array)."""
def __init__(self, wvl_src, srf_tgt, wvl_unit='nanometers', logger=Logger(__name__)):
def __init__(self, wvl_src, srf_tgt, wvl_unit='nanometers', logger=None):
# type: (np.ndarray, SRF, str, logging.Logger) -> None
"""Get an instance of the SpectralResampler1D class.
@@ -87,7 +88,7 @@ class SpectralResampler(object):
self.wvl_src_nm = wvl if wvl_unit == 'nanometers' else wvl * 1000
self.srf_tgt = srf_tgt
self.wvl_unit = wvl_unit
self.logger = logger
self.logger = logger or logging.getLogger(__name__)
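# creating the logger lazily here avoids sharing a single Logger default instance between all SpectralResampler objects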
def resample_signature(self, spectrum, scale_factor=10000, v=False):
# type: (np.ndarray, int, bool) -> np.ndarray
@@ -286,6 +287,87 @@ class KMeansRSImage(object):
return random_samples
class SpecHomo_Classifier(object):
def __init__(self, filelist_refs, v=False, logger=None):
"""
:param filelist_refs: list of reference images
"""
self.ims_ref = filelist_refs
self.v = v
self.logger = logger or logging.getLogger(__name__)
def generate_reference_cube(self, tgt_satellite, tgt_sensor, n_clusters=10, tgt_n_samples=1000):
# type: (str, str, int, int) -> np.ndarray
"""Generate reference spectra from all hyperspectral input images.
The hyperspectral images are spectrally resampled to the target sensor specifications. The resulting target
sensor image is then clustered and the same number of spectra is randomly selected from each cluster. All
spectra are combined into a single 'reference cube' containing the same number of spectra for each cluster,
with the spectra originating from all input images.
:param tgt_satellite: target satellite, e.g., 'Landsat-8'
:param tgt_sensor: target sensor, e.g., 'OLI_TIRS'
:param n_clusters: number of clusters to be used for clustering the input images (KMeans)
:param tgt_n_samples: number of spectra to be collected from each input image
:return: np.array: [tgt_n_samples x images x spectral bands of the target sensor]
"""
self.logger.info('Generating reference spectra from all input images...')
# get SRFs
self.logger.info('Reading spectral response functions of target sensor...')
tgt_srf = SRF(dict(Satellite=tgt_satellite, Sensor=tgt_sensor, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger))
if self.v:
tgt_srf.plot_srfs()
# generate random spectra samples equally for each KMeans cluster
random_samples_all_ims = dict()
for im_ref in self.ims_ref:
im_name = os.path.basename(im_ref)
# read input image
self.logger.info('Reading the input image %s...' % im_name)
im_gA = GeoArray(im_ref)
im_gA.cwl = np.array(im_gA.meta.loc['wavelength'], dtype=float).flatten()
# perform spectral resampling of input image to match spectral properties of target sensor
self.logger.info('Performing spectral resampling to match spectral properties of target sensor...')
SR = SpectralResampler(im_gA.cwl, tgt_srf)
im_tgt = GeoArray(SR.resample_image(im_gA[:]), geotransform=im_gA.gt, projection=im_gA.prj)
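# im_tgt now holds the input spectra resampled to the spectral bands of the target sensor (via its SRFs)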
# compute KMeans clusters for the spectrally resampled image
self.logger.info('Computing %s KMeans clusters...' % n_clusters)
kmeans = KMeansRSImage(im_tgt, n_clusters=n_clusters)
if self.v:
kmeans.plot_cluster_centers()
kmeans.plot_cluster_histogram()
# randomly grab the given number of spectra from each cluster
self.logger.info('Getting %s random spectra from each cluster...' % (tgt_n_samples//n_clusters))
random_samples = kmeans.get_random_spectra_from_each_cluster(samplesize=tgt_n_samples//n_clusters)
# combine the spectra (2D arrays) of all clusters to a single 2D array
random_samples = np.vstack([random_samples[clusterlabel] for clusterlabel in random_samples])
# reshape it so that we have the spectral information in 3rd dimension (x rows, 1 column and z bands)
random_samples = random_samples.reshape((random_samples.shape[0], 1, random_samples.shape[1]))
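# e.g., (tgt_n_samples, n_target_bands) -> (tgt_n_samples, 1, n_target_bands)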
# assign the result to the dict of all images
random_samples_all_ims[im_name] = random_samples
# Build the reference cube from the random samples of each image
# => rows: tgt_n_samples, columns: images, bands: spectral information
self.logger.info('Combining randomly sampled spectra to a single reference cube...')
im_names = [os.path.basename(p) for p in self.ims_ref]
ref_cube = np.hstack([random_samples_all_ims[imN] for imN in im_names])
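# ref_cube axes: 0 = spectra (tgt_n_samples per image), 1 = input images, 2 = spectral bands of the target sensor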
return ref_cube
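A minimal usage sketch (file paths are placeholders; as in the tests below, the GMS configuration is assumed to be set via set_config() beforehand):

import logging
from gms_preprocessing.config import set_config
from gms_preprocessing.algorithms.L2B_P import SpecHomo_Classifier

set_config(call_type='webapp', exec_mode='Python', job_ID=26186196, db_host='geoms', reset=True)
# hypothetical paths to hyperspectral reference images
refs = ['/data/hyspex/scene_1.bsq', '/data/hyspex/scene_2.bsq']
SHC = SpecHomo_Classifier(refs, v=False, logger=logging.getLogger('SpecHomo'))
ref_cube = SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
# ref_cube: [tgt_n_samples x images x target sensor bands], here (1000, 2, n_bands)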
......
@@ -29,7 +29,7 @@ testdata = os.path.join(os.path.dirname(__file__),
class Test_KMeansRSImage(unittest.TestCase):
"""Tests class for gms_preprocessing.algorithms.L2B_P.SpectralResampler1D"""
"""Tests class for gms_preprocessing.algorithms.L2B_P.SpectralResampler"""
@classmethod
def setUpClass(cls):
@@ -49,8 +49,10 @@ class Test_KMeansRSImage(unittest.TestCase):
self.assertTrue(labels.size == self.geoArr.rows * self.geoArr.cols)
def test_get_random_spectra_from_each_cluster(self):
self.kmeans.get_random_spectra_from_each_cluster()
# TODO
random_samples = self.kmeans.get_random_spectra_from_each_cluster()
self.assertIsInstance(random_samples, dict)
for cluster_label in range(self.kmeans.n_clusters):
self.assertIn(cluster_label, random_samples)
def test_plot_cluster_centers(self):
self.kmeans.plot_cluster_centers()
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_spechomo_classifier
------------------------
Tests for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier
"""
import unittest
import os
import numpy as np
from gms_preprocessing import __file__ # noqa E402 module level import not at top of file
from gms_preprocessing.config import set_config # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import SpecHomo_Classifier # noqa E402 module level import not at top of file
testdata = os.path.join(os.path.dirname(__file__),
'../tests/data/hy_spec_data/Bavaria_farmland_LMU_Hyspex_subset.bsq')
class Test_SpecHomo_Classifier(unittest.TestCase):
"""Tests class for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier"""
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
set_config(call_type='webapp', exec_mode='Python', job_ID=26186196, db_host='geoms', reset=True)
cls.SHC = SpecHomo_Classifier([testdata, testdata, ], v=False)
def test_generate_reference_cube_L8(self):
ref_cube = self.SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
self.assertIsInstance(ref_cube, np.ndarray)
def test_generate_reference_cube_S2A(self):
ref_cube = self.SHC.generate_reference_cube('Sentinel-2A', 'MSI', n_clusters=10, tgt_n_samples=1000)
self.assertIsInstance(ref_cube, np.ndarray)
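# A shape assertion could complement the type checks above (a sketch, assuming the
# [tgt_n_samples x images x bands] layout documented in generate_reference_cube):
#
#     def test_generate_reference_cube_shape(self):
#         ref_cube = self.SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
#         self.assertEqual(ref_cube.shape[0], 1000)
#         self.assertEqual(ref_cube.shape[1], len(self.SHC.ims_ref))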