Smart Sensors: the future of Airflow sensing

Read this before implementing Airflow’s (early beta) Smart Sensors

Andrej Baran
Slido developers blog
8 min readJun 7, 2021

--

The Airflow’s early access Smart Sensor feature was introduced in December 2020. At Slido, we’ve been successfully using it since January 2021. The implementation wasn’t easy and we wanted to give up, but in hindsight, we must admit that going through it was the right thing to do. Dispute introducing tech debt!

Sometimes you win and sometimes you learn

Airflow has a sensors feature. But the feature is not used a lot. The biggest issue with ordinary sensors is the threat of deadlock.

Imagine you have 10 worker slots. If at some point there are 10 sensors waiting for other tasks to finish, they occupy all the worker slots and you’re done. The sensors are waiting for the tasks to finish but there is no slot in which the tasks could run. Deadlock.

Smart sensors attempt to change the status quo. That’s exactly what makes the feature look amazing. But there are a lot of hiccups, you have to deal with:

  • missing documentation
  • unexpected behavior (smart sensor is executed by a special DAG)
  • misleading (context is not what context it used to be)

The only way out will be through reading the source code of the Smart Sensor implementation. But if you make it work, solve all the problems, you’ll find the magic. Colleagues will sing about shard executions, queue bigger than your wife's to-do list with no fear of getting stuck, and a feeling of implementing early access feature into the production. Hell yeah.

Yeah yeah yeah, this feature is still in early access and might change in the future. Based on our experience we hope it does, for the better. This post won’t cover all the problems. Only the ones we encountered during implementation. “Let me lead you out of the truth”. Are you ready?

New sensor, new docker build

In the documentation, you can find the following:

You have to register each new sensor in the Airflow config. Once you change the config you have to distribute the change to the webserver, workers, and schedulers.

Only then Airflow starts to recognize the sensor as smart, otherwise it works as an ordinary sensor. Everything seems to work. If you know in the concept you are not considering the state RUNNING as a problem. But smart sensor has a different workflow. Execution state is not RUNNING but SENSING and there is no warning that a registered smart sensor for some reason is not smart at all.

  • Ordinary sensor workflow: A sensor task is triggered and waits to be executed, in a state QUEUED. Once execution starts it goes into a state RUNNING. The task will occupy a job slot until the job isn’t done.
    Final flow: QUEUED > RUNNING (taking job slot in the parent DAG) >[SUCCESS, FAILED]
  • Smart sensor workflow: A sensor task is triggered and waits to be executed, in a state QUEUED. Once execution starts it goes into a state SENSING. The task is not taking any job slots. It will wait until the smart sensor executor, a different DAG will finish the job.
    Final flow: QUEUED > SENSING (not taking job slot in the parent DAG, executed elsewhere) >[SUCCESS, FAILED]

Imaging, you have to create a new Docker build on every Airflow config change, while the Airflow codebase is not in the same repo as our DAGs codebase so it's very uncomfortable to do changes in both places and orchestrate the deployment process. I personally don’t understand why there is such a need to register the sensor in the Airflow config.

It’s True, but not so True

One registration is not enough, once you registered your sensor as smart in the config, you have to prove it. You do so by overloading the is_smart_sensor_compatiblemethod. In the method, you have to make sure that:

  • soft_fail is False
  • there are no callbacks

Airflow tries to use the current DagRun flow to implement the smart sensor. We already talked about the execution state. But what exactly happens during this state and what are the differences?

  • Ordinary sensor execution: It takes a job slot in parent DAG and starts poke function in a loop. The poke function checks if conditions have been fulfilled, if not, waits and pokes again. During the execution, it’s occupate the job slot and task status is RUNNING.
  • Smart sensor execution: A task goes into a state SENSING. It not taking a job slot, but it will register a new smart job in smart_sensor_group_shard DAG. smart_sensor_group_shard DAG will execute a poke function of the smart sensor, evaluate if conditions have been met. If not, the process fails, the task is not going to wait, but it goes to the state UP_FOR_RESCHEDULE and the job slot is released.

Each poke is a special DagRun in the smart_sensor_group_shard DAG, and if the poke fails, it does not mean the smart sensor fails. In the smart sensor task, they catch the failure and reschedule the task for another poke run. I guess this little hack helped develop the feature into production much faster. I don’t think it was smart at all.

That’s why you can’t have soft_fail and callbacks in a smart sensor, otherwise, it won’t pass the compatibility check. An incompatible smart sensor acts as an ordinary sensor. Without any warning! If you are new to smart sensors, it’s quite hard to recognize this and it’s very easy to get misled.

our implementation to ensure compatibility of smart sensor

Sensing forever

Once you turn on the smart sensors there will be new DAGs with names like smart_sensor_group_shard_{x} which are smart sensor executors. As one executor can handle quite a lot, we are currently running only a single shard. The number of shards can be changed in airflow.cfg. So it’s up to you how many of them you need.

The default setting is 5. Let say you leave it like this. Once you deploy this config, you will see 5 new DAGs with the name smart_sensor_group_shard_{x}. You have to turn them all on, manually!

A newly triggered smart sensor task is distributed into an exact shard. If the shard is off, the task ends up in the SENSING state forever. Again, without any warning, message, or notification.

What you see is not what you get

SENSING and UP_FOR_RESCHEDULE are new Airflow states used only by a smart sensor. You need them to distinguish what’s happening in your DAG.

  • SENSING state is used to signalize you that a task is waiting to be executed or is currently executing. If the smart_sensor_group_shard is missing or not running then there is no executor. Where there is no executor, there is nothing to do, just to get stuck. No, you won’t receive a warning.
  • UP_FOR_RESCHEDULE — In the smart sensor life cycle, poke failure is expected. After failing, the task goes into a queue for rescheduling and determining if it needs another try and “goes into state SENSING“ or “it’s time to end the task”.

These two states describe what’s currently happening in the loop. A smart sensor task will be jumping between these states:

SENSING > (RUNNING)> UP_FOR_RESCHEDULE > SENSING .

Stop for a second and think why RUNNING is in parentheses. If you think, it’s because a smart sensor is registered in a DAG, but the executor of the smart sensor will be a different DAG, you’re right.

The execution of the task will happen in a different DAG. It may sound innocent, but this is the biggest pain and it will haunt you all the time. This decoupling means you have to pass everything into your smart sensor task. The smart sensor task is disconnected from the original DagRun. It can’t call xcom, it doesn’t have basic info such asexecution_date, dag_id or task_id. The only connections between these two DAGs, actually tasks, are poke_context_fields that will be transferred to the poke function as the argument context but without execution_date, start_date, or dag_run which could come from the original DAG.

When you are writing code in IDE, it will suggest you those variable or functions as the smart sensor inherits them from BaseSensorOperator but during code execution, the poke function will run in a different object, SensorInstance. What you see, when you write the code, is the orange part. But what your smart sensor gets, is the green part.

https://airflow.apache.org/docs/apache-airflow/stable/_images/smart_sensor_architecture.png

It takes some time to get oriented in this new concept of a smart sensor. When you write code you have to believe in yourself and know what can be used when — otherwise you risk ending up with weird errors in production.

You can pass a variable to the smart sensor, whose value is known before the DAG is run. But the contents of variables like execution_date, dag_run, or dag_id will be only generated during DagRun initialization.

Not sure if there is a better way of dealing with this, but overriding get_poke_context that is used in register_in_sensor_service works. The get_poke_context function is the only place where we can get the information from the “orange part” and pass it to the “green part” and it works.

Don’t give up

As I said before, we use smart sensors for some time already, and yes, writing the smart sensor code is very tough, testing is uneasy, debugging almost impossible, but that outcome is incredible.

Before the smart sensor era, if we saw a sensor in DAG, it was a red flag followed by a series of questions. Why do you need it? How long does it take to process? How many of them can exist at one time? Won’t they get the whole Airflow scheduling mechanism stuck?

Most of the time, we were looking for different, mostly more complicated, solutions to handle the observation problem without a sensor. Now we use smart sensors, everywhere. You can track third parties, check if their exports finished, you can watch if certain files are on the file system, or you can improve the orchestration of Airflow.

https://pixabay.com/photos/don-t-give-up-motivation-3403779/

Orchestration in Airflow is done by scheduling. If there is a dependency you can trigger DAG from another DAG when conditions happen. But if a DAG is dependent on more than one DAG, you have a problem. There is no pre-built solution and you have to deal with it. Here smart sensors come in handy. Your DAG can start with a dependency check, watch until certain DAGs finish successfully, and only then go further. No need to worry about how many sensors will run like this. They are smart!

There are many jobs that can be brought to a new level by a smart sensor. Don’t give up and try them.

--

--