Industrialization of a Machine Learning model using Apache Airflow and Apache BEAM

Massy Bourennani
Published in The Startup
6 min read · Jun 18, 2020

Introduction

The workflow of a machine learning model consists of the steps pictured in the schema below.

Figure-1: ML workflow[1]

This article discusses the industrialization of the inference phase (white boxes above), using Airflow to schedule the tasks and Apache BEAM to apply the already trained model to all data points.

Some context

I’m a data engineer and one of my missions is to get ML models into production using the services provided by Google Cloud Platform (GCP). To achieve that, I work closely with data scientists. This article describes a particular use case we encountered: how do we execute a model developed in Python on millions of rows, every day?

The data science team has two GitHub repositories: one for trained models saved in pickle format and the other for the code that produces the model (SQL queries, Python packages, etc.). As data engineers, we automate the inference while respecting the following constraints:

  1. scalability: the model needs to be applied on millions of rows
  2. flexibility: data scientists can use any Python library to develop the model
  3. versioning: ML models will improve over time, so we need to specify which version we need to execute
  4. frequency: the model can be executed each day, once a week or twice a month. The industrialized workflow must allow such scheduling flexibility

The Data science part

A data scientist is working on some model to predict if a user is going to churn. Such information is extremely valuable for the business because, for instance, it allows them to prevent the loss of users by giving them discounts.

The model is trained on limited data using some Python libraries (scikit-learn, pandas and NumPy, for example) and a custom Python package, which is used for feature engineering. The model is then saved with pickle, a serialization format for Python objects, and pushed to the GitHub repository. A tag is added to identify a particular version of it.

Figure-2: Models are saved on Github in pickle format

One important contract between data scientists and data engineers is that the model must have a predict method that accepts a collection of dicts (input data) and returns a collection of dicts (scored output data).
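As a minimal sketch of that contract, the class below exposes a predict method that takes a collection of dicts and returns a collection of dicts. The class name, the field names (user_id, days_since_last_visit, churn_score) and the toy scoring rule are assumptions made for illustration, not the team's actual model.

```python
from typing import Dict, Iterable, List


class ChurnModel:
    """Hypothetical model that honours the predict() contract."""

    def __init__(self, inactive_days: int = 30):
        # Assumed parameter: users inactive for longer are flagged as churners.
        self.inactive_days = inactive_days

    def predict(self, rows: Iterable[Dict]) -> List[Dict]:
        scored = []
        for row in rows:
            # Toy rule standing in for feature engineering plus a trained model.
            score = 1.0 if row["days_since_last_visit"] > self.inactive_days else 0.0
            scored.append({"user_id": row["user_id"], "churn_score": score})
        return scored


model = ChurnModel()
predictions = model.predict([
    {"user_id": 1, "days_since_last_visit": 45},
    {"user_id": 2, "days_since_last_visit": 3},
])
print(predictions)
```

Because the interface only involves plain dicts, the data engineering side never needs to know which libraries the model uses internally.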

In addition to the model, additional code is pushed to GitHub. We distinguish two important parts:

  • the SQL query that is used to retrieve the inference dataset (the dataset on which the model will be applied)
  • the custom python packages used for prediction (in our churn example the custom package used for feature engineering)

The data scientist is done. Enter the data engineer, who develops the prediction workflow.

The Data Engineering part

Apache Airflow is used to schedule the execution of the ML model and to manage the dependencies between the steps. The workflow is encapsulated in a single DAG, allowing other data engineers to industrialize the execution of numerous ML models.

To apply the ML model on millions of rows Apache BEAM is used. It is run on Dataflow.

All the data is saved in BigQuery, hence the SQL query to get the dataset.

Google Cloud Storage (GCS) is used as a working directory: the model and the custom Python packages are copied there from GitHub. It is also used as a staging area between BigQuery and Apache BEAM.

An overview of the workflow and its building blocks is shown in Figure-3.

Figure-3: Overview of the prediction workflow

More details below :)

The Apache Airflow DAG

Figure-4: ML prediction DAG

Figure-4 is a representation of the DAG that schedules the computation of predictions. The frequency is defined by the DAG’s schedule_interval which takes the value 0 1 * * * to specify a daily execution at 01:00 AM.
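To make the cron expression easier to read, here is a small sketch that splits it into its five standard fields. The helper name describe_cron and the two extra schedules (weekly and twice a month) are assumptions added to illustrate the frequency constraint; only the daily value comes from the DAG described above.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

# Candidate values for schedule_interval. Only "daily" is the one used here;
# the other two are hypothetical examples of the same mechanism.
SCHEDULES = {
    "daily": "0 1 * * *",            # every day at 01:00
    "weekly": "0 1 * * 1",           # every Monday at 01:00
    "twice_a_month": "0 1 1,15 * *", # the 1st and the 15th at 01:00
}


def describe_cron(expression: str) -> dict:
    """Map each of the five cron fields to its value."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError("expected 5 cron fields, got {}".format(len(parts)))
    return dict(zip(CRON_FIELDS, parts))


for name, expression in SCHEDULES.items():
    print(name, describe_cron(expression))
```

Changing the frequency of a model then only means changing one string in the DAG definition.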

Each step represents an operator. start and end are dummy operators; the most interesting ones are described in the following sections.

github_bq_query

This operator is responsible for executing a SQL query located on GitHub. The query is identified by a path in the repository and a tag, which allows us to execute a particular version of that query file. The SQL can be templated in Airflow.

Internally, the operator is a specialization of BigQueryOperator where the self.sql parameter is retrieved from GitHub (instead of being a raw string or a local file). The render_template_fields method of BaseOperator is used as a hook to know exactly when to fetch the query and save it back to self.sql.

The result of the query is then saved in a table by specifying destination_dataset_table, which is my_project.my_dataset.churn in our example.
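One possible way to resolve a (path, tag) pair into file content is GitHub's raw content URL, sketched below with the standard library only. The organisation, repository, tag and path are hypothetical, the operator's actual retrieval mechanism is not shown in this article, and authentication for private repositories is deliberately left out.

```python
from urllib.parse import quote
from urllib.request import urlopen


def raw_github_url(owner: str, repo: str, tag: str, path: str) -> str:
    """Build the raw-content URL of a file at a given tag."""
    return "https://raw.githubusercontent.com/{}/{}/{}/{}".format(
        quote(owner), quote(repo), quote(tag), quote(path)
    )


def fetch_query(owner: str, repo: str, tag: str, path: str) -> str:
    """Download the SQL text (public repositories only in this sketch)."""
    with urlopen(raw_github_url(owner, repo, tag, path)) as response:
        return response.read().decode("utf-8")


# Hypothetical coordinates of the churn inference query.
url = raw_github_url("my-org", "ml-code", "v1.2.0", "churn/inference.sql")
print(url)
```

Pinning the tag rather than a branch name means a rerun of an old DAG run fetches the same query text.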

bq_to_gcs

The content of my_project.my_dataset.churn is then saved on GCS in newline-delimited JSON (NDJSON) format.
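For readers unfamiliar with NDJSON, the sketch below shows the shape of such a file: one JSON object per line, so each line can be parsed independently of the others. The rows and field names are made up for the example.

```python
import json

# Hypothetical rows, as the inference query might return them.
rows = [
    {"user_id": 1, "days_since_last_visit": 45},
    {"user_id": 2, "days_since_last_visit": 3},
]

# One JSON document per line, each terminated by a newline.
ndjson = "".join(json.dumps(row) + "\n" for row in rows)
print(ndjson, end="")
# {"user_id": 1, "days_since_last_visit": 45}
# {"user_id": 2, "days_since_last_visit": 3}

# Each line is parsed on its own, which is what makes the format easy to split.
parsed = [json.loads(line) for line in ndjson.splitlines()]
```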

start_beam_job

This operator starts an Apache BEAM pipeline. It reads the NDJSON files one line at a time, transforms each line into a dict, applies the model and saves the result back to GCS after converting it to a string. The pipeline is shown below.

Figure-5: Apache BEAM pipeline

Below is the code of the main.py reflecting the pipeline.
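As a rough stand-in, and not the original main.py, the sketch below expresses the three per-element stages of the pipeline as plain Python functions and runs them locally. The function names, the ConstantModel class and the Beam wiring shown in the comment are assumptions; Beam itself is not imported so the example stays self-contained.

```python
import json
from typing import Dict, Iterable, Iterator


class ConstantModel:
    """Hypothetical model: gives every row the same score."""

    def predict(self, rows):
        return [dict(row, churn_score=0.5) for row in rows]


def parse_line(line: str) -> Dict:
    """Stage 1: one NDJSON line becomes a dict."""
    return json.loads(line)


def apply_model(model, row: Dict) -> Iterator[Dict]:
    """Stage 2: score one row through the predict() contract."""
    yield from model.predict([row])


def to_line(row: Dict) -> str:
    """Stage 3: a scored dict becomes a string again."""
    return json.dumps(row)


def run_locally(lines: Iterable[str], model) -> Iterator[str]:
    """Chain the three stages in-process, one element at a time."""
    for line in lines:
        for scored in apply_model(model, parse_line(line)):
            yield to_line(scored)


# With Apache Beam, the same stages would be chained roughly as:
#   beam.io.ReadFromText(...) | beam.Map(parse_line)
#   | beam.ParDo(ExecuteModelDoFn(...)) | beam.Map(to_line)
#   | beam.io.WriteToText(...)

output = list(run_locally(['{"user_id": 1}', '{"user_id": 2}'], ConstantModel()))
print(output)
```

Keeping each stage a pure function makes it possible to unit-test the logic without starting a Dataflow job.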

The most interesting part resides in the ExecuteModelDoFn where the model.pkl is taken from GCS, deserialized and later used to compute the score of each data point.

model.pkl is copied from GitHub to GCS. It is identified by two things, the path to the model and the tag, as described in Figure-2, which allows us to execute a specific version of the model. This operation is done in a previous step of the Airflow DAG.

Figure-5 shows that the model is applied to one item at a time, which is not very efficient. To reduce the overall prediction time, we can batch elements in the DoFn and compute the predictions one batch at a time.
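The batching idea can be sketched in plain Python as follows. The helper names, the batch size of 100 and the CountingModel class are assumptions; in a real DoFn the buffer would typically be filled in process() and flushed when the bundle finishes, or the built-in beam.BatchElements transform could be placed in front of the DoFn.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def batched(items: Iterable[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Yield consecutive lists of at most batch_size items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


class CountingModel:
    """Hypothetical model that counts how often predict() is called."""

    def __init__(self):
        self.calls = 0

    def predict(self, rows):
        self.calls += 1
        return [dict(row, churn_score=0.5) for row in rows]


def predict_in_batches(model, rows: Iterable[Dict], batch_size: int = 100) -> Iterator[Dict]:
    """Call predict() once per batch instead of once per row."""
    for batch in batched(rows, batch_size):
        yield from model.predict(batch)


model = CountingModel()
rows = [{"user_id": i} for i in range(250)]
scored = list(predict_in_batches(model, rows, batch_size=100))
print(len(scored), "rows scored in", model.calls, "predict() calls")
```

The gain comes from libraries such as NumPy or scikit-learn being much faster on one array of many rows than on many arrays of one row.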

The code below details the implementation of ExecuteModelDoFn.

The setup() method is executed only once by each worker on Dataflow. In that method model.pkl is downloaded from GCS, deserialized and saved in the instance attribute self.model.

The from google.cloud import storage inside the method may seem peculiar at first sight. We must not forget that beam_pipeline.py is run in Airflow and the pipeline object is serialized and sent to Dataflow along with the DoFn. Only the __init__ method is executed in the context (think Python packages) of Airflow. Putting the import at the beginning of the file would force us to have google-cloud-storage installed on the Airflow workers (and scheduler).
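The sketch below illustrates both points, loading the model once in setup() and deferring the import, under stated assumptions: the class is a stand-in that does not subclass beam.DoFn (so the example runs without Beam installed), and the bucket and blob names are hypothetical. In the real pipeline the class would inherit from beam.DoFn and Dataflow would call setup() and process() itself.

```python
import pickle


class ExecuteModelSketch:
    """Stand-in for a beam.DoFn subclass; Beam is not imported here."""

    def __init__(self, bucket_name: str, blob_name: str):
        # Runs where the pipeline is built (Airflow): keep only plain strings
        # so the object can be serialized and shipped to the workers.
        self.bucket_name = bucket_name
        self.blob_name = blob_name
        self.model = None

    def setup(self):
        # Runs once per worker. The import is deferred so that
        # google-cloud-storage is only required on the Dataflow workers.
        from google.cloud import storage

        blob = storage.Client().bucket(self.bucket_name).blob(self.blob_name)
        self.model = pickle.loads(blob.download_as_bytes())

    def process(self, element):
        # Runs for every element, reusing the model loaded in setup().
        yield from self.model.predict([element])


# Building the object needs neither GCS access nor google-cloud-storage.
fn = ExecuteModelSketch("my-bucket", "models/churn/model.pkl")
print(fn.model)
```

Note that unpickling executes code from the file, so the bucket holding model.pkl should only be writable by trusted parties.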

Thanks to Apache BEAM and Dataflow we have the scalability we need to execute the model on millions of rows. In addition, the computation of the score of one observation is independent from the others. Consequently, the process() method is executed by many Dataflow workers on different elements in parallel, which reduces the time needed to score the whole dataset.

What about flexibility? We must allow data scientists to use the same libraries they used to train the model, and some custom packages too. Since process() executes on Dataflow workers, we need to provide those workers with the right dependencies. This is done using the pipeline’s setup.py.
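A minimal sketch of such a setup.py is shown below. The package name, version and dependency list are assumptions based on the libraries mentioned earlier; the file is handed to the pipeline through Beam's --setup_file option so that each worker installs the listed packages before running the DoFn.

```python
import setuptools

# Assumed dependencies: the libraries the data scientists trained with.
# Pinning them to the training versions helps keep the pickle loadable.
REQUIRED_PACKAGES = [
    "scikit-learn",
    "pandas",
    "numpy",
]

setuptools.setup(
    name="churn_prediction_pipeline",  # hypothetical package name
    version="0.1.0",
    install_requires=REQUIRED_PACKAGES,
    # Picks up local packages, such as a custom feature engineering package
    # placed next to this file.
    packages=setuptools.find_packages(),
)
```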

gcs_to_bq

The predictions are then sent from GCS to BigQuery using GoogleCloudStorageToBigQueryOperator.

Conclusion

We presented the automation of the prediction phase of Python-based ML models. We used Apache Airflow to orchestrate the workflow. BEAM and Dataflow were used to apply the model to the whole dataset, allowing us to scale to millions of data points. Flexibility is achieved by providing Dataflow workers with the right dependencies using the pipeline’s setup.py. A particular version of the model is retrieved from GitHub using tags. BigQuery and GCS are used as storage components, which allows us to separate data and compute.

Monitoring is achieved using Airflow’s web UI, and alerting (sending a Slack message, an email, etc.) is done by specifying on_failure_callback in the DAG’s default_args.
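As an illustration of the alerting hook, the sketch below defines a callback and registers it in default_args. Airflow calls on_failure_callback with a context dict; the function names, the message format and the owner value are assumptions, and the actual Slack or email delivery is replaced by a print.

```python
from types import SimpleNamespace


def format_failure_message(context: dict) -> str:
    """Build a short alert text from the Airflow task context."""
    task_instance = context["task_instance"]
    return "Task {}.{} failed: {}".format(
        task_instance.dag_id, task_instance.task_id, context.get("exception")
    )


def notify_failure(context: dict) -> None:
    # Placeholder: a real callback would post to Slack or send an email here.
    print(format_failure_message(context))


# Passed to the DAG so that every task inherits the callback.
default_args = {
    "owner": "data-engineering",  # hypothetical owner
    "on_failure_callback": notify_failure,
}

# Simulated context, standing in for what Airflow would provide on failure.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="churn_prediction", task_id="start_beam_job"),
    "exception": "worker ran out of memory",
}
message = format_failure_message(fake_context)
notify_failure(fake_context)
```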

References

[1] https://cloud.google.com/ai-platform/docs/ml-solutions-overview
