import math
from __future__ import division
def make_B(_x):
cos_theta = math.cos(_x[0][0])
return np.array([[cos_theta, 0]])
def compute_kalman_gain_matrix():
return np.matmul(E, np.transpose(B))/(np.matmul(np.matmul(B,E), np.transpose(B)) + noise_sigma)
E = np.array([[1, 0], [0, 1]])
I = np.array([[1, 0], [0, 1]])
system_noise = [1e-1, 1e-3, 1e-5, 1e-7]
X = np.transpose(np.array([[0],[0]]))
B = make_B(X)
K = compute_kalman_gain_matrix()
A = np.array([[2, -1], [1, 0]])
Nx = np.array([[system_noise[0], 0], [0, system_noise[0]]])
def estimate_new_observable(_x):
# return B.dot(_x)
return math.sin(_x[0][0])
def get_new_measurement(_t):
return y_noise[_t]
def estimate_new_state(_x, _t):
return _x + K*(get_new_measurement(_t)-estimate_new_observable(_x))
def update_error_matrix():
return np.matmul(I-np.matmul(K, B), E)
def predict_new_state(_x):
return np.matmul(A, _x)
def predict_new_error():
return np.matmul(np.matmul(A, E), np.transpose(A)) + Nx
predictions = []
for i in range(len(system_noise)):
predictions.append([])
Nx = np.array([[system_noise[i], 0], [0, system_noise[i]]])
for t in range(N):
B = make_B(X)
K = compute_kalman_gain_matrix()
estimate = estimate_new_state(X, t)
E = update_error_matrix()
X = predict_new_state(estimate)
predictions[i].append(X[0][0])
E = predict_new_error()
# plt.plot(x, np.array(predictions), c="r")
plt.plot(x, np.sin(np.array(predictions[i])), c="b")
plt.title('noise: ' + str(system_noise[i]))
plt.scatter(x, y_noise, s=0.5, c="k")
axes = plt.gca()
axes.set_xlim([0,N-1])
plt.show()