How to run Apache Beam on a Spark cluster

Hao Xu
5 min read · Mar 26, 2023


Overview

The official Apache Beam documentation provides instructions on how to run Beam pipelines on a Spark cluster. Those instructions require setting up a “job server” to run the pipeline. If you prefer a simpler approach and do not want to manage a job server, you can follow the steps in this guide instead. You can find a complete code example at https://github.com/HaoXuAI/beam-spark-example.

Background

Apache Beam is a unified programming model for building batch and streaming data processing pipelines. Apache Spark is a fast and general-purpose distributed computing system that can be used as an execution engine for Beam pipelines. In this guide, we will show you how to run Apache Beam on a Spark cluster.

We will assume that you have already installed Apache Spark and Apache Beam on your system. If you haven’t installed them yet, you can follow the installation instructions on their respective websites.

We will use the Python SDK for Apache Beam in this guide, but the same principles apply to other languages.
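
As a quick sanity check that the Beam Python SDK is installed, you can print its version. This is a trivial illustrative snippet, not part of the example repository:

# Verify the Apache Beam Python SDK is importable and check which version is installed.
import apache_beam as beam

print(beam.__version__)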

Write a Beam Pipeline

For this demo, we’ll be using the wordcount.py example from Apache Beam. You can find the code at: https://github.com/HaoXuAI/beam-spark-example/blob/main/beam_example_wc.py
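
If you don’t want to clone the repository, the example is essentially the classic Beam wordcount. Below is a minimal sketch of such a pipeline; it is illustrative only (the input/output paths are placeholders), so check the linked file for the exact code:

# beam_example_wc.py — minimal wordcount sketch (illustrative; see the linked
# repository for the exact pipeline used in this post).
import argparse
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", default="gs://dataflow-samples/shakespeare/kinglear.txt")
    parser.add_argument("--output", default="/tmp/wordcount/output")
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Any remaining flags (--runner, --environment_type, ...) are forwarded to Beam.
    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(known_args.input)
            | "Split" >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
            | "PairWithOne" >> beam.Map(lambda word: (word, 1))
            | "Count" >> beam.CombinePerKey(sum)
            | "Format" >> beam.MapTuple(lambda word, count: f"{word}: {count}")
            | "Write" >> beam.io.WriteToText(known_args.output)
        )


if __name__ == "__main__":
    run()

The important part for this post is not the transforms themselves, but that the pipeline picks up its runner configuration from the command line, which is what the compile step below relies on.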

Compile the pipeline to a job jar

The command below compiles the job into a jar that can be run on a Spark cluster. You can distribute the jar (for example, to S3) and load it into your Spark cluster to run.

python -m beam_example_wc \
--runner=SparkRunner \
--output_executable_path=./wc_job.jar \
--environment_type=PROCESS \
--environment_config='{"command": "/opt/apache/beam/boot"}' \
--spark_version=3

The command compiles the pipeline using the SparkRunner, which is a runner provided by Apache Beam that allows the pipeline to run on a Spark cluster.

The --output_executable_path option specifies the path and filename of the compiled pipeline, which is saved as a job jar. The job jar can then be submitted to the Spark cluster using the spark-submit command.

The --environment_type option specifies the type of environment in which the user code will be executed. In this case, it is set to PROCESS, which means the Beam SDK harness runs as a separate process on each Spark worker rather than in a Docker container.

The --environment_config option provides additional configuration for that environment. For PROCESS, it specifies the command used to start the SDK harness process on each worker; here that command is /opt/apache/beam/boot.

The --spark_version option specifies the version of Spark that will be used to run the pipeline. Here, it is set to 3.
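
If you would rather keep these settings in code than on the command line, the same flags can be passed to PipelineOptions when constructing the pipeline. This is just a sketch mirroring the command above, not code from the example repository:

# Building the same portable-runner options programmatically. The values
# mirror the `python -m beam_example_wc ...` command shown above.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=SparkRunner",
    "--output_executable_path=./wc_job.jar",
    "--environment_type=PROCESS",
    '--environment_config={"command": "/opt/apache/beam/boot"}',
    "--spark_version=3",
])

# Pass `options` to beam.Pipeline(options=options); because
# --output_executable_path is set, running the pipeline produces ./wc_job.jar
# instead of executing the job directly.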

You can find more instructions in the Apache Beam SDK harness configuration documentation.

Start up Spark Cluster

In order to run Apache Beam on a Spark cluster, you have to start the Spark cluster with a Beam-specific environment.

The reason is that Beam relies on an “SDK harness” component to actually execute your job, as shown in the diagram from the resource below, and your Spark executors must be able to run that component as either a Docker container or a separate process.

Resource: https://stackoverflow.com/questions/73637444/in-apache-beams-sparkrunner-how-does-the-docker-environment-type-affect-an-exi

How you inject the SDK harness therefore depends on your Spark environment. I will use Minikube and Spark on a Kubernetes cluster as an example to show how the pieces fit together.

I will assume you already have a Minikube Kubernetes cluster and have set up the permissions required to submit Spark jobs. If you haven’t, the “Running Spark on Kubernetes” section of the Spark documentation walks through the setup.

Submit the Beam job

It is fairly simple to submit your Beam job jar to a Spark cluster on Kubernetes:

spark-submit --master k8s://https://127.0.0.1:60632 \
--conf spark.kubernetes.driver.podTemplateFile=driver_pod_template.yaml \
--conf spark.kubernetes.executor.podTemplateFile=executor_pod_template.yaml \
--class org.apache.beam.runners.spark.SparkPipelineRunner \
--conf spark.kubernetes.container.image=apache/spark:v3.3.2 \
./wc_job.jar

The job is executed by the org.apache.beam.runners.spark.SparkPipelineRunner class, which is specified using the --class option.

The --master option specifies the URL of the Kubernetes API server to use for submitting the job. You can find it by running command:

 kubectl config view --output=jsonpath='{.clusters[].cluster.server}'

The --conf option is used to pass configuration parameters to Spark. spark.kubernetes.driver.podTemplateFile and spark.kubernetes.executor.podTemplateFile specify the paths to the driver and executor pod templates, respectively, which define the properties of the driver and executor pods that will be created for the job. The executor pod template is also where you tell the Spark executor how to obtain the Beam SDK harness.

Below is an example executor pod template in which an init container copies the Beam SDK harness boot binary into a shared volume, making it available to the actual executor container.

spec:
  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
        - name: beam-data
          mountPath: /opt/apache/beam/
  initContainers:
    - name: init-beam
      image: apache/beam_python3.7_sdk
      command:
        - cp
        - /opt/apache/beam/boot
        - /init-container/data/boot
      volumeMounts:
        - name: beam-data
          mountPath: /init-container/data
  volumes:
    - name: beam-data
      emptyDir: {}

The --conf spark.kubernetes.container.image=apache/spark:v3.3.2 option specifies the Docker image to use for the Spark containers. In this case, it is set to apache/spark:v3.3.2.

Finally, ./wc_job.jar specifies the path to the job jar file, which contains the compiled Beam pipeline.

Check the Job

Now you can visit the Minikube dashboard to check the status of the Spark pods, or visit localhost:4040 to check the Spark job in the Spark UI.

[Image: Minikube cluster running the Beam on Spark pods]
[Image: Beam on Spark application in the Spark UI]
