UDFs vs Map vs Custom Spark-Native Functions

Introduction

Apache Spark provides a lot of functions out of the box. However, as with any library, there are still times when you’ll find that a particular piece of functionality is missing. It’s at this point that you would often look to implement your own function. In this article, we’ll demonstrate and compare 3 methods for implementing your own functions in Spark, namely:

  1. User Defined Functions
  2. Map functions
  3. Custom Spark-native functions

By the end of this article, we hope to give the reader clear programming recommendations specifically as they relate to implementing custom functions in Spark.

Readers short on time may skip straight to the conclusion. For those interested in the journey, read on.

Setup

We’ll start by setting up our environment for conducting experiments.

We’ll be using Spark version 2.4.0 and Scala version 2.11.8 although any recent (2.0+) Spark distribution will do. We’ll also load Vegas, a visualization library for Scala/Spark, which will be useful closer to the end of the article.

spark-shell --packages="org.vegas-viz:vegas_2.11:0.3.11,org.vegas-viz:vegas-spark_2.11:0.3.11"

Next, we’ll generate a Dataset for testing our code against. The Dataset will be called testDf and will consist of exactly 1 column (named id by default). We’ll make this a fairly large Dataset of 10 million rows to simulate real-life conditions, and we’ll also cache it in memory so we don’t have to constantly regenerate it from scratch.

scala> val testDf = spark.range(10000000).toDF.cache
testDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint]
scala> testDf.count
res0: Long = 10000000

Finally, we need to write a function that will give us an accurate measurement of execution time for a given block of code. There’s typically a lot of activity happening under the hood of your OS which is likely to affect execution times. To overcome this, we will execute a given block of code multiple times and calculate the average execution time. This should give us reasonably accurate results for our purposes.

scala> def averageTime[R](block: => R, numIter: Int = 10): Unit = {
| val t0 = System.nanoTime()
| (1 to numIter).foreach( _ => block)
| val t1 = System.nanoTime()
| val averageTimeTaken = (t1 - t0) / numIter
| val timeTakenMs = averageTimeTaken / 1000000
| println("Elapsed time: " + timeTakenMs + "ms")
| }
averageTime: [R](block: => R, numIter: Int)Unit

That’s it for the basic setup, we’re ready to dive into some experiments.

Spark-Native Functions (Baseline)

To keep our code examples simple, we’ll be implementing a very basic function that increments the value of whatever argument is passed to it by 1. So for example, if we give our function the argument 1, it should return 2.

Before writing any custom functions, we first want to demonstrate how you would normally do this in Spark in order to establish a baseline for comparisons later:

scala> val resultSparkNative = testDf.withColumn("addOne", 'id + 1)
resultSparkNative: org.apache.spark.sql.DataFrame = [id: bigint, addOne: bigint]
scala> resultSparkNative.show(5)
+---+------+
| id|addOne|
+---+------+
| 0| 1|
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
+---+------+
only showing top 5 rows

Nothing complicated there, just standard Spark-native column functions. Note that this is the same as:

scala> val resultSparkNative = testDf.withColumn("addOne", 'id.plus(1))
resultSparkNative: org.apache.spark.sql.DataFrame = [id: bigint, addOne: bigint]
scala> resultSparkNative.show(5)
+---+------+
| id|addOne|
+---+------+
| 0| 1|
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
+---+------+
only showing top 5 rows

Under the hood, both + and plus call the Add class (which is defined here).
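
To make that concrete, here is a rough sketch of our own (not part of the original walkthrough; it assumes the same spark-shell session) showing that 'id + 1 is just a thin wrapper that builds an Add expression over the column’s underlying Catalyst expression and wraps the result back up in a Column:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.functions.col

// Build by hand the same expression tree that 'id + 1 produces:
// an Add over the id column's expression and a literal 1, wrapped back into a Column.
val addOneExpr = Add(col("id").expr, Literal(1L))
val resultManualAdd = testDf.withColumn("addOne", new Column(addOneExpr))
// resultManualAdd is equivalent to resultSparkNative above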

Now let’s take a quick look at the physical plan Spark generates for this operation:

scala> resultSparkNative.explain 
== Physical Plan ==
*(1) Project [id#0L, (id#0L + 1) AS addOne#34L]
+- *(1) InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Range (0, 10000000, step=1, splits=4)

Don’t be alarmed if you’ve never seen this kind of output before. A physical plan just describes the way in which Spark intends to execute your computation. In the example above, InMemoryRelation and InMemoryTableScan indicate that Spark will read the data we cached from memory. After this, Spark will perform a Project, which selects the columns we want; it’s during this Project step that Spark actually calculates our new addOne column. That’s about it; this is, after all, a very simple query. Later on, we’ll compare the physical plans generated by our custom functions with this one to see if there are any differences. These differences may in turn help to explain performance variations, if any.

Let’s see how fast Spark’s native plus function is using the averageTime function we defined earlier:

scala> averageTime { resultSparkNative.count }
Elapsed time: 36ms

Nice, Spark was able to process 10 million records in roughly 36 milliseconds (on average). This query will be our baseline. Ideally, we’ll want to get similar performance with our custom functions, if not better.

Now let’s pretend that Spark doesn’t actually have any native functions for incrementing values in a column by 1. What are our options then?

User Defined Functions

The first thing people will usually try at this point is a UDF (User Defined Function). UDFs allow developers to use regular Scala functions in their Spark queries. Here’s a quick implementation of our desired UDF:

scala> import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.udf
scala> val addOneUdf = udf { x: Long => x + 1 }
addOneUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))

With just two lines of code, we were able to implement our custom function. This is one of the advantages of UDFs: they’re straightforward to implement.
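
As a side note (and not something we rely on later in this article), the same logic can also be registered for use in SQL expressions. Below is a minimal sketch, assuming the same spark-shell session; the function and view names are arbitrary choices of ours:

// Register an equivalent function under a name callable from SQL strings
spark.udf.register("addOneSql", (x: Long) => x + 1)
// Expose testDf as a temporary view so it can be queried with SQL
testDf.createOrReplaceTempView("test_df")
spark.sql("SELECT id, addOneSql(id) AS addOne FROM test_df").show(5)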

Let’s go ahead and check that our UDF does what we expect:

scala> val resultUdf = testDf.withColumn("addOne", addOneUdf('id))
resultUdf: org.apache.spark.sql.DataFrame = [id: bigint, addOne: bigint]
scala> resultUdf.show(5)
+---+------+
| id|addOne|
+---+------+
| 0| 1|
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
+---+------+
only showing top 5 rows

Looks like we were successful; the results are equivalent to what we got with Spark’s native plus function.

Let’s have a look at the physical plan Spark generates when we use a UDF in our query:

scala> resultUdf.explain
== Physical Plan ==
*(1) Project [id#0L, UDF(id#0L) AS addOne#184L]
+- *(1) InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Range (0, 10000000, step=1, splits=4)

For the most part, this looks pretty similar to what we saw previously. We have the same InMemoryRelation and InMemoryTableScan as before. The only real difference is in the Project, where we’re now calling our UDF instead of Spark’s native plus function.

Based on this and assuming our implementation of the UDF is fairly efficient, we should expect the performance of our query to be similar to our baseline query:

scala> averageTime { resultUdf.count }
Elapsed time: 40ms

Just as we guessed, this query and our baseline query are equivalent as far as execution time goes (a difference of a few milliseconds is inconsequential and could be the result of any number of factors, e.g. music playing in the background). That’s great, right?

Unfortunately, UDFs are a black box from Spark’s perspective. All Spark knows about our UDF is that it takes id as its argument and that it will return some value, which is assigned to addOne. As a result, Spark can’t apply many of the optimizations it normally would if we were using only native column functions (at least not today).
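
To make the black-box point concrete, here is a small illustrative sketch of our own (the names are hypothetical, and it assumes the same spark-shell session): even a UDF that ignores its input entirely remains opaque to Catalyst, whereas the equivalent native expression is simply constant-folded away before execution.

import org.apache.spark.sql.functions.{lit, udf}

// A UDF that always returns 2 still shows up as UDF(id) in the physical plan;
// Spark cannot see that the input is irrelevant.
val alwaysTwoUdf = udf { _: Long => 2L }
testDf.withColumn("two", alwaysTwoUdf('id)).explain

// The equivalent native expression is folded to the constant 2 during optimization.
testDf.withColumn("two", lit(1) + 1).explain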

In our simple count query above, there weren’t any optimizations that Spark could have made so we saw no performance degradation compared to our baseline query. Performance degradations will be more apparent in cases where the black-box nature of a UDF really gets in the way of Spark’s optimization rules e.g. predicate push down. To demonstrate this, we’ll first create a parquet file containing our test data:

scala> val path = "temp.parquet/"
path: String = temp.parquet/
scala> testDf.write.mode("overwrite").parquet(path)

scala> val testDfParquet = spark.read.parquet(path)
testDfParquet: org.apache.spark.sql.DataFrame = [id: bigint]

Now we’ll compare the plans generated by running a filter operation against this parquet file, first using Spark’s native plus function and then using our UDF:

scala> val resultSparkNativeFilterParquet = testDfParquet.filter('id.plus(1) === 2)
resultSparkNativeFilterParquet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint]
scala> val resultUdfFilterParquet = testDfParquet.filter(addOneUdf('id) === 2)
resultUdfFilterParquet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint]
scala> resultSparkNativeFilterParquet.explain
== Physical Plan ==
*(1) Project [id#324L]
+- *(1) Filter (isnotnull(id#324L) && ((id#324L + 1) = 2))
+- *(1) FileScan parquet [id#324L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/temp.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
scala> resultUdfFilterParquet.explain
== Physical Plan ==
*(1) Filter (if (isnull(id#324L)) null else UDF(id#324L) = 2)
+- *(1) FileScan parquet [id#324L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/temp.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Notice how there is nothing in PushedFilters for the second query where we use our UDF. Spark was unable to push the IsNotNull filter into our parquet source. Instead, the second query will have to perform the IsNotNull filter in memory after loading all of the data.

Let’s see how much of a difference this makes in practice:

scala> averageTime { resultSparkNativeFilterParquet.count }
Elapsed time: 114ms
scala> averageTime { resultUdfFilterParquet.count }
Elapsed time: 204ms

Both queries take longer than our previous queries because they first have to load the parquet data into memory. However, we see that the query using our UDF takes almost twice as long as our query using the native plus function because Spark was unable to push filters down into the data source.

We’ve established here that although UDFs are easy to implement, they aren’t that great performance-wise, as they get in the way of Spark’s optimization rules. What are our other options for implementing custom functions, then?

Map Functions

The map function returns a new Dataset that contains the result of applying a given function to each row of a given Dataset. For more information, see the official documentation here. Surprisingly, this is still considered an experimental feature although it has been available since Spark 1.6.0.

Here’s how you would use the map function to perform a similar operation to our previous queries:

scala> case class foo(id: Long, addOne: Long)
defined class foo
scala> val resultMap = testDf.map { r => foo(r.getAs[Long](0), r.getAs[Long](0) + 1) }
resultMap: org.apache.spark.sql.Dataset[foo] = [id: bigint, addOne: bigint]
scala> resultMap.show(5)
+---+------+
| id|addOne|
+---+------+
| 0| 1|
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
+---+------+
only showing top 5 rows

Not quite as simple as a UDF perhaps, but not terribly complicated either, and still only two lines of code. Now let’s take a peek at the physical plan Spark generates to execute this query:

scala> resultMap.explain
== Physical Plan ==
*(1) SerializeFromObject [assertnotnull(input[0, $line60.$read$$iw$$iw$foo, true]).id AS id#894L, assertnotnull(input[0, $line60.$read$$iw$$iw$foo, true]).addOne AS addOne#895L]
+- *(1) MapElements <function1>, obj#893: $line60.$read$$iw$$iw$foo
+- *(1) DeserializeToObject createexternalrow(id#0L, StructField(id,LongType,false)), obj#892: org.apache.spark.sql.Row
+- *(1) InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Range (0, 10000000, step=1, splits=4)

Yikes! That’s quite different. Just looking at the number of steps involved in this physical plan, we can already guess that this is going to be slower than our baseline query. Another thing that could slow us down is the deserialization and serialization required for each row just to create our addOne column using the map function. Let’s see how much these factors impact performance in practice:

scala> averageTime { resultMap.count }
Elapsed time: 198ms

Wow, the map-based query is almost 5x slower than its counterparts against the same cached Dataset.

While we’re here, we may as well check whether Spark is able to apply the predicate push down optimization against the parquet data we generated earlier:

scala> val resultMapFilterParquet = testDfParquet.map { r => foo(r.getAs[Long](0), r.getAs[Long](0) + 1) }.filter('addOne === 2)
resultMapFilterParquet: org.apache.spark.sql.Dataset[foo] = [id: bigint, addOne: bigint]
scala> resultMapFilterParquet.explain
== Physical Plan ==
*(1) Filter (addOne#54160L = 2)
+- *(1) SerializeFromObject [assertnotnull(input[0, $line36.$read$$iw$$iw$foo, true]).id AS id#54159L, assertnotnull(input[0, $line36.$read$$iw$$iw$foo, true]).addOne AS addOne#54160L]
+- *(1) MapElements <function1>, obj#54158: $line36.$read$$iw$$iw$foo
+- *(1) DeserializeToObject createexternalrow(id#324L, StructField(id,LongType,true)), obj#54157: org.apache.spark.sql.Row
+- *(1) FileScan parquet [id#324L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/temp.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Apparently not.

At this point, it looks like UDFs have the upper hand over map functions when writing custom functions, both in performance and in being a little simpler to write. However, Spark’s native column functions easily trump both of these options in terms of engineering ease and raw performance. What we need is a way to write our own Spark-native functions.

Custom Spark-Native Functions

It is possible to extend the Apache Spark project to include our own custom column functions. Doing so, however, requires a bit of setup. First, we download a local copy of the Apache Spark source from GitHub and extract it to our home directory.

cd ~
wget https://github.com/apache/spark/archive/v2.4.0-rc5.tar.gz
tar -zxvf v2.4.0-rc5.tar.gz

At this point, readers are advised to explore how the Apache Spark project is organized, specifically the SQL module. We won’t be covering this topic in any detail here and instead refer you to this excellent article for more information.

Now we can start creating the files required to implement our own Spark-native function. We can take inspiration from any of the existing native functions in the org.apache.spark.sql.catalyst.expressions package, e.g. the UnaryMinus class from the Arithmetic.scala file. Since our function only needs to take one argument, we can extend the UnaryExpression abstract class that is already defined for us in the org.apache.spark.sql.catalyst.expressions package. We will also mix in the NullIntolerant trait, as this is one of the things Catalyst looks for when deciding whether it can push predicates down (see the QueryPlanConstraints.scala file inside the org.apache.spark.sql.catalyst.plans.logical package for more details). To keep things simple, we’ll support only LongType arguments, though this could easily be extended in the future. We can optionally give the expression a prettyName. Lastly, we define the doGenCode method; this is the part that actually performs our calculation and is used to generate Java code at run-time. Below is what a basic working implementation looks like for our purposes:

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{DataType, LongType}

case class Add_One_Custom_Native(child: Expression) extends UnaryExpression with NullIntolerant {
  override def dataType: DataType = LongType
  override def prettyName: String = "addOneSparkNative"
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"$c + 1")
}
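
One optional hardening step, which is our own addition rather than part of the snippet above: the class only implements the code-generated path. UnaryExpression’s interpreted path goes through nullSafeEval, which would throw if Spark ever fell back from whole-stage codegen for this expression, so a slightly more defensive version of the same class (same package and imports as above) might look like this:

case class Add_One_Custom_Native(child: Expression) extends UnaryExpression with NullIntolerant {
  override def dataType: DataType = LongType
  override def prettyName: String = "addOneSparkNative"
  // Code-generated (whole-stage codegen) path, as before
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"$c + 1")
  // Interpreted fallback path; our addition, assuming LongType input
  protected override def nullSafeEval(input: Any): Any =
    input.asInstanceOf[Long] + 1L
}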

Next, we create another object, this time inside the org.apache.spark.sql package, mimicking the contents of the functions.scala file. In it, we define our new column function, which calls the class we produced in the previous step. All this wrapper adds is the ability to accept and return Column objects (as opposed to Expression objects).

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions._

object CustomFunctions {
  private def withExpr(expr: Expression): Column = Column(expr)

  def addOneCustomNative(x: Column): Column = withExpr {
    Add_One_Custom_Native(x.expr)
  }
}

After this, we can build our own Spark distribution using the helpful instructions provided on the official Spark website here.

cd spark-2.4.0-rc5
./dev/make-distribution.sh --name custom-spark --tgz
tar -zxvf spark-2.4.0-bin-custom-spark.tgz

Once this is complete, we can launch a new spark-shell session using our custom distribution.

export SPARK_HOME=~/spark-2.4.0-rc5/spark-2.4.0-bin-custom-spark
export PATH=$SPARK_HOME/bin:$PATH
spark-shell --packages="org.vegas-viz:vegas_2.11:0.3.11,org.vegas-viz:vegas-spark_2.11:0.3.11"

To make use of our new function, all we need to do now is import it. After that, it’s back to regular old Spark:

scala> import org.apache.spark.sql.CustomFunctions.addOneCustomNative
import org.apache.spark.sql.CustomFunctions.addOneCustomNative
scala> val resultCustomNative = testDf.withColumn("addOne", addOneCustomNative('id))
resultCustomNative: org.apache.spark.sql.DataFrame = [id: bigint, addOne: bigint]
scala> resultCustomNative.show(5)
+---+------+
| id|addOne|
+---+------+
| 0| 1|
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
+---+------+
only showing top 5 rows

Cool, so our custom Spark-native function is working as expected. Now let’s see the physical plan that was generated.

scala> resultCustomNative.explain
== Physical Plan ==
*(1) Project [id#0L, addOneSparkNative(id#0L) AS addOne#926L]
+- *(1) InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Range (0, 10000000, step=1, splits=4)

It’s a simple plan, much like the one we saw for our baseline query. We should therefore expect to see similar performance:

scala> averageTime { resultCustomNative.count }
Elapsed time: 40ms

Indeed, this is comparable to the execution time we saw using Spark’s native plus function.

Okay, but can Spark apply its usual optimizations when we read from the parquet data source?

scala> val resultCustomNativeFilterParquet = testDfParquet.filter(addOneCustomNative('id) === 2)
resultCustomNativeFilterParquet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint]
scala> resultCustomNativeFilterParquet.explain
== Physical Plan ==
*(1) Project [id#324L]
+- *(1) Filter (isnotnull(id#324L) && (addOneSparkNative(id#324L) = 2))
+- *(1) FileScan parquet [id#324L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/temp.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

Success! We were able to push the same IsNotNull filter down into our parquet source just like in our baseline query. Let’s do a quick check of execution time:

scala> averageTime { resultCustomNativeFilterParquet.count }
Elapsed time: 116ms

So we finally have a method for writing custom functions that achieves the same level of performance as Spark-native functions. In terms of engineering effort, however, this method is by far the most involved and definitely has an initial learning curve. The fact that doGenCode has to emit Java code (as a string) can also be daunting for those unfamiliar with the language.

A closer look at performance

Thus far, we’ve assessed performance by looking at the average execution time across 10 runs of a given block of code. While this is useful, we lose valuable information by looking at just the average. In this section, we’ll take a closer look at the distribution of execution times for each method.

We’ll start by defining a few utility functions:

scala> import vegas._
import vegas._
scala> import vegas.render.WindowRenderer._
import vegas.render.WindowRenderer._
scala> import vegas.sparkExt._
import vegas.sparkExt._
scala> import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Dataset
scala> def time[R](block: => R) = {
| val t0 = System.nanoTime()
| block
| val t1 = System.nanoTime()
| val timeTakenMs = (t1 - t0) / 1000000
| timeTakenMs
| }
time: [R](block: => R)Long
scala> case class ResultsSchema(method: String, iteration: Int, timeTaken: Long)
defined class ResultsSchema
scala> def genResults(methods: Map[String, org.apache.spark.sql.Dataset[_ >: foo with org.apache.spark.sql.Row <: Serializable]]) = {
| methods.keys.map { method => {
| (1 to 1000).map { iter =>
| val timeTaken = time { methods.get(method).get.count }
| ResultsSchema(method, iter, timeTaken)
| }}}.flatten.toSeq.toDS
| }
genResults: (methods: Map[String,org.apache.spark.sql.Dataset[_ >: foo with org.apache.spark.sql.Row <: Serializable]])org.apache.spark.sql.Dataset[ResultsSchema]
scala> def genSummary(ds: Dataset[ResultsSchema]) = {
| ds
| .groupBy('method)
| .agg(
| min('timeTaken) as "min",
| round(mean('timeTaken), 1) as "mean",
| max('timeTaken) as "max",
| round(stddev('timeTaken), 1) as "std",
| count("*") as "sampleSize")
| .orderBy('method)
| }
genSummary: (ds: org.apache.spark.sql.Dataset[ResultsSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
scala> def genGraph(ds: Dataset[ResultsSchema]) = {
| Vegas("bar chart", width=1000, height=200).
| withDataFrame(ds.withColumn("count", lit(1))).
| filter("datum.timeTaken < 500").
| mark(Bar).
| encodeX("timeTaken", Quantitative, scale=Scale(bandSize=50), axis=Axis(title="Time Taken (ms)")).
| encodeY("count", Quantitative, aggregate=AggOps.Count, axis=Axis(title="Count")).
| encodeColumn("method", title="Method").
| configCell(width = 200, height = 200)
| }
genGraph: (ds: org.apache.spark.sql.Dataset[ResultsSchema])vegas.DSL.ExtendedUnitSpecBuilder

Now we can simulate running 1000 queries using each of the 4 methods we’ve seen thus far for incrementing values by 1 and look at the distribution of execution times for any additional insights on performance. We’ll use the cached Dataset for this experiment.

scala> val methodsCacheCount = Map(
| "Native" -> resultSparkNative,
| "UDF" -> resultUdf,
| "Map" -> resultMap,
| "Custom Native" -> resultCustomNative
| )
methodsCacheCount: scala.collection.immutable.Map[String,org.apache.spark.sql.Dataset[_ >: foo with org.apache.spark.sql.Row <: Serializable]] = Map(Native -> [id: bigint, addOne: bigint], UDF -> [id: bigint, addOne: bigint], Map -> [id: bigint, addOne: bigint], Custom Native -> [id: bigint, addOne: bigint])
scala> val resultsCacheCount = genResults(methodsCacheCount)
resultsCacheCount: org.apache.spark.sql.Dataset[ResultsSchema] = [method: string, iteration: int ... 1 more field]
scala> genSummary(resultsCacheCount).show
+-------------+---+-----+---+----+----------+
| method|min| mean|max| std|sampleSize|
+-------------+---+-----+---+----+----------+
|Custom Native| 30| 33.4| 64| 3.3| 1000|
| Map|153|198.8|632|69.5| 1000|
| Native| 31| 36.8| 59| 4.4| 1000|
| UDF| 31| 34.5| 53| 3.3| 1000|
+-------------+---+-----+---+----+----------+
scala> genGraph(resultsCacheCount).show

It’s interesting to note how spread out the execution times are for the queries using the map function, taking anywhere between 153 and 632 milliseconds. The remaining methods show very consistent times around the 35 millisecond mark.

Now let’s try the same thing except using filter operations against the parquet data source:

scala> val methodsParquetFilter = Map(
| "Native" -> resultSparkNativeFilterParquet,
| "UDF" -> resultUdfFilterParquet,
| "Map" -> resultMapFilterParquet,
| "Custom Native" -> resultCustomNativeFilterParquet
| )
methodsParquetFilter: scala.collection.immutable.Map[String,org.apache.spark.sql.Dataset[_ >: foo with org.apache.spark.sql.Row <: Serializable]] = Map(Native -> [id: bigint], UDF -> [id: bigint], Map -> [id: bigint, addOne: bigint], Custom Native -> [id: bigint])
scala> val resultsParquetFilter = genResults(methodsParquetFilter)
resultsParquetFilter: org.apache.spark.sql.Dataset[ResultsSchema] = [method: string, iteration: int ... 1 more field]
scala> genSummary(resultsParquetFilter).show
+-------------+---+-----+---+----+----------+
| method|min| mean|max| std|sampleSize|
+-------------+---+-----+---+----+----------+
|Custom Native| 84| 99.0|321|12.5| 1000|
| Map|196|240.7|517|73.0| 1000|
| Native| 84|106.2|236|12.5| 1000|
| UDF|159|199.5|715|71.9| 1000|
+-------------+---+-----+---+----+----------+
scala> genGraph(resultsParquetFilter).show

These distributions are more interesting. Surprisingly, we see that our custom native function sometimes does even better than Spark’s native function. This may be because of our simpler implementation. On the other hand, both the UDF- and map-based queries take almost twice as long, as they’re unable to take advantage of Spark’s optimizations. Finally, it’s interesting to note the two humps in the UDF- and map-based distributions, although sadly we can offer no reasonable explanation for this phenomenon at this stage.

Conclusion

Alright, we’ve covered a lot of ground in this article. To bring together everything we’ve learned so far, we decided to rank each method on the basis of engineering ease and performance using a 3-point scale (where 1 is good and 3 is bad). Below are our results:

+----------------+------------------+-------------+---------+
| Method | Engineering Ease | Performance | Overall |
+----------------+------------------+-------------+---------+
| Native | 1 | 1 | 1 |
| UDF | 2 | 3 | 2.5 |
| Map | 2 | 3 | 2.5 |
| Custom Native | 3 | 1 | 2 |
+----------------+------------------+-------------+---------+
* Overall = (Engineering Ease + Performance) / 2

Based on this, we can make several recommendations. Firstly, developers should always try to use Spark-native functions over all else. If that’s not possible, developers are advised to try extending Spark’s native functions library. If there is a need to put something together quickly, UDFs are appropriate, although they are not recommended for jobs running in production pipelines. Map-based functions should generally be avoided.

Now for the caveats. Although switching from UDFs to native functions will generally improve performance, it may not always be the most effective way of improving overall job performance. It depends on where the bottlenecks are in your job. Spark’s Web UI is usually a good place to start investigating these kinds of questions.

Finally, it’s important to recognize that the Apache Spark project is constantly improving. Spark may not be able to optimize around UDFs today, but that’s not to say it won’t be able to in the future. At that point, it would be wise to reconsider the recommendations presented in this article.

All of the code used in this article can be found here.