How to do time series prediction using RNNs, TensorFlow and Cloud ML Engine

The Estimators API in tf.contrib.learn (see the tutorial here) is a very convenient way to get started using TensorFlow. The really cool thing about the Estimators API, from my perspective, is that it makes it very easy to create distributed TensorFlow models. Many of the TensorFlow samples that you see floating around on the internets are not distributed: they assume that you will be running the code on a single machine. People start with such code and then are immeasurably saddened to learn that the low-level TensorFlow code doesn’t actually work on their complete dataset. They then have to do lots of work to add distributed training code around the original sample, and who wants to edit somebody else’s code?

So, please, please, please, if you see a TensorFlow sample that doesn’t use the Estimators API, ignore it. It will be a lot of work to make it work on your production (read: large) datasets: there will be monitors, coordinators, parameter servers, and all kinds of systems programming craziness that you don’t want to have to dive into. Start with the Estimator API and use the Experiment class. (Disclaimer: my views, not those of my employer.)

Time series prediction needs a custom estimator

The Estimators API comes with a Deep Neural Network classifier and regressor. If you have typical structured data, follow the tutorial linked above or take this training course from Google Cloud (soon to be available on Coursera) and you’ll be on your way to creating machine learning models that work on real-world, large datasets in your relational data warehouse. But what if you don’t have a typical structured data problem? In that case, you will often need to create a custom estimator. In this blog post, I will show you how.

A common type of data that you will want to do machine learning on is time-series data. Essentially, your inputs are a set of numbers and you want to predict the next number in that sequence. In this article, I will make it a bit more general and assume that you want to predict the last two numbers of the sequence. As the Computer Science proverb goes, if you can do two, you can do N.

The traditional neural network architecture that is used for sequence-to-sequence prediction is called a Recurrent Neural Network (RNN). See this article and this one for a very accessible introduction to RNNs. But you don’t need to know how to implement an RNN to use one, so once those articles go deeper than you want, quit.

To follow along with this article, have my Jupyter notebook open in another browser window. I am only showing key snippets of code here. The notebook (and the GitHub folder it’s in) contains all of the code.

Simulate some time-series data

It’s usually easier to learn with a small toy dataset, because you can generate as much of it as you want. Real data will come with its own quirks! So, let’s generate a bunch of time-series data. Each sequence will consist of 10 numbers. We will use the first eight as inputs and the last two as the labels (i.e., what is to be predicted).

The code to generate these time-series sequences using numpy (np):

import numpy as np

SEQ_LEN = 10
def create_time_series():
    freq = (np.random.random() * 0.5) + 0.1  # 0.1 to 0.6
    ampl = np.random.random() + 0.5  # 0.5 to 1.5
    x = np.sin(np.arange(0, SEQ_LEN) * freq) * ampl
    return x

Write a bunch of these time-series sequences to CSV files (train.csv and valid.csv) and we are in business. We’ve got data.
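One way to write those files, as a sketch: the notebook’s own helper may differ, and the function name to_csv and the counts of 1,000 training and 50 validation sequences are arbitrary choices made here. Each line holds one sequence of SEQ_LEN comma-separated floats with no header, which is the layout the input function below expects. create_time_series is repeated so the block runs on its own.

```python
import numpy as np

SEQ_LEN = 10


def create_time_series():
    # random frequency in [0.1, 0.6) and amplitude in [0.5, 1.5)
    freq = (np.random.random() * 0.5) + 0.1
    ampl = np.random.random() + 0.5
    return np.sin(np.arange(0, SEQ_LEN) * freq) * ampl


def to_csv(filename, n_sequences):
    # hypothetical helper: one sequence per line, SEQ_LEN comma-separated floats, no header
    with open(filename, 'w') as ofp:
        for _ in range(n_sequences):
            seq = create_time_series()
            ofp.write(','.join('{:.6f}'.format(v) for v in seq) + '\n')


to_csv('train.csv', 1000)  # sequence counts are arbitrary
to_csv('valid.csv', 50)
```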

Input Function

The way the Estimators API in TensorFlow works is that you need to provide an input_fn to read your data. You don’t provide x and y values. Instead, you provide a function that returns inputs and labels. The inputs are a dictionary of all your inputs (name of input to tensor), and the labels are a tensor.

In our case, each line of our CSV file simply consists of 10 floating-point numbers. DEFAULTS serves to specify the data type for the tensors. We want to read the data 20 lines at a time; that’s the BATCH_SIZE. The batch size is the number of samples over which each gradient descent step is computed. You will need to experiment with this number: if it is too large, your training will be slow, and if it is too small, your training will bounce around and won’t converge. Since we have only one input, the name you give that input doesn’t really matter. We’ll call it rawdata.

DEFAULTS = [[0.0] for x in range(0, SEQ_LEN)]
BATCH_SIZE = 20
TIMESERIES_COL = 'rawdata'
N_OUTPUTS = 2  # in each sequence, values 1-8 are features and 9-10 are labels
N_INPUTS = SEQ_LEN - N_OUTPUTS

The input_fn that the Estimators API wants should take no parameters. However, we do want to be able to provide the filename(s) to read on the command line. So, let’s write a read_dataset() function that returns an input_fn.

import tensorflow as tf

# read data and convert to needed format
def read_dataset(filename, mode=tf.contrib.learn.ModeKeys.TRAIN):
    def _input_fn():
        num_epochs = 100 if mode == tf.contrib.learn.ModeKeys.TRAIN else 1

The first thing that we do is to decide the number of epochs. This is how many times we need to go through the dataset. We’ll go through the dataset 100 times if we are training, but only once if we are evaluating.

Next, we’ll do wild-card expansion. Big Data programs often produce sharded files such as train.csv-0001-of-0036, so we’d like to simply provide train.csv* as the input. We use the matching filenames to populate a filename queue and then use a TextLineReader to read the data:

        # could be a path to one file or a file pattern.
        input_file_names = tf.train.match_filenames_once(filename)
        filename_queue = tf.train.string_input_producer(
            input_file_names, num_epochs=num_epochs, shuffle=True)
        reader = tf.TextLineReader()
        _, value = reader.read_up_to(filename_queue, num_records=BATCH_SIZE)
        value_column = tf.expand_dims(value, -1)

After this, we decode the data, treating the first 8 numbers as inputs and the last two as the label. As read, the inputs are a list of 8 tensors, each of which is batchsize x 1. Using tf.concat along axis 1 makes them a single batchsize x 8 tensor (and the label a batchsize x 2 tensor). This is important because the Estimators API wants tensors, not lists.

        # all_data is a list of tensors, one [batch_size, 1] tensor per CSV column
        all_data = tf.decode_csv(value_column, record_defaults=DEFAULTS)
        inputs = all_data[:len(all_data)-N_OUTPUTS]  # first few values
        label = all_data[len(all_data)-N_OUTPUTS:]  # last few values

        # from a list of [batch_size, 1] tensors to one [batch_size, n] tensor
        inputs = tf.concat(inputs, axis=1)
        label = tf.concat(label, axis=1)
        print('inputs={}'.format(inputs))

        return {TIMESERIES_COL: inputs}, label  # dict of features, label

    return _input_fn  # read_dataset hands back the zero-argument input_fn
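To make the shapes concrete, here is a NumPy-only sketch (no TensorFlow needed) of what the decode-and-concatenate step does to one batch. It assumes the batch has already been parsed into a BATCH_SIZE x SEQ_LEN float array; the random values and the list-of-columns construction are stand-ins for what tf.decode_csv returns, not TensorFlow’s implementation.

```python
import numpy as np

SEQ_LEN = 10
N_OUTPUTS = 2
BATCH_SIZE = 20

# stand-in for one parsed batch of CSV lines: BATCH_SIZE rows x SEQ_LEN values
batch = np.random.random((BATCH_SIZE, SEQ_LEN)).astype(np.float32)

# mimic decode_csv's output structure: one [BATCH_SIZE, 1] array per CSV column
all_data = [batch[:, i:i + 1] for i in range(SEQ_LEN)]

# concatenating along axis 1 keeps the batch dimension first
inputs = np.concatenate(all_data[:SEQ_LEN - N_OUTPUTS], axis=1)
label = np.concatenate(all_data[SEQ_LEN - N_OUTPUTS:], axis=1)

print(inputs.shape)  # (20, 8)
print(label.shape)   # (20, 2)
```

The batch dimension stays first, which is also the layout static_rnn expects for each time step once the features are split back into columns.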

Define an RNN

If we were using a LinearRegressor, DNNRegressor, DNNLinearCombinedRegressor, etc., we could have simply used the existing class. But because we are doing sequence-to-sequence prediction, we have to write our own model function. At least right now, the Estimators API doesn’t come with an out-of-the-box RNNRegressor. So, let’s roll our own RNN model using low-level TensorFlow functions.

import tensorflow as tf
import tensorflow.contrib.learn as tflearn
from tensorflow.contrib import rnn

LSTM_SIZE = 3  # number of units in the LSTM cell's hidden state

# create the inference model
def simple_rnn(features, targets, mode):
    # 0. Reformat input shape to become a sequence
    x = tf.split(features[TIMESERIES_COL], N_INPUTS, 1)
    #print('x={}'.format(x))

    # 1. configure the RNN
    lstm_cell = rnn.BasicLSTMCell(LSTM_SIZE, forget_bias=1.0)
    outputs, _ = rnn.static_rnn(lstm_cell, x, dtype=tf.float32)

    # slice to keep only the output of the last cell of the RNN
    outputs = outputs[-1]
    #print('last outputs={}'.format(outputs))

    # output is result of linear activation of last layer of RNN
    weight = tf.Variable(tf.random_normal([LSTM_SIZE, N_OUTPUTS]))
    bias = tf.Variable(tf.random_normal([N_OUTPUTS]))
    predictions = tf.matmul(outputs, weight) + bias

    # 2. Define the loss function for training/evaluation
    #print('targets={}'.format(targets))
    #print('preds={}'.format(predictions))
    loss = tf.losses.mean_squared_error(targets, predictions)
    eval_metric_ops = {
        "rmse": tf.metrics.root_mean_squared_error(targets, predictions)
    }

    # 3. Define the training operation/optimizer
    train_op = tf.contrib.layers.optimize_loss(
        loss=loss,
        global_step=tf.contrib.framework.get_global_step(),
        learning_rate=0.01,
        optimizer="SGD")

    # 4. Create predictions
    predictions_dict = {"predicted": predictions}

    # 5. return ModelFnOps
    return tflearn.ModelFnOps(
        mode=mode,
        predictions=predictions_dict,
        loss=loss,
        train_op=train_op,
        eval_metric_ops=eval_metric_ops)

Recall that we had to package up the inputs into a single tensor to pass it as the features out of the input_fn. Step 0 simply reverses that process and gets back the list of tensors.

Here, the RNN is built from a BasicLSTMCell, which rnn.static_rnn unrolls over the list of input tensors. You get back the outputs (one per time step) and the final state. Slice the outputs to keep only the output of the last cell of the RNN; we are not using any of the earlier outputs. Other architectures are possible. For example, I could have trained the network to have only one output always and used rolling windows. I’ll talk about how to modify my example to do that at the end of this article.
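For intuition about what static_rnn does with the cell, the following NumPy sketch unrolls a basic LSTM over 8 time steps, keeps only the last output, and applies a linear layer to get 2 predictions, mirroring the structure of simple_rnn. It assumes the standard LSTM gate equations with forget_bias added to the forget gate, which is our understanding of what BasicLSTMCell computes; the weights are random and untrained, and lstm_step and simple_rnn_forward are hypothetical helpers, not TensorFlow APIs.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def lstm_step(x, h, c, kernel, bias, forget_bias=1.0):
    # one time step: all four gate pre-activations come from [x, h] in a single matmul
    z = np.concatenate([x, h], axis=1) @ kernel + bias
    i, j, f, o = np.split(z, 4, axis=1)  # input gate, candidate values, forget gate, output gate
    new_c = c * sigmoid(f + forget_bias) + sigmoid(i) * np.tanh(j)
    new_h = np.tanh(new_c) * sigmoid(o)
    return new_h, new_c


def simple_rnn_forward(inputs, lstm_size=3, n_outputs=2, seed=0):
    # inputs: [batch, n_steps] array; each column is one time step with one feature
    rng = np.random.RandomState(seed)
    batch, n_steps = inputs.shape
    kernel = rng.normal(size=(1 + lstm_size, 4 * lstm_size))  # random, untrained weights
    bias = np.zeros(4 * lstm_size)
    h = np.zeros((batch, lstm_size))
    c = np.zeros((batch, lstm_size))
    outputs = []
    for t in range(n_steps):
        h, c = lstm_step(inputs[:, t:t + 1], h, c, kernel, bias)
        outputs.append(h)  # collect one output per time step
    last = outputs[-1]  # keep only the output of the last time step
    weight = rng.normal(size=(lstm_size, n_outputs))
    out_bias = rng.normal(size=n_outputs)
    return last @ weight + out_bias  # linear layer to n_outputs values


batch = np.random.random((20, 8))
preds = simple_rnn_forward(batch)
print(preds.shape)  # (20, 2)
```

Because only the last output feeds the linear layer, the earlier per-step outputs are computed but discarded, just as in the TensorFlow model.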

The comments in the simple_rnn code are pretty self-explanatory regarding the other steps. We are not doing anything surprising there. This is a regression problem, so I’m using RMSE.
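For reference, the loss and the metric are closely related: tf.losses.mean_squared_error averages the squared errors over all elements of the batch, and RMSE is the square root of that average (the tf.metrics version accumulates it across evaluation batches). A tiny NumPy check with made-up numbers:

```python
import numpy as np

# made-up targets and predictions: 2 sequences x N_OUTPUTS = 2 values each
targets = np.array([[1.0, 2.0], [3.0, 4.0]])
predictions = np.array([[1.0, 2.0], [3.0, 0.0]])

mse = np.mean((targets - predictions) ** 2)  # the quantity the training loss minimizes
rmse = np.sqrt(mse)                          # the quantity reported as "rmse"
print(mse, rmse)  # 4.0 2.0
```

Reporting RMSE rather than MSE keeps the metric in the same units as the time series itself.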

Create an Experiment

The Experiment class is the smart one in the Estimators API. It knows how to take the model function and the input functions for training and validation, and do reasonable things regarding distribution, early stopping, etc. So, let’s hand off our pieces to it:

import shutil
import tensorflow as tf
import tensorflow.contrib.learn as tflearn
from tensorflow.contrib import metrics
from tensorflow.contrib.learn.python.learn import learn_runner

def get_train():
    return read_dataset('train.csv', mode=tf.contrib.learn.ModeKeys.TRAIN)

def get_valid():
    return read_dataset('valid.csv', mode=tf.contrib.learn.ModeKeys.EVAL)

def experiment_fn(output_dir):
    # run experiment
    return tflearn.Experiment(
        tflearn.Estimator(model_fn=simple_rnn, model_dir=output_dir),
        train_input_fn=get_train(),
        eval_input_fn=get_valid(),
        eval_metrics={
            'rmse': tflearn.MetricSpec(
                metric_fn=metrics.streaming_root_mean_squared_error
            )
        }
    )

shutil.rmtree('outputdir', ignore_errors=True)  # start fresh each time
learn_runner.run(experiment_fn, 'outputdir')

Training on the Cloud

The code above works on a single machine, and if you package it up into a Python module, you can also submit it to Cloud ML Engine to have it trained in a serverless way:

OUTDIR=gs://${BUCKET}/simplernn/model_trained
JOBNAME=simplernn_$(date -u +%y%m%d_%H%M%S)
REGION=us-central1
gsutil -m rm -rf $OUTDIR
gcloud ml-engine jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--package-path=${REPO}/simplernn/trainer \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=BASIC \
--runtime-version=1.0 \
-- \
--train_data_paths="gs://${BUCKET}/train.csv*" \
--eval_data_paths="gs://${BUCKET}/valid.csv*" \
--output_dir=$OUTDIR \
--num_epochs=100
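Everything after the bare `--` is passed to the trainer.task module as command-line flags, so that module has to accept them. The notebook’s actual trainer/task.py isn’t shown in this article; the following is a hypothetical sketch with argparse that assumes only the flag names used in the command above. As we understand it, gcloud also forwards --job-dir to the training application, so the sketch accepts that flag too.

```python
import argparse


def parse_args(argv=None):
    # hypothetical flag handling for trainer/task.py; names match the gcloud command
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_data_paths', required=True)  # file or pattern, e.g. train.csv*
    parser.add_argument('--eval_data_paths', required=True)
    parser.add_argument('--output_dir', required=True)
    parser.add_argument('--num_epochs', type=int, default=100)
    # accepted so that a forwarded --job-dir does not cause an "unrecognized arguments" error
    parser.add_argument('--job-dir', dest='job_dir', default=None)
    return parser.parse_args(argv)


args = parse_args([
    '--train_data_paths=gs://my-bucket/train.csv*',  # hypothetical bucket name
    '--eval_data_paths=gs://my-bucket/valid.csv*',
    '--output_dir=gs://my-bucket/simplernn/model_trained',
    '--num_epochs=100',
])
```

The parsed paths could then replace the hard-coded 'train.csv' and 'valid.csv' in get_train() and get_valid(); read_dataset already handles wildcard patterns.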

A common variant: very long time-series

In this article, I assumed that you have thousands of short (10-element) sequences. What if you have a very long sequence? For example, you might have the price of a stock or the temperature reading from a sensor. In such cases, you could break up your long sequence into rolling sequences of fixed length. This length is obviously arbitrary, but think of it as the “look-back” interval of the RNN. Here is TensorFlow code that will take a long sequence and break it into smaller, overlapping sequences of a fixed length:

import tensorflow as tf
import numpy as np

def breakup(sess, x, lookback_len):
    # every window of lookback_len consecutive values, including the final one
    N = sess.run(tf.size(x))
    windows = [tf.slice(x, [b], [lookback_len]) for b in range(0, N - lookback_len + 1)]
    windows = tf.stack(windows)
    return windows

For example:

x = tf.constant(np.arange(1, 11, dtype=np.float32))
with tf.Session() as sess:
    print('input=', x.eval())
    seqx = breakup(sess, x, 5)
    print('output=', seqx.eval())

will result in:

input= [  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
output= [[  1.   2.   3.   4.   5.]
 [  2.   3.   4.   5.   6.]
 [  3.   4.   5.   6.   7.]
 [  4.   5.   6.   7.   8.]
 [  5.   6.   7.   8.   9.]
 [  6.   7.   8.   9.  10.]]

Once you have these fixed length sequences, everything is the same as before.
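If you would rather do the windowing in NumPy before writing CSV files, here is a sketch. breakup_np and long_train.csv are hypothetical names; each window is split the same way as before, with the last N_OUTPUTS values as labels, and SEQ_LEN in the rest of the code would then need to equal lookback_len.

```python
import numpy as np

N_OUTPUTS = 2


def breakup_np(x, lookback_len):
    # all overlapping windows of length lookback_len: len(x) - lookback_len + 1 rows
    return np.stack([x[b:b + lookback_len] for b in range(len(x) - lookback_len + 1)])


x = np.arange(1, 11, dtype=np.float32)
windows = breakup_np(x, 5)
features = windows[:, :-N_OUTPUTS]  # first lookback_len - N_OUTPUTS values of each window
labels = windows[:, -N_OUTPUTS:]    # last N_OUTPUTS values are what the RNN predicts

# write full windows; the input_fn splits features from labels when it reads them
np.savetxt('long_train.csv', windows, delimiter=',', fmt='%.6f')
```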

Happy coding!