Commit 2056441b authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Updated version info.

Merge branch 'feature/spectral_homogenization'
Conflicts:
	requirements.txt
	setup.py
Former-commit-id: 4754de07
Former-commit-id: 1d953976
parents 4485439e df55f7f6
......@@ -3,10 +3,17 @@
Level 2B Processor: Spectral homogenization
"""
import os
import numpy as np
import logging
from scipy.interpolate import interp1d
import scipy as sp
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from pandas import DataFrame
from sklearn.cluster import k_means_ # noqa F401 # flake8 issue
from geoarray import GeoArray # noqa F401 # flake8 issue
from ..config import GMS_config as CFG
from ..io.input_reader import SRF # noqa F401 # flake8 issue
......@@ -56,14 +63,14 @@ class L2B_object(L2A_object):
orig_CWLs, target_CWLs = np.array(source_CWLs), np.array(target_CWLs)
outarr = interp1d(np.array(orig_CWLs), arrcube, axis=2, kind='linear', fill_value='extrapolate')(target_CWLs)
outarr = outarr.astype(np.int16)
assert outarr.shape == (arrcube.shape[:2] + (len(target_CWLs), ))
assert outarr.shape == tuple([*arrcube.shape[:2], len(target_CWLs)])
return outarr
class SpectralResampler(object):
"""Class for spectral resampling of a single spectral signature (1D-array) or an image (3D-array)."""
def __init__(self, wvl_src, srf_tgt, wvl_unit='nanometers'):
def __init__(self, wvl_src, srf_tgt, wvl_unit='nanometers', logger=None):
# type: (np.ndarray, SRF, str) -> None
"""Get an instance of the SpectralResampler1D class.
......@@ -82,6 +89,7 @@ class SpectralResampler(object):
self.wvl_src_nm = wvl if wvl_unit == 'nanometers' else wvl * 1000
self.srf_tgt = srf_tgt
self.wvl_unit = wvl_unit
self.logger = logger or logging.getLogger(__name__)
def resample_signature(self, spectrum, scale_factor=10000, v=False):
# type: (np.ndarray, int, bool) -> np.ndarray
......@@ -149,6 +157,8 @@ class SpectralResampler(object):
image_rsp = np.zeros((R, C, B), dtype=image_cube.dtype)
for band_idx, (band, wvl_center) in enumerate(zip(self.srf_tgt.bands, self.srf_tgt.wvl)):
self.logger.info('Applying spectral resampling to band %s...' % band_idx)
# resample srf to 1 nm
srf_1nm = sp.interpolate.interp1d(self.srf_tgt.srfs_wvl, self.srf_tgt.srfs[band],
bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
......@@ -157,3 +167,203 @@ class SpectralResampler(object):
image_rsp[:, :, band_idx] = np.average(image_1nm, weights=srf_1nm, axis=2)
return image_rsp
class KMeansRSImage(object):
    """Compute KMeans clusters of a remote sensing image and provide plotting/sampling utilities.

    The cluster model is computed lazily on first access of the 'clusters' property.
    """

    # lazily computed caches
    _clusters = None   # fitted sklearn KMeans model
    _im_clust = None   # 2D array of per-pixel cluster labels (rows x cols)
    _spectra = None

    def __init__(self, im, n_clusters):
        # type: (GeoArray, int) -> None
        """
        :param im:          input image to be clustered
        :param n_clusters:  number of clusters to compute
        """
        self.im = im
        self.n_clusters = n_clusters

    @property
    def clusters(self):
        # type: () -> k_means_.KMeans
        """Return the fitted KMeans model (computed on first access)."""
        # NOTE: 'is None' instead of truthiness - consistent with the '_im_clust' cache below
        # and independent of whether the estimator object defines __bool__/__len__
        if self._clusters is None:
            self._clusters = self.compute_clusters()
        return self._clusters

    @clusters.setter
    def clusters(self, clusters):
        self._clusters = clusters

    @property
    def im_clust(self):
        """Return the clustered image as a 2D array of cluster labels (rows x cols)."""
        if self._im_clust is None:
            self._im_clust = self.clusters.labels_.reshape((self.im.rows, self.im.cols))
        return self._im_clust

    def compute_clusters(self):
        """Run KMeans on the spectra of the input image and return the fitted model."""
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=0)
        self.clusters = kmeans.fit(self._im2spectra(self.im))
        return self.clusters

    def apply_clusters(self, image):
        """Predict the cluster label of each pixel of the given image using the fitted model.

        :param image:   image to be classified (anything accepted by the GeoArray constructor)
        :return:        1D numpy array of cluster labels
        """
        labels = self.clusters.predict(self._im2spectra(GeoArray(image)))
        return labels

    @staticmethod
    def _im2spectra(geoArr):
        """Reshape a 3D image cube to a 2D array of spectra (rows*cols x bands)."""
        return geoArr.reshape((geoArr.rows * geoArr.cols, geoArr.bands))

    def plot_cluster_centers(self, figsize=(15, 5)):
        # type: (tuple) -> None
        """Show a plot of the cluster center signatures.

        :param figsize: figure size (inches)
        """
        plt.figure(figsize=figsize)
        for i, center_signature in enumerate(self.clusters.cluster_centers_):
            plt.plot(range(1, self.im.bands + 1), center_signature, label='Cluster #%s' % (i + 1))

        plt.title('KMeans cluster centers for %s clusters' % self.n_clusters)
        plt.xlabel('Spectral band')
        plt.ylabel('Pixel values')
        plt.legend(loc='upper right')

        plt.show()

    def plot_cluster_histogram(self, figsize=(15, 5)):
        # type: (tuple) -> None
        """Show a histogram indicating the proportion of each cluster label in percent.

        :param figsize: figure size (inches)
        """
        # grab the number of different clusters and create a histogram
        # based on the number of pixels assigned to each cluster
        # (numLabels serves as the bin edges, hence the '+ 1')
        numLabels = np.arange(0, len(np.unique(self.clusters.labels_)) + 1)
        hist, bins = np.histogram(self.clusters.labels_, bins=numLabels)

        # normalize the histogram, such that it sums to 100
        hist = hist.astype("float")
        hist /= hist.sum() / 100

        # plot the histogram as bar plot
        plt.figure(figsize=figsize)
        plt.bar(bins[:-1], hist, width=1)
        plt.title('Proportion of cluster labels (%s clusters)' % self.n_clusters)
        plt.xlabel('# Cluster')
        plt.ylabel('Proportion [%]')
        plt.show()

    def plot_clustered_image(self, figsize=(15, 15)):
        # type: (tuple) -> None
        """Show the clustered image (per-pixel cluster labels).

        :param figsize: figure size (inches)
        """
        plt.figure(figsize=figsize)
        rows, cols = self.im_clust.shape[:2]
        plt.imshow(self.im_clust, plt.cm.prism, interpolation='none', extent=(0, cols, rows, 0))
        plt.show()

    def get_random_spectra_from_each_cluster(self, samplesize=50):
        # type: (int) -> dict
        """Return a given number of spectra randomly selected within each cluster.

        E.g., 50 spectra belonging to cluster 1, 50 spectra belonging to cluster 2 and so on.

        :param samplesize: number of spectra to be randomly selected from each cluster
        :return: dict like {cluster_label: 2D array of spectra}
        """
        # get DataFrame with columns [cluster_label, B1, B2, B3, ...]
        df = DataFrame(self._im2spectra(self.im), columns=['B%s' % band for band in range(1, self.im.bands + 1)], )
        df.insert(0, 'cluster_label', self.clusters.labels_)

        # get random sample from each cluster and generate a dict like {cluster_label: random_sample}
        random_samples = dict()
        for label in range(self.n_clusters):
            cluster_subset = df[df.cluster_label == label].loc[:, 'B1':]
            # get random sample while filling it with duplicates of the same sample when cluster has not enough spectra
            random_samples[label] = np.array(cluster_subset.sample(samplesize, replace=True))

        return random_samples
class SpecHomo_Classifier(object):
    def __init__(self, filelist_refs, v=False, logger=None):
        """Get an instance of SpecHomo_Classifier.

        :param filelist_refs:   list of reference images (file paths)
        :param v:               verbose mode (shows plots if True)
        :param logger:          logging.Logger instance (a default module logger is used if None)
        """
        self.ims_ref = filelist_refs
        self.v = v
        self.logger = logger or logging.getLogger(__name__)

    def generate_reference_cube(self, tgt_satellite, tgt_sensor, n_clusters=10, tgt_n_samples=1000):
        # type: (str, str, int, int) -> np.ndarray
        """Generate reference spectra from all hyperspectral input images.

        The hyperspectral images are spectrally resampled to the target sensor specifications. The resulting target
        sensor image is then clustered and the same number of spectra is randomly selected from each cluster. All
        spectra are combined into a single 'reference cube' containing the same number of spectra for each cluster
        whereas the spectra originate from all the input images.

        :param tgt_satellite:   target satellite, e.g., 'Landsat-8'
        :param tgt_sensor:      target sensor, e.g., 'OLI_TIRS'
        :param n_clusters:      number of clusters to be used for clustering the input images (KMeans)
        :param tgt_n_samples:   number of spectra to be collected from each input image
        :return: np.array: [tgt_n_samples x images x spectral bands of the target sensor]
        """
        self.logger.info('Generating reference spectra from all input images...')

        # get SRFs
        self.logger.info('Reading spectral response functions of target sensor...')
        tgt_srf = SRF(dict(Satellite=tgt_satellite, Sensor=tgt_sensor, Subsystem=None, image_type='RSD',
                           proc_level='L1A', logger=self.logger))
        if self.v:
            tgt_srf.plot_srfs()

        # generate random spectra samples equally for each KMeans cluster
        random_samples_all_ims = dict()
        for im_ref in self.ims_ref:
            im_name = os.path.basename(im_ref)

            # read input image
            self.logger.info('Reading the input image %s...' % im_name)
            im_gA = GeoArray(im_ref)
            # NOTE: dtype=float instead of the deprecated np.float alias (removed in NumPy >= 1.20);
            #       both are the builtin Python float, so behavior is unchanged
            im_gA.cwl = np.array(im_gA.meta.loc['wavelength'], dtype=float).flatten()

            # perform spectral resampling of input image to match spectral properties of target sensor
            self.logger.info('Performing spectral resampling to match spectral properties of target sensor...')
            SR = SpectralResampler(im_gA.cwl, tgt_srf)
            im_tgt = GeoArray(SR.resample_image(im_gA[:]), geotransform=im_gA.gt, projection=im_gA.prj)

            # compute KMeans clusters for the spectrally resampled image
            self.logger.info('Computing %s KMeans clusters...' % n_clusters)
            kmeans = KMeansRSImage(im_tgt, n_clusters=n_clusters)

            if self.v:
                kmeans.plot_cluster_centers()
                kmeans.plot_cluster_histogram()

            # randomly grab the given number of spectra from each cluster
            self.logger.info('Getting %s random spectra from each cluster...' % (tgt_n_samples // n_clusters))
            random_samples = kmeans.get_random_spectra_from_each_cluster(samplesize=tgt_n_samples // n_clusters)

            # combine the spectra (2D arrays) of all clusters to a single 2D array
            random_samples = np.vstack([random_samples[clusterlabel] for clusterlabel in random_samples])

            # reshape it so that we have the spectral information in 3rd dimension (x rows, 1 column and z bands)
            random_samples = random_samples.reshape((random_samples.shape[0], 1, random_samples.shape[1]))

            # assign the result to the dict of all images
            random_samples_all_ims[im_name] = random_samples

        # Build the reference cube from the random samples of each image
        # => rows: tgt_n_samples, columns: images, bands: spectral information
        self.logger.info('Combining randomly sampled spectra to a single reference cube...')
        im_names = [os.path.basename(p) for p in self.ims_ref]
        ref_cube = np.hstack([random_samples_all_ims[imN] for imN in im_names])

        return ref_cube
......@@ -10,7 +10,9 @@ import tarfile
import warnings
import zipfile
from tempfile import NamedTemporaryFile as tempFile
from typing import TYPE_CHECKING
from logging import Logger
from matplotlib import pyplot as plt
from typing import Dict # noqa F401 # flake8 issue
import dill
import numpy as np
......@@ -32,9 +34,6 @@ from geoarray import GeoArray
from py_tools_ds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo import transform_any_prj
if TYPE_CHECKING:
from logging import Logger # noqa F401 # flake8 issue
def read_ENVIfile(path, arr_shape, arr_pos, logger=None, return_meta=False, q=0):
hdr_path = os.path.splitext(path)[0] + '.hdr' if not os.path.splitext(path)[1] == '.hdr' else path
......@@ -184,7 +183,8 @@ def SRF_reader(GMS_identifier, v=False):
:param v: verbose mode
"""
satellite, sensor, logger = (GMS_identifier['Satellite'], GMS_identifier['Sensor'], GMS_identifier['logger'])
satellite, sensor = GMS_identifier['Satellite'], GMS_identifier['Sensor']
logger = GMS_identifier['logger'] or Logger(__name__)
LayerBandsAssignment = META.get_LayerBandsAssignment(GMS_identifier)
SRF_dict = collections.OrderedDict()
SRF_dir = PG.get_path_srf_file(GMS_identifier)
......@@ -221,38 +221,20 @@ class SRF(object):
raise ValueError('Unknown wavelength unit %s.' % wvl_unit)
self.srfs_wvl = []
self.srfs = []
self.srfs = {}
self.srfs_norm01 = {}
self.bands = []
self.wvl = []
self.wvl_unit = wvl_unit
self.format_bandnames = format_bandnames
self.satellite = GMS_identifier['Satellite'] if GMS_identifier else ''
self.sensor = GMS_identifier['Sensor'] if GMS_identifier else ''
self.conv = {}
self.v = v
if GMS_identifier:
self.from_GMS_identifier(GMS_identifier)
def _set_attrs(self, wvl, srfs):
    """Set the SRF instance attributes from the given wavelengths and response functions.

    Normalizes each response function so that it integrates to 1 over wvl, computes the
    center wavelength of each band and fills the band<->wavelength conversion dict.

    :param wvl:   1D array-like of wavelength positions (expected in nanometers)
    :param srfs:  dict of {band_name: spectral response values at wvl}; modified in-place
    """
    for bN in srfs:
        # dtype=float instead of the deprecated np.float alias (removed in NumPy >= 1.20)
        srf = np.array(srfs[bN], dtype=float)
        srfs[bN] = srf / np.trapz(x=wvl, y=srf)  # TODO seems like we NEED nanometers here; BUT WHY??

    self.srfs_wvl = np.array(wvl)
    self.srfs = srfs
    self.bands = sorted(list(self.srfs.keys()))

    # FIXME this is not the GMS algorithm to calculate center wavelengths
    # calculate center wavelengths
    # TODO seems like we NEED nanometers here; BUT WHY??:
    self.wvl = np.array([np.trapz(x=self.srfs_wvl, y=self.srfs_wvl * self.srfs[band]) for band in self.bands])
    # self.wvl = self.wvl if self.wvl_unit == 'micrometers' else np.array([int(i) for i in self.wvl])

    self.srfs_wvl = self.srfs_wvl if self.wvl_unit == 'nanometers' else self.srfs_wvl / 1000
    self.wvl = self.wvl if self.wvl_unit == 'nanometers' else self.wvl / 1000

    # conv maps band name -> center wavelength AND center wavelength -> band name
    self.conv = {}
    self.conv.update({key: value for key, value in zip(self.bands, self.wvl)})
    self.conv.update({value: key for key, value in zip(self.bands, self.wvl)})
def from_GMS_identifier(self, GMS_identifier):
    """Populate this SRF instance from spectral response functions read via SRF_reader.

    :param GMS_identifier:  identifier dict (with 'Satellite'/'Sensor'/... keys) passed to SRF_reader
    :return:  self (from_dict returns self)
    """
    srf_dict = SRF_reader(GMS_identifier, v=self.v)
    return self.from_dict(srf_dict)
......@@ -278,7 +260,30 @@ class SRF(object):
df = df.join(Series(srfs, index=wvls, name=bandname))
df = df.fillna(0)
wvl = np.array(df.index.astype(np.float))
self._set_attrs(wvl=wvl, srfs=df.to_dict(orient='series'))
srfs_norm01 = df.to_dict(orient='series') # type: Dict[Series]
################
# set attributes
################
for bN in srfs_norm01:
self.srfs_norm01[bN] = srf = np.array(srfs_norm01[bN], dtype=np.float)
self.srfs[bN] = srf / np.trapz(x=wvl, y=srf) # TODO seems like we NEED nanometers here; BUT WHY??
self.srfs_wvl = np.array(wvl)
self.bands = sorted(list(self.srfs.keys()))
# FIXME this is not the GMS algorithm to calculate center wavelengths
# calculate center wavelengths
# TODO seems like we NEED nanometers here; BUT WHY??:
self.wvl = np.array([np.trapz(x=self.srfs_wvl, y=self.srfs_wvl * self.srfs[band]) for band in self.bands])
# self.wvl = self.wvl if self.wvl_unit == 'micrometers' else np.array([int(i) for i in self.wvl])
self.srfs_wvl = self.srfs_wvl if self.wvl_unit == 'nanometers' else self.srfs_wvl / 1000
self.wvl = self.wvl if self.wvl_unit == 'nanometers' else self.wvl / 1000
self.conv.update({key: value for key, value in zip(self.bands, self.wvl)})
self.conv.update({value: key for key, value in zip(self.bands, self.wvl)})
return self
......@@ -303,6 +308,22 @@ class SRF(object):
def __getitem__(self, band):
    """Return the spectral response function of the given band (enables srf_instance[band])."""
    return self.srfs[band]
def plot_srfs(self, figsize=(15, 5)):
    # type: (tuple) -> None
    """Plot all spectral response functions of this instance into a single figure.

    :param figsize: figure size of the plot (inches)
    """
    from ..misc.helper_functions import sorted_nicely

    plt.figure(figsize=figsize)

    for bandname in sorted_nicely(self.bands):
        band_response = list(self.srfs_norm01[bandname])
        plt.plot(self.srfs_wvl, band_response, '-', label='Band %s' % bandname)

    plt.title(' '.join(['Spectral response functions', self.satellite, self.sensor]))
    plt.xlabel('wavelength [%s]' % self.wvl_unit)
    plt.ylabel('spectral response')
    plt.legend(loc='upper right')
def pickle_SRF_DB(L1A_Instances):
list_GMS_identifiers = [i.GMS_identifier for i in L1A_Instances]
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_kmeans
-----------
Tests for gms_preprocessing.algorithms.L2B_P.KMeansRSImage
"""
import unittest
import os
import numpy as np
from sklearn.cluster import k_means_
from geoarray import GeoArray # noqa E402 module level import not at top of file
from gms_preprocessing import __file__ # noqa E402 module level import not at top of file
from gms_preprocessing.config import set_config # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import KMeansRSImage # noqa E402 module level import not at top of file
testdata = os.path.join(os.path.dirname(__file__),
'../tests/data/hy_spec_data/Bavaria_farmland_LMU_Hyspex_subset.bsq')
class Test_KMeansRSImage(unittest.TestCase):
    """Tests class for gms_preprocessing.algorithms.L2B_P.KMeansRSImage"""

    @classmethod
    def setUpClass(cls):
        # Testjob Landsat-8
        set_config(call_type='webapp', exec_mode='Python', job_ID=26186196, db_host='geoms', reset=True)

        cls.geoArr = GeoArray(testdata)
        cls.geoArr.to_mem()  # read the test image into memory once for all tests
        cls.kmeans = KMeansRSImage(cls.geoArr, n_clusters=10)

        os.environ['MPLBACKEND'] = 'Template'  # disables matplotlib figure popups # NOTE: import geoarray sets 'Agg'

    def test_compute_clusters(self):
        # fitting must yield a sklearn KMeans model
        self.kmeans.compute_clusters()
        self.assertIsInstance(self.kmeans.clusters, k_means_.KMeans)

    def test_apply_clusters(self):
        # prediction must return one label per pixel
        labels = self.kmeans.apply_clusters(self.geoArr)
        self.assertIsInstance(labels, np.ndarray)
        self.assertTrue(labels.size == self.geoArr.rows * self.geoArr.cols)

    def test_get_random_spectra_from_each_cluster(self):
        # each cluster label must be present as a key in the returned dict
        random_samples = self.kmeans.get_random_spectra_from_each_cluster()
        self.assertIsInstance(random_samples, dict)
        for cluster_label in range(self.kmeans.n_clusters):
            self.assertIn(cluster_label, random_samples)

    def test_plot_cluster_centers(self):
        self.kmeans.plot_cluster_centers()

    def test_plot_cluster_histogram(self):
        self.kmeans.plot_cluster_histogram()

    def test_plot_clustered_image(self):
        self.kmeans.plot_clustered_image()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_spechomo_classifier
------------------------
Tests for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier
"""
import unittest
import os
import numpy as np
from gms_preprocessing import __file__ # noqa E402 module level import not at top of file
from gms_preprocessing.config import set_config # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import SpecHomo_Classifier # noqa E402 module level import not at top of file
testdata = os.path.join(os.path.dirname(__file__),
'../tests/data/hy_spec_data/Bavaria_farmland_LMU_Hyspex_subset.bsq')
class Test_SpecHomo_Classifier(unittest.TestCase):
    """Tests class for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier"""

    @classmethod
    def setUpClass(cls):
        # Testjob Landsat-8
        set_config(call_type='webapp', exec_mode='Python', job_ID=26186196, db_host='geoms', reset=True)
        cls.SHC = SpecHomo_Classifier([testdata, testdata, ], v=False)

    def test_generate_reference_cube_L8(self):
        # reference cube generation targeting a Landsat-8 OLI_TIRS sensor
        cube = self.SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
        self.assertIsInstance(cube, np.ndarray)

    def test_generate_reference_cube_S2A(self):
        # reference cube generation targeting a Sentinel-2A MSI sensor
        cube = self.SHC.generate_reference_cube('Sentinel-2A', 'MSI', n_clusters=10, tgt_n_samples=1000)
        self.assertIsInstance(cube, np.ndarray)
......@@ -5,7 +5,7 @@
test_spectral_resampler
-----------------------
Tests for gms_preprocessing.algorithms.L2B_P.SpectralResampler1D
Tests for gms_preprocessing.algorithms.L2B_P.SpectralResampler
"""
import unittest
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.