How to make a Dynamic DAG and how to use a Parameters file

Oscar García
4 min read · Mar 8, 2022


Apache Airflow Logo

I know that after reading Getting started with Apache Airflow and Understanding better the DAGs (And Operators) concepts, you thought it would be awesome if you could simplify your code and construct "one" operator that receives different parameters on every run to do different things. Well… good news! You can.

In this article we will learn the logic behind constructing a dynamic DAG and some of its possible use cases.

Use case

Imagine that you need to send different commands to the operating system with a BashOperator; normally, we would create one BashOperator for every command we need. We would end up with something like this:

start = DummyOperator(
    task_id='start_wf'
)

end = DummyOperator(
    task_id='end_wf'
)

task1 = BashOperator(
    task_id='task1',
    bash_command='<command 1>'
)

task2 = BashOperator(
    task_id='task2',
    bash_command='<command 2>'
)

task3 = BashOperator(
    task_id='task3',
    bash_command='<command 3>'
)

start >> task1 >> task2 >> task3 >> end

As we can see, we are "repeating" the construction of the BashOperators, and the DAG could become huge if we have a lot of commands to execute. Also, if we need to change one or more commands, or add new ones, we would have to go directly to our code, modify it, add tasks, change the flow definition and upload the DAG again.

Solution

Let's walk through the DAG code and analyze it piece by piece.

First, we have our classic modules/libraries import section, plus an additional library, pandas, that will allow us to handle the file content (this library is optional; if you are more comfortable with another library or function, you can use that instead).

Then we read our file, "dynamic_dag_tutorial.conf", which has a single column with the header command and four records holding the respective commands:

command
echo 1
echo 2
echo 3
echo 4
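
A minimal sketch of what this import and file-read section can look like (the file path and the exact import paths are illustrative and depend on your Airflow version and environment):

from datetime import datetime

import pandas as pd

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

# Read the parameters file at parse time; each record holds one bash command.
params = pd.read_csv('/opt/airflow/dags/dynamic_dag_tutorial.conf')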

Inside the DAG construction, we have two dummy operators that act as the start and the end of the workflow, respectively.

And below them is where the magic happens. As we can see, we now use a for loop to create as many operators as there are commands in our configuration (parameters) file, instead of writing one operator per command by hand. On every iteration (index) over the file, the bash command is the value in the command column.
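
Continuing the sketch, and assuming the file was read into the params DataFrame shown earlier, the DAG construction with the dummy operators and the for loop could look something like this (the dag_id, start_date and scheduling arguments are illustrative):

with DAG(
    dag_id='dynamic_dag_tutorial',
    start_date=datetime(2022, 3, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    # Dummy operators that mark the start and the end of the workflow.
    start = DummyOperator(task_id='start_wf')
    end = DummyOperator(task_id='end_wf')

    # One BashOperator per record in the parameters file.
    tasks = []
    for index, row in params.iterrows():
        tasks.append(
            BashOperator(
                task_id=f'command_{index}',
                bash_command=row['command'],
            )
        )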

Our DAG will look like this: start_wf → command_0 → command_1 → command_2 → command_3 → end_wf.

Finally, we also need to construct our DAG flow dynamically. The logic of this last section, implemented in the sketch after the list, is:

  1. If we only have one task (command) to execute, we simply link the start with the command_0 task and then with the end.
  2. If we have two or more tasks (commands) to execute, for the first iteration we link the start with the first task and the first task with the second task.
  3. Then, for the following indexes (except the last one), we link the current task to the next task in line. If we only have two tasks, this condition is never fulfilled.
  4. Finally, for the last index, we link the last task to the end (the else condition).
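
One way to express that logic in code, assuming the operators were collected in the tasks list from the previous sketch (this still lives inside the DAG block):

    # Build the flow dynamically, following the four rules above.
    if len(tasks) == 1:
        # 1. A single command: start -> command_0 -> end.
        start >> tasks[0] >> end
    else:
        for i in range(len(tasks)):
            if i == 0:
                # 2. First iteration: link start to the first task
                #    and the first task to the second one.
                start >> tasks[0] >> tasks[1]
            elif i < len(tasks) - 1:
                # 3. Middle indexes: link the current task to the next one.
                tasks[i] >> tasks[i + 1]
            else:
                # 4. Last index: link the last task to the end.
                tasks[i] >> end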

If we execute our DAG, we will see that we get the expected output for every task.

command_0 task output
command_3 task output

With all this, if we need to change the command of one of the tasks or add more commands, we only need to change the configuration file content and the DAG will be built dynamically with all the required tasks.
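
For example, appending one more record to dynamic_dag_tutorial.conf (the fifth command below is just an illustration) is enough for a new command_4 task to appear in the graph the next time the DAG is parsed:

command
echo 1
echo 2
echo 3
echo 4
echo 5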

Change to the dynamic_dag_tutorial.conf file
DAG graphical view with changes in configuration file

Conclusion

As we saw throughout the article, it's possible to construct the tasks for our DAGs dynamically.

With this, we can simplify our code by reducing the number of written lines, and we also make it capable of handling possible future changes. As we saw, we only need to construct our DAG once; after that, we only need to make changes in our configuration file.

We used a very simple example with a BashOperator, but you can come up with new solutions using other operators: PythonOperator, SnowflakeOperator, BigQuery operators, MySQL operators, S3 operators, etc.

This is the solution I arrived at through my own experience, but feel free to improve on it in your own solutions. I will be happy to read your comments and come up with even simpler solutions.

Until next code!

Os😉


Oscar García

Senior Data Engineer with experience in Business Intelligence/Data Vault/Data Warehousing/GCP/AWS/Airflow/Python