Commit ad01c475 authored by Jannes Münchmeyer's avatar Jannes Münchmeyer

munchmej: group6 VAMP analysis

parent 055b5e0e
import numpy as np
import h5py
import torch
from scipy import signal
from torch.utils.data import Dataset


class LidDistData(Dataset):
    def __init__(self, data_path, items_per_epoch=64e3, window_length=625, gap=1, part='train', factor=100):
        # Gap in terms of how many windows. gap=1 means directly adjacent.
        self.items_per_epoch = int(items_per_epoch)
        self.window_length = window_length
        self.gap = gap
        self.part = part
        self.factor = factor
        with h5py.File(data_path, 'r') as f:
            self.lid_disp = f['lid_disp'][()]
        # Detrend the full series; cast to float32 to match the default dtype of the model weights
        self.lid_disp = signal.detrend(self.lid_disp).astype(np.float32)
        if part == 'train':
            self.min_idx = 0
            # Leave room for both windows so sample2 cannot leak into the validation split
            self.max_idx = int(0.7 * self.lid_disp.shape[0] - (gap + 1) * window_length)
        else:
            self.min_idx = int(0.7 * self.lid_disp.shape[0] - window_length)
            self.max_idx = int(self.lid_disp.shape[0] - (gap + 1) * window_length)
        self.indices = None
        self.on_epoch_end()

    def __len__(self):
        return self.items_per_epoch

    def on_epoch_end(self):
        # Draw fresh random window start positions for the next epoch
        self.indices = torch.randint(self.min_idx, self.max_idx, (self.items_per_epoch,))

    def __getitem__(self, idx):
        p = int(self.indices[idx])
        p2 = p + self.gap * self.window_length
        sample1 = self.lid_disp[p:p + self.window_length].copy()
        sample2 = self.lid_disp[p2:p2 + self.window_length].copy()
        # Demean each window individually, then rescale
        sample1 -= np.mean(sample1)
        sample2 -= np.mean(sample2)
        sample1 *= self.factor
        sample2 *= self.factor
        return {'sample1': sample1, 'sample2': sample2}
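

# --- Usage sketch (not part of the original commit). The HDF5 path, batch size,
# and epoch count are assumptions. shuffle=False since the dataset already draws
# random window positions; on_epoch_end() is not called by the DataLoader and
# must be invoked manually between epochs.
from torch.utils.data import DataLoader

train_data = LidDistData('lid_disp.h5', part='train', gap=1)
loader = DataLoader(train_data, batch_size=256, shuffle=False)
for epoch in range(10):
    for batch in loader:
        sample1, sample2 = batch['sample1'], batch['sample2']
        # ... forward pass / loss / optimizer step ...
    train_data.on_epoch_end()  # resample window start positions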
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
import numpy as np
class SequenceEmbedding(nn.Module):
    def __init__(self, out_dim=5, bias=True, convbias=False):
        super(SequenceEmbedding, self).__init__()
        self.conv1 = nn.Conv1d(1, 8, 8, bias=convbias)
        self.pool1 = nn.MaxPool1d(2)
        self.conv2 = nn.Conv1d(8, 16, 8, bias=convbias)
        self.pool2 = nn.MaxPool1d(2)
        self.conv3 = nn.Conv1d(16, 16, 8, bias=convbias)
        self.pool3 = nn.MaxPool1d(2)
        self.conv4 = nn.Conv1d(16, 1, 8, bias=convbias)
        # For window_length=625, the conv/pool stack leaves 65 time steps
        self.lin1 = nn.Linear(65, 100, bias=bias)
        self.lin2 = nn.Linear(100, 50, bias=bias)
        self.lin3 = nn.Linear(50, out_dim, bias=bias)

    def forward(self, x):
        x = x.unsqueeze(1)  # (batch, length) -> (batch, 1, length) for Conv1d
        x = F.relu(self.pool1(self.conv1(x)))
        x = F.relu(self.pool2(self.conv2(x)))
        x = F.relu(self.pool3(self.conv3(x)))
        x = F.relu(self.conv4(x))
        x = torch.squeeze(x, dim=1)
        x = F.relu(self.lin1(x))
        x = F.relu(self.lin2(x))
        x = self.lin3(x)
        # Softmax yields a soft assignment to out_dim metastable states
        x = F.softmax(x, dim=-1)
        return x
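

# --- Shape check sketch (not part of the original commit): a batch of four
# windows of length 625 should map to four rows of out_dim probabilities.
dummy = torch.randn(4, 625)
out = SequenceEmbedding(out_dim=5)(dummy)
print(out.shape)        # torch.Size([4, 5])
print(out.sum(dim=-1))  # each row sums to 1 because of the softmax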
def isqrt(a, eps=1e-5):
    # Inverse matrix square root of a symmetric matrix via eigendecomposition
    a = a + torch.eye(a.shape[0], device=a.device) * eps  # Make matrix positive definite, not only semi-definite
    # torch.linalg.eigh replaces the deprecated torch.symeig(a, eigenvectors=True)
    eig_val, eig_vec = torch.linalg.eigh(a)
    isqrt_eig_val = torch.diag(1 / torch.sqrt(eig_val + eps))
    return eig_vec @ isqrt_eig_val @ eig_vec.t()
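

# --- Sanity check sketch (not part of the original commit): applying isqrt
# twice should approximately recover the inverse of a positive-definite matrix.
b = torch.randn(5, 5)
a = b @ b.t() + 5 * torch.eye(5)  # symmetric positive definite
print(torch.allclose(isqrt(a) @ isqrt(a), torch.inverse(a), atol=1e-3))  # True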
class VAMP2Loss(Function):
    # Note that both forward and backward are @staticmethods
    @staticmethod
    def forward(ctx, x, y):
        # Demean
        x = x - x.mean(0, keepdim=True)
        y = y - y.mean(0, keepdim=True)
        # Calculate covariance matrices
        c00 = 1 / (x.shape[0] - 1) * x.t() @ x
        c01 = 1 / (x.shape[0] - 1) * x.t() @ y
        c11 = 1 / (x.shape[0] - 1) * y.t() @ y
        ctx.save_for_backward(x, y, c00, c01, c11)
        # Calculate inverse square roots
        isqrt_c00 = isqrt(c00)
        isqrt_c11 = isqrt(c11)
        return 1 + torch.norm(isqrt_c00 @ c01 @ isqrt_c11)

    @staticmethod
    def backward(ctx, grad_output):
        eps = 1e-5
        # This is a convenient pattern: at the top of backward, unpack
        # saved_tensors and initialize all gradients w.r.t. inputs to None.
        # Because additional trailing Nones are ignored, the return statement
        # stays simple even when the function has optional inputs.
        x, y, c00, c01, c11 = ctx.saved_tensors
        grad_x = grad_y = None
        # Regularize before inverting to keep the inverses numerically stable
        c00_inv = (c00 + torch.eye(c00.shape[0], device=c00.device) * eps).inverse()
        c11_inv = (c11 + torch.eye(c11.shape[0], device=c11.device) * eps).inverse()
        # Analytic gradients of the score w.r.t. the (demeaned) inputs, instead
        # of differentiating through the eigendecompositions in forward
        grad_x = 1 / (x.shape[0] - 1) * c00_inv @ c01 @ c11_inv @ (y.t() - c01.t() @ c00_inv @ x.t())
        grad_y = 1 / (x.shape[0] - 1) * c11_inv @ c01.t() @ c00_inv @ (x.t() - c01 @ c11_inv @ y.t())
        grad_x = (grad_x * grad_output).t()
        grad_y = (grad_y * grad_output).t()
        return grad_x, grad_y
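

# --- Training-step sketch (not part of the original commit). Optimizer choice,
# learning rate, and the sign convention are assumptions: the returned value
# grows with the VAMP-2 score, so its negative is minimized here. Custom
# autograd Functions are invoked via .apply, not by instantiation.
model = SequenceEmbedding(out_dim=5)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

def train_step(batch):
    optimizer.zero_grad()
    chi1 = model(batch['sample1'])       # soft state assignments at time t
    chi2 = model(batch['sample2'])       # soft state assignments at time t + gap
    score = VAMP2Loss.apply(chi1, chi2)
    (-score).backward()                  # maximize the score (sign is an assumption)
    optimizer.step()
    return score.item()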
def estimate_koopman_matrix(samples):
    # Instantaneous (x) and time-lagged (y) views of the embedded trajectory
    x = samples[:-1].copy()
    y = samples[1:].copy()
    x -= np.mean(x, axis=0, keepdims=True)
    y -= np.mean(y, axis=0, keepdims=True)
    c00 = 1 / (x.shape[0] - 1) * x.transpose() @ x
    c01 = 1 / (x.shape[0] - 1) * x.transpose() @ y
    # K = C00^-1 C01, via a linear solve instead of an explicit inverse
    K = np.linalg.solve(c00, c01)
    return K, c00, c01
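

# --- Analysis sketch (not part of the original commit): implied timescales
# from the Koopman eigenvalues, t_i = -tau / ln|lambda_i|. The lag time tau is
# an assumption; the eigenvalue closest to 1 (the stationary process) is skipped.
def implied_timescales(K, tau=1.0):
    eig_vals = np.linalg.eigvals(K)
    eig_vals = eig_vals[np.argsort(-np.abs(eig_vals))]  # sort by magnitude
    return -tau / np.log(np.abs(eig_vals[1:]))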