DeepAR, a highly effective algorithm for time series forecasting

Next generation probabilistic autoregressive time series forecasting, implemented on PyTorch Forecasting.

Insaf Ajili
hipay-tech
11 min readJan 13, 2023

--

Time series modeling at HiPay

As a payment company, we process huge amounts of transactions for thousands of customers. Those transactions are described by many indicators that describe our activity over time (time series !).
Over the years, we spent a lot of time and energy researching that specific kind of data, from a machine learning point of view. This gave birth to a number of large-scale production systems designed mainly for anomaly detection, monitoring, and forecasting. We share here hands-on information about using DeepAR, an efficient and powerful algorithm.

Source: unsplash.

Deep AutoRegressive Model (DeepAR) is a recent supervised machine learning algorithm for modeling time series using Recurrent Neural Networks (RNNs). The method builds upon previous research on deep learning for time series data, and tailors a LSTM-based recurrent neural network architecture to the probabilistic forecasting problem. Long Short-Term Memory (LSTM) is an advanced RNN, a sequential network, that allows information to persist. It is capable of handling the vanishing gradient problem faced by RNNs due to long-term dependencies. It manages to keep, forget or ignore data points based on a probabilistic model. Open source implementations are available in the gluonts and pytorch-forecasting Python libraries.

I. Model architecture

DeepAR is a method that provides a global forecasting model based on an autoregressive and recurrent neural network. The model is autoregressive because the observation of the last time step is used as input to the network at the next time step and recurrent because the output of the network at a given time is fed back into the network at the next time.

Building on RNN architecture, DeepAR uses LSTM cells to fit our predictor variables to our variable of interest. It implements an encoder-decoder setup common in sequence-to-sequence models. In the DeepAR model, the encoder and decoder networks have the same architecture. The encoder takes a variable-length sequence as input and transforms it into a hidden state with a fixed shape. The decoder maps the encoded state of a fixed shape to a variable-length sequence.

The encoder-decoder architecture.
Seq2Seq-Encoder-Decoder Model.

DeepAR’s core architecture builds on the same concept as the Encoder-Decoder structure. Both encoder and the decoder are typically LSTM models.

The left side of the figure presents the encoding phase(training), at each time step t, the network receive the target values at the previous time step z(t-1) and covariates at current time step x(t) as well as the previous network hidden state h(t−1) and outputs the hidden state h(t). The state h(t) is used to tune the parameters Θ of the model for the likelihood of z(t). The state h(t) at time t is then used to compute the one-step ahead forecast. The current target value z(t) and hidden state h(t) are passed to the next time step and the training process continues.

The right part of the figure presents the decoding phase (prediction). The initial states h(0) of the decoder are set to the final states h(t) of the encoder. Using these initial states, decoder starts generating the output sequence. During prediction, the input to the decoder at each time step is the output from the previous time step.

Some advantages of DeepAR approach are:

  1. Multiple time-series support: The model is trained on multiple time-series, learning global characteristics that further enhance forecasting accuracy.
  2. Probabilistic forecasting: DeepAR does not estimate the time series’ future values but their future probability distribution. These probabilities can be used to develop quantile forecasts for all sub-ranges in the prediction horizon.
  3. Extra covariates: DeepAR allows extra features, it can use covariates with little training history.
  4. Short time series: DeepAR can provide forecasts for time-series that have little history.

II. Dataset presentation

Our dataset contains a set of merchants with different characteristics such as the id of the merchant (business_id), the created amount, the risk level, etc. We want to predict the transaction amount for each merchant for the next three months to get an idea of the evolution of our merchants’ flows and thus detect anomalies that arise in the data by comparing the real values with the predicted values. The transaction amount is presented in the form of a time series, we have transaction amount data for each day.

We use BigQuery to store our data, a fast, powerful, and flexible tabular data storage service that’s tightly integrated with other Google Cloud Platform services. It is cloud-based and offers the ability to perform fast SQL queries and interactive analysis on very large datasets.

Reading from BigQuery

First, we need to transform our time series data into a pandas dataframe where each row can be identified with a timestamp and a time series.

In the example below we show an example on how to read the “trx_created_amount” metric for a merchant with a business_id=1 from BigQuery and convert it to a dataframe. This example corresponds to a one-time series (one series per business_id).

from google.cloud import bigquery
import pandas as pd
import numpy as np
import datetime
import pandas_gbq

def get_data(sql):

all_data = client.query(sql).to_dataframe()
all_data = all_data.sort_values(by=["date_operation"]).reset_index(drop=True)
all_data['date_operation'] = pd.to_datetime(all_data['date_operation'], errors='coerce')

return all_data

client = bigquery.Client()

sql_data="""

SELECT
date_operation,
business_id,
trx_created_amount
FROM
`project.table` WHERE
business_id=1 and
date_operation >= "2007-01-01" """

all_data = get_data(sql_data)
all_data.head()
all_data[['date_operation', "trx_created_amount"]].describe()

Now let’s visualize our data over time.

import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=all_data.date_operation, y=all_data.trx_created_amount, line={"color": "blue"},
name="true values"))

fig.update_layout(
title="trx_created_amount")
Trx_created_amount metric.

III. Features extraction

The dataset is already in the correct format, the amount for each merchant (business_id) on each date, but misses some important features.

In addition to historic amounts, we extract more features to modelize our time series such as:

  • The average amount,
  • The moving average with three window sizes (7, 14 and 30),
  • The gap between the max and the min.

We could have used some fancy Python library like tsfresh, but we need simple features so we juste use SQL.

sql_df="""
WITH
features AS (
SELECT
date_operation,
business_id,
trx_created_amount
FROM
`project.table` WHERE
date_operation >= "2007-01-01" ),
MV_features AS (
SELECT
date_operation,
business_id,
AVG(trx_created_amount) OVER(PARTITION BY business_id ORDER BY business_id, date_operation ROWS BETWEEN 7 PRECEDING AND CURRENT ROW) AS MVavg7_metric,
AVG(trx_created_amount) OVER(PARTITION BY business_id ORDER BY business_id, date_operation ROWS BETWEEN 14 PRECEDING AND CURRENT ROW) AS MVavg14_metric,
AVG(trx_created_amount) OVER(PARTITION BY business_id ORDER BY business_id, date_operation ROWS BETWEEN 30 PRECEDING AND CURRENT ROW) AS MVavg30_metric
FROM
`project.table` WHERE
date_operation >= "2007-01-01" ),
AVG_features AS (
SELECT
business_id,
ROUND(AVG(trx_created_amount),2) AS avg_metric,
Max(trx_created_amount)-Min(trx_created_amount) AS ecart_metric,

FROM
`project.table` WHERE
date_operation >= "2007-01-01"
GROUP BY
business_id )
SELECT
*
FROM
features
INNER JOIN
MV_features USING (date_operation, business_id)
INNER JOIN
AVG_features
USING
( business_id)
"""
df_total = get_data(sql_df)
df_total.head()
Features extracted for the merchant with business_id “1”.

Most importantly, we need to add a time index (time_idx) that is incremented by one for each time step. This is used to determine the sequence of samples. If there no missings observations, the time index should increase by +1 for each subsequent sample.

It is beneficial to add date features, in fact, date column have lots of hidden information like:

  • Day of the Month
  • Week of the year
  • Month of the year
  • The quarter of the date.
  • Which year
  • Is it a holiday or not.

We define a list of French holidays, to be considered in the learning process. We retrieved the information from the holidays library. Using the snippet below, we print all France holidays for the years 2021–2022.

for holiday in holidays.FR(years=[2021, 2022]).items():
print(holiday)

It returns the following list:

Another useful feature of the library is to check whether a given date is a holiday or not. This way, we can easily create a binary feature that is valued to 1 if there is a holiday on that date, and 0 otherwise.

def get_features(df, date_min):
fr_holidays = holidays.FR()
df_total = df.reset_index(drop=True)
df_total = df_total.assign(business_id=df_total["business_id"].astype(str))

df_total = df_total.assign(day=(df_total["date_operation"].dt.day).astype(str))
df_total = df_total.assign(week=(df_total["date_operation"].dt.isocalendar().week).astype(str))
df_total = df_total.assign(month=(df_total["date_operation"].dt.month).astype(str))
df_total = df_total.assign(quarter=(df_total["date_operation"].dt.quarter).astype(str))
df_total = df_total.assign(year=(df_total["date_operation"].dt.year).astype(str))
df_total["is_holiday"] = df_total['date_operation'].apply(lambda x: 1 if x in fr_holidays else 0).astype(str)

df_total = df_total.assign(time_idx=(df_total["date_operation"] - np.datetime64(date_min)).dt.days)
df_total["idx_str"] = df_total["time_idx"].astype(str)
df_total = df_total.sort_values(["date_operation"])
df_total = df_total[["business_id", "date_operation", "trx_created_amount","avg_metric","MVavg7_metric","MVavg14_metric","MVavg30_metric", "ecart_metric","time_idx", "idx_str", "day", "quarter", "week", "month","year", "is_holiday"]]

return df_total

df_features=get_features(df_total,date_min= min(df_total["date_operation"]))

IV. Splitting data

The dataset is split into two groups: a training dataset and a testing dataset. We define four parameters:

  • Training_date_min,
  • Training_date_max,
  • Testing_date_min,
  • Testing date_max.

The training process is done on data between (training_date_min and training_date_max). We want to predict the transaction amount of the next three months.

def split_data(df, training_date_min, training_date_max, testing_date_min, testing_date_max):
df_train = df[(df['date_operation'] >= training_date_min) & (df['date_operation'] <= training_date_max)]
df_test = df[(df['date_operation'] >= testing_date_min) & (df['date_operation'] <= testing_date_max)]
return df_train, df_test

training_date_min="2010-01-01"
training_date_max="2022-01-01"
prediction_length=90

testing_date_min=datetime.datetime.strptime(training_date_max, '%Y-%m-%d').date() +datetime.timedelta(days=1)
testing_date_max=testing_date_min+datetime.timedelta(days=prediction_length)
testing_date_min=testing_date_min.strftime('%Y-%m-%d')
testing_date_max=testing_date_max.strftime('%Y-%m-%d')
df_train, df_test = split_data(df_features, training_date_min, training_date_max, testing_date_min, testing_date_max)
fig.add_vline(x=testing_date_min, line_width=3, line_dash="dash", line_color="green", name="split data")
Splitting data into train and test sets.

V. Creating a PyTorch dataset for training and validation

In order to feed data to our model, we convert the dataframe into a PyTorch Forecasting TimeSeriesDataset. This class is used to prepare the dataset for training with PyTorch. This class takes care of the variable transformation, random sampling, missing value filling, etc.

But first we should transform data into a specific format to be used by the TimeSeriesDataset.

It should be in a pandas DataFrame and have a categorical column to identify each series and an integer column to specify the time of the record.

The training dataset is all the data except the last max_prediction_length data points of each Time series (each time series correspond to data points with the same group_ids). Those last data points are filtered by the training cutoff.

  • time_idx: integer column denoting the time index. This columns is used to determine the sequence of samples.
  • target: column denoting the target.
  • group_ids: list of column names identifying a time series. This means that the group_ids identify a sample together with the time_idx.
  • min/max encoder length: minimum and maximum history lengths used by the time series dataset. The benefit of this flexibility in length is if we have data with different length we will not find a problem in prediction process fro very short time series samples.
  • min/max prediction length: minimum/maximum prediction/decoder length, and other parameters found here.
from pytorch_forecasting import Baseline, TimeSeriesDataSet, DeepAR
from pytorch_forecasting.data import NaNLabelEncoder,GroupNormalizer


list_features_static_real = ["avg_metric", "ecart_metric"]
list_features=["trx_created_amount"]+["MVavg7_metric","MVavg14_metric","MVavg30_metric"]
max_prediction_length= 90 # forecat of three months
max_encoder_length= 14 # using history of 14
batch_size= 30

def create_dataset(df_train):
training_cutoff = df_train["time_idx"].max() - max_prediction_length
training = TimeSeriesDataSet(
df_train[lambda x: x.time_idx <= training_cutoff],
time_idx="time_idx",
target="trx_created_amount",
categorical_encoders={"business_id": NaNLabelEncoder(add_nan=True),
"idx_str": NaNLabelEncoder(add_nan=True).fit(df_train.idx_str),
"day": NaNLabelEncoder(add_nan=True).fit(df_train.day),
"quarter": NaNLabelEncoder(add_nan=True).fit(df_train.quarter),
"week": NaNLabelEncoder(add_nan=True).fit(df_train.week),
"month": NaNLabelEncoder(add_nan=True).fit(df_train.month),
"year": NaNLabelEncoder(add_nan=True).fit(df_train.year),
"is_holiday": NaNLabelEncoder(add_nan=True).fit(df_train.is_holiday)},
group_ids=["business_id"],
target_normalizer=GroupNormalizer(groups=["business_id"]),
static_categoricals=["business_id"],
static_reals=list_features_static_real,
time_varying_unknown_reals=list_features,
time_varying_known_categoricals=["idx_str", "day", "quarter", "week", "month", "year", "is_holiday"],
max_encoder_length=max_encoder_length,
max_prediction_length=max_prediction_length,
allow_missing_timesteps=True,
add_target_scales=True,
add_encoder_length=True
)
return training_cutoff, training

training_cutoff, training_dataset = create_dataset(df_features)

We can display all parameters of our dataset with the following function:

training_dataset.get_parameters()

We create a validation dataset using the same format as the training dataset. We choose to use the last three months as a validation set. We set the predict parameter to True to predict the last max_prediction_length points in each time series.

validation_dataset = TimeSeriesDataSet.from_dataset(training_dataset, df_train, predict=True, stop_randomization=True, min_prediction_idx=training_cutoff + 1)

VI. Training and validation dataloaders

Instead of using the entire dataset in one training pass, we instead prefer to use mini-batch training. Now how samples are sampled into batches for training, is determined by the DataLoader. DataLoader object fetches data from a Dataset and serves the data up in batches.

Principle of Batches creating using Pytorch Dataloader.
train_dataloader = training_dataset.to_dataloader(train=True, batch_size=batch_size, num_workers=0)
val_dataloader = validation_dataset.to_dataloader(train=False, batch_size=batch_size, num_workers=0) ​

To get a batch of data from the Dataloader we iterate through the Dataloader using the iter() and next().

it=iter(train_dataloader)
next(it)

VII. Building the model

The steps to building and using a model are:

  • Define: What type of model will it be? Some other parameters of the model type are specified too.
  • Fit: Fit our model on a data.
  • Predict: Use saved model to predict data.
  • Evaluate: Determine how accurate the model’s predictions are.

Defining a DeepAR model

DeepAR basic hyperparameters:

  • cell_type: recurrent cell type.
  • hidden_size: hidden recurrent size.
  • rnn_layers: number of hidden layers in the RNN.
  • dropout: dropout rate in RNN layers.

list_features_decoder = list(list_features)
list_features_decoder.remove("trx_created_amount")

def define_model(training):
list_features_decoder = list(list_features)
list_features_decoder.remove("trx_created_amount")
model_deepAR=DeepAR.from_dataset(
training,
learning_rate=0.03,
hidden_size=25,
rnn_layers=2,
dropout=0.2,
time_varying_reals_encoder=list_features,
time_varying_reals_decoder=list_features_decoder,
)
model=model_deepAR
print(f"Number of parameters in network: {model.size() / 1e3:.1f}k")
return model

model = define_model(training=training_dataset)
Number of parameters in network: 97.2k

Preparing for training

We used Pytorch Lightning, a framework designed on top of PyTorch to simplify the training task of neural networks. It helps developers eliminate loops to go through train data in batches to train networks using the Trainer object. Training process stops once the number of epochs is reached.

import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint

def train_step( model, train_dataloader, val_dataloader):
checkpoint_callback = ModelCheckpoint(dirpath='my/path/', monitor="val_loss", mode="min")
trainer = pl.Trainer(
max_epochs=30,
enable_model_summary=True,
gradient_clip_val=0.1,
callbacks=[checkpoint_callback],
enable_checkpointing=True)

trainer.fit(model, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)
best_model_path = trainer.checkpoint_callback.best_model_path
best_model = DeepAR.load_from_checkpoint(best_model_path)
return best_model

best_model =train_step(model, train_dataloader, val_dataloader)

Prediction data

We predict the validation dataset to evaluate model performance.

raw_predictions, x = best_model.predict(val_dataloader, mode="raw", return_x=True)

We can now also plot predictions against actual data with the plot_prediction() method.

best_model.plot_prediction(x, raw_predictions, idx=0, add_loss_to_title=True);
Plot predictions data.

Evaluating the model

To validate the results, we simply compare the predicted data to the actual data in the validation dataset.

import torch
actuals = torch.cat([y[0] for x, y in iter(val_dataloader)])
predictions = best_model.predict(val_dataloader)
(actuals - predictions).abs().mean()

Thank for reading, give me a little 👏 if you found this to be useful, and see you in the next story ! You can also leave a comment if you have a question about some specific aspect of the story 😉

--

--