Setting up Hadoop Yarn to run Spark applications

Marin Aglić
13 min read · Jan 10, 2023


In this post I’ll talk about setting up a Hadoop Yarn cluster with Spark. After setting up a Spark standalone cluster, I noticed that I couldn’t submit Python script jobs in cluster mode. This bothered me more than it should have, so I spent some time learning how to set up a Yarn cluster with Spark. The code is on GitHub here.

Introduction

In one of my previous posts I talked about setting up a Spark standalone cluster. This post is basically a continuation of that one: it covers how to set up a Yarn cluster and add Spark to it. You can check out the previous story here. Near the end of the post, I discuss the limitations of my approach and possible improvements.

Setting up Spark on a Yarn cluster would allow me to submit jobs in cluster mode. What’s the difference between client and cluster mode? When running a job in client mode, the driver runs on the client, e.g. your laptop. If the client fails, the job is shut down. The executors still run on the Spark cluster. In cluster mode, everything runs on the cluster, including the driver. If you’ve started a job from your laptop, you can feel free to close it (see reference 2). So, in both modes, the actual work is performed on the cluster.
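To make the distinction concrete, here is a minimal sketch of a PySpark script (illustrative only, not one of the apps from the repo). The script itself looks the same in both modes; what changes is where its main function, the driver, runs:

# minimal_job.py (illustrative only, not part of the repo)
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # The master and deploy mode are normally supplied by spark-submit
    # (e.g. --master yarn --deploy-mode cluster), not hard-coded here.
    spark = SparkSession.builder.appName("client-vs-cluster-demo").getOrCreate()

    # In client mode this driver code runs on the machine that called
    # spark-submit; in cluster mode it runs inside a YARN container.
    rdd = spark.sparkContext.parallelize(range(10))
    print(rdd.sum())  # the executors on the cluster do the work in both modes

    spark.stop()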

I had only three requirements when setting up this cluster:

  1. It needs to run in Docker;
  2. Make it as simple as I could (in the end, I think I mostly failed in this regard);
  3. Workers are not predefined — I want to be able to run an arbitrary number of workers with the docker compose --scale option.

What is Hadoop, Yarn, and Spark?

Apache Hadoop is a software platform that facilitates the processing of a large amount of data across a cluster of computers [3]. It is designed to detect failures at the application level and to scale to clusters of thousands of machines, each of which offers local computation and storage [3]. Hadoop is based on the MapReduce programming model. If map and reduce sound familiar, it’s because they are higher-order functions that can be found in many programming languages. The map function iterates over the elements of a collection and applies a transformation function to each element. The reduce function aggregates the elements of a collection into a single value (or a smaller structure). The combination of these two functions proved to be a very powerful programming model.
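As a plain-Python illustration of the model (just the higher-order functions, not Hadoop itself):

from functools import reduce

# "Map" phase: transform each element independently into a (word, 1) pair
words = ["spark", "yarn", "hdfs", "spark"]
pairs = list(map(lambda w: (w, 1), words))  # [('spark', 1), ('yarn', 1), ...]

# "Reduce" phase: aggregate the mapped pairs into a single result
total = reduce(lambda acc, pair: acc + pair[1], pairs, 0)
print(total)  # 4 words in total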

One of the modules of Hadoop is the HDFS (Hadoop Distributed File System) that handles data redundancy and scalability across nodes [1].

Hadoop Yarn is basically a component of Hadoop which provides resource management and job scheduling. Let’s try to put it simply. You have a global resource manager (called ResourceManager) running on a master node. The ResourceManager assigns resources to a running application. Each worker node runs a NodeManager. The NodeManager is responsible for the node’s containers and reporting their resource usage back to the ResourceManager [5]. A container in YARN is just an abstract notion that represents resources.

The ResourceManager consists of:

  • Scheduler
  • ApplicationsManager.

When you submit an application to the cluster, the Scheduler is responsible for allocating resources to the application, taking into consideration all of the various constraints. The Scheduler performs only its scheduling function. It schedules based on the application’s resource requirements, which are expressed through the abstract notion of a resource Container that incorporates elements such as memory, CPU, disk, and network [5].

An application can be a job or a DAG of jobs. The ApplicationsManager is responsible for accepting job submissions. It negotiates the first container, in which the application’s ApplicationMaster runs, and is responsible for restarting that container in case of failure. The ApplicationMaster is specific to each application. It negotiates resources with the Scheduler and works with the NodeManagers to make sure the application gets executed [5]. The ApplicationMaster can also request containers for the executors that run the actual work.

NodeManagers are run on worker nodes and are responsible for creating the containers on the node. Both the ApplicationMaster and executors run in containers [1].

Preparing the Dockerfile

All of the code for this post is in the same GitHub repo with the Spark standalone cluster. Therefore, I named the Dockerfile Dockerfile-yarn.

The first part of the Dockerfile is basically the same as the one I used for the Spark standalone cluster with the exception of downloading and installing Hadoop:

# Base image
FROM python:3.10-bullseye as spark-base

# Install tools required by the OS
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
      sudo \
      curl \
      vim \
      unzip \
      rsync \
      openjdk-11-jdk \
      build-essential \
      software-properties-common \
      ssh && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*



# Setup the directories for our Spark and Hadoop installations
ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}

RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME}
WORKDIR ${SPARK_HOME}

# Download and install Spark
RUN curl https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz -o spark-3.3.1-bin-hadoop3.tgz \
    && tar xvzf spark-3.3.1-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
    && rm -rf spark-3.3.1-bin-hadoop3.tgz

# Download and install Hadoop
RUN curl https://dlcdn.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz -o hadoop-3.3.1-bin.tar.gz \
    && tar xfz hadoop-3.3.1-bin.tar.gz --directory /opt/hadoop --strip-components 1 \
    && rm -rf hadoop-3.3.1-bin.tar.gz

Next, let’s install the Python dependencies (building on the image from the previous stage):

FROM spark-base as pyspark

# Install python deps
COPY requirements/requirements.txt .
RUN pip3 install -r requirements.txt

Next, let’s setup more environment variables. We do the following:

  1. Set the JAVA_HOME environment variable;
  2. Add bin and sbin directories located in SPARK_HOME (/opt/spark) AND HADOOP_HOME (/opt/hadoop) to the PATH;
  3. Add JAVA_HOME/bin to the PATH.

# Set JAVA_HOME environment variable
ENV JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"

# Add the Spark and Hadoop bin and sbin to the PATH variable.
# Also add $JAVA_HOME/bin to the PATH
ENV PATH="$SPARK_HOME/sbin:/opt/spark/bin:${PATH}"
ENV PATH="$HADOOP_HOME/bin:$HADOOP_HOME/sbin:${PATH}"
ENV PATH="${PATH}:${JAVA_HOME}/bin"

Next, add Spark related environment variables and tell the dynamic link loader where to find Hadoop native libraries:

# Setup Spark related environment variables
ENV SPARK_MASTER="spark://spark-yarn-master:7077"
ENV SPARK_MASTER_HOST spark-yarn-master
ENV SPARK_MASTER_PORT 7077
ENV PYSPARK_PYTHON python3
ENV HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"

# Add Hadoop native library path to the dynamic link library path
ENV LD_LIBRARY_PATH="$HADOOP_HOME/lib/native:${LD_LIBRARY_PATH}"

Setup the user that will be used by HDFS and Yarn. It is probably not a smart idea to use the root user in production.

# Set user for HDFS and Yarn (for production probably not smart to put root)
ENV HDFS_NAMENODE_USER="root"
ENV HDFS_DATANODE_USER="root"
ENV HDFS_SECONDARYNAMENODE_USER="root"
ENV YARN_RESOURCEMANAGER_USER="root"
ENV YARN_NODEMANAGER_USER="root"

For some strange reason, Hadoop does not pick up the JAVA_HOME variable from the environment. Therefore, I needed to add an export command to the file $HADOOP_HOME/etc/hadoop/hadoop-env.sh.

# Add JAVA_HOME to hadoop-env.sh
RUN echo "export JAVA_HOME=${JAVA_HOME}" >> \
    "$HADOOP_HOME/etc/hadoop/hadoop-env.sh"

Next, copy the configuration files: the Spark spark-defaults.conf file and the Hadoop configuration files. Then make the binaries and scripts executable and set the PYTHONPATH environment variable.

# COPY the appropriate configuration files to their appropriate locations
COPY yarn/spark-defaults.conf "$SPARK_HOME/conf/"
COPY yarn/*.xml "$HADOOP_HOME/etc/hadoop/"

# Make the binaries and scripts executable and set the PYTHONPATH environment variable
RUN chmod u+x /opt/spark/sbin/* && \
    chmod u+x /opt/spark/bin/*

ENV PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH

For this setup, we don’t need to start the ssh server. However, some Hadoop scripts require ssh in order for the nodes to connect. Therefore, we generate an ssh key and store it in the appropriate directory. We also copy the configuration for ssh.

RUN ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa && \
    cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys && \
    chmod 600 ~/.ssh/authorized_keys

# COPY does not expand ~, so use the absolute path (the image runs as root)
COPY ssh_config /root/.ssh/config

The contents of the ssh_config file:

Host *
    UserKnownHostsFile /dev/null
    StrictHostKeyChecking no

Finally, copy the entrypoint-yarn.sh file to entrypoint.sh, expose port 22 and set the entrypoint.

COPY entrypoint-yarn.sh entrypoint.sh

EXPOSE 22

ENTRYPOINT ["./entrypoint.sh"]

Spark and Hadoop Yarn configurations

Let’s take a look at the Spark defaults configuration file that we use first:

spark.master                     yarn
spark.submit.deployMode          client
spark.driver.memory              512m
spark.executor.memory            512m
spark.yarn.am.memory             1G
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://spark-yarn-master:8080/spark-logs
spark.history.provider           org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory    hdfs://spark-yarn-master:8080/spark-logs
spark.history.fs.update.interval 10s
spark.history.ui.port            18080

Here, we set the Spark master to yarn, make client the default deploy mode for submitted jobs, and set the driver, executor, and application master memory used in client mode. We also enable event logs and set the directory where the logs are written (spark.eventLog.dir), the directory from which the history server reads them (spark.history.fs.logDirectory), the history provider, the interval at which the history provider checks for new logs, and the history server port.
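These defaults apply to every job submitted from the image, but an application can still override most of them. As a minimal sketch (the app name and values here are made up for illustration): properties set in the application take precedence over spark-submit --conf flags, which in turn take precedence over spark-defaults.conf.

from pyspark.sql import SparkSession

# Properties set here override spark-submit --conf flags and spark-defaults.conf.
spark = (
    SparkSession.builder
    .appName("override-defaults-demo")          # hypothetical name
    .config("spark.executor.memory", "1g")      # bump the 512m default
    .config("spark.executor.instances", "2")    # hypothetical override
    .getOrCreate()
)

print(spark.conf.get("spark.executor.memory"))  # 1g
spark.stop()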

I placed this file and the following Hadoop Yarn configuration files under the directory yarn.

Core site (core-site.xml)

Set the location for the master node in the file core-site.xml:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://spark-yarn-master:8080</value>
</property>
</configuration>

The value spark-yarn-master is the service name in the docker-compose file.

HDFS site (hdfs-site.xml)

In this file, we set where on the local filesystem the DFS NameNode should store the name table (dfs.namenode.name.dir), where the DataNode should store its blocks (dfs.datanode.data.dir), and the number of block replications (dfs.replication).

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/opt/hadoop/data/nameNode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/opt/hadoop/data/dataNode</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
</configuration>

The option dfs.permissions.enabled enables or disables permission checking in HDFS; disabling it makes it easier to browse and manipulate the filesystem through the web interface.

See reference 6.
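Once HDFS is up, paths without an explicit scheme resolve against fs.defaultFS, so a PySpark job can read from HDFS directly. A small sketch (the file name is hypothetical; the directory is the one the entrypoint script populates later):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hdfs-smoke-test").getOrCreate()

# Resolved against fs.defaultFS (hdfs://spark-yarn-master:8080), so this reads
# from HDFS, not from the container's local filesystem.
df = spark.read.text("/opt/spark/data/some_book.txt")  # hypothetical file name
print(df.count())

# Fully qualified URIs work as well:
df = spark.read.text("hdfs://spark-yarn-master:8080/opt/spark/data/some_book.txt")
spark.stop()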

Mapred site (mapred-site.xml)

Set yarn as the runtime framework for executing MapReduce jobs.

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>yarn.app.mapreduce.am.env</name>
    <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
  </property>
  <property>
    <name>mapreduce.map.env</name>
    <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
  </property>
  <property>
    <name>mapreduce.reduce.env</name>
    <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
  </property>
  <property>
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>1024</value>
  </property>
  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>512</value>
  </property>
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>512</value>
  </property>
</configuration>

The settings:

  • mapreduce.framework.name — the runtime framework for executing MapReduce jobs
  • yarn.app.mapreduce.am.env — the user added environment variables for Application Master processes
  • mapreduce.map.env and mapreduce.reduce.env — specify environment variables for map and reduce jobs
  • yarn.app.mapreduce.am.resource.mb — the amount of memory required by the Application Master in MB
  • mapreduce.map.memory.mb and mapreduce.reduce.memory.mb — the amount of memory to request from the scheduler for each map and reduce task

See reference 7.

Yarn site (yarn-site.xml)

Configure yarn and set resource limits.

<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>spark-yarn-master</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>

<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
</configuration>

The following settings are set:

  • yarn.resourcemanager.hostname — the hostname for the ResourceManager
  • yarn.nodemanager.resource.cpu-vcores — used by the ResourceManager to limit the number of vcores that can be allocated for containers. It does not limit the number of CPUs actually used by Yarn containers
  • yarn.nodemanager.aux-services — a list of auxiliary services to use. In this case, use the MapReduce shuffle service
  • yarn.nodemanager.resource.memory-mb — the amount of physical memory, in MB, that can be allocated for containers on a node
  • yarn.scheduler.maximum-allocation-mb and yarn.scheduler.minimum-allocation-mb — the maximum and minimum allocation for every container request at the ResourceManager, in MB (see the sizing sketch below)
  • yarn.log-aggregation-enable — log aggregation collects the containers’ logs and allows us to inspect them while developing/debugging.

For more see reference 8.
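To see how these limits interact with the Spark memory settings from spark-defaults.conf, here is a rough back-of-the-envelope sketch. It assumes Spark’s default executor memory overhead of max(384 MB, 10% of executor memory) and that the scheduler rounds each request up to a multiple of the minimum allocation, so treat the numbers as an approximation rather than YARN’s exact accounting:

import math

# Limits from yarn-site.xml
node_memory_mb = 2048   # yarn.nodemanager.resource.memory-mb
min_alloc_mb   = 512    # yarn.scheduler.minimum-allocation-mb
max_alloc_mb   = 2048   # yarn.scheduler.maximum-allocation-mb

# Spark settings from spark-defaults.conf
executor_memory_mb = 512                                # spark.executor.memory
overhead_mb = max(384, int(0.10 * executor_memory_mb))  # assumed default overhead

requested_mb = executor_memory_mb + overhead_mb         # 896 MB
# Requests are rounded up to a multiple of the minimum allocation
container_mb = math.ceil(requested_mb / min_alloc_mb) * min_alloc_mb  # 1024 MB

assert container_mb <= max_alloc_mb, "request would be rejected"
print(f"{container_mb} MB per executor container, "
      f"{node_memory_mb // container_mb} such container(s) fit on one NodeManager")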

Capacity scheduler (capacity-scheduler.xml, optional)

This is really not required for the cluster, but I kept it anyway. The capacity scheduler is a pluggable scheduler that aims to maximise the throughput of the cluster. It allows multiple tenants to share the cluster while ensuring that the “applications are allocated resources in a timely manner” under capacity constraints [9].

<configuration>
  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>100</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
  </property>
</configuration>

The following settings are configured:

  1. yarn.scheduler.capacity.root.queues — set up the queues (here, just default)
  2. yarn.scheduler.capacity.root.default.capacity — queue capacity as a percentage. The sum of the capacities of all queues must equal 100%
  3. yarn.scheduler.capacity.resource-calculator — the ResourceCalculator implementation used to compare resources in the scheduler. The DominantResourceCalculator takes multiple resources into account, such as memory, CPU, etc.

See reference 9.

Entrypoint script

Here is the entire code for the entrypoint.sh script:

#!/bin/bash

SPARK_WORKLOAD=$1

echo "SPARK_WORKLOAD: $SPARK_WORKLOAD"

/etc/init.d/ssh start

if [ "$SPARK_WORKLOAD" == "master" ];
then
  hdfs namenode -format

  # start the master node processes
  hdfs --daemon start namenode
  hdfs --daemon start secondarynamenode
  yarn --daemon start resourcemanager

  # create required directories, but may fail so do it in a loop
  while ! hdfs dfs -mkdir -p /spark-logs;
  do
    echo "Failed creating /spark-logs hdfs dir"
  done
  echo "Created /spark-logs hdfs dir"
  hdfs dfs -mkdir -p /opt/spark/data
  echo "Created /opt/spark/data hdfs dir"

  # copy the data to the data HDFS directory
  hdfs dfs -copyFromLocal /opt/spark/data/* /opt/spark/data
  hdfs dfs -ls /opt/spark/data

elif [ "$SPARK_WORKLOAD" == "worker" ];
then
  # added later (after publishing)
  hdfs namenode -format

  # start the worker node processes
  hdfs --daemon start datanode
  yarn --daemon start nodemanager
elif [ "$SPARK_WORKLOAD" == "history" ];
then
  while ! hdfs dfs -test -d /spark-logs;
  do
    echo "spark-logs doesn't exist yet... retrying"
    sleep 1;
  done
  echo "Exit loop"

  # start the spark history server
  start-history-server.sh
fi

tail -f /dev/null

We start by reading the argument passed to the script, as that argument tells us what kind of node we want — master, worker or history. We then start ssh regardless of the node.

If we’re in the master node, we format the hdfs namenode and start all of the processes that should be run on the master:

  1. NameNode (namenode)
  2. SecondaryNameNode (secondarynamenode)
  3. ResourceManager (resourcemanager)

We also create the HDFS directories for the spark logs and application data that we will use, and copy the data from the local filesystem to the HDFS filesystem.

On the worker nodes we start the DataNode (datanode) and NodeManager (nodemanager). On the history server node, we wait for the spark-logs directory to be created on HDFS before starting the Spark history server.

The last command prevents the docker container from exiting.

Docker compose (docker-compose.yarn.yml)

Here is the docker compose file that defines the master, worker and history server services.

version: '3.8'

services:
  spark-yarn-master:
    container_name: da-spark-yarn-master
    build:
      # We want to use the Dockerfile prepared for yarn
      dockerfile: Dockerfile-yarn
      context: .
    image: da-spark-yarn-image
    entrypoint: ['./entrypoint.sh', 'master']
    volumes:
      - ./book_data:/opt/spark/data
      - ./spark_apps:/opt/spark/apps
    env_file:
      - .env.spark
    ports:
      - '9090:8080'
      - '9870:9870'
      - '7077:7077'
      - '8088:8088'

  spark-yarn-worker:
    image: da-spark-yarn-image
    entrypoint: ['./entrypoint.sh', 'worker']
    depends_on:
      - spark-yarn-master
    env_file:
      - .env.spark
    volumes:
      - ./book_data:/opt/spark/data
      - ./spark_apps:/opt/spark/apps

  yarn-history-server:
    container_name: da-spark-yarn-history
    image: da-spark-yarn-image
    entrypoint: ['./entrypoint.sh', 'history']
    depends_on:
      - spark-yarn-master
    env_file:
      - .env.spark
    ports:
      - '18080:18080'

Starting the cluster, checking running processes and submitting jobs

Before running, you should check the virtual disk size that Docker will assign to the containers. I suggest at least 70 GB.

You can build the images by running make build-yarn and run them using make run-yarn. There is also the make run-yarn-scaled command that brings up three data nodes. Wait until all of the files copied to HDFS are listed and the history container exits its loop and starts the Spark history server. When this happens, we’re ready.

Once up, we can navigate to the NameNode web UI on localhost:9870. On the DataNode tab we can see the DataNodes connected to the cluster:

NameNode web interface

We can also browse the HDFS directories from Utilities > Browse the file system.

HDFS file system

Let’s test whether everything is working by submitting a job. We can run make submit-yarn-test to submit the pi.py example in cluster mode. The command is translated into this:

docker exec da-spark-yarn-master spark-submit --master yarn --deploy-mode cluster ./examples/src/main/python/pi.py
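The bundled pi.py estimates π with a Monte Carlo simulation. A simplified sketch of the idea (not the exact script shipped with Spark) looks like this:

import random
from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pi-sketch").getOrCreate()
n = 100000  # number of random points to sample

def inside(_):
    # Pick a random point in the unit square; 1 if it falls inside the quarter circle
    x, y = random.random(), random.random()
    return 1 if x * x + y * y <= 1 else 0

# The fraction of points inside the quarter circle approximates pi / 4
count = spark.sparkContext.parallelize(range(n), 4).map(inside).reduce(add)
print(f"Pi is roughly {4.0 * count / n}")
spark.stop()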

Let’s also test with one of our custom applications:

make submit-yarn-cluster app=data_analysis_book/chapter03/word_non_null.py

After everything is finished, we can navigate to the ResourceManager web interface (localhost:8088) to see the finished jobs.

Finished jobs on ResourceManager web interface

We can also check out the applications that were run on the Spark history server (localhost:18080).

Spark history server web interface

Some useful commands

If you want to find out which JVM processes are running on a node, you can docker exec into a container and run the jps command. For example, running docker exec da-spark-yarn-master jps will print out the following processes:

1139 Jps
101 NameNode
150 SecondaryNameNode
202 ResourceManager

You may also want to check the health of your nodes. You can list them all with docker exec da-spark-yarn-master yarn node -list -all.

Total Nodes:3
Node-Id Node-State Node-Http-Address Number-of-Running-Containers
9f9b704b1432:38955 RUNNING 9f9b704b1432:8042 0
337c92ac60cf:39681 RUNNING 337c92ac60cf:8042 0
8540057b647a:38205 RUNNING 8540057b647a:8042 0

You can also check the logs of an application by running docker exec da-spark-yarn-master yarn logs -applicationId <application-id>.

In each case, you could first enter the docker container by running docker exec da-spark-yarn-master bash and then run, e.g. yarn node -list -all.

Limitations and possible improvements

Working links

One of the biggest issues with this setup is that the DataNode hostnames are random strings (the Docker container IDs). Consequently, all of the links on the Hadoop web interfaces contain these hostnames, which means the links do not work. When you click a link it tries to take you to hostname:port, but the containers are only reachable from the host through localhost.

Additionally, I did not expose any ports used by the DataNodes. Hence, the DataNodes are not exposed to the host system.

MapReduce JobHistory server

I have removed starting the MapReduce JobHistory server. In all honesty, the server ran OK and you could see MapReduce jobs submitted through yarn jar, e.g.:

yarn jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.1.jar pi 16 100000

However, the server did not display any information about submitted Spark jobs. I am not sure whether it should have or not, or whether I configured something wrong.

Summary

In this story, I covered how to bring up a Hadoop Yarn cluster on Docker and run Spark applications on it. I also discussed the limitations of this approach.

I hope you found this post useful.

The code is on GitHub.

References

  1. https://www.linode.com/docs/guides/how-to-install-and-set-up-hadoop-cluster/
  2. https://www.linode.com/docs/guides/install-configure-run-spark-on-top-of-hadoop-yarn-cluster/
  3. https://www.ibm.com/analytics/hadoop
  4. https://hadoop.apache.org/
  5. https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html
  6. https://hadoop.apache.org/docs/r3.3.1/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
  7. https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
  8. https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
  9. https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
