Apply CI/CD To Airflow in MOMO

Hai Nguyen
5 min read · Jul 27, 2020


At MOMO, we are leveraging Airflow as our main workflow management platform, handling more than 400 data aggregations across 30 DAGs (a.k.a. workflows) per hour. The Airflow server runs various types of workflows on different schedules, such as:

  • Data pipelines for reporting purposes, developed by and for Data Analysts (DA) and Data Engineers (DE).
  • Data migrations from source to source, created by Data Engineers.
  • Model training by Machine Learning Engineers.
  • Monitoring tasks (cloud budgets, data quality).
  • Anomaly detection.

If you are wondering why MOMO is using Airflow, you can read about our Airflow adoption on our blog.

TL;DR: Airflow, originally developed by the Airbnb team and now an Apache project, enables users to schedule, execute, and monitor their workflows.

In the beginning, we simply used the Airflow Docker image from puckel (the most popular Airflow image at the time): we downloaded it, followed the README.md, and added our modifications to the Airflow working environment, such as working variables, database connections, and the alert system connection, then chose a suitable executor. We picked the CeleryExecutor to handle our jobs and deployed it on Compute Engine in Google Cloud Platform with the following command:

docker-compose -f docker-compose-CeleryExecutor.yml up -d

We also added some container customizations to the compose configuration:

  • Nginx, certbot: reverse proxy and domain certificate management.
  • Airflow code: Airflow code management (it is built by our CI/CD process, which I will describe later).

That is all: you now have an Airflow platform running on a machine. My team (the Data Engineering team) could define various workflows in Python files and deploy them to the persistent volume mounted into the Airflow containers (scheduler, web server, workers).
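A DAG file dropped into the mounted dags/ folder is just plain Python. Here is a minimal sketch of what such a file can look like (the DAG id, schedule, and command are illustrative placeholders, not one of our real pipelines):

# Minimal example DAG file. The dag_id, schedule and bash command are
# placeholders for illustration only.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "data-engineering",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="example_hourly_aggregation",
    default_args=default_args,
    start_date=datetime(2020, 7, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    aggregate = BashOperator(
        task_id="run_aggregation",
        bash_command="echo 'run the hourly aggregation here'",
    )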

In the early days, the deployment process was simple, with our relatively small team (4 members in the Data Engineering team).

We soon noticed problems after opening the platform up to the Data Analyst (DA) and Machine Learning (ML) teams:

  • Broken DAGs: DAGs (a.k.a. workflows) are defined as Python code, and one Python file can contain one or many DAGs. If any of these files has a syntax error, Airflow cannot parse it, and none of the DAGs in that file will be scheduled.
  • Security: to deploy or update a DAG, developers need specific permissions to sync the Python files containing DAG definitions to the Airflow server.
  • Inconsistent working environments: Python/Java versions, Airflow's working variables, Airflow's connection configurations.
  • Code version control: to simplify the development workflow of the DA and ML teams, we provide many operator (a.k.a. task implementation) templates (a simplified sketch of one appears below), such as:
      • DpEtlOperator: runs ETL processes in BigQuery (the main data warehouse at MOMO).
      • DpJavaOperator: runs a specific Java application on the Airflow server.
      • EtlGcloudDataprocHelper: a helper class to manage Spark clusters on Google Cloud Platform.
      • TelegramWebHook, GoogleChatWebHook: callback alert messages to any chat room in our office.

So whenever there is a new update to those templates, we cannot ensure that all of the data pipelines keep working without exceptions.
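To give a rough idea of what such a template looks like, here is a heavily simplified sketch of a DpEtlOperator-style operator (the parameter names and the BigQuery call are illustrative assumptions, not our production implementation):

# Simplified sketch of a template operator that runs an ETL SQL statement in
# BigQuery. Parameter names are illustrative; the real operator has more options.
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from google.cloud import bigquery


class DpEtlOperator(BaseOperator):
    template_fields = ("sql",)

    @apply_defaults
    def __init__(self, sql, *args, **kwargs):
        super(DpEtlOperator, self).__init__(*args, **kwargs)
        self.sql = sql

    def execute(self, context):
        client = bigquery.Client()
        job = client.query(self.sql)  # submit the ETL statement to BigQuery
        job.result()                  # wait for the job to finish
        self.log.info("BigQuery job %s finished", job.job_id)

When a shared template like this changes, every DAG that imports it is affected at once, which is exactly why we needed automated verification before deployment.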

The problems above slowed down our data platform development significantly.

How to fix it

The short-term solution:

First of all, we removed direct access permissions to the Airflow server and appointed one person to be in charge of deploying all code changes to the Airflow server for the time being. This was necessary to protect production, give us time to find a long-term solution to the issues above, and avoid slowing down the development of the DE, DA, and ML teams.

The long-term solution:

Meet our CI/CD workflow for the Airflow server

In software engineering, CI/CD or CICD generally refers to the combined practices of continuous integration and either continuous delivery or continuous deployment.

CI/CD bridges the gaps between development and operation activities and teams by enforcing automation in building, testing and deployment of applications.

Wikipedia

Step 1: Developers (DA, DE, ML) define or update their workflows and push them to our codebase for review (we are using Gerrit).

Step 2: Code is reviewed by reviewers and verified by Jenkins with unit tests and integration tests, then merged into the master branch.
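One check that is useful in this verification stage is a DAG integrity test: load every DAG file with Airflow's DagBag and fail the build if any file cannot be parsed, which catches the Broken DAGs problem described earlier before the code ever reaches the server. A minimal sketch (the dags/ path is an assumption about the repository layout):

# DAG integrity test: fail the Jenkins build if any DAG file has import/syntax errors.
import unittest

from airflow.models import DagBag


class TestDagIntegrity(unittest.TestCase):
    def setUp(self):
        # Parse every file under dags/ exactly as the scheduler would.
        self.dagbag = DagBag(dag_folder="dags/", include_examples=False)

    def test_no_import_errors(self):
        self.assertEqual(
            len(self.dagbag.import_errors), 0,
            "Broken DAG files: {}".format(self.dagbag.import_errors),
        )


if __name__ == "__main__":
    unittest.main()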

Step 3: We schedule the deployment process to the Airflow server in Jenkins at 1:30 pm every day, excluding Saturday and Sunday (non-working days). In this process, Jenkins will:

  • Pull the master code from Gerrit and build the defined executable jar files, if any.
  • Package all of the DAG files, executable files, Airflow variables and environment, … related to Airflow into a Docker image called airflow_code and tag it with the 'latest' label.
  • Push the image to Google Container Registry (GCR).
  • Send a message to the deployment topic in Google Pub/Sub to trigger the deployment process on the Airflow server (a minimal sketch of this publish step follows below).
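The Pub/Sub trigger in the last bullet can be as simple as publishing one message whose attributes carry the image tag. A minimal sketch with the google-cloud-pubsub client (the project, topic, and attribute names are assumptions, not our exact Jenkins step):

# Publish a deployment trigger message; the image attribute tells the agent
# which image to pull from GCR. Project/topic names are placeholders.
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-gcp-project", "deployment")

future = publisher.publish(
    topic_path,
    b"deploy airflow_code",
    image="gcr.io/my-gcp-project/airflow_code:latest",
)
print("Published message id:", future.result())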

Step 4: We deploy a Deployment-Agent service on the Airflow server that subscribes to the deployment topic in Google Pub/Sub. After consuming a message with the image tag information, the agent service pulls the corresponding airflow_code image from GCR.

Step 5: The agent creates an airflow_code container from the image to update code, resources, and configurations (variables, connections) in the persistent volume. It then applies the configurations to the Airflow web server, workers, and scheduler.
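Steps 4 and 5 can be pictured with a small sketch of the Deployment-Agent (the subscription name, mount paths, and docker commands are illustrative assumptions, not our production service):

# Simplified Deployment-Agent: subscribe to the "deployment" topic, pull the
# image named in the message, and run it once to sync DAGs/configs into the
# persistent volume shared with the Airflow containers.
import subprocess

from google.cloud import pubsub_v1

PROJECT = "my-gcp-project"          # placeholder project id
SUBSCRIPTION = "deployment-agent"   # placeholder subscription name


def handle_message(message):
    image = message.attributes.get("image", "gcr.io/my-gcp-project/airflow_code:latest")
    subprocess.check_call(["docker", "pull", image])
    # The container copies code, resources and configurations into the mounted volume.
    subprocess.check_call([
        "docker", "run", "--rm",
        "-v", "/opt/airflow/dags:/sync/dags",
        image,
    ])
    message.ack()


subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
future = subscriber.subscribe(subscription_path, callback=handle_message)
future.result()  # block and keep processing deployment messages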

Last but not least, we sometimes want to roll back from the latest version to a specific version of the Airflow code on the server, or hotfix problems in production. We defined a custom workflow in Jenkins that simply triggers the workflow above with a specific commit ID.

Achievements:

  • The DA, DE, and ML teams have a safe way to push new code changes to the Airflow server without errors. Code commits to the Airflow server increased from 5 per week to 50 per week.
  • 100% of the code in Airflow is verified and deployed automatically by Jenkins.
  • We can revert Airflow to a specific version.
