Commit c6dd9c3b authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Added kNN_SAM_Classifier + tests. Revised SAM_Classifier.


Signed-off-by: Daniel Scheffler's avatarDaniel Scheffler <danschef@gfz-potsdam.de>
parent 090aef8a
Pipeline #4090 failed with stage
in 2 minutes and 24 seconds
......@@ -67,11 +67,6 @@ class _ImageClassifier(object):
bounds_alltiles = get_array_tilebounds(image_cube_gA.shape, tiledims)
# use a local variable to avoid pickling in multiprocessing
cmap = GeoArray(np.empty((image_cube_gA.rows, image_cube_gA.cols), dtype=dtype_cmap), nodata=cmap_nodataVal)
cmap.unclassified_val = None
dist = np.empty((image_cube_gA.rows, image_cube_gA.cols), dtype=np.float32)
print('Performing %s image classification...' % self.clf_name)
if self.CPUs is None or self.CPUs > 1:
with Pool(self.CPUs, initializer=initializer, initargs=(self.train_spectra, image_cube_gA)) as pool:
......@@ -81,6 +76,13 @@ class _ImageClassifier(object):
initializer(self.train_spectra, image_cube_gA)
tiles_results = [self._predict(bounds) for bounds in tqdm(bounds_alltiles)]
# use a local variable to avoid pickling in multiprocessing
cmap_dist_shape = (image_cube_gA.rows, image_cube_gA.cols) if tiles_results[0][1].ndim == 2 else \
(image_cube_gA.rows, image_cube_gA.cols, tiles_results[0][1].ndim)
cmap = GeoArray(np.empty(cmap_dist_shape, dtype=dtype_cmap), nodata=cmap_nodataVal)
cmap.unclassified_val = None
dist = np.empty(cmap_dist_shape, dtype=np.float32)
for tile_res in tiles_results:
((rS, rE), (cS, cE)), tile_cm = tile_res[:2]
cmap[rS: rE + 1, cS: cE + 1] = tile_cm
......@@ -273,21 +275,15 @@ class SAM_Classifier(_ImageClassifier):
def angles_deg(self):
return np.rad2deg(self._distance_metrics) if self._distance_metrics is not None else None
def _predict(self, tilepos):
assert global_shared_endmembers is not None and global_shared_im2classify is not None
(rS, rE), (cS, cE) = tilepos
tileimdata = global_shared_im2classify[rS: rE + 1, cS: cE + 1, :]
endmembers = global_shared_endmembers
if not tileimdata.shape[2] == self.train_spectra.shape[1]:
def _calc_sam(self, image, endmembers):
if not image.shape[2] == self.train_spectra.shape[1]:
raise RuntimeError('Matrix dimensions are not aligned. Input image has %d bands but input spectra '
'have %d.' % (tileimdata.shape[2], self.train_spectra.shape[1]))
'have %d.' % (image.shape[2], self.train_spectra.shape[1]))
# normalize input data because SAM asserts only data between -1 and 1
train_spectra_norm, tileimdata_norm = normalize_endmembers_image(endmembers, tileimdata)
train_spectra_norm, tileimdata_norm = normalize_endmembers_image(endmembers, image)
angles = np.zeros((tileimdata.shape[0], tileimdata.shape[1], self.n_samples), np.float)
angles = np.zeros((image.shape[0], image.shape[1], self.n_samples), np.float)
# if np.std(tileimdata) == 0: # skip tiles that only contain the same value
# loop over all training spectra and compute spectral angle for each pixel
......@@ -295,6 +291,15 @@ class SAM_Classifier(_ImageClassifier):
train_spectrum = train_spectra_norm[n_sample, :].reshape(1, 1, self.n_features)
angles[:, :, n_sample] = calc_sam(tileimdata_norm, train_spectrum, axis=2)
return angles
def _predict(self, tilepos):
assert global_shared_endmembers is not None and global_shared_im2classify is not None
(rS, rE), (cS, cE) = tilepos
tileimdata = global_shared_im2classify[rS: rE + 1, cS: cE + 1, :]
angles = self._calc_sam(tileimdata, global_shared_endmembers)
angles_min = np.min(angles, axis=2).astype(np.float32)
cmap = np.argmin(angles, axis=2).astype(np.int16)
......@@ -317,6 +322,31 @@ class SAM_Classifier(_ImageClassifier):
self._show_distance_metrics(**kwargs)
class kNN_SAM_Classifier(SAM_Classifier):
def __init__(self, train_spectra, k=3, CPUs=1):
# type: (np.ndarray, int, Union[int, None]) -> None
super(kNN_SAM_Classifier, self).__init__(train_spectra, CPUs=CPUs)
self.clf_name = 'k-nearest neighbour spectral angle mapper (SAM)'
self.k = k
def _predict(self, tilepos):
assert global_shared_endmembers is not None and global_shared_im2classify is not None
(rS, rE), (cS, cE) = tilepos
tileimdata = global_shared_im2classify[rS: rE + 1, cS: cE + 1, :]
angles = self._calc_sam(tileimdata, global_shared_endmembers)
cmap = np.argpartition(angles, self.k, axis=2)[:, :, :self.k].astype(np.int16)
angles_min_k = np.partition(angles, self.k, axis=2)[:, :, :self.k].astype(np.float32)
if global_shared_im2classify.nodata is not None and self._cmap_nodataVal is not None:
cmap = self.overwrite_cmap_at_nodata_positions(cmap, tileimdata,
self._cmap_nodataVal, global_shared_im2classify.nodata)
return tilepos, cmap.astype(np.int16), angles_min_k
class FEDSA_Classifier(_ImageClassifier):
def __init__(self, train_spectra, CPUs=1):
# type: (np.ndarray, Union[int, None]) -> None
......
......@@ -18,7 +18,8 @@ from geoarray import GeoArray
from gms_preprocessing import set_config
from gms_preprocessing.algorithms.classification import \
MinimumDistance_Classifier, kNN_Classifier, SAM_Classifier, FEDSA_Classifier, SID_Classifier, RF_Classifier
MinimumDistance_Classifier, kNN_Classifier, SAM_Classifier, kNN_SAM_Classifier, FEDSA_Classifier, \
SID_Classifier, RF_Classifier
from . import db_host
......@@ -120,6 +121,41 @@ class Test_SAM_Classifier(unittest.TestCase):
SC.label_unclassified_pixels(label_unclassified=-1, threshold='10%')
class Test_KNN_SAM_Classifier(unittest.TestCase):
def setUp(self) -> None:
self.k = 3
def test_classify(self):
SC = kNN_SAM_Classifier(cluster_centers, k=self.k, CPUs=1)
cmap_sp = SC.classify(test_gA, in_nodataVal=-9999, cmap_nodataVal=-9999, tiledims=(400, 200))
self.assertIsInstance(cmap_sp, GeoArray)
self.assertEqual(cmap_sp.shape, (1010, 1010, self.k))
SC = kNN_SAM_Classifier(cluster_centers, k=self.k, CPUs=None)
cmap_mp = SC.classify(test_gA, in_nodataVal=-9999, cmap_nodataVal=-9999, tiledims=(400, 200))
self.assertIsInstance(cmap_mp, GeoArray)
self.assertEqual(cmap_mp.shape, (1010, 1010, self.k))
self.assertTrue(np.array_equal(cmap_sp, cmap_mp))
SC = kNN_SAM_Classifier(cluster_centers, k=self.k, CPUs=None)
cmap_mp = SC.classify(test_gA_pure_endmembers, in_nodataVal=-9999, cmap_nodataVal=-9999)
for i, cl in enumerate(cluster_labels):
self.assertTrue(cl in cmap_mp[0, i, :])
# self.assertTrue(np.array_equal(cmap_mp.flatten(), cluster_labels)) # TODO sort cmap by SC.angles_deg
def test_label_unclassified_pixels_absolute_th(self):
SC = kNN_SAM_Classifier(cluster_centers, k=self.k, CPUs=None)
SC.classify(test_gA, in_nodataVal=-9999, cmap_nodataVal=-9999, tiledims=(400, 200))
SC.label_unclassified_pixels(label_unclassified=-1, threshold=10)
def test_label_unclassified_pixels_relative_th(self):
SC = kNN_SAM_Classifier(cluster_centers, self.k, CPUs=None)
SC.classify(test_gA, in_nodataVal=-9999, cmap_nodataVal=-9999, tiledims=(400, 200))
SC.label_unclassified_pixels(label_unclassified=-1, threshold='10%')
class Test_FEDSA_Classifier(unittest.TestCase):
def test_classify(self):
FC = FEDSA_Classifier(cluster_centers, CPUs=1)
......@@ -136,6 +172,7 @@ class Test_FEDSA_Classifier(unittest.TestCase):
FC = FEDSA_Classifier(cluster_centers, CPUs=None)
cmap_mp = FC.classify(test_gA_pure_endmembers, in_nodataVal=-9999, tiledims=(400, 200))
FC.label_unclassified_pixels(-1, 0.2)
self.assertTrue(np.array_equal(cmap_mp.flatten(), cluster_labels))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment