Idempotent Data Pipeline with dbt and Airflow

Abubakar Alaro · Published in Geek Culture · 3 min read · Feb 15, 2023

Apache Airflow & dbt

Building a robust data pipeline that is predictable in terms of the data it produces is the goal of every data engineer. Tools like dbt and Apache Airflow allow an engineer to do this effectively and efficiently. In this article, I will describe what an idempotent model looks like in dbt and how it can be achieved with Airflow.

I discussed idempotent data pipelines and why they are important in this article.

Outline

  1. Introduction to dbt and Airflow
  2. Writing Idempotent dbt models
  3. Conclusion

Introduction to dbt and Airflow

The data build tool, commonly called dbt, is a data transformation tool that enables data analysts and engineers to transform, test, and document data in the cloud data warehouse.
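
For instance, a dbt model is just a SELECT statement saved as a .sql file, with tests and documentation declared beside it in YAML. A minimal sketch, where the model, source, and column names are purely illustrative:

-- models/daily_orders.sql
SELECT
    order_id,
    sale_date,
    country,
    amount
FROM {{ source('shop', 'orders') }}

# models/schema.yml
version: 2
models:
  - name: daily_orders
    description: "One row per order placed in the shop"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique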

Apache Airflow is an orchestration tool used for creating batch-oriented data pipelines. It uses Directed Acyclic Graphs (DAGs) to define data processing networks, and it gives you the ability to determine when and how your pipelines are executed.

There are a lot of free resources that explain in detail what these tools are and how to use them; I will provide some links in the conclusion of the article.

Writing Idempotent dbt models

Let me paint a scenario where this is important. Let's say we have a dbt task that needs to load sales data for the previous day each time it runs, and we schedule this task on Airflow. The dbt model looks like this:

SELECT
    sale_date,
    country,
    SUM(amount) AS total_sales_per_country
FROM orders
WHERE sale_date = CURRENT_DATE() - 1
{{ dbt_utils.group_by(2) }}

Every time the model runs, it gets the sales data for the previous day. This works well in an ideal world where pipelines never fail, but we do not live in that world. It would be very nice if every data pipeline ran without errors and always loaded the correct data. In reality, when a job does not run because of one error or another and we have to re-run it the following day, the model will not report the numbers we expect, because the date filter is effectively hardcoded to whatever day the job happens to execute.
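
To make the failure mode concrete, here is roughly what the WHERE clause effectively evaluates to, first for a run that executes on schedule on 2023-01-10 and then for the same day's run retried two days late (the dates are purely illustrative):

-- executed on 2023-01-10, on schedule: reports 2023-01-09 as intended
WHERE sale_date = DATE '2023-01-09'

-- the same logical run retried on 2023-01-12: silently reports 2023-01-11 instead
WHERE sale_date = DATE '2023-01-11'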

In order to make this model idempotent and have it produce the right data every time it runs, we need to use the vars argument in dbt and Airflow.

dbt provides a mechanism, variables, to provide data to models for compilation. In Airflow, variables are a runtime configuration concept: a general key/value store that is global, can be queried from your tasks, and is easily set in the UI or in the DAG file. To achieve idempotency for this model, a variable needs to be defined in the dbt_project.yml file. In this example, I will define a variable called run_date_country_report and give it a default value. It looks like this in my dbt_project.yml file:

name: idempotent_dbt_model
version: 1.0.0

config-version: 2

vars:
  # The `run_date_country_report` variable will be accessible in all resources
  run_date_country_report: '2023-01-01'

and the model will be updated to use the variable:

SELECT
    sale_date,
    country,
    SUM(amount) AS total_sales_per_country
FROM orders
WHERE sale_date = '{{ var("run_date_country_report") }}'::DATE
{{ dbt_utils.group_by(2) }}
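
Because the variable has a default in dbt_project.yml, it can also be overridden from the command line, which is handy for a one-off backfill. A quick sketch, assuming the model file is named sales_country_report.sql (the name used in the DAG below):

dbt run --models sales_country_report --vars '{"run_date_country_report": "2023-01-09"}'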

All I need to do now is pass this variable to the task in Airflow. The DAG will look like this:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "Abubakar",
    "depends_on_past": False,
    "start_date": datetime(2022, 1, 1),
    "email": ["owner@me.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=30),
}

dag = DAG(
    "sales_country_report",
    default_args=default_args,
    schedule_interval="0 1 * * *",  # run daily at 01:00
    catchup=False,
    max_active_runs=1,
)

model_name = "sales_country_report"

print_hello = BashOperator(
    dag=dag,
    task_id="print_hello",
    bash_command="echo 'hello world!'",
)

# {{ ds }} is Airflow's templated logical date (YYYY-MM-DD). It is rendered at
# runtime from the DAG run itself, so re-running an old run still passes that
# run's date to dbt instead of "today".
dbt_run = BashOperator(
    dag=dag,
    task_id=f"{model_name}_run",
    bash_command=(
        f"cd $DBT_PROFILES_DIR && dbt run --models {model_name} "
        "--vars '{\"run_date_country_report\": \"{{ ds }}\"}'"
    ),
)

print_hello >> dbt_run

This will ensure that every time the model runs, we get the correct data for that run's date, no matter how late the run actually executes.
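
Because the filter now comes from {{ ds }}, re-processing missed or failed days is just a matter of triggering those runs again; each run renders its own logical date. One way to do that is Airflow's backfill CLI (the dates here are illustrative):

airflow dags backfill --start-date 2023-01-09 --end-date 2023-01-11 sales_country_report

Each backfilled run passes its own date to dbt, so the model rebuilds exactly the days requested and nothing else.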

Conclusion

dbt and Airflow are two important tools in any data tech stack today, and knowing how to integrate them is a skill every data engineer should have. In this article, I have shown a simple example of how both tools can be used together to build idempotent data pipelines. Learn more about dbt and Airflow.
