Apache Spark WTF??? — Padding, Padding Cool

Ángel Álvarez Pascua
Towards Data Engineering
10 min read · 6 days ago

At first glance, padding strings seems like one of the most trivial tasks in programming — mundane, even. It doesn’t appear to have much significance. But you know, appearances can be deceiving sometimes. What if I told you that string padding could actually be both intriguing and insightful? And what if we could uncover valuable insights about how Apache Spark operates internally, simply by padding strings? Would you believe me?

Welcome to the latest installment of the “Apache Spark WTF???” series. In this episode, we won’t be diving into a real-world scenario from a project I’ve worked on. Instead, I’ve set a personal challenge to explore string padding within Spark. Don’t worry, though — padding is just the excuse. As always, our journey will still take us deep into the uncharted and sometimes inhospitable realms of Spark’s internal workings.

The soundtrack for the article this time is ‘Daddy cool’ by Boney M.
Ready to put on your best dancing shoes? Let’s hit the disco floor!

Photo by Dustin Tramel on Unsplash

I’m crazy like a fool, wild about padding cool …

The Challenge

As you may know, Apache Spark comes equipped with a variety of handy built-in functions, including lpad and rpad, which allow for padding strings on the left and right sides, respectively. My objective for this challenge is to pad both sides of a string. And what’s the challenge, you ask? The goal is to implement this in the most efficient and performant way possible, or at least as efficiently as I can manage.

To address this, we’ll begin with a simple test. First, we’ll create a DataFrame with N × 10 million rows, where N is the number of cores available on our local machine. The DataFrame will have a single string column, with each row containing the character ‘+’. Next, we’ll apply a custom padding function to pad both sides of the strings with the character ‘-’, until the total length of each string reaches 100. Straightforward, right?

NOTE: Initially, I considered populating the DataFrame with randomly generated strings of varying lengths and characters. However, I soon realized this approach could introduce significant timing variations, potentially leading to discrepancies in the test results.

Version #1: First Things First

Let’s start from the beginning. We need to design a test that first populates a DataFrame with the test strings, then applies padding to them, while measuring the time taken for each operation.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object PaddingVersion1 {

  def main(args: Array[String]): Unit = {
    // Initialize Spark Session
    val spark = SparkSession.builder
      .master("local[*]")
      .getOrCreate()

    // Define constants
    val numRows = spark.sparkContext.defaultParallelism * 10000000
    val initChar = "+"
    val padChar = "-"
    val maxChars = 100

    import spark.implicits._

    // Create and populate a new dataframe
    val startPopulating = System.currentTimeMillis()
    val plusList = Seq.fill(numRows)(initChar)
    val populatedDf = plusList.toDF("str")
    populatedDf.cache().count()
    val timePopulating = System.currentTimeMillis() - startPopulating

    // Define the UDF to pad a string with a character on both sides
    val padStringUDF = udf((input: String, length: Int) => {
      val totalPadding = length - input.length
      val paddingLeft = totalPadding / 2
      val paddingRight = totalPadding - paddingLeft
      padChar * paddingLeft + input + padChar * paddingRight
    })

    // Apply the padding UDF, replacing the existing column
    val startPadding = System.currentTimeMillis()
    val paddedDf = populatedDf.withColumn("str", padStringUDF(col("str"), lit(maxChars)))
    paddedDf.show(truncate = false)
    val timePadding = System.currentTimeMillis() - startPadding

    // Print final info
    println(s"numPartitions populatedDf: ${populatedDf.rdd.getNumPartitions}")
    println(s"numPartitions paddedDf: ${paddedDf.rdd.getNumPartitions}")
    println(s"numRows: $numRows - maxChars: $maxChars - timePopulating: $timePopulating - timePadding: $timePadding")

    // Wait for the user to press Enter
    println("Press Enter to continue...")
    scala.io.StdIn.readLine()

    // Stop Spark session
    spark.stop()
  }
}

A few clarifications:

  • spark.sparkContext.defaultParallelism is used to determine the total number of cores in our cluster. In local mode, this will give us the number of cores available on our machine.
  • populatedDf.cache().count() is used to ensure padding times don’t include DataFrame population. Since Spark is lazy, caching and counting forces the DataFrame to populate first, isolating the timing for each step.
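
Since the same start/stop pattern is repeated for every measurement, it can be wrapped in a tiny helper. This is only a sketch: the Timing object and its timed method are hypothetical names of mine, not part of Spark or of the test project. The important detail is that the timed block must contain an action (such as count()), otherwise Spark's laziness means nothing is actually measured.

```scala
object Timing {
  // Runs `block` once and returns its result together with the elapsed milliseconds.
  def timed[A](block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    (result, elapsedMs)
  }
}
```

With it, the populate step could be written as val (rows, timePopulating) = Timing.timed { populatedDf.cache().count() }.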

If we run this initial version, we get an OutOfMemoryError after waiting a few minutes. Additionally, if we check the Spark UI, we see that no job has been triggered. Why? This happens because, although the Spark session is active, we’re generating all the data outside Spark, on the driver, using a single thread, and only then converting it into a DataFrame using toDF().

Version #2: Populating Cool

Using toDF() to convert lists or sequences to DataFrames is straightforward, but it’s important to consider its limitations. toDF() is convenient for small datasets, as it allows for easy transformation and the assignment of column names, but it is not suited to large collections: the whole collection has to be built in the driver’s memory first, on a single thread, before Spark ever sees it, which can lead to performance bottlenecks and memory issues.

For larger datasets, it’s better to let Spark generate or load the data in parallel, for example with spark.range() or with spark.createDataFrame() applied to an RDD, to take advantage of Spark’s distributed processing capabilities. Choose spark.createDataFrame() when you need to convert existing data into a DataFrame and require flexibility in defining the structure. Opt for spark.range() when you need a quick and efficient way to create a DataFrame filled with a range of integers, especially for testing or simulation purposes.
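
For comparison with the spark.range() route used in this article, here is a minimal sketch of the spark.createDataFrame() alternative, where the rows are produced inside Spark tasks instead of in a driver-side collection. The object and method names (CreateDataFrameSketch, plusDf) and the small row count are illustrative assumptions of mine.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CreateDataFrameSketch {
  // Builds a one-column DataFrame of "+" strings from an RDD generated in parallel.
  def plusDf(spark: SparkSession, numRows: Long): DataFrame = {
    // sparkContext.range distributes the numbers across partitions; each task maps them to rows.
    val rows = spark.sparkContext.range(0L, numRows).map(_ => Row("+"))
    // An explicit schema is what gives createDataFrame its flexibility.
    val schema = StructType(Seq(StructField("str", StringType, nullable = false)))
    spark.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    println(plusDf(spark, 1000L).count()) // small row count, for illustration only
    spark.stop()
  }
}
```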

In the next iteration of the test, we create a DataFrame with numRows rows, where each row contains a single column named “str” filled with the string “+”. The process begins by generating a range of consecutive integers from 0 to numRows - 1, then adds the “str” column with the constant value “+”, and finally removes the default integer column named id, which is internally generated by range().

val populatedDf = spark.range(numRows)
  .withColumn("str", lit(initChar))
  .drop("id")

When running this second version… wait, no OutOfMemory error? Why, even though the data is the same? It’s simple: in the first test, we’re attempting to fit two copies of the data into memory — one when we create the data outside the Spark session, and another when we use toDF() to convert it into a DataFrame.

Still, the time spent populating the DataFrame is consistently slightly greater than the time spent padding the strings. Additionally, are we certain that we’re padding all the rows?

Version #3: Executing Cool

Let’s check the Spark UI for verification, specifically the Executors tab.

Spark UI > Executors tab

Holy cow! There’s only one task? Why is that?

Spark uses jobs, stages, and tasks to efficiently manage and execute distributed computations:

  • Jobs: Represent high-level actions that trigger computation.
  • Stages: Divide a job into parallelizable parts based on data dependencies.
  • Tasks: Are the smallest units of work, processing data partitions within a stage.

Given that tasks are units of work that process data partitions, and our DataFrame has N partitions — where N is the total number of cores in our cluster — we should expect N tasks to be executed, right?

Not only that, but if we navigate to the Stages tab in the Spark UI, click on the only available stage, and check the bottom of the page, we see that the total number of records processed is 21. Why 21 and not 20? Because show() asks for one row more than the 20 it displays by default: the extra row tells Spark whether there is more data, so it knows whether to print the “only showing top 20 rows” footer.
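
The reason for the extra record is the usual "fetch one more than you display" trick: asking for numRows + 1 rows is enough to know whether anything was left out. The following plain-Scala sketch illustrates the idea only; it is not Spark's code, and ShowSketch and firstRows are hypothetical names.

```scala
object ShowSketch {
  // Returns the rows to display plus a flag telling whether more rows exist beyond them.
  def firstRows[A](rows: Iterator[A], numRows: Int): (Vector[A], Boolean) = {
    // Pull at most numRows + 1 elements: the extra one only serves as a "has more" probe.
    val fetched = rows.take(numRows + 1).toVector
    (fetched.take(numRows), fetched.length > numRows)
  }
}
```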

Spark UI > Stages tab > Click on the stage > Table at the bottom

It appears that the show action applied to our DataFrame does not trigger the processing of all the partitions. Since this action only displays the first few rows, it doesn’t require calculations across all partitions. As a result, only one of the partitions is processed.

If we replace the show action with a count action, that should do the trick, right? Well, yes… and no. While the count action would utilize all the partitions, it does not require applying the padding, as counting rows in the resulting DataFrame does not invoke our padding UDF. As a result, we end up with misleading padding times.

To trigger calls to our UDF, we need to explicitly use the padded column before counting, ensuring that Spark performs the necessary calculations. One straightforward way to achieve this is by applying a trivial filter using the padded column, as shown below:

paddedDf.filter("str is not null").count()
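
One way to check whether a UDF is really being invoked is to count its calls with an accumulator. The sketch below is an assumption-laden experiment, not code from the test project: UdfCallCounter, run and markUDF are hypothetical names, and the "str" column is derived from id (rather than from a literal) so that the optimizer cannot simply pre-compute the UDF on a constant. I would expect the first number to be 0 and the second to match the row count, but treat that as something to verify on your own Spark version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object UdfCallCounter {
  // Returns the number of UDF calls seen after a plain count()
  // and after a count() preceded by a filter on the UDF's output.
  def run(spark: SparkSession, numRows: Long): (Long, Long) = {
    val calls = spark.sparkContext.longAccumulator("udfCalls")
    val markUDF = udf((s: String) => { calls.add(1L); s + "!" })
    val df = spark.range(numRows)
      .select(col("id").cast("string").as("str"))
      .withColumn("str", markUDF(col("str")))

    df.count()                            // counting rows does not need the column values
    val afterCount = calls.value.longValue
    df.filter("str is not null").count()  // the filter needs the UDF's result
    val afterFilter = calls.value.longValue
    (afterCount, afterFilter)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    println(run(spark, 1000L))
    spark.stop()
  }
}
```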

Now the time spent populating is less than the time spent padding, which has increased considerably, as expected. However, we observe an unexpected behavior: while the show action only triggers one job with no shuffle, the count action triggers two jobs and involves shuffling.

Info gathered from Jobs and Executors tabs in the SparkUI

To understand why Spark triggers two jobs and performs a small shuffle, let’s examine the count method inside Dataset.scala.

Spark performs partial counts in parallel across partitions and then aggregates these partial results to compute the total count. The shuffle operation is required to gather and combine these partial counts from different partitions into a single result, as the final aggregation requires collecting the counts in one place. However, this shuffle is relatively small compared to other operations (e.g., shuffles in joins or repartitions) because it only involves summing the partial counts, rather than transferring large volumes of data.
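
The two-step aggregation can be modelled in a few lines of plain Scala. This is only an analogy, with each inner sequence standing in for a partition; PartialCountSketch is a hypothetical name and nothing here is Spark code.

```scala
object PartialCountSketch {
  // Each inner Seq stands in for one partition of the DataFrame.
  def count(partitions: Seq[Seq[String]]): Long = {
    // Step 1 (one task per partition): count the rows locally.
    val partialCounts: Seq[Long] = partitions.map(_.size.toLong)
    // Step 2 (after the small shuffle): a single task sums the partial counts.
    partialCounts.sum
  }
}
```

Only one number per partition crosses the shuffle boundary, which is why the shuffle stays tiny no matter how many rows each partition holds.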

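It is tempting to explain the two jobs as separate preparation and execution phases:

  • Preparation Job: The first job would prepare the execution plan, generating the logical and physical plans that outline how the data will be processed and aggregated, and setting up operations such as shuffling and partitioning.
  • Execution Job: The second job would execute the plan by processing the data in parallel across partitions and performing the aggregation to compute the final count.

That explanation doesn’t hold up, though: building the logical and physical plans happens on the driver and doesn’t run a Spark job by itself. Adaptive Query Execution (AQE) is what is actually responsible for the two jobs. With AQE enabled, the stage that feeds the shuffle is submitted as its own job so that Spark can re-optimize the rest of the plan using runtime statistics, and the final aggregation then runs as a second job. According to the Spark documentation: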

Performance Tuning — Spark 3.5.2 Documentation

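When AQE is disabled, we observe only one job with 17 tasks (presumably the 16 partial-count tasks, one per partition, plus one task for the final aggregation). While the execution time remains the same and the execution plan becomes simpler, the small shuffle still occurs.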
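
For reference, this is one way AQE could be switched off for such an experiment, using the spark.sql.adaptive.enabled setting when building the session. The object name and the small DataFrame are illustrative assumptions; the job and task counts you see will depend on your machine and Spark version.

```scala
import org.apache.spark.sql.SparkSession

object AqeOffSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      // AQE has been enabled by default since Spark 3.2; turn it off for this run.
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    // Small stand-in DataFrame; check the Jobs tab of the Spark UI while this runs.
    val df = spark.range(1000L).selectExpr("CAST(id AS STRING) AS str")
    println(df.filter("str is not null").count())

    spark.stop()
  }
}
```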

Is there any way to avoid the shuffle? Do we really need that count? Or, to put it another way, is there a way to make Spark pad all the values at minimal cost? Yes, by using the No Operation writer (noop).

paddedDf.write.format("noop").mode("overwrite").save()

The noop writer in Spark is a specialized output writer that performs no writing operations. Introduced in Spark 3.0, its primary use is in scenarios where the writing step is not required, such as for benchmarking or when you want to trigger transformations without saving the results to disk. The noop writer optimizes performance by avoiding unnecessary I/O during these operations.

By default, Spark uses SaveMode.ErrorIfExists when no mode is specified during the write operation. This means that if the destination (e.g., a table, file, or directory) already exists, it will throw an error, whereas if the destination does not exist, Spark will proceed with the write operation. The noop writer has no destination to check, so ErrorIfExists makes no sense for it: leaving the default in place results in an error, which is why the mode is set explicitly (to overwrite in the snippet above).

When testing the noop writer, we get only one job, one stage, 16 tasks, and no shuffle!

One job, one stage, 16 tasks and no shuffle

Version #4: Padding … But Padding Cool

Preferring built-in functions over UDFs is a widely recognized performance tip. But does this advice apply in this case? Absolutely! When we replace our UDF with built-in functions, we experience a significant reduction in padding time in the test.

val paddedDf = populatedDf.withColumn("str",
  expr(s"""LPAD(RPAD(str, LENGTH(str) + ($maxChars - LENGTH(str)) / 2, '$padChar'), $maxChars, '$padChar')""")
)
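
One detail worth knowing is how the nested expression splits an odd amount of padding. The plain-Scala model below mirrors both approaches; it assumes the fractional length produced by the SQL division is truncated to an integer (which is what I would expect with default, non-ANSI settings) and it only handles single-character pads. PadSplit and its methods are hypothetical names, not Spark functions.

```scala
object PadSplit {
  // Plain-Scala stand-ins for the SQL functions (single-character pad only).
  def rpad(s: String, len: Int, pad: Char): String =
    if (s.length >= len) s.take(len) else s + pad.toString * (len - s.length)

  def lpad(s: String, len: Int, pad: Char): String =
    if (s.length >= len) s.take(len) else pad.toString * (len - s.length) + s

  // Mirrors the UDF from Version #1: the smaller half goes on the left.
  def padUdfStyle(s: String, len: Int, pad: Char): String = {
    val total = len - s.length
    val left = total / 2
    pad.toString * left + s + pad.toString * (total - left)
  }

  // Mirrors LPAD(RPAD(str, LENGTH(str) + (len - LENGTH(str)) / 2, pad), len, pad):
  // the right side is padded first, then the left side receives whatever is missing.
  def padNestedStyle(s: String, len: Int, pad: Char): String =
    lpad(rpad(s, s.length + (len - s.length) / 2, pad), len, pad)
}
```

Under those assumptions both versions produce strings of the requested length, but for "+" padded to 100 the UDF puts 49 characters on the left and 50 on the right, while the nested expression does the opposite. That makes no difference for the timing comparison, but it matters if the two outputs are ever compared row by row.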

To understand why, we need to delve into the details of the lpad and rpad built-in functions within UTF8String.java. Have you noticed that the code is written in Java rather than Scala? It appears that when Spark aims to achieve better performance at a lower level, it relies on Java instead of Scala. Isn’t that interesting?

lpad method in UTF8String.java

Basically, the lpad method left-pads the source string with the specified padding string using byte operations until it reaches the desired length. It calculates the necessary padding, constructs a new byte array, copies the padding and the original string into this array, and returns the resulting string. It’s important to note that both the source string and the padding string must be encoded in UTF-8.
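
To make the byte-level idea concrete, here is a heavily simplified sketch written in Scala. It is not the code from UTF8String.java: it assumes ASCII input (one byte per character) and a single-byte pad, whereas the real method works with UTF-8 characters and multi-character padding strings. BytePadSketch and lpadBytes are hypothetical names.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

object BytePadSketch {
  // Left-pads `src` with `pad` until the result is `len` bytes long.
  // If `src` is already at least `len` bytes, it is truncated to `len` instead.
  def lpadBytes(src: Array[Byte], len: Int, pad: Byte): Array[Byte] = {
    if (len <= src.length) Arrays.copyOf(src, math.max(len, 0))
    else {
      val out = new Array[Byte](len)                    // one allocation for the whole result
      val padLen = len - src.length
      Arrays.fill(out, 0, padLen, pad)                  // write the padding first
      System.arraycopy(src, 0, out, padLen, src.length) // then copy the original bytes
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val padded = lpadBytes("+".getBytes(UTF_8), 5, '-'.toByte)
    println(new String(padded, UTF_8)) // prints "----+"
  }
}
```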

And this is only for left-padding, but we’re also right-padding the string; we have the corresponding rpad method too.

Can we beat this? Unlikely, as it appears to be well-optimized already. One potential idea could be to merge the lpad and rpad methods into a single function to execute checks and common calculations only once. However, since the code is written in Java, we would likely need to implement our method in Java, package it in an external JAR, and include it each time we launch our Spark application, calling it from a UDF. Is all that effort worthwhile? I don’t think so. Additionally, using UDFs in PySpark incurs extra overhead.
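
Just to show what the merged idea would look like, here is a rough sketch of a single-pass "center pad" over bytes. It is purely illustrative: it assumes ASCII input and a single-byte pad, it is written in Scala rather than Java, and CenterPadSketch and centerPadBytes are hypothetical names. I have not measured whether it would beat the built-in functions in practice.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

object CenterPadSketch {
  // Pads `src` on both sides in one pass: the length check, the padding arithmetic
  // and the allocation each happen once. The extra byte of an odd padding goes right.
  def centerPadBytes(src: Array[Byte], len: Int, pad: Byte): Array[Byte] = {
    if (len <= src.length) src.clone() // nothing to pad (no truncation in this sketch)
    else {
      val total = len - src.length
      val left = total / 2
      val out = new Array[Byte](len)
      Arrays.fill(out, pad)                           // fill everything with the pad byte
      System.arraycopy(src, 0, out, left, src.length) // drop the original bytes in the middle
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val padded = centerPadBytes("+".getBytes(UTF_8), 6, '-'.toByte)
    println(new String(padded, UTF_8)) // prints "--+---"
  }
}
```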

Challenge Accomplished!? (aka Final Thoughts)

While it’s true that appearances can be deceiving and the idea of outsmarting Spark by writing our own UDFs can be tempting, it’s not an easy task. We must remember that Spark is already optimized, and well-known performance tips exist for a reason; we must follow the tips unless proven otherwise.

Anyway, the challenge was to pad strings on both sides as efficiently as possible, and I believe we’ve done a great job while learning along the way. At the beginning of this article, I mentioned that padding, rather than being mundane, could become an intriguing and insightful task — and I hope it has been for you as well.

Here you can find the repository for the test project.

See you next time!

Sick and tired of debugging some error or performance issue? I might be able to help! Send me your case! I’m genuinely passionate about intriguing errors and performance challenges: those are the things that keep me up at night and keep me thinking while I walk my dog. I’ve been solving interesting cases for the last 15 years, so … don’t hesitate to reach out to me at https://www.linkedin.com/in/angel-alvarez-pascua .
