Spark UDF — Deep Insights in Performance

Nikhilesh Nukala — Consultant (Data Engineering), Yuhao ZhuAdvanced Analytics Consultant, Guilherme BraccialliPrincipal Data Engineer, Tom Goldenberg- Jr Principal (Data Engineering), QuantumBlack

This blog will demonstrate a performance benchmark in Apache Spark between Scala UDF, PySpark UDF and PySpark Pandas UDF.

At QuantumBlack, we often deal with multiple terabytes of data to drive advanced analytics projects. The data pipelines should run at speed from anywhere and be performant, while the workload should be distributable, scalable and reliable. Apache Spark is one platform we leverage for this.

Spark offers a variety of solutions to complicated challenges, but we face many situations where the native functions are not sufficient to solve the problem. For this reason, Spark allows us to register custom functions called User-Defined Functions, or UDFs.

In this article, we will explore the performance characteristics of Spark’s UDFs.

Spark supports multiple languages such as Python, Scala, Java, R and SQL, but often the data pipelines are written in PySpark or Spark Scala. We believe PySpark is adopted by most users for the following reasons:

1. Faster learning curve — Python is a simpler language than Scala.

2. Wide Community Support — An ecosystem of programmers feedback on PySpark performance and suggest updates.

3. Availability of rich libraries — Python has a rich set of libraries for machine learning, time series analysis and statistics.

4. Negligible performance difference — The introduction of Spark DataFrames means the performance of Scala and Python are fairly similar. Dataframes are now organized under named columns to help Spark understand the schema, while the operations that are used to build the dataframe are compiled into a physical execution plan by the Catalyst Optimizer, accelerating computation.

5. Easy code handover between data engineers and data scientists. In some data frame operations that require UDFs, PySpark can have an impact on performance. There are approaches to address this by combining PySpark with Scala UDF and UDF Wrapper.

When a PySpark job is submitted while the driver program is running in vanilla Python, the driver creates SparkSession object and Dataframes/RDDs. These will be a Python wrapper around a JVM (Java) object. To simplify this, PySpark provides a wrapper that runs native Scala code.

Spark User-Defined Functions (UDFs):

Registering Spark custom functions in Scala, Python and Java has become a very popular way to expose advanced functionality to SQL users, enabling users to call in the functions without writing the code.

For example, multiplying a set of million rows by 1000:

def times1000(field):

return field * 1000.00

or reverse geocode a latitude and longitude dataset:

import geohashdef geohash_pyspark(lat, lon):  return geohash.encode(lat, lon)

Spark SQL offers a built-in method to easily register UDFs by passing in a function in your programming language. Scala and Python can use native function and lambda syntax, but in Java we need to extend the UDF class.

UDFs can work on a variety of types and can return a different type than the one they are initially called with. In Python and Java, we need to specify the return type.

UDF can be easily registered by running:

spark.udf.register("UDF_Name", function_name, returnType())

*returnType() is mandatory in Python and Java.

Types of Spark UDFs and execution:

In distributed mode, Spark uses master/worker architecture for execution. The central coordinator, called driver, communicates with a potentially large number of distributed workers, called executors. The driver and workers run their own Java process (JVM).

The driver runs the main() method and creates a SparkContext, RDDs and performs transformations and actions. The executors are responsible for running the individual tasks.

Benchmarking the performance:

To benchmark the performance of the three Spark UDFs, we have created a random Latitude, Longitude dataset, with 100 million rows and worth 1.2 GB, and have defined 2 UDFs: a simple function which multiplies each row by 1000 and a complex geohash function.

Cluster Configuration: Databricks cluster with 8 nodes

Driver node: 16 cores and 122 GB memory

Worker nodes: 4 cores with 30.5 GB memory, autoscaling enabled.

Notebook codehttps://bit.ly/2YxiVp4 — QuantumBlack’s approach to benchmark Scala UDF, PySpark UDF and PySpark Pandas UDF.

Along with the three types of UDFs discussed above, we have created a Python wrapper to call the Scala UDF from PySpark and found that we can bring the best of two worlds i.e. ease of Python coding and performance of Scala UDF.

Creating a Python wrapper to call Scala UDF from PySpark code:

from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql.functions import col

def udfGeohashScalaWrapper(lat, lon):
_geohash = sc._jvm.sparkudfperformance.UDFs.udfGeohash()
return Column(_geohash.apply(_to_seq(sc, [lat, lon], _to_java_column)))
def udfTimes1000ScalaWrapper(field):
_times1000 = sc._jvm.sparkudfperformance.UDFs.udfTimes1000()
return Column(_times1000.apply(_to_seq(sc, [field], _to_java_column)))

Databricks Pandas UDF benchmarkinghttps://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

Key Takeaways:

Below are the results from the tests performed

Our results demonstrate that Scala UDF offers the best performance. As mentioned earlier, the step of translating from Scala to Python and back adds to the processing of the Python UDFs.

We also found that PySpark Pandas UDF provides a better performance for smaller datasets or simpler functions than PySpark UDF. When a more complex function, such as geohashing, is introduced, this characteristic changes. In these circumstances, PySpark UDF is around 10 times more performant than the PySpark Pandas UDF.

We have also found that creating a Python wrapper to call Scala UDF from PySpark code is around 15 times more performant than the two types of PySpark UDFs.

Taking these performance characteristics into account, QuantumBlack now:

  • Uses PySpark UDFs when the data volume is not big or need quick insights using simpler functions.
  • Builds an internal library with re-usable Scala UDFs
  • Creates Python wrappers to call Scala UDFs

References:

Learning Spark — O’Reilly

--

--

QuantumBlack, AI by McKinsey
QuantumBlack, AI by McKinsey

We are the AI arm of McKinsey & Company. We are a global community of technical & business experts, and we thrive on using AI to tackle complex problems.