Utilizing DockerOperator in Airflow to Run Containerized Applications in Data Engineer Projects

Rio Dwi Putra Perkasa
Apache Airflow
Published in
12 min readApr 2, 2024

Background

In the current era of microservices, container usage in application development is widespread. There are many advantages to running applications within containers, one being application portability. By utilizing containers, developers can run applications easily on different operating systems and hardware. This applies to both local and production environments. The work of data engineers also benefits from containerization: when a data engineer develops an ETL locally, they can use containers to ensure smooth development and deployment of the ETL between local and production environments.

Airflow can be used to support containerized development. Airflow is a tool used for scheduling and managing tasks, so it plays a crucial role in the development of ETL from raw data to dashboards. Although, within Airflow, there are several operators that can be used to execute tasks, not all of them are capable of running containers. One that is capable is the DockerOperator.

So, in this article, I want to do PoC on how we can use DockerOperator to run a containerized application in airflow. For this PoC, I will create the pipeline to demonstrate the process. Also, this can be an alternative to running containerized applications in airflow besides KubernetesPodOperator.

Project Explanation

The data engineering project used as an example in this article is an ETL project that transforms raw data into usable data for dashboards. This project will be built using open-source applications, allowing readers to try it out on their local machines.

As depicted in the diagram above, each component in this project is an application container. These applications are then executed based on a Directed Acyclic Graph (DAG) created within Airflow using DockerOperator. The data used in this article is scraped data from a real estate sales website in Indonesia.

  1. The scraped data from the real estate sales website will be stored in MongoDB.
  2. Python scripts will be executed through Airflow to extract data from MongoDB.
  3. The extracted data will be stored in ClickHouse.
  4. Airflow will then run DBT to perform data transformations.
  5. The transformed data will be stored back into ClickHouse.

MongoDB

Serves as the storage repository for the scraped data. All scraped data will be stored here in document form. The data in MongoDB still contains many duplicates.

Python Script

A script designed to extract data from MongoDB and load it into ClickHouse.

ClickHouse

ClickHouse is an open-source database focused on analytics activities. Here, it will function as a data warehouse. Below is a simple representation of the data warehouse that will be created:

DBT

A tool used for data transformation within ClickHouse. It transforms raw data from MongoDB into usable data in ClickHouse.

Airflow

Airflow is the main component for running containers and scheduling jobs in the specified order.

Project Setup

Below are the steps needed to run the project on your local machine:

Install Docker

Docker is an application that needs to be downloaded to run containers on your local computer. Installation instructions can be found at the following link: https://docs.docker.com/engine/install/

Set Up Databases

As explained above, two databases are required for this project: MongoDB and ClickHouse. Both databases will be created as containers using pre-existing images available on DockerHub.

MongoDB: docker pull mongo

ClickHouse: docker pull clickhouse/clickhouse-server

After successfully downloading the images, we will create a Docker Compose file to facilitate setting up the containers using these images.

In this Docker Compose file, we will create a network to facilitate communication between containers. There are various drivers that can be selected for the network.

networks:
data-eng-network:
name: data-eng-network
driver: bridge

Then define the service for MongoDB as follows.

services:
mongodb:
image: mongo:7
ports:
- "27017:27017"
networks:
- data-eng-network
volumes:
- mongodb-data:/data/db
environment:
- MONGO_INITDB_ROOT_USERNAME=<username>
- MONGO_INITDB_ROOT_PASSWORD=<password>

Make sure the MongoDB container uses the previously created network. The volume on MongoDB is used to store data persistently, so the data will not be lost when the container is deleted. The environment contains the root user credentials for accessing MongoDB. Add volume at the volume section.

volumes:
# Mongodb volume
mongodb-data:
driver: local

After adding the volume, we can try running the MongoDB service with the command:

`docker-compose up -d`

Wait for a moment, and MongoDB will be ready for use. You can access MongoDB in two ways: through the Mongo shell (mongosh) and using applications like MongoDB Compass to check data in the GUI.

Checking the application in mongosh:

  1. Use `docker container ls` to check the name of the MongoDB container.
  2. Run `docker exec -it <container_name> mongosh` to enter the mongosh.

Checking the application in MongoDB Compass:

  1. Enter the following URL to connect to the container: mongodb://user:password@localhost:27017

Next, define the service for ClickHouse.

clickhousedb:
image: clickhouse/clickhouse-server:24.1
networks:
- data-eng-network
ports:
- "8123:8123"
- "9000:9000"
volumes:
- clickhouse-data:/var/lib/clickhouse
- clickhouse-logs:/var/log/clickhouse-server
- clickhouse-config:/etc/clickhouse-server/config.d/*.xml
- clickhouse-user:/etc/clickhouse-server/users.d/*.xml
environment:
- CLICKHOUSE_DB=default_lake
- CLICKHOUSE_USER=<username>
- CLICKHOUSE_PASSWORD=<password>
- CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1

According to the installation guide for ClickHouse on Docker Hub, the ClickHouse service needs to have volumes and environment variables added. The required volume for ClickHouse should be registered in the volume section.

# Clickhouse volumes
clickhouse-data:
driver: local
clickhouse-logs:
driver: local
clickhouse-config:
driver: local
clickhouse-user:
driver: local

Perform a restart for the services already running in Docker. Run:

`docker-compose down`

Then:

`docker-compose up -d`

Now the ClickHouse server should be running in the container. To execute ClickHouse, you can use the ClickHouse client:

`docker exec -it <container_name> clickhouse-client`

Write a Python Script

Create a Python script to extract data from MongoDB. Before doing so, the extracted data should have the following schema:

id String,
price Decimal(10,2),
bedroom Int8,
bathroom Int8,
area Int8,
district String,
city String,
releaseDate Date,

From the schema, we will create a simple script to extract the data and save it to ClickHouse.

The libraries needed for this script are:

  • pymongo = Establish connection to MongoDB.
  • clickhouse_driver = Establish connection to ClickHouse.
  • datetime = Date formatting.
  • os = for environment management.

Add all the required libraries to the requirements.txt file, then run:

`pip install -r requirements.txt`

to install all the necessary libraries.

Establish connections to MongoDB and ClickHouse.

# Connect to MongoDB
mongo_client = MongoClient('mongodb://' + os.environ['MONGO_USER'] + ':' + os.environ['MONGO_PASSWORD'] + '@' + os.environ['MONGO_HOST'] + ':27017/')
mongo_db = mongo_client[<database_name>]
mongo_collection = mongo_db[<collection_name>]
# Connect to ClickHouse
ch_client = Client(host=os.environ['CLICKHOUSE_HOST'], user=os.environ['CLICKHOUSE_USER'], password=os.environ['CLICKHOUSE_PASSWORD'], database='de_project')

It’s important to note that I’m using os.environ to fetch credentials. To run the script locally, make sure to add the required environment variables first.

Then, to insert data into ClickHouse, we need to create the table beforehand.

# Create a table in ClickHouse
ch_client.execute('''
CREATE TABLE IF NOT EXISTS houses_raw (
id String,
price Decimal(10,2),
bedroom Int8,
bathroom Int8,
area Int32,
district String,
city String,
release_date Date
) ENGINE = MergeTree()
ORDER BY id
''')

The table in ClickHouse will use an overwrite system, so each time the job runs, it will overwrite the old table. To achieve this, we can add a command to drop the table if it already exists before the INSERT command.

# Delete the table in ClickHouse
ch_client.execute('''
DROP TABLE IF EXISTS houses_raw
''')

After that, we can perform extraction from MongoDB and then load the data into ClickHouse.

# Extract data from MongoDB
mongo_data = mongo_collection.find()
# Load data into ClickHouse
for document in mongo_data:
release_date = datetime.strptime(document['releaseDate'], '%Y/%m/%d')
ch_client.execute('INSERT INTO houses_raw VALUES', [(
str(document['id']),
document['price'],
document['bedroom'],
document['bathroom'],
document['area'],
document['district'],
document['city'],
release_date,
)])

To run the script, add a .env file in the same folder level as the script, containing the required variables. These variables will need to be manually added when creating tasks in Airflow.

# MongoDB environment variables
MONGO_USER=<username>
MONGO_PASSWORD=<password>
MONGO_HOST=localhost
# ClickHouse environment variables
CLICKHOUSE_USER=<username>
CLICKHOUSE_PASSWORD=<password>
CLICKHOUSE_HOST=localhost

Now, let’s create the Dockerfile:

# Use an official Python runtime as a parent image
FROM python:3.10-slim-bullseye
# Set the working directory in the container to /app
WORKDIR /app
# Add current directory files to /app in container
ADD . /app
# Install any needed packages specified in requirements.txt
RUN pip install - no-cache-dir -r requirements.txt
# Run the script when the container launches
CMD ["python", "extract_mongo_clickhouse.py"]

Now, build the image to be run in Airflow using the command:

`docker build -t your_image_name:tag .`

Replace your_image_name with the desired name for your Docker image and tag with the version or tag you want to assign to the image. Make sure to run this command in the directory where your Dockerfile and other necessary files are located.

Install and Initialize DBT

DBT (Data Build Tool) is a tool used for data transformation according to the defined models. The first step is to install DBT and the driver or database to be used. Since ClickHouse is being used here, dbt-clickhouse needs to be installed first.

Install dbt-clickhouse using the following command:

`pip install dbt-clickhouse`

Once the installation is complete, initialize the DBT project using the command:

`dbt init`

There will be several prompts that need to be filled out. If the ClickHouse driver is already installed, it will appear as an option for the database selection.

After initialization, there will be several folders:

- models/
- data/
- analysis/
- macros/
- dbt_project.yml
- profiles.yml

A brief explanation of each folder:

- `models/`: This folder contains the data model definitions for transformation in SQL or YAML format.

- `data/`: This folder is used to store raw data or data sources to be used in transformation.

- `analysis/`: This folder contains SQL code used for analyzing transformed data.

- `macros/`: This folder contains SQL code that can be used as macros in models.

- `dbt_project.yml`: The DBT project configuration file containing project settings, such as target database settings and working directory.

- `profiles.yml`: The DBT profile configuration file containing database connection information, such as host, username, password, and port.

We will focus on the models folder, as this folder contains models for performing transformations. But the first thing to do after setting up DBT is to configure the profile. A profile contains information on how we can connect to the database. Create a file named profiles.yml and enter information about ClickHouse.

<your_project_name>:
target: dev
outputs:
dev:
type: clickhouse
schema: <schema_name>
host: "{{ env_var('HOST_CLICKHOUSE') }}"
port: 8123
user: <username>
password: <password>
secure: False

In the host section, we will fill in a variable that can be filled with variables from the Docker container.

In DBT, we will perform two simple levels of transformation. This can be done by adding a model to the models folder. We will try to use the term “data warehouse” to facilitate the illustration of the project. Raw data will be transformed into a fact table. The fact table will be transformed into a mart table.

Transformation into the fact table is quite simple because the data from MongoDB is quite structured and clean. So, in this transformation we will clean the data from duplicate entries.

{{ config(order_by='(updated_at, id)', engine='MergeTree()', materialized='table') }}
SELECT
DISTINCT *,
release_date AS updated_at
FROM {{ source('lake', 'houses_raw') }}

After that, transformation from the fact table to the mart table will be performed.

The purpose of the first mart table is to determine the number of houses listed in each area.

{{ config(engine='MergeTree()', materialized='table') }}
SELECT
district,
city,
COUNT(id) AS sum_house_sell
FROM {{ ref('fact_house') }}
GROUP BY district, city

The second mart is used to determine the average price of houses with the same specifications.

{{ config(engine='MergeTree()', materialized='table') }}
SELECT
district,
city,
CASE
WHEN area < 50 THEN '< 50'
WHEN area BETWEEN 50 AND 100 THEN '50–100'
WHEN area BETWEEN 100 AND 200 THEN '100–200'
WHEN area BETWEEN 200 AND 300 THEN '200–300'
ELSE '> 300'
END AS area_group,
AVG(price) AS avg_house_price,
MIN(price) AS min_house_price,
MAX(price) AS max_house_price
FROM {{ ref('fact_house') }}
GROUP BY 1,2,3
ORDER BY 2,1,3

Next, we will create a Dockerfile to enable the DBT application to run in a container.

FROM python:3.10-slim-bullseye
RUN apt-get update && apt-get install -y curl
RUN pip install - upgrade pip
COPY . .
RUN pip install -r requirements.txt
ENTRYPOINT ["dbt"]

Since dbt-clickhouse does not have an official image from DBT yet, we need to create a custom image. As seen above, the image will be built from the Python image, and then dbt-clickhouse will be installed via requirements.txt.

Create a file named requirements.txt.

dbt-clickhouse==1.7.3

Once the requirements.txt file is created, build the image for DBT:

`docker build -t <your_image_name>:<tag> .`

Install and Set up Airflow

In this project, I’m using Astronomer’s Astro CLI to install Airflow. This is to simplify the installation and setup process. Install the Astro CLI using the following documentation: Astro CLI installation guide.

Initialize the project using the command:

`astro dev init`

This command will create several folders. We will focus on the `dags` folder.

To be able to run the Docker operator, we need to add the Docker provider in the `requirements.txt` file:

apache-airflow-providers-docker==3.9.2

This provider package will be included at runtime.

We will create a DAG with the following diagram:

Because the Docker operator requires access to the Docker socket to function, we need to add a volume containing the Docker socket from the host to the scheduler container. To override the configuration from Astronomer, we need to create a file named docker-compose.override.yml.

version: '3.1'
services:
scheduler:
user: root
volumes:
- /var/run/docker.sock:/var/run/docker.sock:rw

Further information about the parameters available in the DockerOperator can be found in the registry.

Below is the definition of the DAG.

@dag(start_date=datetime(2022, 8, 1), schedule=None, catchup=False)
def airflow_docker_operator():
extract_mongo_task = DockerOperator(
task_id='extract_mongo',
image=<python_script_image_name>,
command=["python", "extract_mongo_clickhouse.py"],
container_name='extract-mongo',
docker_url='unix://var/run/docker.sock',
network_mode='data-eng-network',
mount_tmp_dir=False,
auto_remove='success',
environment={
'CLICKHOUSE_HOST': 'clickhousedb',
'CLICKHOUSE_USER': <username>,
'CLICKHOUSE_PASSWORD': <password>,
'MONGO_HOST': 'mongodb',
'MONGO_USER': <username>,
'MONGO_PASSWORD': <password>
}
)

fact_house_task = DockerOperator(
task_id='fact_house',
image=<dbt_image_name>,
command=["run", " - models", "fact_house"],
container_name='fact-house',
docker_url='unix://var/run/docker.sock',
network_mode='data-eng-network',
mount_tmp_dir=False,
auto_remove='success',
environment={
'HOST_CLICKHOUSE': 'clickhousedb'
}
)

mart_region_house_sell_task = DockerOperator(
task_id='mart_region_house_sell',
image=<dbt_image_name>,
command=["run", " - models", "mart_region_house_sell"],
container_name='mart-region-house-sell',
docker_url='unix://var/run/docker.sock',
network_mode='data-eng-network',
mount_tmp_dir=False,
auto_remove='success',
environment={
'HOST_CLICKHOUSE': 'clickhousedb'
}
)

mart_spec_house_price_task = DockerOperator(
task_id='mart_spec_house_price',
image=<dbt_image_name>,
command=["run", " - models", "mart_spec_house_price"],
container_name='mart-region-house-sell',
docker_url='unix://var/run/docker.sock',
network_mode='data-eng-network',
mount_tmp_dir=False,
auto_remove='success',
environment={
'HOST_CLICKHOUSE': 'clickhousedb'
}
)

extract_mongo_task >> fact_house_task >> mart_region_house_sell_task
fact_house_task >> mart_spec_house_price_task

airflow_docker_operator()

The host in the environment is assigned according to the name of the DB service set earlier. Ensure that the task_id and container_name are unique for each task. In each task, auto_remove is added to remove the container after it finishes running, which can help save storage memory. mount_tmp_dir is set to false so that the temporary directory is not mounted into the container. Also, assign an image parameter with the image name you already created before.

After execution, the DAG will look like this:

If we check in ClickHouse, the created tables will also appear.

The biggest advantage of using the DockerOperator is that no matter how complex the application is, as long as it can run in a container, it can be executed in Airflow. Additionally, we no longer need to worry about dependencies for running each application, as they are already isolated in containers.

Areas for Improvement

This project serves as a proof of concept for using the DockerOperator in Airflow to run containerized applications. However, there are several areas for improvement:

  1. Better management of environment variables:
    Environment variables in the Docker operator need to be hidden because they are used in task definitions. This can be addressed by modifying the application to retrieve credentials or secrets from a secret manager. This way, no sensitive information will be leaked within tasks.
  2. Running containers in the cloud:
    This project is currently 100% localhost-friendly. It could be beneficial to adapt the project for cloud deployment to better reflect real-world scenarios. We can use VM instances to run Docker and artifact registries to store images. Another option is to use the KubernetesPodOperator.
  3. Project expansion:
    We can expand this project from data extraction to dashboard creation. Currently, only the data extraction part has been implemented, where data is collected in MongoDB. The data stored in ClickHouse has not been visualized yet.
  4. Adding quality checks:
    DBT offers many plugins for data quality checks. Additionally, we can develop our own applications for data quality checks, such as Great Expectations or Soda, to be containerized applications. These applications can then be executed once the data is available.

Try it at Home

If you want to try this project, you can clone the following repository: https://github.com/riodpp/de-project-airflow.

There is also a dump data for mongodb to try the project. Thank you!

Conclusion

The DockerOperator is a highly useful feature within the Airflow framework, especially in Data Engineer projects. It allows users to run applications packaged within Docker containers as part of scheduled workflows triggered by Airflow. This provides the flexibility and isolation necessary to handle various tasks, including data processing, analysis, and other tasks within a controlled environment.

By leveraging the DockerOperator, a data engineer can easily set up and execute applications packaged within Docker containers as part of Airflow workflows. This enables the composition of complex workflows with separate tasks in isolated containers, facilitating application management and scalability.

However, currently, there is still limited documentation on the Docker Operator, and there are few resources available to address some of the errors encountered, so further learning about Docker itself is needed.

Reference

--

--