Airflow DAG cross-dependency: custom sensing of another DAG's execution

Alan Vainsencher
Plarium-engineering
6 min read · Mar 20, 2022


As shared in my previous post, at my company we have most of our services on Google Cloud Platform. As part of the data engineering department, we of course use Airflow. More specifically, we use the GCP-managed version called Cloud Composer.

As we, data engineers, develop and deploy our DAGs, the data scientists need a way to know when they can execute theirs, since their DAGs depend on ours. In this article, I will focus on sensing the execution status of another DAG. In other words, we needed a way for DAG B to know the execution status of DAG A.

When I was assigned this task I did what all of us would have done: google it. For such a common scenario there must already be a proven, working solution. And in fact, there is: a native Airflow sensor called ExternalTaskSensor. The problem with it is that, honestly, sometimes I could make it work and sometimes I couldn't. But most importantly, it was not flexible enough for our needs.
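
For reference, here is a minimal sketch of how the native ExternalTaskSensor is typically wired up; the import path is the Airflow 1.10-style one, and the dag_id/task_id values are just placeholders:

from datetime import timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Waits until a task in another DAG reaches one of the allowed states (by default 'success')
# for the matching execution date (shifted by execution_delta if the schedules differ).
wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_primary_dag',
    external_dag_id='primary_dag',         # placeholder: the upstream dag_id
    external_task_id='after_sleep',        # placeholder: the upstream task_id
    execution_delta=timedelta(minutes=0),  # offset between the two schedules
    timeout=600,
    poke_interval=60,
    dag=dag,
)

The catch is that it matches runs by execution date, which is exactly where it stopped fitting our "first/second/last run of the day" requirements.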

We, data engineers, have DAGs that run multiple times a day, and the data scientists needed their DAGs to check whether the first, second, last or a specific execution of the day succeeded or failed. On top of that, their DAGs needed to take a different path depending on the state of our DAG's execution. So we implemented a custom sensor operator, and here is the code with the explanations.

The Airflow/Cloud Composer versions the code was tested on are the following:

Yes, old versions. I haven't tried the code on newer versions, but it should work, maybe with some changes.
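
If you are not sure which Airflow version your own environment runs, a quick sanity check (just a sketch) is to print it from Python:

# Print the Airflow version of the current environment.
import airflow

print(airflow.__version__)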

First, we have our main_dag (DAG A), the one that runs and that the other DAG (DAG B) depends on. main_dag does not depend on any other DAG; it is the one that is watched to check whether it has finished or not.
This example DAG will always succeed because it only runs a few echoes and a sleep; here you would put the code of the real DAG being watched.

# main_dag.py (DAG A)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils.dates import days_ago

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    dag_id='primary_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(minutes=2),
    start_date=days_ago(1),
    tags=['example'],
    catchup=False,
    max_active_runs=1
)


t1 = BashOperator(
    task_id='before_sleep',
    depends_on_past=False,
    bash_command='echo "Running before sleep"',
    retries=3,
    dag=dag
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag
)

t3 = BashOperator(
    task_id='after_sleep',
    depends_on_past=False,
    bash_command='echo "Its running after sleep"',
    retries=3,
    dag=dag
)

t1 >> t2 >> t3

Then we have the DAG or DAGs that watch main_dag and check whether it has finished.
It would look like this: if the sensing succeeds, take one path; if it fails, take the other.

from airflow import DAG
from dagstatussensor import DagStatusSensor

from airflow.utils.dates import days_ago
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    dag_id='external_sensor_testing',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval="*/2 * * * *",
    start_date=days_ago(1),
    tags=['example'],
    catchup=False,
    max_active_runs=1
)


def what_status(**kwargs):
    return kwargs['ti'].xcom_pull(dag_id="external_sensor_testing", key='status')


t1 = DagStatusSensor(
    task_id='task_sensor',
    status_to_check='success',
    dag_name="primary_dag",
    dag=dag,
    run_to_check=0,
    timeout=40,
    do_xcom_push=True,
    poke_interval=20,
    soft_fail=False
)

t2 = BranchPythonOperator(
    task_id='branching',
    python_callable=what_status,
    provide_context=True,
    trigger_rule='all_done',
    dag=dag
)

op = DummyOperator(task_id='successful', dag=dag, trigger_rule='all_done')
op2 = DummyOperator(task_id='failure', dag=dag, trigger_rule='all_done')

t1 >> t2
t2 >> op
t2 >> op2
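
A quick note on how the branching works: the sensor pushes either "successful" or "failure" to XCom under the key "status", and the BranchPythonOperator callable returns that value, which Airflow then matches against the task_ids of the downstream tasks (here named 'successful' and 'failure' on purpose). If you want the downstream task names to be independent of the XCom values, a small mapping in the callable is enough. A sketch, where the task ids 'run_model' and 'send_alert' are hypothetical:

# Variant of the callable where the XCom values and the downstream task_ids are decoupled.
STATUS_TO_TASK = {
    'successful': 'run_model',   # hypothetical task_id to run on success
    'failure': 'send_alert',     # hypothetical task_id to run on failure
}

def what_status(**kwargs):
    status = kwargs['ti'].xcom_pull(dag_id="external_sensor_testing", key='status')
    # BranchPythonOperator follows the task whose task_id equals the returned string.
    return STATUS_TO_TASK.get(status, 'send_alert')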

The watcher DAG starts with a DagStatusSensor. Hey, but what is that?
It is a custom sensor implementation that, basically, pokes the execution status of any other DAG. Here is the code.

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime
from typing import List, Optional, Union
from airflow.models.dagrun import DagRun
from airflow.utils import timezone
from airflow.utils.db import provide_session
from sqlalchemy.orm.session import Session
from time import sleep
from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException


"""
This sensor is for checking the status of dag. If its success then only it will return true or mark
as completed so next task will get execute.
In case dag is failed it will raise an airflow exception as failed.
params:
dag_name: pass the dag_id for your dag
status_to_check: pass 'success' if you want sensor to return true for this.
for e.g.
dag_status_sensor = DagStatusSensor(dag_name='test_dag',status_to_check='success',task_id='dag_status_sensor',poke_interval=30,timeout=1800,dag=dag)
"""

class DagStatusSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self, dag_name, status_to_check, run_to_check=0, *args, **kwargs):
        self.dag_name = dag_name
        self.status_to_check = status_to_check
        self.run_to_check = run_to_check

        super(DagStatusSensor, self).__init__(*args, **kwargs)

    def poke(self, context):

        task_instance = context['task_instance']
        start = timezone.make_aware(datetime.now().replace(hour=0, minute=0, second=0, microsecond=0))
        end = timezone.make_aware(datetime.now().replace(hour=23, minute=59, second=59, microsecond=0))
        daggies = MyDagRun.find(dag_id=self.dag_name, execution_start_date=start, execution_end_date=end)

        if daggies:
            if daggies[self.run_to_check].state == self.status_to_check:
                task_instance.xcom_push("status", "successful")
                return True
            elif daggies[self.run_to_check].state == "failed":
                task_instance.xcom_push("status", "failure")
                return False
            else:  # state is running or something else; after the timeout an exception will be raised
                task_instance.xcom_push("status", "failure")
                return False
        else:  # no runs of the sensed DAG today; after the timeout an exception will be raised and the task fails
            task_instance.xcom_push("status", "failure")
            return False

    def execute(self, context):
        started_at = timezone.utcnow()
        while not self.poke(context):
            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                # If sensor is in soft fail mode but will be retried then
                # give it a chance and fail with timeout.
                # This gives the ability to set up non-blocking AND soft-fail sensors.
                if self.soft_fail and not context['ti'].is_eligible_to_retry():
                    raise AirflowSkipException('Snap. Time is OUT.')
                else:
                    raise AirflowSensorTimeout('Snap. Time is OUT.')
            sleep(self.poke_interval)
        self.log.info("Success criteria met. Exiting.")


class MyDagRun(DagRun):

    @staticmethod
    @provide_session
    def find(
        dag_id: Optional[Union[str, List[str]]] = None,
        run_id: Optional[str] = None,
        execution_date: Optional[datetime] = None,
        state: Optional[str] = None,
        external_trigger: Optional[bool] = None,
        no_backfills: bool = False,
        session: Session = None,
        execution_start_date: Optional[datetime] = None,
        execution_end_date: Optional[datetime] = None,
    ) -> List[DagRun]:

        DR = MyDagRun

        qry = session.query(DR)
        dag_ids = [dag_id] if isinstance(dag_id, str) else dag_id
        if dag_ids:
            qry = qry.filter(DR.dag_id.in_(dag_ids))
        if run_id:
            qry = qry.filter(DR.run_id == run_id)
        if execution_date:
            if isinstance(execution_date, list):
                qry = qry.filter(DR.execution_date.in_(execution_date))
            else:
                qry = qry.filter(DR.execution_date == execution_date)
        if execution_start_date and execution_end_date:
            qry = qry.filter(DR.execution_date.between(execution_start_date, execution_end_date))
        elif execution_start_date:
            qry = qry.filter(DR.execution_date >= execution_start_date)
        elif execution_end_date:
            qry = qry.filter(DR.execution_date <= execution_end_date)
        if state:
            qry = qry.filter(DR.state == state)
        if external_trigger is not None:
            qry = qry.filter(DR.external_trigger == external_trigger)
        if no_backfills:
            # in order to prevent a circular dependency
            from airflow.jobs import BackfillJob
            qry = qry.filter(DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'))

        return qry.order_by(DR.execution_date).all()
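
To make the ordering explicit, here is a small standalone sketch (assuming it runs with access to the Airflow metadata database and that MyDagRun is importable from dagstatussensor) that lists today's runs of primary_dag and their states:

from datetime import datetime
from airflow.utils import timezone
from dagstatussensor import MyDagRun

# The same "today" window used in DagStatusSensor.poke
start = timezone.make_aware(datetime.now().replace(hour=0, minute=0, second=0, microsecond=0))
end = timezone.make_aware(datetime.now().replace(hour=23, minute=59, second=59, microsecond=0))

runs = MyDagRun.find(dag_id='primary_dag', execution_start_date=start, execution_end_date=end)

# Runs come back ordered by execution_date: index 0 is the first run of the day, -1 the last.
for index, run in enumerate(runs):
    print(index, run.execution_date, run.state)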

A few comments to clarify what is going on here:

  • status_to_check is the status of main_dag that you want to check. Usually you'd check whether it succeeded, but sometimes you'd want to know whether it failed, or check some other state. So here you define it.
  • run_to_check is the run of the day whose status you want to check. It could be the first run of the day, the second… or the last: 0 is the first, 1 the second and -1 the last, like any Python list index, because MyDagRun.find returns the runs ordered by execution_date. There is a short sketch right after this list.
  • For an explanation of what MyDagRun is and why it is needed, please check this Stack Overflow question.
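
For example, a DAG that needs to react to the last run of the day of primary_dag having failed could instantiate the sensor roughly like this (a sketch reusing the parameters shown above):

last_run_failed_sensor = DagStatusSensor(
    task_id='last_run_failed_sensor',
    dag_name='primary_dag',      # the DAG being watched
    status_to_check='failed',    # the sensor returns True once the watched run is in this state
    run_to_check=-1,             # -1 = last run of the day, like any Python list index
    poke_interval=60,
    timeout=1800,
    do_xcom_push=True,
    dag=dag,
)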

And where do all these files go?

Each Cloud Composer (Airflow) instance has a bucket in Google Cloud Storage.
The DAG files go in the dags folder, and dagstatussensor.py goes in the plugins folder.

The entire project is on GitHub for better readability.
