Airflow 2.0: DAG Authoring Redesigned

Screenshot from awesome Airflow website

Airflow 2.0 is a major release that brings many new features. Some of them, like the highly available scheduler or the overall improvements in scheduling performance, are real game-changers. But apart from deep, core-related features, Airflow 2.0 comes with new ways of defining Airflow DAGs. Let’s take a look at what’s been improved!

TaskFlow API (AIP-31)

The first significant change is the introduction of the TaskFlow API. It consists of three features:

  • XComArg — a layer over already existing XCom which simplifies accessing and passing information between tasks,
  • @task decorator which makes using PythonOperator smooth and easy,
  • Pluggable XCom backends which can be used to simplify persisting intermediate data between tasks.

Let’s take a closer look at each of those!

XComArg (#8652)

Before Airflow 2.0, to define a relationship between two tasks, users had to define:

  • Order relationship: by setting downstreams or upstreams between tasks,
  • Data relationship: by specifying what data from one task should be used in another task. This was commonly done by using Jinja templates.

If you’ve been using Airflow long enough, you may have experienced that writing Jinja templates by hand is error-prone. And having to remember two relationships sounds unnecessary. Indeed, it is! If there’s a data relationship between tasks, then there also has to be an order relationship.
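The point that a data relationship implies an order relationship can be illustrated with a toy model in plain Python. This is only a sketch: ToyTask, ToyOutputRef and run are hypothetical names invented for this illustration, not Airflow classes, and the code does not reflect how Airflow is implemented.

```python
class ToyOutputRef:
    """Reference to the future output of a task (toy analogue of an XCom reference)."""

    def __init__(self, task):
        self.task = task


class ToyTask:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    def __init__(self, task_id, fn, args=()):
        self.task_id = task_id
        self.fn = fn
        self.args = tuple(args)
        # The data relationship implies the order relationship: every task
        # whose output we consume has to run before us.
        self.upstream = [a.task for a in self.args if isinstance(a, ToyOutputRef)]

    @property
    def output(self):
        return ToyOutputRef(self)


def run(task, results):
    """Run upstream tasks first, then the task itself; results maps task_id to value."""
    if task.task_id in results:
        return results[task.task_id]
    for upstream in task.upstream:
        run(upstream, results)
    # Swap each reference for the value its task produced.
    resolved = [
        results[a.task.task_id] if isinstance(a, ToyOutputRef) else a
        for a in task.args
    ]
    results[task.task_id] = task.fn(*resolved)
    return results[task.task_id]


numbers = ToyTask("numbers", lambda: list(range(10)))
total = ToyTask("total", lambda xs: sum(xs), args=(numbers.output,))

results = {}
run(total, results)
```

Nothing in this sketch declares the order explicitly: passing numbers.output to total is enough for the toy runner to know that numbers has to run first.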

With this in mind, we added an output attribute to every operator. It returns an instance of XComArg, which is an easy way to reference the XCom returned by a task and use it in other tasks. For example:

from airflow.operators.python import PythonOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago

with DAG(
    "xcom_args_are_awesome",
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
) as dag:
    numbers = PythonOperator(
        task_id="numbers",
        python_callable=lambda: list(range(10))
    )
    show = PythonOperator(
        task_id="show",
        python_callable=lambda x: print(x),
        op_args=(numbers.output,)
    )

In the above example, you can see that the output of the numbers task is explicitly passed to the show task as one of its op_args. No more Jinja templates! What is more, Airflow automatically resolves the relationship between the tasks and knows that the show task has to be downstream of the numbers task. You can see it in the Airflow web interface:

@task decorator (#8962)

The next thing that makes up the TaskFlow API is the @task decorator. This simple decorator allows users to convert any Python function into a task instance using PythonOperator! Here is how the previous example can be simplified using this approach:

from airflow.operators.python import task
from airflow.models import DAG
from airflow.utils.dates import days_ago

@task
def numbers():
    return list(range(10))

@task
def show(xs):
    print(xs)

with DAG(
    "tasks_are_awesome",
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
) as dag:
    xs = numbers()
    show(xs)

As you can see, the code is cleaner, has less boilerplate, and is much more reusable. What is more, the value returned by invoking a decorated task is an XComArg. This means it can be passed either to a normal operator or to another task-decorated function, as in the example above. Task ids are generated from the function name. This allows users to invoke a function multiple times to create many tasks of the same type. For example, we can invoke show multiple times in the Airflow DAG:

with DAG(
    "tasks_are_awesome",
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
) as dag:
    xs = numbers()
    show(xs)
    show(xs)
    show(xs)
    show(xs)

This will create 4 show tasks, each with a unique task_id:
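How ids could be derived from a function name while staying unique can be sketched in plain Python. This is a toy model: toy_task and registry are hypothetical names, and the __N suffix scheme is an assumption made for this illustration, so check the Airflow web interface or the documentation for the ids Airflow actually assigns.

```python
import functools


def toy_task(registry):
    """Hypothetical decorator factory; registry maps task_id -> (function, args)."""

    def decorator(fn):
        @functools.wraps(fn)
        def factory(*args):
            # Calling the decorated function does not run it. It only
            # registers a new task and returns the id chosen for it.
            base = fn.__name__
            task_id, n = base, 0
            while task_id in registry:
                n += 1
                task_id = f"{base}__{n}"  # assumed suffix scheme
            registry[task_id] = (fn, args)
            return task_id

        return factory

    return decorator


registry = {}


@toy_task(registry)
def show(xs):
    print(xs)


ids = [show([1, 2, 3]) for _ in range(4)]
```

After these four calls, the registry holds four separate entries that all point at the same function, which mirrors the idea of creating many tasks of the same type from one decorated function.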

To learn more about the task decorator, check the latest TaskFlow API tutorial.

Custom XCom backends (#8560)

The last piece of the TaskFlow API is a pluggable XCom backend. It’s less complicated than it may sound. Once we have the @task decorator and XComArg, we may want to start passing real data (like dataframes) between tasks. However, as you probably know, there are many limitations on what can be passed via the XCom table. The data has to be small and JSON-serializable (Airflow 1.10 pickled XCom values by default, but in Airflow 2.0 pickling is disabled by default).

The recommended approach to pass big data from one task to another is to persist it somewhere in external storage, for example, GCS or S3 buckets. However, this means that each task has to implement the download and upload logic that will be used at the beginning and at the end of a task.

And that’s where a custom XCom backend brings an amazing alternative! It allows users to define a class that will be used to serialize the task’s output and deserialize the task’s input. Consider the following example:

import uuid
from typing import Any

import pandas as pd
from airflow.models.xcom import BaseXCom
from airflow.providers.google.cloud.hooks.gcs import GCSHook

class GCSXComBackend(BaseXCom):
    PREFIX = "xcom_gs://"
    BUCKET_NAME = "airflow-xcom-backend"

    @staticmethod
    def serialize_value(value: Any):
        if isinstance(value, pd.DataFrame):
            hook = GCSHook()
            object_name = "data_" + str(uuid.uuid4())
            with hook.provide_file_and_upload(
                bucket_name=GCSXComBackend.BUCKET_NAME,
                object_name=object_name,
            ) as f:
                value.to_csv(f.name)
            # Append prefix to persist information that the file
            # has to be downloaded from GCS
            value = GCSXComBackend.PREFIX + object_name
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        result = BaseXCom.deserialize_value(result)
        if isinstance(result, str) and result.startswith(GCSXComBackend.PREFIX):
            object_name = result.replace(GCSXComBackend.PREFIX, "")
            with GCSHook().provide_file(
                bucket_name=GCSXComBackend.BUCKET_NAME,
                object_name=object_name,
            ) as f:
                f.flush()
                result = pd.read_csv(f.name)
        return result

This simple class implements two methods: serialize_value, which is invoked every time a value is returned (or pushed to XCom) by an operator, and deserialize_value, which is invoked when resolving template fields before execution.
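The contract between the two methods can be tried out without Airflow or GCS. The following is a minimal sketch under stated assumptions: an in-memory dict (FAKE_BUCKET) stands in for external storage, "large" lists stand in for dataframes, and all names and the size threshold are hypothetical.

```python
import json
import uuid

FAKE_BUCKET = {}        # hypothetical stand-in for a GCS or S3 bucket
PREFIX = "xcom_mem://"  # marks values that live in external storage
MAX_INLINE_ITEMS = 3    # arbitrary threshold chosen for this sketch


def serialize_value(value):
    """Store large lists externally and keep only a prefixed reference."""
    if isinstance(value, list) and len(value) > MAX_INLINE_ITEMS:
        object_name = "data_" + str(uuid.uuid4())
        FAKE_BUCKET[object_name] = list(value)
        value = PREFIX + object_name
    return json.dumps(value).encode("utf-8")


def deserialize_value(raw):
    """Resolve a prefixed reference back to the externally stored object."""
    result = json.loads(raw.decode("utf-8"))
    if isinstance(result, str) and result.startswith(PREFIX):
        result = FAKE_BUCKET[result[len(PREFIX):]]
    return result


small = serialize_value([1, 2])
big = serialize_value(list(range(100)))
```

Here small carries the list itself, while big carries only a short reference string. Calling deserialize_value on either one returns the original list, so the consumer never has to know where the data was kept.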

In the GCSXComBackend class above, we take a special step if an operator returns a pandas DataFrame. Such an object is uploaded to a GCS bucket, since we cannot store it in the XCom table (dataframes are too big!). The value stored in XCom is replaced by a prefixed string that records the object’s location in external storage. The deserialize_value method then checks for this prefix to figure out whether it needs to download an object from external storage. This means that you can write DAGs like the following, without caring about how and where the data is persisted between tasks:

import pandas as pd
from airflow.operators.python import task
from airflow.models import DAG
from airflow.utils.dates import days_ago

@task
def generate_data() -> pd.DataFrame:
    data = {"a": [1, 2, 3], "b": [12, 13, 14]}
    return pd.DataFrame(data)

@task
def process_data(df: pd.DataFrame):
    df["a"] = df["a"] + 2
    print(df.head())

with DAG(
    "using_gcs_for_xcom",
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
) as dag:
    df = generate_data()
    process_data(df)

By checking the logs for process_data, we can see what happened under the hood:

[2020-11-12 13:17:05,177] {logging_mixin.py:103} INFO - Running <TaskInstance: using_gcs_for_xcom.process_data 2020-11-12T13:16:59.385527+00:00 [running]> on host 950a17127708
[2020-11-12 13:17:05,265] {credentials_provider.py:300} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-11-12 13:17:06,503] {gcs.py:286} INFO - File downloaded to /tmp/tmplfdijbk7data__tmp_tmpttgk320s
[2020-11-12 13:17:06,607] {taskinstance.py:1230} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=using_gcs_for_xcom
AIRFLOW_CTX_TASK_ID=process_data
AIRFLOW_CTX_EXECUTION_DATE=2020-11-12T13:16:59.385527+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2020-11-12T13:16:59.385527+00:00
[2020-11-12 13:17:06,658] {logging_mixin.py:103} INFO - Unnamed: 0 a b
0 0 3 12
1 1 4 13
2 2 5 14
[2020-11-12 13:17:06,681] {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=using_gcs_for_xcom, task_id=process_data, execution_date=20201112T131659, start_date=20201112T131704, end_date=20201112T131706

Did you notice the “File downloaded to” line? That’s exactly where the custom XCom magic happened! Additionally, we can check the XCom table (Admin > XComs) to see how exactly the reference to the GCS object is stored there:

XCom backends are simple yet powerful and allow a lot of customization. If you pass a lot of data between your tasks and would like to ditch the boilerplate code for uploading and downloading data from external storage, try it! To configure such a backend, all you have to do is adjust your airflow.cfg file (make sure that the class is importable, i.e. that its module is on PYTHONPATH):

[core]
xcom_backend = path.to.your.XComBackendClass

To learn more about this feature, check the official documentation.

TaskGroups (AIP-34)

Another big change in the Airflow DAG authoring process is the introduction of the TaskGroup concept. It is an answer to the problem of authoring, managing, and viewing big DAGs. Previously, the Airflow web interface always rendered the whole DAG, with no way of collapsing tasks that logically form a group. That’s exactly what the new TaskGroups allow you to do.

Let’s consider the following example:

from airflow.operators.bash import BashOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(
    "using_task_group",
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo start",
    )
    end_task = BashOperator(
        task_id="end_task",
        bash_command="echo end",
    )

    with TaskGroup(group_id="tasks_1") as tg1:
        previous_echo = BashOperator(
            task_id="echo_tg1",
            bash_command="echo wow"
        )
        for i in range(10):
            next_echo = BashOperator(
                task_id=f"echo_tg1_{i}",
                bash_command="echo wow"
            )
            previous_echo >> next_echo
            previous_echo = next_echo

    with TaskGroup(group_id="tasks_2") as tg2:
        previous_echo = BashOperator(
            task_id="echo_tg2",
            bash_command="echo wow"
        )
        for i in range(10):
            next_echo = BashOperator(
                task_id=f"echo_tg2_{i}",
                bash_command="echo wow"
            )
            previous_echo >> next_echo
            previous_echo = next_echo

    start_task >> [tg1, tg2] >> end_task

In this example, we have a start task, an end task, and 22 other tasks between them, split into two separate branches of 11 tasks each. However, the essential part of this example is the use of the TaskGroup context managers. In this way, we encapsulate the logic into a “task group,” which is rendered in the Airflow web UI like this:

This allowed us to compress the 22 tasks into 2 task groups, which definitely improves the UX of working with such large DAGs. By clicking on a task group, we can expand it to show every single task in it:

Isn’t it cool? With Airflow 2.0, working with arbitrarily large DAGs is no longer a problem, as long as users use TaskGroup.
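The collapsing behaviour can be pictured with a small toy model in plain Python. It assumes that tasks inside a group get ids prefixed with the group id (for example tasks_1.echo_tg1). The collapsed_view function is a hypothetical helper written for this illustration, not part of Airflow.

```python
def collapsed_view(task_ids, expanded=()):
    """Return the nodes a graph view would show when groups are collapsed.

    A task whose id looks like "<group>.<name>" is replaced by its group,
    unless that group is listed in expanded.
    """
    nodes = []
    for task_id in task_ids:
        group, sep, _ = task_id.partition(".")
        node = task_id if (not sep or group in expanded) else group
        if node not in nodes:
            nodes.append(node)
    return nodes


# Rebuild the ids of the example DAG: 1 + 10 tasks in each of the two groups.
task_ids = ["start_task"]
for group_id, short in (("tasks_1", "tg1"), ("tasks_2", "tg2")):
    task_ids.append(f"{group_id}.echo_{short}")
    task_ids += [f"{group_id}.echo_{short}_{i}" for i in range(10)]
task_ids.append("end_task")

all_collapsed = collapsed_view(task_ids)
one_expanded = collapsed_view(task_ids, expanded={"tasks_1"})
```

With both groups collapsed, the view shrinks from 24 nodes to 4. Expanding tasks_1 brings back its 11 tasks while tasks_2 stays a single node.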

To learn more about this feature, check the official documentation.

And much more!

Apart from big features like the TaskFlow API or TaskGroups, we are introducing plenty of smaller improvements to multiple operators and the mechanisms around them. Here we briefly discuss a few that, in our opinion, are worth noting.

If you don’t know what a cluster policy is, please check the documentation. If you do, you know that it was executed for every task during DAG loading. This allowed users to do some validation or mutation before a task was executed. However, it was impossible to do the same thing at the DAG level (yes, you could access the dag object of a task, but that’s not an optimal solution). In Airflow 2.0 this becomes possible: users can define both a task_policy and a dag_policy, each executed at a specific time. Here is a simple example of what can be done in cluster policies:

from airflow.exceptions import AirflowClusterPolicyViolation
from airflow import DAG
from airflow.models.baseoperator import BaseOperator

def dag_policy(dag: DAG):
    """Ensure that DAG has at least one tag"""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.filepath}"
        )

def task_policy(task: BaseOperator):
    if task.task_type == 'HivePartitionSensor':
        task.queue = "sensor_queue"
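To get a feel for what these two policies do, the same logic can be exercised on stand-in objects, without an Airflow installation. In the sketch below, ToyDag, ToyTask and ToyPolicyViolation are hypothetical stand-ins for DAG, BaseOperator and AirflowClusterPolicyViolation. Only the attributes used by the policies are modelled.

```python
from dataclasses import dataclass, field


class ToyPolicyViolation(Exception):
    """Hypothetical stand-in for AirflowClusterPolicyViolation."""


@dataclass
class ToyDag:
    dag_id: str
    tags: list = field(default_factory=list)


@dataclass
class ToyTask:
    task_id: str
    task_type: str
    queue: str = "default"


def dag_policy(dag):
    """Validation: reject DAGs that have no tags."""
    if not dag.tags:
        raise ToyPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required."
        )


def task_policy(task):
    """Mutation: route one kind of sensor to a dedicated queue."""
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"


sensor = ToyTask("wait_for_partition", "HivePartitionSensor")
echo = ToyTask("echo", "BashOperator")
for toy_task_obj in (sensor, echo):
    task_policy(toy_task_obj)

dag_policy(ToyDag("tagged_dag", tags=["example"]))  # passes silently
```

The sensor ends up on sensor_queue while the other task keeps its queue, and a ToyDag without tags would make dag_policy raise. A policy can therefore either mutate an object or reject it outright.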

To learn more about this feature, check the official documentation.

Users of TriggerDagRunOperator or ExternalTaskSensor may know the pain of navigating from one DAG to the other one referenced by the operator or sensor. Many users have solved this problem by creating custom operator links, but this meant maintaining additional custom code. In Airflow 2.0, there’s no need for that! Both TriggerDagRunOperator and ExternalTaskSensor come with built-in operator links that make navigating between related DAGs easy.

Apart from the features mentioned above, Airflow has many new operators, and many existing ones have been vastly improved by the community. If you want to try them, you can check how to use provider packages with the 1.10.x line of Airflow.

Stay tuned for Apache Airflow 2.0!

This blog post was originally published at https://www.polidea.com/blog/

Opportunity seeker, software engineer, open source enthusiast.
