import numpy as np
import matplotlib.pyplot as plt
In the limit of small measurement noise we get:
$$ \begin{split} N_t^y &\rightarrow 0 \\ B_t E_{t|t-1} B_t^T + N_t^y &\approx B_t E_{t|t-1} B_t^T \end{split} $$$$ K_t = E_{t|t-1} B_t^T (B_t E_{t|t-1} B_t^T + N_t^y)^{-1} \approx E_{t|t-1} B_t^T (B_t E_{t|t-1} B_t^T)^{-1} = B_t^{-1} $$And for the error matrix we get:
$$ E_{t|t} = (I-K_t B_t) E_{t|t-1} \approx (I-B_t^{-1} B_t) E_{t|t-1} = 0 $$delta_t = .1
T = np.arange(0, 1000, delta_t)
theta = .1*T + 4*np.sin(.01*T)
eta = np.random.normal(loc=0, scale=.1, size=T.size)
Y = np.sin(theta) + eta
def ExtendedKalmanStep(A_func, B_func, f, g, x_est, y, E_est, N_x, N_y):
# Estimate the new observable
y_est = g(x_est)
# Compute the Kalman gain matrix
K = np.matmul(E_est,
np.matmul(np.transpose(B_func(x_est)),
np.linalg.inv(np.matmul(B_func(x_est),
np.matmul(E_est, np.transpose(B_func(x_est)))) + N_y)))
# Estimate the new state
x = x_est + np.matmul(K, y - y_est)
# Update the error matrix
E = np.matmul(np.eye(N=K.shape[0], M=B_func(x_est).shape[1]) - np.matmul(K, B_func(x_est)), E_est)
# Predict the new state
x_est = f(x)
# Predict the new error
E_est = np.matmul(A_func(x), np.matmul(E, np.transpose(A_func(x)))) + N_x
return x_est, y_est, E_est
def KalmanFilter(A_func, B_func, f, g, init_state, init_obs, Y, init_E, N_x, N_y):
x_est = init_state
E_est = init_E
Y_est = [init_obs]
X_est = [x_est]
for y in Y[1:]:
x_est, y_est, E_est = ExtendedKalmanStep(A_func, B_func, f, g, x_est, y, E_est, N_x, N_y)
Y_est.append(y_est)
X_est.append(x_est)
return X_est, Y_est
def main(Y, std):
A_func = lambda x: np.array([[2, -1], [1, 0]])
B_func = lambda x: np.array([[np.cos(x[0]), 0], [0, np.cos(x[1])]])
f = lambda x: (2*x[0]-x[1], x[0])
g = lambda x: (np.sin(x[0]), np.sin(x[1]))
init_state = np.array([0, 0])
init_obs = (Y[0], Y[0])
init_E = np.eye(2, 2)
N_x = np.diag([std**2, std**2])
N_y = np.diag([.01, .01])
return KalmanFilter(A_func, B_func, f, g, init_state, init_obs, Y, init_E, N_x, N_y)
X_est, Y_est = main(Y, 1e-1)
Y_est = [y[0] for y in Y_est]
plt.plot(T, Y, 'b', label='True observations')
plt.plot(T, Y_est, 'r', label='Predicted observations', alpha=.8)
plt.xlabel('T')
plt.ylabel('Value')
plt.title('Standard deviation 1e-1')
plt.legend()
plt.show()
X_est, Y_est = main(Y, 1e-3)
Y_est = [y[0] for y in Y_est]
plt.plot(T, Y, 'b', label='True observations')
plt.plot(T, Y_est, 'r', label='Predicted observations')
plt.xlabel('T')
plt.ylabel('Value')
plt.title('Standard deviation 1e-3')
plt.legend()
plt.show()
X_est, Y_est = main(Y, 1e-7)
Y_est = [y[0] for y in Y_est]
plt.plot(T, Y, 'b', label='True observations')
plt.plot(T, Y_est, 'r', label='Predicted observations')
plt.xlabel('T')
plt.ylabel('Value')
plt.title('Standard deviation 1e-7')
plt.legend()
plt.show()
probs = np.array([[.5, .5], [.25, .75]])
trans_p = np.array([[.99, .01], [.01, .99]])
T = 900
coins = [np.random.choice([0, 1])]
obs = [np.random.choice([0, 1], p=probs[coins[-1]])]
for _ in range(T-1):
coins.append(np.random.choice([0, 1], p=trans_p[coins[-1]]))
obs.append(np.random.choice([0, 1], p=probs[coins[-1]]))
def forward_rec(obs, trans_probs, obs_probs, start_probs):
"""Computes p(x_t, y_1, ... , y_{t-1})."""
"""Returns vector"""
if len(obs) == 0:
return start_probs
state_obs_probs = forward_rec(obs[:-1], trans_probs, obs_probs, start_probs)*obs_probs[:,obs[-1]]
return np.matmul(state_obs_probs, trans_probs)
def forward(obs, trans_probs, obs_probs, start_probs):
"""Computes p(y_1, ... , y_T)"""
"""Returns scalar"""
state_obs_probs = forward_rec(obs[:-1], trans_probs, obs_probs, start_probs)*obs_probs[:,obs[-1]]
return np.sum(state_obs_probs)
def backward(obs, trans_probs, obs_probs, flag=True):
"""Computes p(y_t, ... , y_T | x_t)"""
"""Returns vector"""
if flag:
state_obs_probs = obs_probs[:,obs[0]]*backward(obs[1:], trans_probs, obs_probs, flag=False)
return state_obs_probs
if len(obs) == 1:
return np.matmul(trans_probs, obs_probs[:,obs[0]])
else:
try:
state_obs_probs = obs_probs[:,obs[0]]*backward(obs[1:], trans_probs, obs_probs, flag=False)
return np.matmul(trans_probs, state_obs_probs)
except:
print(obs)
obs_likelihood = forward(obs, trans_p, probs, [.5, .5])
backward_probs = np.array([backward(obs[t:], trans_p, probs) for t in range(T-1)])
forward_probs = np.array([forward_rec(obs[:t], trans_p, probs, [.5, .5]) for t in range(T-1)])
coin_probs = backward_probs*forward_probs/obs_likelihood
coin_probs = np.concatenate(([[.5, .5]], coin_probs))
accuracy = np.mean(np.equal(np.argmax(coin_probs, axis=-1), coins))
print('Accuracy = %f' % accuracy)