Use Apache Airflow to run a task exactly once

Apache Airflow is a great tool for managing recurring tasks, for example tasks that should run every day. The usual advice is to make tasks idempotent, so that a job can be rerun at any time, including after a failure, and still produce the same outcome.
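To make "idempotent" concrete, here is a minimal sketch in plain Python. It does not use Airflow, and the function names are made up for illustration. Appending a row is not idempotent, while writing a result under its day as the key is.

```python
def append_row(rows, day, value):
    """Not idempotent: every rerun for the same day adds another row."""
    rows.append((day, value))


def upsert_row(table, day, value):
    """Idempotent: a rerun for the same day overwrites the earlier result."""
    table[day] = value


log = []    # target of the non-idempotent task
daily = {}  # target of the idempotent task

# Simulate a first run followed by a rerun for the same day.
for _ in range(2):
    append_row(log, "2017-01-01", 42)
    upsert_row(daily, "2017-01-01", 42)
```

After the rerun, `log` holds the row twice, while `daily` still holds exactly one entry for that day.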

Example DAG where tasks that already ran successfully are skipped

Run tasks successfully exactly once

But sometimes a script must run successfully exactly once, for example when it manipulates a file or a database. One could implement the task in an idempotent way, which requires some re-engineering and is usually the best solution. Alternatively, for “one-time” scripts, you could either

  • run the scripts once, e.g. from bash, and rerun the failed parts
  • change the script so that it checks for itself whether it has already run successfully (e.g. by creating a special file or checking the database, which might no longer be possible at a later point in time)
  • use the result of the last run stored in Airflow to check whether it has already run successfully once
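For comparison, the second option (a script that checks for itself) could look roughly like the sketch below. This is only an illustration under my own assumptions: the helper `run_once` and the marker file name `migrate.done` are hypothetical and not part of any library.

```python
import os
import tempfile


def run_once(marker_path, job):
    """Run job() unless the marker file exists; write the marker only after success."""
    if os.path.exists(marker_path):
        return False  # an earlier run already completed successfully
    job()  # if this raises, no marker is written, so a rerun tries again
    with open(marker_path, "w") as fh:
        fh.write("done\n")
    return True


calls = []
workdir = tempfile.mkdtemp()
marker = os.path.join(workdir, "migrate.done")  # hypothetical marker file

first = run_once(marker, lambda: calls.append("ran"))   # runs the job
second = run_once(marker, lambda: calls.append("ran"))  # finds the marker, skips
```

The drawback is that every script has to carry its own bookkeeping, which is exactly the state Airflow already keeps for us.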

We chose the last option because Airflow already stores the TaskInstance result in its database, so we did not have to build our own solution from scratch.
However, Airflow does not provide a way to run a single task exactly once, only a complete DAG.
Why is this not enough? Because if one task of the DAG fails, the only way to run the remaining tasks is to rerun the complete DAG or to run those tasks manually (at least I am not aware of any other method).

Implement a RunOnceBranchOperator

To implement this functionality, we needed a way to skip exactly one task. Similar discussions on how to achieve this can be found on the Airflow Gitter channel, and the required trigger rule is discussed on Stack Overflow. The solution used here combines the “BranchPythonOperator” with the trigger rule “one_success”, as shown in an Airflow example DAG.
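Why the trigger rule matters can be shown with a small model in plain Python. This is a deliberate simplification and not Airflow's scheduler code. The layout (a branch, then either the run-once task or a placeholder, then a join) and the task names are assumptions made for this sketch.

```python
def join_would_run(upstream_states, trigger_rule="all_success"):
    """Simplified model: does a join task run, given the states of its upstream tasks?"""
    succeeded = [state == "success" for state in upstream_states]
    if trigger_rule == "all_success":
        return all(succeeded)
    if trigger_rule == "one_success":
        return any(succeeded)
    raise ValueError("rule not covered by this sketch: " + trigger_rule)


# Assumed layout: branch -> (run_once_task | skip_placeholder) -> join
# Case: the branch picked the placeholder, so the run-once task was skipped.
states = {"run_once_task": "skipped", "skip_placeholder": "success"}

with_default_rule = join_would_run(states.values())
with_one_success = join_would_run(states.values(), "one_success")
```

With the default rule, the join would not run because one of its upstream tasks was skipped. With “one_success”, the join and everything after it still runs.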

The implemented “RunOnceBranchOperator” runs the task “run_once_task_id” successfully exactly once, independent of the “execution_date”. In short, the task only runs if:
The task has never been completed successfully by this DAG before. “Before” here means that no successful run is recorded in the Airflow results database; the execution date is ignored completely, so the recorded run could even have a later execution date.
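The decision rule itself can be sketched without Airflow. In the sketch below, `TaskRecord` is a stand-in for the rows Airflow keeps about task instances, and `choose_branch`, the placeholder ids `skip_0` and `skip_1`, and the second task id are hypothetical names chosen for illustration. This is not the actual operator code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskRecord:
    """Stand-in for one stored task result."""
    dag_id: str
    task_id: str
    execution_date: str
    state: str


def choose_branch(records, dag_id, run_once_task_id, skip_task_id):
    """Return the task id to follow; the execution date is ignored on purpose."""
    already_done = any(
        r.dag_id == dag_id and r.task_id == run_once_task_id and r.state == "success"
        for r in records
    )
    return skip_task_id if already_done else run_once_task_id


# History after a first run in which one task succeeded and one failed.
history = [
    TaskRecord("example_runtaskonce", "read_folders_in_folder_0", "2017-01-01", "success"),
    TaskRecord("example_runtaskonce", "read_folders_in_folder_1", "2017-01-01", "failed"),
]

rerun_0 = choose_branch(history, "example_runtaskonce", "read_folders_in_folder_0", "skip_0")
rerun_1 = choose_branch(history, "example_runtaskonce", "read_folders_in_folder_1", "skip_1")
```

On the rerun, the first call picks the placeholder and the second picks the failed task again. In Airflow, a function like this could serve as the callable of a branch operator, which follows the task id that the callable returns. The records would then come from Airflow's own database instead of a list.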

How can it be used?

The operator can be used to skip a task if it already ran successfully:

Complete DAG Example

Result of first DAG run, where one bash task failed

A more complete example, which produced the graph above, was run using “airflow backfill example_runtaskonce -s 2017-01-01 -e 2017-01-01”. The code to generate the task is shown below.
The task failure was forced on purpose to show the result of a rerun.

A rerun of the DAG only runs the unsuccessful tasks again

Result of the second DAG run, where the task “read_folders_in_folder-0” is skipped because it already ran successfully before.

The DAG can now be rerun, e.g. with trigger_dag, backfill, or any other way. To produce the graph, we used “airflow backfill example_runtaskonce -s 2017-01-02 -e 2017-01-02”. Each run checks whether a successful run of the task exists and runs the task only if none does. Otherwise the task is skipped, as happened here with the task “read_folders_in_folder_0”.

Conclusion & Feedback

In summary, Airflow can be used to store the state of certain tasks. Whether this is reasonable for your use case is something you have to decide yourself.

I am happy to hear your thoughts, comments, and criticism. Please let me know what you think!