We were all using Airflow wrong… And now, it’s fixed!

deepak.c · Published in Locale · Jan 5, 2022 · 10 min read

This blog talks about how to easily turn your existing automation scripts into Airflow tasks using the relatively new KubernetesPodOperator.

Why use Airflow?

Airflow is an open-source tool that lets you orchestrate workflows. A workflow in this context is a series of jobs that need to be executed in a certain topological order. These jobs can also be scheduled to run periodically.

A typical example could be a series of interdependent jobs that ingest data into a datastore. For instance, a workflow could consist of the following sequential jobs:

  1. Check if data is present in S3
  2. Trigger a spark job to perform transformations
  3. Ingest data into a datastore
  4. Archive the data in S3 to cold storage

Note that these jobs are different from vanilla CRONs because each one depends on the preceding job having executed successfully. Hence, Airflow models these dependencies as a Directed Acyclic Graph (DAG), such as the one shown below.

A typical DAG in Airflow with inter-dependent tasks

Notice the arrows between the tasks, which signify the dependencies between them.
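Expressed in code, that ordering is only a handful of lines. Here is a minimal sketch with placeholder tasks (the DAG id and task ids are ours, purely for illustration); the dependency declaration at the end is the part that matters:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

# Placeholder DAG for the ingestion workflow described above.
# Each DummyOperator stands in for a real job; the >> operator
# declares the edges of the DAG.
with DAG(
    dag_id="s3_ingestion_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_s3 = DummyOperator(task_id="check_s3_for_data")
    run_spark = DummyOperator(task_id="run_spark_transformations")
    ingest = DummyOperator(task_id="ingest_into_datastore")
    archive = DummyOperator(task_id="archive_to_cold_storage")

    check_s3 >> run_spark >> ingest >> archive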

At Locale.ai, we use Airflow extensively to orchestrate our data pipelines and user-facing workflows (shameless plug, check out our new feature — workflows).

What are tasks?

One of the key reasons why Airflow is so powerful is its abstraction of a task: the ability to stitch tasks together to form dependencies and run them across a cluster of machines. A task in Airflow is the basic unit of execution and forms a node of the DAG.

If you are curious about how Airflow executes these tasks, do check out the architecture of Apache Airflow.

TL;DR: Airflow has components that can use a message broker to schedule a task by pushing it onto a queue, from which it is picked up by a task worker. Depending on the executor (another neat abstraction in Airflow) being used, these workers can be scaled across machines to support a higher number of concurrent tasks.

The data ingestion workflow described in the previous section can be modeled as a set of Airflow tasks where each task has a clear separation of concerns. As long as we maintain that separation, it becomes possible to reuse these tasks across other use cases.

Armada of Operators

To encourage reuse across commonly occurring use cases, Airflow provides abstractions called operators and sensors.

Basically, any task in a workflow belongs to one of two categories:

  1. Operator: tasks that perform an action (e.g. submitting a Spark job).
  2. Sensor: tasks that wait for an action to complete (e.g. waiting for a Spark job to complete).

Tasks that wait for an operation to complete require different scheduling patterns from tasks that perform an operation. These differences are handled in Airflow's operator and sensor base classes, which you can extend through inheritance to add any other functionality. In addition, the fact that Airflow is open source has endeared it to a large community of contributors who have collectively created tons of operators for different use cases. This array of freely available operators enables developers to model workflows with minimal code.
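To give a sense of how small that extension surface is, here is a toy sensor that subclasses Airflow's BaseSensorOperator and waits for a _SUCCESS marker file to appear. The class name and the marker-file convention are our own illustrative assumptions, not something Airflow ships:

import os

from airflow.sensors.base import BaseSensorOperator


class SuccessFileSensor(BaseSensorOperator):
    """Toy sensor that waits for a _SUCCESS marker file in a directory."""

    def __init__(self, path: str, **kwargs):
        super().__init__(**kwargs)
        self.path = path

    def poke(self, context) -> bool:
        # Airflow calls poke() every poke_interval seconds until it
        # returns True (success) or the sensor times out.
        return os.path.exists(os.path.join(self.path, "_SUCCESS"))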

Do you need a workflow that downloads weather info from a public site, triggers a Spark job to crunch historical data using GCP Dataproc, and notifies you on Slack if it's gonna rain?

You could simply re-use the SimpleHttpOperator, DataprocSubmitJobOperator, SlackAPIPostOperator and BranchPythonOperator.
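A rough sketch of what that could look like is below. Treat it as an outline rather than a drop-in DAG: the connection ids (weather_api, slack_api), the GCP project, region, cluster and bucket names, and the branching logic are all assumptions you would replace with your own.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.slack.operators.slack import SlackAPIPostOperator

with DAG(
    dag_id="rain_alert_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Download weather info from a public API (the "weather_api" HTTP connection is assumed).
    fetch_weather = SimpleHttpOperator(
        task_id="fetch_weather",
        http_conn_id="weather_api",
        endpoint="forecast",
        method="GET",
    )

    # Crunch historical data on Dataproc (cluster name and script URI are placeholders).
    crunch_history = DataprocSubmitJobOperator(
        task_id="crunch_history",
        project_id="my-gcp-project",
        region="us-central1",
        job={
            "placement": {"cluster_name": "weather-cluster"},
            "pyspark_job": {"main_python_file_uri": "gs://my-bucket/crunch_history.py"},
        },
    )

    # Decide whether to notify, based on the forecast pulled from the HTTP task's XCom.
    def _should_notify(**context):
        forecast = context["ti"].xcom_pull(task_ids="fetch_weather") or ""
        return "notify_slack" if "rain" in forecast.lower() else "no_op"

    branch = BranchPythonOperator(task_id="branch", python_callable=_should_notify)

    notify_slack = SlackAPIPostOperator(
        task_id="notify_slack",
        slack_conn_id="slack_api",
        channel="#weather",
        text="Carry an umbrella, it's gonna rain!",
    )
    no_op = DummyOperator(task_id="no_op")

    fetch_weather >> crunch_history >> branch >> [notify_slack, no_op]

Not a single line of bespoke integration code: every task above is an off-the-shelf operator wired together with configuration.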

What’s wrong with operators?

Although operators are great to re-use, they introduce subtle problems that might fly under the radar at first. The code implementing these operators, along with the dependencies needed to run them, has to be shipped as part of the Airflow codebase that gets deployed onto the Airflow workers.

This prevents developers from using Airflow purely as a platform to schedule tasks and adds the overhead of coupling Airflow with the execution of those tasks.

This impacts organisation-wide adoption in the following ways:

  1. Developers across teams have workloads of very different natures (from notifying on Slack to periodically re-training an ML model). All operators, whether re-used or created in-house for custom use cases, as well as their dependencies, now have to be deployed on the Airflow workers. This makes Airflow-related code very difficult to track, as it gets crammed with code for vastly different use cases; it tends to become bulky and to accumulate conflicting Python dependencies.
  2. Airflow is written in Python, so extending operators also has to be done in Python. This prevents Airflow from re-using existing jobs/code written in other languages.
  3. Since tasks can be vastly different in nature, scheduling them on the right infrastructure using executors like the CeleryExecutor poses a problem.

For example, a task that sends an email might need a general-purpose machine, but a task that trains an ML model might need an instance with GPUs. It's not feasible to convert all workers to high-end GPU machines for the sake of one task that might run infrequently. The alternative is to maintain different queues for different types of tasks based on the type of compute instance they need. Different sets of workers can then be spun up to listen on these queues, running on VM types that match those needs. This in turn requires a separate autoscaling group for each set of workers, and you get the gist of where I'm going with this :).
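With the CeleryExecutor, this queue-per-workload pattern is expressed through the queue argument that every Airflow operator accepts, plus dedicated workers started per queue (for example, airflow celery worker --queues gpu on the GPU machines). A minimal sketch, where the queue names and placeholder callables are assumptions:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def send_report_email():
    # Placeholder for a lightweight task that any machine can run.
    print("sending report...")


def train_model():
    # Placeholder for a training routine that actually needs a GPU.
    print("training model...")


with DAG(
    dag_id="queue_routing_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    # Picked up by workers started with: airflow celery worker --queues default
    email = PythonOperator(task_id="send_report", python_callable=send_report_email, queue="default")

    # Picked up by workers started with: airflow celery worker --queues gpu
    train = PythonOperator(task_id="train_model", python_callable=train_model, queue="gpu")

    email >> train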

KubernetesPodOperator to the Rescue

Fortunately, some of the problems mentioned above are already solved. Container technologies solve the problem of packaging dependencies, and Kubernetes is tried and tested at orchestrating these containers, matching the needs of different workloads with the right infrastructure, and scaling them in a vendor-agnostic way.

Therefore, using Airflow and Kubernetes together results in a tech stack in which Airflow orchestrates different types of workloads periodically and Kubernetes provisions the appropriate infrastructure to match those workloads. Since the unit of deployment is a container, dependencies are cleanly separated by packaging them into separate images.

This blog (and its catchy title) is inspired by a post from the Bluecore engineering team, where the solution to the same problem is explained in detail.

At the time the blog was written, Airflow had no official KubernetesPodOperator and the team at Bluecore had to build one themselves.

In the later sections of this blog, we will present a sample problem statement and demonstrate how it can be tackled using the KubernetesPodOperator.

Problem Statement

Most of our analytics at Locale.ai is powered by ClickHouse.

So we ran into a use case: create a workflow that migrates data from one ClickHouse database to another. Utility code, including migration and benchmarking scripts pertaining to ClickHouse, was already implemented and tracked in a separate repo called internal-scripts. For the sake of simplicity, let's say the workflow can drop the destination database (incurring downtime), create a new one, and migrate data to it from the source database.

Podcast Begins… (Pun Intended)

This section contrasts two approaches to solving the above problem: one that uses the pod operator and one that doesn't.

Pre-Pod Era

Solving the problem without using the Kubernetes Pod Operator would involve the following steps:

  1. Create operators using Airflow plugins that expose functions to interact with ClickHouse.
  2. Add support to the operator described in step 1 to create and drop a database.
  3. Steps 1 and 2 can be skipped if a ClickHouse operator, such as this one, already exists.
  4. The dependencies needed to support the operator still have to be added to the codebase that tracks all the Airflow-related code. These dependencies include the Python libraries that connect to ClickHouse, which would need to be deployed on the Airflow workers.
  5. Let's say the code that migrates data across databases is already implemented and tracked in a separate codebase, and that it is written to handle specific use cases (like cmd args to skip tables named with certain patterns, logic for inserting records into remote hosts with and without SSL support, and flags to migrate ClickHouse dicts). Now, this codebase cannot be re-used as it is, because an operator needs to be created from it, either by subclassing an Airflow operator class or by wrapping some of its code in a function fed into an Airflow PythonOperator (a rough sketch of what such an operator looks like follows the diagram below).
  6. Probably the easiest way now is to add the internal repo as a pip-installable dependency of the Airflow repo, of course assuming that the internal repo housing the ClickHouse scripts is written in Python.
  7. Assuming there are no dependency conflicts, we can proceed to create the operator that migrates data by reusing some parts of the code from the internal scripts repo.
  8. Create a DAG that calls these operators and deploy it.

Architectural diagram of an Airflow cluster where external code is ported as an operator
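To make the friction in step 5 concrete, here is roughly what such a hand-rolled operator could look like. It is only a sketch: it assumes the clickhouse-driver Python package, and both this class and that driver would then have to ship with every Airflow worker.

from airflow.models.baseoperator import BaseOperator
from clickhouse_driver import Client  # dependency that must be installed on the Airflow workers


class ClickHouseQueryOperator(BaseOperator):
    """Illustrative operator that runs a single query against ClickHouse."""

    def __init__(self, sql: str, host: str, port: int = 9000,
                 user: str = "default", password: str = "", **kwargs):
        super().__init__(**kwargs)
        self.sql = sql
        self.host = host
        self.port = port
        self.user = user
        self.password = password

    def execute(self, context):
        # The ClickHouse client runs inside the Airflow deployment itself,
        # which is exactly the coupling the pod operator lets us avoid.
        client = Client(host=self.host, port=self.port,
                        user=self.user, password=self.password)
        return client.execute(self.sql)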

Post-Pod Era

We can use the KubernetesPodOperator to achieve the same result with minimal friction, in the following way:

  1. Basic ClickHouse operations like creating a database, dropping it and running queries are supported by Yandex's official Docker image for ClickHouse. So the tasks that drop and re-create the database can use the KubernetesPodOperator to spawn Yandex's publicly available image and run the necessary commands. This frees us from having to look for a specific Airflow operator that interacts with ClickHouse.
  2. Migrating tables is, again, specific to the organization. But the KubernetesPodOperator allows us to use the same internal-scripts repo without any code change at all! All that is needed is to push an image of that repo to our internal container registry and pass the image name, along with the cmd arguments needed to run the script, to the pod operator.
  3. As icing on the cake, we can control the infrastructure that runs these tasks. An example could be scheduling long-running tasks onto a different k8s namespace and/or a different node pool. This pattern of re-using the scheduling capabilities of k8s can be repeated for other types of workloads with more specific requirements (like ML models on GPUs, or I/O-bound tasks on SSDs), leveraging more powerful Kubernetes features like taints, tolerations, and pod and node affinities; a small example follows the diagram below.
  4. Create a DAG file and deploy it!

Architectural diagram of an Airflow cluster where external code is reused to trigger a job using the pod operator.
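As an illustration of point 3, a pod-operator task can be pinned to a dedicated node pool directly from the DAG file (inside a with DAG(...) block). Everything cluster-specific here is an assumption: the ml-jobs namespace, the pool=gpu node label, the dedicated=gpu:NoSchedule taint and the training image.

from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# Illustrative only: schedules the training pod onto a tainted GPU node pool.
train_model = KubernetesPodOperator(
    task_id="train_model",
    name="train_model",
    namespace="ml-jobs",
    image="gcr.io/<project-id>/model-training:latest",
    cmds=["python", "train.py"],
    node_selector={"pool": "gpu"},
    tolerations=[
        k8s.V1Toleration(key="dedicated", operator="Equal", value="gpu", effect="NoSchedule")
    ],
    is_delete_operator_pod=True,
    get_logs=True,
)

The DAG in the next section uses a milder version of the same idea: a preferred node affinity rather than a hard node selector.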

DAG

This section contains the source code needed to create the final DAG solely using the KubernetesPodOperator and re-using the code present in the internal-scripts repo.

Rendered DAG on Airflow UI

Source Code

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s


DAG_NAME = 'clickhouse_db_sync'

# Airflow Variables
CLICKHOUSE_DESTINATION_DB = Variable.get("CLICKHOUSE_DESTINATION_DB", "clickhouse_destination_db")
CLICKHOUSE_SOURCE_DB = Variable.get("CLICKHOUSE_SOURCE_DB", "clickhouse_source_db")

CLICKHOUSE_DESTINATION_DB_HOST = Variable.get("CLICKHOUSE_DESTINATION_DB_HOST", "clickhouse://abc.com")
CLICKHOUSE_DESTINATION_DB_USER = Variable.get("CLICKHOUSE_DESTINATION_DB_USER", "<user>")
CLICKHOUSE_DESTINATION_DB_PASSWORD = Variable.get("CLICKHOUSE_DESTINATION_DB_PASSWORD", "<password>")
CLICKHOUSE_DESTINATION_DB_PORT = Variable.get("CLICKHOUSE_DESTINATION_DB_PORT", "9000")

CLICKHOUSE_SOURCE_DB_HOST = Variable.get("CLICKHOUSE_SOURCE_DB_HOST", "localhost")
CLICKHOUSE_SOURCE_DB_USER = Variable.get("CLICKHOUSE_SOURCE_DB_USER", "<user>")
CLICKHOUSE_SOURCE_DB_PASSWORD = Variable.get("CLICKHOUSE_SOURCE_DB_PASSWORD", "<password>")
CLICKHOUSE_SOURCE_DB_PORT = Variable.get("CLICKHOUSE_SOURCE_DB_PORT", "9440")


# Default arguments applied to every task in the DAG
args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
    'provide_context': True,
    'start_date': datetime(year=2021, month=1, day=1),
}

# Kubernetes affinities needed to specify optimal pod placement
affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=[
                        k8s.V1NodeSelectorRequirement(key="org", operator="In", values=["locale"])
                    ]
                ),
            )
        ]
    ),
)

# DAG definition
with DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval=None,
    tags=['clickhouse', 'internal'],
    catchup=False,
    is_paused_upon_creation=True,
) as dag:

    # Dummy start task. Can later be modified to send Slack notifications when the DAG starts.
    start = DummyOperator(task_id="start")

    # Drop the destination database (downtime-based migration because ClickHouse
    # currently doesn't support easy database renaming).
    drop_destination_db = KubernetesPodOperator(
        namespace='default',
        image='yandex/clickhouse-client:21.8.5',
        arguments=[
            "-m",
            "--host", CLICKHOUSE_DESTINATION_DB_HOST,
            "-u", CLICKHOUSE_DESTINATION_DB_USER,
            "--password", CLICKHOUSE_DESTINATION_DB_PASSWORD,
            "--port", CLICKHOUSE_DESTINATION_DB_PORT,
            "--query", f"DROP DATABASE {CLICKHOUSE_DESTINATION_DB};",
        ],
        name="drop_destination_db",
        affinity=affinity,
        task_id="drop_destination_db",
        is_delete_operator_pod=True,
        get_logs=True,
    )

    # Recreate an empty destination database
    create_db = KubernetesPodOperator(
        namespace='default',
        image='yandex/clickhouse-client:21.8.5',
        arguments=[
            "-m",
            "--host", CLICKHOUSE_DESTINATION_DB_HOST,
            "-u", CLICKHOUSE_DESTINATION_DB_USER,
            "--password", CLICKHOUSE_DESTINATION_DB_PASSWORD,
            "--port", CLICKHOUSE_DESTINATION_DB_PORT,
            "--query", f"CREATE DATABASE {CLICKHOUSE_DESTINATION_DB};",
        ],
        name="create_db",
        affinity=affinity,
        task_id="create_destination_db",
        is_delete_operator_pod=True,
        get_logs=True,
    )

    # Migrate data from the source to the destination database, re-using the
    # internal-scripts image without any code change.
    migrate_data = KubernetesPodOperator(
        namespace='default',
        image='gcr.io/<project-id>/internal-scripts:v1.0.0',
        cmds=["python", "clickhouse/migration.py"],
        arguments=[
            "--source-host", CLICKHOUSE_SOURCE_DB_HOST,
            "--source-user", CLICKHOUSE_SOURCE_DB_USER,
            "--source-password", CLICKHOUSE_SOURCE_DB_PASSWORD,
            "--source-port", CLICKHOUSE_SOURCE_DB_PORT,
            "--source-db", CLICKHOUSE_SOURCE_DB,
            "--destination-host", CLICKHOUSE_DESTINATION_DB_HOST,
            "--destination-user", CLICKHOUSE_DESTINATION_DB_USER,
            "--destination-password", CLICKHOUSE_DESTINATION_DB_PASSWORD,
            "--destination-port", CLICKHOUSE_DESTINATION_DB_PORT,
            "--destination-db", CLICKHOUSE_DESTINATION_DB,
            "--destination-secure-flag", "n",
            "--destination-verify-flag", "n",
            "--skip-patterns", "copy", "test", "backup", "dict",
            "--execute-query-on", "source",
        ],
        name="migrate_db",
        affinity=affinity,
        task_id="migrate_db",
        is_delete_operator_pod=True,
        get_logs=True,
    )

    # Dummy end task. Can later be modified to notify on Slack.
    end = DummyOperator(task_id="end")

    start >> drop_destination_db >> create_db >> migrate_data >> end

Originally published at https://blog.locale.ai on January 5, 2022.
