Automated Airflow Testing

Alexander Knorr
Published in Slalom Technology
Apr 17, 2019

Introduction to Airflow

Airflow is an Apache project for scheduling and monitoring workflows. The project has excellent documentation, but while working with Airflow I have come across some topics that were harder to find resources on; unit testing Airflow code is one area where many struggle to find valuable information online. I'll show some code basics for local automated testing!

Simple Airflow Setup

Before we can test Airflow DAGs, we need a place for the code to run. Most organizations will have Dev and Production environments, but it is still important to run code locally to check for errors before deploying to either of those environments; this avoids cluttering the Airflow web UI with error messages from test code. Containers are a great way to quickly stand up a local copy of Airflow for testing purposes. The puckel/docker-airflow GitHub page has fantastic instructions on how to get a basic Airflow server running.

Pulling and using this image requires Docker, so start there if you are unfamiliar. With a local container available, we can write some automated tests to make sure our DAGs in development meet some basic standards.
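As a quick sanity check before building anything, you can confirm the Docker daemon is reachable and pull the image from Python. This is a minimal sketch using the docker SDK, assuming the puckel/docker-airflow image from the GitHub page above:

import docker

# Confirm the Docker daemon is reachable, then pull the Airflow image.
client = docker.from_env()
client.ping()  # raises an exception if the daemon cannot be reached
image = client.images.pull("puckel/docker-airflow", tag="latest")
print("Pulled: {}".format(image.tags))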

The Airflow Web UI

Local Automated Airflow Testing

The code below builds a container based on the puckel image mentioned earlier, mounts our local development DAGs and tests into the container, and maps local port 8080 to the container. This allows us to access the web UI at localhost:8080.

import docker


def airflow_build(dag_path, test_path):
    """
    This function builds and runs a container with the Airflow processes locally,
    turning on the webserver and scheduler.
    :param dag_path: (string) the filepath where DAGs live locally.
    :param test_path: (string) the filepath where the pytest script lives locally.
    :return: the running docker container object.
    """
    client = docker.from_env()
    client.images.pull("name_of_airflow_image")
    running_container = client.containers.run(
        "name_of_airflow_image",
        detach=True,
        ports={"8080/tcp": 8080},  # map container port 8080 to local port 8080
        volumes={
            dag_path: {"bind": "/usr/local/airflow/dags/", "mode": "rw"},
            test_path: {"bind": "/usr/local/airflow/test/", "mode": "rw"},
        },
    )
    running_container.exec_run(
        "airflow initdb", detach=True
    )  # docker exec command to initialize the airflow db
    running_container.exec_run(
        "airflow scheduler", detach=True
    )  # docker exec command to start the airflow scheduler

    return running_container
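For reference, here is a minimal usage sketch of airflow_build; the paths are placeholders you would replace with your own local DAG and test folders:

# Hypothetical local paths -- replace with your own.
container = airflow_build(
    dag_path="/path/to/local/dags",
    test_path="/path/to/local/tests",
)
# The web UI should now be reachable at http://localhost:8080.
container.stop()  # shut the container down when finished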

Here are some simple tests to run against DAGs:

Were there any import errors?

The web UI will not load any DAGs that contain syntax-type errors and will flag them at the top of the console.

Does our DAG have a valid email provided in the `default_args`?

This could be changed if there is a specific alerting address all DAGs should send to. These tests will be included in the container as our pytest script (I named this dag_pytest.py).
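For context, here is an illustrative sketch (the DAG name, dates, and address are placeholders, not from the original post) of where that alert email lives in a DAG definition:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Placeholder DAG showing the default_args fields the email test inspects.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2019, 4, 1),
    "email": ["alerts@example.com"],  # address the email test validates
    "email_on_failure": True,
}

dag = DAG("example_dag", default_args=default_args, schedule_interval="@daily")

do_nothing = DummyOperator(task_id="do_nothing", dag=dag)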

Additional tests can be added based on project needs!

Shown below is the dag_pytest.py script:

import re

from airflow.models import DagBag


def test_import_dags():
    """
    Pytest to ensure there will be no import errors in the DagBag. These are
    generally syntax problems.
    """
    dags = DagBag()

    assert len(dags.import_errors) == 0


def test_alert_email_present():
    """
    Pytest to ensure all DAGs have a valid alert email address.
    """
    dags = DagBag()
    email_regex = re.compile(
        r"^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$"
    )  # regex to check for a valid email format

    for dag_id, dag in dags.dags.items():
        emails = dag.default_args.get("email", [])
        for email in emails:
            assert email_regex.match(email) is not None
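As noted above, additional tests can be added based on project needs. One illustrative example (not from the original post) that could be appended to dag_pytest.py, reusing the DagBag import already at the top of the script, checks that every DAG defines at least one task and an owner:

def test_dag_has_tasks_and_owner():
    """
    Illustrative extra check: every DAG should define at least one task
    and set an owner in its default_args.
    """
    dags = DagBag()

    for dag_id, dag in dags.dags.items():
        assert len(dag.tasks) > 0, "DAG {} has no tasks".format(dag_id)
        assert dag.default_args.get("owner"), "DAG {} has no owner".format(dag_id)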

The final step is a Python script that starts a container, runs the pytest suite against our DAG files, and checks the status of those tests.

# Assumes airflow_build (defined earlier) is available in this script or
# imported from wherever you saved it.
def test_run_dag_test_script():
    """
    Runs a docker exec command on the container to execute our Airflow DAG
    testing script, captures the exit status and output message from that
    command, then stops the container and asserts on the exit status.
    """
    dags = "/file/path/to/DAGS"
    test_script = "/file/path/to/test/script"

    running_container = airflow_build(dag_path=dags, test_path=test_script)

    dag_test_output = running_container.exec_run(
        "pytest /usr/local/airflow/test/dag_pytest.py"
    )
    ex_code = dag_test_output[0]  # first element is the exit code of the exec command

    running_container.stop()

    assert ex_code == 0

Now, the above can be run just like any other pytest script!
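One convenient way to do that (a minimal sketch, not from the original post; adjust the flags to taste) is to invoke pytest programmatically from the bottom of the script:

import sys

import pytest

if __name__ == "__main__":
    # Run this file's tests and exit with pytest's return code.
    sys.exit(pytest.main([__file__, "-v"]))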

Conclusion

Airflow is a powerful, open source scheduling system. Basic setup is fairly straightforward and building jobs is as easy as writing Python. Local container generation for testing adds a layer of code-development best practice to DAGs. There are a whole host of tests that can be instituted in this framework, and they should be chosen based on project needs. Once these tests are developed, your project members can implement this system locally and standardize DAG quality across your project!
