In most cases, using Python User Defined Functions (UDFs) in Apache Spark has a large negative performance impact. This blog will show you how to use Apache Spark native Scala UDFs in PySpark, and gain a significant performance boost.
To create your Scala UDF, follow these steps:
- Create a UDF in a Scala project. We use the example of checking an IBAN for validity - we do after all work for a bank :-)
- Use SBT to build the jar, and pass the jar to your PySpark session
- Register and use the Scala UDF in PySpark!
Find an example project in the link below:
Enjoy your speedy UDF.
Why do we want to use Scala UDFs in PySpark?
The PySpark API is a key component of Apache Spark; it allows developers and data scientists to make use of Spark’s high-performance, scalable processing without having to learn Scala. Significantly, it allows developers to take the wealth of machine learning and data analysis libraries available to Python, and run them on larger datasets than would be feasible on a single machine.
However, it is often unclear what is actually going on under the hood when running a PySpark job, which can lead to a range of problems.
Whilst the driver program is indeed running in native Python, the SparkSession object and any DataFrames or RDDs the driver creates will, in reality, be thin Python wrappers around JVM (Java) objects. Simply put, whilst the controlling program is Python, it is actually Python code telling a distributed Scala program on the cluster what to do. The data is stored in the JVM’s memory, and is being transformed by Scala code.
Taking these objects out of JVM memory and converting them into a form that Python can read (called serialisation and deserialisation) is expensive. Results collected back to the Python driver program are typically low-volume samples or aggregates, and are collected infrequently, so this overhead goes relatively unnoticed. However, if a program’s execution within the cluster converts data back and forth between Python and JVM objects for the entire dataset, performance can be dramatically affected.
One common place where this occurs is User Defined Functions, or UDFs. UDFs allow Python developers to write arbitrary Python functions, such as validating IBANs or geocoding addresses, and execute them as part of a Spark SQL query. Whilst this is very powerful, when we understand what is happening in Spark, we see that this will result in a large performance hit.
In the diagram above, the Python program’s instructions (1) are translated into a Spark execution plan, and passed via the SparkSession JVM object (2) to two executors on different machines in a cluster (3). The executors would typically load data from an external source such as HDFS, perform some transformations in memory, and then write the data back to an external store. The data would stay within the JVM (3) for the life of the program.
When using a Python UDF, the data must go through several additional steps. Firstly, it must be serialised from Java (4), so it can be read into a separate Python process (5) where the UDF runs. The result then passes back through a second stage of serialisation and deserialisation (4) before it returns to the JVM, and can finally be written out as a result.
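To make this concrete, here is a minimal sketch of the kind of Python UDF described above. The names `is_valid_iban` and `make_python_iban_udf` are our own illustrative choices, and the check is only the standard mod-97 checksum, not a full validation with per-country lengths. PySpark is imported lazily, so the checksum logic can be tried without a cluster.

```python
import re

# Two letters, two check digits, then up to 30 alphanumeric characters.
_IBAN_SHAPE = re.compile(r"[A-Z]{2}[0-9]{2}[A-Z0-9]{1,30}")


def is_valid_iban(iban):
    """Return True if the IBAN passes the mod-97 checksum.

    Illustrative only: per-country length rules are not checked.
    """
    if iban is None:
        return False
    compact = iban.replace(" ", "").upper()
    if not _IBAN_SHAPE.fullmatch(compact):
        return False
    # Move country code and check digits to the end, then map A..Z to 10..35.
    rearranged = compact[4:] + compact[:4]
    digits = "".join(str(int(ch, 36)) for ch in rearranged)
    return int(digits) % 97 == 1


def make_python_iban_udf():
    """Wrap the check as a plain Python UDF.

    Every row passes through a separate Python worker process, which is
    exactly the serialisation round trip described in the text.
    """
    from pyspark.sql.functions import udf  # requires PySpark
    from pyspark.sql.types import BooleanType

    return udf(is_valid_iban, BooleanType())
```

With a DataFrame `df` that has a string column `iban` (an assumed name), this could be applied as `df.withColumn("is_valid", make_python_iban_udf()(df["iban"]))`.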
So how can we improve this picture? If we instead create the UDFs as Spark UDFs written in Scala, we get the following:
Scala UDFs operate within the JVM of the executor, so the data will skip the two rounds of serialisation and deserialisation. This is much faster, as the benchmarks later will show.
So how do we achieve this?
Note: The following instructions apply to Spark 2.1 and above. We have a section further down to explain the process for older Spark versions.
In an entirely Scala-based Spark project we could simply create a UDF in the following way:
We cannot, however, do this in a PySpark project, as we can’t create a Scala function from within Python code. To be able to do this in PySpark, we first need to separate out our Scala UDF code into a separate Scala project. We can then build the Scala code into a jar file, and pass this to our Python code instead.
Taking the following example from our GitHub:
As you can see from the example project, this can be unit tested, and versioned separately to our Python code. Lastly, we can create a jar of this project using SBT, and we can integrate this jar in our Spark session at start-up time like so:
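A minimal sketch of that start-up step, assuming the jar is passed through the `spark.jars` setting. The helper names, the application name and the jar path in the usage note are hypothetical, not taken from the example project.

```python
def spark_jars_value(jar_paths):
    """Join jar paths into the comma-separated form that `spark.jars` expects."""
    return ",".join(jar_paths)


def build_session(jar_paths, app_name="scala-udf-example"):
    """Start (or reuse) a SparkSession with the given jars on the classpath."""
    from pyspark.sql import SparkSession  # imported lazily; requires PySpark

    return (
        SparkSession.builder
        .appName(app_name)
        # Takes effect only if no session (and so no JVM) is running yet.
        .config("spark.jars", spark_jars_value(jar_paths))
        .getOrCreate()
    )
```

Usage would look like `spark = build_session(["/path/to/scala-udf.jar"])`. When submitting a job from the command line, passing the jar with `spark-submit --jars` should have the same effect.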
All we need to do now is register the Scala UDF in the SparkSession. Use the version below that matches your Spark installation, as the interface for this has been improving over time.
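As a rough sketch of the registration step: to our knowledge, Spark 2.3 and later expose `registerJavaFunction` on `spark.udf`, whilst Spark 2.1 and 2.2 expose it on `SQLContext`. The helper below tries the newer location first. The helper name, the SQL function name and the class name `com.example.udf.ValidateIban` are hypothetical; the class is assumed to implement one of Spark's Java UDF interfaces (such as `UDF1`) and to be on the classpath via the jar.

```python
def register_scala_udf(spark, name, java_class_name, return_type=None):
    """Register a JVM UDF under `name` so Spark SQL can call it from PySpark.

    If `return_type` is None, Spark tries to infer it from the class.
    """
    registrar = spark.udf
    if hasattr(registrar, "registerJavaFunction"):
        # Spark 2.3 and later.
        registrar.registerJavaFunction(name, java_class_name, return_type)
    else:
        # Spark 2.1 / 2.2: the method lives on SQLContext instead.
        from pyspark.sql import SQLContext

        sql_context = SQLContext(spark.sparkContext, sparkSession=spark)
        sql_context.registerJavaFunction(name, java_class_name, return_type)
    return name
```

After `register_scala_udf(spark, "validate_iban", "com.example.udf.ValidateIban")`, the function can be used in a query such as `spark.sql("SELECT iban, validate_iban(iban) AS is_valid FROM accounts")`, where `accounts` is an assumed table name.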
Pre-2.1 Spark Instructions
The interface to register a JVM UDF was not available to PySpark before Spark 2.1. We have a tag in the repository (pre-2.1) that implements our own SparkUDF interface, in order to achieve this. This can then be registered using the “Pre Spark 2.1” registration method above.
Doesn’t Apache Spark 2.3 make this all unnecessary?
Introducing Vectorized UDFs for PySpark - The Databricks Blog
In this blog the guys and girls at Databricks show an improvement coming in Apache Spark 2.3 on Python UDFs by using Apache Arrow. You may think this renders our solution redundant.
However, good news! Native Scala UDFs are still faster, as the explanation above still holds. Although they are introducing the Vectorized UDF to make Python serialisation faster, our solution removes the need for serialisation and deserialisation altogether.
How much faster did we make it?
Compared with our previous Python implementation of the same UDF, we anecdotally improved the runtime of one of our Spark jobs from 1.2 hours to 30 minutes. The actual CPU time was down to 5 minutes; the other 25 minutes was disk I/O, which was the same in both solutions.
For a nice benchmark, comparing a plain Python UDF, a Python Vectorized UDF and a Scala UDF, please look at the following link: https://gist.github.com/maropu/9f995f65b1cb160865e79e14e5216320
Disclaimer: the logic in your UDF and the data you run through it greatly determine how large the difference in performance will be!
Go try it yourself!
Please find an example on GitHub and use it as your starting point!
Rob Keevil and John Müller