Generating Airflow Unit Tests at MOMO

Khôi Nguyên Phan
5 min read · Aug 6, 2020


TLDR

https://github.com/khoinguyen1312/airflow-unit-test-generator

Foreword

This post is part of a series in which we describe our software solutions for data engineering. The previous post was:

In this post, I will describe the situation of our Data Platform and how much data we need to handle, how we came to generate unit tests automatically, and how that has helped us deliver data without critical errors in the production environment.

Situation

I am currently working at Momo (Mservice Vietnam) on the Data Platform team. Before that, I worked as a web developer on a fin-tech project at Axon Active Vietnam, where we wrote tons of unit tests to verify our business logic. After a few weeks, I realized that most of our Airflow code lacked unit tests. This happened because:

  • We were a small team (4 people) and had to serve up to 40 people from the business team.
  • Many DAGs were running and being developed at the same time. Some DAGs handled only a few tasks; others served a heavy pipeline containing more than 100 tasks with complex dependencies.
  • At the time of this development, Airflow had not introduced an official way to write unit tests.
  • The topic wasn't discussed widely enough.
  • Honestly, data engineers were lazy about writing unit tests. They preferred pushing code to production, triggering the pipeline, and waiting for the end result.
The maze that we work on every day

This situation led to a lot of problems:

  • Broken DAGs and system downtime happened because of small bugs or Python syntax errors.
  • Many of our DAGs were controlled by Python helpers, so a small change in a helper could affect many pipelines.
  • Using Airflow became a struggle: we monitored important pipelines every day to make sure new changes didn't break any DAG, and we had to remote into the server for hot-fixes.
  • We struggled to find where bugs came from. Sometimes it was a simple Python syntax mistake; sometimes it was a wrong JAR flag caused by a code conflict.
  • Refactoring was painful: it was hard to change code without affecting the pipelines that were currently running.

How did we solve the problem

To resolve the issue, we analyzed our situation:

  • Our code structure was quite simple:
DAG calls a Python helper
-> Python helper generates a bash command
-> The bash command calls a JAR with specific flags, or runs a specific bash command.
  • Each code change might include small changes to bash commands, JAR flags, the number of tasks, …
  • We used a lot of bash commands and BashOperator because it was convenient, and because Airflow did not yet support as many Operators as it does nowadays. We decided to keep using bash commands.
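To make the structure above concrete, here is a minimal sketch of what such a Python helper might look like. The function name `build_jar_command`, the JAR path, the class name, and the flags are all hypothetical and do not come from our codebase; only `{{ ds }}` (Airflow's templated execution date) is a real Airflow feature.

```python
import shlex
from typing import Dict, List


def build_jar_command(jar_path: str, main_class: str, flags: Dict[str, str]) -> str:
    """Hypothetical helper: render the bash command that runs one JAR job."""
    parts: List[str] = ["java", "-cp", jar_path, main_class]
    # Sort the flags so the generated command is stable between runs.
    for name, value in sorted(flags.items()):
        parts.append(f"--{name}={value}")
    # Quote each part so values containing spaces survive the shell.
    return " ".join(shlex.quote(part) for part in parts)


# Illustrative values only. In a DAG file, this string would be passed to
# BashOperator(task_id=..., bash_command=command, dag=...).
command = build_jar_command(
    jar_path="/opt/jobs/etl.jar",
    main_class="com.example.DailyExport",
    flags={"date": "{{ ds }}", "output": "gs://example-bucket/daily"},
)
print(command)
```

Because everything a task does ends up in one string, comparing that string before and after a code change is enough to see what the change really affects.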

First solution

The first version of the generator was a simple Python script that dumped the bash command of every task in every DAG. We kept using bash commands and turned them to our advantage by:

Generating every task's bash command into a text file for comparison.

https://github.com/khoinguyen1312/airflow-unit-test-generator/blob/master/tools/scripts/first_version/generate_dags_tasks.py

  • Generate the file on master
  • Generate the file on the branch with the current code change
  • Compare the two files
Comparing a change from local file migration to GCS file migration.
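The generate-and-compare steps can be sketched as follows. This is not the script linked above, only an illustration of the idea under simplifying assumptions: the commands are given as a plain dictionary keyed by a hypothetical `dag_id.task_id` string, whereas a real script would collect them from the tasks of each loaded DAG.

```python
import difflib
from typing import Dict


def render_snapshot(commands_by_task: Dict[str, str]) -> str:
    """Render one 'task: command' line per task, sorted for stable output."""
    lines = [f"{task}: {command}" for task, command in sorted(commands_by_task.items())]
    return "\n".join(lines) + "\n"


def diff_snapshots(master: str, branch: str) -> str:
    """Return a unified diff between two snapshots ('' when they are identical)."""
    return "".join(
        difflib.unified_diff(
            master.splitlines(keepends=True),
            branch.splitlines(keepends=True),
            fromfile="master",
            tofile="branch",
        )
    )


# Illustrative data: a task moves from a local copy to a GCS copy.
master_commands = {"echo_dag.copy_report": "cp /data/report.csv /backup/report.csv"}
branch_commands = {
    "echo_dag.copy_report": "gsutil cp /data/report.csv gs://example-bucket/report.csv"
}

diff = diff_snapshots(render_snapshot(master_commands), render_snapshot(branch_commands))
print(diff)
```

Sorting the lines keeps the diff focused on real changes instead of on the order in which tasks happen to be listed.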

Final Solution

The first idea still came with some minor issues:

  • Human problems:
    - It was easier to push code to production for testing than to trigger the script.
    - Sometimes we forgot to generate the file.
  • When a DAG changed heavily, the comparison became harder to read.

We integrated it with our CI/CD pipeline.

The generated pytest contains each DAG task's bash command: https://github.com/khoinguyen1312/airflow-unit-test-generator/blob/master/pytest/airflow/dags/auto_generate_result/code/echo_dag_auto_test.py
  • Integrated with the CI/CD pipeline: instead of generating a text file, we generate a pytest file and run it on every new code change.
  • If you forget to regenerate, pytest fails and the new code cannot be merged into master. This forces engineers to generate tests every time they make a change, and helps them stay aware of the effects of their code.
  • We also ran the scripts locally inside a Docker Airflow image to keep the environment consistent.

If you forget, the test will fail

GitHub solution: airflow-unit-test-generator

Our final team workflow was:

-> Make a code change
-> Trigger the script
-> Compare the differences with master using git diff or the IDE's git comparison
-> Push the code change together with the generated test

Code
Generate
See results

Benefits

The engineers benefited the most, since they didn't have to write unit tests and could still find out whether their code had any issues. In code review, the reviewer can also see the generated test right away. This helps in two ways:

  • Reviewers can see the effect of a code change by checking the auto-generated test.
  • During refactoring, the auto-generated test shouldn't change at all (which we found the most valuable). This lets us do large refactorings without affecting the pipelines that are currently running.

See code changes immediately.

See the effect of a code change right away in code review

Achievements

  • 100% visibility into how a code change affects the end result
  • 100% of DAGs are covered by unit tests
  • It's so easy to use that other teams were able to adopt it quickly and add their own unit tests. We were able to extend this solution to the Data Scientist team and the Machine Learning team too.
  • Engineers can track down code errors, which reduces code errors and system downtime.
  • Reliable for refactoring Airflow code.

Next Step

This solution only covers BashOperator, which is not enough in other cases. We see that tests for other Operators could be generated too, based on all of an operator's attributes. We have published the code of the unit test generator, and you can help us by adding more Operator Testers.
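As a rough sketch of what generating from an operator's attributes could mean, the snippet below snapshots an operator-like object. Everything here is hypothetical: `FakeCopyOperator` is a stand-in so the example runs without Airflow, and restricting the snapshot to `template_fields` (the attribute Airflow operators use to list their templated fields) is one possible choice, not what the repository does.

```python
from typing import Dict


class FakeCopyOperator:
    """Stand-in for an Airflow operator so the sketch runs without Airflow."""

    # Airflow operators list their templated attributes in `template_fields`.
    template_fields = ("source", "destination")

    def __init__(self, task_id: str, source: str, destination: str) -> None:
        self.task_id = task_id
        self.source = source
        self.destination = destination


def snapshot_operator(operator) -> Dict[str, str]:
    """Pin the operator's class name plus the repr() of each templated field."""
    snapshot = {"operator": type(operator).__name__}
    for name in operator.template_fields:
        snapshot[name] = repr(getattr(operator, name))
    return snapshot


task = FakeCopyOperator(
    task_id="copy_report",
    source="/data/report.csv",
    destination="gs://example-bucket/report.csv",
)
snapshot = snapshot_operator(task)
print(snapshot)
```

A snapshot like this could be written into a generated test in the same way as the bash command, one assertion per attribute.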

https://github.com/khoinguyen1312/airflow-unit-test-generator
