Apache Spark WTF??? — Stranded On Dates Rows

Ángel Álvarez Pascua
Towards Data Engineering
13 min read · Aug 28, 2024

Are you a Spark Scala enthusiast or more of a Pythonic PySpark aficionado? Maybe you think Java is just some old-school language you don’t really need to master to be a top-notch Spark developer? Well, in this article, I’m going to prove you wrong!

Welcome back to a new, summer-scorching episode! This time, we’re going on a date … not to meet a romantic partner, but to dive into an intriguing and challenging issue with invalid dates and how Java internally handles exceptions while parsing strings to dates. Trust me, you’re going to learn something new today — just like I did while uncovering this issue.

So … let’s take a travel down the blindside of Spark … and see what we find on … this path … called … Stranded On Dates Rows!

(The soundtrack for this article couldn’t be other than Stranded On Death Row)

The Case

Last week, a colleague reached out to me with a rather perplexing problem. When they tried submitting a Spark job to a Cloudera cluster, the job would always get stuck at the same task right at the start of execution. It didn’t seem to complete any tasks or throw any errors; it just sat there, idle. After more than an hour of this non-action, they had no choice but to manually kill the job.

There are usually dozens of processes running on this cluster, and it can get pretty saturated at times. Sure, that might explain why processes run a bit slower, but getting totally stuck at the same point? That’s a bit suspicious, don’t you think?

The Inquiry

Whenever a program gets stuck, my first thought is always: locks.

A lock is a synchronization tool used to control access to a shared resource among multiple threads. It makes sure that only one thread can access the resource at a time, preventing race conditions and ensuring data consistency. When a thread grabs a lock, other threads are blocked from accessing the resource until that lock is released. It’s like a “do not disturb” sign for threads.

When dealing with stuck threads, it’s crucial to understand the difference between deadlock and livelock:

  • Deadlock: In a deadlock situation, threads are completely blocked, each one waiting for resources held by the others. There’s no progress at all — just idle waiting. Because the threads are blocked and not performing any work, CPU usage is generally low. They’re stuck in an endless loop of waiting for each other, which means nothing’s getting done.
  • Livelock: Here, the threads are active and constantly responding, but they’re stuck in a never-ending cycle of futile actions. While there’s a lot of activity, it’s all in vain — CPU usage is high because the threads are actively running and trying to resolve the situation, but they’re not making any real progress. It’s like a hamster on a wheel: lots of movement, but no forward motion.
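To make the deadlock case concrete, here’s a minimal sketch in plain Java (nothing Spark-specific; the class and method names are made up for illustration). Two daemon threads grab two locks in opposite order, and the JVM’s own ThreadMXBean is then asked whether it sees a monitor deadlock. In a thread dump, both threads would show up as BLOCKED, burning no CPU at all.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class: provokes a deadlock on purpose and asks the JVM to report it.
class DeadlockDemo {

    /** Returns how many threads the JVM reports as deadlocked on monitors (0 if none is found in time). */
    static int countDeadlockedThreads() {
        final Object lockA = new Object();
        final Object lockB = new Object();
        final CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Daemon threads, so the deadlock does not keep the JVM alive.
        Thread t1 = new Thread(() -> lockInOrder(lockA, lockB, bothHoldFirstLock), "demo-1");
        Thread t2 = new Thread(() -> lockInOrder(lockB, lockA, bothHoldFirstLock), "demo-2");
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = threads.findMonitorDeadlockedThreads();
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    private static void lockInOrder(Object first, Object second, CountDownLatch latch) {
        synchronized (first) {
            latch.countDown();
            try {
                latch.await(); // wait until the other thread holds its first lock too
            } catch (InterruptedException e) {
                return;
            }
            synchronized (second) {
                // never reached: the other thread holds this lock and waits for ours
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Deadlocked threads: " + countDeadlockedThreads());
    }
}
```

A livelock would look very different in the same dump: the threads would be RUNNABLE and the CPU busy, which is why the thread state is the first thing worth checking.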

Here you can find more detailed info about locks.

So, what kind of lock was causing our Spark program to get totally stuck? To figure this out, we need to dig into what’s happening inside our JVM process. (Heads up, Python developers: even in PySpark applications, there are one or more JVMs at play.) To investigate, we can use the jstack command, which ships with the JDK, or take advantage of the handy link available in the Spark UI console.

Spark UI > Executors > Thread Dump link in the executor row with active tasks

As shown in the previous image, we have just one executor with a single active task. By clicking on the “Thread Dump” link, we get a list of threads for that executor. Threads can be in various states, but we’re mainly interested in those in the RUNNABLE state — these are the threads actually doing work. Specifically, we want to focus on the threads in the JVM that are running Spark tasks.
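If you’d like to see what that filtering looks like in code, here’s a rough sketch using only the standard Thread API. It is an assumption-laden toy: a JVM can only inspect itself this way, so for a real executor you’d still rely on jstack or the Spark UI. The class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: an in-process "thread dump" filtered the way we filter it by eye.
class RunnableThreads {

    /** Names of RUNNABLE threads whose stack contains a class matching the given fragment. */
    static List<String> runnableThreadsMentioning(String classNameFragment) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (thread.getState() != Thread.State.RUNNABLE) {
                continue; // skip WAITING, BLOCKED, TIMED_WAITING...
            }
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getClassName().contains(classNameFragment)) {
                    matches.add(thread.getName());
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Inside a Spark executor JVM, the fragment of interest would be "TaskRunner".
        System.out.println(runnableThreadsMentioning("RunnableThreads"));
    }
}
```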

Each Spark executor has a ThreadPoolExecutor managing a pool of threads, created when the executor starts up. This ThreadPoolExecutor allows the executor to handle multiple tasks concurrently. The number of tasks running at the same time in this pool is bounded by the number of cores allocated to the executor (e.g., spark.executor.cores).

When a task is submitted to the executor, it’s handed off to the ThreadPoolExecutor. The executor then picks a thread from the pool to execute the task. Each task is wrapped in a TaskRunner instance, which implements the Runnable interface. The TaskRunner.run method is where task execution kicks off within the executor. When the ThreadPoolExecutor assigns a thread to a task, it calls the run method of the TaskRunner.
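The mechanics are plain java.util.concurrent. Here’s a deliberately simplified sketch, not Spark’s actual code: the TaskRunner below is a toy stand-in that only counts completions, and the fixed-size pool is an assumption made to keep the example short.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of "executor hands Runnable tasks to a thread pool".
class MiniExecutor {

    /** Toy stand-in for a task wrapper: the pool calls run() on one of its threads. */
    static final class TaskRunner implements Runnable {
        private final int taskId;
        private final AtomicInteger completed;

        TaskRunner(int taskId, AtomicInteger completed) {
            this.taskId = taskId;
            this.completed = completed;
        }

        @Override
        public void run() {
            // Real work would happen here; a thread dump taken now would show this frame.
            completed.incrementAndGet();
        }
    }

    /** Runs taskCount tasks on a pool of 'cores' threads and returns how many finished. */
    static int runTasks(int cores, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(new TaskRunner(i, completed));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runTasks(4, 20));
    }
}
```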

So, to pinpoint the issue, we need to look for threads in the dump whose stack traces reference TaskRunner. And there we have it…

Scala/Java stack traces are read bottom-to-top, while Python traces are read top-to-bottom

Stack traces never lie, and this one reveals that locks are indeed at play, just as suspected.

Here’s a closer look at the main components of the stack trace:

  • org.apache.spark.scheduler.ShuffleMapTask: This handles map-side shuffle operations by processing and writing intermediate data for shuffling. It iterates through rows, applies the map function, partitions the results based on shuffle keys, and writes them to shuffle files. Interesting? Sure. Important for our case? Not really.
  • org.apache.spark.sql.execution.WholeStageCodegenExec: This optimization mechanism generates Java bytecode to execute multiple operators in a single function, reducing function call overhead, optimizing CPU usage, and improving execution speed by cutting down on intermediate data structures and Java method calls. Interesting? Definitely. Important for our case? Probably not.
  • java.text.ParseException.<init>: When you see <init> in a stack trace, it indicates that the constructor of that class is being executed. This tells us the JVM was creating a new instance of that class at the time of the thread dump. Interesting? Yes. Important for our case? Maybe (you don’t usually see constructor calls in thread dumps unless the class construction is somehow expensive).
  • java.lang.Throwable.fillInStackTrace: This method is declared synchronized, so it acquires the monitor of the Throwable under construction while capturing and populating stack trace information. Interesting? Absolutely. Important for our case? Definitely! Since we’re dealing with locks, this should be a key piece of the puzzle.
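Why would fillInStackTrace appear right under a constructor? Because the Throwable constructor calls it. Here’s a tiny sketch that shows this with a hypothetical ParseException subclass that merely counts the calls; note that the exception is never even thrown.

```java
import java.text.ParseException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass used only to observe when fillInStackTrace runs.
class CountingParseException extends ParseException {

    // Static on purpose: the override runs inside the superclass constructor,
    // before any instance field of this class has been initialized.
    static final AtomicInteger FILL_CALLS = new AtomicInteger();

    CountingParseException(String message, int errorOffset) {
        super(message, errorOffset);
    }

    @Override
    public synchronized Throwable fillInStackTrace() {
        FILL_CALLS.incrementAndGet();
        return super.fillInStackTrace();
    }

    public static void main(String[] args) {
        new CountingParseException("Unparseable date: \"\"", 0); // created, never thrown
        System.out.println("fillInStackTrace calls: " + FILL_CALLS.get());
    }
}
```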

The Alibi

So, let’s piece together what we’ve discovered so far: We have a task that gets stuck and doesn’t throw any external errors. Internally, it’s not only creating a ParseException instance but also seemingly getting locked while doing so.

But wait… is it really getting locked? Or are we misinterpreting temporary locks as deadlocks or livelocks?

If we repeat the process and gather more thread dumps, we usually see the same stack traces. However, sometimes there are slight changes. Even when stack traces look identical, there are minor differences, like the identity hash code of the ParseException instance, which changes with each dump we take, meaning a different exception object is being created each time.

The hashcode of the object goes after the @ symbol

Not only that, but if we take a really, really, REALLY close look at the Spark UI, we can actually see that the application is moving forward and making progress!

Stages > Active Stage > Input Size / Records

At first glance, the Stages tab in the Spark UI seems to show the same information repeatedly. But if we wait a few minutes, reload the page, and pay close attention to the “Input Size/Records” column, we can see the number of records for the active task increasing.

This means the Spark application is actually working as expected! It’s just taking a painfully long time to complete that task. So, we’ve been barking up the wrong tree all along! We’re not dealing with locks, or at least, no lock is causing our Spark application to get stranded.

Still, this revelation doesn’t bring us any closer to pinpointing the exact cause … or does it?

The Real Culprit

On one hand, we have a task that’s running slower than expected. On the other hand, almost every time we snapshot the threads in the executor, we see the same thread doing the same thing: creating a new ParseException. Nearly always, we catch that thread “red-handed” executing the fillInStackTrace method from the Throwable superclass.

What if this seemingly innocent method is actually cost-intensive?

As it happens, fillInStackTrace is actually a pretty expensive operation — one of the costliest in Java. On top of that, it’s synchronized, which means there’s some extra overhead involved. But…

  • Why costly? The method walks the entire call stack of the current thread in native code and records every frame, so its cost grows with the depth of the stack, and a Spark task runs under a deep one. Turning those frames into StackTraceElement objects with class names, method names, file names, and line numbers adds even more work whenever the trace is actually read or printed.
  • Why synchronized? Synchronization ensures thread safety and keeps the stack trace data consistent when accessed or modified by multiple threads. But is this necessary during the instantiation of a new object? Not really, because normally no other thread can reach the object until its constructor has finished, which means no other thread could run fillInStackTrace on it while it is still being constructed. So, in this case, we’re dealing with completely unnecessary overhead.
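You can get a feel for the cost on your own machine with a crude comparison. The sketch below is not a rigorous benchmark, and StacklessException is a hypothetical class: it uses the protected Throwable constructor with writableStackTrace set to false, which skips the stack capture altogether. Spark can’t do this for the ParseException thrown by the JDK; the point is only to isolate how much of the price of an exception is its stack trace.

```java
// Hypothetical exception that opts out of stack trace capture.
class StacklessException extends Exception {
    StacklessException(String message) {
        // cause = null, enableSuppression = false, writableStackTrace = false
        super(message, null, false, false);
    }
}

class ExceptionCost {

    /** Creates 'count' exceptions (never thrown) and returns the elapsed nanoseconds. */
    static long nanosToCreate(int count, boolean withStackTrace) {
        Exception last = null;
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            last = withStackTrace
                    ? new Exception("bad date")
                    : new StacklessException("bad date");
        }
        long elapsed = System.nanoTime() - start;
        if (count > 0 && last == null) {
            throw new IllegalStateException("unreachable, keeps 'last' in use");
        }
        return elapsed;
    }

    public static void main(String[] args) {
        int count = 1_000_000;
        nanosToCreate(count, true);   // warm-up rounds
        nanosToCreate(count, false);
        System.out.println("with stack trace:    " + nanosToCreate(count, true) / 1_000_000 + " ms");
        System.out.println("without stack trace: " + nanosToCreate(count, false) / 1_000_000 + " ms");
    }
}
```

The deeper the call stack at the point of creation, the wider the gap gets, so numbers from a flat main method will understate what happens inside a Spark task.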

But here’s the kicker: in this case, we’re not actually using any of that expensively gathered stack trace information in the newly created ParseException objects. Spark only relies on the exception mechanism to detect invalid date strings while parsing them, handle them properly, and return a null value instead.
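The pattern just described looks roughly like the first method below. This is a sketch with java.text.SimpleDateFormat, chosen because ParseException belongs to java.text; it is not Spark’s actual code, and the class and method names are made up. The second method is there for contrast: the JDK’s parse(String, ParsePosition) overload reports failure by returning null, so no exception is ever built.

```java
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper contrasting two ways of turning "invalid date" into null.
class LenientDateParsing {

    /** Invalid input costs one ParseException (stack trace included) per call. */
    static Date parseOrNullWithException(String value, String pattern) {
        try {
            return new SimpleDateFormat(pattern).parse(value);
        } catch (ParseException e) {
            return null; // the stack trace captured for 'e' is never looked at
        }
    }

    /** Same result for these inputs, but failure is signalled by a null return value. */
    static Date parseOrNullWithoutException(String value, String pattern) {
        ParsePosition position = new ParsePosition(0);
        return new SimpleDateFormat(pattern).parse(value, position);
    }

    public static void main(String[] args) {
        System.out.println(parseOrNullWithException("", "yyyy-MM-dd"));
        System.out.println(parseOrNullWithoutException("", "yyyy-MM-dd"));
        System.out.println(parseOrNullWithoutException("2024-08-28", "yyyy-MM-dd"));
    }
}
```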

Upon reviewing our Spark application’s code, we found that it was dealing with an 800 million-row DataFrame, where several string columns were being parsed to dates using Spark’s built-in to_date function.

So, let’s take a closer look at the documentation for this function:

Returns null with invalid input … and takes ages to finish when there is a lot of invalid input

The documentation doesn’t mention how having too many invalid string date values can seriously impact performance. That’s why I’ve just opened a ticket about this issue (SPARK-49288). They might not give it much attention, but I think it’s crucial to highlight how expensive it can be to handle numerous invalid values.

The real issue, as I see it, isn’t just about being cautious with invalid string dates during parsing. It’s that Spark creates all these exceptions internally, which keeps users completely in the dark about what’s happening. Most importantly, it’s worth noting how costly it can be to parse strings into dates when there are invalid values — empty strings count as invalid too.

DIY (= Do It Yourself)

Once we understand what’s going on, it’s straightforward to isolate and reproduce this behavior in your local environment. Here’s a quick project setup to test this issue. Just follow the steps below:

  1. Create a Spark session
  2. Generate and populate DataFrames with invalid date string values (empty strings) of different sizes: 1,000,000 rows, 5,000,000 rows and 10,000,000 rows.
  3. Perform the following tests for each DataFrame size:
  • TO_DATE WITH EMPTY VALUES: Count nulls after applying the to_date function.
  • TO_DATE WITH NULL VALUES: Convert empty strings to null, then apply the to_date function, and count the resulting null values.
  • WITHOUT TO_DATE: Count the number of null values without applying the to_date function (i.e., all values remain empty strings).

We wanted to explore not only how costly applying to_date to empty strings is, but also whether null values have a negative impact and how the number of threads might affect the results. Here are the times (in seconds) we recorded for each test case. We ran the tests three times and calculated the average time for each case:

Times taken on average (three launches)

From the data, we can draw the following conclusions:

  • Applying to_date to empty values is 2x to 3x more expensive than either replacing empty values with null or not calling to_date at all when dealing with empty strings.
  • Replacing empty values with null or skipping to_date shows similar performance, with no significant difference observed between the two approaches.
  • Although we observed the worst performance with 1 thread, the differences in execution time between using 1, 8, or 16 threads were not as significant as expected. In fact, using 8 or 16 threads resulted in almost identical times across all tests.
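The same effect can be felt without Spark at all. The following plain-Java sketch is a loose analogue of the first two tests, not the Spark test itself, and its names are invented: one method lets the parser fail on every empty string, the other filters empty values out before parsing, which is what replacing empty strings with null achieves before calling to_date.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.List;

// Hypothetical JVM-only analogue of "to_date with empty values" vs "with null values".
class EmptyStringGuard {

    /** Every invalid value is discovered through a ParseException. */
    static long countNullsParsingEverything(List<String> values) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        long nulls = 0;
        for (String value : values) {
            try {
                format.parse(value);
            } catch (ParseException e) {
                nulls++;
            }
        }
        return nulls;
    }

    /** Null and empty values never reach the parser, so no exception is created for them. */
    static long countNullsSkippingEmpties(List<String> values) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        long nulls = 0;
        for (String value : values) {
            if (value == null || value.isEmpty()) {
                nulls++;
                continue;
            }
            try {
                format.parse(value);
            } catch (ParseException e) {
                nulls++;
            }
        }
        return nulls;
    }

    public static void main(String[] args) {
        List<String> values = Collections.nCopies(200_000, "");

        long start = System.nanoTime();
        long a = countNullsParsingEverything(values);
        long parseMillis = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        long b = countNullsSkippingEmpties(values);
        long guardMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println(a + " nulls via exceptions in " + parseMillis + " ms");
        System.out.println(b + " nulls via guard in " + guardMillis + " ms");
    }
}
```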

Any More Interesting Takeaways From The Tests?

Although not directly related to the core issue, there are two important aspects to highlight from the test:

  • Why cache the DataFrame? (line 25) Caching is crucial because we’re applying several transformations and a final count action to the same data. Without caching, the to_date calls would execute while the DataFrame is being populated (as in some kind of streaming pipeline), mixing the time spent on populating it with the time spent parsing string values. By caching the data, we ensure that we’re measuring only the time taken by the to_date operations, not the overhead of DataFrame creation and population.
  • Why count the DataFrame before the three tests? (line 28) This is another aspect of Spark’s laziness. If we don’t perform this initial count, the first count during the to_date with empty values test would include the time Spark takes to create and populate the DataFrame. This would be unfair for the first test. Counting before the tests ensures that we’re comparing performance accurately, as all tests start with the same cached DataFrame.

Final Thoughts …

Is Java to blame for having a costly error-handling mechanism, or is it more that Spark is using this mechanism without considering the potential impact of handling millions upon millions of parsing errors?

I’ve worked with Java for the past 23 years, helping troubleshoot and fix lots of performance issues in applications of all kinds and sizes — ranging from simple batch jobs to massive web applications, including many third-party commercial products. Despite this extensive experience, I only discovered last week how costly constructing Throwable instances can be. That tips the scales more in favor of Spark as the culprit here, don’t you think?

The way Spark parses strings to dates is quite standard. I’ve done it myself dozens of times without any performance issues. The real problem here is that we’re not just handling a few minor calculations or serving results to a web user; we’re parsing millions of strings. And when you perform a task millions of times, and that task isn’t well-thought-out or optimized, you can easily run into performance issues like this one.

To all the Java detractors out there: would I have been able to pinpoint the root of this problem without Java experience? Hmm… probably not, or at least not as easily (it wasn’t too hard, but it did take me several hours and some help from a … friendly outsider). My point is, whether you like it or not, Spark is fundamentally a Java program and relies on the JVM at its core.

At the end of the day, if you’re using PySpark, I’d venture to say that most of the code you’re running is actually in Java. For Scala developers, this is also true: while Spark is written in Scala, it relies heavily on Java libraries and the JVM.

What I’m getting at is, while you might not need deep Java knowledge to be a Spark developer, having some understanding of Java and the JVM can be incredibly useful when you encounter issues like this. So, the more you know, the better you can troubleshoot and understand what’s happening under the hood.

… And Final Doubts

A few doubts quickly sparked in my mind when we found the real culprit behind the slowness:

  1. Is Spark creating a new GregorianCalendar in every to_date call? No, it seems there is only one instance per thread, held by the class that is dynamically generated during the Whole-Stage CodeGen phase. The Calendar class from which GregorianCalendar inherits is not thread-safe, which means there would be problems and unexpected results if we tried to use the same instance from more than one thread concurrently. But because every instance of the Whole-Stage CodeGen class has its own instance of GregorianCalendar, everything runs smoothly.
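Outside of generated code, the usual way to get the same “one non-thread-safe instance per thread” guarantee in plain Java is a ThreadLocal. The sketch below is only an analogy for the idea in the first point, not what Spark does internally, and its names are hypothetical.

```java
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical holder: each thread lazily gets its own SimpleDateFormat (and its own Calendar inside).
class PerThreadFormatter {

    private static final ThreadLocal<SimpleDateFormat> FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

    static SimpleDateFormat formatterForCurrentThread() {
        return FORMAT.get();
    }

    /** Starts threadCount threads and counts how many distinct formatter objects they received. */
    static int distinctFormattersAcrossThreads(int threadCount) {
        // Identity-based set: SimpleDateFormat.equals compares patterns, not object identity.
        Set<SimpleDateFormat> seen =
                Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
        Thread[] workers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            workers[i] = new Thread(() -> seen.add(formatterForCurrentThread()));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("Distinct formatters: " + distinctFormattersAcrossThreads(4));
    }
}
```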
  2. Does it behave the same in later Spark and Java versions? Even though the issue here was at the core of how Java internally handles errors and exceptions, maybe there could have been differences in later Spark or Java versions. No differences were found: the same tests using Spark 3.5 and Java 11 produced similar results.
  3. Is this issue limited to ParseException? Not at all. Whenever you create a new exception lots of times, you can run into the same problem. For instance, if you define a UDF that catches and internally ignores exceptions, but the data processed by the UDF generates those exceptions, your Spark application will suffer and take longer to complete.

Acknowledgments

This article wouldn’t have been possible without the invaluable contribution of Sumit Kumar Jethani Jethani. Thank you for bringing this issue to my attention.

I also want to express my sincere gratitude to Java Champion Dr Heinz Kabutz. Your guidance and insights during the investigation were incredibly valuable and greatly appreciated.

There are three types of people in the world:

  • Those who don’t know what happened
  • Those who wonder what happened
  • And people like us that make clear what happened


Sick and tired of debugging some error or performance issue? I might be able to help! Send me your case! I’m genuinely passionate about intriguing errors and performance challenges. Those are the things that keep me up at night and keep me thinking while I walk my dog. I’ve been solving interesting cases for the last 15 years, so … don’t hesitate to reach out to me at https://www.linkedin.com/in/angel-alvarez-pascua .
