Apache Airflow, Spark, and Kubernetes for Streamlined Workflow Management

Safouane Ennasser · Published in Cloud Native Daily · 13 min read · Jun 29, 2023

This article is part of a series of tutorials describing several ways of deploying data pipelines, both locally and in the cloud.
Please refer to the previous part, which covers the integration between Apache Spark and Kubernetes in detail.

Summary

1. Introduction
2. Install Airflow using Helm chart
3. Update the Airflow Chart configuration
4. Install Spark Operator
5. Create a DAG using the Spark job
6. Testing
7. Conclusion

Introduction

In this tutorial, we will explore the integration of Apache Airflow, Apache Spark, and Kubernetes.
Building upon the previous tutorial, where we deployed Spark jobs on Kubernetes using the Spark-operator Helm chart, we will now incorporate Apache Airflow into our data pipeline.
Apache Airflow is a popular workflow management platform that allows for the scheduling, monitoring, and orchestration of complex data pipelines. By leveraging Airflow’s capabilities alongside Spark and Kubernetes, we can create powerful and scalable data processing workflows.

Install Airflow using Helm chart

To begin, we will install Apache Airflow using the default Helm chart. This installation provides a baseline setup for Airflow on our Kubernetes cluster. We can execute the following commands:

helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace

These commands add the Apache Airflow repository to Helm and install the Airflow chart into the “airflow” namespace. This process sets up the necessary components for Airflow, such as the PostgreSQL database, Redis, scheduler, web server, and worker pods.

After the installation, we can verify the Airflow deployment by checking the running pods on our cluster:

kubectl get pods -n airflow

# Result should be something like
NAME                                 READY   STATUS    RESTARTS   AGE
airflow-postgresql-0                 1/1     Running   0          62m
airflow-redis-0                      1/1     Running   0          62m
airflow-scheduler-5b678ddf57-2xt59   2/2     Running   0          62m
airflow-statsd-c8b888cc8-92wwr       1/1     Running   0          62m
airflow-triggerer-767d8d8dd7-9svnm   2/2     Running   0          62m
airflow-webserver-5cdb787f9f-zfbpx   1/1     Running   0          62m
airflow-worker-0                     2/2     Running   0          62m

This command will display the list of pods running in the “airflow” namespace, including the Airflow components like the scheduler, web server, and worker pods.

Let’s check access to the web UI. First, we need to forward the Airflow webserver port to the host machine:

kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow

Next, open http://localhost:8080 and log in with the default credentials:

  • User: admin
  • Password: admin

You should now see the Airflow webserver UI.
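
As a quick sanity check (optional, and assuming the port-forward above is still running), you can also hit the webserver’s health endpoint, which reports the status of the metadata database and the scheduler:

curl http://localhost:8080/health

# Both the metadata database and the scheduler should report a "healthy" status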

Update Airflow Chart configuration

Once Apache Airflow is installed using the default Helm chart, you may want to customize the configuration to better suit your needs. The default configuration provides a starting point, but you can update it to align with your specific requirements.

1. Extract configuration from the running Airflow

To update the configuration, you need to retrieve the current configuration values and modify them accordingly. You can do this by running the following command:

helm show values apache-airflow/airflow > values.yaml

This command will save the current configuration values to a values.yaml file in the location where the command was executed.

2. Update configuration

Open the values.yaml file and make the necessary changes to the configuration. Here are our configuration updates:

  • Airflow Executor: By default, the executor is set to "CeleryExecutor". In our case we prefer to use the Kubernetes executor, so update the executor value to "KubernetesExecutor".
# Airflow executor
# One of: LocalExecutor, LocalKubernetesExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor
executor: "KubernetesExecutor"
  • Image Repository and Tag:
    You can specify your own Docker image repository and tag for the Airflow image by modifying the images: airflow section in the values.yaml file.
    For that, we’ll create a custom Docker image, our_own_apache_airflow:1.0, and push it to the local cluster image repository.
    Change the values of repository and tag as follows:
# Images
images:
  airflow:
    repository: our_own_apache_airflow
    tag: "1.0"
    # Specifying digest takes precedence over tag.
    digest: ~
    pullPolicy: IfNotPresent

Note the reference to the our_own_apache_airflow image.
Let’s create it based on the official Airflow image (version 2.6.2 at the time of writing).

a/ Create Airflow Dockerfile

# Dockerfile

FROM apache/airflow:2.6.2

COPY requirements.txt /

# AIRFLOW_VERSION is provided by the base image; pinning it here keeps pip
# from changing the Airflow version while installing the extra providers
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r /requirements.txt

b/ Create the requirements.txt file:

# requirements.txt

apache-airflow[amazon]
apache-airflow-providers-cncf-kubernetes

c/ Build the image:

docker build -t our_own_apache_airflow:1.0 .

d/ Load the created image into the kind cluster

# In our case we are running a local kubernetes cluster using KIND

kind load docker-image our_own_apache_airflow:1.0
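
To double-check that the image actually landed on the cluster node, you can list the images known to the node’s container runtime. This is a quick sketch assuming the default kind cluster name, so the node container is kind-control-plane:

docker exec kind-control-plane crictl images | grep our_own_apache_airflow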

Get DAGs from a private repository

(skip it if you want to build your DAGs inside the Airflow Docker image)
If you prefer to load DAGs from a private Git repository instead of building them inside the Airflow Docker image, you can modify the values.yaml file to configure the necessary settings. This allows you to maintain your DAGs separately and easily update them without modifying the Docker image.

# Git sync
dags:
  persistence:
    ...
  gitSync:
    enabled: true

    # git repo clone url
    # ssh example: git@github.com:apache/airflow.git
    repo: <YOUR_GITHUB_REPOSITORY_SSH_URL>
    branch: main
    rev: HEAD
    # depth: 1
    # the number of consecutive failures allowed before aborting
    # maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: "<PATH_INSIDE_GITHUB_REPOSITORY_WHERE_TO_FIND_DAGS>"
    sshKeySecret: airflow-ssh-secret

Replace <YOUR_GITHUB_REPOSITORY_SSH_URL> and <PATH_INSIDE_GITHUB_REPOSITORY_WHERE_TO_FIND_DAGS> with the right values from your repository.
Note the usage of sshKeySecret: airflow-ssh-secret.

  • Create airflow-ssh-secret

First, we need to generate an ssh key pair to be used to connect to the GitHub repository. (we are creating a new key to avoid any confusion about managing keys)

ssh-keygen -t rsa -b 4096 -C "your_email@example.com"

# 1
Enter file in which to save the key (...): airflow_ssh_key
# 2 Leave it empty, by hitting Enter
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
# Result
Your identification has been saved in airflow_ssh_key
Your public key has been saved in airflow_ssh_key.pub

As you can see, this generates our SSH key pair (airflow_ssh_key
and airflow_ssh_key.pub). You can find these files in the directory where you ran the command.

Uploading public key to GitHub

Now we add the created public key to the repository:
1. Go to the GitHub repository where you store your DAGs
2. Go to Settings then Deploy keys
3. Click Add deploy key
4. Create a title, and copy/paste your public key (the content of airflow_ssh_key.pub)
5. Confirm by hitting Add key

After creating and adding our SSH public key to GitHub, we need to add the private key to the Airflow installation so it can access the repository and pull newly pushed DAGs.

  • ExtraSecrets:
    Now we need to encode the private key and put the result into a local file, temp.txt (we’ll use its content in the next step)
base64 -w 0 airflow_ssh_key > temp.txt

# This will encode our private key using Base64 and put the result into the temp.txt file

Back to our values.yaml file, where we referenced the previously created private key.
Do you still remember the reference to sshKeySecret: airflow-ssh-secret in values.yaml?
We have created a link from git-sync to load an SSH key named airflow-ssh-secret. However, that secret has not yet been added to the configuration.

Up to this point, we have only generated the keys, added the public one to GitHub, and encoded the private one using Base64.
To add the private key to the configuration, add the following section to values.yaml, at the end of the file, at the same level as logs:

extraSecrets:
  airflow-ssh-secret:
    data: |
      gitSshKey: '<YOUR PRIVATE KEY ENCODED BASE64>'

logs:

# where <YOUR PRIVATE KEY ENCODED BASE64> is the content
# of the previously generated temp.txt file.
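
If you prefer not to paste the Base64-encoded key into values.yaml, an alternative sketch is to create the Kubernetes secret directly with kubectl (gitSshKey is the key name the chart’s git-sync sidecar expects):

kubectl create secret generic airflow-ssh-secret \
  --from-file=gitSshKey=airflow_ssh_key \
  --namespace airflow

# If you go this route, drop the extraSecrets block above and keep only
# sshKeySecret: airflow-ssh-secret in the gitSync section
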
  • Redirect logs to S3:
    As a first step, we’ll explore the option of storing logs in an AWS S3 bucket; later (in the next article) we’ll see how to use more convenient tools for collecting and managing these logs.
    To do that, go back to the values.yaml file and change the config: logging section:
config:
  core:
    ...
  logging:
    # remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
    remote_logging: 'True'
    logging_level: 'INFO'
    remote_base_log_folder: 's3://<MY AWS BUCKET>'
    remote_log_conn_id: 'aws_default'
    delete_worker_pods: 'False'
    encrypt_s3_logs: 'False'

We have also referenced a connection named ‘aws_default’, which we will create in the next section.

3. Update airflow installation

Now our file is ready, so let’s update the Airflow chart installation.

helm upgrade --install airflow apache-airflow/airflow --namespace airflow  --values values.yaml
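
You can watch the rollout to confirm the new configuration was picked up. Note that after switching to the KubernetesExecutor, the Redis and Celery worker pods from the default install should no longer be needed and should disappear:

kubectl get pods -n airflow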

Airflow is now running with our own Docker image and the modified configuration we wrote in the previous section.
Let’s now add a connection to AWS by following these steps (a CLI alternative is shown right after the list).

We assume that you have already created an AWS S3 bucket and that you have your own AWS access keys (these are different from the SSH keys we created previously).

  1. Get into the webserver UI (as we did in the first section), then go to Admin > Connections.
  2. Click the ‘+’ button to add a new connection
  3. Connection Id = aws_default
  4. Connection Type = Amazon Web Services
  5. AWS Access Key ID = Your AWS Access Key ID
  6. AWS Secret Access Key = Your AWS Secret key

Install Spark Operator

(please read the previous tutorial)

If you’re new to deploying Apache Spark on Kubernetes using Helm charts and the Spark Operator, I highly recommend checking out our previous tutorial titled “Deploying Apache Spark on Kubernetes using Helm Charts: Simplified Cluster Management and Configuration.” This tutorial provides a comprehensive guide to getting started with Spark Operator and covers essential concepts, installation steps, and running Spark jobs on Kubernetes.

By reading the previous tutorial, you will gain a solid understanding of the fundamentals of deploying Spark applications on Kubernetes and how to leverage the power of Helm charts and the Spark Operator for streamlined cluster management. You’ll learn about Helm, Helm charts, and their role in defining, installing, and upgrading complex Kubernetes applications.

So, we assume that you have already followed the previous tutorial and built your own Apache Spark image. Here is a quick summary:

  • Install Spark operator:
# Add the chart and install spark operator
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator --namespace spark-operator --create-namespace --set webhook.enable=true
  • Create a ‘spark’ service account and ClusterRoleBinding:
kubectl create serviceaccount spark --namespace=spark-operator
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark-operator:spark --namespace=spark-operator
  • Test by running the Spark-Pi example
# spark-pi.yaml

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-operator
spec:
  type: Scala
  mode: cluster
  # Replace with your own Spark image
  image: "gcr.io/spark-operator/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

# Run the job
kubectl apply -f spark-pi.yaml

You should see spark-pi running by checking the pods

kubectl get pods -n spark-operator
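
Since the Spark Operator also creates a SparkApplication custom resource, you can inspect its state directly (a sketch using the operator’s CRD):

kubectl get sparkapplications -n spark-operator

# For more detail on a specific run
kubectl describe sparkapplication spark-pi -n spark-operator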

Create a DAG

Creating a DAG that runs a Spark job is a crucial step in integrating Apache Spark with Airflow. In this section, we’ll walk through the process of setting up a new connection in the Airflow UI for Kubernetes and creating a DAG using the SparkKubernetesOperator to run the Spark job defined in the spark-pi.yaml file.

1. Create a new connection in Airflow UI for Kubernetes

Before we can proceed with creating the DAG, we need to establish a connection between Airflow and our Kubernetes cluster. This connection will allow Airflow to interact with Kubernetes and submit Spark jobs. Here’s how you can create the connection:

  • Access the Airflow UI.
  • Navigate to the Admin section and click on “Connections.”
  • Click on the “Create” button to add a new connection.
  • Set Connection Id to kubernetes_default
  • Check the “In cluster configuration” box
  • Leave the rest empty and save.

2. Create a DAG using SparkKubernetesOperator

Regardless of the approach you choose to load DAGs into Airflow, whether it’s through mounting a disk, building them inside the Docker image, or utilizing a remote Git repository, you need to reference the previously created spark-pi.yaml file.

To maintain an organized structure, create a directory called “kubernetes” within your DAGs directory. This directory will serve as a dedicated location for storing your Spark YAML job files associated with a specific DAG.
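
For reference, a minimal layout could look like this (the file names here are illustrative, not mandated by Airflow):

dags/
├── spark_pi_dag.py          # the DAG defined below
└── kubernetes/
    └── spark-pi.yaml        # the SparkApplication from the previous section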

Here is an example of a DAG sourced from the official Spark Operator repository:

"""
This is an example DAG which uses SparkKubernetesOperator and SparkKubernetesSensor.
In this example, we create two tasks which execute sequentially.
The first task is to submit sparkApplication on Kubernetes cluster(the example uses spark-pi application).
and the second task is to check the final state of the sparkApplication that submitted in the first state.

Spark-on-k8s operator is required to be already installed on Kubernetes
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
"""

from datetime import timedelta, datetime

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.utils.dates import days_ago
k8s_hook = KubernetesHook(conn_id='kubernetes_config')
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'max_active_runs': 1,
'retries': 3
}
# [END default_args]

# [START instantiate_dag]

dag = DAG(
'spark_pi',
start_date=days_ago(1),
default_args=default_args,
schedule_interval=timedelta(days=1),
tags=['example']
)

submit = SparkKubernetesOperator(
task_id='spark_transform_data',
namespace='spark-operator',
application_file='/kubernetes/spark-pi.yaml',
kubernetes_conn_id='kubernetes_default',
do_xcom_push=True,
)



submit

Note: make sure application_file points to the actual location of your spark-pi.yaml file. Keep in mind that the application_file path is relative to the DAGs folder.

For example, if your DAGs live in /opt/airflow/dags and you placed spark-pi.yaml inside the kubernetes subdirectory (/opt/airflow/dags/kubernetes/spark-pi.yaml), the correct value is application_file='/kubernetes/spark-pi.yaml'.
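
Optionally, you can have the DAG wait for the submitted SparkApplication to reach a final state by appending a SparkKubernetesSensor task (already imported at the top of the DAG file), following the pattern of the official provider example. This is only a sketch: the monitor task id is illustrative, and the xcom_pull expression assumes the submit task is named spark_transform_data as above.

# Wait for the SparkApplication created by the submit task to finish.
# The application name is read from the XCom pushed by do_xcom_push=True above.
monitor = SparkKubernetesSensor(
    task_id='spark_pi_monitor',
    namespace='spark-operator',
    application_name="{{ task_instance.xcom_pull(task_ids='spark_transform_data')['metadata']['name'] }}",
    kubernetes_conn_id='kubernetes_default',
    dag=dag,
)

# Run the sensor after the submit task
submit >> monitor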

3. Create a ClusterRoleBinding

Create a ClusterRoleBinding for Airflow so that its worker pods are allowed to create SparkApplication objects in Kubernetes:

kubectl create clusterrolebinding default-admin --clusterrole cluster-admin --serviceaccount=airflow:airflow-worker --namespace spark-operator
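
You can verify that the binding gives the worker service account the permissions it needs (a quick sketch using kubectl’s built-in authorization check):

kubectl auth can-i create sparkapplications \
  --as=system:serviceaccount:airflow:airflow-worker \
  -n spark-operator

# Expected answer: yes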

Testing

Now that we have added the DAG and spark-pi.yaml files to the Airflow DAGs directory, it's time to test our newly created DAG. We will use the Airflow web UI to activate the DAG and monitor its execution.

  1. Access the Airflow web UI by visiting http://localhost:8080 (or the appropriate URL and port where Airflow is running).
  2. On the Airflow web UI, navigate to the “DAGs” page. You should see the spark-pi DAG listed among the available DAGs.
  3. Toggle the status switch of the spark-pi DAG to activate it.
  4. Once activated, the DAG will start executing based on its schedule or trigger. You can monitor the progress of the DAG by navigating to the “Graph View”, which provides a visual representation of the DAG’s tasks and their execution status.
  5. As the spark-pi DAG executes, you can click on the individual tasks to view their logs and track any potential issues. The logs will provide detailed information about the Spark job execution, including any error messages or output generated by the job.
  6. Keep an eye on the DAG’s execution status. If the DAG completes successfully, you should see a green checkmark indicating a successful run. If any tasks fail, you will see a red cross indicating a failed task.
Airflow task running on a Spark cluster

Congratulations! You have successfully tested your DAG and observed the execution of the Spark job using the spark-pi.yaml file.
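
If you prefer the command line, the DAG can also be unpaused and triggered from inside the webserver pod (a sketch, again assuming the default deployment name airflow-webserver):

kubectl exec -n airflow deploy/airflow-webserver -- airflow dags unpause spark_pi
kubectl exec -n airflow deploy/airflow-webserver -- airflow dags trigger spark_pi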

Access the Spark UI

Now that we have added the DAG and spark-pi.yaml files to the Airflow DAGs directory and activated the DAG, let's explore an additional feature that can help us monitor the Spark job execution: the Spark UI.

During the execution of the Spark job within the DAG, Airflow captures the logs generated by the tasks. These logs contain valuable information, including the URL to access the Spark UI for detailed insights into the job execution.

  1. Navigate to the Airflow web UI by visiting http://localhost:8080 (or the appropriate URL and port where Airflow is running).
  2. On the Airflow web UI, go to the “DAGs” page and locate the spark-pi DAG.
  3. Click on the spark-pi DAG to view its task list.
  4. Select one of the tasks responsible for executing the Spark job. For example, let’s choose the spark_transform_data task (the submit task from our DAG).
  5. Within the task details, click on the task logs to open the log view.
  6. In the logs, search for the specific message that provides the link to the Spark UI. It should contain a URL that points to the Spark UI for the corresponding job execution such as Spark web UI at http://spark-pi-4302ba89083b011c-driver-svc.spark-operator.svc:4040.
  7. Copy the Spark UI link from the logs and paste it into a new browser tab (if you are browsing from outside the cluster, see the port-forwarding note after this list).
  8. The Spark UI will open, providing detailed information about the running Spark job, including job progress, task-level details, and performance metrics.
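
Note that the URL in the logs points to a cluster-internal service, so it is only reachable from inside the cluster. To open it from your host machine, you can port-forward the driver service first (a sketch; use the driver service name that appears in your own logs):

kubectl port-forward svc/<SPARK_DRIVER_SERVICE_NAME> 4040:4040 -n spark-operator

# Then open http://localhost:4040 in your browser
# (the UI is only available while the driver pod is running)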

By accessing the Spark UI through the link found in the task logs, you can gain deeper insights into the Spark job’s execution. The Spark UI offers a wealth of information that can help you analyze the job’s performance, troubleshoot issues, and optimize your Spark applications.

Conclusion

In this tutorial, we explored the integration of Apache Airflow, Apache Spark, and Kubernetes to create a powerful data processing pipeline. By leveraging Helm charts, we simplified the management and configuration of our Spark and Airflow deployments.

We started by installing Airflow using the default Helm chart, allowing us to quickly set up the Airflow environment. We then discussed the importance of creating a custom Docker image for Airflow to add dependencies and extend its capabilities.

Next, we demonstrated how to modify the values.yaml file to load DAGs from a private Git repository, enabling seamless integration with version control and collaborative development.

We also learned how to create a SparkKubernetesOperator DAG, which allows us to submit Spark jobs to the Kubernetes cluster using the Spark Operator. We saw the importance of referencing the spark-pi.yaml file within the DAG and observed the job execution through the Airflow web UI.

Additionally, we explored how to access the Spark UI from the task logs, providing us with detailed insights into the Spark job’s execution and performance.

By combining the strengths of Apache Airflow, Apache Spark, and Kubernetes, we have created a robust and scalable data processing pipeline. This integration empowers us to handle complex workflows, schedule and monitor Spark jobs, and efficiently process large-scale data.

As you continue your journey with Airflow, Spark, and Kubernetes, there are endless possibilities for building and orchestrating sophisticated data pipelines. So keep exploring, experimenting, and leveraging these powerful tools to unlock the full potential of your data processing workflows.

Happy data processing!
