Dynamic scheduling in Airflow

Ihor Liubymov
2 min read · Aug 3, 2018


Airflow is a great tool with a lot of functionality; I would even say it is overloaded in some cases. At the same time, things that are trivial with bare Celery can be really tricky in Airflow.

For example, here is a common ETL case: a daily import task downloads a new text file each day. Depending on the day, the file can contain a different number of rows. Sometimes there are only 10 and you can handle them all at once; on another day the file contains 1,000,000 rows, and processing them one by one in a single task takes a long time. Obviously, you want to split the rows into small chunks and schedule many tasks, one per chunk.

With Celery you can do it in a few lines of code:

def schedule_tasks(rows, chunk_size=3000):
    # handle_rows is a Celery task (sketched below); .delay() enqueues one job per chunk.
    for i in range(0, len(rows), chunk_size):
        chunk_rows = rows[i:i + chunk_size]
        handle_rows.delay(chunk_rows)

If you have 1,000,000 rows, that function will generate 1,000,000 / 3,000 ≈ 334 Celery jobs. And it is easy to scale: just add more Celery workers.
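For context, the handle_rows task assumed by the snippet above could look something like this (a minimal sketch: the app name, broker URL, and per-row processing are placeholders):

from celery import Celery

# The broker URL is a placeholder; point it at your own Redis or RabbitMQ.
app = Celery('etl', broker='redis://localhost:6379/0')


@app.task
def handle_rows(rows):
    # Process one chunk of rows; the body here is illustrative.
    for row in rows:
        ...  # e.g. insert into a database or call an API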

What about Airflow? Airflow has TriggerDagRunOperator, but it triggers only one DagRun, while we need many. Luckily, Airflow has a clean code base and it is pretty easy to read. TriggerDagRunOperator uses DagRunOrder to create a new DagRun, and we can reuse the same logic.

I created TriggerMultiDagRunOperator. It takes a python_callable just like TriggerDagRunOperator, but in my case the python_callable should return a collection of DagRunOrder objects.

# Imports are for Airflow 1.x, where these module paths live.
from datetime import datetime

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State


class TriggerMultiDagRunOperator(TriggerDagRunOperator):

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        session = settings.Session()
        created = False
        # python_callable returns a collection of DagRunOrder objects,
        # one per DagRun that should be created.
        for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
            if not dro or not isinstance(dro, DagRunOrder):
                break

            if dro.run_id is None:
                dro.run_id = 'trig__' + datetime.utcnow().isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created = True
            self.log.info("Creating DagRun %s", dr)

        if created:
            session.commit()
        else:
            self.log.info("No DagRun created")
        session.close()

How to use:

def generate_dag_run(context, chunk_size=3000):
    # read_rows() is a placeholder for whatever loads today's file.
    rows = read_rows()
    results = []
    for i in range(0, len(rows), chunk_size):
        chunk_rows = rows[i:i + chunk_size]
        results.append(DagRunOrder(payload={'rows': chunk_rows}))
    return results


# Inside your DAG definition file; `dag` is the DAG this operator belongs to.
gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='example_target_dag',
    python_callable=generate_dag_run,
)

This operator triggers the example_target_dag DAG many times, passing a chunk of rows as the payload of each DagRun.
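For reference, the triggered DAG reads its chunk from the DagRun conf. A minimal sketch of what example_target_dag might look like (the task id and row handling are illustrative; imports are for Airflow 1.x):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def handle_chunk(**context):
    # The chunk scheduled by TriggerMultiDagRunOperator arrives as the DagRun conf.
    rows = context['dag_run'].conf.get('rows', [])
    for row in rows:
        ...  # process a single row


dag = DAG(
    dag_id='example_target_dag',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None,  # runs only when triggered
)

handle_rows_task = PythonOperator(
    task_id='handle_rows',
    python_callable=handle_chunk,
    provide_context=True,
    dag=dag,
)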

And now it is easy to scale too: just add more Airflow workers.

You can find all the code and more examples here:

https://github.com/mastak/airflow_multi_dagrun
