Commit de969c60 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Further developed L2B processor.

Former-commit-id: 09d7afde
Former-commit-id: cff5d7d9
parent f4a1fb8b
......@@ -12,6 +12,7 @@ import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from pandas import DataFrame
from typing import Union # noqa F401 # flake8 issue
from tqdm import tqdm
from sklearn.cluster import k_means_ # noqa F401 # flake8 issue
from geoarray import GeoArray # noqa F401 # flake8 issue
......@@ -83,6 +84,10 @@ class SpectralResampler(object):
if wvl_unit not in ['micrometers', 'nanometers']:
raise ValueError('Unknown wavelength unit %s.' % wvl_unit)
# privates
self._wvl_1nm = None
self._srf_1nm = {}
wvl = np.array(wvl_src, dtype=np.float).flatten()
if srf_tgt.wvl_unit != 'nanometers':
srf_tgt.convert_wvl_unit()
......@@ -92,6 +97,27 @@ class SpectralResampler(object):
self.wvl_unit = wvl_unit
self.logger = logger or logging.getLogger(__name__)
@property
def wvl_1nm(self):
# spectral resampling of input image to 1 nm resolution
if self._wvl_1nm is None:
self._wvl_1nm = np.arange(np.ceil(self.wvl_src_nm.min()), np.floor(self.wvl_src_nm.max()), 1)
return self._wvl_1nm
@property
def srf_1nm(self):
if not self._srf_1nm:
for band in self.srf_tgt.bands:
# resample srf to 1 nm
self._srf_1nm[band] = \
sp.interpolate.interp1d(self.srf_tgt.srfs_wvl, self.srf_tgt.srfs[band],
bounds_error=False, fill_value=0, kind='linear')(self.wvl_1nm)
# validate
assert len(self._srf_1nm[band]) == len(self.wvl_1nm)
return self._srf_1nm
def resample_signature(self, spectrum, scale_factor=10000, v=False):
# type: (np.ndarray, int, bool) -> np.ndarray
"""Resample the given spectrum according to the spectral response functions of the target instument.
......@@ -108,75 +134,82 @@ class SpectralResampler(object):
assert spectrum.size == self.wvl_src_nm.size
# resample input spectrum and wavelength to 1nm
wvl_1nm = np.arange(np.ceil(self.wvl_src_nm.min()), np.floor(self.wvl_src_nm.max()), 1)
spectrum_1nm = interp1d(self.wvl_src_nm, spectrum,
bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
bounds_error=False, fill_value=0, kind='linear')(self.wvl_1nm)
if v:
plt.figure()
plt.plot(wvl_1nm, spectrum_1nm/scale_factor, '.')
plt.plot(self.wvl_1nm, spectrum_1nm/scale_factor, '.')
spectrum_rsp = []
for band, wvl_center in zip(self.srf_tgt.bands, self.srf_tgt.wvl):
# resample srf to 1 nm
srf_1nm = interp1d(self.srf_tgt.srfs_wvl, self.srf_tgt.srfs[band],
bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
# compute the resampled spectral value (np.average computes the weighted mean value)
specval_rsp = np.average(spectrum_1nm, weights=srf_1nm)
specval_rsp = np.average(spectrum_1nm, weights=self.srf_1nm[band])
if v:
plt.plot(wvl_1nm, srf_1nm/max(srf_1nm))
plt.plot(self.wvl_1nm, self.srf_1nm/max(self.srf_1nm))
plt.plot(wvl_center, specval_rsp/scale_factor, 'x', color='r')
spectrum_rsp.append(specval_rsp)
return np.array(spectrum_rsp)
def resample_image(self, image_cube):
# type: (np.ndarray, int, bool) -> np.ndarray
"""Resample the given spectral image cube according to the spectral response functions of the target instument.
def resample_image(self, image_cube, tiledims=(20, 20)):
# type: (Union[GeoArray, np.ndarray], int, bool) -> np.ndarray
"""Resample the given spectral image cube according to the spectral response functions of the target instrument.
:param image_cube: image (3D array) containing the spectral information in the third dimension
:return: resampled spectral image cube
"""
# input validation
if not isinstance(image_cube, np.ndarray):
if not isinstance(image_cube, (GeoArray, np.ndarray)):
raise TypeError(image_cube)
if not image_cube.ndim == 3:
raise ValueError("The given image cube must be 3-dimensional. Received a %s-dimensional array."
% image_cube.ndim)
assert image_cube.shape[2] == self.wvl_src_nm.size
# resample input image and wavelength to 1nm
wvl_1nm = np.arange(np.ceil(self.wvl_src_nm.min()), np.floor(self.wvl_src_nm.max()), 1)
image_1nm = interp1d(self.wvl_src_nm, image_cube,
axis=2, bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
from ..misc.helper_functions import get_image_tileborders
tilebounds = get_image_tileborders('block', tiledims, shape_fullArr=image_cube.shape)
(R, C), B = image_cube.shape[:2], len(self.srf_tgt.bands)
image_rsp = np.zeros((R, C, B), dtype=image_cube.dtype)
for bounds in tilebounds:
(rs, re), (cs, ce) = bounds
tile = image_cube[rs: re+1, cs: ce+1, :] if image_cube.ndim == 3 else image_cube[rs: re+1, cs: ce+1]
for band_idx, (band, wvl_center) in enumerate(zip(self.srf_tgt.bands, self.srf_tgt.wvl)):
self.logger.info('Applying spectral resampling to band %s...' % band_idx)
tile_rsp = self._specresample(tile)
# resample srf to 1 nm
srf_1nm = sp.interpolate.interp1d(self.srf_tgt.srfs_wvl, self.srf_tgt.srfs[band],
bounds_error=False, fill_value=0, kind='linear')(wvl_1nm)
if image_cube.ndim == 3:
image_rsp[rs: re + 1, cs: ce + 1, :] = tile_rsp
else:
image_rsp[rs: re + 1, cs: ce + 1] = tile_rsp
return image_rsp
def _specresample(self, tile):
# spectral resampling of input image to 1 nm resolution
tile_1nm = interp1d(self.wvl_src_nm, tile,
axis=2, bounds_error=False, fill_value=0, kind='linear')(self.wvl_1nm)
tile_rsp = np.zeros((*tile_1nm.shape[:2], len(self.srf_tgt.bands)), dtype=tile.dtype)
for band_idx, band in enumerate(self.srf_tgt.bands):
# compute the resampled image cube (np.average computes the weighted mean value)
image_rsp[:, :, band_idx] = np.average(image_1nm, weights=srf_1nm, axis=2)
tile_rsp[:, :, band_idx] = np.average(tile_1nm, weights=self.srf_1nm[band], axis=2)
return image_rsp
return tile_rsp
class KMeansRSImage(object):
_clusters = None
_im_clust = None
_spectra = None
def __init__(self, im, n_clusters):
# type: (GeoArray, int) -> None
# privates
self._clusters = None
self._im_clust = None
self._spectra = None
self.im = im
self.n_clusters = n_clusters
......@@ -334,11 +367,17 @@ class SpecHomo_Classifier(object):
self.logger.info('Reading the input image %s...' % im_name)
im_gA = GeoArray(im_ref)
im_gA.cwl = np.array(im_gA.meta.loc['wavelength'], dtype=np.float).flatten()
# im_gA.to_mem()
wvl_unit = 'nanometers' if max(im_gA.cwl) > 15 else 'micrometers'
# perform spectral resampling of input image to match spectral properties of target sensor
self.logger.info('Performing spectral resampling to match spectral properties of target sensor...')
SR = SpectralResampler(im_gA.cwl, tgt_srf)
im_tgt = GeoArray(SR.resample_image(im_gA[:]), geotransform=im_gA.gt, projection=im_gA.prj)
SR = SpectralResampler(im_gA.cwl, tgt_srf, wvl_unit=wvl_unit)
im_tgt = np.empty((*im_gA.shape[:2], len(tgt_srf.bands)))
for ((rS, rE), (cS, cE)), tiledata in tqdm(im_gA.tiles((1000, 1000)), total=900):
im_tgt[rS: rE + 1, cS: cE + 1, :] = SR.resample_image(tiledata)
im_tgt = GeoArray(im_tgt, im_gA.gt, im_gA.prj)
# compute KMeans clusters for the spectrally resampled image
self.logger.info('Computing %s KMeans clusters...' % n_clusters)
......
......@@ -220,9 +220,9 @@ class SRF(object):
if wvl_unit not in ['micrometers', 'nanometers']:
raise ValueError('Unknown wavelength unit %s.' % wvl_unit)
self.srfs_wvl = []
self.srfs = {}
self.srfs_norm01 = {}
self.srfs_wvl = [] # wavelength positions with 1 nm precision
self.srfs = {} # srf values with 1 nm precision
self.srfs_norm01 = {} # srf values with 1 nm precision
self.bands = []
self.wvl = []
self.wvl_unit = wvl_unit
......
......@@ -137,12 +137,11 @@ def convert_absPathArchive_to_GDALvsiPath(path_archive):
def get_image_tileborders(target_tileShape, target_tileSize, path_GMS_file=None, shape_fullArr=None):
"""Calculates row/col bounds for image tiles according to the given parameters.
:param target_tileShape: <str> 'cube','row','col','band','block','pixel' or 'custom'
:param target_tileShape: <str> 'row','col','band','block','pixel' or 'custom'
:param target_tileSize: None or <tuple>. Only needed if target_tileShape=='block': (rows,cols)
:param path_GMS_file: <str> path to the *.gms file. Only needed if shape_fullArr is not given
:param shape_fullArr: <tuple> (rows,columns,bands)
"""
if shape_fullArr is None:
shape_fullArr = json.load(open(path_GMS_file))['shape_fullArr']
rows, cols = shape_fullArr[:2]
......@@ -172,6 +171,9 @@ def get_image_tileborders(target_tileShape, target_tileSize, path_GMS_file=None,
col_bounds.append(cols - 1)
return [[tuple([row_bounds[r], row_bounds[r + 1]]), tuple([col_bounds[c], col_bounds[c + 1]])]
for r in range(0, len(row_bounds), 2) for c in range(0, len(col_bounds), 2)]
else:
raise ValueError("'%s' is not a valid target_tileShape. "
"Choose between 'row','col','band','block','pixel' or 'custom'!" % target_tileShape)
def cut_GMS_obj_into_blocks(tuple__In_obj__blocksize_RowsCols):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment