Practical Guide šŸ“š: How to automate your ML process with Airflow and Cloud Composer

Raphael Madillo
Published in Snipfeed Ā· 8 min read Ā· Apr 9, 2019
ā˜€ļøStart your day with yet the job done (Photo by Andrian Valeanu on Pixabay)

Are you dreaming about building a fast, scalable, maintainable machine learning pipeline (like a recommender system) that extracts data, trains and predicts on it, and saves its results while you sleep, so that all you have to do is admire the finished work while sipping your morning coffee?

Search no more: Google Cloud Composer and Airflow are made for you! Thereā€™s always a huge gap between implementing/training an ML model in your Jupyter notebook and putting it into production. Building a clean infrastructure that lets you monitor every step of the pipeline (collecting data, training ML models, putting models into production, getting alerted if there is any problem) is necessary in order to scale. At Snipfeed (the beta version of our app is coming soon, but you can try the Messenger version for a sneak peek), we use Airflow to monitor and automate our ML process for the recommender system.

The Dream Pipeline

In this practical guide, we will implement the following architecture:

The dream pipeline

Here are the daily tasks of your pipeline:

  • Extracting and filtering data from MongoDB and storing it on Cloud Storage.
  • Applying a ML model to your extracted data on a Compute Engine and saving the result on Cloud Storage.
  • Importing your results onto Redis Cache.

Failures are reported directly to Slack.

Airflow and Cloud Composer

Letā€™s introduce our tools for developing this pipeline: Airflow and Cloud Composer.

  1. Airflow

Airflow is a platform that allows you to create, manage and monitor workflows. In our scenario, Airflow schedules the aggregation of data from MongoDB, and runs the ML model once the aggregation is complete.

In Airflow, you can build DAGs (Directed Acyclic Graphs), which are pipelines (or workflows) written in Python that execute Operators (such as Python functions or Bash commands).

A DAG has a start date (the date it becomes active) and a schedule interval that defines the time lapse between two runs.

In our example

  • DAG: the dream pipeline.
  • Operators: 2 Python operators to access MongoDB and Redis, 1 Bash operator to execute ML code on Compute Engine.
  • Start date: datetime(2019, 3, 27); our DAG will become active on March 27, 2019.
  • Schedule interval: timedelta(days=1); starting from March 27, 2019, our DAG will run each day.

2. Cloud Composer

Cloud Composer is a service provided by Google Cloud Platform that uses Airflow to set up a pipeline.

The platform provided by Cloud Composer is relatively easy to handle. Airflow runs in an environment in the back end.
An environment is made up of nodes that execute Airflowā€™s operators, in parallel when possible.
The environment also comes with a Google Cloud Storage bucket that stores DAG code, temporary data, and logs.

Start up with Cloud Composer

To begin we have to create an environment.

  1. On Google Cloud Platform, go to the Cloud Composer API page and enable the API.
  2. Open Cloud Shell (top right corner) and type in the following command:
$ gcloud beta composer environments create composer --location us-central1 --zone us-central1-b --disk-size 40GB --machine-type n1-standard-1 --node-count 3 --python-version 3 --image-version composer-1.5.2-airflow-1.10.1
  • composer: your environment name.
  • location & zone: the physical location of your environment.
  • disk-size: the disk size of each node; the minimum is 30 GB, but thatā€™s not enough to install the Python packages later.
  • machine-type: the type of machine your nodes run on (n1-standard-1 is the default). See the description here for other types.
  • node-count: the number of nodes that work on your DAGs; the minimum is 3.
  • python-version: the Python version your DAG code is executed with (2 and 3 are available).
  • image-version: the Composer and Airflow versions running on the environment; check here for newer versions.

3. After about 10 minutes, your environment is set up

Environment set up šŸŽ‰

Create your first Airflow DAG

Now that your Cloud Composer environment is set up, we have to create our first DAG to run on it.

A DAG is written in Python and made up of 4 different parts:

  1. DAG arguments:
  • owner: the name of this DAGā€™s owner
  • depends_on_past : whether a run of the DAG depends on the success of its previous run,
  • start_date : the date on which you want this DAG to activate
  • retries : how many retries to run in case of failure
  • retry_delay : the delay between these retries
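
A minimal sketch of these arguments; the owner name, retry count and retry delay are placeholder values:

from datetime import datetime, timedelta

# Default arguments applied to every operator of the DAG.
default_args = {
    'owner': 'snipfeed',                  # placeholder owner name
    'depends_on_past': False,             # each run is independent of the previous one
    'start_date': datetime(2019, 3, 27),  # the DAG becomes active on March 27, 2019
    'retries': 1,                         # retry once on failure
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
}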

2. DAG definition:

Hereā€™s the part where you create your DAG.

  • dag_id: the DAG name
  • args: the arguments that we declared above
  • schedule_interval: the time interval between each run of your DAG
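
A sketch of the definition, assuming the dag_id dag1 (to match the dag1.py file uploaded later) and the daily schedule described above:

from airflow import DAG

dag = DAG(
    dag_id='dag1',                        # assumed DAG name
    default_args=default_args,            # the arguments declared above
    schedule_interval=timedelta(days=1)   # run once a day
)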

3. DAG operators:

And now you define the operators in your DAG.
For example, here we instantiate a BashOperator and a PythonOperator.
These operators take in the following parameters:

  • task_id: the id of the operator
  • provide_context: Boolean specifying whether to pass the execution context to the operator (useful to get task_id, execution_date, etc.)
  • op_kwargs: arguments passed to your operator
  • python_callable: python function to run (field only available for objects of the class PythonOperator)
  • bash_command: bash command to run (field only available for objects of the class BashOperator)
  • on_failure_callback: function to run in case the operator fails (not used in the code below)
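
A sketch of both operator types, with placeholder task IDs and a placeholder bash command; say_hello is the callable shown in the next snippet, and dag is the object defined above:

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

bash_task = BashOperator(
    task_id='bash_example',
    bash_command='echo "Hello from Bash"',  # placeholder command
    dag=dag
)

python_task = PythonOperator(
    task_id='python_example',
    provide_context=True,               # pass the execution context to the callable
    op_kwargs={'name': 'Airflow'},      # extra keyword arguments for the callable
    python_callable=say_hello,          # the function defined below
    dag=dag
)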

Below is an example of a function a PythonOperator can run.
Note that the functionā€™s return value is printed in the task log (and pushed to XCom, but nothing else is done with it).
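
A minimal sketch of such a callable (the name and arguments are placeholders); since provide_context=True, the execution context arrives as keyword arguments:

def say_hello(name, **context):
    # The context gives access to the task instance, execution date, etc.
    print('Running task {} on {}'.format(context['task_instance'].task_id, context['ds']))
    # The return value is printed in the task log and pushed to XCom.
    return 'Hello {}!'.format(name)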

4. Dependencies definition:

Here we define the order in which to execute the operators, so we have the following pipeline:
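
Airflowā€™s bitshift syntax expresses this ordering; a minimal sketch, assuming the Bash task runs first and the Python task follows:

# bash_task runs first; python_task starts once it has succeeded
bash_task >> python_task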

When you put everything together, you obtain:
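
A possible dag1.py, assembling the placeholder snippets above:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# 1. DAG arguments
default_args = {
    'owner': 'snipfeed',
    'depends_on_past': False,
    'start_date': datetime(2019, 3, 27),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 2. DAG definition
dag = DAG(
    dag_id='dag1',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

# 3. DAG operators
def say_hello(name, **context):
    # The return value is printed in the task log and pushed to XCom.
    return 'Hello {}, today is {}!'.format(name, context['ds'])

bash_task = BashOperator(
    task_id='bash_example',
    bash_command='echo "Hello from Bash"',
    dag=dag
)

python_task = PythonOperator(
    task_id='python_example',
    provide_context=True,
    op_kwargs={'name': 'Airflow'},
    python_callable=say_hello,
    dag=dag
)

# 4. Dependencies
bash_task >> python_task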

Our first DAG šŸ‘

Link your DAG to Cloud Composer

To be able to monitor your DAG on Cloud Composer:

  1. Go to DAGs

2. Upload your dag1.py file

3. Go to Airflow

4. After a few minutes, refresh and your DAG should appear, identified by its ID (itā€™s ON by default, and will catch up on a certain number of runs depending on what you set as start_date and schedule_interval)

Our first DAG on Airflow āœ…

You can now monitor it and discover the numerous monitoring features provided by Airflow by clicking on your DAG! (some help about monitoring is provided by this article)

Back to our Dream Pipeline

So now we can go into more details about our dream pipeline.
We have to:

  1. Connect to MongoDB, aggregate data and save it to a bucket.
  2. Execute a bash command that copies this data to a Compute Engine, applies a ML algorithm and saves the result into a bucket.
  3. Connect to Redis and upload our results.
  4. On failure, send a message to Slack.

There are still a few Airflow features that we need to cover to be able to set up our dream pipeline; for that, go to the Airflow web UI.

  • Connections
Create Airflow connection

As specified, we will need to set up 2 connections, to Slack and Redis (not MongoDB, because at the time this article is published, you wonā€™t be able to set up MongoDB as a connection).

Follow this Medium article to set up the Slack connection.

For the Redis connection, go to Redis and set up a new cache.
In the Airflow web UI, in the Connections tab, click Create and put an ID in Conn Id (ā€œredisā€ for example). Also set Conn Type to Redis, then fill in Host, Password, Port and Extra with the information given to you by Redis.
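
Later on, a task can retrieve this connection through Airflowā€™s RedisHook; a minimal sketch, assuming the Conn Id is ā€œredisā€:

from airflow.contrib.hooks.redis_hook import RedisHook

# Fetch the connection defined in the Airflow UI and get a redis client from it.
redis_client = RedisHook(redis_conn_id='redis').get_conn()
redis_client.set('healthcheck', 'ok')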

  • Variables

Airflow also provides us with a simple way to pass arguments to our DAGs.

Create Airflow variable

In our case our variable will contain:

  • redis: the ID of the Redis connection that we defined earlier
  • slack: the ID of the Slack connection that we defined earlier
  • mongo_uri: uri used by pymongo to access your database in the format mongodb://username:password@host:port/db?options
  • mongo_collection: the collection you want to aggregate
  • instance_name: the name of the instance on which you want to run ML code
  • instance_zone: the zone of the instance on which you want to run ML code

So create a variable and fill it in with your details as above.

This variable is accessed in the Python DAG file with the following code:
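
A minimal sketch, assuming the variable is named dream_pipeline and stored as JSON:

from airflow.models import Variable

# Load the JSON variable defined in the Airflow UI.
config = Variable.get('dream_pipeline', deserialize_json=True)

redis_conn_id = config['redis']
slack_conn_id = config['slack']
mongo_uri = config['mongo_uri']
mongo_collection = config['mongo_collection']
instance_name = config['instance_name']
instance_zone = config['instance_zone']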

Execute ML code on Compute Engine

Your ML code is copied to a Compute Engine instance by the pipeline, which then uses that instance to make predictions.

Requirement: python3 and pip3 have to be installed on your instance.

You then need two files in a zipped directory to make your ML code executable:
requirements.txt, which installs external libraries following this format, and main.py, which implements your ML model and saves its predictions.
This ml.zip should be placed in the data folder of your environment bucket.
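
The BashOperator can then copy the archive to the instance and run it over SSH. A sketch of a possible bash_command, assuming the environment bucketā€™s data folder is mounted at /home/airflow/gcs/data on the workers (Cloud Composerā€™s default) and reusing the instance name and zone loaded from the Airflow variable above:

# Hypothetical command string for the BashOperator of the dream pipeline;
# adapt the unzip step and paths to the layout of your own ml.zip.
run_ml_command = (
    'gcloud compute scp /home/airflow/gcs/data/ml.zip {name}:~/ --zone {zone} && '
    'gcloud compute ssh {name} --zone {zone} --command '
    '"unzip -o ml.zip && pip3 install -r requirements.txt --user && python3 main.py"'
).format(name=instance_name, zone=instance_zone)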

Final DAG Code
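
Below is a sketch of what the complete DAG could look like under the assumptions made throughout this guide; the variable name dream_pipeline, the Conn Ids redis and slack, the intermediate file names and the aggregation pipeline are all illustrative and should be adapted to your setup:

import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.redis_hook import RedisHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from pymongo import MongoClient

# Illustrative variable name; see the Variables section above.
config = Variable.get('dream_pipeline', deserialize_json=True)

# Local paths backed by the environment bucket's data/ folder.
DATA_DIR = '/home/airflow/gcs/data'
EXTRACT_PATH = DATA_DIR + '/extract.json'
PREDICTIONS_PATH = DATA_DIR + '/predictions.json'


def slack_fail_alert(context):
    # Called by Airflow whenever a task of this DAG fails; assumes the Slack
    # webhook token is stored in the password field of the 'slack' connection.
    return SlackWebhookOperator(
        task_id='slack_fail_alert',
        http_conn_id=config['slack'],
        webhook_token=BaseHook.get_connection(config['slack']).password,
        message=':red_circle: Task {} failed on {}'.format(
            context['task_instance'].task_id, context['ds']),
        username='airflow'
    ).execute(context=context)


def extract_from_mongo(**context):
    # Aggregate the collection and dump the result next to the ML code.
    client = MongoClient(config['mongo_uri'])
    collection = client.get_default_database()[config['mongo_collection']]
    documents = list(collection.aggregate([{'$match': {}}]))  # replace with your pipeline
    with open(EXTRACT_PATH, 'w') as f:
        json.dump(documents, f, default=str)


def load_into_redis(**context):
    # Push the predictions produced on the Compute Engine instance into Redis.
    redis_client = RedisHook(redis_conn_id=config['redis']).get_conn()
    with open(PREDICTIONS_PATH) as f:
        predictions = json.load(f)
    redis_client.mset(predictions)


default_args = {
    'owner': 'snipfeed',
    'depends_on_past': False,
    'start_date': datetime(2019, 3, 27),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': slack_fail_alert
}

dag = DAG(
    dag_id='dream_pipeline',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

extract_task = PythonOperator(
    task_id='extract_from_mongo',
    provide_context=True,
    python_callable=extract_from_mongo,
    dag=dag
)

run_ml_task = BashOperator(
    task_id='run_ml_on_compute_engine',
    bash_command=(
        'gcloud compute scp {data}/ml.zip {data}/extract.json {name}:~/ --zone {zone} && '
        'gcloud compute ssh {name} --zone {zone} --command '
        '"unzip -o ml.zip && pip3 install -r requirements.txt --user && python3 main.py" && '
        'gcloud compute scp {name}:~/predictions.json {data}/ --zone {zone}'
    ).format(data=DATA_DIR, name=config['instance_name'], zone=config['instance_zone']),
    dag=dag
)

load_task = PythonOperator(
    task_id='load_into_redis',
    provide_context=True,
    python_callable=load_into_redis,
    dag=dag
)

extract_task >> run_ml_task >> load_task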

Conclusion

Here you go: you have a fresh new workflow that will make a lot of ML engineers in your company happy, or do all the work for you! šŸŽ‰

Comments

Weā€™re close to the end of the article, so I will comment on some of the implementation choices I made while building this pipeline.

  • Why export and execute the ML code on a Compute Engine instance if you have a Python environment directly on Cloud Composer?

If you need a lot of CPU or RAM to execute your ML code, you have to upgrade a minimum of 3 nodes on Cloud Composer, and that can become expensive; whereas you can add a single Compute Engine instance, which you can also customize with a GPU.

  • Why not directly save outputs to Redis at the end of the ML model instead of creating a new operator that does the work afterwards?

The reason behind that choice is that it allows us to store the Redis credentials in Airflow Connections, so the connection is encrypted and all credentials are kept in the same place.

  • Why donā€™t we use a MongoDB connection hook in Airflow instead of pymongo inside the DAG code?

Again, at the time this article was written, a MongoDB connection type wasnā€™t available in the Airflow version used by Cloud Composer.

  • Are there any particularities when dealing with MongoDB or Redis?

Yes, there are a few particularities that you will have to handle.
With MongoDB, you have to take into account that a Mongo aggregation cannot return results larger than 16 MB, so you may have to split your aggregation using the $skip and $limit operators.
With Redis, you cannot mset dictionaries that are bigger than 512 MB, so you may have to split your mset into several smaller dicts, as in the sketch below.
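
For the Redis side, a minimal sketch of such a split (predictions, redis_client and the chunk size are illustrative):

from itertools import islice

def iter_chunks(mapping, chunk_size):
    # Yield successive sub-dictionaries of at most chunk_size items.
    items = iter(mapping.items())
    while True:
        chunk = dict(islice(items, chunk_size))
        if not chunk:
            return
        yield chunk

# Hypothetical predictions dict and redis client from the pipeline above.
for chunk in iter_chunks(predictions, 10000):
    redis_client.mset(chunk)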
