A machine learning pipeline with TensorFlow Estimators and Google Cloud Platform

Giulia Bianchi
Publicis Sapient France
Dec 15, 2020 · 14 min read

When you are assigned the task of building and industrialising a machine learning pipeline, the challenges to tackle depend strongly on the constraints of the specific use case and the technical environment. Whatever the context, one of the things that matters most is teamwork, and in particular the collaboration between data scientists and data engineers. In this article I will show how TensorFlow Estimators can be used to integrate data science and data engineering tasks effectively when developing a machine learning pipeline on Google Cloud Platform.

I will do so in the context of a well-known use case: the NYC taxi trip duration Kaggle competition.

The problem statement is straightforward: given historical data about taxi trips in New York City, predict the duration of future trips. From the machine learning point of view, the solution is a classical inference task: fit a model on training data and make predictions on test data. As the prediction is performed on all the test data points at once, this is batch inference. To better showcase the usage of TensorFlow Estimators, I decided to add an extra challenge: keep the same predictive task, but also do real-time prediction. This is continuous inference, as the data points to predict on come in one by one. To picture the use case, imagine an app to book a taxi ride: when you book, the app estimates how long the ride is going to take.

The trip duration estimate is computed from the desired destination, the current location of the user and the hour and date of the request, using a machine learning model previously trained on historical data. My idea is not to build an actual application and have real users test it, but rather to simulate a real-time data stream using publicly available data (years 2017, 2018 and 2019).

This simple use case implies some technical challenges. In this article in particular I will go through a full implementation of the machine learning part on Google Cloud Platform (GCP), using TensorFlow Estimators. The whole project was done in collaboration with Loïc Divad, data engineer, and was presented under the title "Event Driven Machine Learning", as a talk at XebiCon19 (🇫🇷) and as a Confluent webinar (🇫🇷).

Ready to go for a ride?!

Data exploration

Raw data are stored in BigQuery (bq), a fully-managed data warehouse for storing and analysing data. This is very convenient as the raw data amount to approximately 35 GB, and through its web interface I could start exploring them. For further data analysis and visualisation I used AI Platform Notebooks (previously known as Cloud Datalab), as it provides JupyterLab instances. From a JupyterLab instance I could directly access data stored in BigQuery, clean the data, create train and test sets, and visualise exploration results.

So the idea was to exploit BigQuery to get the data as clean as possible, as quickly as possible, before the modelling phase, for which I used TensorFlow Estimators. Exploring data in BigQuery is practical, but it is better if the code lives somewhere it can be versioned, and JupyterLab in AI Platform Notebooks allows you to do that. Once I was satisfied with the preprocessing query, I could interact with BigQuery from a notebook using the bq command line tool, and specifically the command to create a table from a query result:

%%bash
bq query \
  --destination_table <project_id>:<dataset_id>.<table_id> \
  --replace \
  --use_legacy_sql=false \
  --allow_large_results \
  '<QUERY>'

Note that the %%bash magic command is required to run the bq tool from a notebook cell. I am not going to detail the whole query because it is not that interesting, but one feature for geographical data deserves to be mentioned, as it is less well known. In the taxi trip dataset, one important piece of information is the location of pick-ups and drop-offs. This information is given as zones: around 260 zones that cover the whole New York City territory, as you can see in the picture:

NYC Zones

These zones vary in size and do not give a precise position. Due to this lack of precision, trip duration predictions will surely be somewhat inaccurate, but we can still compute an approximation of the distance between two zones. This is possible thanks to open data, again found at NYC OpenData, that provide these zones as geography objects, and to BigQuery GIS (Geographic Information Systems). Given two zones, we can compute the distance in meters between their centroids using geography functions as follows:

SELECT
  ST_DISTANCE(
    ST_CENTROID(pickup_zone_geom),
    ST_CENTROID(dropoff_zone_geom)
  ) AS distance
FROM <table>;

The final output of the preprocessing step is two tables, one for training (years 2017 and 2018) and one for test (year 2019) with the following information:

  • pick-up day of week
  • pick-up hour of the day
  • pick-up week of year
  • pick-up zone name
  • drop-off zone name
  • approximated distance
  • airport trip (a flag telling whether the ride involved an airport as starting or end point)
  • taxi size (a flag encoding the number of passengers — between 1 and 4, or 5 or more)
  • trip duration — the target variable.

Data viz

AI Platform Notebooks provides convenient connectors to work on data stored in BigQuery.

Other GCP services are available for data visualisation and exploration, including Data Studio and DataPrep. Data Studio is more useful for creating interactive dashboards and reports of a finalised analysis, while DataPrep is a much more complete tool that allows you to perform advanced data transformations; ultimately, it runs Dataflow jobs behind the scenes.

My choice of notebooks is motivated by their ease of use. Also, the pricing model is easy to understand: you pay for the BigQuery jobs underlying your queries and for the VM backing the notebook instance.

To query data in BigQuery from a notebook, you can launch a query by specifying %%bigquery at the top of the notebook cell; the BigQuery client library documentation lists the available magic commands and their options.

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
# Load the BigQuery magic commands from the client library
%load_ext google.cloud.bigquery

The result of a query is a pandas DataFrame, so you can use libraries such as matplotlib for visualisation.

Let’s see for example the number of trips and average trip duration per hour of the day:

%%bigquery count_per_hourofday_df
SELECT
hourofday,
COUNT(hourofday) AS record_count,
AVG(trip_duration) AS avg_duration
FROM `event-driven-ml.edml_nyc_yellow_taxi_us.viz_gis_feat_eng_2018`
GROUP BY hourofday
ORDER BY hourofday ASC
First 6 months of 2018 — trip count per hour and average duration (min)
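A plot like the one in the figure above can be produced directly from the returned DataFrame. Here is a minimal matplotlib sketch, assuming the count_per_hourofday_df DataFrame returned by the cell above (column names come from the query):

# plot trip count and average duration per hour of the day from
# the count_per_hourofday_df DataFrame returned by the %%bigquery cell
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
count_per_hourofday_df.plot(x="hourofday", y="record_count",
                            kind="bar", ax=ax1, legend=False)
ax1.set_ylabel("trip count")
count_per_hourofday_df.plot(x="hourofday", y="avg_duration",
                            kind="bar", ax=ax2, legend=False)
ax2.set_ylabel("average duration (min)")
plt.show()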
%%bigquery count_per_day_df
SELECT
DATETIME_TRUNC(pickup_datetime, DAY) AS pickup_date,
COUNT(DATETIME_TRUNC(pickup_datetime, DAY)) AS record_count,
AVG(trip_duration) AS avg_duration
FROM `event-driven-ml.edml_nyc_yellow_taxi_us.viz_gis_feat_eng_<year>`
GROUP BY pickup_date
ORDER BY pickup_date ASC
First 6 months of 2017 (left) and 2018 (right) — trip count (above) and average trip duration in minutes (below) per day.
First 6 months of 2019 — trip count (above) and average trip duration in minutes (below) per day.

Time spent on cleaning and preprocessing data is never wasted, but the fun part is modelling.

The estimator

I used TensorFlow, and in particular Estimators (TF Estimators), to showcase how to industrialise a TensorFlow machine learning pipeline on GCP.

TensorFlow Estimator is a high-level API that was born to facilitate the development and deployment of machine learning models, including neural networks. With the release of TensorFlow 2, the preferred high-level API is now Keras (the TensorFlow-integrated Keras), but it does not cover exactly the same scope. TF Estimator was created not only to simplify model design, but also to specify, train, evaluate and deploy machine learning models. The key concept of TF Estimators is pre-made estimators, i.e. ready-to-use models for classification and regression, but you can also convert a custom Keras model into an estimator, so the two APIs coexist and can be used in a complementary manner. Other capabilities include the ability to run Estimator-based models:

  • on a local host or on a distributed multi-server environment without changing the model
  • on CPUs, GPUs, or TPUs without recoding the model.

The typical structure of a machine learning pipeline built with Estimators consists of four elements:

  1. an input function
  2. the feature columns
  3. an Estimator
  4. a call to a method for training, evaluation or inference

Step 1: data ingestion with an input function

The first step of the pipeline consists in reading the data. This is done by writing an input function (either a single one that handles reading data for both training and prediction, or two separate functions, one per phase). An Estimator expects data to be formatted as a pair of objects: a dictionary mapping feature names to feature values, and a tensor of labels. It is recommended to return data as a tf.data.Dataset. The input function is executed within a tf.Graph, which means that operations are optimised before execution.

In our case, data are stored in BigQuery and I could access them directly thanks to TensorFlow I/O, a TensorFlow extension that allows reading data from external sources.

Input function definition
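The input function used in the project is defined in a separate gist; as an illustration, here is a minimal sketch following the TensorFlow I/O BigQuery reader examples. The project, dataset and table names, the feature names and the column types are illustrative assumptions, and the exact read_session arguments may differ between tensorflow_io versions.

import tensorflow as tf
from tensorflow_io.bigquery import BigQueryClient

# assumptions: project/dataset/table names, feature names and dtypes are
# illustrative, not necessarily those of the original pipeline
PROJECT_ID = "event-driven-ml"
DATASET_ID = "edml_nyc_yellow_taxi_us"
TABLE_ID = "train_2017_2018"
LABEL = "trip_duration"
FEATURES = ["hourofday", "dayofweek", "weekofyear", "pickup_zone_name",
            "dropoff_zone_name", "distance", "airport_trip", "taxi_size"]
DTYPES = [tf.int64, tf.int64, tf.int64, tf.string,
          tf.string, tf.double, tf.string, tf.string, tf.double]

def input_fn(batch_size=512, mode=tf.estimator.ModeKeys.TRAIN):
    """Reads rows from BigQuery and returns a tf.data.Dataset of
    (features dict, label tensor) pairs, as expected by an Estimator."""
    client = BigQueryClient()
    read_session = client.read_session(
        "projects/" + PROJECT_ID,
        PROJECT_ID, TABLE_ID, DATASET_ID,
        FEATURES + [LABEL],
        DTYPES,
        requested_streams=2)
    dataset = read_session.parallel_read_rows()

    def features_and_label(row):
        label = row.pop(LABEL)
        # distance is FLOAT64 in BigQuery; cast to float32 for the feature columns
        row["distance"] = tf.cast(row["distance"], tf.float32)
        return row, label

    dataset = dataset.map(features_and_label)
    if mode == tf.estimator.ModeKeys.TRAIN:
        dataset = dataset.shuffle(10_000).repeat()
    return dataset.batch(batch_size)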

Step 2: feature pre-processing with feature columns

The step following data ingestion is the identification and pre-processing of features. Another TensorFlow module is used here: tf.feature_column. This module allows you to perform a number of pre-processing operations on predictive variables, such as mapping categorical or string features to numerical values. It is powerful, but you soon reach its limits. The main limitation is that you can only "map" transformations (row-by-row transformations, like multiplying by a given constant): if you need to pass over the full dataset to compute a scaling factor or to get the full list of a variable's categories, you cannot do it automatically and you need to compute those values beforehand. Available transformations include bucketing, binarisation, one-hot encoding, embedding and feature crossing, among others. Embedding is the only transformation that is learned during training.

I computed features based on the initial four variables available in the dataset (pick-up location and timestamp, drop-off location, passenger number).

Pick-up timestamps were used to extract temporal features: the week of year, the hour of the day and the day of the week.

Pick-up and drop-off locations were used to compute geographical features: approximated distance and if the trip was to or from an airport.

The number of passengers was used to distinguish between a 5-seat taxi and a minivan.

I computed sparse and dense features (one-hot encoded and embedded) and I also combined categories together, for example hour of the day and day of the week, so as to capture the combined daily and weekly seasonality of traffic. These combinations are called "feature crosses" and are useful when the interaction between two features is important. The drawback is that the majority of the combinations may not be relevant at all, and by one-hot encoding them the feature space gets really sparse. These problems are partially handled by the model, as explained in the next section.

Examples of usage of tf.feature_column
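The actual feature columns live in a gist; the following is a minimal sketch of how tf.feature_column can express the features described above. Vocabulary lists, hash bucket sizes and embedding dimensions are illustrative assumptions.

import tensorflow as tf

fc = tf.feature_column

# assumption: the zone vocabulary is computed beforehand in BigQuery
# (~260 zones in practice, truncated here for illustration)
ZONE_NAMES = ["JFK Airport", "Midtown Center", "Upper East Side North"]

hourofday = fc.categorical_column_with_identity("hourofday", num_buckets=24)
dayofweek = fc.categorical_column_with_identity("dayofweek", num_buckets=7)
pickup_zone = fc.categorical_column_with_vocabulary_list(
    "pickup_zone_name", vocabulary_list=ZONE_NAMES)
dropoff_zone = fc.categorical_column_with_vocabulary_list(
    "dropoff_zone_name", vocabulary_list=ZONE_NAMES)
distance = fc.numeric_column("distance")

# feature cross: captures the combined daily and weekly seasonality of traffic
day_hour_cross = fc.crossed_column([dayofweek, hourofday], hash_bucket_size=24 * 7)

# sparse (one-hot) columns will feed the linear part of the model,
# dense (numeric and embedded) columns will feed the DNN part
sparse_columns = [
    fc.indicator_column(hourofday),
    fc.indicator_column(dayofweek),
    fc.indicator_column(day_hour_cross),
]
dense_columns = [
    distance,
    fc.embedding_column(pickup_zone, dimension=8),
    fc.embedding_column(dropoff_zone, dimension=8),
]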

Step 3: model definition with an Estimator

This is the central part of the pipeline, and the fact that the estimator is separated from the rest is key for iterative development: you can choose a very simple pre-made estimator and focus on implementing a fully functional pipeline from end to end. Only then do you come back to the model and improve its performance. The previous and following steps shouldn't be much impacted.

The pre-made estimators available for regression (each with a classifier counterpart) are:

  • LinearRegressor
  • DNNRegressor
  • BoostedTreesRegressor
  • DNNLinearCombinedRegressor

The fourth option is a hybrid called the wide and deep model, presented in the Wide & Deep Learning paper linked at the end of this article. It is made of two parts: one connects inputs to outputs via a deep neural network, the other connects inputs to outputs via a linear model. The linear part takes in sparse columns while the DNN takes in real-valued columns. This approach is useful when dealing with sparse features such as feature crosses, which is our case: the linear part is able to memorise previously seen interactions, while the DNN part helps generalise to new, unseen interactions. These considerations motivated the choice of such a model, and it was instantiated as follows:

Estimator definition
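The actual estimator definition is in a gist; a minimal sketch, assuming the sparse_columns and dense_columns lists from the feature column example above and an illustrative output directory and architecture, could be:

import tensorflow as tf

estimator = tf.estimator.DNNLinearCombinedRegressor(
    model_dir="gs://edml/ai-platform/models/v1",  # assumption: illustrative path
    linear_feature_columns=sparse_columns,  # wide part: memorises sparse crosses
    dnn_feature_columns=dense_columns,      # deep part: generalises via embeddings
    dnn_hidden_units=[64, 32, 16],          # assumption: illustrative layer sizes
)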

The heart of every Estimator — whether pre-made or custom — is its model function, which is a method that builds graphs for training, evaluation, and prediction. When you are using a pre-made Estimator, the model function has already been implemented. When relying on a custom Estimator, you must write the model function yourself. You can convert existing Keras models to Estimators with tf.keras.estimator.model_to_estimator. Doing so enables your Keras model to access Estimator's strengths, such as distributed training.
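For reference, converting a Keras model works along these lines; the model itself is a hypothetical stand-in, not part of the original project.

import tensorflow as tf

# hypothetical Keras model: 8 numeric inputs, one regression output
keras_model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation="relu", input_shape=(8,)),
    tf.keras.layers.Dense(1),
])
keras_model.compile(optimizer="adam", loss="mse")

# wrap it as an Estimator to benefit from train_and_evaluate, exporters, etc.
keras_estimator = tf.keras.estimator.model_to_estimator(
    keras_model=keras_model,
    model_dir="gs://edml/ai-platform/models/keras")  # assumption: illustrative path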

Step 4: train, evaluate and predict with the appropriate method

All Estimators provide train, evaluate, and predict methods. Each of them takes an input_fn as first parameter to know how to read data. The predict method returns an iterable.

Estimators export SavedModels through tf.estimator.Estimator.export_saved_model, which writes the inference graph as a SavedModel into the given directory. This method builds a new graph by first calling serving_input_receiver_fn to obtain feature tensors, and then calling the Estimator's model_fn to generate the model graph based on those features.

For automatic training and evaluation there is a specific method: train_and_evaluate. This utility function trains, evaluates and (optionally) exports the model using the given estimator. Moreover, it provides consistent behaviour for both local (non-distributed) and distributed configurations. All training-related specifications are held in train_spec, including the training input_fn, the maximum number of training steps, etc. All evaluation- and export-related specifications are held in eval_spec, including the evaluation input_fn, the number of steps, the exporter, etc. The exporters parameter is needed if you want to save the model. It can be an instance of BestExporter (which performs a model export every time the new model is better than any existing one), LatestExporter (in addition to exporting, this class also garbage-collects stale exports) or FinalExporter (which performs a single export at the end of training).

Whether you use train_and_evaluate or export_saved_model, when you save an Estimator you need to create a serving_input_receiver_fn. This function builds a part of a tf.Graph that parses the raw data received by the SavedModel. It takes no argument and returns a ServingInputReceiver instance. It basically specifies the features to be passed to the model.
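Putting the last two paragraphs together, a minimal sketch, assuming the estimator and input_fn defined earlier and the illustrative feature names from the feature column example, could look like this:

import tensorflow as tf

def serving_input_receiver_fn():
    """Declares the raw tensors the SavedModel will receive at serving time."""
    # assumption: names and dtypes mirror the feature columns defined earlier
    receiver_tensors = {
        "hourofday": tf.compat.v1.placeholder(tf.int64, [None]),
        "dayofweek": tf.compat.v1.placeholder(tf.int64, [None]),
        "pickup_zone_name": tf.compat.v1.placeholder(tf.string, [None]),
        "dropoff_zone_name": tf.compat.v1.placeholder(tf.string, [None]),
        "distance": tf.compat.v1.placeholder(tf.float32, [None]),
    }
    return tf.estimator.export.ServingInputReceiver(receiver_tensors, receiver_tensors)

# export a new SavedModel every time evaluation improves on the best model so far
exporter = tf.estimator.BestExporter(
    name="best", serving_input_receiver_fn=serving_input_receiver_fn)

train_spec = tf.estimator.TrainSpec(
    input_fn=lambda: input_fn(mode=tf.estimator.ModeKeys.TRAIN),
    max_steps=2800000)
eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda: input_fn(mode=tf.estimator.ModeKeys.EVAL),
    steps=3,
    exporters=exporter)

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)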

Packaging up, training and prediction

AI Platform Training's hosted machine learning frameworks are TensorFlow, scikit-learn and XGBoost. It is also possible to use other frameworks, but you need to build a container. For the supported frameworks, before you can run your training application with AI Platform Training, you must upload your code and any dependencies into a Cloud Storage bucket that your Google Cloud project can access.

Package up code

You can structure your training application in any way you like. However, there is a recommended pattern that is used in many examples and that is suggested in the official documentation. Here is a summary of the desired structure:

  • create a main project directory, containing a setup.py file
  • create a subdirectory named trainer to store your main application module
  • name your main application module task.py.

The trainer directory usually contains the following source files:

  • task.py contains the application logic that manages the training job. It is the entry point: it parses parameters and, in the case of TensorFlow Estimators, calls train_and_evaluate
  • model.py contains the logic of the model
  • util.py if present, it contains code to run the training application
$ tree edml-trainer/  # training application name
.
├── setup.py
└── trainer
    ├── __init__.py
    ├── task.py   # main
    ├── model.py  # model
    └── util.py   # utilities

Here is what the scripts look like (a minimal sketch of task.py follows below):

task.py
model.py
util.py
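The three gists above are not reproduced here. As an illustration, a minimal task.py could look like the sketch below, assuming a hypothetical model.train_and_evaluate(args) helper in model.py; the argument names mirror those passed by the gcloud command in the next section.

# trainer/task.py: minimal sketch (model.train_and_evaluate is a hypothetical
# helper assumed to live in model.py and to build, train and export the estimator)
import argparse
from trainer import model

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--output-dir", required=True)
    parser.add_argument("--batch-size", type=int, default=512)
    parser.add_argument("--train-steps", type=int, default=2800000)
    parser.add_argument("--eval-steps", type=int, default=3)
    # --job-dir is forwarded by AI Platform when the gcloud --job-dir flag is set
    parser.add_argument("--job-dir", default="")
    args = parser.parse_args()

    # build the estimator, train, evaluate and export the SavedModel
    model.train_and_evaluate(args)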

Training with ai-platform

To launch the training job I used the command gcloud ai-platform jobs submit training:

#!/usr/bin/env bash
BUCKET=edml
TRAINER_PACKAGE_PATH=gs://$BUCKET/data/taxi-trips/sources
MAIN_TRAINER_MODULE="trainer.task"
...
OUTDIR=gs://$BUCKET/ai-platform/models/$VERSION
gcloud ai-platform jobs submit training $JOB_NAME \
  --job-dir $JOB_DIR \
  --package-path $TRAINER_PACKAGE_PATH \
  --module-name $MAIN_TRAINER_MODULE \
  --region $REGION \
  -- \
  --batch-size=$BATCH_SIZE \
  --output-dir=$OUTDIR \
  --train-steps=2800000 \
  --eval-steps=3
  • The empty -- flag marks the end of the gcloud specific flags and the start of the USER_ARGS that you want to pass to your application
  • --package-path points to a directory containing the Python source for the job. The gcloud tool looks for a setup.py file in the parent of trainer to build the package. As the application package can contain multiple modules, the one that contains the application entry point has to be specified. The training service runs that module by invoking Python. By following the recommended package structure the main module is task.py coming from trainer. Thus the --module-name flag is set to trainer.task.

Continuous asynchronous prediction

Even though AI Platform provides options for online and batch prediction, neither was really suited to this use case, as the intention is to have continuous (not batch) asynchronous prediction. To accomplish this final step of the project, collaboration between data science and data engineering was key. To avoid synchronous calls to the model, we decided to exploit the TensorFlow graph created by the Estimator during training, which includes the reading and pre-processing steps as well as the prediction step. The model is loaded in the JVM using TensorFlow Java and can be used for prediction on incoming raw data, one data point at a time. The incoming flow of data points was simulated using the 2019 NYC taxi trip open data and Kafka.

In this way, the graph is used as an interface between the data scientist and the data engineer, and becomes the key to complementary yet independent development. For example, during the experimental phase, features evolved and changed, but as they were based on the same raw data, and as the graph contained the operations to transform raw data into final features, the overall pipeline could be built very early on and did not change fundamentally. Once the overall mechanism of asynchronous prediction is in place, the data scientist can focus on improving the model, by changing the features or by using a different model, and the data engineer can focus on improving the application.

Conclusion

With this article I wanted to show that using TensorFlow Estimators on GCP can be an effective way to develop and industrialise a whole machine learning pipeline, with the TensorFlow graph enabling collaboration between the data science and data engineering sides. Just by adding the "real time" twist to the NYC taxi Kaggle challenge, the technical problem becomes much more complex and its implementation quite involved; in such a situation, collaboration is key.

Here are some links to the sources or for further reading:

New York City Taxi Trip Duration
Open Data for All New Yorkers

TensorFlow Estimators: Managing Simplicity vs. Flexibility in High-Level Machine Learning Frameworks

TensorFlow Enterprise makes accessing data on Google Cloud faster and easier

Wide & Deep Learning for Recommender Systems
Packaging trainer
Scaling up Keras with Estimators

Scaling up running Keras models on ML Engine
