Implementation and working notes for the final project

A] Initial final project direction: Iterative parameter fitting and model selection.

Initially I wanted to explore function fitting using neural networks, in order to get a sense of data-driven parameter estimation. The idea was to train a neural network to fit the parameters of a given function family to a set of datapoints, with the aim of using the same network architecture for model selection.

To explain the last sentence, let's assume that we have a function of the form $f(\vec{x})=\sum_i m_iu_i(\vec{x})$, where the $u_i$ are a diverse set of functions. For example, $$f(x) = m_1[\alpha \sin(\beta x)] + m_2 \log(x) + m_3 x^2 \text{ . }$$

The neural network would have to learn the parameters $m_i$, which is essentially a model selection for the underlying function. Of course, this approach relies heavily on knowledge about the domain of the dataset, which is used to form the set of functions $u_i$ that the network has to select from.
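
To make this concrete, here is a minimal sketch of the idea (a hypothetical example, not the project code): the basis functions $u_i$ are fixed inside the network, and a single bias-free Dense layer learns the mixing coefficients $m_i$.

In [ ]:
# Minimal sketch (hypothetical example): fix a small dictionary of basis
# functions u_i(x) and let a bias-free Dense layer learn the coefficients m_i.
import numpy as np
import keras.backend as K
from keras.models import Model
from keras.layers import Input, Dense, Lambda, Concatenate

inp = Input(shape=(1,))
basis = Concatenate()([
    Lambda(lambda t: K.sin(t))(inp),                # u_1(x) = sin(x)
    Lambda(lambda t: K.log(K.abs(t) + 1e-6))(inp),  # u_2(x) = log|x|
    Lambda(lambda t: K.square(t))(inp),             # u_3(x) = x^2
])
out = Dense(1, use_bias=False)(basis)               # kernel plays the role of (m_1, m_2, m_3)
model = Model(inp, out)
model.compile(loss='mse', optimizer='adam')

# Fit to data drawn from 2*x^2 + 3*sin(x); the learned kernel should roughly
# recover (3, 0, 2), i.e. select the sin and x^2 terms and drop log|x|.
x_fit = np.random.uniform(-3, 3, size=(2000, 1))
y_fit = 2 * x_fit**2 + 3 * np.sin(x_fit)
model.fit(x_fit, y_fit, epochs=200, batch_size=100, verbose=0)
print(model.get_weights()[0].ravel())

A small network could also sit in front of this layer (similar in spirit to the NN-with-prior experiment later in this section); the linear version above just isolates the model-selection aspect.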

Investigating function approximation with neural nets

In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('white')
sns.set_context('notebook')
import keras
import keras.backend as K
from keras.models import Sequential
from keras.layers import Dense, Activation
from keras.callbacks import EarlyStopping
Using TensorFlow backend.
In [2]:
x = np.arange(-10, 10, 0.01)
y1 = 2*x**2 + 3
f1 = lambda x: 2*x**2 + 3
x_train = np.random.choice(x, size=1000)
y_train = f1(x_train)
In [3]:
def train_simple_neural_network(x_train, y_train, x_test, y_test, n_neurons=2, epochs=1000, batch_size=50):
    model = Sequential()
    model.add(Dense(n_neurons, input_shape=(1,)))
    model.add(Activation('sigmoid'))
    model.add(Dense(1,use_bias=False))
    model.add(Activation('linear'))

    model.compile(loss='mse',
                  optimizer='sgd')
    
    history = model.fit(x_train, y_train,
                    batch_size=batch_size,
                    epochs=epochs,
                    verbose=0,
                    validation_split=0,
                    callbacks=[EarlyStopping(patience=20, min_delta=.001, monitor='loss')])
    
    y_pred = model.predict(x_test, batch_size=50, verbose=1)
    error = model.evaluate(x_test, y_test, batch_size=50, verbose=1)
        
    return y_pred, model.count_params(), error
In [4]:
plt.scatter(x, y1, label='Function')
for n in [1, 2, 5, 10]:
    y_pred, params, error = train_simple_neural_network(x_train, y_train, x, y1, n_neurons=n)
    plt.scatter(x, y_pred, label='%d Neurons, %d params, %.3f Error' % (n, params, error))
plt.legend()
plt.show()
In [5]:
plt.scatter(x, y1, label='Function')
for n in [10, 20, 50, 100]:
    y_pred, params, error = train_simple_neural_network(x_train, y_train, x, y1, n_neurons=n)
    plt.scatter(x, y_pred, label='%d Neurons, %d params, %.3f Error' % (n, params, error))
plt.legend()
plt.show()
In [6]:
def train_univ_neural_network(x_train, y_train, x_test, y_test, n_neurons=None):
    if n_neurons is None:
        n_neurons = [2, 2]  # two hidden layers are required below, so default to two small ones
    model = Sequential()
    model.add(Dense(n_neurons[0], input_shape=(1,)))
    model.add(Activation('sigmoid'))
    model.add(Dense(n_neurons[1]))
    model.add(Activation('sigmoid'))
    model.add(Dense(1,use_bias=False))
    model.add(Activation('linear'))

    model.compile(loss='mse',
                  optimizer='sgd')
    
    history = model.fit(x_train, y_train,
                    batch_size=100,
                    epochs=1000,
                    verbose=0,
                    validation_split=0,
                    callbacks=[EarlyStopping(patience=20, min_delta=.001, monitor='loss')])
    
    y_pred = model.predict(x_test, batch_size=50, verbose=1)
    error = model.evaluate(x_test, y_test, batch_size=50, verbose=1)
        
    return y_pred, model.count_params(), error
In [7]:
plt.scatter(x, y1, label='Function')
for n in [1, 2, 5, 10]:
    y_pred, params, error = train_univ_neural_network(x_train, y_train, x, y1, n_neurons=[n,n])
    plt.scatter(x, y_pred, label='%d Neurons, %d params, %.3f Error' % (n, params, error))
plt.legend()
plt.show()
In [8]:
plt.scatter(x, y1, label='Function')
for n in [10, 20, 50, 100]:
    y_pred, params, error = train_univ_neural_network(x_train, y_train, x, y1, n_neurons=[n, n//2])
    plt.scatter(x, y_pred, label='%d Neurons, %d params, %.3f Error' % (n, params, error))
plt.legend()
plt.show()
In [9]:
x = np.arange(-2.5, 2.5, 0.001)
y3 = np.sin(2*x**2 + 3)
f3 = lambda x: np.sin(2*x**2 + 3)
x_train3 = np.random.choice(x, size=5000)
y_train3 = f3(x_train3)
In [10]:
plt.scatter(x, y3, label='Function')
plt.xlim([-2.5, 2.5])
for n in [1, 2, 5, 10]:
    y_pred, params, error = train_simple_neural_network(x_train3, y_train3, x, y3, n_neurons=n, batch_size=100)
    plt.scatter(x, y_pred, label='%d Neurons, %d params, %.3f Error' % (n, params, error))
plt.legend()
plt.show()
In [11]:
def train_deep_neural_network(x_train, y_train, x_test, y_test, n_neurons=None, activation='sigmoid'):
    if n_neurons is None:
        n_neurons=[]
    model = Sequential()
    for i in n_neurons:
        model.add(Dense(i, input_shape=(1,)))
        model.add(Activation(activation))
    model.add(Dense(1,use_bias=False))
    model.add(Activation('linear'))

    model.compile(loss='mse',
                  optimizer='sgd')
    
    history = model.fit(x_train, y_train,
                    batch_size=100,
                    epochs=1000,
                    verbose=0,
                    validation_split=0,
                    callbacks=[EarlyStopping(patience=20, min_delta=.001, monitor='loss')])
    
    y_pred = model.predict(x_test, batch_size=50, verbose=1)
    error = model.evaluate(x_test, y_test, batch_size=50, verbose=1)
        
    return y_pred, model.count_params(), error
In [12]:
plt.scatter(x, y3, label='Function')
plt.xlim([-2.5, 2.5])
for act in ['relu', 'sigmoid', 'tanh', 'linear']:
    n = [10,10,10]
    y_pred, params, error = train_deep_neural_network(x_train3, y_train3, x, y3, n_neurons=n, activation=act)
    plt.scatter(x, y_pred, label=act)
plt.legend()
plt.show()

As you can see, there is a subtle relationship between the network's performance and hyperparameters such as the number of layers, the number of neurons, and the activation function.

Difference between a NN with and without a prior on the model

In [13]:
x_vis = np.arange(-5., 5., .01)
y_vis = f3(x_vis)
data  = []
for lay in [1, 2, 3]:
    n = [10 for _ in range(lay)]
    y_pred, params, error = train_deep_neural_network(x_train3, y_train3, x_vis, y_vis, n_neurons=n, activation='tanh')
    data.append({'y_pred':y_pred, 'label': '%d Layers' % len(n)})
In [14]:
plt.plot(x_vis, y_vis, label='Function', color='k')
plt.xlim([-5., 5.])
plt.ylim([-1., 1.])
for d in data:
    plt.scatter(x_vis, d['y_pred'], label=d['label'])
plt.legend()
plt.show()
In [24]:
import keras.backend as K
from keras.models import Model
from keras.layers import Lambda, Input
def train_deep_neural_network_prior(x_train, y_train, x_test, y_test, n_neurons=None, activation='sigmoid'):
    if n_neurons is None:
        n_neurons=[]
    
    inp = Input(batch_shape=(None, 1))
    x = inp
    for i in n_neurons:
        x = Dense(i)(x)
        x = Activation(activation)(x)
    a = Dense(1, use_bias=False, activation='linear')(x)
    b = Dense(1, use_bias=False, activation='linear')(x)
    c = Dense(1, use_bias=False, activation='linear')(x)
    
    def model_function(args):
        a, b, c, inp = args
        return a*K.sin(b*K.square(inp) + c)
    
    out = Lambda(model_function)([a, b, c, inp])
 
    model = Model(inp, out)
    
    params = Model(inp, [a, b, c])
    
    model.compile(loss='mse',
                  optimizer='sgd')
    
    history = model.fit(x_train, y_train,
                    batch_size=100,
                    epochs=1000,
                    verbose=0,
                    validation_split=0.2,
                    callbacks=[EarlyStopping(patience=20, min_delta=.001, monitor='loss')])
    
    y_pred = model.predict(x_test, batch_size=50, verbose=1)
    error = model.evaluate(x_test, y_test, batch_size=50, verbose=1)
        
    return y_pred, model.count_params(), error, history, params
In [25]:
x_vis = np.arange(-5.,5.,.01)
y_vis = f3(x_vis)
n = [10]
y_pred, params, error, history, param_model = train_deep_neural_network_prior(x_train3, y_train3, 
                                                                 x_vis, y_vis, n_neurons=n, activation='relu')
In [26]:
a_v = []
b_v = []
c_v = []
for i in x_vis:
    a,b,c = param_model.predict(np.asarray([i]))
    a_v.append(a.squeeze())
    b_v.append(b.squeeze())
    c_v.append(c.squeeze())
In [27]:
plt.figure()
plt.title('a*sin(b*x^2 + c)')
plt.plot(x_vis, a_v, label='a')
plt.plot(x_vis, b_v, label='b')
plt.plot(x_vis, c_v, label='c')
plt.legend()
plt.show()
In [28]:
plt.figure()
plt.plot(x_vis, y_vis, label='Function', color='k')
plt.xlim([-5., 5.])
plt.ylim([-1., 1.])
plt.scatter(x_vis, y_pred, label='NN with Prior')
plt.legend()
plt.show()

B] Final Project: Predicting (pseudo-)randomness

Introduction

It is impossible to predict the output of a true random process, but what about a pseudo-random number generator? A good implementation of a pseudo-random number generator (PRG) produces bits that are practically indistinguishable from a true random source, and its statistical properties are rigorously tested. Yet, in all cases, the output is produced by a deterministic process, programmed to generate an extremely long sequence of bits (you guessed it: longer than the age of the universe) before looping back to the start.
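
To give a flavour of what "rigorously tested" means, here is a hypothetical illustration of the simplest such check, a monobit frequency test: under the null hypothesis of fair, independent bits, the z statistic below is approximately standard normal, so a large deviation from zero flags a biased generator.

In [ ]:
# Hypothetical illustration (not taken from any test suite): monobit frequency test.
# sum(bits) ~ Binomial(n, 1/2) under the null, so z is approximately N(0, 1).
import numpy as np
bits = np.random.randint(2, size=100000)
n = len(bits)
z = (bits.sum() - n / 2) / np.sqrt(n / 4)
print('monobit z statistic: %.3f' % z)  # |z| well above ~3 would be suspicious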

The purpose of this work, which is inspired by the very last problem set question of this course, is to thoroughly investigate whether (deep) neural networks can learn the underlying generating process when given a sufficiently long run of bits from a PRG as a training set. As a test case, I will use the Linear Feedback Shift Register (LFSR) PRGs that we talked about in the second week, since they provide a seemingly easy task (XOR, i.e. addition mod 2, of two or four bits) for a neural network to reverse engineer.
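
As a small standalone illustration (independent of the LFSR class implemented in the next section): a 4-bit register with XOR feedback on bits 1 and 4 produces each new bit from just two old bits, yet with maximal-length taps it only returns to its starting state after $2^4 - 1 = 15$ steps.

In [ ]:
# Toy standalone example: step a 4-bit Fibonacci LFSR with XOR feedback on
# bits 1 and 4, and count the steps until the state repeats.
seed = [1, 0, 0, 1]                        # any non-zero seed works
state = list(seed)
period = 0
while True:
    next_bit = (state[0] + state[3]) % 2   # XOR of the two tapped bits
    state = state[1:] + [next_bit]
    period += 1
    if state == seed:
        break
print('period:', period)                   # expect 2**4 - 1 = 15 for maximal-length taps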

The project is structured as follows. First, I will expand on the previous exercise and use several deep feed-forward neural networks, running a manual hyper-parameter optimization to find the dependencies between the network architectures and the LFSRs. I will continue with a selection of recurrent neural networks to show their limitations in capturing simple operations between datapoints in sequence data. Finally, to overcome the limitations of RNNs, I will try to implement a Neural Programmer, a novel architecture capable of quickly learning the simple mathematical (or logical) operations needed to produce an output from a set or sequence of inputs.

Setting up the LFSRs

In [1]:
# Imports
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('white')
sns.set_context('notebook')

from sklearn.model_selection import train_test_split
import keras
import keras.backend as K
from keras.models import Model
from keras.layers import Dense, Activation, Input
from keras.callbacks import EarlyStopping
Using TensorFlow backend.
In [2]:
def dectobin(dec, bits):
    binary = [int(x) for x in bin(dec)[2:]]
    return np.hstack((np.zeros(bits-len(binary), dtype=int), binary ))

# Build PRG functions
class LFSR():
    def __init__(self, xnor_bits: list, seed: int = None):
        self.xnor_bits = xnor_bits
        self.tap = max(xnor_bits)
        
        if seed:
            self.state = dectobin(seed, self.tap).tolist()
        else:
            self.state = np.random.randint(2, size=self.tap).tolist()
            
    def step(self):
        # Feedback bit: XOR (i.e. sum mod 2) of the tapped state bits
        next_bit = sum([self.state[i-1] for i in self.xnor_bits]) % 2
        self.state = self.state[1:] + [next_bit]
        return next_bit
    
    def run_seq(self, length: int):
        return [self.step() for _ in range(length)]
    
    def run_gen(self):
        while True:
            yield self.step()

With a generic LFSR implementation in place, I consulted the table of maximal-length taps to implement the four LFSR PRGs used in this project. To verify the implementation, and to get a visual feel for the properties of the LFSRs, I rendered their output sequences as 2D images.

In [3]:
# 4-tap
lfsr_4 = LFSR([1, 4], 5)
lfsr_4_seq = lfsr_4.run_seq(length=10000)
lfsr_4_img = np.asarray(lfsr_4_seq).reshape((100,100))

# 8-tap
lfsr_8 = LFSR([2, 3, 4, 8], 5)
lfsr_8_seq = lfsr_8.run_seq(length=10000)
lfsr_8_img = np.asarray(lfsr_8_seq).reshape((100,100))

# 23-tap
lfsr_23 = LFSR([5, 23], 5)
lfsr_23_seq = lfsr_23.run_seq(length=10000)
lfsr_23_img = np.asarray(lfsr_23_seq).reshape((100,100))

# 85-tap
lfsr_85 = LFSR([85,84,57,53], 5)
lfsr_85_seq = lfsr_85.run_seq(length=10000)
lfsr_85_img = np.asarray(lfsr_85_seq).reshape((100,100))

f, axarr = plt.subplots(2, 2)
axarr[0, 0].imshow(lfsr_4_img)
axarr[0, 0].set_title('LFSR 4-tap')
axarr[0, 1].imshow(lfsr_8_img)
axarr[0, 1].set_title('LFSR 8-tap')
axarr[1, 0].imshow(lfsr_23_img)
axarr[1, 0].set_title('LFSR 23-tap')
axarr[1, 1].imshow(lfsr_85_img)
axarr[1, 1].set_title('LFSR 85-tap')
f.subplots_adjust(hspace=0.3)
f.set_size_inches(12,12)
plt.show()
In [4]:
# Prepare datasets
def chunks(l, n):
    for i in range(0, len(l)-n):
        yield l[i:i + n]
def generate_dataset(sequence, window, test_size=.2, net='feed_forward'):
    data = list(chunks(sequence, window+1))
    x_data = np.array([d[:window] for d in data])
    y_data = np.array([d[window] for d in data])
    
    if net=='feed_forward':
        # Dataset for the feed-forward network
        x_train, x_valid, y_train, y_valid = train_test_split(x_data, y_data, test_size=test_size, stratify=y_data)
        return x_train, x_valid, y_train, y_valid
    elif net=='recurrent':
        # Dataset for the recurrent network
        x_train, x_valid, y_train, y_valid = train_test_split(x_data, y_data, test_size=test_size, stratify=y_data)
        return x_train.reshape((-1, window, 1)), x_valid.reshape((-1, window, 1)), y_train, y_valid
    else:
        # Dataset for the autoencoder
        x_train, x_valid = train_test_split(x_data, test_size=test_size)
        return x_train, x_valid

Setting up the Feed-Forward Network

In [5]:
def train_feed_forward(x_train, y_train, x_test, y_test, n_neurons: list=None, activation='sigmoid', return_model=False):
    if n_neurons is None:
        n_neurons = []  # no hidden layers: just the single sigmoid output unit
    
    inp = Input(batch_shape=(None, x_train.shape[1]))
    x = inp
    for i in n_neurons:
        x = Dense(i)(x)
        x = Activation(activation)(x)
        
    out = Dense(1,use_bias=False, activation='sigmoid')(x)
    
    model = Model(inp, out)
    
    model.compile(loss='binary_crossentropy',
                  optimizer='sgd',
                  metrics=['binary_accuracy'])
    
    history = model.fit(x_train, y_train,
                    batch_size=100,
                    epochs=100,
                    verbose=0,
                    validation_split=.2,
                    callbacks=[EarlyStopping(patience=20, min_delta=.001, monitor='val_binary_accuracy')])
    
    y_pred = model.predict(x_test, batch_size=50, verbose=0)
    loss, acc = model.evaluate(x_test, y_test, batch_size=50, verbose=0)
    
    if return_model:
        return model
        
    return y_pred, model.count_params(), loss, acc

A survey of the numbers of layers and neurons that work well on the LFSR problem.

For now, let's keep the context window equal to the known LFSR tap size.
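
Before training anything, a quick sanity check (using lfsr_4_seq and the chunks helper defined above): with the window equal to the tap size, the label bit of the 4-tap LFSR should simply be the XOR of window positions 0 and 3, so a perfect predictor exists in principle.

In [ ]:
# Sanity check: for taps [1, 4], the next output bit equals the XOR of
# positions 0 and 3 of the previous four output bits.
data_chk = list(chunks(lfsr_4_seq, 4 + 1))
x_chk = np.array([d[:4] for d in data_chk])
y_chk = np.array([d[4] for d in data_chk])
print(np.all((x_chk[:, 0] ^ x_chk[:, 3]) == y_chk))   # should print True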

a) LFSR-4

In [12]:
x_train, x_valid, y_train, y_valid = generate_dataset(lfsr_4_seq, window=4, net='feed_forward')
layers = [1, 2, 3, 5]
neurons = [2, 4, 8, 10]
loss = np.zeros((len(layers), len(neurons)))
acc = np.zeros_like(loss)
params = np.zeros_like(loss, dtype=int)
for i,l in enumerate(layers):
    for j,n in enumerate(neurons):
        y_pred, params[i,j], loss[i,j], acc[i,j] = train_feed_forward(x_train, y_train, 
                                                             x_valid, y_valid, 
                                                             n_neurons=[n for _ in range(l)], activation='relu')
f, axarr = plt.subplots(1, 2)
for i,a in enumerate(acc.T):
    axarr[0].plot(layers, a, marker='o', label='%d Neurons' % neurons[i])
axarr[0].set_xticks(layers)
axarr[0].set_xlabel('#Layers')
axarr[0].set_ylabel('Accuracy')
axarr[0].legend()
for i, a in enumerate(acc):
    axarr[1].plot(neurons, a, marker='o', label='%d Layers' % layers[i])
axarr[1].set_xticks(neurons)
axarr[1].set_xlabel('#Neurons')
axarr[1].set_ylabel('Accuracy')
axarr[1].legend()
f.set_size_inches(12,4)
plt.show()

b) LFSR-8

In [74]:
x_train, x_valid, y_train, y_valid = generate_dataset(lfsr_8_seq, window=8, net='feed_forward')
layers = [1, 2, 3, 5]
neurons = [2, 4, 8, 10]
loss = np.zeros((len(layers), len(neurons)))
acc = np.zeros_like(loss)
params = np.zeros_like(loss, dtype=int)
for i,l in enumerate(layers):
    for j,n in enumerate(neurons):
        y_pred, params[i,j], loss[i,j], acc[i,j] = train_feed_forward(x_train, y_train, 
                                                             x_valid, y_valid, 
                                                             n_neurons=[n for _ in range(l)], activation='relu')
f, axarr = plt.subplots(1, 2)
for i,a in enumerate(acc.T):
    axarr[0].plot(layers, a, marker='o', label='%d Neurons' % neurons[i])
axarr[0].set_xticks(layers)
axarr[0].set_xlabel('#Layers')
axarr[0].set_ylabel('Accuracy')
axarr[0].legend()
for i, a in enumerate(acc):
    axarr[1].plot(neurons, a, marker='o', label='%d Layers' % layers[i])
axarr[1].set_xticks(neurons)
axarr[1].set_xlabel('#Neurons')
axarr[1].set_ylabel('Accuracy')
axarr[1].legend()
f.set_size_inches(12,4)
plt.show()

c) LFSR-23

In [92]:
x_train, x_valid, y_train, y_valid = generate_dataset(lfsr_23_seq, window=23, net='feed_forward')
layers = [1, 2, 3, 5]
neurons = [2, 4, 8, 10]
loss = np.zeros((len(layers), len(neurons)))
acc = np.zeros_like(loss)
params = np.zeros_like(loss, dtype=int)
for i,l in enumerate(layers):
    for j,n in enumerate(neurons):
        y_pred, params[i,j], loss[i,j], acc[i,j] = train_feed_forward(x_train, y_train, 
                                                             x_valid, y_valid, 
                                                             n_neurons=[n for _ in range(l)], activation='relu')
f, axarr = plt.subplots(1, 2)
for i,a in enumerate(acc.T):
    axarr[0].plot(layers, a, marker='o', label='%d Neurons' % neurons[i])
axarr[0].set_xticks(layers)
axarr[0].set_xlabel('#Layers')
axarr[0].set_ylabel('Accuracy')
axarr[0].legend()
for i, a in enumerate(acc):
    axarr[1].plot(neurons, a, marker='o', label='%d Layers' % layers[i])
axarr[1].set_xticks(neurons)
axarr[1].set_xlabel('#Neurons')
axarr[1].set_ylabel('Accuracy')
axarr[1].legend()
f.set_size_inches(12,4)
plt.show()