Variational Auto-encoder—Peeking the code

Paper: Stochastic Gradient VB and the Variational Auto-Encoder

Code: https://github.com/y0ast/Variational-Autoencoder

We will dive directly into code:

# relu activation function
def relu(x):
    return T.switch(x<0, 0, x)class VAE:
    """This class implements the Variational Auto Encoder"""
    def __init__(self, continuous, hu_encoder, hu_decoder, n_latent, x_train, b1=0.95, b2=0.999, batch_size=100, learning_rate=0.001, lam=0):
        # let us keep the discussion to not continuous
        self.continuous = continuous
        # number of hidden units in encoder
        self.hu_encoder = hu_encoder
        # number of hidden units in decoder
        self.hu_decoder = hu_decoder
        # number of latent units
        self.n_latent = n_latent
        # number of training sample and number of features
        [self.N, self.features] = x_train.shape
        # a fixed random state so that you can reproduce the results
        self.prng = np.random.RandomState(42)# bias and learning rate
        self.b1 = b1
        self.b2 = b2
        self.learning_rate = learning_rate
        self.lam = lamself.batch_size = batch_size
        # standard deviation for weight initialization
        sigma_init = 0.01
         # these two are lambda function
         # create_weight takes (dim_input, dim_output) as input argument and will return a theano matrix of size=(dim_input, dim_output) whose values are taken from a normal distribution with mean=0 and standard deviation=`sigma_init`
        create_weight = lambda dim_input, dim_output: self.prng.normal(0, sigma_init, (dim_input, dim_output)).astype(theano.config.floatX)
        # create_bias takes (dim_output) as input and will return a theano vector of size=dim_output with each value=0
        create_bias = lambda dim_output: np.zeros(dim_output).astype(theano.config.floatX)# ENCODER
        # create weight matrix and biases for pass from input features to hidden weights
        W_xh = theano.shared(create_weight(self.features, hu_encoder), name='W_xh')
        b_xh = theano.shared(create_bias(hu_encoder), name='b_xh')
        # create weight matrix and biases for inference network (approximate a normal curve)
        W_hmu = theano.shared(create_weight(hu_encoder, n_latent), name='W_hmu')
        b_hmu = theano.shared(create_bias(n_latent), name='b_hmu')
        W_hsigma = theano.shared(create_weight(hu_encoder, n_latent), name='W_hsigma')
        b_hsigma = theano.shared(create_bias(n_latent), name='b_hsigma')# DECODER
        # create weight matrix and biases for decoder network
        W_zh = theano.shared(create_weight(n_latent, hu_decoder), name='W_zh')
        b_zh = theano.shared(create_bias(hu_decoder), name='b_zh')
        # A dict for parameters, will be used for back-propagation
        self.params = OrderedDict([("W_xh", W_xh), ("b_xh", b_xh), ("W_hmu", W_hmu), ("b_hmu", b_hmu),
                                   ("W_hsigma", W_hsigma), ("b_hsigma", b_hsigma), ("W_zh", W_zh),
                                   ("b_zh", b_zh)])if self.continuous:
        # Lets leave this for now
            W_hxmu = theano.shared(create_weight(hu_decoder, self.features), name='W_hxmu')
            b_hxmu = theano.shared(create_bias(self.features), name='b_hxmu')W_hxsig = theano.shared(create_weight(hu_decoder, self.features), name='W_hxsigma')
            b_hxsig = theano.shared(create_bias(self.features), name='b_hxsigma')self.params.update({'W_hxmu': W_hxmu, 'b_hxmu': b_hxmu, 'W_hxsigma': W_hxsig, 'b_hxsigma': b_hxsig})
        else:
        # create weight matrix and biases for reconstruction of features from decoder
            W_hx = theano.shared(create_weight(hu_decoder, self.features), name='W_hx')
            b_hx = theano.shared(create_bias(self.features), name='b_hx')
        # Update the dict of parameters
            self.params.update({'W_hx': W_hx, 'b_hx': b_hx})# Adam parameters
        self.m = OrderedDict()
        self.v = OrderedDict()
        # In this model adam optimizer have been used for optimizing each parameter, as per Adam parameter's config initialization first and second moment is dont to zero vector
        for key, value in self.params.items():
                self.m[key] = theano.shared(np.zeros_like(value.get_value()).astype(theano.config.floatX), name='m_' + key)
                self.v[key] = theano.shared(np.zeros_like(value.get_value()).astype(theano.config.floatX), name='v_' + key)x_train = theano.shared(x_train.astype(theano.config.floatX), name="x_train")# Go to create_gradientfunctions firstly
        self.update, self.likelihood, self.encode, self.decode = self.create_gradientfunctions(x_train)def encoder(self, x):
        # h_encoder = activation_function (Matrix multiplication of input feature vector ('x') with weight matrix for pass from input features to hidden weights)
        h_encoder = relu(T.dot(x, self.params['W_xh']) + self.params['b_xh'].dimshuffle('x', 0))# calculate the mean and standard deviation from inference network
        mu = T.dot(h_encoder, self.params['W_hmu']) + self.params['b_hmu'].dimshuffle('x', 0)
        log_sigma = T.dot(h_encoder, self.params['W_hsigma']) + self.params['b_hsigma'].dimshuffle('x', 0)return mu, log_sigmadef sampler(self, mu, log_sigma):
        seed = 42if "gpu" in theano.config.device:
            srng = theano.sandbox.cuda.rng_curand.CURAND_RandomStreams(seed=seed)
        else:
            srng = T.shared_randomstreams.RandomStreams(seed=seed)eps = srng.normal(mu.shape)# Reparametrize (See here: http://blog.shakirm.com/2015/10/machine-learning-trick-of-the-day-4-reparameterisation-tricks/)
        # This is essential for backpropagation, we are approximating a normal distribution
        z = mu + T.exp(0.5 * log_sigma) * epsreturn zdef decoder(self, x, z):
        # calculate decoder neuron values
        h_decoder = relu(T.dot(z, self.params['W_zh']) + self.params['b_zh'].dimshuffle('x', 0))if self.continuous:
            reconstructed_x = T.dot(h_decoder, self.params['W_hxmu']) + self.params['b_hxmu'].dimshuffle('x', 0)
            log_sigma_decoder = T.dot(h_decoder, self.params['W_hxsigma']) + self.params['b_hxsigma']logpxz = (-(0.5 * np.log(2 * np.pi) + 0.5 * log_sigma_decoder) -
                      0.5 * ((x - reconstructed_x)**2 / T.exp(log_sigma_decoder))).sum(axis=1)
        else:
            # multiply decoder layer with output matrix (last one) to reconstruct input and find loss through binary crossentropy
            # binary crossentropy because author has used MNIST
            reconstructed_x = T.nnet.sigmoid(T.dot(h_decoder, self.params['W_hx']) + self.params['b_hx'].dimshuffle('x', 0))
            logpxz = - T.nnet.binary_crossentropy(reconstructed_x, x).sum(axis=1)return reconstructed_x, logpxzdef create_gradientfunctions(self, x_train):
        x = T.matrix("x")epoch = T.scalar("epoch")batch_size = x.shape[0]# Calculate the mean and std deviation for approximating the input representation to a normal distribution
        mu, log_sigma = self.encoder(x)
        # use the normal distribution to sample a latent variable
        z = self.sampler(mu, log_sigma)
        # use the latent variable to reconstruct input
        reconstructed_x, logpxz = self.decoder(x,z)# Expectation of (logpz - logqz_x) over logqz_x is equal to KLD (see appendix B):
        KLD = 0.5 * T.sum(1 + log_sigma - mu**2 - T.exp(log_sigma), axis=1)# Average over batch dimension
        # KLD is used as a regularizer, so that our encoder doesn't cheat us
        logpx = T.mean(logpxz + KLD)# Compute all the gradients
        gradients = T.grad(logpx, self.params.values())# Adam implemented as updates
        updates = self.get_adam_updates(gradients, epoch)batch = T.iscalar('batch')givens = {
            x: x_train[batch*self.batch_size:(batch+1)*self.batch_size, :]
        }# Define a bunch of functions for convenience
        update = theano.function([batch, epoch], logpx, updates=updates, givens=givens)
        likelihood = theano.function([x], logpx)
        encode = theano.function([x], z)
        decode = theano.function([z], reconstructed_x)return update, likelihood, encode, decode# A simple adam optimizer implementation
    def get_adam_updates(self, gradients, epoch):
        updates = OrderedDict()
        gamma = T.sqrt(1 - self.b2**epoch) / (1 - self.b1**epoch)values_iterable = zip(self.params.keys(), self.params.values(), gradients, 
                              self.m.values(), self.v.values())for name, parameter, gradient, m, v in values_iterable:
            new_m = self.b1 * m + (1. - self.b1) * gradient
            new_v = self.b2 * v + (1. - self.b2) * (gradient**2)updates[parameter] = parameter + self.learning_rate * gamma * new_m / (T.sqrt(new_v) + epsilon)if 'W' in name:
                # MAP on weights (same as L2 regularization)
                updates[parameter] -= self.learning_rate * self.lam * (parameter * np.float32(self.batch_size / self.N))updates[m] = new_m
            updates[v] = new_vreturn updates

The same code is available at: gist

In a nutshell variational auto-encoder takes an input, create a hidden representation then use the hidden representation to approximate a normal curve, sample a random data from the approximate normal distribution, use it to reconstruct the input. The loss consist of two parts: reconstruction error and regularization (KL Divergence).

30

30 claps
Nishant Nikhil

Written by

Undergrad @ IIT Kharagpur | Learning Intelligence | GSoC ’16 Student | GSoC’ 17 Mentor