Hacking in PySpark: The Toolkit
Apache Spark is a cluster computing framework written in Scala and running on the JVM, which you can interact with from Scala, Python, R and, of course, Java. In this article, I’m going to show you a few tips and tricks on integrating your favorite Python interactive consoles with Spark.
Installing PySpark
You might want to skip this section if you have already configured your PySpark environment. However, I wasn’t very successful at finding a succinct tutorial explaining precisely what I had to do to get going with Spark on my local environment. Did I need to download the latest release from Spark’s official website? Was the Java package a prerequisite for PySpark? Or would I simply need to run pipenv install pyspark
and everything would just work? I found an excellent article by Charles Bochet, but it only added to my confusion, as it mentioned installing the Java package but not the readily available pyspark package on PyPI.
In fact, it turned out that installing the Java package is no longer required, and the Python package was sufficient. All I had to do to get PySpark on my system was:
$ pipenv install pyspark
I was using pipenv, but plain pip install pyspark
works equally well.
For the purpose of this article, I’ll be using PySpark 2.4.0 on Python 3.7. If using a different setup, your mileage may vary. Some output has been stripped, especially around installing packages. Also, examples used here are meant to be used in a local/dev environment, not on production systems.
The first transformation
At this point, you could install IPython and manually instantiate your SparkSession. It’s not hard at all. Let’s install IPython first:
$ pipenv install IPython
And then launch IPython to create your first data frame:
$ ipython
Python 3.7.0 (default, Sep 7 2018, 16:19:35)

In [1]: import pyspark.sql

In [2]: spark = pyspark.sql.SparkSession.builder.master('local').appName('default').getOrCreate()

In [3]: df = spark.createDataFrame([{'number': n} for n in range(10)])

In [4]: df.show()
+------+
|number|
+------+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+------+
Or, an RDD:
In [5]: rdd = spark.sparkContext.parallelize(range(10))

In [6]: rdd.collect()
Out[6]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
If, at that point, you get an exception similar to this one:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: Unsupported class file major version 56
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
then you’re using a version of Java that’s too new for PySpark (which has been compiled against Java 8). You’ll need to ensure you have Java 8 JDK installed (Oracle Java 8 Downloads) and set JAVA_HOME to point at the right Java version. See this Stack Overflow article for an explanation.
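On my machine the fix looked roughly like this (the JDK path below is an example from a Debian/Ubuntu openjdk-8 install; yours will differ depending on OS and how Java 8 was installed):

```shell
# See which Java the shell currently resolves to
java -version

# Point JAVA_HOME at a Java 8 JDK; adjust the path to your system
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
```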
That doesn’t look too bad, and you might improve it a bit by creating factory functions for both SparkSession and SparkContext:
import pyspark.sql


def spark_session():
    return pyspark.sql.SparkSession.builder.master('local').appName('default').getOrCreate()


def spark_context():
    return spark_session().sparkContext
But, there’s a simpler way.
PySpark’s built-in interactive console
PySpark comes with its own interactive console, which can be launched with pyspark
. For some reason, though, we need to set an additional environment variable, SPARK_HOME
, before we can use it. Why pyspark can’t just use its own location as a sane default remains a mystery to me.
SPARK_HOME
should be set to the location of the pyspark python package. If using pipenv, you can use the following snippet to add the environment variable to pipenv’s .env
file:
echo SPARK_HOME=`python -c 'import os.path, pyspark; print(os.path.dirname(pyspark.__file__))'` >> .env
And then, just reload the virtual environment. Once that environment variable is set, just run the interactive console like so:
$ pyspark
Python 3.7.0 (default, Sep 7 2018, 16:19:35)
[GCC 5.4.0 20160609] on linux
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/

Using Python version 3.7.0 (default, Sep 7 2018 16:19:35)
SparkSession available as 'spark'.
>>> spark
<pyspark.sql.session.SparkSession object at 0x7fd58aae3ba8>
>>> sc
<SparkContext master=local[*] appName=PySparkShell>
>>> df = spark.createDataFrame([{'number': n} for n in range(10)])
>>> df.show()
+------+
|number|
+------+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+------+
As you might have noticed, SparkSession is now available as spark
and SparkContext as sc
without the need to actually import anything!
But it’s not as good as IPython: there’s no autocompletion, no coloring, no filesystem inspection. Let’s try making this work with IPython.
IPython
And it’s very simple: it’s just a matter of setting one more environment variable. For the sake of simplicity, let’s set it directly on the command line:
$ PYSPARK_DRIVER_PYTHON=ipython pyspark
Python 3.7.0 (default, Sep 7 2018, 16:19:35)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/

Using Python version 3.7.0 (default, Sep 7 2018 16:19:35)
SparkSession available as 'spark'.

In [1]: sc
Out[1]: <SparkContext master=local[*] appName=PySparkShell>

In [2]: spark
Out[2]: <pyspark.sql.session.SparkSession at 0x7fb8f64f1ef0>
I won’t repeat my cheesy DataFrame / RDD example yet again. You get the idea.
Jupyter
Going a step further, it’s trivial to make PySpark work with Jupyter. Let’s install Jupyter and set the necessary environment variables:
$ pipenv install Jupyter
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook pyspark
A new tab should open in your default browser. You can now create a new notebook (New > Python 3 at the top right corner of your browser window) and run your PySpark code in a notebook cell.
Jupyter and df.toPandas()
Let’s take an example closer to real life. An example CSV from the UK Crown Prosecution Service, released under the Open Government Licence, will do just fine. If you download this file, load it into a data frame and try to display it with df.show(), the output leaves a lot to be desired: long column names and wide rows make the ASCII table nearly unreadable.
My solution here is to use df.toPandas()
. This will only work if we install pandas first:
$ pipenv install pandas
Once that succeeds, just let the magic happen: call df.toPandas() on the data frame, and Jupyter renders the resulting pandas data frame as a neat HTML table.
That looks a lot better. Of course, you can perform further magic to display only selected columns (in pyspark or pandas). Bear in mind that calling .toPandas()
will collect the entire data frame into the driver’s memory, so it might not be the best idea on huge data sets.
I’ve shown you a few tricks of mine to make your local PySpark hacking easier. You could now even consider going beyond local and deploying JupyterHub, but that’s beyond the scope of this article.
As you have noticed, it’s mostly about installing the necessary Python packages and setting the right environment variables. Here is my typical set of Python dependencies I use with PySpark:
pyspark
ipython
jupyter
pandas
And my favorite set of environment variables, which will run my PySpark interactive console in Jupyter:
SPARK_HOME=/home/peterkilczuk/.virtualenvs/pyspark-fresh-qnWOWdjL/lib/python3.7/site-packages/pyspark
PYSPARK_DRIVER_PYTHON=jupyter
PYSPARK_DRIVER_PYTHON_OPTS=notebook
Bear in mind that, whichever interactive console you go for, setting SPARK_HOME
will be required.
I hope you’ve found this article helpful.