Variational Auto-encoder—Peeking the code

Paper: Stochastic Gradient VB and the Variational Auto-Encoder

Code: https://github.com/y0ast/Variational-Autoencoder

We will dive directly into code:

# relu activation function
def relu(x):
return T.switch(x<0, 0, x)
class VAE:
"""This class implements the Variational Auto Encoder"""
def __init__(self, continuous, hu_encoder, hu_decoder, n_latent, x_train, b1=0.95, b2=0.999, batch_size=100, learning_rate=0.001, lam=0):
# let us keep the discussion to not continuous
self.continuous = continuous
# number of hidden units in encoder
self.hu_encoder = hu_encoder
# number of hidden units in decoder
self.hu_decoder = hu_decoder
# number of latent units
self.n_latent = n_latent
# number of training sample and number of features
[self.N, self.features] = x_train.shape
# a fixed random state so that you can reproduce the results
self.prng = np.random.RandomState(42)
# bias and learning rate
self.b1 = b1
self.b2 = b2
self.learning_rate = learning_rate
self.lam = lam
self.batch_size = batch_size
# standard deviation for weight initialization
sigma_init = 0.01
# these two are lambda function
# create_weight takes (dim_input, dim_output) as input argument and will return a theano matrix of size=(dim_input, dim_output) whose values are taken from a normal distribution with mean=0 and standard deviation=`sigma_init`
create_weight = lambda dim_input, dim_output: self.prng.normal(0, sigma_init, (dim_input, dim_output)).astype(theano.config.floatX)
# create_bias takes (dim_output) as input and will return a theano vector of size=dim_output with each value=0
create_bias = lambda dim_output: np.zeros(dim_output).astype(theano.config.floatX)
# ENCODER
# create weight matrix and biases for pass from input features to hidden weights
W_xh = theano.shared(create_weight(self.features, hu_encoder), name='W_xh')
b_xh = theano.shared(create_bias(hu_encoder), name='b_xh')
# create weight matrix and biases for inference network (approximate a normal curve)
W_hmu = theano.shared(create_weight(hu_encoder, n_latent), name='W_hmu')
b_hmu = theano.shared(create_bias(n_latent), name='b_hmu')
W_hsigma = theano.shared(create_weight(hu_encoder, n_latent), name='W_hsigma')
b_hsigma = theano.shared(create_bias(n_latent), name='b_hsigma')
# DECODER
# create weight matrix and biases for decoder network
W_zh = theano.shared(create_weight(n_latent, hu_decoder), name='W_zh')
b_zh = theano.shared(create_bias(hu_decoder), name='b_zh')
# A dict for parameters, will be used for back-propagation
self.params = OrderedDict([("W_xh", W_xh), ("b_xh", b_xh), ("W_hmu", W_hmu), ("b_hmu", b_hmu),
("W_hsigma", W_hsigma), ("b_hsigma", b_hsigma), ("W_zh", W_zh),
("b_zh", b_zh)])
if self.continuous:
# Lets leave this for now
W_hxmu = theano.shared(create_weight(hu_decoder, self.features), name='W_hxmu')
b_hxmu = theano.shared(create_bias(self.features), name='b_hxmu')
W_hxsig = theano.shared(create_weight(hu_decoder, self.features), name='W_hxsigma')
b_hxsig = theano.shared(create_bias(self.features), name='b_hxsigma')
self.params.update({'W_hxmu': W_hxmu, 'b_hxmu': b_hxmu, 'W_hxsigma': W_hxsig, 'b_hxsigma': b_hxsig})
else:
# create weight matrix and biases for reconstruction of features from decoder
W_hx = theano.shared(create_weight(hu_decoder, self.features), name='W_hx')
b_hx = theano.shared(create_bias(self.features), name='b_hx')
# Update the dict of parameters
self.params.update({'W_hx': W_hx, 'b_hx': b_hx})
# Adam parameters
self.m = OrderedDict()
self.v = OrderedDict()
# In this model adam optimizer have been used for optimizing each parameter, as per Adam parameter's config initialization first and second moment is dont to zero vector
for key, value in self.params.items():
self.m[key] = theano.shared(np.zeros_like(value.get_value()).astype(theano.config.floatX), name='m_' + key)
self.v[key] = theano.shared(np.zeros_like(value.get_value()).astype(theano.config.floatX), name='v_' + key)
x_train = theano.shared(x_train.astype(theano.config.floatX), name="x_train")
# Go to create_gradientfunctions firstly
self.update, self.likelihood, self.encode, self.decode = self.create_gradientfunctions(x_train)
def encoder(self, x):
# h_encoder = activation_function (Matrix multiplication of input feature vector ('x') with weight matrix for pass from input features to hidden weights)
h_encoder = relu(T.dot(x, self.params['W_xh']) + self.params['b_xh'].dimshuffle('x', 0))
# calculate the mean and standard deviation from inference network
mu = T.dot(h_encoder, self.params['W_hmu']) + self.params['b_hmu'].dimshuffle('x', 0)
log_sigma = T.dot(h_encoder, self.params['W_hsigma']) + self.params['b_hsigma'].dimshuffle('x', 0)
return mu, log_sigma
def sampler(self, mu, log_sigma):
seed = 42
if "gpu" in theano.config.device:
srng = theano.sandbox.cuda.rng_curand.CURAND_RandomStreams(seed=seed)
else:
srng = T.shared_randomstreams.RandomStreams(seed=seed)
eps = srng.normal(mu.shape)
# Reparametrize (See here: http://blog.shakirm.com/2015/10/machine-learning-trick-of-the-day-4-reparameterisation-tricks/)
# This is essential for backpropagation, we are approximating a normal distribution
z = mu + T.exp(0.5 * log_sigma) * eps
return z
def decoder(self, x, z):
# calculate decoder neuron values
h_decoder = relu(T.dot(z, self.params['W_zh']) + self.params['b_zh'].dimshuffle('x', 0))
if self.continuous:
reconstructed_x = T.dot(h_decoder, self.params['W_hxmu']) + self.params['b_hxmu'].dimshuffle('x', 0)
log_sigma_decoder = T.dot(h_decoder, self.params['W_hxsigma']) + self.params['b_hxsigma']
logpxz = (-(0.5 * np.log(2 * np.pi) + 0.5 * log_sigma_decoder) -
0.5 * ((x - reconstructed_x)**2 / T.exp(log_sigma_decoder))).sum(axis=1)
else:
# multiply decoder layer with output matrix (last one) to reconstruct input and find loss through binary crossentropy
# binary crossentropy because author has used MNIST
reconstructed_x = T.nnet.sigmoid(T.dot(h_decoder, self.params['W_hx']) + self.params['b_hx'].dimshuffle('x', 0))
logpxz = - T.nnet.binary_crossentropy(reconstructed_x, x).sum(axis=1)
return reconstructed_x, logpxz
def create_gradientfunctions(self, x_train):
x = T.matrix("x")
epoch = T.scalar("epoch")
batch_size = x.shape[0]
# Calculate the mean and std deviation for approximating the input representation to a normal distribution
mu, log_sigma = self.encoder(x)
# use the normal distribution to sample a latent variable
z = self.sampler(mu, log_sigma)
# use the latent variable to reconstruct input
reconstructed_x, logpxz = self.decoder(x,z)
# Expectation of (logpz - logqz_x) over logqz_x is equal to KLD (see appendix B):
KLD = 0.5 * T.sum(1 + log_sigma - mu**2 - T.exp(log_sigma), axis=1)
# Average over batch dimension
# KLD is used as a regularizer, so that our encoder doesn't cheat us
logpx = T.mean(logpxz + KLD)
# Compute all the gradients
gradients = T.grad(logpx, self.params.values())
# Adam implemented as updates
updates = self.get_adam_updates(gradients, epoch)
batch = T.iscalar('batch')
givens = {
x: x_train[batch*self.batch_size:(batch+1)*self.batch_size, :]
}
# Define a bunch of functions for convenience
update = theano.function([batch, epoch], logpx, updates=updates, givens=givens)
likelihood = theano.function([x], logpx)
encode = theano.function([x], z)
decode = theano.function([z], reconstructed_x)
return update, likelihood, encode, decode
# A simple adam optimizer implementation
def get_adam_updates(self, gradients, epoch):
updates = OrderedDict()
gamma = T.sqrt(1 - self.b2**epoch) / (1 - self.b1**epoch)
values_iterable = zip(self.params.keys(), self.params.values(), gradients, 
self.m.values(), self.v.values())
for name, parameter, gradient, m, v in values_iterable:
new_m = self.b1 * m + (1. - self.b1) * gradient
new_v = self.b2 * v + (1. - self.b2) * (gradient**2)
updates[parameter] = parameter + self.learning_rate * gamma * new_m / (T.sqrt(new_v) + epsilon)
if 'W' in name:
# MAP on weights (same as L2 regularization)
updates[parameter] -= self.learning_rate * self.lam * (parameter * np.float32(self.batch_size / self.N))
updates[m] = new_m
updates[v] = new_v
return updates

The same code is available at: gist

In a nutshell variational auto-encoder takes an input, create a hidden representation then use the hidden representation to approximate a normal curve, sample a random data from the approximate normal distribution, use it to reconstruct the input. The loss consist of two parts: reconstruction error and regularization (KL Divergence).