Commit 3dbc551d authored by Maximilian Schanner

Script for inversion with linearization around an axial dipole, for testing purposes!

parent d50aeb5f
"""
This module is part of the CORBASS algorithm. It defines an alternative
inversion, where the linearization is performed around an axial dipole.
It serves the purpose of testing our method against the standard approach.
Copyright (C) 2019 Helmholtz Centre Potsdam GFZ,
German Research Centre for Geosciences, Potsdam, Germany
Cite as:
Schanner, Maximilian Arthus and Mauerberger, Stefan (2019)
CORBASS: CORrelation Based Archeomagnetic SnapShot model. V. 1.0.
GFZ Data Services. http://doi.org/10.5880/GFZ.2.3.2019.008
This file is part of CORBASS.
CORBASS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CORBASS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CORBASS. If not, see <https://www.gnu.org/licenses/>.
"""
if __name__ == '__main__':
# make relative imports work
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).absolute().parents[1]))
import numpy as np
from pandas import DataFrame
import pyfield
from corbass.utils import scaling, read_data, dif2nez, nez2dif
from corbass.inversion import Inversion, grad_d, grad_i, grad_f
class Dip_Lin_Inversion(Inversion):
def _get_obs_locs(self, fname, drop_duplicates=False):
""" Convenience function to get observations and locations from a file,
setup indices for complete and incomplete records and calculate the
approximate error matrix for the complete records.
Parameters
----------
fname : str or pandas.DataFrame
Either a file path (str) leading to or a pandas.DataFrame
containing archeomagnetic data
drop_duplicates : bool (optional, default is False)
Whether to drop multiple records for the same location and keep only
the first one
"""
if isinstance(fname, DataFrame):
# read D,I,F-data
data = fname.copy()
else:
data = read_data(fname)
# Drop duplicates
if drop_duplicates:
data.drop_duplicates(subset=['lat', 'lon'], inplace=True,
keep='first')
# Reset index
data.reset_index(inplace=True)
# XXX Map declination to [-180, 180] degrees
# I doubt that this approach is smart
data['D'].where(data['D'] <= 180, data['D']-360, inplace=True)
self.n_obs = len(data)
self.x_obs = np.asfortranarray(data[['co-lat', 'lon', 'rad', 't']]).T
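# x_obs holds co-latitude, longitude, radius and epoch as a (4, n_obs)
# Fortran-ordered array (the layout used throughout for the pyfield routines)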
# DataFrame indices for D, I and F records
self.idx_D = data.query('D==D').index
self.idx_I = data.query('I==I').index
self.idx_F = data.query('F==F').index
# Number of records
self.n_recs = self.idx_D.size + self.idx_I.size + self.idx_F.size
# records
self.o_DIF = np.concatenate((data['D'].loc[self.idx_D],
data['I'].loc[self.idx_I],
data['F'].loc[self.idx_F]))
# Errors
std_DIF = np.concatenate((data['dD'].loc[self.idx_D],
data['dI'].loc[self.idx_I],
data['dF'].loc[self.idx_F]))
self.errs = np.diag(std_DIF**2)
# Build index arrays w.r.t. the pyfield (N, E, Z) component order
self.idx_NEZ_D = np.vstack(((3*self.idx_D,
3*self.idx_D+1,
3*self.idx_D+2))).T.ravel()
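# e.g. idx_D = [0, 2] gives idx_NEZ_D = [0, 1, 2, 6, 7, 8]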
self.idx_NEZ_I = np.vstack(((3*self.idx_I,
3*self.idx_I+1,
3*self.idx_I+2))).T.ravel()
self.idx_NEZ_F = np.vstack(((3*self.idx_F,
3*self.idx_F+1,
3*self.idx_F+2))).T.ravel()
# Combined index array over all record types
self.idx_NEZ = np.concatenate((self.idx_NEZ_D,
self.idx_NEZ_I,
self.idx_NEZ_F))
def _prep_calc(self, r_Ref, lamb, epsilon, rho, dip):
""" This function serves as a setup for inversion and likelihood.
During a run several quantities that are used in both functions are
calculated and set as class attributes.
Parameters
----------
r_Ref : float
The reference radius.
lamb : float
The scaling factor for the non-dipole kernel contribution.
epsilon : float
Scaling factor for the observational error.
rho : float
A residual error.
dip : float
The axial dipole coefficient used as the linearization point.
"""
# Kernels from the pyfield library
self.sp_Dip = pyfield.StatDipIntSpline(pyfield.REARTH)
self.sp_WoDip = pyfield.StatWODipIntSpline(r_Ref)
# Allocate and retrieve basis functions (-grad Phi)
self.dspharm_obs = np.empty((3, 3*self.n_obs), order='F')
self.sp_Dip.gaussField(1, self.x_obs, self.dspharm_obs)
# construct linearization point from given dipole
self.lin_point_NEZ = dip * self.dspharm_obs[0]
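# (dspharm_obs[0] holds the basis of the axial-dipole coefficient g_1^0,
# so the linearization point is an axial dipole of strength dip)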
# Prior covariance at locations of observation
# Non-dipole part (without dipole)
cov_NEZ_obs_WoDip = np.zeros((3*self.n_obs, 3*self.n_obs),
order='F')
self.sp_WoDip.field(self.x_obs, cov_NEZ_obs_WoDip)
# Construct entire data covariance matrix
self.cov_NEZ_obs = lamb**2*cov_NEZ_obs_WoDip
# Calculate linearised mean for D, I and F records
lin_point_N_obs = self.lin_point_NEZ[0::3] # North
lin_point_E_obs = self.lin_point_NEZ[1::3] # East
lin_point_Z_obs = self.lin_point_NEZ[2::3] # Down
# 0th order term
lin_point_D_icmp = nez2dif(n=lin_point_N_obs[self.idx_D],
e=lin_point_E_obs[self.idx_D],
d=lin_point_Z_obs[self.idx_D])[0]
lin_point_I_icmp = nez2dif(n=lin_point_N_obs[self.idx_I],
e=lin_point_E_obs[self.idx_I],
d=lin_point_Z_obs[self.idx_I])[1]
lin_point_F_icmp = nez2dif(n=lin_point_N_obs[self.idx_F],
e=lin_point_E_obs[self.idx_F],
d=lin_point_Z_obs[self.idx_F])[2]
self.lin_point_DIF_gbar = np.concatenate((lin_point_D_icmp,
lin_point_I_icmp,
lin_point_F_icmp))
# get gradients at the linearization point
grad_D = grad_d(self.lin_point_NEZ[self.idx_NEZ_D]).reshape((-1, 3))
grad_I = grad_i(self.lin_point_NEZ[self.idx_NEZ_I]).reshape((-1, 3))
grad_F = grad_f(self.lin_point_NEZ[self.idx_NEZ_F]).reshape((-1, 3))
self.grad_DIF = np.concatenate((grad_D, grad_I, grad_F))
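# grad_DIF stacks the gradients of D, I and F with respect to (N, E, Z),
# i.e. the rows of the Jacobian of the linearized forward model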
# Full covariance w.r.t. north, east and down, multiple entries
cov_NEZ_full = self.cov_NEZ_obs[self.idx_NEZ[:, None],
self.idx_NEZ[None, :]]
cov_NEZ_full = cov_NEZ_full.reshape(self.n_recs, 3,
self.n_recs, 3)
# so far only BB-covariance, product with grad gives oo-covariance
self.cov_DIF = np.einsum('ij,ijlm,lm->il',
self.grad_DIF,
cov_NEZ_full,
self.grad_DIF)
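# i.e. cov_DIF[i, l] = grad_i^T . Cov(B_i, B_l) . grad_l
# (first-order propagation of the NEZ prior covariance to D, I and F)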
epsilon_term = np.diag(np.dot(self.grad_DIF, self.grad_DIF.T))
epsilon_term = np.diag(epsilon_term)
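# epsilon_term carries |grad_i|^2 on its diagonal, so rho**2 * epsilon_term
# propagates an isotropic NEZ residual of size rho into the D, I, F variances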
self.cov_DIF += epsilon**2 * self.errs \
+ rho**2 * epsilon_term
# Invert covariance using Cholesky
self.L = np.linalg.cholesky(self.cov_DIF)
self.L_inv = np.linalg.inv(self.L)
self.prc_DIF = np.dot(self.L_inv.T, self.L_inv)
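# with cov_DIF = L L^T it follows that prc_DIF = L^-T L^-1 = cov_DIF^-1,
# the precision matrix of the D, I and F records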
# Non-informative (flat) prior on the dipole coefficients: estimate them by least squares
dspharm_full = self.dspharm_obs[:, self.idx_NEZ]
dspharm_full = dspharm_full.T.reshape(-1, 3, 3)
self.basis = np.einsum('ij, ijk->ik',
self.grad_DIF,
dspharm_full)
self.h = np.dot(self.L_inv, self.basis)
self.c = np.dot(self.L_inv, self.o_DIF)
self.var_g_bar = np.linalg.inv(np.dot(self.h.T, self.h))
self.g_bar = np.linalg.multi_dot((self.var_g_bar, self.h.T, self.c))
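# g_bar solves the whitened normal equations (h^T h) g = h^T c, i.e. it is
# the least-squares estimate of the dipole coefficients; var_g_bar is its
# covariance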
self.mu_NEZ_gbar = np.dot(self.g_bar,
self.dspharm_obs)
diff = self.mu_NEZ_gbar - self.lin_point_NEZ
diff = diff[self.idx_NEZ]
diff_full = diff.reshape(self.n_recs, 3)
corr = np.einsum('ij, ij->i',
self.grad_DIF,
diff_full)
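# corr is the first-order correction grad . (mu - linearization point);
# it is only used by the commented-out alternative below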
# Calculate linearised mean for D, I and F records
mu_N_obs = self.mu_NEZ_gbar[0::3] # North
mu_E_obs = self.mu_NEZ_gbar[1::3] # East
mu_Z_obs = self.mu_NEZ_gbar[2::3] # Down
# 0th order term
mu_D_icmp = nez2dif(n=mu_N_obs[self.idx_D],
e=mu_E_obs[self.idx_D],
d=mu_Z_obs[self.idx_D])[0]
mu_I_icmp = nez2dif(n=mu_N_obs[self.idx_I],
e=mu_E_obs[self.idx_I],
d=mu_Z_obs[self.idx_I])[1]
mu_F_icmp = nez2dif(n=mu_N_obs[self.idx_F],
e=mu_E_obs[self.idx_F],
d=mu_Z_obs[self.idx_F])[2]
self.mu_DIF_gbar = np.concatenate((mu_D_icmp,
mu_I_icmp,
mu_F_icmp))
# self.mu_DIF_gbar = self.lin_point_DIF_gbar + corr
def field_inversion(self, r_Ref, lamb, epsilon, rho, dip, n_desi):
""" Invert for the magnetic field at equidistributed design points.
Parameters
----------
r_Ref : float
The reference radius.
lamb : float
The scaling factor for the non-dipole kernel contribution.
epsilon : float
Scaling factor for the observational error.
rho : float
A residual error.
dip : float
The axial dipole coefficient used as the linearization point.
n_desi : int
The number of design points.
Returns
-------
mu_NEZ_desi : array
The posterior mean of the field at design points.
cov_NEZ_desi : array
The posterior covariance of the field at design points.
"""
self._prep_calc(r_Ref, lamb, epsilon, rho, dip)
return self._field_inversion(lamb, n_desi=n_desi)
def _field_inversion(self, lamb, n_desi=10):
""" Private version of field_inversion. Should be called only after
_prep_calc has been called with the parameters of interest, since it
makes use of the stuff calculated there!
Parameters
----------
lamb : float
The scaling factor for the non-dipole kernel contribution.
n_desi : int, array or pandas.DataFrame (optional, default is 10)
The number of design points, or the design points themselves, either as
an array of shape (4, n) or as a DataFrame with columns 'co-lat', 'lon',
'rad' and 't'.
Returns
-------
mu_NEZ_desi : array
The posterior mean of the field at design points.
cov_NEZ_desi : array
The posterior covariance of the field at design points.
"""
# Setup design points
if isinstance(n_desi, np.ndarray):
self.x_desi = n_desi
self.n_desi = self.x_desi.shape[1]
elif isinstance(n_desi, DataFrame):
self.n_desi = len(n_desi)
self.x_desi = np.zeros((4, self.n_desi))
self.x_desi[0] = n_desi['co-lat']
self.x_desi[1] = n_desi['lon']
self.x_desi[2] = n_desi['rad']
self.x_desi[3] = n_desi['t']
else:
self.x_desi, self.n_desi = self.desi_points(n_desi)
# Allocate and retrieve basis functions at design points
dspharm_desi = np.empty((3, 3*self.n_desi), order='F')
self.sp_Dip.gaussField(1, self.x_desi, dspharm_desi)
# Prior covariance at design points
cov_NEZ_desi_WoDip = np.zeros((3*self.n_desi,
3*self.n_desi),
order='F')
self.sp_WoDip.field(self.x_desi, cov_NEZ_desi_WoDip)
# Construct design points covariance matrix
cov_NEZ_desi = lamb**2*cov_NEZ_desi_WoDip
# Prior correlations amongst design points and observations
cor_NEZ_desi_NEZ_obs_WoDip = np.zeros((3*self.n_desi,
3*self.n_obs),
order='F')
self.sp_WoDip.field(self.x_desi,
self.x_obs,
cor_NEZ_desi_NEZ_obs_WoDip)
# Construct design-observation covariance matrix
cor_NEZ_desi_NEZ_obs = lamb**2*cor_NEZ_desi_NEZ_obs_WoDip
cor_NEZ_desi_NEZ_obs = cor_NEZ_desi_NEZ_obs[:,
self.idx_NEZ[None, :]]
cor_NEZ_desi_NEZ_obs = cor_NEZ_desi_NEZ_obs.reshape(3*self.n_desi,
self.n_recs,
3)
cor_NEZ_desi_DIF_obs = np.einsum('ijk, jk->ij',
cor_NEZ_desi_NEZ_obs,
self.grad_DIF)
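# cross-covariance between the NEZ field at the design points and the
# linearized D, I, F observations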
mu_NEZ_desi = np.dot(self.g_bar, dspharm_desi) \
+ np.linalg.multi_dot([cor_NEZ_desi_DIF_obs,
self.prc_DIF,
self.o_DIF - self.mu_DIF_gbar])
cov_NEZ_desi -= np.linalg.multi_dot([cor_NEZ_desi_DIF_obs,
self.prc_DIF,
cor_NEZ_desi_DIF_obs.T])
R_desi = dspharm_desi \
- np.linalg.multi_dot((self.basis.T,
self.prc_DIF,
cor_NEZ_desi_DIF_obs.T))
cov_NEZ_desi += np.linalg.multi_dot((R_desi.T, self.var_g_bar, R_desi))
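# the R_desi term propagates the uncertainty of the estimated dipole
# coefficients g_bar into the design-point covariance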
return mu_NEZ_desi, cov_NEZ_desi
def full_run(*args, **kwargs):
raise NotImplementedError("The method is not implemented, as it is "
"not required for testing.")
def _zf_CMB_inversion(*args, **kwargs):
raise NotImplementedError("The method is not implemented, as it is "
"not required for testing.")
def _coeff_inversion(*args, **kwargs):
raise NotImplementedError("The method is not implemented, as it is "
"not required for testing.")
def _dif_inversion(*args, **kwargs):
raise NotImplementedError("The method is not implemented, as it is "
"not required for testing.")
def _log_likeli(*args, **kwargs):
raise NotImplementedError("The method is not implemented, as it is "
"not required for testing.")
def _log_pst(*args, **kwargs):
raise NotImplementedError("The method is not implemented, as it is "
"not required for testing.")
if __name__ == '__main__':
from pandas import read_csv
data = read_csv('../dat/synth_data.csv')
ours = Dip_Lin_Inversion(data)
ours._prep_calc(2800, 36e3, 1.2, 2000, 30e3)
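# A minimal sketch of how the inversion could be exercised further; the
# call reuses the parameter values from the _prep_calc test above, while
# the number of design points (100) is an illustrative assumption.
mu_NEZ_desi, cov_NEZ_desi = ours.field_inversion(2800, 36e3, 1.2, 2000, 30e3,
100)
print(mu_NEZ_desi.shape, cov_NEZ_desi.shape)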