Week 10 - Filtering and State Estimation

In [2]:
import numpy as np
import matplotlib.pyplot as plt

Problem 19.1:

What is the approximate Kalman gain matrix in the limit of small measurement noise? How is the error matrix updated in this case?

In the limit of small measurement noise, $N_t^y \rightarrow 0$, so that

$$ B_t E_{t|t-1} B_t^T + N_t^y \approx B_t E_{t|t-1} B_t^T $$

and, assuming $B_t$ is square and invertible so that $(B_t E_{t|t-1} B_t^T)^{-1} = (B_t^T)^{-1} E_{t|t-1}^{-1} B_t^{-1}$, the Kalman gain becomes

$$ K_t = E_{t|t-1} B_t^T (B_t E_{t|t-1} B_t^T + N_t^y)^{-1} \approx E_{t|t-1} B_t^T (B_t E_{t|t-1} B_t^T)^{-1} = E_{t|t-1} B_t^T (B_t^T)^{-1} E_{t|t-1}^{-1} B_t^{-1} = B_t^{-1} $$

For the error matrix this gives

$$ E_{t|t} = (I-K_t B_t) E_{t|t-1} \approx (I-B_t^{-1} B_t) E_{t|t-1} = 0 $$

that is, with noise-free measurements the filter trusts each observation completely and the posterior error collapses to zero.
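As a quick numerical sanity check (not part of the problem statement), the sketch below builds a random invertible $B$, a positive-definite $E$, and a tiny $N^y$, and confirms that the gain approaches $B^{-1}$ and the updated error approaches zero:

In [ ]:
# Illustrative check of the small-measurement-noise limit, with made-up matrices
B = np.random.normal(size=(3, 3))      # assumed invertible (true with probability 1)
E = np.random.normal(size=(3, 3))
E = E @ E.T + np.eye(3)                # a positive-definite prior error matrix
N_y = 1e-12*np.eye(3)                  # "small" measurement noise

K = E @ B.T @ np.linalg.inv(B @ E @ B.T + N_y)
E_post = (np.eye(3) - K @ B) @ E

print(np.allclose(K, np.linalg.inv(B)))   # K -> B^{-1}
print(np.allclose(E_post, 0))             # E_{t|t} -> 0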

Problem 19.2:

Take as a test signal a periodically modulated sinusoid with noise added, $$y_n = \sin[0.1 t_n + 4 \sin(0.01 t_n)] + \eta \equiv \sin(\theta_n) + \eta,$$ where $\eta$ is a Gaussian noise process with $\sigma = 0.1$. Design an extended Kalman filter to estimate the noise-free signal. Use a two-component state vector $\vec{x}_n = (\theta_n, \theta_{n-1})$, and assume for the internal model a linear extrapolation $\theta_{n+1} = \theta_n + (\theta_n - \theta_{n-1})$. Take the system noise matrix $N^x$ to be diagonal, and plot the predicted value of $y$ versus the measured value of $y$ if the standard deviation of the system noise is chosen to be $10^{-1}$, $10^{-3}$, and $10^{-5}$. Use the identity matrix for the initial error estimate.

In [59]:
delta_t = .1
T = np.arange(0, 1000, delta_t)
theta = .1*T + 4*np.sin(.01*T)                        # phase of the modulated sinusoid
eta = np.random.normal(loc=0, scale=.1, size=T.size)  # measurement noise, sigma = 0.1
Y = np.sin(theta) + eta                               # noisy observations
In [60]:
def ExtendedKalmanStep(A_func, B_func, f, g, x_est, y, E_est, N_x, N_y):
    B = B_func(x_est)

    # Predict the observable from the predicted state
    y_est = g(x_est)

    # Kalman gain: K = E B^T (B E B^T + N^y)^{-1}
    K = E_est @ B.T @ np.linalg.inv(B @ E_est @ B.T + N_y)

    # Correct the state estimate with the measurement
    x = x_est + K @ (y - y_est)

    # Update the error matrix: E = (I - K B) E
    E = (np.eye(K.shape[0]) - K @ B) @ E_est

    # Predict the next state with the internal model
    x_est = f(x)

    # Propagate the error through the linearized dynamics: E = A E A^T + N^x
    A = A_func(x)
    E_est = A @ E @ A.T + N_x

    return x_est, y_est, E_est

def KalmanFilter(A_func, B_func, f, g, init_state, init_obs, Y, init_E, N_x, N_y):
    x_est = init_state
    E_est = init_E
    Y_est = [init_obs]
    X_est = [x_est]
    
    for y in Y[1:]:
        x_est, y_est, E_est = ExtendedKalmanStep(A_func, B_func, f, g, x_est, y, E_est, N_x, N_y)
        Y_est.append(y_est)
        X_est.append(x_est)
        
    return X_est, Y_est
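The Jacobians used in `main` below follow from the two models: the linear extrapolation $f(\vec{x}) = (2\theta_n - \theta_{n-1},\; \theta_n)$ has $A = \partial f/\partial \vec{x} = \begin{pmatrix} 2 & -1 \\ 1 & 0 \end{pmatrix}$, and the observable $g(\vec{x}) = (\sin\theta_n, \sin\theta_{n-1})$ has $B = \mathrm{diag}(\cos\theta_n, \cos\theta_{n-1})$. A finite-difference sketch to confirm them (the helper `jacobian_fd` is introduced here purely for illustration, not part of the problem's code):

In [ ]:
def jacobian_fd(func, x, h=1e-6):
    """Central finite-difference Jacobian of a vector-valued func at x."""
    x = np.asarray(x, dtype=float)
    cols = []
    for i in range(x.size):
        dx = np.zeros_like(x)
        dx[i] = h
        cols.append((np.array(func(x + dx)) - np.array(func(x - dx)))/(2*h))
    return np.stack(cols, axis=-1)

f = lambda x: (2*x[0] - x[1], x[0])
g = lambda x: (np.sin(x[0]), np.sin(x[1]))
x0 = np.random.normal(size=2)

print(np.allclose(jacobian_fd(f, x0), [[2, -1], [1, 0]]))
print(np.allclose(jacobian_fd(g, x0), [[np.cos(x0[0]), 0], [0, np.cos(x0[1])]]))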
In [61]:
def main(Y, std):
    # Jacobians of the internal model and the observable
    # (A is constant because f is linear)
    A_func = lambda x: np.array([[2, -1], [1, 0]])
    B_func = lambda x: np.array([[np.cos(x[0]), 0], [0, np.cos(x[1])]])
    
    # Linear extrapolation theta_{n+1} = 2 theta_n - theta_{n-1}, and the observable
    f = lambda x: (2*x[0]-x[1], x[0])
    g = lambda x: (np.sin(x[0]), np.sin(x[1]))
    
    init_state = np.array([0, 0])
    init_obs = (Y[0], Y[0])
    init_E = np.eye(2)                 # identity initial error estimate
    
    N_x = np.diag([std**2, std**2])    # diagonal system noise, variance std^2
    N_y = np.diag([.01, .01])          # measurement noise, sigma = 0.1
    
    return KalmanFilter(A_func, B_func, f, g, init_state, init_obs, Y, init_E, N_x, N_y)
In [62]:
X_est, Y_est = main(Y, 1e-1)
Y_est = [y[0] for y in Y_est]

plt.plot(T, Y, 'b', label='True observations')
plt.plot(T, Y_est, 'r', label='Predicted observations', alpha=.8)
plt.xlabel('T')
plt.ylabel('Value')
plt.title('Standard deviation 1e-1')
plt.legend()
plt.show()
In [63]:
X_est, Y_est = main(Y, 1e-3)
Y_est = [y[0] for y in Y_est]

plt.plot(T, Y, 'b', label='True observations')
plt.plot(T, Y_est, 'r', label='Predicted observations')
plt.xlabel('T')
plt.ylabel('Value')
plt.title('Standard deviation 1e-3')
plt.legend()
plt.show()
In [68]:
X_est, Y_est = main(Y, 1e-5)
Y_est = [y[0] for y in Y_est]

plt.plot(T, Y, 'b', label='True observations')
plt.plot(T, Y_est, 'r', label='Predicted observations')
plt.xlabel('T')
plt.ylabel('Value')
plt.title('Standard deviation 1e-5')
plt.legend()
plt.show()
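To compare the three settings quantitatively (an addition beyond the requested plots), one can measure the RMS error of the predicted observable against the noise-free signal $\sin(\theta_n)$; qualitatively, larger assumed system noise should track the noisy measurements more closely, while smaller system noise should lean on the linear-extrapolation model:

In [ ]:
for std in (1e-1, 1e-3, 1e-5):
    _, Y_est = main(Y, std)
    Y_pred = np.array([y[0] for y in Y_est])
    rms = np.sqrt(np.mean((Y_pred - np.sin(theta))**2))
    print('std = %g: RMS error vs. noise-free signal = %.4f' % (std, rms))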

Problem 19.3:

(a) Given an HMM trellis (Figure 19.5), work out the probability to see an internal state given the observations $p(x_t|y_1, \dots , y_T)$ in terms of forward and backward terms.

$$ p(x_t|y_1,\dots ,y_T) = \frac{p(x_t,y_1,\dots ,y_T)}{p(y_1,\dots ,y_T)} $$

Peeling off one observation at a time from the end of the numerator, using the Markov structure of the trellis:

$$ \begin{split} p(x_t,y_1,\dots ,y_T) =& \sum_{x_T=1}^N p(y_T|x_T)p(x_T,x_t,y_1,\dots ,y_{T-1}) \\ =&\sum_{x_T=1}^N p(y_T|x_T)\sum_{x_{T-1}=1}^N p(y_{T-1}|x_{T-1})p(x_T|x_{T-1})p(x_{T-1},x_t,y_1,\dots ,y_{T-2}) \\ =&\sum_{x_T=1}^N p(y_T|x_T)\sum_{x_{T-1}=1}^N p(y_{T-1}|x_{T-1})p(x_T|x_{T-1}) \dots \sum_{x_{t+1}=1}^N p(y_{t+1}|x_{t+1})p(x_{t+2}|x_{t+1})p(x_{t+1},x_t,y_1,\dots ,y_{t}) \\ =&\sum_{x_T=1}^N p(y_T|x_T)\sum_{x_{T-1}=1}^N p(y_{T-1}|x_{T-1})p(x_T|x_{T-1}) \dots \\ &\sum_{x_{t+1}=1}^N p(y_{t+1}|x_{t+1}) p(x_{t+2}|x_{t+1}) p(y_t|x_t) p(x_{t+1}|x_t) p(x_t,y_1,\dots ,y_{t-1}) \\ =&\sum_{x_T=1}^N p(y_T|x_T)\sum_{x_{T-1}=1}^N p(y_{T-1}|x_{T-1})p(x_T|x_{T-1}) \dots \\ &\sum_{x_{t+1}=1}^N p(y_{t+1}|x_{t+1}) p(x_{t+2}|x_{t+1}) p(y_t|x_t) p(x_{t+1}|x_t) \sum_{x_{t-1}=1}^N p(y_{t-1}|x_{t-1}) p(x_t|x_{t-1}) \dots \\ & \sum_{x_{1}=1}^N p(y_{1}|x_{1}) p(x_2|x_1) p(x_1) \end{split} $$

The denominator carries the same recursion over all of the states:

$$ \begin{split} p(y_1,\dots ,y_T) =& \sum_{x_T=1}^N p(y_T|x_T)\sum_{x_{T-1}=1}^N p(y_{T-1}|x_{T-1})p(x_T|x_{T-1}) \dots \\ &\sum_{x_{2}=1}^N p(y_{2}|x_{2}) p(x_3|x_{2}) \sum_{x_{1}=1}^N p(y_{1}|x_{1}) p(x_2|x_1) p(x_1) \end{split} $$

The sums over $x_{t+1},\dots,x_T$ collect into a backward term $\beta(x_t) = p(y_t,\dots,y_T|x_t)$, and the sums over $x_1,\dots,x_{t-1}$ into a forward term $\alpha(x_t) = p(x_t,y_1,\dots,y_{t-1})$, so that

$$ p(x_t|y_1,\dots,y_T) = \frac{\alpha(x_t)\,\beta(x_t)}{p(y_1,\dots,y_T)}. $$
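A quick brute-force check of this decomposition (with made-up toy numbers, not part of the problem): for a two-state, three-step HMM, compare $\alpha(x_t)\,\beta(x_t)/p(y_1,\dots,y_T)$ against the posterior obtained by summing the joint probability over all state paths:

In [ ]:
from itertools import product

trans = np.array([[.9, .1], [.2, .8]])   # p(x_{t+1}|x_t), made-up numbers
emit = np.array([[.5, .5], [.3, .7]])    # p(y_t|x_t)
start = np.array([.5, .5])               # p(x_1)
ys = [0, 1, 1]                           # a short observation sequence
t = 1                                    # query the middle state (0-indexed)

# Exact posterior by summing the joint over all 2^3 state paths
post = np.zeros(2)
for path in product(range(2), repeat=len(ys)):
    p = start[path[0]]*emit[path[0], ys[0]]
    for s in range(1, len(ys)):
        p *= trans[path[s-1], path[s]]*emit[path[s], ys[s]]
    post[path[t]] += p
likelihood = post.sum()

# Forward term alpha(x_t) = p(x_t, y_1..y_{t-1})
alpha = start.copy()
for s in range(t):
    alpha = (alpha*emit[:, ys[s]]) @ trans

# Backward term beta(x_t) = p(y_t..y_T | x_t)
beta = emit[:, ys[-1]]
for s in range(len(ys) - 2, t - 1, -1):
    beta = emit[:, ys[s]]*(trans @ beta)

print(post/likelihood)                   # brute-force p(x_t | y_1..y_T)
print(alpha*beta/likelihood)             # forward-backward; should agree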

(b) Generate observations from the model in Figure 19.4, with a fair and a biased coin as the internal states, and probabilities to switch between them.

In [21]:
probs = np.array([[.5, .5], [.25, .75]])      # emission probabilities: row 0 fair coin, row 1 biased
trans_p = np.array([[.99, .01], [.01, .99]])  # probabilities to stay with / switch coins

T = 900
coins = [np.random.choice([0, 1])]                    # hidden states
obs = [np.random.choice([0, 1], p=probs[coins[-1]])]  # observed flips

for _ in range(T-1):
    coins.append(np.random.choice([0, 1], p=trans_p[coins[-1]]))
    obs.append(np.random.choice([0, 1], p=probs[coins[-1]]))

(c) Use knowledge of this model and those observations to estimate the probabilities for which coin was used when, and compare with the correct values.

In [22]:
def forward_rec(obs, trans_probs, obs_probs, start_probs):
    """Computes p(x_t, y_1, ..., y_{t-1}); returns a vector over states."""
    if len(obs) == 0:
        return start_probs
    
    # p(x_{t-1}, y_1..y_{t-1}) = p(x_{t-1}, y_1..y_{t-2}) p(y_{t-1}|x_{t-1})
    state_obs_probs = forward_rec(obs[:-1], trans_probs, obs_probs, start_probs)*obs_probs[:,obs[-1]]
    # Sum over x_{t-1} against the transition probabilities
    return np.matmul(state_obs_probs, trans_probs)
    
def forward(obs, trans_probs, obs_probs, start_probs):
    """Computes p(y_1, ..., y_T); returns a scalar."""
    state_obs_probs = forward_rec(obs[:-1], trans_probs, obs_probs, start_probs)*obs_probs[:,obs[-1]]
    return np.sum(state_obs_probs)
    
def backward(obs, trans_probs, obs_probs, flag=True):
    """Computes p(y_t, ..., y_T | x_t); returns a vector over states."""
    if flag:
        # Attach the current observation: p(y_t|x_t) p(y_{t+1}..y_T|x_t)
        return obs_probs[:,obs[0]]*backward(obs[1:], trans_probs, obs_probs, flag=False)
    
    if len(obs) == 0:
        # Nothing left to explain beyond the final observation
        return np.ones(trans_probs.shape[0])
    
    # p(y_s..y_T|x_s) for the next state, summed over it
    state_obs_probs = obs_probs[:,obs[0]]*backward(obs[1:], trans_probs, obs_probs, flag=False)
    return np.matmul(trans_probs, state_obs_probs)
In [23]:
obs_likelihood = forward(obs, trans_p, probs, [.5, .5])

# Backward and forward terms for every time step, including t = T
backward_probs = np.array([backward(obs[t:], trans_p, probs) for t in range(T)])
forward_probs = np.array([forward_rec(obs[:t], trans_p, probs, [.5, .5]) for t in range(T)])
In [24]:
# Posterior p(x_t | y_1..y_T) = alpha(x_t) beta(x_t) / p(y_1..y_T)
coin_probs = backward_probs*forward_probs/obs_likelihood
In [25]:
accuracy = np.mean(np.equal(np.argmax(coin_probs, axis=-1), coins))
print('Accuracy = %f' % accuracy)
Accuracy = 0.933333
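A plot gives a more direct comparison with the true switching sequence than a single accuracy number; this sketch (an addition, not required by the problem) overlays the posterior probability of the biased coin on the true coin sequence:

In [ ]:
plt.plot(coin_probs[:, 1], 'r', label='p(biased coin | observations)')
plt.plot(coins, 'b', alpha=.5, label='True coin (0 = fair, 1 = biased)')
plt.xlabel('Flip number')
plt.ylabel('Probability / state')
plt.legend()
plt.show()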