Practical Guide: How to automate your ML process with Airflow and Cloud Composer
Are you dreaming of a fast, scalable, maintainable Machine Learning pipeline (like a recommender system) that extracts data, trains on it, predicts, and saves its results while you sleep, so all you have to do is admire the finished work over your morning coffee?
Look no further: Google Cloud Composer and Airflow are made for you! There's always a huge gap between implementing/training an ML model in your Jupyter notebook and putting it into production. Building a clean infrastructure that lets you monitor every step of the pipeline (data collection, model training, deployment to production, failure reporting) is necessary in order to scale. At Snipfeed (the beta version of our app is coming soon, but you can try the Messenger version for a sneak peek), we use Airflow to monitor and automate the ML process behind our recommender system.
The dream Pipeline
In this practical guide, we will implement the following architecture:
Here are the daily tasks of your pipeline:
- Extracting and filtering data from MongoDB and storing it on Cloud Storage.
- Applying a ML model to your extracted data on a Compute Engine and saving the result on Cloud Storage.
- Importing your results into your Redis cache.
Failures are reported directly on Slack.
Airflow and Cloud Composer
Let's introduce our tools for developing this pipeline: Airflow and Cloud Composer.
1. Airflow
Airflow is a platform that allows you to create, manage and monitor workflows. In our scenario, Airflow schedules the aggregation of data from MongoDB, and runs the ML model once the aggregation completes.
In Airflow, you can build DAGs (Directed Acyclic Graphs), which are pipelines (or workflows) written in Python that execute Operators (such as Python functions or Bash commands).
A DAG has a start date (the date it becomes active) and a schedule interval that defines the time lapse between two runs.
In our example:
- DAG: the dream pipeline
- Operators: 2 Python Operators to access MongoDB and Redis, 1 Bash Operator to execute ML code on Compute Engine.
- Start date: datetime(2019, 3, 27), so our DAG becomes active on March 27th, 2019.
- Schedule interval: timedelta(days=1), so starting from March 27, 2019, our DAG runs every day.
2. Cloud Composer
Cloud Composer is a service provided by Google Cloud Platform that uses Airflow to set up a pipeline.
The platform provided by Cloud Composer is relatively easy to use. Airflow itself runs on an environment in the back end.
An environment is made up of nodes that execute Airflowās operators in parallel if possible.
The environment also comes with a Google Cloud Storage bucket that stores DAGs code, temporary data, and logs.
Start up with Cloud Composer
To begin, we have to create an environment.
1. On Google Cloud Platform, go to the Cloud Composer API and check the activate API checkbox.
2. Open Cloud Shell (top right corner) and type in the following command:
$ gcloud beta composer environments create composer --location us-central1 --zone us-central1-b --disk-size 40GB --machine-type n1-standard-1 --node-count 3 --python-version 3 --image-version composer-1.5.2-airflow-1.10.1
- composer: your environment's name.
- location & zone: the physical location of your environment.
- disk-size: the disk size of each node; the minimum is 30 GB, but that is not enough to install the Python packages later.
- machine-type: the type of machine your nodes run on (n1-standard-1 is the default). See the description here for other types.
- node-count: how many nodes work on your DAGs; the minimum is 3.
- python-version: the Python version your DAG code is executed with (2 and 3 are available).
- image-version: the versions of Airflow and Composer running on the environment; check here for newer versions.
3. After about 10 minutes, your environment is set up.
Create your first Airflow DAG
Now that your Cloud Composer environment is set up, we have to create our first DAG to run on it.
A DAG is written in Python and made up of 4 different parts:
1. DAG arguments:
- owner: the name of this DAG's owner
- depends_on_past: whether a run depends on the success of the previous run
- start_date: the date on which you want this DAG to become active
- retries: how many times to retry in case of failure
- retry_delay: the delay between these retries
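These arguments usually live in a plain Python dict passed to the DAG; here is a minimal sketch (every value is illustrative, not a requirement):

```python
from datetime import datetime, timedelta

# Illustrative default arguments for the DAG.
default_args = {
    "owner": "snipfeed",
    "depends_on_past": False,        # each run is independent of the previous one
    "start_date": datetime(2019, 3, 27),
    "retries": 1,                    # retry a failed task once
    "retry_delay": timedelta(minutes=5),
}
```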
2. DAG definition:
Here's the part where you create your DAG.
- dag_id: the DAG name
- args: the arguments that we declared above
- schedule_interval: the time interval between each run of your DAG
3. DAG operators:
Next, you define the operators in your DAG.
For example, here we instantiate a Bash operator and a Python operator.
These operators take in the following parameters:
- task_id: the id of the operator
- provide_context: Boolean to specify whether to pass context to operator (this could be useful to get task_id, execution_date etc.)
- op_kwargs: arguments passed to your operator
- python_callable: python function to run (field only available for objects of the class PythonOperator)
- bash_command: bash command to run (field only available for objects of the class BashOperator)
- on_failure_callback: function to run in case the operator fails (not used in the code below)
This is an example of a function a PythonOperator can run.
Note that whatever our function returns gets printed in the log (and that's all that happens to it).
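The original snippet is not reproduced here, so here is a minimal sketch of such a callable (the function name and message are ours):

```python
def print_context(ds=None, **kwargs):
    """A callable a PythonOperator could run.

    `ds` is the execution date stamp Airflow passes in when
    provide_context=True. The return value only ends up in the task log.
    """
    message = "Task ran for execution date {}".format(ds)
    print(message)
    return message
```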
4. Dependencies definition:
Here we define the order in which to execute the operators, so we have the following pipeline:
When you put everything together, you obtain:
Link your DAG to Cloud Composer
To be able to monitor your DAG on Cloud Composer:
1. Go to DAGs
2. Upload your dag1.py file
3. Go to Airflow
4. After a few minutes, refresh: your DAG should appear, identified by its ID (it's ON by default, and will run a certain number of times depending on what you set as start_date and schedule_interval)
You can now click on your DAG to monitor it and discover the numerous monitoring features Airflow provides! (this article offers some help with monitoring)
Back to our Dream Pipeline
Now we can go into more detail about our dream pipeline.
We have to:
- Connect to MongoDB, aggregate data and save it to a bucket.
- Execute a bash command that copies this data to a Compute Engine, applies a ML algorithm and saves the result into a bucket.
- Connect to Redis and upload our results.
- On failure, send a message to Slack.
There are still a few Airflow features we need to cover before we can set up our dream pipeline; for these, open the Airflow web UI.
- Connections
As mentioned, we will need to set up 2 connections, to Slack and Redis (not MongoDB, because at the time this article was published, MongoDB could not be set up as a connection).
Follow this Medium article to set up the Slack connection.
For the Redis connection, go to Redis and set up a new cache.
In the Airflow web UI, open the Connections tab, click Create, and put an id in Conn Id ("redis" for example). Also set Conn Type to Redis, then fill in Host, Password, Port and Extra with the information Redis gave you.
- Variables
Airflow also provides us with a simple way to pass arguments to our DAG.
In our case, our variable will contain:
- redis: the id of the Redis connection that we defined earlier
- slack: the id of the Slack connection that we defined earlier
- mongo_uri: uri used by pymongo to access your database in the format mongodb://username:password@host:port/db?options
- mongo_collection: the collection you want to aggregate
- instance_name: the name of the instance on which you want to run ML code
- instance_zone: the zone of the instance on which you want to run ML code
So create a variable and fill it in with your own details.
This variable is accessed in the Python DAG file with the following code:
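Since the snippet is not reproduced here, a sketch: Airflow Variables store strings, so we keep the whole configuration as one JSON value (the key name and all values below are ours) and deserialize it in the DAG file. Plain json illustrates the shape; the real call is noted in the comment.

```python
import json

# The JSON value stored in the Airflow Variable (all values illustrative).
raw_value = """
{
  "redis": "redis",
  "slack": "slack",
  "mongo_uri": "mongodb://username:password@host:port/db",
  "mongo_collection": "events",
  "instance_name": "ml-instance",
  "instance_zone": "us-central1-b"
}
"""

# Inside the DAG file this would be:
#   from airflow.models import Variable
#   config = Variable.get("dream_pipeline_config", deserialize_json=True)
config = json.loads(raw_value)
```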
Execute ML code on Compute Engine
Your ML code will be copied and transferred to an instance by the pipeline, so it can use that instance to make predictions.
Requirement: python3 and pip3 have to be installed on your instance.
You then need two files in a zipped directory to make your ML code executable:
requirements.txt, which lists the external libraries to install (following this format), and main.py, which implements your ML model and saves its predictions.
This ml.zip should be placed in the data folder of your environment bucket.
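The BashOperator's command could then look like this sketch: copy ml.zip from the environment bucket to the instance, unzip it, install the requirements and run main.py. The bucket name, instance name and zone are placeholders (in practice you would read them from the Airflow Variable), and we assume gsutil and gcloud are available where the command runs.

```python
# Illustrative bash_command for the BashOperator; every name is a placeholder.
bash_command = """
gsutil cp gs://my-composer-bucket/data/ml.zip /tmp/ml.zip
gcloud compute scp /tmp/ml.zip {name}:~/ml.zip --zone {zone}
gcloud compute ssh {name} --zone {zone} --command '
  unzip -o ~/ml.zip -d ~/ml &&
  pip3 install -r ~/ml/requirements.txt &&
  python3 ~/ml/main.py
'
""".format(name="ml-instance", zone="us-central1-b")
```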
Final DAG Code
Conclusion
Here you go: you have a fresh new workflow that will make a lot of ML engineers in your company happy, or simply do all the work for you!
Comments
We're close to the end of the article, so I will comment on some implementation choices I made while building this pipeline.
- Why export and execute ML code on a Compute Engine if you have a Python environment directly on Cloud Composer?
If you need a lot of CPU or RAM to execute your ML code, you have to upgrade your minimum of 3 nodes on Cloud Composer, and that can become expensive; whereas you can choose to add only one node on Compute Engine, which you can also customize with a GPU.
- Why not directly save outputs to Redis at the end of the ML model instead of creating a new operator that does the work afterwards?
This choice allows us to store the Redis credentials in Airflow Connections, so the connection is encrypted and all credentials are kept in the same place.
- Why don't we use the MongoDB Connection Hook on Airflow instead of pymongo inside the DAG code?
Again, at the time this article was written, the MongoDB connection wasn't available in the Airflow version used by Cloud Composer.
- Are there any particularities when dealing with MongoDB or Redis?
Yes, there are some particularities that you will have to handle.
With MongoDB, you have to take into account that a Mongo aggregation cannot return results larger than 16 MB, so you may have to split your aggregation using the $skip and $limit operators.
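Paging the aggregation can be sketched like this (the helper name and page size are ours; the stages are plain dicts you would pass to pymongo's collection.aggregate):

```python
def paged_pipeline(base_stages, page, page_size=1000):
    """Return an aggregation pipeline that only yields one page of
    results, keeping each response well under Mongo's 16 MB limit."""
    return base_stages + [
        {"$skip": page * page_size},
        {"$limit": page_size},
    ]

# Page 2 of a simple match stage, 1000 documents per page:
stages = paged_pipeline([{"$match": {"active": True}}], page=2)
```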
With Redis, you cannot mset dictionaries that are bigger than 512 MB, so you may have to split your mset into several smaller dicts.
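Splitting a large dict into smaller msets can be sketched like this (chunking by number of keys for simplicity; in practice you might size the chunks by bytes):

```python
def chunked(mapping, chunk_size):
    """Yield sub-dicts of at most chunk_size keys, so each
    redis_client.mset(chunk) call stays well under the limit."""
    items = list(mapping.items())
    for i in range(0, len(items), chunk_size):
        yield dict(items[i:i + chunk_size])

# Usage with a redis-py client (sketch):
# for chunk in chunked(predictions, 10000):
#     redis_client.mset(chunk)
```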