Satisfying Apache Spark dependencies on Hadoop YARN

Because we can’t always call df.collect() and run pandas locally

Tuck Ging
Lynx Data Engineering
9 min read · May 11, 2022


Being a distributed framework, Apache Spark runs code across many nodes. However, this paradise in parallel processing can quickly turn into a nightmare in dependency hell, especially for the uninitiated.

The people setting up and managing Hadoop clusters are typically not the same ones writing ETL batch jobs and deploying fancy machine learning models running on bleeding edge Python packages. As such, the latter group may have difficulty using packages and jars that were never originally installed in the cluster.

While modern deployment practices like Dockerised Airflow allow us to easily revert to using the Python that’s already preinstalled in our clusters, this article aims to unleash us from the shackles of deprecated APIs and outmoded programming patterns.

…unless you’re calling df.collect() to do everything locally as a workaround, then we’ll save you from Out-Of-Memory (OOM) errors 6 months down the road when your data no longer fits in memory.

Prerequisites

That’s right, this article has dependencies, too.

If you wish to follow along, a pseudo-cluster can be set up entirely within one Docker host for us to play with. If you’re unfamiliar with Docker, a quick tutorial on it (and docker-compose) should suffice as we’re only using the most common features.

$ git clone https://github.com/tuckging/tutorials.git
$ cd tutorials/spark-on-hadoop/
$ docker-compose up --build
Fair warning: the above command will install JRE, Python, Hadoop, Jupyter and more, so it might take more than a couple of minutes, especially if you’re using an HDD!

Barring any errors, and with sufficient RAM (around 6GB free), we should have a pseudo-cluster up and running, accessible at the following Web UIs:

From here on, we’ll be running code entirely within Jupyter, so we’ll change the prefix for bash commands from $ to ! to run in Jupyter cells.

Lesson #1: There is no inherent Spark version in Hadoop YARN

What!?

That’s right. We don’t actually install Spark onto a Hadoop cluster. Instead, the Spark jars are typically distributed throughout the cluster each time we call spark-submit. It’s one of several reasons why it often takes up to 30 seconds just to start a new SparkContext.

In fact, this is a fresh install of Hadoop, and there is no Spark installed anywhere at all. So, let’s create a new Jupyter notebook (or open the provided one) and install the latest PySpark 3.2.1 into the jupyter container:

import sys
!{sys.executable} -m pip install pyspark==3.2.1

All the other nodes in our pseudo-cluster remain Spark-less, so what happens when we try to create a SparkSession with yarn as master?

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .getOrCreate()
)
spark
Spark 3.2.1 confirmed

We should get version v3.2.1 in both the cell output and in the Spark UI (http://localhost:4040). At this point, you’d probably want to resolve master and jupyter to 127.0.0.1 in your hosts file, so that links such as master:8088 work without being retyped as localhost:8088. Just remember to clear the entries when you’re done!

We can then simply install another PySpark version like 3.0.0 (pip install pyspark==3.0.0), restart the Jupyter kernel, and it will also run just fine in the cluster, so long as the Java and Hadoop versions remain compatible.
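Since the Spark version is decided by whichever PySpark is installed next to the driver, it can help to check that before starting a session. Here is a minimal sketch using only the standard library; the helper name installed_version is ours, not part of PySpark.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version of a pip package, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# In the jupyter container this should report the pip-installed PySpark;
# on a machine without PySpark it simply prints None.
print("pyspark:", installed_version("pyspark"))
```

Running this again after switching PySpark versions (and restarting the kernel) is a quick sanity check that the driver will launch the Spark we expect.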

BTW, if anyone insists that their cluster has Spark installed, they could be talking about a Spark Standalone cluster, which is different from Hadoop YARN!

Lesson #2: Python UDFs require Python interpreters, duh!

Let’s say we need to invoke some Python code in our remote Spark Executors. We may do that using Python User-Defined Functions (UDFs) like so:

from pyspark.sql.functions import udf

def get_python_version():
    import sys  # imported inside the UDF so it is resolved on the executor
    return f'{sys.executable} v{sys.version}'

py_udf = udf(get_python_version)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)

Depending on the Spark version being run, your UDF may or may not run successfully. Under Spark 3.0.3, this fails in our pseudo-cluster with the error message: Cannot run program "python", but it succeeds with Spark 3.1.1 and onwards. This is because, since 3.1.1, the default interpreter that Spark looks for was changed from python to python3, which matches what our pseudo-cluster has.

Before we properly fix the issue, let’s explore why it fails in the first place. If we think about it, we are clearly already running some python code, so how could it complain that it can’t find python ?

Photo originally by David Clode on Unsplash

If we look more closely, the evaluation of f'{sys.executable} v{sys.version}' is distributed to run on remote Spark Executors, and not on the Spark driver or local Jupyter/python client. Before calling .show(), those remote nodes are only running JVMs for Spark Executors, without any s̶li̶t̶h̶e̶r sliver of python code running anywhere. Only upon invoking our Python UDF do they scramble to find python and subsequently error out.

To help our Spark Executors find python , we restart the kernel (to clear away the existing SparkSession), then simply define the PYSPARK_PYTHON environment variable [docs] before starting a new SparkSession.

import os
os.environ['PYSPARK_PYTHON'] = 'python3'
f’{sys.executable} v{sys.version}’ evaluated on remote Spark Executors

Alternatively, we can go to each worker node in the cluster and symlink python to the installed python3, or even specifically python3.8, but that is cumbersome without a cluster manager, and even discouraged, as it may break other software that expects python to point to a different version.

“What if these are not the pythons we’re looking for?”

Whether for getting bleeding edge features like pattern matching in python3.10, or satisfying legacy code or packages, we sometimes need to run a different python version that’s not installed in the cluster. That brings us to…

Lesson #3: It’s Bring Your Own Python to Cluster Day!

According to the Spark docs, conda is the only officially recommended way to distribute our own chosen python interpreter. For the sake of demonstration, since Jupyter is running Python 3.8.10, we’ll choose to distribute a very similar Python 3.8.13, because the major.minor version (3.8) must remain the same between the Jupyter client and the Spark Executors. In practice, your Jupyter kernel should already be using your desired python.
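To make the version constraint concrete, here is a small sketch that compares only the first two components of two version strings. The function names are ours, and this only approximates the check PySpark performs between driver and workers.

```python
import sys


def minor_version(version):
    """Return the (major, minor) pair of a version string such as '3.8.13'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def compatible(driver_version, executor_version):
    """True when both interpreters share the same major.minor version."""
    return minor_version(driver_version) == minor_version(executor_version)


# Compare the interpreter running this cell against the one we plan to ship.
driver = ".".join(str(part) for part in sys.version_info[:3])
print(driver, "vs 3.8.13 ->", compatible(driver, "3.8.13"))
```

So 3.8.10 and 3.8.13 pair up fine, while 3.8.10 and 3.10.x would not.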

Using the Miniconda that’s preinstalled at /opt/conda (thanks Dockerfile!), the following Jupyter cell creates a new conda environment with Python 3.8.13 and packs it into a .tgz archive ready for distribution.

!/opt/conda/bin/conda install -y conda-pack
!/opt/conda/bin/conda create -y python=3.8.13 --name=conda-env
!/opt/conda/bin/conda pack --prefix /opt/conda/envs/conda-env -f -o ./conda-env.tgz

We then restart the kernel and set spark.yarn.dist.archives to the local path of the packed .tgz archive, followed by a # and the final extracted folder name, conda-env. This tells Spark on YARN to distribute the local .tgz to all the nodes and extract it locally as conda-env. From here, we simply point PYSPARK_PYTHON to ./conda-env/bin/python3.8 and voilà! Every node gets a Python interpreter!

import os
os.environ['PYSPARK_PYTHON'] = './conda-env/bin/python3.8'
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.archives', './conda-env.tgz#conda-env')
    .getOrCreate()
)
spark
Every node gets the newly distributed python!
Python for days!
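The two strings above have to agree on the extracted folder name, so it can be handy to derive both from one place. This is a rough sketch; archive_spec and executor_python are hypothetical helper names of ours, not Spark APIs.

```python
def archive_spec(archive_path, alias):
    """Build a spark.yarn.dist.archives entry: '<archive>#<extracted folder>'."""
    if not alias or "#" in alias or "#" in archive_path:
        raise ValueError("alias must be non-empty; '#' is reserved as the separator")
    return f"{archive_path}#{alias}"


def executor_python(alias, python_version):
    """Relative interpreter path inside the extracted archive, for PYSPARK_PYTHON."""
    major, minor = python_version.split(".")[:2]
    return f"./{alias}/bin/python{major}.{minor}"


print(archive_spec("./conda-env.tgz", "conda-env"))  # ./conda-env.tgz#conda-env
print(executor_python("conda-env", "3.8.13"))        # ./conda-env/bin/python3.8
```

The first value would go into spark.yarn.dist.archives and the second into PYSPARK_PYTHON, exactly as in the cell above.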

Lesson #4: PyPI packages hitch a ride when distributing conda environments

Now that we’ve managed the trivial case of evaluating sys.version , let’s try providing PyPI dependencies like pandas==1.4.2 to Spark Executors. It’s super simple:

Step 1: install pandas into the conda environment and repack it.

!/opt/conda/envs/conda-env/bin/pip install pandas==1.4.2
!/opt/conda/bin/conda pack --prefix /opt/conda/envs/conda-env -f -o ./conda-env.tgz

Step 2: restart kernel, recreate SparkSession, then try importing pandas.

from pyspark.sql.functions import udf

def get_pandas_version():
    import pandas  # imported inside the UDF so it is resolved on the executor
    return f'pandas v{pandas.__version__}'

py_udf = udf(get_pandas_version)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)
pandas sighting confirmed! 🐼🐼🐼

Step 3: Profit!

Tip: spark.yarn.dist.archives also accepts HDFS paths, which allows the conda environment to enter YARN cache and improve SparkSession startup times slightly. Spark’s jars themselves can also be cached in this manner.

Alternatively, if your cluster already has the desired python interpreter, and only lacks PyPI packages, you could opt to distribute only the packages by using venv and venv-pack similarly.

Hack: Interestingly, it’s also technically possible to distribute our own portable Java runtime and point JAVA_HOME to use it. However, this is undocumented, so use it only as a last resort and at your own risk! Remember that Hadoop itself also runs on the cluster’s available Java version.

Lesson #5: New data source? There’s a Jar for that!

While Spark performs optimally with co-located HDFS files or Hive, we often find ourselves extracting or loading data from external sources like Google BigQuery, AWS S3, Kafka, PostgreSQL etc. Almost invariably, these external data sources provide connectivity to Spark via Java .jar archives. So, let’s explore our options with Google’s spark-bigquery-connector as an example.

In the general case, we download the .jar (make sure it matches your Scala version!) as bq.jar:

!curl https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.2.jar -o bq.jar

Then restart the kernel and provide it to Spark like so:

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.jars', './bq.jar')
    .config('spark.driver.extraClassPath', './bq.jar')
    .getOrCreate()
)
spark

Similar to distributing conda environments, we first distribute the jar using spark.yarn.dist.jars (comma-separated), then get the Spark Driver to pick it up using spark.driver.extraClassPath (colon-separated on Unix, semicolon-separated on Windows). We don’t specify extraClassPath for the Executors because, on YARN, jars distributed this way should already end up on their classpath.
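Because the two settings use different separators, it is easy to mix them up once there is more than one jar. A minimal sketch of building both values from a single list follows; jar_configs is a helper name of ours and ./other.jar is a made-up second jar.

```python
import os


def jar_configs(jar_paths, classpath_sep=os.pathsep):
    """Build both jar-related settings from one list of jar paths.

    spark.yarn.dist.jars is comma-separated, while the driver classpath uses
    the platform separator (':' on Unix, ';' on Windows).
    """
    return {
        "spark.yarn.dist.jars": ",".join(jar_paths),
        "spark.driver.extraClassPath": classpath_sep.join(jar_paths),
    }


# './other.jar' is purely illustrative; ':' is forced here to show the Unix form.
for key, value in jar_configs(["./bq.jar", "./other.jar"], classpath_sep=":").items():
    print(key, "=", value)
```

Each key/value pair can then be passed to SparkSession.builder.config(key, value) before calling getOrCreate().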

We then test the connector with some small sample public data:

df = (
    spark.read
    .format("bigquery")
    .option("gcpAccessToken", '<your token here>')
    .option("parentProject", "<your GCP project here>")
    .load("bigquery-public-data.samples.shakespeare")
)
(
    df.groupBy('corpus')
    .sum('word_count')
    .orderBy('sum(word_count)', ascending=False)
    .show(3)
)
Shakespeare’s longest work by word count is Hamlet. Good to know!

Great, it works!

However, oftentimes there is an easier way: restart the kernel, look up the Maven artifact coordinates, and configure spark.jars.packages like so:

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2')
    .getOrCreate()
)
spark

and Spark will automatically download and use the specified jars (+dependencies!) from the Maven Central Repository.
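The string given to spark.jars.packages follows the groupId:artifactId:version shape, and for Scala libraries the artifact name conventionally ends with the Scala version (here _2.12). As a rough sketch, assuming that naming convention holds for the connector you pick, we can parse the coordinate and read off its Scala version before handing it to Spark. The class and function names are ours.

```python
import re
from typing import NamedTuple, Optional


class MavenCoordinate(NamedTuple):
    group_id: str
    artifact_id: str
    version: str

    @property
    def scala_version(self) -> Optional[str]:
        """Scala suffix of the artifact (e.g. '2.12'), or None if there is none."""
        match = re.search(r"_(\d+\.\d+)$", self.artifact_id)
        return match.group(1) if match else None


def parse_coordinate(text):
    """Split 'groupId:artifactId:version' into its three parts."""
    parts = text.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected groupId:artifactId:version, got {text!r}")
    return MavenCoordinate(*parts)


coord = parse_coordinate(
    "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2"
)
print(coord.artifact_id, "-> Scala", coord.scala_version)
```

If the reported Scala version differs from the one your Spark build uses, that is a strong hint to look for a different artifact.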

It’s jars upon jars on jars! Photo by Ella Olsson on Pexels

Lesson #6: It’s dangerous to go alone! Take your helper.py along.

Other than for trivial tasks, production-level Spark applications often grow in complexity beyond a single main.py file. In our example, we’ll create two files, helper.py and utils.py, that each have the trivial function of returning their own module’s __name__:

# Both modules share the same trivial body: return the module's own __name__
module_source = 'def get_module():\n    return __name__\n'
for filename in ('./helper.py', './utils.py'):
    with open(filename, 'w') as f:
        f.write(module_source)

import helper
import utils
f'{helper.get_module()}, {utils.get_module()}'

We then bundle the two .py files into a .zip archive and provide it to Spark using .addPyFile():

!zip friends.zip utils.py helper.py
spark.sparkContext.addPyFile('./friends.zip')

For once, we don’t need to restart the kernel for .addPyFile() to take effect. Spark will add the .zip file to all its Executor’s Python search paths, so we can call upon them like so:

from pyspark.sql.functions import udf

def get_module_names():
    # These imports are resolved on the executors, from the distributed friends.zip
    import utils
    import helper
    return f'{helper.get_module()}, {utils.get_module()}'

py_udf = udf(get_module_names)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)
Thank you, friends.zip! We couldn’t have done it without you!
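The reason a plain .zip is enough is that Python can import modules straight from a zip archive once it is on the search path. The sketch below reproduces that mechanism locally with the standard library only, without Spark. The module names demo_helper and demo_utils are made up to avoid clashing with the files created earlier.

```python
import importlib
import sys
import tempfile
import zipfile
from pathlib import Path

MODULE_SOURCE = "def get_module():\n    return __name__\n"


def build_zip(zip_path, modules):
    """Write each {module name: source code} pair into the zip as <name>.py."""
    with zipfile.ZipFile(zip_path, "w") as archive:
        for name, source in modules.items():
            archive.writestr(f"{name}.py", source)
    return zip_path


# Build the archive in a temporary folder so nothing else on disk is touched.
zip_path = build_zip(
    Path(tempfile.mkdtemp()) / "friends_demo.zip",
    {"demo_helper": MODULE_SOURCE, "demo_utils": MODULE_SOURCE},
)

# Putting the zip itself on sys.path makes its modules importable.
sys.path.insert(0, str(zip_path))
importlib.invalidate_caches()
demo_helper = importlib.import_module("demo_helper")
demo_utils = importlib.import_module("demo_utils")

module_names = f"{demo_helper.get_module()}, {demo_utils.get_module()}"
print(module_names)  # demo_helper, demo_utils
```

The same build_zip helper could replace the !zip shell call on machines where the zip binary is not installed.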

At this point, you may have recreated the SparkSession without first defining PYSPARK_PYTHON='python3', so do remember to restart the kernel and define it again if you’re running Spark 3.0.3 or earlier.

Bonus Lesson #1: Have you tried turning it off and on again?

You may have noticed that there is an emphasis on restarting the kernel every time we want to recreate the SparkSession. This is because while calling spark.stop() does release resources from the cluster, it leaves behind a dangling JVM as a child process of Jupyter / python. Restarting the kernel ensures that we spawn a new JVM that can receive the full set of new environment variables and/or Spark configs.
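The underlying behaviour is not specific to Spark: a child process receives a copy of its parent’s environment at the moment it is spawned, and later changes in the parent never reach it. The sketch below illustrates this with ordinary Python subprocesses standing in for the JVM; DEMO_PYSPARK_PYTHON is a made-up variable name.

```python
import os
import subprocess
import sys

VAR = "DEMO_PYSPARK_PYTHON"  # hypothetical name, used only for this demo
REPORT = f"import os; print(os.environ.get({VAR!r}, 'unset'))"

os.environ.pop(VAR, None)

# Spawn a long-lived child first; it waits for a line on stdin before reporting.
early_child = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdin.readline(); " + REPORT],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)

# Change the parent's environment only after the child already exists.
os.environ[VAR] = "python3"

stale_output, _ = early_child.communicate("\n")
fresh_output = subprocess.run(
    [sys.executable, "-c", REPORT], capture_output=True, text=True, check=True
).stdout

print("child spawned before the change:", stale_output.strip())  # unset
print("child spawned after the change: ", fresh_output.strip())  # python3
```

A dangling JVM is in the position of the first child here, which is why a fresh kernel (and thus a fresh JVM) is the reliable way to apply new settings.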

Bonus Lesson #2: Time Waits For No One; change is the only constant

Throughout the ten years of open-source development of Spark, its dependencies have evolved over time and so has Spark itself. As such, it’s surprisingly easy to unknowingly mix incompatible dependencies, even when using generally available Java, Python, Scala or Hadoop versions.

Here are some tips:

  • Python 3.8+ requires Spark 3.0.0+
  • Java 7 was last supported in Spark 2.1.0
  • Scala 2.11.x was last supported in Spark 2.4.8,
    while Scala 2.12.x is for Spark 2.4.1 and onwards
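These tips can be turned into a small pre-flight check. The sketch below encodes only the three rules listed above, taken literally from this list rather than from any official compatibility matrix. The function names are ours, and we treat the whole Spark 2.1 line as the Java 7 cut-off, which is our reading of that tip.

```python
def as_tuple(version):
    """Turn '3.2.1' into (3, 2, 1) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def compatibility_warnings(spark, python=None, java=None, scala=None):
    """Return a list of warnings based on the tips listed in this article."""
    spark_v = as_tuple(spark)
    warnings = []
    if python and as_tuple(python)[:2] >= (3, 8) and spark_v < (3, 0, 0):
        warnings.append("Python 3.8+ requires Spark 3.0.0+")
    if java == 7 and spark_v[:2] > (2, 1):
        warnings.append("Java 7 was last supported in Spark 2.1")
    if scala and as_tuple(scala)[:2] == (2, 11) and spark_v > (2, 4, 8):
        warnings.append("Scala 2.11 was last supported in Spark 2.4.8")
    if scala and as_tuple(scala)[:2] == (2, 12) and spark_v < (2, 4, 1):
        warnings.append("Scala 2.12 is for Spark 2.4.1 and onwards")
    return warnings


print(compatibility_warnings("3.2.1", python="3.8.13", scala="2.12"))  # []
print(compatibility_warnings("2.4.8", python="3.8.10", scala="2.11"))
```

The second call reports the Python 3.8 problem only, since Scala 2.11 is still acceptable on Spark 2.4.8 according to the list.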

Remember that any additional Python or Java package you choose may bring further version constraints of its own.

XKCD Comic by Randall Munroe

Wrapping it up

Running Spark on YARN allows us to distribute dependencies along with our main programs. Whether it’s Java jars, PyPI packages or even the python interpreter itself, they all follow the same pattern: first bundle them into archives, tell YARN to distribute them to all nodes, then start using them!

The concept is rather straightforward, but the devil is in the details of ensuring all dependencies are compatible with each other.
