How to Automate Tasks with Airflow, Docker, and Python on Your Local Machine
Context
You have tasks that need to run every day, e.g. collecting data from a website and storing it in a database. You developed a script to automate the work, but you still trigger it manually every time. In this article, I will show you how to schedule your scripts to run at the time you want on your local machine; the only requirement is that the machine is on at the scheduled time.
To do that, we will use Docker, Airflow, and Python. This article assumes you already have some familiarity with these tools.
The code snippets featured in this article are extracted from a project where I automated data collection from an e-commerce website.
Let’s dive into the details and empower your workflows with efficient scheduling.
Docker
I assume you already have Docker installed on your computer. If you haven’t installed it yet, here is a link for you. For those who are new to Docker, their website is very informative, with an outstanding get-started section.
You will also need docker-compose installed. Follow the steps in this link.
Dockerfile
While doing this project, I faced a few problems with the standard Airflow image, which prompted me to build a custom one. Create a file named ‘Dockerfile’ with the following content:
#Dockerfile
FROM apache/airflow:2.6.0-python3.9
# Install additional dependencies
USER root
COPY requirements.txt ./requirements.txt
USER airflow
RUN pip install --user --upgrade pip
# Set up additional Python dependencies
RUN pip install --no-cache-dir --user -r ./requirements.txt
To build this container, run the following command in your terminal:
docker build . --tag extending_airflow:latest
Ensure you run this command in the same folder as the Dockerfile.
I’ve opted for Airflow 2.6 and Python 3.9 because of compatibility issues between the SQLAlchemy library and newer Airflow versions. The Dockerfile switches to the root user so we can copy the requirements.txt file from our local machine, then switches back to the airflow user, upgrades pip, and installs the libraries we need from requirements.txt.
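For reference, requirements.txt simply lists the extra libraries your task scripts need, one per line. The packages below are only an illustration based on a typical scraping-and-database project, not the exact list from mine; replace them with whatever your own scripts import:
# requirements.txt (example, adjust to your project)
requests
beautifulsoup4
pandas
SQLAlchemy<2.0
psycopg2-binary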
Compose file
Airflow Environment Variables
version: '3.7'
# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
- AIRFLOW__CORE__STORE_DAG_CODE=True
- AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
- AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
- AIRFLOW__WEBSERVER__RBAC=False
x-airflow-image: &airflow_image extending_airflow:latest
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================
We will use a Postgres service as the Airflow metadata database, so we need to pass the connection string as an environment variable (Airflow uses SQLAlchemy to connect to it):
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
(In Airflow 2.3 and later this setting also exists as AIRFLOW__DATABASE__SQL_ALCHEMY_CONN; the AIRFLOW__CORE__ variant used here is deprecated but still works in 2.6.)
We also need to make sure the services use the custom image we built with the Dockerfile in the previous step. The following line defines an anchor for it (replace ‘extending_airflow:latest’ with the tag you used when building the image):
x-airflow-image: &airflow_image extending_airflow:latest
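The &airflow_environment and &airflow_image anchors are just YAML shorthand: the image and environment are defined once at the top and each service below references them with *. This is the pattern you will see in the services section:
# How the anchors are reused inside a service definition
webserver:
  image: *airflow_image              # resolves to extending_airflow:latest
  environment: *airflow_environment  # resolves to the variable list above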
Services
Airflow Database
First, we define the Postgres service that will serve as the Airflow metadata database, listening on port 5432.
services:
postgres:
image: postgres:12-alpine
restart: always
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
ports:
- "5432"
Now, we need to initialize the database and create a user to log into the airflow UI.
init:
image: *airflow_image
depends_on:
- postgres
environment: *airflow_environment
entrypoint: /bin/bash
command: -c 'airflow db upgrade && sleep 5 && airflow db init && airflow users create --username your_user --password your_password --firstname name --lastname lastname --role Admin --email your_email'
This service starts only after the Postgres service, which is configured with the `depends_on` option.
The command initializes the metadata database and creates the admin user. Make sure to replace the user information with your own.
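If you ever need to (re)run just this initialization step, for example after wiping the database, you can start the service on its own; this is standard docker compose usage, nothing specific to this setup:
docker compose up init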
Webserver
To access the Airflow UI for managing workflows (DAGs), we need to run the Airflow webserver. It listens on port 8080, so you can reach it in your browser at http://127.0.0.1:8080/ or http://localhost:8080/. Log in with the username and password created in the previous step.
webserver:
image: *airflow_image
restart: always
depends_on:
- postgres
ports:
- "8080:8080"
volumes:
- logs:/opt/airflow/logs
environment: *airflow_environment
command: webserver
We’ve configured volumes to persist the log data generated and used by this service.
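If the UI does not come up, a quick way to see what went wrong (again, plain docker compose usage) is to tail the webserver logs:
docker compose logs -f webserver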
Scheduler
The scheduler monitors your DAGs and triggers them according to the configured schedule once you activate (unpause) them, which you can do in the Airflow UI. We will go into the details of the Airflow setup in the next section.
scheduler:
image: *airflow_image
restart: always
user: root
depends_on:
- postgres
- webserver
volumes:
- ./dags:/opt/airflow/dags
- ./configurations.json:/tmp/conf_file/configurations.json
- logs:/opt/airflow/logs
- ./dags/data:/opt/airflow/dags/data
environment: *airflow_environment
command: scheduler
volumes:
logs:
For this service, we define volumes for the dags, the configuration file, the logs, and the data.
Finally, we set the ‘logs’ volume globally, allowing multiple services to reuse it.
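Activating a DAG in the UI is just a toggle next to its name, but if you prefer the command line, the Airflow CLI inside the scheduler container can do the same:
# Unpause (activate) the DAG so the scheduler starts running it
docker compose exec scheduler airflow dags unpause business_products_dag   # use your own dag_id
# Optionally trigger a run right away instead of waiting for the schedule
docker compose exec scheduler airflow dags trigger business_products_dag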
Airflow
Dags
The DAG (Directed Acyclic Graph) is responsible for defining the tasks, dictating the order they should run, and outlining task dependencies. The DAG doesn’t care about what is happening inside the tasks.
Here is how our DAG file is configured:
Imports
We import the DAG class from airflow to define our DAG. We will use the BashOperator to run the tasks, since in this project the tasks are defined in Python script files.
# dag.py
import os
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta, datetime
path = os.environ['AIRFLOW_HOME']
Dag Configuration
Define the DAG’s default arguments with your information, then define the DAG itself: set its id, its start date, and the interval at which you want it to run.
#dag.py
default_args = {
'owner': 'your_user',
'depends_on_past': False,
'email': ['your@email.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1)
}
# Define the DAG, its ID and when should it run.
dag = DAG(
dag_id='business_products_dag',
start_date=datetime(year=2024, month=1, day=27, hour=9),
schedule_interval="30 */12 * * *",
default_args=default_args,
catchup=False
)
With the cron expression "30 */12 * * *", the DAG runs twice a day, at 00:30 and 12:30 (minute 30 of every 12th hour). If you want it at, say, 9:30 am and 9:30 pm instead, use "30 9,21 * * *". If you are not familiar with cron schedule expressions, you can use this site to help you set the schedule interval.
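To sanity-check a cron expression before wiring it into the DAG, one option is a tiny local script using the croniter package (which Airflow itself depends on); this is just a helper, not part of the pipeline:
# preview_schedule.py -- print the next few run times for a cron expression
from datetime import datetime
from croniter import croniter

cron = "30 */12 * * *"            # same expression used in the DAG
it = croniter(cron, datetime(2024, 1, 27, 9, 0))
for _ in range(4):
    print(it.get_next(datetime))  # 2024-01-27 12:30, 2024-01-28 00:30, ...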
Tasks
Define your tasks on the dag file, set the necessary command to run your tasks, and set the dependency between your tasks if necessary.
# Define the task 1 (collect the data) id. Run the bash command because the task is in a .py file.
task1 = BashOperator(
task_id='get_data',
bash_command=f'python {path}/dags/src/get_data.py --num_pages=1',
dag=dag
)
# Define Task 2 (insert the data into the database)
task2 = BashOperator(
    task_id='insert_data',
    bash_command=f'python {path}/dags/src/insert_data.py',
    dag=dag
)
# task2 will only run after task1 is finished.
task1 >> task2
I won’t show the script code because this article is already long enough, but you now have everything you need to run your own scripts with Airflow.
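For completeness, here is a minimal, hypothetical sketch of what a task script such as get_data.py could look like; the URL, fields, and output file are placeholders, not the code from the original project:
# dags/src/get_data.py -- hypothetical example of a data-collection task
import argparse
import json
import os

import requests

def main(num_pages: int) -> None:
    records = []
    for page in range(1, num_pages + 1):
        # Placeholder URL: replace with the site you actually scrape
        response = requests.get("https://example.com/products", params={"page": page}, timeout=30)
        response.raise_for_status()
        records.append({"page": page, "html": response.text})

    # Save the raw data to the folder mounted at /opt/airflow/dags/data
    out_dir = os.path.join(os.environ["AIRFLOW_HOME"], "dags", "data")
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, "raw_products.json"), "w") as f:
        json.dump(records, f)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_pages", type=int, default=1)
    args = parser.parse_args()
    main(args.num_pages)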
Folder Structure
In the Airflow folder, you will need a ‘dags’ directory to house your DAG file or files. Inside it, include a ‘src’ folder with the Python task files and a ‘data’ folder where the data is saved before being sent to the database.
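Put together with the volumes declared in the compose file, one layout that works looks like this (configurations.json is only needed if your scripts read it, as in the original project):
airflow/
├── Dockerfile
├── docker-compose.yaml
├── requirements.txt
├── configurations.json
└── dags/
    ├── dag.py
    ├── data/
    └── src/
        ├── get_data.py
        └── insert_data.py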
Final Steps
Once everything is in place (the folders, your scripts, the DAG, the docker-compose.yaml, and the Dockerfile), you only need two commands to run Airflow:
1. Build the custom image, as mentioned earlier:
docker build . --tag extending_airflow:latest
2. Bring up the services defined in the docker-compose file:
docker compose up -d
or
docker-compose up -d
This starts all the containers defined in the compose file, ensuring they are properly linked and networked. The ‘-d’ flag stands for ‘detached’ mode, which means the containers run in the background. Combined with the restart: always policy, the services also come back up automatically when Docker starts after a reboot, so you don’t need to rerun the command every time you restart your machine.
For both commands, ensure you are in the folder where the files are located.
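When you want to stop the stack, the standard docker compose commands apply:
docker compose down        # stop and remove the containers
docker compose down -v     # also remove the named 'logs' volume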
Now, open your browser and access the Airflow UI to manage your DAGs. After a few days of runs, the UI will show the accumulated run history for your DAG.
Full Code
Dockerfile
#Dockerfile
FROM apache/airflow:2.6.0-python3.9
# Install additional dependencies
USER root
COPY requirements.txt ./requirements.txt
USER airflow
RUN pip install --user --upgrade pip
# Set up additional Python dependencies
RUN pip install --no-cache-dir --user -r ./requirements.txt
Compose file
#docker-compose.yaml
version: '3.7'
# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
- AIRFLOW__CORE__STORE_DAG_CODE=True
- AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
- AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
- AIRFLOW__WEBSERVER__RBAC=False
x-airflow-image: &airflow_image extending_airflow:latest
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================
services:
postgres:
image: postgres:12-alpine
restart: always
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
ports:
- "5432"
init:
image: *airflow_image
depends_on:
- postgres
environment: *airflow_environment
entrypoint: /bin/bash
command: -c 'airflow db upgrade && sleep 5 && airflow db init && airflow users create --username your_user --password your_password --firstname name --lastname lastname --role Admin --email your_email'
webserver:
image: *airflow_image
restart: always
depends_on:
- postgres
ports:
- "8080:8080"
volumes:
- logs:/opt/airflow/logs
environment: *airflow_environment
command: webserver
scheduler:
image: *airflow_image
restart: always
user: root
depends_on:
- postgres
- webserver
volumes:
- ./dags:/opt/airflow/dags
- ./configurations.json:/tmp/conf_file/configurations.json
- logs:/opt/airflow/logs
- ./dags/data:/opt/airflow/dags/data
environment: *airflow_environment
command: scheduler
volumes:
logs:
Dags
# dag.py
import os
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

path = os.environ['AIRFLOW_HOME']
default_args = {
'owner': 'your_user',
'depends_on_past': False,
'email': ['your@email.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1)
}
# Define the DAG, its ID and when should it run.
dag = DAG(
dag_id='products_dag',
start_date=datetime(year=2023, month=12, day=11, hour=10),
schedule_interval="30 */12 * * *",
default_args=default_args,
catchup=False
)
# Define the task 1 (collect the data) id. Run the bash command because the task is in a .py file.
task1 = BashOperator(
task_id='get_data',
bash_command=f'python {path}/dags/src/get_data.py --num_pages=1',
dag=dag
)
# Define Task 2 (insert the data into the database)
task2 = BashOperator(
    task_id='insert_data',
    bash_command=f'python {path}/dags/src/insert_data.py',
    dag=dag
)
task1 >> task2
Conclusion
In this article, we explored a comprehensive example of leveraging the power of Docker, Apache Airflow, and Python to automate tasks. I hope this was helpful to you. If you have any additions, corrections, or suggestions, please feel free to leave a comment.