Testing in Airflow Part 2 — Integration Tests and End-To-End Pipeline Tests


Written by Sarang Shinde and Chandu Kavar — Nov 28, 2019

In part 1 of this blog, I explained DAG validation tests, DAG definition tests, and unit tests. These tests help verify the DAG's syntax and the absence of cycles, the upstream and downstream dependencies of each task, the logic of custom operators and sensors, and so on. I would recommend reading the first part of this post before continuing.

In this part, we will cover the integration and end-to-end pipeline tests.

Before we start, we want you to understand what XComs are in Airflow.

XComs

XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are not only defined by key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.

XComs can be “pushed” (sent) or “pulled” (received). When a task pushes an XCom, it makes it generally available to other tasks. Tasks can push XComs at any time by calling the xcom_push() method. In addition, if a task returns a value (either from its Operator’s execute() method, or from a PythonOperator’s python_callable function), then an XCom containing that value is automatically pushed.

Tasks call xcom_pull() to retrieve XComs, optionally applying filters based on criteria like a key, source task_ids, and source dag_id. By default, xcom_pull() filters for the keys that are automatically given to XComs when they are pushed by being returned from executing functions.

(https://airflow.apache.org/concepts.html)

Let’s assume we have this “hello world” DAG, in which we use XComs to push and pull a dummy key and value. In this case, we want to test the integration between tasks.
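
The complete DAG lives in the GitHub repo linked at the end of this post. A minimal sketch of its shape, assuming the task ids push_to_xcoms and pull_from_xcoms used in the test steps below, plus an illustrative bash_use_xcom task for the templated example later on, looks roughly like this:

# Minimal sketch of the "hello world" XCom DAG (illustrative; the real DAG is in the repo).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def push_to_xcoms_fn(**kwargs):
    # Push a dummy key/value pair into XComs.
    kwargs['ti'].xcom_push(key='dummy_key', value='dummy_value')


def pull_from_xcoms_fn(**kwargs):
    # Pull the value pushed by the upstream task.
    value = kwargs['ti'].xcom_pull(task_ids='push_to_xcoms', key='dummy_key')
    print('pulled value: {}'.format(value))


with DAG(dag_id='hello_world_xcom',
         start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:

    push_to_xcoms = PythonOperator(task_id='push_to_xcoms',
                                   python_callable=push_to_xcoms_fn,
                                   provide_context=True)

    pull_from_xcoms = PythonOperator(task_id='pull_from_xcoms',
                                     python_callable=pull_from_xcoms_fn,
                                     provide_context=True)

    # XCom pulled inside a templated field (used in the second test).
    bash_use_xcom = BashOperator(
        task_id='bash_use_xcom',
        bash_command="echo {{ ti.xcom_pull(task_ids='push_to_xcoms', key='dummy_key') }}")

    push_to_xcoms >> pull_from_xcoms
    push_to_xcoms >> bash_use_xcom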

Integration Tests

We use integration tests to test XComs: we verify whether the XCom value received by a task is correct. For the above DAG, we should write at least two tests:

  1. Push/pull from a Python function
  2. XComs used in the template field

Push/Pull From Python Function

The source and test code for push/pull are in the GitHub repo linked at the end of this post.

In the above test, we do three things, as sketched below —

  1. Push the dummy value by executing the task instance (of push_to_xcoms)
  2. Pull the dummy value from the task instance (of pull_from_xcoms)
  3. Assert the value
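
The actual test lives in the repo; a sketch of what it can look like, assuming the DAG sketched above and a working Airflow metadata database (XComs are stored there), is:

import unittest
from datetime import datetime

from airflow.models import DagBag, TaskInstance


class TestXComPushPull(unittest.TestCase):

    def test_push_pull_from_python_function(self):
        execution_date = datetime(2019, 1, 1)
        dag = DagBag().get_dag('hello_world_xcom')

        # 1. Push the dummy value by executing the push_to_xcoms task instance.
        push_ti = TaskInstance(task=dag.get_task('push_to_xcoms'),
                               execution_date=execution_date)
        push_ti.run(ignore_ti_state=True)

        # 2. Pull the dummy value from the pull_from_xcoms task instance.
        pull_ti = TaskInstance(task=dag.get_task('pull_from_xcoms'),
                               execution_date=execution_date)
        pulled_value = pull_ti.xcom_pull(task_ids='push_to_xcoms',
                                         key='dummy_key')

        # 3. Assert the value.
        self.assertEqual(pulled_value, 'dummy_value')


if __name__ == '__main__':
    unittest.main()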

XComs Pull As Templated Field

In Airflow, an operator might have template fields. Templated fields allow us to pass data dynamically at run time to Airflow operators. Airflow uses Jinja templating to do that. We can also access XComs data in template fields and use it to set dynamic configuration for Airflow operators. You can check which fields of each operator are templated in Airflow's documentation or in the operator's code. For more information, refer to https://airflow.apache.org/tutorial.html#templating-with-jinja

The source and test code for the push/pull template are in the GitHub repo linked at the end of this post.

In the above test, we test the templated XCom value used in the BashOperator (you can see it in the example DAG we are testing). To test the templated value, we follow these steps, with a sketch after the list —

  1. Push the dummy value by executing the task instance (of push_to_xcoms)
  2. Render the template using the render_template() function: this function requires three arguments: the template field, the value of the template field, and the template context. We can get the value of the template field by calling getattr() on the task with the name of the template field. We can get the context using the get_template_context() method on the task instance.
  3. Assert that the rendered value matches the expectation.
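
The real test is in the repo; a sketch of it, assuming the illustrative bash_use_xcom task from the DAG sketch above and Airflow 1.10's render_template(attr, content, context) signature, could look like this:

import unittest
from datetime import datetime

from airflow.models import DagBag, TaskInstance


class TestXComTemplatedField(unittest.TestCase):

    def test_xcom_pull_in_templated_field(self):
        execution_date = datetime(2019, 1, 1)
        dag = DagBag().get_dag('hello_world_xcom')

        # 1. Push the dummy value by executing the push_to_xcoms task instance.
        push_ti = TaskInstance(task=dag.get_task('push_to_xcoms'),
                               execution_date=execution_date)
        push_ti.run(ignore_ti_state=True)

        # 2. Render the template of the bash_use_xcom task.
        bash_task = dag.get_task('bash_use_xcom')
        bash_ti = TaskInstance(task=bash_task, execution_date=execution_date)
        context = bash_ti.get_template_context()
        rendered = bash_task.render_template(
            'bash_command', getattr(bash_task, 'bash_command'), context)

        # 3. Assert the rendered value against the expectation.
        self.assertEqual(rendered, 'echo dummy_value')


if __name__ == '__main__':
    unittest.main()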

Using XComs in templated fields is a powerful way to achieve many important things at run time, but each time we write such code we should safeguard it by writing tests.

End-to-End Pipeline Tests

In end-to-end (E2E) pipeline tests, we run the entire DAG without mocking anything. This requires an environment in which to run the entire DAG. We use Kubernetes (inside Minikube) to create a test environment for Airflow and run the DAG on sample data. After a successful run, we assert the output of the DAG.

Airflow Set up on Minikube

Prerequisites

  • Basic understanding of Docker and Kubernetes
  • We need to install: Docker, Minikube, and Airflow on Minikube

Detailed steps to install Minikube and Airflow are here: https://github.com/chandulal/airflow-testing#end-to-end-tests

After the successful installation of Airflow, we will have these services running on Minikube -

  1. Postgres (to store Airflow's metadata)
  2. Redis (broker for the Celery executor)
  3. Airflow Scheduler
  4. Celery Workers
  5. Airflow Web Server
  6. Flower

Airflow REST APIs

To run integration tests on Minikube, we will have to create APIs —

  • To pause/unpause the DAGs
  • To trigger the DAG and check its status
  • To clear the DAG
  • To add connections

You can find the code for these APIs here:
https://github.com/chandulal/airflow-testing/blob/master/src/integrationtest/python/airflow_api.py
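
Purely to illustrate the shape of such helpers, a sketch built on Airflow 1.10's experimental REST API could look like the following. The webserver address is an assumption, and the clear-DAG and add-connection helpers from the repo are omitted here because they go beyond the stock experimental endpoints; see the linked airflow_api.py for the real code.

import requests

# Assumed address of the Airflow webserver exposed by Minikube (NodePort).
AIRFLOW_API = 'http://192.168.99.100:31317/api/experimental'


def unpause_dag(dag_id):
    # GET /dags/<dag_id>/paused/false unpauses the DAG.
    return requests.get('{}/dags/{}/paused/false'.format(AIRFLOW_API, dag_id))


def trigger_dag(dag_id, execution_date=None):
    # POST /dags/<dag_id>/dag_runs creates a new DAG run.
    payload = {}
    if execution_date:
        payload['execution_date'] = execution_date  # ISO-8601 string
    return requests.post('{}/dags/{}/dag_runs'.format(AIRFLOW_API, dag_id),
                         json=payload)


def get_dag_run_state(dag_id, execution_date):
    # GET /dags/<dag_id>/dag_runs/<execution_date> returns the run's state.
    response = requests.get('{}/dags/{}/dag_runs/{}'.format(
        AIRFLOW_API, dag_id, execution_date))
    return response.json().get('state')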

Let’s assume we have this DAG and we want to run it end to end on the environment we created above.

This DAG is very basic and does not have any other dependencies apart from the Airflow environment. We use the APIs that we created to run integration tests. Here is the integration test for this DAG —
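
The actual test is in the repo; reusing the hypothetical helpers sketched above, its shape could be roughly:

import time
import unittest
from datetime import datetime

# Hypothetical helpers sketched in the previous section; the repo's
# airflow_api.py provides the real implementations.
from airflow_api import get_dag_run_state, trigger_dag, unpause_dag


class TestBasicDagE2E(unittest.TestCase):

    def test_dag_runs_successfully(self):
        dag_id = 'hello_world_xcom'  # illustrative: use the id of the DAG under test
        execution_date = datetime.utcnow().replace(microsecond=0).isoformat()

        unpause_dag(dag_id)
        trigger_dag(dag_id, execution_date=execution_date)

        # Poll until the DAG run finishes (with a simple timeout).
        state = 'running'
        for _ in range(60):
            state = get_dag_run_state(dag_id, execution_date)
            if state in ('success', 'failed'):
                break
            time.sleep(5)

        self.assertEqual(state, 'success')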

Now, let’s say you have a DAG that transfers data from Presto to MySQL. We need Presto and MySQL alongside the Airflow environment to run and test this DAG. Here is the DAG to transfer the data —
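
One way to express such a transfer (the repo's DAG may differ) is Airflow 1.10's built-in PrestoToMySqlTransfer operator; the query, table name, and connection ids below are illustrative:

from datetime import datetime

from airflow import DAG
from airflow.operators.presto_to_mysql import PrestoToMySqlTransfer

with DAG(dag_id='presto_to_mysql',
         start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:

    transfer = PrestoToMySqlTransfer(
        task_id='presto_to_mysql_transfer',
        sql='SELECT * FROM default.users',          # illustrative Presto query
        mysql_table='users',                        # illustrative MySQL target table
        presto_conn_id='presto_conn',               # connections registered in the test setUp()
        mysql_conn_id='mysql_conn',
        mysql_preoperator='TRUNCATE TABLE users')   # clean the target before loading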

To test DAGs that depend on external systems such as databases, REST APIs, etc., we need to:

  • Install all external components. In our case, we will install MySQL and Presto in Minikube.

The deployment YAMLs to install MySQL and Presto in Minikube are here —

https://github.com/chandulal/airflow-testing/blob/master/k8s/mysql/mysql.kube.yaml

https://github.com/chandulal/airflow-testing/blob/master/k8s/presto/presto.kube.yaml

  • Register all variables/connections used in the DAGs. In our case, we will create MySQL and Presto connections in Airflow and add them using the Airflow APIs.

After the successful deployment of MySQL and Presto on Minikube, we can run the above DAG end to end. Here is the code for the test -
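
The full test is in the repo; a condensed sketch of its structure, using the hypothetical airflow_api helpers above plus assumed test utilities (add_connection, create_mysql_table, seed_presto_table) and an assumed MySQL NodePort, might look like this:

import time
import unittest
from datetime import datetime

import pymysql

# Hypothetical helpers in the spirit of the repo's airflow_api.py and test utilities.
from airflow_api import add_connection, get_dag_run_state, trigger_dag, unpause_dag
from test_helpers import create_mysql_table, seed_presto_table


class TestPrestoToMySqlE2E(unittest.TestCase):

    MYSQL = dict(host='192.168.99.100', port=30036,   # assumed Minikube NodePort
                 user='root', password='root', db='test')

    def setUp(self):
        # 1. Register the Presto and MySQL connections with Airflow (illustrative URIs).
        add_connection('presto_conn', 'presto://presto:8080/hive/default')
        add_connection('mysql_conn', 'mysql://root:root@mysql:3306/test')
        # 2. Create the target table and seed the Presto source table with sample rows.
        create_mysql_table('users')
        seed_presto_table('users', rows=10)

    def test_transfer_loads_all_rows(self):
        dag_id = 'presto_to_mysql'
        execution_date = datetime.utcnow().replace(microsecond=0).isoformat()

        unpause_dag(dag_id)
        trigger_dag(dag_id, execution_date=execution_date)

        # Wait for the DAG run to finish.
        state = 'running'
        for _ in range(60):
            state = get_dag_run_state(dag_id, execution_date)
            if state in ('success', 'failed'):
                break
            time.sleep(5)
        self.assertEqual(state, 'success')

        # Assert the number of records that landed in MySQL.
        connection = pymysql.connect(**self.MYSQL)
        with connection.cursor() as cursor:
            cursor.execute('SELECT COUNT(*) FROM users')
            (count,) = cursor.fetchone()
        connection.close()
        self.assertEqual(count, 10)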

In the setUp() function, we first create the Presto and MySQL connections and register them with Airflow, create the required tables (in MySQL and Presto), and insert records into the Presto table. In the test, we run the DAG and assert the number of records in the MySQL table.

All the code from this blog is available in the GitHub repo: https://github.com/chandulal/airflow-testing

Thanks for reading. If you found this blog helpful, please recommend it and share it. Follow me for more articles on Big Data.

You can find other interesting blogs here. Thanks!

Thanks to Balvinder Khurana and Siddharth Kulkarni for reviewing this post.
