Airflow: Tips, Tricks, and Pitfalls

Airflow, the workflow scheduler we use, recently hit version 1.6.1, and introduced a revamp of its scheduling engine. We like it because the code is easy to read, easy to fix, and the maintainer, Maxime Beauchemin, is very responsive. We also like that dags are described entirely in code rather than in config files such as XML. Nevertheless, there are a few less-than-obvious things that we wish we had known.

Make tasks idempotent

In a distributed environment, things are going to fail. Airflow accommodates this by automatically retrying tasks. So far, so good.

The problem arises when you have a series of tasks and you want to reset to a state where it makes sense to retry them.

One strategy is to use a subdag. The subdag task has its own retry setting, so if you set the tasks inside the subdag to never retry and rely on the subdag operator’s retries instead, the whole subdag succeeds or fails together. This works well if the reset is the first task in the subdag; it’s attractive if there’s a relatively complex dependency structure within the subdag.

Note that SubDagOperator tasks won’t mark failure properly unless you’re using the latest version from GitHub.

Another strategy is to use the retry handler:

Make tasks idempotent with retry handler

This works great for making a particular task retryable.
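As a minimal sketch of the retry-handler idea, assume the task writes into an output directory that can simply be thrown away before the next attempt. The function name `reset_partial_output` and the `output_dir` entry under `params` are hypothetical conventions for this example; in Airflow, a function like this would be handed to the operator through its `on_retry_callback` argument, which is called with the task context.

```python
import os
import shutil
import tempfile


def reset_partial_output(context):
    """Remove whatever a failed attempt left behind so a retry starts clean.

    `context` is the dict handed to the callback; the "output_dir" entry
    under "params" is a hypothetical convention for this sketch.
    """
    output_dir = context["params"]["output_dir"]
    # ignore_errors=True keeps the handler itself idempotent: calling it
    # when the directory is already gone is not an error.
    shutil.rmtree(output_dir, ignore_errors=True)
    return output_dir


# Hypothetical wiring inside a dag file (not executed here):
#   BashOperator(task_id="ingest", bash_command="...", retries=3,
#                params={"output_dir": "/tmp/ingest_out"},
#                on_retry_callback=reset_partial_output, dag=dag)

# Simulate a failed attempt that left a partial file behind.
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, "part-00000"), "w") as handle:
    handle.write("half-written")

fake_context = {"params": {"output_dir": workdir}}
reset_partial_output(fake_context)
reset_partial_output(fake_context)  # second call is a harmless no-op
```

The point of the design is that the handler can run any number of times without changing the outcome, which is what makes the task itself safe to retry.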

Generate your tasks programmatically

One of the most powerful features of a system where workflows are described in code is that you can programmatically generate your dag. This is very useful when you want to automatically pick up new data sources without manual intervention.

We have our log ingestion dag examine HDFS for the log directories which exist, and generate one task per directory to ingest the data within it. Example code below.
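A rough sketch of the one-task-per-directory pattern follows. It is not our production code: the directory names, the `ingest_` prefix, the `task_specs_for` helper and the `ingest.sh` script are all made up for illustration, and the listing is hard-coded where a real dag would ask HDFS.

```python
import posixpath
import re


def task_specs_for(log_dirs):
    """Map each log directory to a task_id derived from its name."""
    specs = {}
    for path in sorted(log_dirs):
        name = posixpath.basename(path.rstrip("/"))
        # Stick to a conservative character set for the task_id.
        task_id = "ingest_" + re.sub(r"[^A-Za-z0-9_.-]", "_", name)
        specs[task_id] = path
    return specs


# Hypothetical listing; in practice this would come from HDFS.
log_dirs = ["/logs/web-frontend/", "/logs/payments", "/logs/mobile api"]
specs = task_specs_for(log_dirs)

# Hypothetical wiring inside a dag file (not executed here):
#   for task_id, path in specs.items():
#       BashOperator(task_id=task_id, bash_command="ingest.sh " + path, dag=dag)
```

Because the dag file is re-evaluated by the scheduler, a new directory turns into a new task the next time the file is parsed.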

Daily tasks run when the day is over

Not at the start of the day. The run for a given day is only triggered once that day has ended.
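The arithmetic is simple but easy to get backwards, so here is a small illustration. The helper name `earliest_trigger_time` is ours, not part of Airflow; it just models the rule that a run covers a window and cannot start until the window has closed.

```python
from datetime import datetime, timedelta


def earliest_trigger_time(execution_date, interval=timedelta(days=1)):
    """A run labelled `execution_date` covers the window
    [execution_date, execution_date + interval) and starts after it closes."""
    return execution_date + interval


run_label = datetime(2016, 1, 1)
trigger = earliest_trigger_time(run_label)  # datetime(2016, 1, 2, 0, 0)
```

So the run labelled January 1st is the one that kicks off just after midnight on January 2nd.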

You can’t put subdags in the dags folder

Or to be more accurate: you can’t put a subdag in its own module in the dags folder unless you protect it with some kind of factory. Or to be even more accurate: you can, but then the subdag will be run on its own schedule, as well as by the subdag operator in the main dag.

Here’s an example of two dags (assume they’re both in the dags folder, called bad_dags) where the subdag will be scheduled by the scheduler independently of the main dag:

How not to do it unless you really want the subdag to be scheduled by the scheduler

We solve this by using a factory function. This has the advantage that the main dag can pass the necessary parameters in to the subdag, so their schedules and any other parameters align automatically. No need to hunt down parameters when the schedule of your main dag changes. In the below example, assume the dags are in a package called good_dags:

Use a factory to make subdags more maintainable and keep the scheduler from running them
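A minimal sketch of the factory shape, under some loud assumptions: `FakeDag` is a stand-in for `airflow.DAG` so the example runs without Airflow installed, and `make_ingest_subdag`, `main_dag` and `ingest` are hypothetical names. With Airflow you would pass `dag_cls=DAG` and hand the result to the subdag operator; the `parent.child` dag_id follows the naming the subdag operator expects.

```python
from collections import namedtuple
from datetime import timedelta

# Stand-in for airflow.DAG so the sketch runs without Airflow installed;
# it only records the keyword arguments the factory passes along.
FakeDag = namedtuple("FakeDag", ["dag_id", "schedule_interval", "default_args"])


def make_ingest_subdag(parent_dag_id, child_task_id, schedule_interval,
                       default_args, dag_cls=FakeDag):
    """Build the subdag on demand, using the parent's own settings.

    Nothing is created at import time, so a module holding only this
    function exposes no module-level dag object.
    """
    subdag = dag_cls(
        dag_id="%s.%s" % (parent_dag_id, child_task_id),
        schedule_interval=schedule_interval,
        default_args=default_args,
    )
    # Tasks would be attached to `subdag` here.
    return subdag


# In the main dag file the parent's values are passed straight through.
parent_args = {"owner": "data-eng", "retries": 0}
subdag = make_ingest_subdag("main_dag", "ingest", timedelta(days=1), parent_args)
```

The schedule and default arguments live in exactly one place, the main dag, and the subdag only exists when the main dag asks for it.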

Another pattern is to stuff everything that needs to be shared between main dag and subdag into the default_args, and pass that down into the factory function. (Thanks to Maxime for that suggestion).

Subdags must have a schedule and be enabled

Even though subdags are triggered as part of a larger dag, if their schedule is set to None or ‘@once’, the subdag operator will succeed without doing anything.

Even worse, if your subdag is disabled, then the subdag operator will run but never complete, as your subdag will never run. This will quickly lead to your main dag reaching the maximum number of active dag runs (16 by default at the time of writing), and in consequence no more runs will be scheduled.

In both cases, the cause is that the subdag operator is implemented as a BackfillJob: the operator calls the subdag’s run method, which starts a backfill.

DagRuns: the missing piece

The single biggest change in Airflow 1.6 is the introduction of the DagRun. Now, scheduling of task instances is triggered by the creation of a DagRun object.

Accordingly, if you want to trigger a run of an entire dag, you are likely better off creating a DagRun than running a backfill. You can do that either with the `airflow trigger_dag` command, or through the webserver’s DagRun page.

The big advantage of this is that it makes the scheduler’s behaviour easier to understand, as it scans for DagRuns to create, then schedules TaskInstances based on which DagRuns are active. The webserver can now show us the state of individual DagRuns, and the state of the task instances associated with it.

This creates one new pitfall: Dags have a maximum number of active DagRuns, and if your DagRuns don’t reach completion for whatever reason, the scheduler will stop running your Dag.

How DagRuns are scheduled

The new model also creates a new opportunity to control the scheduler. The next DagRun to be scheduled is calculated from the most recent scheduled DagRun that exists in the database. Apart from the case where the maximum is reached, it does not matter whether that most recent DagRun is in a running or final state.

This means that if you want to rerun several DagRuns, you can delete all the DagRuns between the present and the oldest run you want to rerun. The scheduler will then recreate them all, triggering them in order. Whether it waits for one to complete before starting the next depends on your settings for depends_on_past. You will also need to delete the corresponding TaskInstances for those DagRuns.

It also means that if you want to rerun a DagRun which is not part of the contiguous set between the present and a past time, you can simply delete the TaskInstances for that DagRun, and set that DagRun to a running state.
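The contiguous rerun described above can be modelled in a few lines. This is a simplified picture of the behaviour, not the scheduler's actual code: `next_execution_date` is a hypothetical helper, and the list of datetimes stands in for the DagRun rows in the database.

```python
from datetime import datetime, timedelta


def next_execution_date(existing_runs, start_date, interval):
    """The next run follows the latest recorded run, whatever its state."""
    if not existing_runs:
        return start_date
    return max(existing_runs) + interval


day = timedelta(days=1)
start = datetime(2016, 1, 1)
runs = [datetime(2016, 1, d) for d in range(1, 6)]  # Jan 1 to Jan 5 exist
before = next_execution_date(runs, start, day)      # Jan 6 comes next

# Delete every run from Jan 3 onwards to have them scheduled again.
kept = [r for r in runs if r < datetime(2016, 1, 3)]
after = next_execution_date(kept, start, day)       # Jan 3 comes next
```

Once the later runs are gone, the model picks up again right after the newest remaining run, which is why the deleted runs are recreated in order.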

The scheduler should be restarted frequently

In our experience, a long running scheduler process, at least with the CeleryExecutor, ends up not scheduling some tasks. We still don’t know the exact cause, unfortunately.

Fortunately, Airflow has a built-in workaround in the form of the `--num_runs` flag. It specifies how many iterations of its loop the scheduler runs before it quits. We’re running it with 10 iterations; Airbnb runs it with 5. Note that this will cause problems when using the LocalExecutor.
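Since the scheduler exits after the given number of iterations, something has to start it again. As a bare-bones illustration of that loop (a process supervisor is the more robust choice), here is a sketch in which `keep_scheduler_running` and its `restarts` and `run` parameters are our own invention; only the `airflow scheduler --num_runs` command line comes from Airflow.

```python
import subprocess


def scheduler_command(num_runs):
    """Command line for a scheduler that quits after `num_runs` iterations."""
    return ["airflow", "scheduler", "--num_runs", str(num_runs)]


def keep_scheduler_running(num_runs=10, restarts=None, run=subprocess.call):
    """Start the scheduler again each time it exits.

    restarts=None loops forever; a number bounds the loop (handy for tests).
    `run` is injectable so the loop can be exercised without Airflow installed.
    """
    started = 0
    while restarts is None or started < restarts:
        run(scheduler_command(num_runs))
        started += 1
    return started


# Dry run: record the commands instead of executing them.
calls = []
count = keep_scheduler_running(num_runs=10, restarts=3, run=calls.append)
```

In real use you would call `keep_scheduler_running()` with the defaults, which blocks and relaunches the scheduler indefinitely.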

We currently use chef to restart the executor for us; we’re planning on moving to supervisor to keep it up constantly.

Operators (dis)appear based on dependencies

The airflow.operators package has a little bit of magic to only expose operators which imported correctly. This means that if you don’t have the necessary dependencies installed, it will look like your operators are missing.
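The general pattern looks roughly like the sketch below. It is an illustration of the idea rather than Airflow's actual implementation, and `importable` and the deliberately missing module name are made up for the example.

```python
import importlib


def importable(module_names):
    """Return only the modules that import cleanly, skipping the rest."""
    available = {}
    for name in module_names:
        try:
            available[name] = importlib.import_module(name)
        except ImportError:
            # A missing dependency hides the module instead of raising.
            continue
    return available


found = importable(["json", "module_that_is_not_installed_xyz"])
```

The failure is swallowed silently, which is exactly why a missing dependency shows up later as a confusing "no such operator" rather than as an import error.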

That’s all folks! (For now)

Airflow is under active development, not least by us. There’ll be more improvements, and I’ll post about more tricks as we learn them.

Interested in solving problems like this, while working with cool people in an office equally close to an amazing doughnut shop and a very rad gym? Check out our careers page to see if there’s an opening for you!