Creating Dynamic Workflows in Airflow

Agung Santoso
Apr 21, 2017

The problem: how do you create a workflow in which the number of Task B's needed to compute Task C cannot be known until Task A has completed? Each Task B.* takes several hours to compute, and the B tasks cannot be combined.

```
             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |       ....     |
             |---> Task B.N --|
```
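The shape itself is simple. The difficulty is that it can only be written down once N is known. Here is a minimal plain-Python sketch with no Airflow involved. `fan_out_fan_in` is a hypothetical helper that builds the edge list for a given N:

```python
def fan_out_fan_in(n):
    """Return the edges of Task A -> Task B.1..B.n -> Task C as (upstream, downstream) pairs."""
    edges = []
    for i in range(1, n + 1):
        b = "Task B.{}".format(i)
        edges.append(("Task A", b))  # fan-out: every B waits on A
        edges.append((b, "Task C"))  # fan-in: C waits on every B
    return edges


# N is only known after Task A finishes; 3 is just an example value
for upstream, downstream in fan_out_fan_in(3):
    print(upstream, "->", downstream)
```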

One solution exists, but I don't like it. It requires a blocking ExternalTaskSensor, and each Task B.* takes between 2 and 24 hours to complete, so the sensor would sit polling for that entire time. I do not consider it viable.

```
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (dynamically created DAG through python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
```

Surely there is an easier way. One option is the SubDagOperator.

I would have Task A push a value x to XCom, then use x to create x copies of the Task B operator in a loop.

Something like:

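```python
from airflow.operators.python_operator import PythonOperator

# Inside a PythonOperator called 'pushing_task' (Task A).
# Whatever the callable returns is pushed to XCom automatically.
def push_function():
    value = 3  # placeholder: Task A's real result (the number of B tasks) goes here
    return value

# Inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')
    return value

# `dag` is the DAG object defined elsewhere in this file.
# provide_context is omitted because push_function takes no arguments.
push_task = PythonOperator(
    task_id='pushing_task',
    python_callable=push_function,
    trigger_rule="all_done",
    dag=dag)
```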
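You can model the push/pull round trip without a running Airflow instance. The sketch below is a toy in-memory stand-in: `ToyXCom` and `run_python_task` are hypothetical and are not Airflow classes. It mirrors one real behaviour: a PythonOperator's non-None return value is pushed under the key `return_value`, and a plain `xcom_pull(task_ids=...)` reads that key by default.

```python
class ToyXCom:
    """In-memory stand-in for Airflow's XCom table, keyed by (task_id, key)."""

    def __init__(self):
        self._store = {}

    def push(self, task_id, value, key="return_value"):
        self._store[(task_id, key)] = value

    def pull(self, task_ids, key="return_value"):
        # Returns None when nothing was pushed for this task/key
        return self._store.get((task_ids, key))


def run_python_task(xcom, task_id, python_callable):
    # Mimics PythonOperator: a non-None return value is pushed automatically
    result = python_callable()
    if result is not None:
        xcom.push(task_id, result)
    return result


xcom = ToyXCom()
run_python_task(xcom, "pushing_task", lambda: 4)  # Task A decides N = 4
n = xcom.pull(task_ids="pushing_task")
print(n)  # 4
```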

Then, for SubDAG B, something like:

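```python
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

# The DAG is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval, value):
    # A SubDAG's dag_id must have the form '<parent_dag_id>.<subdag_task_id>'
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )
    # Create one BashOperator per Task B; `value` is the count pushed by Task A
    for x in range(value):
        BashOperator(
            task_id='B_{}'.format(x),  # '*' is not allowed in a task_id
            bash_command='echo B{}'.format(x),
            dag=dag,
        )
    return dag
```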
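Two naming rules matter for this factory. First, Airflow validates task ids, allowing only letters, digits, underscores, dots and dashes, so an id such as `'B*0'` would be rejected. The exact regex below is my assumption of that rule, so check it against your Airflow version. Second, the SubDagOperator expects the child DAG's id to be `<parent_dag_id>.<subdag_task_id>`. The following stdlib-only sketch checks both; `subdag_ids` is a hypothetical helper:

```python
import re

# Assumption: approximates Airflow's key rule (word characters, dots and dashes only)
TASK_ID_PATTERN = re.compile(r"^[\w.-]+$")


def subdag_ids(parent_dag_name, child_dag_name, value):
    """Return the SubDAG's dag_id and the task ids the factory would create."""
    dag_id = "%s.%s" % (parent_dag_name, child_dag_name)
    task_ids = ["B_{}".format(x) for x in range(value)]
    for task_id in task_ids:
        if not TASK_ID_PATTERN.match(task_id):
            raise ValueError("invalid task_id: %r" % task_id)
    return dag_id, task_ids


print(subdag_ids("main_dag", "task_b", 3))
# ('main_dag.task_b', ['B_0', 'B_1', 'B_2'])
print(bool(TASK_ID_PATTERN.match("B*0")))  # False
```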

Then call this method from the main DAG:

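```python
from airflow.operators.subdag_operator import SubDagOperator

# Get the XCom value from Task A, then call sub_dag with the additional argument 'value'.
# Caution: `context` only exists inside a running task, so at DAG-parse time
# this value has to be obtained some other way before the SubDAG can be built.
value = context['task_instance'].xcom_pull(task_ids='pushing_task')
# Named sub_dag_task so it does not shadow the sub_dag() factory
sub_dag_task = SubDagOperator(
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
                   main_dag.schedule_interval, value),
    task_id=CHILD_DAG_NAME,
    dag=main_dag,
)
```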

Note that the sub_dag method returns a DAG, not a task. The B tasks can therefore be created in a loop without any set_upstream calls. The whole SubDAG is built from the value Task A produced and runs as a single task in the main DAG. If tasks in B do depend on each other, you can set the dependencies with the legacy B1.set_upstream(B2) or the newer B1 << B2.
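The direction of `<<` is easy to get backwards. The following toy class is hypothetical (`ToyTask`, not Airflow's BaseOperator) and sketches the semantics: `B1 << B2` means B2 runs first, exactly like `B1.set_upstream(B2)`. As in Airflow, the operators return the right-hand task so that expressions can be chained.

```python
class ToyTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()  # ids of tasks that must finish before this one

    def set_upstream(self, other):
        # `other` must finish before `self` starts
        self.upstream.add(other.task_id)

    def set_downstream(self, other):
        other.set_upstream(self)

    def __lshift__(self, other):
        # self << other  is the same as  self.set_upstream(other)
        self.set_upstream(other)
        return other

    def __rshift__(self, other):
        # self >> other  is the same as  self.set_downstream(other)
        self.set_downstream(other)
        return other


b1, b2 = ToyTask("B_1"), ToyTask("B_2")
b1 << b2
print(b1.upstream)  # {'B_2'}
```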
