Satisfying Apache Spark dependencies on Hadoop YARN
Because we can’t always call df.collect() and run pandas locally
Being a distributed framework, Apache Spark runs code across many nodes. However, this parallel-processing paradise can quickly descend into dependency hell, especially for the uninitiated.
The people setting up and managing Hadoop clusters are typically not the same ones writing ETL batch jobs and deploying fancy machine learning models running on bleeding edge Python packages. As such, the latter group may have difficulty using packages and jars that were never originally installed in the cluster.
While modern deployment practices like Dockerised Airflow make it easy to fall back on the Python that’s already preinstalled in our clusters, this article aims to unleash us from the shackles of deprecated APIs and outmoded programming patterns…
…unless you’re calling df.collect() to do everything locally as a workaround, in which case we’ll save you from Out-Of-Memory (OOM) errors 6 months down the road when your data no longer fits in memory.
Prerequisites
That’s right, this article has dependencies, too.
If you wish to follow along, a pseudo-cluster can be set up entirely within one Docker host for us to play with. If you’re unfamiliar with Docker, a quick tutorial on it (and docker-compose) should suffice, as we’re only using the most common features.
$ git clone https://github.com/tuckging/tutorials.git
$ cd tutorials/spark-on-hadoop/
$ docker-compose up --build
Barring any errors, and with sufficient RAM (around 6GB free), we should have a pseudo-cluster up and running, accessible at the following Web UIs:
- Jupyter Notebook at http://localhost:8888/tree
- YARN at http://localhost:8088
- HDFS NameNode at http://localhost:9870
From here on, we’ll be running code entirely within Jupyter, so the prefix for bash commands changes from $ to ! to run in Jupyter cells.
Lesson #1: There is no inherent Spark version in Hadoop YARN
What!?
That’s right. We don’t actually install Spark onto a Hadoop cluster. Instead, the Spark jars are typically distributed throughout the cluster each time we call spark-submit. It’s one of several reasons why it often takes up to 30 seconds just to start a new SparkContext.
In fact, this is a fresh install of Hadoop, and there is no Spark installed anywhere at all. So, let’s create a new Jupyter notebook (or open the provided one) and install the latest PySpark 3.2.1 into the jupyter container:
import sys
!{sys.executable} -m pip install pyspark==3.2.1
All the other nodes in our pseudo-cluster remain Spark-less, so what happens when we try to create a SparkSession with yarn as master?
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .getOrCreate()
)
spark
We should get version v3.2.1 in both the cell output and in the Spark UI (http://localhost:4040). At this point, you’d probably want to resolve master and jupyter to 127.0.0.1 in your hosts file to avoid repeatedly retyping master:8088 as localhost:8088. Just remember to clear the entries when you’re done!
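For reference, the added entries would look something like this (assuming /etc/hosts on Linux or macOS):
127.0.0.1    master
127.0.0.1    jupyter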
We can then simply install another PySpark version like 3.0.3 (pip install pyspark==3.0.3), restart the Jupyter kernel, and it will also run just fine in the cluster, so long as the Java and Hadoop versions remain compatible.
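As a quick sanity check, we can confirm which client version the notebook kernel will ship to YARN before creating any SparkSession:
import pyspark
# whichever version the kernel imports here is the version distributed to the cluster
print(pyspark.__version__)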
BTW, if anyone insists that their cluster has Spark installed, they could be talking about a Spark Standalone cluster, which is different from Hadoop YARN!
Lesson #2: Python UDFs require Python interpreters, duh!
Let’s say we need to invoke some Python code in our remote Spark Executors. We may do that using Python User-Defined Functions (UDFs) like so:
from pyspark.sql.functions import udf

def get_python_version():
    import sys
    return f'{sys.executable} v{sys.version}'

py_udf = udf(get_python_version)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)
Depending on the Spark version being run, your UDF may or may not run successfully. Under Spark 3.0.3, this fails in our pseudo-cluster with the error message Cannot run program "python", but it succeeds with Spark 3.1.1 and onwards. This is because, since 3.1.1, the default python was updated from python2.7 to python3, which matches what our pseudo-cluster has.
Before we properly fix the issue, let’s explore why it fails in the first place. If we think about it, we are clearly already running some python code, so how could it complain that it can’t find python?
If we look more closely, the evaluation of f'{sys.executable} v{sys.version}' is distributed to run on the remote Spark Executors, not on the Spark driver or the local Jupyter/python client. Before we call .show(), those remote nodes are only running JVMs for Spark Executors, without any sliver of python code running anywhere. Only upon invoking our Python UDF do they scramble to find python and subsequently error out.
To help our Spark Executors find python, we restart the kernel (to clear away the existing SparkSession), then simply define the PYSPARK_PYTHON environment variable [docs] before starting a new SparkSession.
import os
os.environ['PYSPARK_PYTHON'] = 'python3'
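The ordering matters: PYSPARK_PYTHON must be defined before the new SparkSession (and its backing JVM) is created. A minimal sketch of the whole cell after the restart:
import os
os.environ['PYSPARK_PYTHON'] = 'python3'  # set BEFORE creating the SparkSession

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .getOrCreate()
)
Re-running the UDF from above should now report the Executors’ python3 instead of erroring out.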
Alternatively, we could go to each worker node in the cluster and symlink python to the installed python3, or even specifically python3.8. But that is cumbersome without a cluster manager, and even discouraged, as it may break other software that expects python to point to a different version.
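For illustration only (bearing the caveats above in mind), such a symlink might be created on each worker node like so, assuming python3 lives at /usr/bin/python3:
$ sudo ln -s /usr/bin/python3 /usr/local/bin/python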
“What if these are not the pythons we’re looking for?”
Whether for getting bleeding edge features like pattern matching in python3.10, or satisfying legacy code or packages, we sometimes need to run a different python version that’s not installed in the cluster. That brings us to…
Lesson #3: It’s Bring Your Own Python to Cluster Day!
According to the Spark docs, conda is the only officially recommended way to distribute our own chosen python interpreter. For the sake of demonstration, since Jupyter is running Python 3.8.10, we’ll choose to distribute a very similar Python 3.8.13, because the version must match down to the minor level (3.8) between the Jupyter client and the Spark Executors. In practice, your Jupyter kernel should already be using your desired python.
Using the Miniconda that’s preinstalled at /opt/conda (thanks, Dockerfile!), the following Jupyter cell creates a new conda environment with Python 3.8.13 and packs it into a .tgz archive ready for distribution.
!/opt/conda/bin/conda install -y conda-pack
!/opt/conda/bin/conda create -y python=3.8.13 --name=conda-env
!/opt/conda/bin/conda pack --prefix /opt/conda/envs/conda-env -f -o ./conda-env.tgz
We then restart the kernel and configure spark.yarn.dist.archives to the local path of the packed .tgz archive, followed by a # and the final extracted folder name, conda-env. This tells Spark on YARN to distribute the local .tgz to all the nodes and extract it locally as conda-env. From here, we simply point PYSPARK_PYTHON to ./conda-env/bin/python3.8 and voilà! Every node gets a Python interpreter!
import os
os.environ['PYSPARK_PYTHON'] = './conda-env/bin/python3.8'
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.archives', './conda-env.tgz#conda-env')
    .getOrCreate()
)
spark
Lesson #4: PyPI packages hitch a ride when distributing conda environments
Now that we’ve managed the trivial case of evaluating sys.version, let’s try providing PyPI dependencies like pandas==1.4.2 to Spark Executors. It’s super simple:
Step 1: install pandas into the conda environment and repack it.
!/opt/conda/envs/conda-env/bin/pip install pandas==1.4.2
!/opt/conda/bin/conda pack --prefix /opt/conda/envs/conda-env -f -o ./conda-env.tgz
Step 2: restart kernel, recreate SparkSession, then try importing pandas.
from pyspark.sql.functions import udf

def get_pandas_version():
    import pandas
    return f'pandas v{pandas.__version__}'

py_udf = udf(get_pandas_version)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)
Step 3: Profit!
Tip: spark.yarn.dist.archives also accepts HDFS paths, which allows the conda environment to enter the YARN cache and slightly improve SparkSession startup times. Spark’s jars themselves can also be cached in this manner.
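A rough sketch of the HDFS variant, assuming the hdfs CLI is available in the Jupyter container and using an arbitrary /tmp path:
# upload once; later sessions reference the YARN-cached copy instead of re-shipping it
!hdfs dfs -put -f ./conda-env.tgz /tmp/conda-env.tgz

import os
os.environ['PYSPARK_PYTHON'] = './conda-env/bin/python3.8'  # unchanged from before

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.archives', 'hdfs:///tmp/conda-env.tgz#conda-env')
    .getOrCreate()
)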
Alternatively, if your cluster already has the desired python interpreter and only lacks PyPI packages, you could opt to distribute only the packages by using venv and venv-pack in a similar fashion, as sketched below.
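A minimal sketch of the venv route (assuming python3 and the venv module are available in the Jupyter image; the pandas pin is just an example):
!python3 -m venv ./venv-env
!./venv-env/bin/pip install pandas==1.4.2 venv-pack
# venv-pack mirrors conda-pack, but does NOT bundle the interpreter itself
!./venv-env/bin/venv-pack -p ./venv-env -f -o ./venv-env.tgz
The distribution pattern is then identical: configure spark.yarn.dist.archives with ./venv-env.tgz#venv-env and point PYSPARK_PYTHON to ./venv-env/bin/python3, which relies on the cluster’s own compatible python3.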
Hack: Interestingly, it’s also technically possible to distribute our own portable Java runtime and point JAVA_HOME to use it. However, this is undocumented, so use it only as a last resort and at your own risk! Remember that Hadoop itself also runs on the cluster’s available Java version.
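If you must try it, a sketch might look like the following; the portable-jre.tgz archive and its layout are assumptions, the config keys are standard Spark-on-YARN settings, and whether YARN honours the override can vary by cluster:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    # distribute a self-contained Java runtime, extracted as ./jre on every node
    .config('spark.yarn.dist.archives', './portable-jre.tgz#jre')
    # point the YARN Application Master and the Executors at the bundled runtime
    .config('spark.yarn.appMasterEnv.JAVA_HOME', './jre')
    .config('spark.executorEnv.JAVA_HOME', './jre')
    .getOrCreate()
)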
Lesson #5: New data source? There’s a Jar for that!
While Spark performs optimally with co-located HDFS files or Hive, we often find ourselves extracting or loading data from external sources like Google BigQuery, AWS S3, Kafka, PostgreSQL, etc. Almost invariably, these external data sources provide connectivity to Spark via Java .jar archives. So, let’s explore our options with Google’s spark-bigquery-connector as an example.
Generically, we download the .jar (ensure a compatible Scala version!) as bq.jar:
!curl https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.2.jar -o bq.jar
Then restart the kernel and provide it to Spark like so:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.jars', './bq.jar')
    .config('spark.driver.extraClassPath', './bq.jar')
    .getOrCreate()
)
spark
Similar to distributing conda environments, we first distribute the jar using spark.yarn.dist.jars (comma-separated), then get the Spark Driver and Executors to pick it up using spark.driver.extraClassPath (colon-separated on Unix, semicolon-separated on Windows). We don’t specify extraClassPath for the Executors because they already inherit it from the Driver.
We then test the connector with some small sample public data:
df = (
    spark.read
    .option("gcpAccessToken", '<your token here>')
    .option("parentProject", "<your GCP project here>")
    .format("bigquery")
    .load("bigquery-public-data.samples.shakespeare")
)
df.groupBy('corpus') \
    .sum('word_count') \
    .orderBy('sum(word_count)', ascending=False) \
    .show(3)
Great, it works!
However, oftentimes there is an easier way: restart the kernel, look up the Maven artifact name, and simply configure spark.jars.packages like so:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2')
    .getOrCreate()
)
spark
and Spark will automatically download and use the specified jars (+dependencies!) from the Maven Central Repository.
Lesson #6: It’s dangerous to go alone! Take your helper.py along.
Beyond trivial tasks, production-level Spark applications often grow in complexity past a single main.py file. In our example, we’ll create two files, helper.py and utils.py, each with the trivial function of returning its own module’s __name__:
with open('./helper.py', 'w') as f:
    f.write('''def get_module():
    return __name__
''')

with open('./utils.py', 'w') as f:
    f.write('''def get_module():
    return __name__
''')

import helper
import utils
f'{helper.get_module()}, {utils.get_module()}'
We then bundle the two .py files into a .zip archive and provide it to Spark using .addPyFile():
!zip friends.zip utils.py helper.py
spark.sparkContext.addPyFile('./friends.zip')
For once, we don’t need to restart the kernel for .addPyFile() to take effect. Spark will add the .zip file to all its Executors’ Python search paths, so we can call upon them like so:
from pyspark.sql.functions import udf

def get_module_names():
    import utils
    import helper
    return f'{helper.get_module()}, {utils.get_module()}'

py_udf = udf(get_module_names)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)
At this point, you may have recreated the SparkSession without first defining PYSPARK_PYTHON='python3', so do remember to restart the kernel and define it again if you’re running Spark 3.0.3 or earlier.
Bonus Lesson #1: Have you tried turning it off and on again?
You may have noticed that there is an emphasis on restarting the kernel every time we want to recreate the SparkSession. This is because, while calling spark.stop() does release resources from the cluster, it leaves behind a dangling JVM as a child process of Jupyter/python. Restarting the kernel ensures that we spawn a new JVM that can receive the full set of new environment variables and/or Spark configs.
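You can spot the leftover JVM for yourself; a quick sketch (the exact process listing depends on the image):
import os
spark.stop()  # releases YARN containers, but not the local child JVM
# list this kernel's child processes; the py4j-launched JVM should still appear
!ps -o pid,args --ppid {os.getpid()} | grep -i java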
Bonus Lesson #2: Time Waits For No One; change is the only constant
Throughout Spark’s ten years of open-source development, its dependencies have evolved, and so has Spark itself. As such, it’s surprisingly easy to unknowingly mix incompatible dependencies, even when using generally available Java, Python, Scala or Hadoop versions.
Here are some tips:
- Python 3.8+ requires Spark 3.0.0+
- Java 7 was last supported in Spark 2.1.0
- Scala 2.11.x was last supported in Spark 2.4.8, while Scala 2.12.x is supported from Spark 2.4.1 onwards
Do remember that any additional Python or Java packages you choose may impose further version constraints of their own.
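To check what a live session is actually running, here’s a quick sketch; note that _jvm is PySpark-internal API, so treat it purely as a debugging aid:
import sys
print('Python:', sys.version.split()[0])  # client interpreter
print('Spark :', spark.version)           # version the driver is running
# peek into the driver JVM via the py4j gateway (internal API)
print('Java  :', spark.sparkContext._jvm.java.lang.System.getProperty('java.version'))
print('Scala :', spark.sparkContext._jvm.scala.util.Properties.versionString())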
Wrapping it up
Running Spark on YARN allows us to distribute dependencies along with our main programs. Whether it’s Java jars, PyPI packages or even the python interpreter itself, they all follow the same pattern: first bundle them into archives, tell YARN to distribute them to all nodes, then start using them!
The concept is rather straightforward, but the devil is in the details of ensuring all dependencies are compatible with each other.