Advanced custom operators in Spark

Vladimir Prus
8 min read · Oct 11, 2021


In this post, we’ll look at advanced techniques to extend Apache Spark. We’ll create a new DataFrame transformation for a very efficient sorting, and we’ll create a UDF with the performance of a built-in function. The goal is to focus on the extension mechanisms and show that they are quite practical, so the actual algorithms will be intentionally simple.

Cutting Spark RDD to fit in the new world. Photo by Russ Ward on Unsplash

DataFrame transformations

Spark has a lot of high-level DataFrame transformations. But suppose you need a custom operation. For example, a specific sort algorithm will work best on your data, or you want to repartition data in a very specific way. The task would be easy if you had iterators in memory, but in the DataFrame world, no operation fits. The high-level abstractions get in the way.

One common recommendation is to escape to an older mechanism in Spark, called RDD. However, I’ll show below that this approach has performance problems, is not flexible enough, and there’s a better way.
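For reference, given some DataFrame df, the conventional workaround looks roughly like this. It is only a sketch: the column name type, the partition count, and the choice of repartitionAndSortWithinPartitions are illustrative, not something the rest of this post depends on.

// Typical "escape to RDD": drop to RDD[Row], repartition and sort by hand,
// then rebuild a DataFrame from the result.
import org.apache.spark.HashPartitioner

val rdd = df.rdd
  .keyBy(row => row.getAs[String]("type"))
  .repartitionAndSortWithinPartitions(new HashPartitioner(16))
  .values

val back = spark.createDataFrame(rdd, df.schema)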

The perils of RDD

First, let’s see why using RDD for customization has performance issues.

The DataFrame is just an alias for Dataset[Row] and calling its rdd method returns RDD[Row]. Row is an abstract class, and in current versions of Spark the concrete type is GenericRowWithSchema. It is nothing more than an array of Java objects, one per column. The objects are stored on the Java heap.

However, this is not the representation that Spark usually uses. Instead, there is a highly optimized class called InternalRow that stores column values inside large memory blocks (so-called Tungsten memory), using low-level memory operations. This dramatically reduces GC and memory-management overhead. Whereas Row creates instances of small classes such as java.lang.Integer and wastes more memory than the data itself occupies, InternalRow has almost no overhead. In addition, taking over memory management lets Spark know its memory usage exactly and spill to disk if necessary, instead of getting killed.

Each Dataset instance has an associated Encoder that can convert between Row and InternalRow. Whenever you use Row-based operations, including RDD operations, Spark first converts from InternalRow to Row, then calls your Row functions, and then converts back. While it’s a linear-time operation, we give up all the benefits of Tungsten, and so use considerably more memory and put pressure on the GC.
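To make the conversion concrete, here is roughly what that round trip looks like when spelled out by hand. This is a sketch against the Spark 3.1/3.2-era APIs used elsewhere in this post; in normal use Spark inserts these steps for you.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// An encoder resolved against a concrete schema converts in both directions.
val encoder = RowEncoder(df.schema).resolveAndBind()
val toInternal: Row => InternalRow = encoder.createSerializer()
val toExternal: InternalRow => Row = encoder.createDeserializer()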

But there are problems beyond performance, and they stem from the fact that the 1:1 mapping between DataFrame and RDD suggested above is also an oversimplification.

When a DataFrame is created, it only has a logical plan — a tree of operations needed to produce the data. But to actually produce it, the logical plan is converted to a physical plan. The process uses a set of strategies — passes that go over the tree, converting logical nodes to physical nodes. Only then is the SparkPlan.execute method called; it creates an RDD and does the actual work.

SparkPlan has attributes, like outputPartitioning and outputOrdering, which describe how the output data will be partitioned and ordered. More specific kinds of SparkPlan can also impose requirements on inputs. For example, the join operation requires that inputs are hash partitioned.

What happens if we ignore the performance issues with RDD and try to use it to customize Spark? We can use a custom RDD partitioner or sort the data however we wish, but none of that will be known to the SparkPlan and the DataFrame layer in general. It will assume the results of our RDD are randomly partitioned and unsorted. Therefore, RDD might work if we immediately write the data, but it is not even an option if further processing in the DataFrame layer is desired.
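You can see this for yourself with a quick experiment (a sketch: we sort within partitions, round-trip through RDD, and ask the planner what it knows):

val sorted = df.sortWithinPartitions("type")
// The physical plan knows about the ordering we asked for.
println(sorted.queryExecution.executedPlan.outputOrdering)        // non-empty

// After a round trip through RDD, that knowledge is gone.
val roundTripped = spark.createDataFrame(sorted.rdd, sorted.schema)
println(roundTripped.queryExecution.executedPlan.outputOrdering)  // empty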

The planning machinery described above, however, gives us the pieces we need to do better. Below, we’ll create the AlreadySorted operator. It does absolutely nothing, except for telling Spark that the data is already sorted, which is sufficient for Spark to skip an additional sort pass. For a motivating example, consider code that merely moves data around:

spark
  .read.parquet("s3://your-landing-bucket/day=2021-08-28")
  .write
  .partitionBy("type")
  .parquet("s3://your-mart-bucket/day=2021-08-28")

As I described in the post on Spark Partitioning, this will sort the data by the type column. But what if the data is already sorted? Let’s try to communicate it to Spark.

The structure of a custom operator

Creating a custom DataFrame operator requires at least:

  • A logical operator, which will be created by user code
  • A physical operator, which will perform the actual transformation
  • A strategy class that converts the logical operator into the physical one
  • Registration of the strategy class with the Spark SQL session

Optionally, we can add an implicit class that can make our operator usage look exactly the same as built-in operators.

The operators

Let’s first define the classes for the logical and physical operators. The logical one is very easy

case class AlreadySorted(sortKeys: Seq[Attribute],
                         child: LogicalPlan)
  extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

We’re extending the UnaryNode class, storing the input parameters and then informing Spark that output attributes are the same as for input. The only unclear aspect there is the “attribute” thing. While DataFrame operates in terms of Column instances, the internals of Spark use expression trees of the Catalyst optimization engine, and Attribute is Catalyst’s terminology.
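An analyzed DataFrame already carries its Attributes, so one simple way to get hold of them is to take them from the analyzed plan (a sketch, assuming a column named type):

import org.apache.spark.sql.catalyst.expressions.Attribute

// Every analyzed plan exposes its output columns as a Seq[Attribute].
val attrs: Seq[Attribute] = df.queryExecution.analyzed.output
val typeAttr: Attribute = attrs.find(_.name == "type").get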

The physical operator is conceptually easy as well:

case class AlreadySortedExec(sortKeys: Seq[Attribute],
                             child: SparkPlan)
  extends UnaryExecNode {

  // Output has the same attributes as input
  override def output: Seq[Attribute] = child.output

  // Output has the same partitioning
  override def outputPartitioning = child.outputPartitioning

  // Pass child's data through
  override protected def doExecute() = child.execute()

  // Inform Spark that the output is sorted
  override def outputOrdering =
    sortKeys.map(SortOrder(_, Ascending))
}

Like with the logical operator, we specify that output attributes are the same as input attributes, but because this is an executable plan, we need to also specify how to compute data and what partitioning it has — and we pass the input through. And finally, we specify output ordering, where we declare that the data is already sorted.

The execution strategy

When Spark builds a physical execution plan, it runs a number of strategies over the logical plan, until no logical operator remains. To hook into this mechanism, we first need to define our own strategy that unconditionally converts between our two operators:

object AlreadySortedStrategy extends Strategy {
  def apply(plan: LogicalPlan) = plan match {
    case AlreadySorted(sortKeys, child) =>
      AlreadySortedExec(sortKeys, planLater(child)) :: Nil
    case _ => Nil
  }
}

We then tell Spark to invoke our strategy class:

spark.experimental.extraStrategies = Seq(AlreadySortedStrategy)
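The experimental hook above is the quickest way to register a strategy from user code. If you prefer to register it through configuration, the extensions API does the same job (a sketch; the class and package names here are made up for illustration):

import org.apache.spark.sql.SparkSessionExtensions

// Injects our strategy into every session built with this extensions class.
class AlreadySortedExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectPlannerStrategy(_ => AlreadySortedStrategy)
  }
}

// Then start Spark with:
//   spark.sql.extensions=com.example.AlreadySortedExtensions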

At this point, we can rewrite our motivating example

spark
  .read.parquet("s3://your-landing-bucket/day=2021-08-28")
  .transform(df => {
    val attr = $"type".expr.asInstanceOf[Attribute]
    val l = AlreadySorted(Seq(attr), df.queryExecution.analyzed)
    val e = RowEncoder(df.schema)
    new DataFrame(df.sparkSession, l, e)
  })
  .write
  .partitionBy("type")
  .parquet("s3://your-mart-bucket/day=2021-08-28")

Inside the transform call, we obtain the logical plan of the original DataFrame and wrap it in our AlreadySorted operator, and then create a new DataFrame using the new logical plan. The code is not too tricky when written, but hardly anybody, this author included, remembers all this by heart. Let’s write the final piece: an implicit function that will encapsulate the logic above:

implicit class AlreadySortedWrapper(df: DataFrame) {
  def alreadySorted(columnNames: Seq[String]): DataFrame = {
    val columns = columnNames.map(
      col(_).expr.asInstanceOf[Attribute])
    val l = AlreadySorted(columns, df.queryExecution.analyzed)
    val e = RowEncoder(df.schema)
    new DataFrame(df.sparkSession, l, e)
  }
}

As long as this definition is in scope, you can simply call the alreadySorted method on a DataFrame as if it were a built-in transformation.
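With the wrapper in scope, the motivating example boils down to one extra line:

spark
  .read.parquet("s3://your-landing-bucket/day=2021-08-28")
  .alreadySorted(Seq("type"))
  .write
  .partitionBy("type")
  .parquet("s3://your-mart-bucket/day=2021-08-28")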

Is this any good?

It’s worth asking whether the operator we’ve created is useful. It is certainly effective — in that it will eliminate the unnecessary sort in our motivating example. So yeah, it’s a “very efficient sorting”, as I promised. It is however not widely useful — if the data is already sorted, then the in-memory sort pass will be fast anyway, and if you’re out of memory, there will be other bottlenecks. I have used the sort example mostly because it’s easy and shows the general pattern of extending Spark. We’ll apply it to a much more practical problem in the next post.

Efficient user-defined functions

Let’s turn from DataFrame transformations to column transformations. Suppose you have math code that takes an integer and returns an integer, and you wish to apply it to a column. The code is too complicated to express as a Column expression. You can wrap it as a UDF, but you know that UDFs are fairly inefficient. There must be another way, and it’s to create a custom Catalyst expression with code generation support.

Code generation

Code generation is a Spark mechanism to speed up the execution of expressions. Suppose you have

df.withColumn("totalCost", $"price"*$"quantity" + $"shipping")

Multiplying two numbers and adding a third is easy for the computer. But if at each step we have to look up columns by name, retrieve them from different parts of memory (ruining caching), and only then do the math, it will be very slow. Therefore, Spark generates Java code for the expressions, combines it with code to access the columns from the InternalRow representation, and compiles it all into JVM bytecode.
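If you are curious what the generated code looks like, Spark can print it for you (a sketch, reusing the columns from the example above):

import org.apache.spark.sql.execution.debug._

// Prints the Java source that Spark generated and compiled for this query.
df.withColumn("totalCost", $"price" * $"quantity" + $"shipping")
  .debugCodegen()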

The motivating example

If your source data has timestamps in milliseconds, and you want to get a Timestamp column, Spark surprisingly does not have a nice solution. Below is a simplified picture of the available time conversions.

The Timestamp type stores a microsecond value, precise enough for us. But if we cast an integer to Timestamp, the integer is assumed to be a value in seconds. And if we take the long route via String, we’ll find exactly the same issue. But wait: since Timestamp uses almost the same format we already have, this makes for a very simple example of creating a custom operator.
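To see the “seconds” assumption in action, here is the usual workaround of dividing by 1000 before casting (a sketch; the column name event_ts_ms is assumed). It works, and double precision is enough to keep the milliseconds, but it is exactly the kind of detour we would like to avoid:

// cast-to-timestamp interprets the number as seconds, so we pre-divide by 1000.
val fixed = df.withColumn("event_ts",
  ($"event_ts_ms" / 1000).cast("timestamp"))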

The operator

The gist of our solution fits in 10 lines.

case class MillisToTs(millis: Expression)
  extends UnaryExpression with ExpectsInputTypes {

  override def child: Expression = millis

  override def inputTypes: Seq[AbstractDataType] = Seq(LongType)
  override def dataType: DataType = TimestampType

  override protected def doGenCode(ctx: CodegenContext,
                                   ev: ExprCode): ExprCode = {
    defineCodeGen(ctx, ev, c => s"$c * 1000")
  }
}

The expression we create holds a reference to the input expression. It requires the input to be LongType, and defines the output to be TimestampType. The doGenCode method just multiplies the input by 1000, to convert from milliseconds to microseconds.

There are two further methods we have to define for completeness — one to give our function a name in debug outputs, and another as a fallback if the user has disabled code generation for any reason.

override protected def nullSafeEval(timestamp: Any): Any = {
  timestamp.asInstanceOf[Long] * 1000
}

override def prettyName: String = "millis_to_ts"

Finally, we need a friendly way to use our expression:

def millis_to_ts(c: Column) = new Column(MillisToTs(c.expr))

With that in place, converting milliseconds to Timestamp is as easy as

df.withColumn("event_ts", millis_to_ts($"event_ts_ms"))

If you’re interested, the complete implementation, along with tests, is available on GitHub.

Is this any good?

As in the first example, our primary goal was to demonstrate extension mechanisms in the simplest fashion. There are other ways to convert milliseconds to timestamps, and such conversion is hardly a performance bottleneck. That said, the approach we’ve developed is safe and easy, and is useful in practice.

Conclusion

In this post, we looked at two ways to customize Spark. We created a DataFrame transformation that marks data as already sorted, and we created a custom Catalyst expression that converts milliseconds directly into Timestamp values.

Both examples were easy, but the extension mechanisms are widely applicable. In the next post, we will discuss using DataFrame transformation to completely control partitioning.
