Setting up Hadoop Yarn to run Spark applications
In this post I’ll talk about setting up a Hadoop Yarn cluster with Spark. After setting up a Spark standalone cluster, I noticed that I couldn’t submit Python script jobs in cluster mode. This bothered me more than it should have, so I spent some time learning to set up a Yarn cluster with Spark. The code is on GitHub here.
Introduction
In one of my previous posts I talked about setting up a Spark standalone cluster. This post is a continuation of that one, and covers how to set up a Yarn cluster and add Spark to it. You can check out the previous story here. Near the end of the post, I discuss the limitations of my approach and possible improvements.
Setting up Spark on a Yarn cluster would allow me to submit jobs in cluster mode. What’s the difference between client and cluster mode? When running a job in client mode, the driver runs on the client, e.g. a laptop. If the client fails, the job is shut down, although the executors still run on the Spark cluster. In cluster mode, everything runs on the cluster, so if you’ve started a job from your laptop, you can feel free to close it (see reference 2). In both modes, the actual work is performed on the cluster.
I had three requirements when setting up this cluster:
- It needs to run in Docker;
- It should be as simple as I could make it (in the end, I think I mostly failed in this regard);
- Workers are not predefined: I want to be able to run an arbitrary number of workers with the docker compose --scale option.
What is Hadoop, Yarn, and Spark?
Apache Hadoop is a software platform that facilitates the processing of large amounts of data across a cluster of computers [3]. It is designed to detect failures at the application level and to scale to clusters of thousands of machines, each offering local computation and storage [3]. Hadoop is based on the MapReduce programming model. If map and reduce seem familiar, it’s because they are higher-order functions found in many programming languages. The map function iterates over the elements of a collection and applies a transformation function to each element. The reduce function folds the collection into a single accumulated value. The combination of these two functions proved to be a very powerful programming model.
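To see why the model feels familiar, here is the same idea with Python’s built-in higher-order functions (an in-memory analogy only; Hadoop applies the map and reduce steps to distributed data):

```python
from functools import reduce

# map: apply a transformation to every element of a collection
words = ["hadoop", "yarn", "spark"]
lengths = list(map(len, words))  # [6, 4, 5]

# reduce: fold the collection into a single accumulated value
total = reduce(lambda acc, x: acc + x, lengths, 0)  # 15
```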
One of the modules of Hadoop is the HDFS (Hadoop Distributed File System) that handles data redundancy and scalability across nodes [1].
Hadoop Yarn is basically a component of Hadoop which provides resource management and job scheduling. Let’s try to put it simply. You have a global resource manager (called ResourceManager) running on a master node. The ResourceManager assigns resources to a running application. Each worker node runs a NodeManager. The NodeManager is responsible for the node’s containers and reporting their resource usage back to the ResourceManager [5]. A container in YARN is just an abstract notion that represents resources.
The ResourceManager consists of:
- Scheduler
- ApplicationsManager.
When you submit an application to the cluster, the Scheduler is responsible for allocating resources to it, taking into consideration all of the various constraints. The Scheduler performs only its scheduling function: it schedules based on the resource requirements of applications, using the abstract notion of a resource Container, which incorporates elements such as memory, CPU, disk and network [5].
An application can be a single job or a DAG of jobs. The ApplicationsManager is responsible for accepting job submissions. It negotiates the first container in which to run the ApplicationMaster and is responsible for restarting that container in case of failure. The ApplicationMaster is specific to each application. It negotiates resources with the Scheduler and works with the NodeManagers to make sure the application gets executed [5]. The ApplicationMaster can also create executors that run the actual jobs.
NodeManagers are run on worker nodes and are responsible for creating the containers on the node. Both the ApplicationMaster and executors run in containers [1].
Preparing the Dockerfile
All of the code for this post is in the same GitHub repo as the Spark standalone cluster. Therefore, I named the Dockerfile Dockerfile-yarn.
The first part of the Dockerfile is basically the same as the one I used for the Spark standalone cluster with the exception of downloading and installing Hadoop:
# Base image
FROM python:3.10-bullseye as spark-base
# Install tools required by the OS
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
      sudo \
      curl \
      vim \
      unzip \
      rsync \
      openjdk-11-jdk \
      build-essential \
      software-properties-common \
      ssh && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
# Setup the directories for our Spark and Hadoop installations
ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME}
WORKDIR ${SPARK_HOME}
# Download and install Spark
RUN curl https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz -o spark-3.3.1-bin-hadoop3.tgz \
&& tar xvzf spark-3.3.1-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
&& rm -rf spark-3.3.1-bin-hadoop3.tgz
# Download and install Hadoop
RUN curl https://dlcdn.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz -o hadoop-3.3.1-bin.tar.gz \
 && tar xfz hadoop-3.3.1-bin.tar.gz --directory /opt/hadoop --strip-components 1 \
 && rm -rf hadoop-3.3.1-bin.tar.gz
Next, let’s install the Python dependencies, building on the previous stage as the base layer:
FROM spark-base as pyspark
# Install python deps
COPY requirements/requirements.txt .
RUN pip3 install -r requirements.txt
Next, let’s set up more environment variables. We do the following:
- Set the JAVA_HOME environment variable;
- Add the bin and sbin directories located in SPARK_HOME (/opt/spark) and HADOOP_HOME (/opt/hadoop) to the PATH;
- Add JAVA_HOME/bin to the PATH.
# Set JAVA_HOME environment variable
ENV JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
# Add the Spark and Hadoop bin and sbin to the PATH variable.
# Also add $JAVA_HOME/bin to the PATH
ENV PATH="$SPARK_HOME/sbin:/opt/spark/bin:${PATH}"
ENV PATH="$HADOOP_HOME/bin:$HADOOP_HOME/sbin:${PATH}"
ENV PATH="${PATH}:${JAVA_HOME}/bin"
Next, add Spark related environment variables and tell the dynamic link loader where to find Hadoop native libraries:
# Setup Spark related environment variables
ENV SPARK_MASTER="spark://spark-yarn-master:7077"
ENV SPARK_MASTER_HOST="spark-yarn-master"
ENV SPARK_MASTER_PORT="7077"
ENV PYSPARK_PYTHON="python3"
ENV HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
# Add Hadoop native library path to the dynamic link library path
ENV LD_LIBRARY_PATH="$HADOOP_HOME/lib/native:${LD_LIBRARY_PATH}"
Set up the users that will be used by HDFS and Yarn. It is probably not a smart idea to use the root user in production.
# Set user for HDFS and Yarn (for production probably not smart to put root)
ENV HDFS_NAMENODE_USER="root"
ENV HDFS_DATANODE_USER="root"
ENV HDFS_SECONDARYNAMENODE_USER="root"
ENV YARN_RESOURCEMANAGER_USER="root"
ENV YARN_NODEMANAGER_USER="root"
For some strange reason, Hadoop does not read the JAVA_HOME variable from the environment. Therefore, I needed to add an export command to the file $HADOOP_HOME/etc/hadoop/hadoop-env.sh.
# Add JAVA_HOME to hadoop-env.sh
RUN echo "export JAVA_HOME=${JAVA_HOME}" >> \
"$HADOOP_HOME/etc/hadoop/hadoop-env.sh"
Copy the configuration files: the Spark spark-defaults.conf file and the configurations for Hadoop. Then make the binaries and scripts executable and set the PYTHONPATH environment variable.
# COPY the appropriate configuration files to their appropriate locations
COPY yarn/spark-defaults.conf "$SPARK_HOME/conf/"
COPY yarn/*.xml "$HADOOP_HOME/etc/hadoop/"
# Make the binaries and scripts executable and set the PYTHONPATH environment variable
RUN chmod u+x /opt/spark/sbin/* && \
chmod u+x /opt/spark/bin/*
ENV PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
For this setup, we don’t need to start the ssh server. However, some Hadoop scripts require ssh in order for the nodes to connect. Therefore, we generate an ssh key and store it in the appropriate directory. We also copy the configuration for ssh.
RUN ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa && \
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys && \
chmod 600 ~/.ssh/authorized_keys
# COPY does not expand ~, so use the absolute path to root's home directory
COPY ssh_config /root/.ssh/config
The contents of the ssh_config file:
Host *
UserKnownHostsFile /dev/null
StrictHostKeyChecking no
Finally, copy the entrypoint-yarn.sh file to entrypoint.sh, expose port 22 and set the entrypoint.
COPY entrypoint-yarn.sh entrypoint.sh
EXPOSE 22
ENTRYPOINT ["./entrypoint.sh"]
Spark and Hadoop Yarn configurations
Let’s take a look at the Spark defaults configuration file that we use first:
spark.master yarn
spark.submit.deployMode client
spark.driver.memory 512m
spark.executor.memory 512m
spark.yarn.am.memory 1G
spark.eventLog.enabled true
spark.eventLog.dir hdfs://spark-yarn-master:8080/spark-logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory hdfs://spark-yarn-master:8080/spark-logs
spark.history.fs.update.interval 10s
spark.history.ui.port 18080
Here, we set the Spark master to yarn, make client the default deploy mode for submitting jobs, and set the driver, executor and ApplicationMaster memory used in client mode. We also enable event logs, and set the directory to store the logs (spark.eventLog.dir), the directory from which the logs will be read (spark.history.fs.logDirectory), the history provider, the interval at which the history provider checks the logs for updates, and the history server port.
I placed this file and the following Hadoop Yarn configuration files under the directory yarn.
Core site (core-site.xml)
Set the location of the master node in the file core-site.xml:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://spark-yarn-master:8080</value>
</property>
</configuration>
The value spark-yarn-master is the service name in the docker-compose file.
HDFS site (hdfs-site.xml)
In this file, we set where on the local filesystem the DFS name node should store the name table (dfs.namenode.name.dir) and where the data node should store its blocks (dfs.datanode.data.dir), as well as the number of block replications.
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/opt/hadoop/data/nameNode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/opt/hadoop/data/dataNode</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
</configuration>
The option dfs.permissions.enabled enables or disables HDFS permission checking; disabling it lets us browse and modify the HDFS filesystem from the web interface. See reference 6.
Mapred site (mapred-site.xml)
Set yarn as the job scheduler.
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>1024</value>
</property>
<property>
<name>mapreduce.map.memory.mb</name>
<value>512</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>512</value>
</property>
</configuration>
The settings:
- mapreduce.framework.name: the runtime framework for executing MapReduce jobs
- yarn.app.mapreduce.am.env: user-added environment variables for the ApplicationMaster processes
- mapreduce.map.env and mapreduce.reduce.env: environment variables for the map and reduce tasks
- yarn.app.mapreduce.am.resource.mb: the amount of memory the ApplicationMaster requires, in MB
- mapreduce.map.memory.mb and mapreduce.reduce.memory.mb: the amount of memory to request from the scheduler for each map and reduce task
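As a quick sanity check of these numbers (my own arithmetic, not anything the Hadoop docs prescribe), we can verify that one ApplicationMaster, one map task and one reduce task together fit into the 2048 MB a NodeManager offers (yarn.nodemanager.resource.memory-mb in yarn-site.xml):

```python
# Memory settings from mapred-site.xml and yarn-site.xml, in MB
am_mb = 1024      # yarn.app.mapreduce.am.resource.mb
map_mb = 512      # mapreduce.map.memory.mb
reduce_mb = 512   # mapreduce.reduce.memory.mb
node_mb = 2048    # yarn.nodemanager.resource.memory-mb

# One AM + one map + one reduce fit on a single node
assert am_mb + map_mb + reduce_mb <= node_mb
```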
See reference 7.
Yarn site (yarn-site.xml)
Configure yarn and set resource limits.
<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>spark-yarn-master</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
</configuration>
The following settings are set:
- yarn.resourcemanager.hostname: the hostname of the ResourceManager
- yarn.nodemanager.resource.cpu-vcores: used by the ResourceManager to limit the number of vcores that can be allocated for containers; it does not limit the number of CPUs actually used by Yarn containers
- yarn.nodemanager.aux-services: a list of auxiliary services to run; in this case, the MapReduce shuffle service
- yarn.nodemanager.resource.memory-mb: the amount of physical memory, in MB, that can be allocated for containers
- yarn.scheduler.maximum-allocation-mb and yarn.scheduler.minimum-allocation-mb: the maximum and minimum allocation for every container request at the ResourceManager, in MB
- yarn.log-aggregation-enable: log aggregation collects the logs of all containers and lets us inspect them while developing and debugging
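To get a feel for these limits: as far as I know, the scheduler normalizes each container request up to a multiple of the minimum allocation and caps it at the maximum. A small sketch of that rounding (illustrative, not YARN’s actual code):

```python
import math

MIN_MB = 512    # yarn.scheduler.minimum-allocation-mb
MAX_MB = 2048   # yarn.scheduler.maximum-allocation-mb

def normalize(request_mb: int) -> int:
    """Round a container request up to a multiple of the minimum
    allocation, capped at the maximum."""
    rounded = math.ceil(request_mb / MIN_MB) * MIN_MB
    return min(max(rounded, MIN_MB), MAX_MB)

print(normalize(512))   # 512
print(normalize(600))   # 1024 (rounded up to the next 512 MB multiple)
print(normalize(3000))  # 2048 (capped at the maximum)
```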
For more see reference 8.
Capacity scheduler (capacity-scheduler.xml, optional)
This is really not required for the cluster, but I kept it anyway. The capacity scheduler is a pluggable scheduler that aims to maximise the throughput of the cluster. It allows multiple tenants to share the cluster while ensuring that the “applications are allocated resources in a timely manner” under capacity constraints [9].
<configuration>
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>100</value>
</property>
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
</configuration>
The following settings are configured:
- yarn.scheduler.capacity.root.queues: sets up the queues
- yarn.scheduler.capacity.root.default.capacity: the queue capacity as a percentage; the capacities of all queues must sum to 100%
- yarn.scheduler.capacity.resource-calculator: the ResourceCalculator used to compare resources in the scheduler; the DominantResourceCalculator takes multiple resources into account, such as memory and CPU
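One constraint worth remembering: the capacities of sibling queues must sum to 100%. A tiny sketch with hypothetical queue names (only the default queue exists in the config above):

```python
# Hypothetical split of the root queue into two queues
# (names are my own, not from the post's config)
queues = {"default": 70, "analytics": 30}

# Sibling queue capacities must add up to exactly 100%
assert sum(queues.values()) == 100
```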
See reference 9.
Entrypoint script
Here is the entire code for the entrypoint.sh
script:
#!/bin/bash
SPARK_WORKLOAD=$1
echo "SPARK_WORKLOAD: $SPARK_WORKLOAD"

/etc/init.d/ssh start

if [ "$SPARK_WORKLOAD" == "master" ];
then
  hdfs namenode -format

  # start the master node processes
  hdfs --daemon start namenode
  hdfs --daemon start secondarynamenode
  yarn --daemon start resourcemanager

  # create required directories; this can fail while HDFS
  # is still starting up, so retry in a loop
  while ! hdfs dfs -mkdir -p /spark-logs;
  do
    echo "Failed creating /spark-logs hdfs dir"
    sleep 1
  done
  echo "Created /spark-logs hdfs dir"

  hdfs dfs -mkdir -p /opt/spark/data
  echo "Created /opt/spark/data hdfs dir"

  # copy the data to the data HDFS directory
  hdfs dfs -copyFromLocal /opt/spark/data/* /opt/spark/data
  hdfs dfs -ls /opt/spark/data
elif [ "$SPARK_WORKLOAD" == "worker" ];
then
  # added later (after publishing)
  hdfs namenode -format

  # start the worker node processes
  hdfs --daemon start datanode
  yarn --daemon start nodemanager
elif [ "$SPARK_WORKLOAD" == "history" ];
then
  # wait until the /spark-logs directory exists on HDFS
  while ! hdfs dfs -test -d /spark-logs;
  do
    echo "spark-logs doesn't exist yet... retrying"
    sleep 1;
  done
  echo "Exit loop"

  # start the spark history server
  start-history-server.sh
fi

# keep the container running
tail -f /dev/null
We start by reading the argument passed to the script, which tells us what kind of node we want: master, worker or history. We then start ssh regardless of the node type.
If we’re on the master node, we format the HDFS namenode and start all of the processes that should run on the master:
- NameNode (namenode)
- SecondaryNameNode (secondarynamenode)
- ResourceManager (resourcemanager)
We also create the HDFS directories for the spark logs and application data that we will use, and copy the data from the local filesystem to the HDFS filesystem.
On the workers, we start the DataNode (datanode) and NodeManager (nodemanager). On the history node, we wait for the spark-logs directory to be created on HDFS before starting the Spark history server.
The last command prevents the docker container from exiting.
Docker compose (docker-compose.yarn.yml)
Here is the docker compose file that defines the master, worker and history server services.
version: '3.8'
services:
spark-yarn-master:
container_name: da-spark-yarn-master
build:
# We want to use the Dockerfile prepared for yarn
dockerfile: Dockerfile-yarn
context: .
image: da-spark-yarn-image
entrypoint: ['./entrypoint.sh', 'master']
volumes:
- ./book_data:/opt/spark/data
- ./spark_apps:/opt/spark/apps
env_file:
- .env.spark
ports:
- '9090:8080'
- '9870:9870'
- '7077:7077'
- '8088:8088'
spark-yarn-worker:
image: da-spark-yarn-image
entrypoint: ['./entrypoint.sh', 'worker']
depends_on:
- spark-yarn-master
env_file:
- .env.spark
volumes:
- ./book_data:/opt/spark/data
- ./spark_apps:/opt/spark/apps
yarn-history-server:
container_name: da-spark-yarn-history
image: da-spark-yarn-image
entrypoint: ['./entrypoint.sh', 'history']
depends_on:
- spark-yarn-master
env_file:
- .env.spark
ports:
- '18080:18080'
Starting the cluster, checking running processes and submitting jobs
Before running, you should check the virtual disk size that Docker will assign to the containers. I suggest at least 70 GB.
You can build the images by running make build-yarn and start the cluster with make run-yarn. There is also the make run-yarn-scaled command that brings up three data nodes. Wait until all of the files copied to HDFS are listed and the history container exits its wait loop and starts the Spark history server. When this happens, we’re ready.
Once up, we can navigate to the NameNode web UI on localhost:9870. On the DataNodes tab we can see the DataNodes connected to the cluster:
We can also browse the HDFS directories via Utilities > Browse the file system.
Let’s test whether everything is working by submitting a job. We can run make submit-yarn-test to submit the pi.py example in cluster mode. The command translates to:
docker exec da-spark-yarn-master spark-submit --master yarn --deploy-mode cluster ./examples/src/main/python/pi.py
Let’s also test with one of our custom applications:
make submit-yarn-cluster app=data_analysis_book/chapter03/word_non_null.py
After everything is finished, we can navigate to the ResourceManager web interface (localhost:8088
) to see the finished jobs.
We can also check out the run applications on the Spark history server (localhost:18080
).
Some useful commands
If you want to find out which JVM processes are running on a node, you can docker exec into a container and run the jps command. For example, running docker exec da-spark-yarn-master jps will print out the following processes:
1139 Jps
101 NameNode
150 SecondaryNameNode
202 ResourceManager
You may also want to check the health of your nodes. You can list them all with docker exec da-spark-yarn-master yarn node -list -all:
Total Nodes:3
Node-Id Node-State Node-Http-Address Number-of-Running-Containers
9f9b704b1432:38955 RUNNING 9f9b704b1432:8042 0
337c92ac60cf:39681 RUNNING 337c92ac60cf:8042 0
8540057b647a:38205 RUNNING 8540057b647a:8042 0
You can also check the logs of an application by running docker exec da-spark-yarn-master yarn logs -applicationId <application-id>.
In each case, you could first enter the docker container by running docker exec -it da-spark-yarn-master bash and then run, e.g., yarn node -list -all.
Limitations and possible improvements
Working links
One of the biggest issues with this setup is that the DataNode hostnames are random strings (the Docker container IDs). Consequently, all of the links on the Hadoop web interfaces contain these hostnames, which means the links do not work: when you click on a link it tries to take you to hostname:port, but the docker containers are only reachable from the host through localhost.
Additionally, I did not expose any ports used by the DataNodes. Hence, the DataNodes are not exposed to the host system.
MapReduce JobHistory server
I have removed starting the MapReduce JobHistory server. In all honesty, the server ran OK, and you could see MapReduce jobs submitted through yarn jar, e.g.:
yarn jar /hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.1.0-beta.jar pi 16 100000
However, the server did not display any information about submitted Spark jobs. I am not sure whether it should have or not, or whether I configured something wrong.
Summary
In this story, I covered how to bring up a Hadoop Yarn cluster on Docker and run Spark applications on it. I also discussed the limitations of this approach.
I hope you found this post useful.
The code is on GitHub.
References
- https://www.linode.com/docs/guides/how-to-install-and-set-up-hadoop-cluster/
- https://www.linode.com/docs/guides/install-configure-run-spark-on-top-of-hadoop-yarn-cluster/
- https://www.ibm.com/analytics/hadoop
- https://hadoop.apache.org/
- https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html
- https://hadoop.apache.org/docs/r3.3.1/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
- https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
- https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
- https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html