Authoring custom Spark Native Functions

Nikolai N, published in EurowingsDigital
13 min read, Jun 28, 2024

I was inspired by the article “Why You Should Start Writing Spark Custom Native Functions?”, which discussed an example of creating a custom Spark Native Function for generating a UUID. While that example was great, the function it built did not accept input arguments. In real-world scenarios, we often need functions that take 1, 2, or 3 input arguments. Let’s try to fill this gap!

Below, I present the results of my research on creating custom Spark Native Functions that can accept multiple arguments. The article includes:

  • Brief Introduction
  • “Dissecting” Standard Functions
  • Authoring our own Function
  • Alternative Implementation Approach
  • Performance Comparison
  • Conclusion

⚠ The code provided in the article is written in Scala. I understand that among those working with Spark in Europe, Python is more popular due to its lower entry barrier. However, Spark is written in Scala, which is why the Scala API is the richest, and the usage of Scala empowers users to fully exploit the capabilities of Spark.

I’m very glad that we at Eurowings Digital use Scala in our Data Engineering team for working with Spark. It opens up a lot of possibilities for us, for example developing Spark Native Functions :-)

Brief Introduction

To begin with, it’s helpful to try to understand how Spark SQL works at a conceptual level. For those interested, I would recommend reading the article “Deep Dive into Spark SQL’s Catalyst Optimizer.” And for the lazy ones, here’s a brief summary relevant to our topic :-)

Catalyst uses abstract syntax trees (ASTs) to represent the logical and physical plans of a query. These trees are processed through various transformations and optimizations to enhance query execution. The process involves applying rules to transform the trees, thereby refining the execution plan. This tree-based approach enables Catalyst to perform complex optimization, providing flexibility and scalability in handling intricate query transformations and user-defined rules.
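To make the idea of “rules that transform trees” more tangible, here is a minimal sketch in plain Scala. It is a toy model only: Expr, Lit, Attr, Add and ToyOptimizer are hypothetical names invented for this illustration and are not Spark classes, although Catalyst’s real rules follow the same pattern-matching style.

```scala
// Toy model only: these types are NOT Spark classes.
sealed trait Expr
case class Lit(value: Int) extends Expr
case class Attr(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

object ToyOptimizer {
  // Apply a rule bottom-up: rewrite the children first, then the node itself.
  def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = e match {
      case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
      case leaf      => leaf
    }
    // Nodes the rule does not match are returned unchanged.
    rule.applyOrElse(withNewChildren, identity[Expr])
  }

  // A "constant folding" rule: adding two literals yields a literal.
  val constantFolding: PartialFunction[Expr, Expr] = {
    case Add(Lit(a), Lit(b)) => Lit(a + b)
  }
}

object ToyOptimizerDemo {
  def main(args: Array[String]): Unit = {
    val tree = Add(Attr("x"), Add(Lit(1), Lit(2)))
    val optimized = ToyOptimizer.transformUp(tree)(ToyOptimizer.constantFolding)
    println(optimized) // Add(Attr(x),Lit(3))
  }
}
```

The rule never mutates the tree; it returns a new one, which is the same immutability idea we will meet again in the real expressions.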

The optimization phases include:

  • Analysis: This stage involves validating the logical query plan and determining attributes.
  • Logical Optimization: It applies standard rules to simplify and restructure the query plan to enhance performance.
  • Physical Planning: It transforms the optimized logical plan into one or more physical plans, selecting the most efficient execution strategy.
  • Code Generation: By utilizing a code generation platform, Java bytecode is generated at runtime, significantly speeding up query execution.
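If you want to look at these phases for a concrete query, a sketch along the following lines should work with Spark 3.x on the classpath. The object name CatalystPhasesDemo and the helper phases are my own hypothetical names; queryExecution and explain("codegen") are part of the public Dataset API.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, unbase64}

object CatalystPhasesDemo {
  // Returns (phase name, plan rendered as text) pairs for a DataFrame.
  def phases(df: DataFrame): Seq[(String, String)] = {
    val qe = df.queryExecution
    Seq(
      "parsed logical plan"    -> qe.logical.toString,
      "analyzed logical plan"  -> qe.analyzed.toString,
      "optimized logical plan" -> qe.optimizedPlan.toString,
      "physical plan"          -> qe.executedPlan.toString
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("catalyst-phases")
      .getOrCreate()

    // A tiny query that uses a built-in native function.
    val df = spark.range(3).select(unbase64(col("id").cast("string")).as("decoded"))

    phases(df).foreach { case (name, plan) => println(s"== $name ==\n$plan") }

    // Prints the Java source produced by the code generation phase.
    df.explain("codegen")

    spark.stop()
  }
}
```

Nothing is executed on the data here; the plans and the generated code are produced without running the job.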

“Dissecting” Standard Functions

To understand how to write your own Spark Native Function, it’s best to study the code of existing functions. Let’s start with single-argument functions. To do this, open the org.apache.spark.sql package in IntelliJ or any other IDE, locate a well-known single-argument function such as unbase64 in the functions object, and take a close look. Here is what we see (there’s actually much more there, but I’ve included only what is relevant to our topic):

object functions {

  private def withExpr(expr: Expression): Column = Column(expr)
  // ...
  def unbase64(e: Column): Column = withExpr { UnBase64(e.expr) }
  // ...
}

With the help of this code the function becomes available to us for import into our project using import org.apache.spark.sql.functions.unbase64. Let’s delve into the UnBase64 function. It is located in org.apache.spark.sql.catalyst.expressions.stringExpressions and is a Case Class (by the way, why do you think it’s a Case Class and not a regular class? The answer is simple — the query optimizer can easily use them in pattern matching unlike a regular class):

case class UnBase64(child: Expression)
    extends UnaryExpression
    with ImplicitCastInputTypes
    with NullIntolerant {

  override def dataType: DataType = BinaryType
  override def inputTypes: Seq[DataType] = Seq(StringType)

  protected override def nullSafeEval(string: Any): Any =
    CommonsBase64.decodeBase64(string.asInstanceOf[UTF8String].toString)

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    nullSafeCodeGen(ctx, ev, (child) => {
      s"""
        ${ev.value} = ${classOf[CommonsBase64].getName}.decodeBase64($child.toString());
      """})
  }

  override protected def withNewChildInternal(newChild: Expression): UnBase64 =
    copy(child = newChild)
}

What is important for us to pay attention to here?

  • This class inherits from UnaryExpression, which is an abstract class representing a unary expression, meaning an expression that contains only one operand, fitting our case with a single-argument function.
  • Inheriting from ImplicitCastInputTypes is used for implicit data type conversion during data operations. This mechanism allows for automatic data type conversion if necessary for the operation to be executed.
  • Inheriting from NullIntolerant indicates that any input Null will result in a Null output.
  • The functions dataType and inputTypes are straightforward.
  • The nullSafeEval function implements the core logic and is used in interpreted mode. In the context of Spark and Catalyst, interpreted mode means evaluating expressions without compiling them into bytecode: instead of generating and running Java code, Spark evaluates the expression tree directly by calling eval on its nodes.
    - Pay attention to the “speaking” name — the function should be null-safe
    - For the core logic, the decodeBase64 from the CommonsBase64 class in Java is simply reused (why reinvent the wheel?)
    - Also, note that Spark uses the UTF8String class to represent strings under the hood, which is also considered in the code
  • The doGenCode function is responsible for producing the Java source code that Spark compiles into JVM bytecode at runtime (with the Janino compiler). In the code above, this source is built as an interpolated Scala string (s"""..."""), that is, a template containing the Java code we want to execute. The original Catalyst design described this step in terms of Scala quasiquotes (in short, a Scala feature that allows developers to embed code fragments inside other code; especially useful in metaprogramming, where code generates or manipulates other code). Here, the decodeBase64 method from the Java CommonsBase64 class is used, just like in nullSafeEval. (By the way, according to the warning on the official website, quasiquotes have been significantly reworked and integrated into the new metaprogramming system. Perhaps that’s why we are still waiting for the official version of Spark for Scala 3 as well?)
  • The function withNewChildInternal is an integral part of working with AST and the Catalyst optimizer in Apache Spark. When implementing transformations or optimizing query plans, its use helps ensure that new instances of nodes in the tree are created with updated child elements, maintaining immutability. Therefore, in most existing functions, this will involve creating a copy to ensure the immutability of the original instance.
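The interplay between eval, nullSafeEval and withNewChildInternal can be sketched with a toy model in plain Scala. Everything below (ToyExpression, ToyUnaryExpression, ToyColumnRef, ToyUpper) is hypothetical and heavily simplified; it only mirrors the idea that the base class deals with NULL once, so that subclasses implement the non-null case, and that a “changed” node is always a fresh copy.

```scala
// Toy model only: a simplified stand-in for Spark's expression classes.
trait ToyExpression {
  def eval(input: Map[String, Any]): Any
}

case class ToyColumnRef(name: String) extends ToyExpression {
  // A missing key models a SQL NULL.
  def eval(input: Map[String, Any]): Any = input.getOrElse(name, null)
}

abstract class ToyUnaryExpression extends ToyExpression {
  def child: ToyExpression

  // Subclasses implement only the non-null case.
  protected def nullSafeEval(value: Any): Any

  // The base class handles NULL before delegating.
  final def eval(input: Map[String, Any]): Any = {
    val value = child.eval(input)
    if (value == null) null else nullSafeEval(value)
  }

  // Returns a new node; the original instance stays untouched.
  def withNewChild(newChild: ToyExpression): ToyUnaryExpression
}

case class ToyUpper(child: ToyExpression) extends ToyUnaryExpression {
  protected def nullSafeEval(value: Any): Any =
    value.asInstanceOf[String].toUpperCase

  def withNewChild(newChild: ToyExpression): ToyUpper = copy(child = newChild)
}

object ToyUnaryDemo {
  def main(args: Array[String]): Unit = {
    val expr = ToyUpper(ToyColumnRef("name"))
    println(expr.eval(Map("name" -> "spark"))) // SPARK
    println(expr.eval(Map.empty))              // null

    val rewritten = expr.withNewChild(ToyColumnRef("other"))
    println(expr.child)      // ToyColumnRef(name)
    println(rewritten.child) // ToyColumnRef(other)
  }
}
```

This is why nullSafeEval in UnBase64 can cast its argument straight away: by the time it is called, the NULL case has already been handled.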

Curious individuals can also delve into the implementation of the CommonsBase64 class, but since it is a standard Java class, there is not much point in doing so.

Authoring our own Function

Just like in the original article, to write our own Spark Native Function, we need to create additional files in our project and place them in src/main/scala/org/apache/spark/sql/ (but you can place them wherever is convenient for you):

  • The first file will contain the Catalyst Expression and we will place it in src/main/scala/org/apache/spark/sql/catalyst/expressions/
  • The second file will make this function accessible, and we will place it in src/main/scala/org/apache/spark/sql/

For the implementation, you can choose any function that interests you. I chose to write my own weekOfYear function: our analysts needed the week number calculated in a specific way, so I decided to implement a similar function for educational purposes and for this article.

For our custom single-argument function (let’s call it myWeekOfYear), we create a file src/main/scala/org/apache/spark/sql/customFunctions.scala in our project (this is the “second file” as numbered above). This is where we will import it from for further use in our Spark application.

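package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Expression, myWeekOfYearExpression}

object customFunctions {

  // Wraps a Catalyst Expression into a Column, as Spark's own functions object does
  private def withExpr(expr: Expression): Column = Column(expr)

  // Public entry point; builds the Catalyst Expression defined in the "first file"
  def myWeekOfYear(col: Column): Column = withExpr {
    myWeekOfYearExpression(col.expr)
  }
}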

Now, the most important part is the “first file” with the Catalyst Expression. I placed it in the separate file src/main/scala/org/apache/spark/sql/catalyst/expressions/myWeekOfYearExpression.scala. However, if you have many functions, you can group them together logically, similar to how it’s done for the aforementioned stringExpressions.

package org.apache.spark.sql.catalyst.expressions

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.WeekFields
import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

case class myWeekOfYearExpression(child: Expression)
    extends UnaryExpression
    with ImplicitCastInputTypes
    with NullIntolerant {

  override def inputTypes: Seq[DataType] = Seq(StringType)
  override def dataType: DataType = StringType

  // Interpreted path: the core logic in Scala
  override def nullSafeEval(dateString: Any): Any = {
    if (dateString == null) null
    else {
      val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ENGLISH)
      val date =
        LocalDate.parse(dateString.asInstanceOf[UTF8String].toString, formatter)
      val weekNumber = date.get(WeekFields.ISO.weekOfWeekBasedYear)
      val year = date.get(WeekFields.ISO.weekBasedYear)
      // Use formatted string with leading zero for weeks 1-9 (01-09)
      UTF8String.fromString(f"$year-W$weekNumber%02d")
    }
  }

  // Codegen path: the same logic, emitted as Java source
  override protected def doGenCode(
      ctx: CodegenContext,
      ev: ExprCode
  ): ExprCode = {
    // Fully qualified class names to reference from the generated Java code
    val dateFormatter = classOf[DateTimeFormatter].getName
    val localDate = classOf[LocalDate].getName
    val weekFields = classOf[WeekFields].getName
    val utf8String = classOf[UTF8String].getName

    // The formatter is created once as a field of the generated class
    val formatter = ctx.addMutableState(
      dateFormatter,
      "formatter",
      v =>
        s"""$v = $dateFormatter.ofPattern("yyyy-MM-dd", java.util.Locale.ENGLISH);"""
    )

    nullSafeCodeGen(
      ctx,
      ev,
      (dateString) => {
        s"""
           |if ($dateString == null) {
           |  ${ev.isNull} = true;
           |} else {
           |  $localDate date = $localDate.parse($dateString.toString(), $formatter);
           |  int weekNumber = date.get($weekFields.ISO.weekOfWeekBasedYear());
           |  int year = date.get($weekFields.ISO.weekBasedYear());
           |  ${ev.value} = $utf8String.fromString(String.format("%d-W%02d", year, weekNumber));
           |}
         """.stripMargin
      }
    )
  }

  override def prettyName: String = "MyWeekOfYear"

  override protected def withNewChildInternal(newChild: Expression): myWeekOfYearExpression =
    copy(child = newChild)

}

So, what have we done (in comparison/analogy to UnBase64)?

  • inputTypes and dataType — no comments needed, as they are straightforward.
  • nullSafeEval: this is where the main logic of our function is implemented in Scala. In principle, nothing prevents us from implementing the function itself anywhere in our project and simply wrapping its call here. But more on that below. Also, don’t forget about UTF8String when working with strings.
  • doGenCode: essentially, here we rewrite the logic from nullSafeEval in Java.
  • prettyName — giving a nice name :-) Usually, this value should match the function name in SQL.
  • withNewChildInternal — simply creating a copy, as in the function discussed above from the standard Spark functionality.
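Before wiring such logic into an expression, it can help to check the core calculation on its own, especially around year boundaries, where the ISO week-based year differs from the calendar year. The sketch below uses only java.time; the object name IsoWeekCheck and the method isoWeekLabel are hypothetical names chosen for this illustration.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.WeekFields
import java.util.Locale

object IsoWeekCheck {
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ENGLISH)

  // Same calculation as in nullSafeEval, but on plain Strings.
  def isoWeekLabel(dateString: String): String = {
    val date = LocalDate.parse(dateString, formatter)
    val week = date.get(WeekFields.ISO.weekOfWeekBasedYear)
    val year = date.get(WeekFields.ISO.weekBasedYear)
    f"$year-W$week%02d"
  }

  def main(args: Array[String]): Unit = {
    println(isoWeekLabel("2024-06-28")) // 2024-W26
    // 1 January 2021 is a Friday and still belongs to the last ISO week of 2020.
    println(isoWeekLabel("2021-01-01")) // 2020-W53
    // 30 December 2024 is a Monday and already belongs to the first ISO week of 2025.
    println(isoWeekLabel("2024-12-30")) // 2025-W01
  }
}
```

The last two cases show why the function takes the year from weekBasedYear rather than from the date itself: combining the calendar year with the ISO week number would produce labels such as “2021-W53” for 1 January 2021.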

By the way, a side note. Instead of using:

case class myWeekOfYearExpression(child: Expression)
extends UnaryExpression
with ImplicitCastInputTypes
with NullIntolerant {
...

we could write a more understandable (in my subjective view):

case class myWeekOfYearExpression(dateString: Expression)
extends UnaryExpression
with ImplicitCastInputTypes
with NullIntolerant {

override def child: Expression = dateString
...

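Here we replace the less clear “child” with the more semantically relevant “dateString” in the input parameters of the case class. Note that withNewChildInternal must then call copy(dateString = newChild), because the constructor parameter is no longer named child.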

Alternative Implementation Approach

We have implemented the required function above. However, there are a couple of points to consider:

  • In nullSafeEval and doGenCode, we essentially duplicated the code of our function in both Scala and Java, which is not ideal:
    - We would like to avoid code duplication.
    - We would like to avoid using two different programming languages, or use them minimally if necessary.
  • We would like to reuse the existing functions in our project by simply “wrapping” them in Catalyst Expressions — essentially, reuse them in nullSafeEval and doGenCode.

Alright, that’s possible too! Let’s assume our function is already implemented in the MyFunctions object, for example like this:

object MyFunctions {

  def myWeekOfDate(dateString: String): String = {
    // Null-safe: treat both null and empty input as "no value"
    if (dateString == null || dateString.isEmpty) null
    else {
      val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ENGLISH)
      val date = LocalDate.parse(dateString, formatter)
      val weekNumber = date.get(WeekFields.ISO.weekOfWeekBasedYear)
      val year = date.get(WeekFields.ISO.weekBasedYear)
      // Use formatted string with leading zero for weeks 1-9 (01-09)
      f"$year-W$weekNumber%02d"
    }
  }
}

Then we can significantly simplify our case class myWeekOfYearExpression by reusing this function. In this case, it would look something like this:

case class myWeekOfYearExpression(child: Expression)
    extends UnaryExpression
    with ImplicitCastInputTypes
    with NullIntolerant {

  //...

  override def nullSafeEval(dateString: Any): Any = {
    val date = dateString.asInstanceOf[UTF8String].toString
    UTF8String.fromString(MyFunctions.myWeekOfDate(date))
  }

  override protected def doGenCode(
      ctx: CodegenContext,
      ev: ExprCode
  ): ExprCode = {
    nullSafeCodeGen(ctx, ev, dateString => {
      val className = MyFunctions.getClass.getName.stripSuffix("$")
      s"""
        ${ev.value} = UTF8String.fromString($className.myWeekOfDate($dateString.toString()));
      """
    })
  }

  //...

}

Compared to the first version, here we only change nullSafeEval and doGenCode—essentially, we just "adapt" the call to the existing function MyFunctions.myWeekOfDate to use UTF8String instead of String.
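The stripSuffix("$") call in doGenCode may look odd, so here is a small plain-Scala sketch of what it does. WeekHelpers is a hypothetical object made up for this illustration. My assumption is that the object is declared at the top level of a file: in that case the Scala compiler emits a class WeekHelpers$ holding the singleton plus a class WeekHelpers with static forwarder methods, and the latter is what the generated Java code calls.

```scala
// Hypothetical helper object; assumed to be declared at the top level of a file.
object WeekHelpers {
  def label(year: Int, week: Int): String = f"$year-W$week%02d"
}

object ObjectNameDemo {
  def main(args: Array[String]): Unit = {
    // Runtime class of the singleton instance; its name ends with "$".
    val runtimeName = WeekHelpers.getClass.getName
    // Name without the trailing "$", usable as a static call target from Java.
    val callableName = runtimeName.stripSuffix("$")

    println(runtimeName)
    println(callableName)

    // Two ways the generated Java source could invoke the method:
    println(s"$callableName.label(2024, 7)")         // via the static forwarder
    println(s"$runtimeName.MODULE$$.label(2024, 7)") // via the singleton field
  }
}
```

If the object is nested inside another object or class, I would not rely on the forwarder being there; referencing the singleton through MODULE$ is the more explicit option in that case.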

At first glance, this approach is simpler and more elegant, but let’s not jump to conclusions.

Performance Comparison

Alright, let’s move on to the most interesting part — performance comparison! To do this, let’s “wrap” the exact same code in a Spark UDF, so we have a “baseline”. For example, like this (don’t forget about null-safety):

val myWeekOfYearUDF = udf((dateString: String) => {
  if (dateString == null) null
  else MyFunctions.myWeekOfDate(dateString)
})

Now we can use this UDF in our Spark jobs to compare the performance with the Catalyst expression implementation.

Before running tests, let’s examine the query plans for applying these functions to a column (into which we’ll insert randomly generated dates as strings):

// Variant 1

== Physical Plan ==
*(1) Project [random_date#6, MyWeekOfYear1(random_date#6) AS weekOfYear#8]
+- *(1) Project [...]
+- *(1) Range (...)

// Variant 2

== Physical Plan ==
*(1) Project [random_date#17, MyWeekOfYear2(random_date#17) AS weekOfYear#19]
+- *(1) Project [...]
+- *(1) Range (...)

// UDF

== Physical Plan ==
*(1) Project [random_date#28, UDF(random_date#28) AS weekOfYear#30]
+- *(1) Project [...]
+- *(1) Range (...)

We see that in the first two cases, Catalyst “sees” our functions. In the third case, the UDF is a “black box” to Catalyst, which is reflected in the query plan.
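You can reproduce the “black box” effect without any custom expression. The sketch below compares the built-in upper function with a stand-in UDF doing the same thing; upperUdf and the object name BlackBoxUdfDemo are hypothetical names, and I assume Spark 3.x running in local mode.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf, upper}

object BlackBoxUdfDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("udf-plan")
      .getOrCreate()

    // Hypothetical stand-in UDF; any Scala lambda shows up the same way in the plan.
    val upperUdf = udf((s: String) => if (s == null) null else s.toUpperCase)

    val base = spark.range(5).select(col("id").cast("string").as("s"))

    // Native function: the plan names the expression itself.
    base.select(upper(col("s")).as("native")).explain()

    // UDF: the plan only shows an opaque UDF(...) node.
    base.select(upperUdf(col("s")).as("viaUdf")).explain()

    spark.stop()
  }
}
```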

I performed performance comparisons on my modest MacBook Pro with an M2 chip, using the default settings that I always use for our projects, with Spark versions 3.2.0 and 3.5.0. Testing both versions turned out to be unnecessary, because the Spark version did not significantly affect the results. Therefore, I present a single set of graphs below rather than one per Spark version.

  • For each implementation variant, we will run experiments with 1 million, 10 million, and 100 million rows.
  • Each experiment will be repeated 50 times.
  • For each experiment, we will generate a DataFrame with the required number of rows containing random dates, apply our function, and save the data in Parquet format.
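A measurement loop of this kind can be sketched in plain Scala. This is not the exact harness I used, just a minimal illustration: BenchmarkHarness, measureMs, boxStats and BoxStats are hypothetical names, and the quartiles use simple linear interpolation, which may differ slightly from what a plotting library computes.

```scala
object BenchmarkHarness {
  // The five numbers a box and whisker plot is drawn from.
  final case class BoxStats(min: Double, q1: Double, median: Double, q3: Double, max: Double)

  // Runs `body` `repetitions` times and returns wall-clock durations in milliseconds.
  def measureMs(repetitions: Int)(body: => Unit): Seq[Double] =
    (1 to repetitions).map { _ =>
      val start = System.nanoTime()
      body
      (System.nanoTime() - start) / 1e6
    }

  // Percentile by linear interpolation on already sorted data (p in [0, 1]).
  private def percentile(sorted: IndexedSeq[Double], p: Double): Double = {
    val pos = p * (sorted.size - 1)
    val lower = math.floor(pos).toInt
    val upper = math.ceil(pos).toInt
    sorted(lower) + (sorted(upper) - sorted(lower)) * (pos - lower)
  }

  def boxStats(samples: Seq[Double]): BoxStats = {
    require(samples.nonEmpty, "need at least one sample")
    val s = samples.sorted.toIndexedSeq
    BoxStats(s.head, percentile(s, 0.25), percentile(s, 0.5), percentile(s, 0.75), s.last)
  }

  def main(args: Array[String]): Unit = {
    // Placeholder workload; in the real experiment the body would build the
    // DataFrame, apply the function under test and write the result to Parquet.
    val timings = measureMs(repetitions = 50) {
      (1 to 100000).map(_ * 2).sum
    }
    println(boxStats(timings))
  }
}
```

Writing the result out inside the measured body matters: Spark is lazy, so without an action such as a write nothing would actually be computed.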

I didn’t run a full experiment for 1 billion rows because the general trend was already clear, and I didn’t want to wait a long time :-) Therefore, for 1 billion rows, I only ran experiments 5 times and provide box and whisker plots more for reference.

You can see the results in the graphs below (apologies for the aesthetics, I’m not an artist :-)). The Y-axis represents the execution time of the test scenario in milliseconds, and the X-axis shows different implementation variants: Variant 1 with inline Java code, Variant 2 with simply wrapping an existing Scala function, and the implementation variant using UDF.

Figure: “Box and whisker plot” for 1 million rows
Figure: “Box and whisker plot” for 10 million rows
Figure: “Box and whisker plot” for 100 million rows
Figure: “Box and whisker plot” for 1 billion rows, for reference

And let’s see how much slower Variant 2 and the UDF are compared to Variant 1, depending on the data volume.

Figure: Variant 2 and the UDF compared to Variant 1, depending on the data volume

What we observe is:

  • Variant 1 (with inline Java code) is the most performant, as expected.
  • Implementation via UDF is predictably the slowest.
  • Variant 2 (wrapping a Scala function) and the UDF variant are 8–20% slower than Variant 1, depending on the data volume.
  • The performance gain from using Variant 1 tends to increase with larger data volumes.
  • It appears that our function myWeekOfDate is quite straightforward (who would doubt it :-)), so regardless of the implementation variant, we do not achieve as significant performance gains as with UUIDs in the original article (where UDFs took 190% longer for 100 million rows and 250% longer for 1 billion rows).
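To be explicit about what “x% longer” means in these comparisons, here is the arithmetic as a tiny helper. The function name percentLonger is hypothetical, and the timings in the example are made-up round numbers chosen only to illustrate the formula; they are not measurements from my experiments.

```scala
object Slowdown {
  // How much longer (in percent) the candidate takes compared to the baseline.
  def percentLonger(baselineMs: Double, candidateMs: Double): Double =
    (candidateMs - baselineMs) * 100.0 / baselineMs

  def main(args: Array[String]): Unit = {
    // Illustrative numbers only, not measured values.
    println(percentLonger(baselineMs = 1000.0, candidateMs = 1200.0)) // 20.0
    println(percentLonger(baselineMs = 1000.0, candidateMs = 2900.0)) // 190.0
  }
}
```

So “190% longer” means the run takes almost three times as long as the baseline, not just under twice as long.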

The fact that UDFs perform worse than Spark Native Functions was expected. However, the smaller difference between the two Catalyst Expression implementations and simultaneously between Variant 2 and UDF may appear surprising at first glance. Personally, I wasn’t surprised by this; rather, I was disappointed because this fact makes wrapping existing functions in Catalyst Expressions seem impractical — therefore, it’s challenging to significantly improve the performance of already used functions on the project with minimal effort.

So why do we observe a noticeable performance difference between the two implementation variants of Catalyst Expression: inline code vs. Scala objects? I see several reasons:

  • Code generation complexity: Embedding Scala objects and classes in doGenCode can increase the complexity of the generated Java code. Inline Java code directly integrates into Spark’s Catalyst optimizer and code generation. When writing code in Java, Catalyst generates Java bytecode from our inline code. However, when using Scala objects, the bytecode for Scala objects is generated by the Scala compiler, not directly by Catalyst itself (meaning Catalyst reuses the bytecode obtained from the Scala compiler), which can interfere with Catalyst’s ability to perform optimizations.
  • Compiler optimization: It’s possible that the optimizer may not fully optimize the invocation of an external function, resulting in performance loss. For example, when manipulating columns, we need to find them, extract values from various memory locations, and then apply transformations. Catalyst, when generating Java code for expressions, optimizes this process by combining it with code to access columns from the InternalRow representation and compiling everything into a unified JVM bytecode. This is one reason why such code tends to be more performant.
  • Additional function dispatching: This involves additional overhead during execution.
  • Garbage collection overhead: Increased memory usage may lead to more frequent garbage collection, impacting overall performance.

Conclusion

So now we know how to write our own Spark Native Functions. When I started writing this article, I thought we would cover examples of functions with two and three arguments, but it turned out the article was already quite extensive. For those who are truly interested, I hope they can easily develop such functions after reading this article, simply by inheriting their case class from BinaryExpression or TernaryExpression.
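As a starting point, a two-argument function could look roughly like the sketch below. It is a hypothetical example, not something from my benchmarks: MyYearWeekLabel takes a year and a week number as integers and builds the same “YYYY-Www” label. I assume Spark 3.2 to 3.5 here, since withNewChildrenInternal and the NullIntolerant trait are used in this form in those versions.

```scala
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, ImplicitCastInputTypes, Literal, NullIntolerant}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical two-argument expression: (year, week) => "YYYY-Www"
case class MyYearWeekLabel(left: Expression, right: Expression)
    extends BinaryExpression
    with ImplicitCastInputTypes
    with NullIntolerant {

  override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType, IntegerType)
  override def dataType: DataType = StringType

  // Interpreted path: called only when both inputs are non-null.
  override protected def nullSafeEval(year: Any, week: Any): Any = {
    val y = year.asInstanceOf[Int]
    val w = week.asInstanceOf[Int]
    UTF8String.fromString(f"$y-W$w%02d")
  }

  // Codegen path: `year` and `week` are names of Java variables of type int.
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, (year, week) =>
      s"""UTF8String.fromString(String.format("%d-W%02d", $year, $week))""")

  override def prettyName: String = "my_year_week_label"

  override protected def withNewChildrenInternal(
      newLeft: Expression,
      newRight: Expression): MyYearWeekLabel =
    copy(left = newLeft, right = newRight)
}

object MyYearWeekLabelDemo {
  def main(args: Array[String]): Unit = {
    // Evaluates the expression in interpreted mode, without a SparkSession.
    println(MyYearWeekLabel(Literal(2024), Literal(7)).eval()) // 2024-W07
  }
}
```

Compared to the single-argument case, the only structural differences are the two children (left and right), the two-argument nullSafeEval, and withNewChildrenInternal taking both new children. A three-argument function follows the same idea with TernaryExpression.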

Perhaps one of the most important questions is whether it’s really necessary, beyond academic purposes. This is a very interesting question in my opinion.

Undoubtedly, using Spark Native Functions to perform operations at the Spark engine level can significantly speed up data processing by optimizing queries and reducing overhead costs. Furthermore, writing such functions provides greater flexibility and the ability to create specialized operations tailored to specific needs. On the other hand, it requires more expertise from the team and may complicate long-term code maintenance, as finding qualified personnel, in my experience, becomes increasingly challenging.

My personal viewpoint through an “applied” lens would be to adequately assess the potential performance gain considering your data volumes, the expertise of your team (including medium and long-term perspectives, which are much more complex), and how critical this performance boost is for solving your business problem. There may be simpler ways to accelerate your Spark jobs that are worth considering.
