Testing Apache Airflow DAGs locally with Testcontainers and LocalStack

Sebastian Daberdaku
Towards Data Engineering
6 min readJun 9, 2024

Testing Airflow code can be difficult, often resulting in data engineers having to go through whole development cycles to manually trigger the DAG run on a production-like environment which consumes time and resources. Ideally, one would want to run the DAG code locally, possibly in an end-to-end fashion, with the option of using IDE debugging tools. This however can be somewhat complicated, especially when our code interacts with cloud resources, such as S3 buckets, SQS queues, DynamoDB tables, etc.

In this article I will explore a convenient strategy for testing Airflow DAGs locally using Testcontainers LocalStack to simulate AWS cloud services such as Amazon S3. This strategy can be easily extended to other cloud services or databases.

The source code presented in this article is available at: https://github.com/sebastiandaberdaku/testing-airflow-with-localstack.git.

What is Testcontainers?

Testcontainers is a Python library that provides a friendly API to run Docker containers during automatic tests. Using this library users can spin-up Docker containers running the components required by their software (databases, other microservices, cloud services) during unit and integration tests. When the tests are over, the containers are automatically destroyed, resulting in a clean environment and easily repeatable tests.

What is LocalStack?

LocalStack is an AWS services emulator that runs in a single Docker container. With LocalStack, you can run your AWS applications or Lambdas entirely on your local machine without connecting to a remote cloud provider. A number of AWS services, like AWS Lambda, S3, DynamoDB, Kinesis, SQS, and SNS are currently supported, with basic features testable for free, while more advanced ones require a paid subscription.

Example DAG

Let us introduce a very simple Airflow DAG in order to better illustrate the proposed testing strategy. The DAG is comprised of two tasks: the first one will create an S3 bucket with a given name (if it doesn’t already exist), while the second one will download a CSV file from a public website and save it on said bucket. The full code is provided below:

# dags/example_dag.py
import logging
import requests
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from pendulum import datetime, duration

logger = logging.getLogger(__name__)

@task
def create_bucket(s3_bucket: str, region: str) -> str:
"""
Creates an S3 bucket if it does not already exist.

:param s3_bucket: The name of the S3 bucket to be created.
:param region: The AWS region in which the S3 bucket should be created.
:return: The name of the S3 bucket.
"""
s3_hook = S3Hook()
logger.info(f"Checking if S3 bucket {s3_bucket} exists...")
if not s3_hook.check_for_bucket(bucket_name=s3_bucket):
logger.info(f"S3 bucket {s3_bucket} does not exist. Creating...")
s3_hook.create_bucket(bucket_name=s3_bucket, region_name=region)
logger.info(f"Created S3 bucket {s3_bucket} in region {region}.")
else:
logger.info(f"S3 bucket {s3_bucket} already exists.")
return s3_bucket

@task
def download_to_s3(url: str, s3_bucket: str) -> str:
"""
Downloads a file from the specified URL and uploads it to an S3 bucket.

:param url: The URL of the file to be downloaded.
:param s3_bucket: S3 bucket where the file will be uploaded.
:return: The S3 URI of the uploaded file.
"""
s3_hook = S3Hook()
filename = url.split("/")[-1]
s3_key = f"extract/{filename}"
with requests.get(url, stream=True) as response:
response.raise_for_status()
response.raw.decode_content = True
s3_hook.load_file_obj(file_obj=response.raw, key=s3_key, bucket_name=s3_bucket, replace=True)
s3_uri = f"s3://{s3_bucket}/{s3_key}"
logger.info(f"File saved at: {s3_uri}")
return s3_uri

@dag(
dag_id="example_dag",
start_date=datetime(2024, 6, 9),
doc_md="""
Example DAG

A very simple pipeline used to extract a CSV file (Rotten Tomato ratings of movies with Robert De Niro) from
a public server and save it on an S3 bucket.

Data source: [here](https://people.sc.fsu.edu/~jburkardt/data/data.html)
""",
schedule=None,
params={
"url": "https://people.sc.fsu.edu/~jburkardt/data/csv/deniro.csv",
"s3_bucket": "test-bucket",
"region": "eu-central-1",
},
default_args={
"retries": 3,
"retry_delay": duration(minutes=5),
}
)
def example_dag():
s3_bucket = create_bucket(s3_bucket="{{ params.s3_bucket }}", region="{{ params.region }}")
csv_s3a_uri = download_to_s3(url="{{ params.url }}", s3_bucket="{{ params.s3_bucket }}")
s3_bucket >> csv_s3a_uri


example_dag()

Airflow tasks interact with external services using Connections and Hooks. Each Connection is identified by a conn_id, and is used to store credentials, endpoint URLs and other information necessary for connecting to said services. Hooks on the other hand provide a high level interface to interacting with the external services. They integrate with Connections to gather connection information, and may have a default conn_id, which in the case of S3Hook (but also for other AWS service Hooks) corresponds to “aws_default”.

Local testing strategy

The main idea behind the proposed local testing strategy is to spin-up a LocalStack Docker container using the Testcontainers Python package, and override the default Airflow Connection to AWS so that it points to the current LocalStack instance instead of the actual AWS cloud.

This can be achieved with the following pytest fixtures:

# conftest.py
import json
from typing import Generator

import pytest
from _pytest.monkeypatch import MonkeyPatch
from airflow.models import Connection
from testcontainers.localstack import LocalStackContainer

@pytest.fixture(scope="session")
def localstack() -> Generator[LocalStackContainer, None, None]:
"""Set up a LocalStack container."""
with LocalStackContainer() as localstack:
yield localstack

@pytest.fixture(scope="session")
def monkeypatch_session()-> Generator[MonkeyPatch, None, None]:
"""Session-scoped monkeypatch fixture."""
mpatch = MonkeyPatch()
yield mpatch
mpatch.undo()

@pytest.fixture(scope="session", autouse=True)
def connection(monkeypatch_session, localstack, conn_id: str = "aws_default") -> None:
"""Session-scoped default AWS connection fixture pointing to LocalStack that is automatically used during tests."""
c = Connection(
conn_id=conn_id,
conn_type="aws",
extra=json.dumps({
"aws_access_key_id": "foo",
"aws_secret_access_key": "bar",
"endpoint_url": localstack.get_url()
})
)
monkeypatch_session.setenv(f"AIRFLOW_CONN_{conn_id.upper()}", c.get_uri())

The first fixture (localstack) will spin up a LocalStack Docker container which will be available during the whole test session duration. I am using session-scoped fixtures because starting-up Docker containers can be time consuming, and could really slow-down tests. This is more of a trade-off between test speed and having a clean AWS environment for each single test.

We can see the LocalStack container being created during the tests, and destroyed immediately after.

The second fixture (monkeypatch_session) is used to provide a session-scoped monkeypatch fixture (which by default is function-scoped only).

The third fixture (connection) is where the magic happens. Here we declare an Airflow Connection that overrides aws_default with the URL to the current LocalStack container. Also, please notice the autouse=true parameter in the @pytest.fixture decorator which will activate the fixture for all the tests that can see it. If you place this fixture in the root conftest.py file, it will be visible to all the test in the project.

Testing the DAG locally

Airflow 2.5.0 introduced the dag.test() method which allows us to run all tasks in a DAG within a single serialized Python process without running the Airflow scheduler. Running DAGs locally with the dag.test() method requires the developers to set up a database backend to manage its metadata for the local runs.

For testing purposes we can use SQLite, which can be initialized with the following commands, which will create the airflow.db file in your /tmp directory:

# 1. Configure the location of the SQLite database
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="sqlite:////tmp/airflow.db"
# 2. Initialize the db
airflow db migrate

We can now write our test like so:

#tests/test_example_dag.py
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from dags.example_dag import example_dag


def test_example_dag():
# Run example_dag
example_dag().test()
# Assert that the file was correctly saved on S3
s3_hook = S3Hook()
assert s3_hook.check_for_key("s3://test-bucket/extract/deniro.csv")

By running pytest in our shell from the project root directory we can start the local end-to-end test.

You can also interactively debug your Airflow tasks with your favorite IDE (remember to add the AIRFLOW__DATABASE__SQL_ALCHEMY_CONN environment variable to the run configuration).

Interactive debugging session with PyCharm.

--

--