Commit 45b133cf authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Added kNN_MinimumDistance_Classifier + tests.


Signed-off-by: Daniel Scheffler <danschef@gfz-potsdam.de>
parent 2f0a328d
Pipeline #4112 failed with stage
in 2 minutes and 15 seconds
......@@ -188,7 +188,7 @@ class MinimumDistance_Classifier(_ImageClassifier):
self.clf_name = 'minimum distance (nearest centroid)'
self.clf = NearestCentroid(**kwargs)
self.clf = NearestCentroid(**kwargs) # this is the fastest implementation
self.clf.fit(train_spectra, train_labels)
self.class_centroids = self.clf.centroids_
......@@ -239,6 +239,63 @@ class MinimumDistance_Classifier(_ImageClassifier):
self._show_distance_metrics(**kwargs)
class kNN_MinimumDistance_Classifier(MinimumDistance_Classifier):
    """Minimum distance classifier that returns the k nearest training spectra for each pixel.

    For every pixel the euclidian distance to each training spectrum is computed; the indices of the
    'n_neighbors' closest spectra and their distances are returned, sorted by ascending distance.
    """

    def __init__(self, train_spectra, train_labels, n_neighbors=3, CPUs=1, **kwargs):
        # type: (np.ndarray, Union[np.ndarray, List[int]], int, Union[int, None], dict) -> None
        """Get an instance of kNN_MinimumDistance_Classifier.

        :param train_spectra:   training spectra (passed on to MinimumDistance_Classifier)
        :param train_labels:    labels of the training spectra (passed on to MinimumDistance_Classifier)
        :param n_neighbors:     number of nearest training spectra to be returned per pixel
        :param CPUs:            number of CPUs (passed on to MinimumDistance_Classifier)
        :param kwargs:          further keyword arguments passed on to MinimumDistance_Classifier
        """
        super(kNN_MinimumDistance_Classifier, self).__init__(train_spectra, train_labels, CPUs=CPUs, **kwargs)

        self.clf_name = 'k-nearest neighbour minimum distance (nearest centroid) (kNN_MinDist; k=%d)' % n_neighbors
        self.n_neighbors = n_neighbors

    @staticmethod
    def compute_euclidian_distance_3D(image, endmembers):
        """Compute the euclidian distance between each pixel of a 3D image and each endmember spectrum.

        :param image:       3D array (rows x columns x bands)
        :param endmembers:  2D array (n_samples x n_features), n_features must equal the number of image bands
        :return:            3D float32 array (rows x columns x n_samples) holding the distances
        :raises RuntimeError:   if the number of image bands differs from the number of endmember features
        """
        n_samples, n_features = endmembers.shape

        if image.shape[2] != n_features:
            raise RuntimeError('Matrix dimensions are not aligned. Input image has %d bands but input spectra '
                               'have %d.' % (image.shape[2], n_features))

        dists = np.zeros((image.shape[0], image.shape[1], n_samples), np.float32)

        # TODO: tiles that only contain the same value could be skipped here

        # loop over all training spectra and compute the euclidian distance for each pixel
        # (one spectrum at a time, so only a single image-sized difference array exists at once)
        for n_sample in range(n_samples):
            # np.float64 replaces the 'np.float' alias which was removed in NumPy 1.24
            train_spectrum = endmembers[n_sample, :].reshape(1, 1, n_features).astype(np.float64)
            diff = image - train_spectrum
            dists[:, :, n_sample] = np.sqrt((diff ** 2).sum(axis=2))

        return dists

    def _predict(self, tilepos):
        """Classify a single image tile.

        Reads the image and the endmembers from the module-level variables 'global_shared_im2classify' and
        'global_shared_endmembers'.

        :param tilepos: tile position as ((row_start, row_end), (col_start, col_end)), end positions inclusive
        :return:        tuple of
                        - tilepos (unchanged)
                        - int16 array (rows x columns x k) with the indices of the k nearest endmembers
                        - array (rows x columns x k) with the corresponding euclidian distances
                        where k = min(n_neighbors, number of endmembers) and the last axis is sorted by
                        ascending distance
        """
        assert global_shared_endmembers is not None and global_shared_im2classify is not None

        (rS, rE), (cS, cE) = tilepos
        tileimdata = global_shared_im2classify[rS: rE + 1, cS: cE + 1, :]

        dists = self.compute_euclidian_distance_3D(tileimdata, global_shared_endmembers)
        n_endmembers = dists.shape[2]

        if self.n_neighbors < n_endmembers:
            k = self.n_neighbors

            # select the k nearest endmembers (unordered) and gather their distances via the SAME indices;
            # independent np.partition / np.argpartition calls are not guaranteed to share an element order
            nearest_idx = np.argpartition(dists, k, axis=2)[:, :, :k]
            nearest_dists = np.take_along_axis(dists, nearest_idx, axis=2)

            # sort the k candidates by ascending distance
            order = np.argsort(nearest_dists, axis=2)
            cmap = np.take_along_axis(nearest_idx, order, axis=2).astype(np.int16)
            dists_min_k = np.take_along_axis(nearest_dists, order, axis=2).astype(np.float32)

        else:
            # all endmembers are requested: sort them by ascending distance as well,
            # so that the output order is consistent with the branch above
            order = np.argsort(dists, axis=2)
            cmap = order.astype(np.int16)
            dists_min_k = np.take_along_axis(dists, order, axis=2)

        if global_shared_im2classify.nodata is not None and self._cmap_nodataVal is not None:
            cmap = self.overwrite_cmap_at_nodata_positions(cmap, tileimdata,
                                                           self._cmap_nodataVal, global_shared_im2classify.nodata)

        return tilepos, cmap.astype(np.int16), dists_min_k
class kNN_Classifier(_ImageClassifier):
def __init__(self, train_spectra, train_labels, CPUs=1, **kwargs):
# type: (np.ndarray, Union[np.ndarray, List[int]], Union[int, None], dict) -> None
......@@ -516,13 +573,14 @@ def classify_image(image, train_spectra, train_labels, classif_alg, in_nodataVal
:param train_labels:
:param classif_alg: algorithm to be used for image classification
(to define which cluster each pixel belongs to)
'MinDist': Minimum Distance (Nearest Centroid)
'kNN': k-nearest-neighbour
'SAM': spectral angle mapping
'kNN_SAM': k-nearest neighbour spectral angle mapping
'FEDSA': fused euclidian distance / spectral angle
'SID': spectral information divergence
'RF': random forest
'MinDist': Minimum Distance (Nearest Centroid)
'kNN_MinDist': k-nearest neighbour Minimum Distance (Nearest Centroid)
'kNN': k-nearest-neighbour
'SAM': spectral angle mapping
'kNN_SAM': k-nearest neighbour spectral angle mapping
'FEDSA': fused euclidian distance / spectral angle
'SID': spectral information divergence
'RF': random forest
:param in_nodataVal:
:param cmap_nodataVal:
:param tiledims:
......@@ -551,6 +609,13 @@ def classify_image(image, train_spectra, train_labels, classif_alg, in_nodataVal
CPUs=CPUs,
**kwargs)
elif classif_alg == 'kNN_MinDist':
clf = kNN_MinimumDistance_Classifier(
train_spectra,
train_labels,
CPUs=CPUs,
**kwargs) # 'n_neighbors' should be in there
elif classif_alg == 'SAM':
clf = SAM_Classifier(
train_spectra,
......@@ -580,15 +645,15 @@ def classify_image(image, train_spectra, train_labels, classif_alg, in_nodataVal
CPUs=CPUs, **kwargs)
else:
raise NotImplementedError("Currently only the methods 'kNN', 'MinDist', 'SAM', 'kNN_SAM', "
raise NotImplementedError("Currently only the methods 'kNN', 'MinDist', 'kNN_MinDist', 'SAM', 'kNN_SAM', "
"'FEDSA', 'SID' and 'RF' are implemented.")
cmap = clf.classify(image, in_nodataVal=in_nodataVal, cmap_nodataVal=cmap_nodataVal, tiledims=tiledims)
# label unclassified pixels
if unclassified_threshold is not None:
if classif_alg not in ['MinDist', 'SAM', 'kNN_SAM', 'FEDSA', 'SID']:
raise RuntimeError("Only the methods 'MinDist', 'SAM', 'kNN_SAM', 'FEDSA' and 'SID' "
if classif_alg not in ['MinDist', 'kNN_MinDist', 'SAM', 'kNN_SAM', 'FEDSA', 'SID']:
raise RuntimeError("Only the methods 'MinDist', 'kNN_MinDist', 'SAM', 'kNN_SAM', 'FEDSA' and 'SID' "
"can label unclassifed pixels.")
clf.label_unclassified_pixels(label_unclassified=unclassified_pixVal, threshold=unclassified_threshold)
......@@ -597,7 +662,7 @@ def classify_image(image, train_spectra, train_labels, classif_alg, in_nodataVal
if not return_distance:
return cmap
else:
if classif_alg == 'MinDist':
if classif_alg == ['MinDist', 'kNN_MinDist']:
dist = clf.euclidian_distance
elif classif_alg in ['SAM', 'kNN_SAM']:
dist = clf.angles_deg
......
......@@ -15,18 +15,19 @@ import zipfile
import tempfile
import numpy as np
from geoarray import GeoArray
from time import time
from gms_preprocessing import set_config
from gms_preprocessing.algorithms.classification import \
MinimumDistance_Classifier, kNN_Classifier, SAM_Classifier, kNN_SAM_Classifier, FEDSA_Classifier, \
SID_Classifier, RF_Classifier
MinimumDistance_Classifier, kNN_MinimumDistance_Classifier, kNN_Classifier, SAM_Classifier, kNN_SAM_Classifier, \
FEDSA_Classifier, SID_Classifier, RF_Classifier
from . import db_host
cfg = set_config(job_ID=26186196, db_host=db_host, reset_status=True, is_test=True)
path_classifier_zip = os.path.join(cfg.path_spechomo_classif, 'LR_classifiers.zip')
fName_cls = 'LR_clust50__Landsat-7__ETM+.dill'
test_gA = GeoArray(np.random.randint(0, 10000, (1010, 1010, 6), np.int16)) # 6 Landsat-5 bands
test_gA = GeoArray(np.random.RandomState(0).randint(0, 10000, (1010, 1010, 6), np.int16)) # 6 Landsat-5 bands
test_gA[:5, 0, :] = -9999
test_gA[:5, 1, 3] = -9999
......@@ -46,10 +47,12 @@ test_gA_pure_endmembers[:, :, :] = cluster_centers
class Test_MinimumDistance_Classifier(unittest.TestCase):
def test_classify(self):
t0 = time()
MDC = MinimumDistance_Classifier(cluster_centers, cluster_labels, CPUs=1)
cmap_sp = MDC.classify(test_gA, in_nodataVal=-9999, cmap_nodataVal=-9999)
self.assertIsInstance(cmap_sp, GeoArray)
self.assertEqual(cmap_sp.shape, (1010, 1010))
print(time()-t0)
MDC = MinimumDistance_Classifier(cluster_centers, cluster_labels, CPUs=None)
cmap_mp = MDC.classify(test_gA, in_nodataVal=-9999, cmap_nodataVal=-9999)
......@@ -73,6 +76,39 @@ class Test_MinimumDistance_Classifier(unittest.TestCase):
MDC.label_unclassified_pixels(label_unclassified=-1, threshold='10%')
class Test_kNN_MinimumDistance_Classifier(unittest.TestCase):
    """Tests for kNN_MinimumDistance_Classifier (classification and labelling of unclassified pixels)."""

    def setUp(self) -> None:
        # number of nearest neighbours requested from the classifier in all tests
        self.n_neighbors = 5

    def _get_classifier(self, CPUs):
        """Return a new classifier trained on the module-level cluster centers and labels."""
        return kNN_MinimumDistance_Classifier(cluster_centers, cluster_labels, self.n_neighbors, CPUs=CPUs)

    def test_classify(self):
        nodata_kw = dict(in_nodataVal=-9999, cmap_nodataVal=-9999)
        expected_shape = (1010, 1010, self.n_neighbors)

        # run with CPUs=1
        clf_sp = self._get_classifier(CPUs=1)
        cmap_sp = clf_sp.classify(test_gA, tiledims=(400, 200), **nodata_kw)
        self.assertIsInstance(cmap_sp, GeoArray)
        self.assertEqual(cmap_sp.shape, expected_shape)

        # run with CPUs=None and compare with the CPUs=1 result
        clf_mp = self._get_classifier(CPUs=None)
        cmap_mp = clf_mp.classify(test_gA, tiledims=(400, 200), **nodata_kw)
        self.assertIsInstance(cmap_mp, GeoArray)
        self.assertEqual(cmap_mp.shape, expected_shape)

        self.assertTrue(np.array_equal(cmap_sp, cmap_mp))

        # an image made of the pure endmembers must yield the cluster labels as first neighbour
        clf_pure = self._get_classifier(CPUs=None)
        cmap_mp = clf_pure.classify(test_gA_pure_endmembers, **nodata_kw)
        first_neighbour = cmap_mp[:, :, 0].flatten()
        self.assertTrue(np.array_equal(first_neighbour, cluster_labels))

    def test_label_unclassified_pixels_absolute_th(self):
        # absolute threshold given as a number
        clf = self._get_classifier(CPUs=None)
        clf.classify(test_gA, in_nodataVal=-9999, cmap_nodataVal=-9999, tiledims=(400, 200))
        clf.label_unclassified_pixels(label_unclassified=-1, threshold=10)

    def test_label_unclassified_pixels_relative_th(self):
        # relative threshold given as a percentage string
        clf = self._get_classifier(CPUs=None)
        clf.classify(test_gA, in_nodataVal=-9999, cmap_nodataVal=-9999, tiledims=(400, 200))
        clf.label_unclassified_pixels(label_unclassified=-1, threshold='10%')
class Test_kNN_Classifier(unittest.TestCase):
def test_classify(self):
kNNC = kNN_Classifier(cluster_centers, cluster_labels, CPUs=1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment