A quick look into Airflow success callback functions

Marin Aglić
Mar 25, 2023

In this post we will take a look at the on success callback functions for the DAG and an operator.

Different outputs for the DAG on success callback function

Introduction

I was working on an example in Airflow using Dynamic Task Mapping and sending HTTP requests to some API service. I was using XCom to pass the data between some operators. However, the operators that sent the HTTP requests produced too much XCom data for my liking. So, I wanted to be able to clean them up after the DAG or operators finish. The approach I chose is to use the on_success_callback functions at both the operator and DAG level. I chose this approach because:

  1. I wanted to keep the DAG as simple as possible;
  2. If I wanted to do this for multiple DAGs using an operator, I’d need to add the operator to all of the DAGs (which is cumbersome);
  3. The on_success_callback function for my use case could be generalized enough to be made easily reusable for multiple DAGs.

However, while testing the example I came across something interesting that I wanted to share. So, let’s take a simple DAG as an example to demonstrate what happens.

I’m using Airflow 2.5.2.

The DAG

Let’s assume that we have this simple DAG:

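import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

# get_data and on_success_callback are the functions shown later in this post.
with DAG(
    dag_id="callback_test",
    start_date=pendulum.now().subtract(hours=5),
    schedule="0 * * * *",
    description="This DAG demonstrates callback functions",
    on_success_callback=on_success_callback,  # DAG-level callback
    tags=["airflow2.5", "callback test"],
):
    gen = PythonOperator(task_id="generate_data", python_callable=get_data)

    # One mapped task instance is created per dict returned by generate_data.
    test = PythonOperator.partial(
        task_id="test",
        python_callable=lambda x: print(f"TEST PYTHON OPERATOR: {x}"),
        on_success_callback=on_success_callback,  # task-level callback
    ).expand(op_kwargs=gen.output)

    empty = EmptyOperator(task_id="empty")

    test >> empty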

Here we start with a simple task called generate_data that will return some fixed array of data. Here is the function:

def get_data():
    return [
        {"x": 1},
        {"x": 2},
        {"x": 3},
        {"x": 4},
        {"x": 5},
    ]

The next operator is called test and it’s mapped over the results of the previous operator. The callable function will simply print some message along with the value the mapped task instance obtained.

Finally, we have the empty operator with task id empty.

Now, both the DAG and the test operator define an on_success_callback function. They both use the same function. Here is the definition:

from airflow.utils.session import provide_session


@provide_session
def on_success_callback(context, session=None):
    # The task instance that Airflow placed in the callback context.
    task_instance = context["ti"]

    print("------------")
    print("ON_SUCCESS_CALLBACK CALLED")
    print(task_instance.task_id)
    print(task_instance.map_index)
    print("------------")

You can see that the callback function is pretty straightforward. It simply prints the task id and the map index of the task instance it receives.

I added @provide_session here because in my use case I wanted to clean up some XCom values. So let’s just keep it, even though the session is not used in this example.
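If you haven’t used this kind of decorator before, the idea is roughly the following. This is a simplified sketch of the pattern and not Airflow’s actual implementation: FakeSession, open_session and provide_session_sketch are hypothetical names I made up for illustration, and the sketch only checks keyword arguments.

```python
import functools
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a database session."""

    def __init__(self):
        self.committed = False
        self.closed = False


@contextmanager
def open_session():
    # Hypothetical helper: commit on success, always close.
    session = FakeSession()
    try:
        yield session
        session.committed = True
    finally:
        session.closed = True


def provide_session_sketch(func):
    """Inject a session keyword argument when the caller did not pass one."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            # The caller manages the session; use it as-is.
            return func(*args, **kwargs)
        with open_session() as session:
            kwargs["session"] = session
            return func(*args, **kwargs)

    return wrapper


@provide_session_sketch
def on_success_callback(context, session=None):
    # Return the session so the injection is easy to inspect.
    return session


injected = on_success_callback({})
own_session = FakeSession()
passed_through = on_success_callback({}, session=own_session)
```

The point is that Airflow only passes the context to the callback, so the decorator is what supplies the session.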

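Now, when I enable the DAG, it immediately runs 4 times: the start date lies five hours in the past, and Airflow by default catches up on the hourly intervals that have already completed.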
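Why 4 and not 5? Here is a back-of-the-envelope sketch of the arithmetic. It is my own simplification of what the scheduler does for an hourly cron schedule, assuming that the first interval starts at the first full hour at or after the start date and that a run is created only once its interval has ended. The function name completed_hourly_intervals is hypothetical, and the value of now is taken from the timestamps in the logs further down.

```python
from datetime import datetime, timedelta, timezone


def completed_hourly_intervals(start, now):
    """List the (begin, end) hourly intervals that lie fully between start and now."""
    # Align to the first full hour at or after the start date.
    begin = start.replace(minute=0, second=0, microsecond=0)
    if begin < start:
        begin += timedelta(hours=1)

    intervals = []
    while begin + timedelta(hours=1) <= now:
        intervals.append((begin, begin + timedelta(hours=1)))
        begin += timedelta(hours=1)
    return intervals


now = datetime(2023, 3, 25, 18, 23, 55, tzinfo=timezone.utc)
runs = completed_hourly_intervals(now - timedelta(hours=5), now)
print(len(runs), runs[-1][0].isoformat())
```

Under these assumptions there are four completed intervals, and the last one starts at 17:00 UTC, which matches the execution_date=20230325T170000 in the task log below.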

4 runs of the DAG are automatically started

The DAG runs and logs

If we go to the webserver UI and take a look at the log of one of the mapped task instances, we will see the following:

[2023-03-25, 19:23:55 CET] {logging_mixin.py:137} INFO - TEST PYTHON OPERATOR: 2
[2023-03-25, 19:23:55 CET] {python.py:177} INFO - Done. Returned value was: None
[2023-03-25, 19:23:55 CET] {taskinstance.py:1321} INFO - Marking task as SUCCESS. dag_id=callback_test, task_id=test, map_index=1, execution_date=20230325T170000, start_date=20230325T182354, end_date=20230325T182355
[2023-03-25, 19:23:55 CET] {logging_mixin.py:137} INFO - ------------
[2023-03-25, 19:23:55 CET] {logging_mixin.py:137} INFO - ON_SUCCESS_CALLBACK CALLED
[2023-03-25, 19:23:55 CET] {logging_mixin.py:137} INFO - test
[2023-03-25, 19:23:55 CET] {logging_mixin.py:137} INFO - 1
[2023-03-25, 19:23:55 CET] {logging_mixin.py:137} INFO - ------------
[2023-03-25, 19:23:55 CET] {local_task_job.py:212} INFO - Task exited with return code 0

From the log we can see that the Python callable printed “TEST PYTHON OPERATOR: 2”, and that the on success callback function was then called and printed the task id (test) and the map index (1).

The output of the success callback, when called on the DAG, is visible in the scheduler logs. So, I enter the scheduler Docker container (I’m running Airflow on Docker) and navigate to the following path:

/opt/airflow/logs/scheduler/latest/<dag_folder>

My DAGs are located under the dags/collect-weather-data/ directory and the DAG file name is callback_test.py. So my full path is /opt/airflow/logs/scheduler/latest/collect-weather-data and the log file I’m interested in is callback_test.py.log.

Looking at the contents of this log file, I noticed the following:

Image of a part of callback_test.py.log

Notice that “ON_SUCCESS_CALLBACK CALLED” indicates that the callback function was called (3 calls are visible in the image, but all 4 occurred). But also notice that the task id is test, while the map index varies by DAG run.

I’ve triggered an additional DAG run manually after this, and here are the relevant logs:

[2023-03-25T18:32:50.649+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:32:50.647+0000] {dag.py:1317} INFO - Executing dag callback function: <function on_success_callback at 0x7f1495ea5990>
[2023-03-25T18:32:50.690+0000] {logging_mixin.py:137} INFO - ------------
[2023-03-25T18:32:50.691+0000] {logging_mixin.py:137} INFO - ON_SUCCESS_CALLBACK CALLED
[2023-03-25T18:32:50.691+0000] {logging_mixin.py:137} INFO - test
[2023-03-25T18:32:50.691+0000] {logging_mixin.py:137} INFO - 4
[2023-03-25T18:32:50.691+0000] {logging_mixin.py:137} INFO - ------------
[2023-03-25T18:32:50.698+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:32:50.698+0000] {dag.py:2693} INFO - Sync 1 DAGs

Again, we have the task instance test and this time the map index is 4.

I ran the DAG manually a couple more times. Here are the logs:

[2023-03-25T18:34:07.035+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:07.032+0000] {dag.py:1317} INFO - Executing dag callback function: <function on_success_callback at 0x7f1495ecdbd0>
[2023-03-25T18:34:07.137+0000] {logging_mixin.py:137} INFO - ------------
[2023-03-25T18:34:07.137+0000] {logging_mixin.py:137} INFO - ON_SUCCESS_CALLBACK CALLED
[2023-03-25T18:34:07.137+0000] {logging_mixin.py:137} INFO - test
[2023-03-25T18:34:07.138+0000] {logging_mixin.py:137} INFO - 4
[2023-03-25T18:34:07.138+0000] {logging_mixin.py:137} INFO - ------------
[2023-03-25T18:34:07.190+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:07.190+0000] {dag.py:2693} INFO - Sync 1 DAGs
[2023-03-25T18:34:07.345+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:07.345+0000] {dag.py:3440} INFO - Setting next_dagrun for callback_test to 2023-03-25T18:00:00+00:00, run_after=2023-03-25T19:00:00+00:00
[2023-03-25T18:34:07.420+0000] {processor.py:175} INFO - Processing /opt/airflow/dags/collect-weather-data/callback_test.py took 0.685 seconds
[2023-03-25T18:34:10.194+0000] {processor.py:153} INFO - Started process (PID=1117) to work on /opt/airflow/dags/collect-weather-data/callback_test.py
[2023-03-25T18:34:10.195+0000] {processor.py:743} INFO - Processing file /opt/airflow/dags/collect-weather-data/callback_test.py for tasks to queue
[2023-03-25T18:34:10.197+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:10.196+0000] {dagbag.py:532} INFO - Filling up the DagBag from /opt/airflow/dags/collect-weather-data/callback_test.py
[2023-03-25T18:34:10.289+0000] {processor.py:753} INFO - DAG(s) dict_keys(['callback_test']) retrieved from /opt/airflow/dags/collect-weather-data/callback_test.py
[2023-03-25T18:34:10.408+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:10.393+0000] {dag.py:1317} INFO - Executing dag callback function: <function on_success_callback at 0x7f1495ea5c60>
[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - ------------
[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - ON_SUCCESS_CALLBACK CALLED
[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - empty
[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - -1
[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - ------------
[2023-03-25T18:34:10.521+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:10.521+0000] {dag.py:2693} INFO - Sync 1 DAGs
[2023-03-25T18:34:10.568+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:10.568+0000] {dag.py:3440} INFO - Setting next_dagrun for callback_test to 2023-03-25T18:00:00+00:00, run_after=2023-03-25T19:00:00+00:00
[2023-03-25T18:34:10.642+0000] {processor.py:175} INFO - Processing /opt/airflow/dags/collect-weather-data/callback_test.py took 0.455 seconds
[2023-03-25T18:34:11.849+0000] {processor.py:153} INFO - Started process (PID=1136) to work on /opt/airflow/dags/collect-weather-data/callback_test.py
[2023-03-25T18:34:11.850+0000] {processor.py:743} INFO - Processing file /opt/airflow/dags/collect-weather-data/callback_test.py for tasks to queue
[2023-03-25T18:34:11.851+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:11.851+0000] {dagbag.py:532} INFO - Filling up the DagBag from /opt/airflow/dags/collect-weather-data/callback_test.py
[2023-03-25T18:34:11.885+0000] {processor.py:753} INFO - DAG(s) dict_keys(['callback_test']) retrieved from /opt/airflow/dags/collect-weather-data/callback_test.py
[2023-03-25T18:34:11.923+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:11.921+0000] {dag.py:1317} INFO - Executing dag callback function: <function on_success_callback at 0x7f1495ea5c60>
[2023-03-25T18:34:11.963+0000] {logging_mixin.py:137} INFO - ------------
[2023-03-25T18:34:11.963+0000] {logging_mixin.py:137} INFO - ON_SUCCESS_CALLBACK CALLED
[2023-03-25T18:34:11.963+0000] {logging_mixin.py:137} INFO - test
[2023-03-25T18:34:11.963+0000] {logging_mixin.py:137} INFO - 4
[2023-03-25T18:34:11.963+0000] {logging_mixin.py:137} INFO - ------------
[2023-03-25T18:34:11.971+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:11.971+0000] {dag.py:2693} INFO - Sync 1 DAGs
[2023-03-25T18:34:12.002+0000] {logging_mixin.py:137} INFO - [2023-03-25T18:34:12.002+0000] {dag.py:3440} INFO - Setting next_dagrun for callback_test to 2023-03-25T18:00:00+00:00, run_after=2023-03-25T19:00:00+00:00

If you look at the logs carefully, you’ll notice that in one of the runs the callback received the task instance of the task empty (with map index -1) instead of test. This shows an inconsistency in how the task instance and map index are determined for the DAG on success callback function.
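If you’d rather not scan the log by eye, the task id and map index pairs can be pulled out with a few lines of Python. This is only a sketch: it assumes the exact layout my callback prints (the marker line, followed by the task id and then the map index), and extract_callback_calls is a hypothetical helper name. The sample lines are copied from the logs above.

```python
import re

MARKER = "ON_SUCCESS_CALLBACK CALLED"
MESSAGE = re.compile(r"INFO - (.*)$")


def extract_callback_calls(lines):
    """Return (task_id, map_index) for every callback block found in the log lines."""
    lines = [line.rstrip() for line in lines]
    calls = []
    for i, line in enumerate(lines):
        if not line.endswith(MARKER) or i + 2 >= len(lines):
            continue
        task_match = MESSAGE.search(lines[i + 1])
        index_match = MESSAGE.search(lines[i + 2])
        if task_match and index_match:
            calls.append((task_match.group(1), int(index_match.group(1))))
    return calls


sample = [
    "[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - ------------",
    "[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - ON_SUCCESS_CALLBACK CALLED",
    "[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - empty",
    "[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - -1",
    "[2023-03-25T18:34:10.499+0000] {logging_mixin.py:137} INFO - ------------",
    "[2023-03-25T18:34:11.963+0000] {logging_mixin.py:137} INFO - ON_SUCCESS_CALLBACK CALLED",
    "[2023-03-25T18:34:11.963+0000] {logging_mixin.py:137} INFO - test",
    "[2023-03-25T18:34:11.963+0000] {logging_mixin.py:137} INFO - 4",
]

calls = extract_callback_calls(sample)
print(calls)
```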

Conclusion

I have no idea why the described behaviour occurs. It would make more sense for the DAG callback function to receive the last task instance in the DAG. Instead, both the task id and the map index appear to be arbitrary.

Whether the behaviour is a design decision or a bug, I don’t know, and I’m not going to get into that. The question remains: how is the task instance determined for this DAG callback function?

However, it leads me to conclude that you should not use the same callback function for your DAG and operator if what you’re trying to do depends on any (mapped) task instance property.
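One way to act on that is to keep two separate callbacks. The sketch below is not tested against a running Airflow instance: it uses SimpleNamespace objects as stand-ins for the real context values, and it assumes the callback context exposes the task instance under "ti" and the DAG run under "dag_run". The function names are hypothetical.

```python
from types import SimpleNamespace


def task_success_callback(context):
    """Task-level callback: the task instance is the one that just succeeded."""
    ti = context["ti"]
    return {
        "dag_id": ti.dag_id,
        "run_id": ti.run_id,
        "task_id": ti.task_id,
        "map_index": ti.map_index,
    }


def dag_success_callback(context):
    """DAG-level callback: rely only on run-level values, never on context['ti']."""
    dag_run = context["dag_run"]
    return {"dag_id": dag_run.dag_id, "run_id": dag_run.run_id}


# Fake context objects, standing in for what Airflow would pass.
fake_run = SimpleNamespace(dag_id="callback_test", run_id="manual__1")
fake_ti = SimpleNamespace(
    dag_id="callback_test", run_id="manual__1", task_id="test", map_index=1
)
fake_context = {"ti": fake_ti, "dag_run": fake_run}

task_scope = task_success_callback(fake_context)
dag_scope = dag_success_callback(fake_context)
```

The task-level callback can safely target a single mapped task instance, while the DAG-level one only knows which run finished.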

In my case, I wanted to do something like this:

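from airflow.models import XCom
from sqlalchemy import and_, delete

# dag_id, run_id, task_id and map_index are taken from the task instance
# in the callback context; session is the one injected by @provide_session.
delete_q = delete(XCom).filter(
    and_(
        XCom.dag_id == dag_id,
        XCom.run_id == run_id,
        XCom.task_id == task_id,
        XCom.map_index == map_index,
    )
)

session.execute(delete_q)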
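To show why this matters for the DAG-level callback, here is a small self-contained sketch that uses sqlite3 instead of Airflow’s metadata database. The table is a stand-in with just enough columns for the filter, not Airflow’s real XCom schema, and delete_xcom and the run id are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE xcom ("
    "dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER, value TEXT)"
)
# One XCom row per mapped task instance of the task "test".
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?, ?, ?)",
    [("callback_test", "manual__1", "test", i, f"payload-{i}") for i in range(5)],
)


def delete_xcom(conn, dag_id, run_id, task_id, map_index):
    """Delete the rows matching all four values and return how many were removed."""
    cursor = conn.execute(
        "DELETE FROM xcom "
        "WHERE dag_id = ? AND run_id = ? AND task_id = ? AND map_index = ?",
        (dag_id, run_id, task_id, map_index),
    )
    return cursor.rowcount


# Simulate the DAG-level callback being handed the task instance (test, 4).
deleted = delete_xcom(conn, "callback_test", "manual__1", "test", 4)
remaining = [
    row[0] for row in conn.execute("SELECT map_index FROM xcom ORDER BY map_index")
]
print(deleted, remaining)
```

With a filter like this, the DAG-level callback would clean up only whichever single task instance it happened to receive and leave the rest of the run’s XCom rows in place.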

Edit: for the story, I used Airflow 2.5.2.

Marin Aglić

Working as a Software Engineer. Interested in Data Engineering. Mostly working with airflow, python, celery, bigquery. Ex PhD student.