Commit ec4dddb2 authored by Daniel Scheffler

Revised image classification algorithms to speed them up in multiprocessing. Added multiprocessing tests for classification algorithms.
parent cacba3a8
Pipeline #3246 failed in 17 minutes and 17 seconds
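The speed-up comes from sharing the endmember spectra and the input image with the worker processes through a pool initializer, so that only the tile bounds are pickled per task. A minimal stand-alone sketch of that pattern (function and variable names follow the diff below; the input data is hypothetical and the per-tile prediction is a simplified nearest-endmember placeholder, not the project's classifiers):

```python
from multiprocessing import Pool

import numpy as np

# module-level globals, filled once per worker process by the initializer
global_shared_endmembers = None
global_shared_im2classify = None


def initializer(endmembers, im2classify):
    """Store the shared arrays as globals so they are not re-pickled for every tile."""
    global global_shared_endmembers, global_shared_im2classify
    global_shared_endmembers = endmembers
    global_shared_im2classify = im2classify


def _predict(tilepos):
    """Classify one tile; only the tile bounds travel to the worker (placeholder logic)."""
    (rS, rE), (cS, cE) = tilepos
    tile = global_shared_im2classify[rS: rE + 1, cS: cE + 1, :]
    # placeholder: assign each pixel to the nearest endmember by Euclidean distance
    dists = np.linalg.norm(tile[:, :, None, :] - global_shared_endmembers[None, None, :, :], axis=3)
    return tilepos, np.argmin(dists, axis=2)


if __name__ == '__main__':
    image = np.random.rand(100, 100, 5)    # hypothetical image cube (rows, cols, bands)
    endmembers = np.random.rand(3, 5)      # hypothetical endmember spectra (n_classes, bands)
    cmap = np.empty((100, 100), dtype=np.intp)
    tilebounds = [((rS, rS + 49), (0, 99)) for rS in range(0, 100, 50)]

    with Pool(2, initializer=initializer, initargs=(endmembers, image)) as pool:
        for ((rS, rE), (cS, cE)), tile_cm in pool.map(_predict, tilebounds):
            cmap[rS: rE + 1, cS: cE + 1] = tile_cm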
@@ -12,6 +12,22 @@ from tqdm import tqdm
from sklearn.neighbors import KNeighborsClassifier, NearestCentroid
from sklearn.preprocessing import MaxAbsScaler
from geoarray import GeoArray
from py_tools_ds.numeric.array import get_array_tilebounds
global_shared_endmembers = None
global_shared_im2classify = None
def initializer(endmembers, im2classify):
"""Declare global variables needed for image classifiers.
:param endmembers: reference spectra (endmembers) to be shared with the worker processes
:param im2classify: GeoArray of the image to be classified, shared with the worker processes
"""
global global_shared_endmembers, global_shared_im2classify
global_shared_endmembers = endmembers
global_shared_im2classify = im2classify
class _ImageClassifier(object):
@@ -26,30 +42,39 @@ class _ImageClassifier(object):
self.clf = None # to be implemented by the subclass
self.cmap = None
def _predict(self, tilepos, tileimdata):
def _predict(self, tilepos):
raise NotImplementedError('This method has to be implemented by the subclass.')
def classify(self, image_cube, nodataVal=None, tiledims=(1000, 1000)):
def classify(self, image_cube, nodataVal=None, tiledims=(250, 250)):
image_cube_gA = GeoArray(image_cube, nodata=nodataVal)
image_cube_gA.to_mem()
self.cmap = GeoArray(np.empty((image_cube_gA.rows, image_cube_gA.cols),
dtype=np.array(self.train_labels).dtype), nodata=nodataVal)
bounds_alltiles = get_array_tilebounds(image_cube_gA.shape, tiledims)
# use a local variable to avoid pickling in multiprocessing
cmap = GeoArray(np.empty((image_cube_gA.rows, image_cube_gA.cols),
dtype=np.array(self.train_labels).dtype), nodata=nodataVal)
print('Performing image classification...')
if self.CPUs is None or self.CPUs > 1:
with Pool(self.CPUs) as pool:
tiles_cm = pool.starmap(self._predict, image_cube_gA.tiles(tiledims))
for ((rS, rE), (cS, cE)), tile_cm in tiles_cm:
self.cmap[rS: rE + 1, cS: cE + 1] = tile_cm
with Pool(self.CPUs, initializer=initializer, initargs=(self.train_spectra, image_cube_gA)) as pool:
tiles_cm = pool.map(self._predict, bounds_alltiles)
for ((rS, rE), (cS, cE)), tile_cm in tiles_cm:
cmap[rS: rE + 1, cS: cE + 1] = tile_cm
else:
for ((rS, rE), (cS, cE)), tile in tqdm(image_cube_gA.tiles(tiledims)):
print('Performing classification for tile ((%s, %s), (%s, %s))...' % (rS, rE, cS, cE))
self.cmap[rS: rE + 1, cS: cE + 1] = self._predict(((rS, rE), (cS, cE)), tile)[1]
initializer(self.train_spectra, image_cube_gA)
for (rS, rE), (cS, cE) in tqdm(bounds_alltiles):
# print('Performing classification for tile ((%s, %s), (%s, %s))...' % (rS, rE, cS, cE))
cmap[rS: rE + 1, cS: cE + 1] = self._predict(((rS, rE), (cS, cE)))[1]
if nodataVal is not None:
self.cmap[image_cube_gA.mask_nodata.astype(np.int8) == 0] = nodataVal
cmap[image_cube_gA.mask_nodata.astype(np.int8) == 0] = nodataVal
self.cmap = cmap.astype(image_cube.dtype)
return self.cmap.astype(image_cube.dtype)
return self.cmap
def show_cmap(self):
if self.cmap:
@@ -63,12 +88,20 @@ class MinimumDistance_Classifier(_ImageClassifier):
"""
def __init__(self, train_spectra, train_labels, CPUs=1):
# type: (np.ndarray, Union[np.ndarray, List[int]], Union[int, None]) -> None
if CPUs is None or CPUs > 1:
CPUs = 1  # The NearestCentroid seems to parallelize automatically, so using multiprocessing is slower.
super(MinimumDistance_Classifier, self).__init__(train_spectra, train_labels, CPUs=CPUs)
self.clf = NearestCentroid()
self.clf.fit(train_spectra, train_labels)
def _predict(self, tilepos, tileimdata):
def _predict(self, tilepos):
assert global_shared_im2classify is not None
(rS, rE), (cS, cE) = tilepos
tileimdata = global_shared_im2classify[rS: rE + 1, cS: cE + 1, :]
spectra = tileimdata.reshape((tileimdata.shape[0] * tileimdata.shape[1], tileimdata.shape[2]))
return tilepos, self.clf.predict(spectra).reshape(*tileimdata.shape[:2])
@@ -78,10 +111,14 @@ class kNN_Classifier(_ImageClassifier):
# type: (np.ndarray, Union[np.ndarray, List[int]], Union[int, None], int) -> None
super(kNN_Classifier, self).__init__(train_spectra, train_labels, CPUs=CPUs)
self.clf = KNeighborsClassifier(n_neighbors=n_neighbors, n_jobs=CPUs)
self.clf = KNeighborsClassifier(n_neighbors=n_neighbors, n_jobs=1)
self.clf.fit(train_spectra, train_labels)
def _predict(self, tilepos, tileimdata):
def _predict(self, tilepos):
assert global_shared_im2classify is not None
(rS, rE), (cS, cE) = tilepos
tileimdata = global_shared_im2classify[rS: rE + 1, cS: cE + 1, :]
spectra = tileimdata.reshape((tileimdata.shape[0] * tileimdata.shape[1], tileimdata.shape[2]))
return tilepos, self.clf.predict(spectra).reshape(*tileimdata.shape[:2])
@@ -89,28 +126,32 @@ class kNN_Classifier(_ImageClassifier):
class SAM_Classifier(_ImageClassifier):
def __init__(self, train_spectra, CPUs=1):
# type: (np.ndarray, Union[int, None]) -> None
if np.percentile(train_spectra, 80) > 1.:
warnings.warn('SAM assumes the input spectra to be scaled between 0 and 1. Received data with an '
'80th percentile above 1. This might lead to invalid output values.')
super(SAM_Classifier, self).__init__(train_spectra, np.array(range(train_spectra.shape[0])), CPUs=CPUs)
def _predict(self, tilepos, tileimdata):
def _predict(self, tilepos):
assert global_shared_endmembers is not None and global_shared_im2classify is not None
(rS, rE), (cS, cE) = tilepos
tileimdata = global_shared_im2classify[rS: rE + 1, cS: cE + 1, :]
endmembers = global_shared_endmembers # type: np.ndarray
if not tileimdata.shape[2] == self.train_spectra.shape[1]:
raise RuntimeError('Matrix dimensions are not aligned. Input image has %d bands but input spectra '
'have %d.' % (tileimdata.shape[2], self.train_spectra.shape[1]))
angles = np.zeros((tileimdata.shape[0], tileimdata.shape[1], self.n_samples), np.float)
# normalize input data because SAM asserts only data between -1 and 1
train_spectra_norm, tileimdata_norm = normalize_endmembers_image(endmembers, tileimdata)
angles = np.zeros((tileimdata.shape[0], tileimdata.shape[1], self.n_samples), np.float)
# if np.std(tileimdata) == 0: # skip tiles that only contain the same value
for n_sample in range(self.n_samples):
train_spectrum = self.train_spectra[n_sample, :].reshape(1, 1, self.n_features)
angles[:, :, n_sample] = self._calc_sam(tileimdata.astype(np.float),
train_spectrum.astype(np.float),
train_spectrum = train_spectra_norm[n_sample, :].reshape(1, 1, self.n_features)
angles[:, :, n_sample] = self._calc_sam(tileimdata_norm,
train_spectrum,
axis=2)
cmap = np.argmin(angles, axis=2)
cmap = np.argmin(angles, axis=2).astype(np.int16)
return tilepos, cmap
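_calc_sam itself is not touched by this diff; conventionally, the spectral angle between a pixel spectrum and a reference spectrum is the arc cosine of their normalized dot product. A rough stand-alone sketch of that computation over a tile (an assumption about the formula, not the project's actual implementation):

```python
import numpy as np


def spectral_angle(tile, ref_spectrum):
    """Per-pixel spectral angle in radians between a (rows, cols, bands) tile
    and a (1, 1, bands) reference spectrum."""
    dot = np.sum(tile * ref_spectrum, axis=2)
    norms = np.linalg.norm(tile, axis=2) * np.linalg.norm(ref_spectrum, axis=2)
    # clip to catch tiny floating-point overshoots outside [-1, 1]
    return np.arccos(np.clip(dot / norms, -1, 1))
```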
@@ -164,9 +205,6 @@ def classify_image(image, train_spectra, train_labels, classif_alg,
CPUs=CPUs)
elif classif_alg == 'SAM':
# normalize input data because SAM asserts only data between -1 and 1
train_spectra, image = normalize_endmembers_image(train_spectra, image)
clf = SAM_Classifier(
train_spectra,
CPUs=CPUs)
@@ -188,9 +226,13 @@ def normalize_endmembers_image(endmembers, image):
allVals = np.hstack([em.flat, im.flat]).reshape(-1, 1)
max_abs_scaler = MaxAbsScaler()
max_abs_scaler.fit_transform(allVals)
endmembers_norm = max_abs_scaler.transform(em)
image_norm = spectra2im(max_abs_scaler.transform(im2spectra(im)), tgt_rows=im.shape[0], tgt_cols=im.shape[1])
if allVals.min() < -1 or allVals.max() > 1:
max_abs_scaler = MaxAbsScaler()
max_abs_scaler.fit_transform(allVals)
endmembers_norm = max_abs_scaler.transform(em)
image_norm = spectra2im(max_abs_scaler.transform(im2spectra(im)), tgt_rows=im.shape[0], tgt_cols=im.shape[1])
return endmembers_norm, image_norm
return endmembers_norm, image_norm
else:
return em, im
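The rescaling relies on scikit-learn's MaxAbsScaler, which divides by the maximum absolute value of the fitted data; because the endmember and image values are stacked into a single column before fitting, both are divided by one common factor, and the revised function skips this step entirely when all values already lie within [-1, 1]. A small illustration with made-up numbers:

```python
import numpy as np
from sklearn.preprocessing import MaxAbsScaler

# made-up reflectance-like values; the maximum absolute value is 10000
endmembers = np.array([[2000., 4000.], [8000., 10000.]])
image_vals = np.array([[500., 9000.]])

scaler = MaxAbsScaler()
scaler.fit(np.hstack([endmembers.flat, image_vals.flat]).reshape(-1, 1))  # one shared scale factor

print(scaler.transform(endmembers.reshape(-1, 1)).ravel())  # [0.2  0.4  0.8  1.  ]
print(scaler.transform(image_vals.reshape(-1, 1)).ravel())  # [0.05 0.9 ]
```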
@@ -41,26 +41,44 @@ with zipfile.ZipFile(path_classifier_zip, "r") as zf, tempfile.TemporaryDirector
class Test_MinimumDistance_Classifier(unittest.TestCase):
def test_classify(self):
MDC = MinimumDistance_Classifier(cluster_centers, cluster_labels)
cmap = MDC.classify(test_gA, nodataVal=-9999)
MDC = MinimumDistance_Classifier(cluster_centers, cluster_labels, CPUs=1)
cmap_sp = MDC.classify(test_gA, nodataVal=-9999)
self.assertIsInstance(cmap_sp, np.ndarray)
self.assertEqual(cmap_sp.shape, (1010, 1010))
self.assertIsInstance(cmap, np.ndarray)
self.assertEqual(cmap.shape, (1010, 1010))
MDC = MinimumDistance_Classifier(cluster_centers, cluster_labels, CPUs=None)
cmap_mp = MDC.classify(test_gA, nodataVal=-9999)
self.assertIsInstance(cmap_mp, np.ndarray)
self.assertEqual(cmap_mp.shape, (1010, 1010))
self.assertTrue(np.array_equal(cmap_sp, cmap_mp))
class Test_kNN_Classifier(unittest.TestCase):
def test_classify(self):
kNNC = kNN_Classifier(cluster_centers, cluster_labels)
cmap = kNNC.classify(test_gA, nodataVal=-9999)
kNNC = kNN_Classifier(cluster_centers, cluster_labels, CPUs=1)
cmap_sp = kNNC.classify(test_gA, nodataVal=-9999)
self.assertIsInstance(cmap_sp, np.ndarray)
self.assertEqual(cmap_sp.shape, (1010, 1010))
kNNC = kNN_Classifier(cluster_centers, cluster_labels, CPUs=None)
cmap_mp = kNNC.classify(test_gA, nodataVal=-9999)
self.assertIsInstance(cmap_mp, np.ndarray)
self.assertEqual(cmap_mp.shape, (1010, 1010))
self.assertIsInstance(cmap, np.ndarray)
self.assertEqual(cmap.shape, (1010, 1010))
self.assertTrue(np.array_equal(cmap_sp, cmap_mp))
class Test_SAM_Classifier(unittest.TestCase):
def test_classify(self):
SC = SAM_Classifier(cluster_centers)
cmap = SC.classify(test_gA, nodataVal=-9999, tiledims=(400, 200))
SC = SAM_Classifier(cluster_centers, CPUs=1)
cmap_sp = SC.classify(test_gA, nodataVal=-9999, tiledims=(400, 200))
self.assertIsInstance(cmap_sp, np.ndarray)
self.assertEqual(cmap_sp.shape, (1010, 1010))
SC = SAM_Classifier(cluster_centers, CPUs=None)
cmap_mp = SC.classify(test_gA, nodataVal=-9999, tiledims=(400, 200))
self.assertIsInstance(cmap_mp, np.ndarray)
self.assertEqual(cmap_mp.shape, (1010, 1010))
self.assertIsInstance(cmap, np.ndarray)
self.assertEqual(cmap.shape, (1010, 1010))
self.assertTrue(np.array_equal(cmap_sp, cmap_mp))