Building a Modern Data Pipeline Part 4: DAG Walk-thru

Andy Sawyer
4 min read · Feb 25, 2024



This is the fourth part of a six-part series titled ‘Building a Modern Data Pipeline: A Journey from API to Insight’ related to this GitHub repo. It steps through setting up a Data Pipeline and running the pipeline end-to-end on your local machine.

Part 1 was a high-level overview, Part 2 stepped through how to download and run the pipeline, and Part 3 looked at the configuration of Docker. This post looks at the Python file that Airflow uses to orchestrate our data pipeline.

The File

You’ll find the actual file in the dags folder, named data-pipeline-demo.py. The contents of the file are provided below:

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

from datetime import datetime
# bronze data
from pipelines.rates_pipeline.get_rates_to_bronze import main as get_rates_to_bronze
from pipelines.currency_pipeline.get_currencies_to_bronze import main as get_currencies_to_bronze
# silver data
from pipelines.rates_pipeline.transform_rates_to_silver import main as transform_rates_to_silver
from pipelines.currency_pipeline.transform_currencies_to_silver import main as transform_currencies_to_silver
# gold data
from pipelines.rates_pipeline.present_rates_in_gold import main as present_rates_in_gold
from pipelines.currency_pipeline.present_currencies_in_gold import main as present_currencies_in_gold
from pipelines.generic_pipeline.present_dates_in_gold import main as present_dates_in_gold

with DAG(
    dag_id='data-pipeline-demo',
    start_date=datetime(2024, 2, 18),
    schedule_interval=None
) as dag:

    start_task = EmptyOperator(
        task_id='start'
    )

    get_rates_to_bronze = PythonOperator(
        task_id='get_rates_to_bronze',
        python_callable=get_rates_to_bronze
    )

    ...

    present_dates_in_gold = PythonOperator(
        task_id='present_dates_in_gold',
        python_callable=present_dates_in_gold
    )

    end_task = EmptyOperator(
        task_id='end'
    )

    start_task >> [get_rates_to_bronze >> transform_rates_to_silver >> present_rates_in_gold,
                   get_currencies_to_bronze >> transform_currencies_to_silver >> present_currencies_in_gold] >> present_dates_in_gold >> end_task

Let’s walk through the above step-by-step.

Import the Packages

First off, you can’t do much in Python without importing the internal and external libraries you need, and that’s what we do at the very top of the file.

We bring in the packages needed to run Airflow, followed by the datetime library, followed by each of the process steps that will be triggered in the pipeline. Each of the pipeline functions is imported relative to the root of the repository. The datetime library is needed for the DAG’s start_date, and because Airflow lets us run a DAG on a schedule. I’ve not set a schedule up in this demonstration, as I’m assuming this isn’t something you’re going to leave running in the background.
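For reference, if you did want the DAG to run automatically, a minimal sketch might look like the following. The daily schedule and catchup setting here are illustrative only; they are not part of the demo repo:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Illustrative only: the demo DAG uses schedule_interval=None, but a
# start_date plus a schedule preset lets Airflow trigger runs automatically.
with DAG(
    dag_id='scheduled-pipeline-example',
    start_date=datetime(2024, 2, 18),
    schedule_interval='@daily',  # run once per day after the start_date
    catchup=False                # don't backfill runs between start_date and today
) as dag:
    placeholder = EmptyOperator(task_id='placeholder')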

Define the DAG

Next, in the with DAG(...) block, we define our DAG. Here we first give it an id, and can optionally specify a schedule. We then define each task that needs to be run within the DAG.

An example of a step that will be run can be seen in the code block below:

get_rates_to_bronze = PythonOperator(
    task_id='get_rates_to_bronze',
    python_callable=get_rates_to_bronze
)

What’s going on here?

We’re defining a variable called get_rates_to_bronze and assigning it a PythonOperator. That’s an operator native to Airflow, and it lets Airflow know we want to run Python code. There are many types of operators, including ones for Python, Bash, dbt and more (see the BashOperator sketch below).

Next, we define the task id. The task id is what you will see in the Airflow UI. Finally, we identify the Python callable that we want to run. In this case, it’s the get_rates_to_bronze function that we imported at the top of the file.
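As an aside, other operator types follow the same pattern: a task_id for the UI, plus the work to perform. A minimal sketch using Airflow’s BashOperator (the echo command is just a placeholder) might look like this:

from airflow.operators.bash import BashOperator

# Same shape as the PythonOperator tasks above, but the work to perform
# is a shell command rather than a Python callable.
say_hello = BashOperator(
    task_id='say_hello',
    bash_command='echo "hello from Airflow"'
)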

You can have as many of these tasks as you like within your DAG. Next we need to tell Airflow how to run them.

Setting the Order

Airflow DAGs can be simple or complex. This is a relatively simple DAG, but not as simple as just running one task after another.

Let’s look at the code for running the DAG:

start_task >> [get_rates_to_bronze >> transform_rates_to_silver >> present_rates_in_gold,
               get_currencies_to_bronze >> transform_currencies_to_silver >> present_currencies_in_gold] >> present_dates_in_gold >> end_task

We start with start_task, which is just an empty placeholder task, and then use the >> operator to tell Airflow what runs next.

Next come the square brackets. Inside them are two task chains, separated by a comma, each with three steps: bronze, silver and gold. The two chains can run in parallel, but the tasks within each chain run in series.

Finally, once everything inside the square brackets has run successfully, the last two steps run in series. The first updates the date dimension, followed by an empty task that marks the end of the DAG.
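If you prefer to spell the dependencies out line by line, the same graph described above could also be declared explicitly. A sketch, assuming the task variables defined in the DAG file:

# Fan out from the start task to the two bronze tasks.
start_task >> [get_rates_to_bronze, get_currencies_to_bronze]

# Each chain runs in series: bronze -> silver -> gold.
get_rates_to_bronze >> transform_rates_to_silver >> present_rates_in_gold
get_currencies_to_bronze >> transform_currencies_to_silver >> present_currencies_in_gold

# Once both gold tasks succeed, build the date dimension, then finish.
[present_rates_in_gold, present_currencies_in_gold] >> present_dates_in_gold >> end_task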

From within the UI, we can see a DAG graph that shows how the tasks will be triggered:

DAG Graph

Next Steps

That’s all for this post. The next post will be along shortly, and will get into the guts of the Python code that makes up the actual pipeline. Stay tuned, and please feel free to share your thoughts. Your feedback and questions are highly welcome. Follow me for updates on this series and more insights into the world of data engineering.


Andy Sawyer

Bringing software engineering best practices and a product-driven mindset to the world of data. Find me at https://www.linkedin.com/in/andrewdsawyer/