Introduction to KubernetesExecutor and KubernetesPodOperator

Rafay Aleem · Uncanny Recursions · Oct 4, 2020

In the previous tutorial, we didn’t delve into the concept of Executors. In Airflow, Executors define the mechanism by which task instances get run.

Repo link: https://github.com/mrafayaleem/etl-series

When you spun up an Airflow cluster using Helm in the previous tutorial, you might have noticed a scheduler and a worker in your docker ps output. These two components are part of how the CeleryExecutor works in Airflow. We are not going to talk about this type of Executor in detail in this post; for more, see the Airflow documentation on the CeleryExecutor.

The other type I want to touch on briefly is the LocalExecutor. This type of Executor runs each task in a local process on the scheduler instance.

Tip: Use the following to run Airflow with the LocalExecutor. You will notice only a webserver and a scheduler, but no worker.

helm install airflow stable/airflow -f chapter2/airflow-helm-config-local-executor.yaml --version 7.2.0
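For reference, here is a minimal sketch of what chapter2/airflow-helm-config-local-executor.yaml presumably sets (the actual file in the repo may configure more than this); the key matches the stable/airflow chart’s values layout:

airflow:
  executor: LocalExecutor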

Although you can run multiple tasks in parallel using multiple processes, this does not scale out well because all of those processes run on the same scheduler instance. The LocalExecutor isn’t recommended even for spawning tasks via the KubernetesPodOperator, because the spawning process (a process fork) would still live on the same scheduler instance and can eventually cause scalability issues. See this link for more details. This is one of the reasons why the LocalExecutor is not recommended beyond local testing.

Introducing KubernetesPodOperator

Let’s first set up Airflow with the CeleryExecutor as follows:

helm install airflow stable/airflow -f chapter2/airflow-helm-config-celery-executor.yaml --version 7.2.0

You should see the Airflow pods, including the scheduler and the Celery workers, running in your Kubernetes cluster (check with kubectl get pods).

We will run the tasks defined in dag_that_executes_via_KubernetesPodOperator. This DAG executes two tasks, followed by a single downstream task that marks the DAG as complete.

The following code defines the underlying tasks:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# `task` (the task name) and `dag` are defined earlier in the DAG file.
org_node = KubernetesPodOperator(
    namespace='default',
    image="python",                  # Docker image the pod will run
    cmds=["python", "-c"],
    arguments=["print('HELLO')"],
    labels={"foo": "bar"},
    image_pull_policy="Always",
    name=task,
    task_id=task,
    is_delete_operator_pod=False,    # keep the finished pod around for inspection
    get_logs=True,
    dag=dag
)
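For context, here is a minimal sketch of how such a DAG might wire two pod-backed tasks into a single downstream completion task; the task names and the DummyOperator are my assumptions, not necessarily the repo’s exact code:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    'dag_that_executes_via_KubernetesPodOperator',
    start_date=datetime(2020, 10, 1),
    schedule_interval=None,
)

complete = DummyOperator(task_id='complete', dag=dag)

# two pod-backed tasks feeding one downstream "complete" task
for task in ['task-one', 'task-two']:
    org_node = KubernetesPodOperator(
        namespace='default',
        image='python',
        cmds=['python', '-c'],
        arguments=["print('HELLO')"],
        name=task,
        task_id=task,
        get_logs=True,
        dag=dag,
    )
    org_node >> complete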

Notice that we need to specify a Docker image for this kind of operator. For each of these tasks, it pulls the python image and, in this case, prints a HELLO message. This operator gives you the freedom to execute any task baked into a Docker image, which is extremely handy. Moreover, since Kubernetes manages most of the task resource allocation, this approach makes Airflow an excellent solution for scaling your ETL workloads, as long as the Kubernetes cluster can handle the pods that Airflow schedules.

You can observe this by running kubectl get pods. Kubernetes takes care of the pod creation request and starts executing the tasks. Once done, the pod status is marked as Completed.

[Screenshot: a pod getting scheduled to execute one of the tasks]
[Screenshot: pods marked as Completed for each of the tasks]

You can still leverage Celery for executing Python tasks

The great thing about keeping the CeleryExecutor is that it gives you the flexibility to execute some tasks on the Celery workers while also leveraging a K8s cluster that can run any Docker image. For instance, a lightweight task such as pulling a single 5MB file from S3 and emailing it to customer support doesn’t warrant its own pod and can easily run on a Celery worker without needing many resources. Conversely, an ETL job written in Java Spring Boot that extracts millions of rows every 24 hours and takes around 8 hours to complete can very easily be scheduled using the KubernetesPodOperator.
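To make this concrete, here is a hedged sketch of mixing both styles in one DAG under the CeleryExecutor; the task names, the email helper, and the ETL image are illustrative assumptions of mine, not code from the repo:

from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# `dag` is the enclosing DAG object, defined elsewhere in the file.

def email_small_report():
    # lightweight work: fetch a small file from S3 and email it (details omitted)
    pass

light = PythonOperator(              # runs in-process on a Celery worker
    task_id='email_small_report',
    python_callable=email_small_report,
    dag=dag,
)

heavy = KubernetesPodOperator(       # runs in its own pod on the K8s cluster
    task_id='nightly-spring-boot-etl',
    name='nightly-spring-boot-etl',
    namespace='default',
    image='example/etl-job:latest',  # hypothetical image for the Spring Boot ETL
    get_logs=True,
    dag=dag,
)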

Now, let’s uninstall Airflow by running helm uninstall airflow and continue to the next section of this tutorial.

KubernetesPodOperator and KubernetesExecutor: The Difference

Reinstall Airflow configured to use the KubernetesExecutor using the following command, and then run dag_that_executes_via_k8s_executor:

helm install airflow stable/airflow -f chapter2/airflow-helm-config-kubernetes-executor.yaml --version 7.2.0

This DAG just prints a HELLO message using the BashOperator. You will notice that each task in this DAG is executed in its own pod.
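As a rough sketch, dag_that_executes_via_k8s_executor might look something like the following (the exact code lives in the repo’s chapter2 folder):

from airflow.operators.bash_operator import BashOperator

# `dag` is the enclosing DAG object, defined elsewhere in the file.
hello = BashOperator(
    task_id='print_hello',
    bash_command='echo HELLO',  # under the KubernetesExecutor, this runs in its own pod
    dag=dag,
)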

KubernetesExecutor Inception

Now, try running dag_that_executes_via_KubernetesPodOperator. You will notice that Airflow schedules a pod whose name begins with dagthatexecutesviak8sexecutor. This pod, in turn, starts another pod to execute the actual tasks defined in that DAG using the KubernetesPodOperator. Notice the pods whose names begin with dagthatexecutesviakubernetespodoperator.
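If you run kubectl get pods while this DAG is executing, you should see both layers. The following is a purely illustrative sketch based on the pod-name prefixes described above; actual names carry random suffixes and the output includes more columns:

kubectl get pods
# NAME                                          STATUS
# dagthatexecutesviak8sexecutor...              Running    <- KubernetesExecutor worker pod
# dagthatexecutesviakubernetespodoperator...    Running    <- pod launched by that worker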

Feels like an overhead, eh?

Takeaways

Between the CeleryExecutor and the KubernetesExecutor, the latter saves you from setting up an extra stack: a message broker (such as RabbitMQ) and Celery worker nodes that would need to be scaled up or down based on your workloads, since everything is handled by the Kubernetes cluster. However, it adds the overhead of pod creation and termination, especially if you have hundreds or thousands of very short-lived tasks.

In my opinion, running Airflow under the CeleryExecutor gives you more freedom and better resource utilization if you have a lot of short-running tasks, while still allowing you to run anything that fits in a Docker container (via the KubernetesPodOperator) and scale out well. Airflow under the KubernetesExecutor is still not a bad choice if most of your tasks have considerable execution times.

Where to go next from here

  • Visit the code in chapter2 and play around with different kinds of Executors and example DAGs here: https://github.com/mrafayaleem/etl-series
  • Find out the significance of the following config in the Helm chart for the KubernetesExecutor:
airflow:
  executor: KubernetesExecutor

  config:
    AIRFLOW__KUBERNETES__DAGS_IN_IMAGE: "False"
    AIRFLOW__KUBERNETES__DAGS_VOLUME_HOST: "/Users/aleemr/powerhouse/etl-series/dags"
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: "apache/airflow"
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "1.10.10-python3.6"
    AIRFLOW__KUBERNETES__RUN_AS_USER: "50000"
    AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: "False"


Rafay Aleem
Uncanny Recursions

Senior Data Engineer at Faire. Based in Toronto. Music aficionado who likes playing guitar and is an Eric Clapton fan.