Commit e46224d5 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

first revision of multilevel outlier detection algorithm

components.CoReg.GeoArray_CoReg:
- add_mask_bad_data() has been moved to GeoArray class within external library 'py_tools_ds'

components.CoReg.COREG:
- refactored attribute 'confidence_shifts' to 'shift_reliability'
- _get_opt_winpos_winsize(): bugfix for indexing GeoArray with float values
- refactored _calc_shift_confidence() to _calc_shift_reliability()

components.CoReg.CoReg_local:
- adjusted some code in consequence to new mask_baddata attribute within GeoArray
- adapted view_CoRegPoints() to new outlier detection algorithm

components.Geom_Quality_Grid.Geom_Quality_Grid:
- renamed column 'CONFIDENCE' within CoRegPoints_table to 'RELIABILITY'
- revised outlier detection algorithm:
    - _flag_outliers() has been moved to new class 'Tie_Point_Refiner'
    - _ransac_outlier_detection() has been moved to new class 'Tie_Point_Refiner'
    - to_GCPList() adapted to new outlier detection algorithm
- added class 'Tie_Point_Refiner'

- updated __version__
parent cc553994
......@@ -9,7 +9,7 @@ from .components import utilities
from .components import geometry
__author__ = 'Daniel Scheffler'
__version__= '2016-11-18_01'
__version__= '2016-11-22_01'
__all__=['COREG',
'COREG_LOCAL',
......
......@@ -108,35 +108,8 @@ class GeoArray_CoReg(GeoArray):
# add bad data mask
given_mask = CoReg_params['mask_baddata_%s' % ('ref' if imID == 'ref' else 'tgt')]
self.mask_baddata = None
if given_mask:
self.add_mask_bad_data(given_mask)
def add_mask_bad_data(self, path_or_geoArr):
"""Adds a bad data mask. This method is separated from __init__() in order to allow explicit adding of the mask.
:param path_or_geoArr:
"""
geoArr_mask = GeoArray(path_or_geoArr)
assert geoArr_mask.bands == 1, \
'Expected one single band as bad data mask for %s. Got %s bands.' % (self.imName, geoArr_mask.bands)
assert geoArr_mask.shape[:2] == self.shape[:2]
pixelVals_in_mask = sorted(list(np.unique(geoArr_mask[:])))
assert len(pixelVals_in_mask) <= 2, 'Bad data mask must have only two pixel values (boolean) - 0 and 1 or ' \
'False and True! The given mask for %s contains the values %s.' % (
self.imName, pixelVals_in_mask)
assert pixelVals_in_mask in [[0, 1], [False, True]], 'Found unsupported pixel values in the given bad data ' \
'mask for %s: %s Only the values True, False, 0 and 1 ' \
'are supported. ' % (self.imName, pixelVals_in_mask)
assert geoArr_mask.gt in [None, self.gt], 'The geotransform of the given bad data mask for %s must match the ' \
'geotransform of the %s.' %(self.imName, self.imName)
assert geoArr_mask.prj is None or prj_equal(geoArr_mask.prj, self.prj) in [None, self.prj], 'The projection ' \
'of the given bad data mask for %s must match the projection of the %s.' %(self.imName, self.imName)
self.mask_baddata = geoArr_mask[:].astype(np.bool)
self.mask_baddata = given_mask
......@@ -288,7 +261,7 @@ class COREG(object):
self.ssim_orig = None # set by self._validate_ssim_improvement()
self.ssim_deshifted = None # set by self._validate_ssim_improvement()
self._ssim_improved = None # private attribute to be filled by self.ssim_improved
self.confidence_shifts = None # set by self.calculate_spatial_shifts()
self.shift_reliability = None # set by self.calculate_spatial_shifts()
self.tracked_errors = [] # expanded each time an error occurs
self.success = None # default
......@@ -568,7 +541,7 @@ class COREG(object):
if im.mask_baddata is not None:
imX, imY = mapXY2imXY(wp, im.mask_baddata.gt)
if not im.mask_baddata[imY, imX]:
if im.mask_baddata[int(imY), int(imX)] is True:
self.tracked_errors.append(
RuntimeError('According to the provided bad data mask for the %s the chosen window position '
'%s / %s is within a bad data area. Using this window position for coregistration '
......@@ -919,7 +892,7 @@ class COREG(object):
return x_intshift, y_intshift
def _calc_shift_confidence(self, scps):
def _calc_shift_reliability(self, scps):
"""Calculates a confidence percentage that can be used as an assessment for reliability of the calculated shifts.
:param scps: <np.ndarray> shifted cross power spectrum
......@@ -934,19 +907,18 @@ class COREG(object):
scps_masked = scps
scps_masked[peakR-1:peakR+2, peakC-1:peakC+2] = -9999
scps_masked = np.ma.masked_equal(scps_masked, -9999)
power_without_peak = np.mean(scps_masked) + 3* np.std(scps_masked)
power_without_peak = np.mean(scps_masked) + 2* np.std(scps_masked)
# calculate confidence
confid = 100-((power_without_peak/power_at_peak)*100)
confid = 100 if confid > 100 else 0 if confid < 0 else confid
if not self.q:
print('Confidence of the calculated shifts: %.1f' %confid, '%')
print('Estimated reliability of the calculated shifts: %.1f' %confid, '%')
return confid
def _validate_integer_shifts(self, im0, im1, x_intshift, y_intshift):
if (x_intshift, y_intshift)!=(0,0):
......@@ -1202,7 +1174,7 @@ class COREG(object):
# set self.ssim_before and ssim_after
self._validate_ssim_improvement()
self.confidence_shifts = self._calc_shift_confidence(scps)
self.shift_reliability = self._calc_shift_reliability(scps)
warnings.simplefilter('default')
......
......@@ -120,20 +120,10 @@ class COREG_LOCAL(object):
if match_gsd and out_gsd: warnings.warn("'-out_gsd' is ignored because '-match_gsd' is set.\n")
if out_gsd: assert isinstance(out_gsd, list) and len(out_gsd) == 2, 'out_gsd must be a list with two values.'
self.params = dict([x for x in locals().items() if x[0] != "self" and not x[0].startswith('__')])
self.params = dict([x for x in locals().items() if x[0] != "self" and not x[0].startswith('__')])
self.imref = GeoArray(im_ref, nodata=nodata[0], progress=progress, q=q)
self.im2shift = GeoArray(im_tgt, nodata=nodata[1], progress=progress, q=q)
# if isinstance(im_ref, GeoArray) and nodata is not None: im_ref.nodata = nodata[0]
# if isinstance(im_tgt, GeoArray) and nodata is not None: im_tgt.nodata = nodata[1]
# self.imref = im_ref if isinstance(im_ref, GeoArray) else GeoArray(im_ref, nodata=nodata[0])
# self.im2shift = im_tgt if isinstance(im_tgt, GeoArray) else GeoArray(im_tgt, nodata=nodata[1])
# self.imref.progress = progress
# self.im2shift.progress = progress
# self.imref.q = q
# self.im2shift.q = q
self.path_out = path_out # updated by self.set_outpathes
self.fmt_out = fmt_out
self.out_creaOpt = out_crea_options
......@@ -160,10 +150,10 @@ class COREG_LOCAL(object):
self.progress = progress if not q else False # overridden by v
self.ignErr = ignore_errors
assert self.tieP_filter_level in [0,1,2], 'Invalid tie point filter level.'
assert self.tieP_filter_level in range(4), 'Invalid tie point filter level.'
assert isinstance(self.imref, GeoArray) and isinstance(self.im2shift, GeoArray), \
'Something went wrong with the creation of GeoArray instances for reference or target image. The created ' \
'instances do not seem to belong to the GeoArray class. If you are working in Jupyter Notebook, reset the ' \
'instances do not seem to belong to the GeoArray class. If you are working in Jupyter Notebook, reset the '\
'kernel and try again.'
COREG.__dict__['_set_outpathes'](self, self.imref, self.im2shift)
......@@ -194,8 +184,8 @@ class COREG_LOCAL(object):
# add bad data mask
# (mask is not added during initialization of COREG object in order to avoid bad data area errors there)
if mask_baddata_ref: self.COREG_obj.ref.add_mask_bad_data(mask_baddata_ref)
if mask_baddata_tgt: self.COREG_obj.shift.add_mask_bad_data(mask_baddata_tgt)
if mask_baddata_ref is not None: self.COREG_obj.ref .mask_baddata = mask_baddata_ref
if mask_baddata_tgt is not None: self.COREG_obj.shift.mask_baddata = mask_baddata_tgt
self._quality_grid = None # set by self.quality_grid
self._CoRegPoints_table = None # set by self.CoRegPoints_table
......@@ -288,9 +278,11 @@ class COREG_LOCAL(object):
plt.title(attribute2plot)
# transform all points of quality grid to LonLat
outlierCols = [c for c in self.CoRegPoints_table.columns if 'OUTLIER' in c]
attr2include = ['geometry', attribute2plot] + outlierCols
GDF = self.CoRegPoints_table.loc\
[self.CoRegPoints_table.X_SHIFT_M != self.outFillVal, ['geometry', attribute2plot, 'SSIM_IMPROVED', 'OUTLIER']].copy() \
if exclude_fillVals else self.CoRegPoints_table.loc[:, ['geometry', attribute2plot, 'SSIM_IMPROVED', 'OUTLIER']]
[self.CoRegPoints_table.X_SHIFT_M != self.outFillVal, attr2include].copy() \
if exclude_fillVals else self.CoRegPoints_table.loc[:, attr2include]
# get LonLat coordinates for all points
get_LonLat = lambda X, Y: transform_any_prj(self.im2shift.projection, 4326, X, Y)
......@@ -310,24 +302,31 @@ class COREG_LOCAL(object):
GDF['plt_X'] = list(GDF['plt_XY'].map(lambda XY: XY[0]))
GDF['plt_Y'] = list(GDF['plt_XY'].map(lambda XY: XY[1]))
if not hide_filtered and self.tieP_filter_level>1:
# flag RANSAC outliers
GDF_filt = GDF[GDF.OUTLIER==True].copy()
plt.scatter(GDF_filt['plt_X'], GDF_filt['plt_Y'], c='b', marker='o' if len(GDF) < 10000 else '.', s=250, alpha=1.0)
if not hide_filtered and self.tieP_filter_level>0:
# flag SSIM filtered points
GDF_filt = GDF[GDF.SSIM_IMPROVED==False].copy()
plt.scatter(GDF_filt['plt_X'], GDF_filt['plt_Y'], c='r', marker='o' if len(GDF) < 10000 else '.', s=150, alpha=1.0)
if hide_filtered and self.tieP_filter_level>0:
GDF = GDF[GDF.SSIM_IMPROVED == True].copy()
if hide_filtered and self.tieP_filter_level>1:
GDF = GDF[GDF.OUTLIER == False].copy()
if hide_filtered:
if self.tieP_filter_level == 3: GDF = GDF[GDF.OUTLIER == False].copy() # FIXME dirty hack
else:
if self.tieP_filter_level > 0: GDF = GDF[GDF.L1_OUTLIER == False].copy()
if self.tieP_filter_level > 1: GDF = GDF[GDF.L2_OUTLIER == False].copy()
if self.tieP_filter_level > 2: GDF = GDF[GDF.L3_OUTLIER == False].copy()
else:
marker = 'o' if len(GDF) < 10000 else '.'
if self.tieP_filter_level > 0:
# flag level 1 outliers
GDF_filt = GDF[GDF.L1_OUTLIER == True].copy()
plt.scatter(GDF_filt['plt_X'], GDF_filt['plt_Y'], c='b', marker=marker, s=250, alpha=1.0)
if self.tieP_filter_level > 1:
# flag level 2 outliers
GDF_filt = GDF[GDF.L2_OUTLIER == True].copy()
plt.scatter(GDF_filt['plt_X'], GDF_filt['plt_Y'], c='r', marker=marker, s=150, alpha=1.0)
if self.tieP_filter_level > 2:
# flag level 3 outliers
GDF_filt = GDF[GDF.L2_OUTLIER == True].copy()
plt.scatter(GDF_filt['plt_X'], GDF_filt['plt_Y'], c='b', marker=marker, s=250, alpha=1.0)
#print(GDF)
# plot all points on top
vmin, vmax = np.percentile(GDF[attribute2plot], 0), np.percentile(GDF[attribute2plot], 95)
points = plt.scatter(GDF['plt_X'],GDF['plt_Y'], c=GDF[attribute2plot],
......
......@@ -13,7 +13,7 @@ try:
except ImportError:
from osgeo import gdal
import numpy as np
from geopandas import GeoDataFrame
from geopandas import GeoDataFrame, GeoSeries
from pykrige.ok import OrdinaryKriging
from shapely.geometry import Point
from skimage.measure import points_in_poly, ransac
......@@ -163,7 +163,7 @@ class Geom_Quality_Grid(object):
win_sz_y, win_sz_x = CR.matchBox.imDimsYX if CR.matchBox else (None, None)
CR_res = [win_sz_x, win_sz_y, CR.x_shift_px, CR.y_shift_px, CR.x_shift_map, CR.y_shift_map,
CR.vec_length_map, CR.vec_angle_deg, CR.ssim_orig, CR.ssim_deshifted, CR.ssim_improved,
CR.confidence_shifts, last_err]
CR.shift_reliability, last_err]
return [pointID]+CR_res
......@@ -212,7 +212,7 @@ class Geom_Quality_Grid(object):
assert not GDF.empty, 'No coregistration point could be placed within the overlap area. Check your input data!' # FIXME track that
# exclude all point where bad data mask is True (e.g. points on clouds etc.)
orig_len_GDF = len(GDF)
orig_len_GDF = len(GDF)
mapXY = np.array(GDF.loc[:,['X_UTM','Y_UTM']])
GDF['REF_BADDATA'] = self.COREG_obj.ref .mask_baddata.read_pointData(mapXY) \
if self.COREG_obj.ref .mask_baddata is not None else False
......@@ -221,6 +221,8 @@ class Geom_Quality_Grid(object):
GDF = GDF[(GDF['REF_BADDATA']==False) & (GDF['TGT_BADDATA']==False)]
print('According to the provided bad data mask(s) %s points of initially %s have been excluded.'
%(orig_len_GDF-len(GDF), orig_len_GDF))
self.ref.mask_baddata = None
self.shift.mask_baddata = None
#not_within_nodata = \
# lambda r: np.array(self.ref[r.Y_IM,r.X_IM,self.COREG_obj.ref.band4match]!=self.COREG_obj.ref.nodata and \
......@@ -276,7 +278,7 @@ class Geom_Quality_Grid(object):
if self.progress:
bar.print_progress(percent=numberDone/len(GDF)*100)
if results.ready():
results = results.get() # FIXME in some cases the code hangs here ==> x
results = results.get() # FIXME in some cases the code hangs here ==> x ==> seems to be fixed
break
else:
if not self.q:
......@@ -287,131 +289,29 @@ class Geom_Quality_Grid(object):
if self.progress:
bar.print_progress((i+1)/len(GDF)*100)
results[i,:] = self._get_spatial_shifts(coreg_kwargs)
# FIXME in some cases the code hangs here ==> x
# FIXME in some cases the code hangs here ==> x ==> seems to be fixed
# merge results with GDF
records = GeoDataFrame(np.array(results, np.object),
columns=['POINT_ID', 'X_WIN_SIZE', 'Y_WIN_SIZE', 'X_SHIFT_PX','Y_SHIFT_PX', 'X_SHIFT_M',
'Y_SHIFT_M', 'ABS_SHIFT', 'ANGLE', 'SSIM_BEFORE', 'SSIM_AFTER',
'SSIM_IMPROVED', 'CONFIDENCE', 'LAST_ERR'])
'SSIM_IMPROVED', 'RELIABILITY', 'LAST_ERR'])
GDF = GDF.merge(records, on='POINT_ID', how="inner")
GDF = GDF.fillna(int(self.outFillVal))
self.CoRegPoints_table = GDF
# filter tie points according to given filter level
if self.tieP_filter_level>0:
TPR = Tie_Point_Refiner(GDF[GDF.ABS_SHIFT != self.outFillVal])
GDF_filt, new_columns = TPR.run_filtering(level=self.tieP_filter_level)
GDF = GDF.merge(GDF_filt[ ['POINT_ID']+new_columns], on='POINT_ID', how="outer")
if self.tieP_filter_level>1:
self._flag_outliers(algorithm = 'RANSAC', inplace=True)
GDF = GDF.fillna(int(self.outFillVal))
self.CoRegPoints_table = GDF
return self.CoRegPoints_table
def _flag_outliers(self, algorithm='RANSAC', inplace=False):
"""Detects geometric outliers within CoRegPoints_table GeoDataFrame and adds a new boolean column 'OUTLIER'.
:param algorithm: <str> the algorithm to be used for outlier detection. available choices: 'RANSAC'
:param inplace: <bool> whether to overwrite CoRegPoints_table directly (True) or not (False). If False, the
resulting GeoDataFrame is returned.
"""
# NOTE(review): leading indentation has been stripped in this diff rendering; the statement
# nesting below must be restored from the original source file before this code can run.
# only 'RANSAC' is implemented so far
if not algorithm in ['RANSAC']: raise ValueError
warnings.warn("The currently implemented RANSAC outlier detection is still very experimental. You enabled it "
"by passing 'tieP_filter_level=2' to COREG_LOCAL. Use it on your own risk!")
GDF = self.CoRegPoints_table.copy()
# exclude all records where SSIM decreased or no match has been found
GDF = GDF[(GDF.ABS_SHIFT!=self.outFillVal) &(GDF.SSIM_IMPROVED==True)]
if not GDF.empty:
# source coordinates plus their map shifts form the estimated point cloud compared by RANSAC
src = np.array(GDF[['X_UTM', 'Y_UTM']])
xyShift = np.array(GDF[['X_SHIFT_M', 'Y_SHIFT_M']])
dst = src + xyShift
if algorithm=='RANSAC':
model_roust, outliers = self._ransac_outlier_detection(src, dst)
#print(np.count_nonzero(outliers), 'marked as outliers')
# merge the outlier flags back into the full table; records excluded above keep the fill value
records = GDF[['POINT_ID']].copy()
records['OUTLIER'] = outliers
GDF = self.CoRegPoints_table.merge(records, on='POINT_ID', how="outer")
GDF['OUTLIER'].fillna(int(self.outFillVal), inplace=True)
if inplace:
self.CoRegPoints_table = GDF
return GDF
else:
# nothing left to flag -> return an empty frame with the expected column (unless inplace)
if not inplace:
return GeoDataFrame(columns=['OUTLIER'])
@staticmethod
def _ransac_outlier_detection(src_coords, est_coords, max_outlier_percentage=10, tolerance=2.5, max_iter=15, timeout=20):
"""Detect geometric outliers between point cloud of source and estimated coordinates using RANSAC algorithm.
:param src_coords: <np.ndarray> source coordinate point cloud as array with shape [Nx2]
:param est_coords: <np.ndarray> estimated coordinate point cloud as array with shape [Nx2]
:param max_outlier_percentage: <float, int> maximum percentage of outliers to be detected
:param tolerance: <float, int> percentage tolerance for max_outlier_percentage
:param max_iter: <int> maximum iterations for finding the best RANSAC threshold
:param timeout: <float, int> timeout for iteration loop in seconds
:return: (model_robust, outliers) -- the fitted transform model and a boolean outlier mask
"""
# NOTE(review): leading indentation has been stripped in this diff rendering; the statement
# nesting below must be restored from the original source file before this code can run.
for co, n in zip([src_coords, est_coords], ['src_coords', 'est_coords']):
assert co.ndim==2 and co.shape[1]==2, "'%s' must have shape [Nx2]. Got shape %s."%(n, co.shape)
if max_outlier_percentage >100: raise ValueError
min_inlier_percentage = 100-max_outlier_percentage
# first-order polynomial transform (defined here but ransac() below is called with AffineTransform)
class PolyTF_1(PolynomialTransform):
def estimate(*data):
return PolynomialTransform.estimate(*data, order=1)
# robustly estimate affine transform model with RANSAC
# eliminates not more than the given maximum outlier percentage of the tie points
model_robust, inliers = None, None
count_inliers = None
th = 5 # start value
th_checked = {} # dict of thresholds that already have been tried + calculated inlier percentage
th_substract = 2
count_iter = 0
time_start = time.time()
while True:
# stop on iteration or time budget and keep the last model found
if count_iter > max_iter or time.time()-time_start > timeout:
break # keep last values and break while loop
if th_checked:
# halve the step size once the inlier percentage is within the tolerance band
if min_inlier_percentage-tolerance <= th_checked[th] <= min_inlier_percentage+tolerance:
th_substract /= 2
if abs(th_checked[th] - min_inlier_percentage) <= tolerance:
break # keep last values and break while loop
# check if more or less outliers have been detected than given percentage
if count_inliers >= min_inlier_percentage * src_coords.shape[0] / 100:
th -= th_substract # too less outliers found -> decrease threshold
else:
th += th_substract # too much outliers found -> increase threshold
# model_robust, inliers = ransac((src, dst), PolynomialTransform, min_samples=3,
model_robust, inliers = ransac((src_coords, est_coords), AffineTransform,
min_samples = 6,
residual_threshold = th,
max_trials = 3000,
stop_sample_num = int((min_inlier_percentage-tolerance)/100*src_coords.shape[0]),
stop_residuals_sum = int((max_outlier_percentage-tolerance)/100*src_coords.shape[0])
)
count_inliers = np.count_nonzero(inliers)
th_checked[th] = count_inliers / src_coords.shape[0] * 100
print(th_checked)
count_iter+=1
outliers = inliers == False
return model_robust, outliers
def dump_CoRegPoints_table(self, path_out=None):
path_out = path_out if path_out else get_generic_outpath(dir_out=self.dir_out,
fName_out="CoRegPoints_table_grid%s_ws(%s_%s)__T_%s__R_%s.pkl" % (self.grid_res, self.COREG_obj.win_size_XY[0],
......@@ -428,19 +328,8 @@ class Geom_Quality_Grid(object):
if getattr(GDF,'empty'): # GDF.empty returns AttributeError
return []
else:
orig_len_GDF = len(GDF)
if self.tieP_filter_level > 0:
# level 1 filtering
GDF = GDF[GDF.SSIM_IMPROVED==True].copy()
if not self.q:
print('SSIM filtering removed %s tie points.' %(orig_len_GDF-len(GDF)))
if self.tieP_filter_level > 1:
# level 2 filtering
GDF = GDF[GDF.OUTLIER == False].copy()
if not self.q:
print('RANSAC filtering removed %s tie points.' % (orig_len_GDF - len(GDF))) # FIXME hardcoded algorithm; count corresponds to original length of GDF
# exclude all points flagged as outliers
GDF = GDF[GDF.OUTLIER == False].copy()
# calculate GCPs
GDF['X_UTM_new'] = GDF.X_UTM + GDF.X_SHIFT_M
......@@ -628,3 +517,139 @@ class Geom_Quality_Grid(object):
kwargs = args_kwargs_dict.get('kwargs',[])
return self._Kriging_sp(*args, **kwargs)
class Tie_Point_Refiner(object):
    """Multi-level outlier filtering for a GeoDataFrame of tie points.

    Level 1 thresholds the estimated shift reliability, level 2 drops records where SSIM did
    not improve, and level 3 runs a RANSAC-based geometric outlier detection. Each enabled
    level adds a boolean column ('L1_OUTLIER', 'L2_OUTLIER', 'L3_OUTLIER') to the GeoDataFrame;
    the final column 'OUTLIER' marks records flagged by any enabled level.
    """

    def __init__(self, GDF, q=False):
        """
        :param GDF: <geopandas.GeoDataFrame> tie point table containing at least the columns
                    'RELIABILITY', 'SSIM_IMPROVED', 'X_UTM', 'Y_UTM', 'X_SHIFT_M', 'Y_SHIFT_M'
        :param q:   <bool> quiet mode (suppress console output)
        """
        self.GDF = GDF.copy()  # work on a copy so the caller's GeoDataFrame stays untouched
        self.q = q
        self.new_cols = []  # names of the columns added by run_filtering()
        self.ransac_model_robust = None  # set by _RANSAC_outlier_detection()

    def run_filtering(self, level=2):
        """Apply all filter levels up to the given level and flag the detected outliers.

        :param level: <int> tie point filter level (0: no filtering, 1: reliability thresholding,
                      2: + SSIM filtering, 3: + RANSAC outlier detection)
        :return: (<geopandas.GeoDataFrame> filtered table, <list> names of the added columns)
        """
        # TODO catch empty GDF

        if level > 0:
            marked_recs = GeoSeries(self._reliability_thresholding())
            self.GDF['L1_OUTLIER'] = marked_recs
            self.new_cols.append('L1_OUTLIER')
            if not self.q:
                print('%s tie points flagged by level 1 filtering (reliability).'
                      % (len(marked_recs[marked_recs == True])))

        if level > 1:
            marked_recs = GeoSeries(self._SSIM_filtering())
            self.GDF['L2_OUTLIER'] = marked_recs
            self.new_cols.append('L2_OUTLIER')
            if not self.q:
                print('%s tie points flagged by level 2 filtering (SSIM).'
                      % (len(marked_recs[marked_recs == True])))

        if level > 2:
            # FIX: the warning previously claimed the user enabled RANSAC filtering "by passing
            # 'tieP_filter_level=2'" although this branch only runs for filter level 3.
            warnings.warn(
                "The currently implemented RANSAC outlier detection is still very experimental. You enabled it "
                "by passing 'tieP_filter_level=3' to COREG_LOCAL. Use it on your own risk!")
            marked_recs = GeoSeries(self._RANSAC_outlier_detection())
            self.GDF['L3_OUTLIER'] = marked_recs
            self.new_cols.append('L3_OUTLIER')
            if not self.q:
                # FIX: message now ends with a period, consistent with levels 1 and 2
                print('%s tie points flagged by level 3 filtering (RANSAC).'
                      % (len(marked_recs[marked_recs == True])))

        # a record is an outlier if any of the enabled filter levels flagged it
        self.GDF['OUTLIER'] = self.GDF[self.new_cols].any(axis=1)
        self.new_cols.append('OUTLIER')

        return self.GDF, self.new_cols

    def _reliability_thresholding(self, min_reliability=30):
        """Exclude all records where estimated reliability of the calculated shifts is below the given threshold.

        :param min_reliability: <float, int> minimum shift reliability percentage for a tie point to be kept
        :return: boolean mask (True -> flagged as outlier)
        """
        return self.GDF.RELIABILITY < min_reliability

    def _SSIM_filtering(self):
        """Exclude all records where SSIM decreased.

        :return: boolean mask (True -> flagged as outlier)
        """
        return self.GDF.SSIM_IMPROVED == False

    def _RANSAC_outlier_detection(self, max_outlier_percentage=10, tolerance=2.5, max_iter=15,
                                  exclude_previous_outliers=True, timeout=20):
        """Detect geometric outliers between point cloud of source and estimated coordinates using RANSAC algorithm.

        :param max_outlier_percentage:    <float, int> maximum percentage of outliers to be detected
        :param tolerance:                 <float, int> percentage tolerance for max_outlier_percentage
        :param max_iter:                  <int> maximum iterations for finding the best RANSAC threshold
        :param exclude_previous_outliers: <bool> whether to exclude points that have been flagged as outlier by
                                          earlier filtering
        :param timeout:                   <float, int> timeout for iteration loop in seconds
        :return: <GeoSeries> boolean mask (True -> flagged as outlier)
        """
        GDF = self.GDF[self.GDF[self.new_cols].any(axis=1) == False] if exclude_previous_outliers else self.GDF

        # source coordinates plus their map shifts form the estimated point cloud compared by RANSAC
        src_coords = np.array(GDF[['X_UTM', 'Y_UTM']])
        xyShift = np.array(GDF[['X_SHIFT_M', 'Y_SHIFT_M']])
        est_coords = src_coords + xyShift

        for co, n in zip([src_coords, est_coords], ['src_coords', 'est_coords']):
            assert co.ndim == 2 and co.shape[1] == 2, "'%s' must have shape [Nx2]. Got shape %s." % (n, co.shape)
        if max_outlier_percentage > 100: raise ValueError
        min_inlier_percentage = 100 - max_outlier_percentage

        # first-order polynomial transform (defined here but ransac() below is called with AffineTransform)
        class PolyTF_1(PolynomialTransform):
            def estimate(*data):
                return PolynomialTransform.estimate(*data, order=1)

        # robustly estimate affine transform model with RANSAC
        # eliminates not more than the given maximum outlier percentage of the tie points
        model_robust, inliers = None, None
        count_inliers = None
        th = 5  # start value
        th_checked = {}  # dict of thresholds that already have been tried + calculated inlier percentage
        th_substract = 2
        count_iter = 0
        time_start = time.time()

        while True:
            # stop on iteration or time budget and keep the last model found
            if count_iter > max_iter or time.time() - time_start > timeout:
                break  # keep last values and break while loop

            if th_checked:
                # halve the step size once the inlier percentage is within the tolerance band
                if min_inlier_percentage - tolerance <= th_checked[th] <= min_inlier_percentage + tolerance:
                    th_substract /= 2
                if abs(th_checked[th] - min_inlier_percentage) <= tolerance:
                    break  # keep last values and break while loop

                # check if more or less outliers have been detected than given percentage
                if count_inliers >= min_inlier_percentage * src_coords.shape[0] / 100:
                    th -= th_substract  # too less outliers found -> decrease threshold
                else:
                    th += th_substract  # too much outliers found -> increase threshold

            # model_robust, inliers = ransac((src, dst), PolynomialTransform, min_samples=3,
            model_robust, inliers = ransac((src_coords, est_coords), AffineTransform,
                                           min_samples=6,
                                           residual_threshold=th,
                                           max_trials=3000,
                                           stop_sample_num=int(
                                               (min_inlier_percentage - tolerance) / 100 * src_coords.shape[0]),
                                           stop_residuals_sum=int(
                                               (max_outlier_percentage - tolerance) / 100 * src_coords.shape[0])
                                           )
            count_inliers = np.count_nonzero(inliers)
            th_checked[th] = count_inliers / src_coords.shape[0] * 100
            # print(th_checked)
            count_iter += 1

        outliers = inliers == False

        # NOTE(review): the returned GeoSeries is built from a plain ndarray and therefore carries a fresh
        # RangeIndex, not GDF's index -- verify alignment against run_filtering()'s column assignment.
        self.ransac_model_robust = model_robust
        return GeoSeries(outliers)
......@@ -36,6 +36,7 @@ def angle_to_north(XY):
def get_true_corner_mapXY(fPath_or_geoarray, band=0, noDataVal=None, mp=1, v=0, q=0):
# FIXME this function is not used anymore
"""
:param fPath_or_geoarray:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment