Apache Spark WTF??? — Spark (Broad)Cast A Spell On Me
Welcome back to uncovering some of Spark’s darkest and lesser-known secrets. In this episode, we’ll delve deep into a concerning issue regarding a particularly mischievous broadcast join. We aim to understand how broadcast joins trully function and, more precisely, how Spark’s autobroadcast performs its (black) magic. We’ll explore potential pitfalls and challenges that may arise, and most importantly, why they occur.
So, lower the blinds, dim the lights, and grip your wand tightly, as we depart from the Leaky Driver tavern and venture into the uncharted, forbidden forest of Broadcast Joins. Here, before you can (broad)cast any spell, you might face totally unexpected attacks and be torn to pieces by despicable and remorseless creatures known as… OutOfMemory demons!”
(Soundtrack for this article: I Put a Spell On You)
The Case
First and foremost, we’ll need the following sorcerous ingredients to brew our soury concoction:
- Big Golem’s Heart (aka Big Table). It’s not really big big, just … big enough. A medium-sized “heart” will suffice.
- Moonlight Essence (aka Small Table). A tiny fraction of “essence” is crucial for (broad)casting this spell. The smaller, the more astounding the results.
- Grimoire of Brews (aka Hive Catalog). All components must be properly registered in the “Grimoire” for this potion to be successfully crafted.
Once we have all these elements at hand, we perform a simple inner-join spell, only to encounter an always fabulous and never unwelcomed … Out Of Memory!
Why is this possible? How could such a seemingly straightforward task in Spark fail so miserably? Is it a bug? A misconception on our part? Is the world as we know it coming to an end? Not yet, my dear friend, not yet. We just need to delve a little deeper into the intricacies of joins in Spark.
Join Selection Strategy
When dealing with joins in Spark, the engine employs various strategies based on different criteria to determine the most cost-effective approach:
- Equi / Non-equi Join: Equi-join involves conditions with equals (==) operators, while non-equi join includes conditions with non-equals (<>, >, <, etc.) operators. Spark utilizes different strategies based on the join type. Equi-Join Strategies in cost-performant decreasing order: Broadcast Hash Join, Sort-Merge Join and Shuffle Hash Join. Non-equi Join Strategies also in cost-performant order: Broadcast Nested Loop Join and Cross (Cartesian) Join.
- Join Type Suitability: Spark supports various join types such as inner, outer, semi, anti, and cross (cartesian) joins. However, not all strategies are suitable for every join type. For instance: Broadcast Hash Join is efficient for inner and semi joins but not recommended for outer/anti joins and cross joins due to data preservation challenges and potential data explosion.
- Table Sizes Consideration: The size of the tables involved in a join plays a crucial role in determining suitable strategies. For instance, broadcast join requires the broadcast data to fit into the memory of the driver and all executors. Failing this requirement prohibits the use of broadcast joins.
This understanding helps optimize Spark’s join operations by selecting the most appropriate strategy based on join conditions, types, and table sizes.
How Broadcast Hash Join Works
In the scenario we’re examining, where an equi-join is involved and our smaller table fits the criteria for a Broadcast Hash Join, Spark follows these steps:
- Data Collection: The driver node gathers data from partitions of the small table, which may be distributed across one or more executors.
- Hash Table Creation: Using the fields of the join conditions as keys, the driver constructs a hash table.
- Broadcasting: The serialized hash table is broadcast across the executors. Each executor receives a single read-only copy of the hash table, regardless of the number of cores.
- Hash Join Execution: On each executor, a standard hash join is executed. That implies:
- Rows from the large dataset are hashed using the join key.
- These hashed rows are paired with corresponding entries in the broadcast hash table.
- Concurrently, the join operation is performed on each executor, and matched records are returned as results.
This efficient process leverages distributed computing capabilities, reducing data movement and enhancing join performance for scenarios where one dataset is significantly smaller than the other.
Spark employs Broadcast Hash Join automatically when it estimates that the smaller dataset’s size is below a certain threshold and can fit into memory across all nodes. The property spark.sql.autoBroadcastJoinThreshold sets this threshold and defaults to 10 MB, but can be adjusted according to specific application needs. However, Spark allows to disable this automatic mechanism globally (for all joins) by setting the threshold to -1.
Spark’s Catalyst Optimizer handles the selection of join strategies based on dataset characteristics. Yet, developers can explicitly instruct Spark to use a broadcast join using the broadcast() hint in SQL or the .broadcast() method in the DataFrame API. This hint overrides the autoBroadcastJoinThreshold setting. Notably, if a broadcast hint is specified on either side of the join, the hinted side will be broadcast, regardless of the automatic threshold evaluation.
This flexibility allows developers to fine-tune join strategies for optimal performance, especially in scenarios where they have prior knowledge of dataset sizes and distribution.
Pros of Broadcast Hash Join:
- Reduced Data Shuffling: Broadcasting the smaller dataset minimizes the need for shuffling large data volumes across the network, reducing network traffic and latency.
- Improved Performance: Local join processing on each node enhances execution speed compared to traditional joins involving extensive data shuffling.
- Optimized Resource Usage: Efficient memory utilization across the cluster by storing broadcast data in memory reduces disk I/O operations, optimizing overall resource utilization.
- No Sorting Required: Hashing eliminates the need for data sorting, saving computational resources and time.
- Data Skew Handling: Helps distribute data processing evenly across nodes, mitigating issues caused by data skew and preventing node overloading.
Cons of Broadcast Hash Join:
- Memory And Network Overhead: Broadcasting and storing data across all nodes can incur additional memory and network overhead, particularly in large clusters or with frequent broadcast operations.
- Scalability Limitations: While effective for smaller datasets, scalability can be limited with larger datasets due to memory constraints and network traffic.
When encountering errors in remote environments like Databricks or Cloudera, where troubleshooting can be complex, recreate the error in a simpler, isolated, and more manageable environment could be time-saving and sometimes, our only chance.
In this case, we were submitting our job on a Cloudera 7.1.7 cluster with Java 8, Scala 2.11.12 and Spark 2.4.7. We had a join between a “big” table with +30 millions of rows and a small table with no rows. The issue shouldn’t be that difficult to reproduce in our local environment, should it? Well, indeed it ended up being.
Try #1: Creating On-The-Fly Tables
Let’s kick things off with a straightforward approach. We’ll begin by crafting a large table featuring just two columns: one for the key and another for the value. We’ll fill this table with thousands of rows, each containing random values. You can check out the source code of the test here, and take a look at the query plan is generated:
The execution didn’t go as expected, adding even more mystery to the matter. Not only did the error fail to reproduce, but Spark also opted for a much less cost-effective SortMergeJoin instead of a broadcast join. Why???
To shed some light on the situation, we can ask Spark to provide us with the number of rows and the size in bytes of the two dataframes involved, using the statistics that are internally calculated from the execution plan. For instance, given a dataframe df, df.queryExecution.analyzed.stats.sizeInBytes will give us that info.
The statistics collected from both dataframes are … simply jaw-dropping, to say the least.
Given these sizes, it’s no surprise Spark opted for a SortMergeJoin. However, there’s a perplexing issue: our big table isn’t as big as Spark’s stats suggest, and our small table doesn’t have any rows at all. Yet, according to Spark’s stats, both tables have exactly the same humongous size. WTF??? (x1)
Try #2: Reading Tables From Files
Despite the new questions arising from the previous test, we still needed to replicate that issue in our local environment.
For our second attempt, let’s read the two dataframes from files. We can get a medium-sized CSV file containing data for 500,000 people from this site for the “big” table, and for the small table, we can create another CSV file containing only the same header. This time, the test is simpler: we only need to create the two dataframes by reading their corresponding CSV files and then perform the join. You can find the source code for the second test here. Though, the query plan and the sizes we obtain now aren’t very promising either:
Once again, no OutOfMemory error was raised. However, this time Spark accurately estimated the size of the tables, selecting a Broadcast Hash Join strategy for the join and appropriately broadcasting the small table. Everything appears to be working smoothly… but frustratingly, we still can’t reproduce the issue. Again … WTF??? (x2)
Try #3: The Last One?
Let’s recap our findings from the previous tests:
- On one hand, it appears that when we create dataframes “on-the-fly” directly in memory, Spark struggles to accurately calculate or estimate their size.
- On the other hand, when we create dataframes by reading files from the filesystem, Spark promptly assigns them the size of their respective files.
So, if we create our small table in memory, its size is going to exceed the threshold for the autobroadcast mechanism to utilize it. However, if we also read our big table from a file, the size of the dataframe will match the size of this file. And if we use a format like Parquet, which allows for compression such as Snappy, the size of the dataframe will likely be less than the size of the final data loaded into memory, as compression is applied at the disk level but not in memory. Consequently, our big table could be eligible for broadcast if it’s small enough, or we can manually set the property spark.sql.autoBroadcastJoinThreshold to the size of our big table to achieve the desired effect.
That’s what our third test aims to explore and here’s the query plan:
Now, we seem to have stumbled upon what we were seeking. Spark opts for a Broadcast Hash Join and broadcasts the big table instead of the small one. But still, perplexingly, this test resists to throw an OutOfMemory error … at least, not yet. WTF??? (x3)
Could it be because we haven’t configured any memory parameters in our JVM? Depending on the available memory, the operating system, and the vendor and version of our JVM, the maximum size for the heap could vary. Let’s add the following flag -XX:+PrintFlagsFinal to our execution to know the MaxHeapSize used by default in our JVM. To make the textbox for JVM params visible in IntelliJ: Run/Debug Configuration > Modify options > Add VM Options.
Because this is a local test with relatively small data, let’s lower our MaxHeapSize by replacing the previous flag with -Xmx512M. Unfortunately, we still haven’t encountered any OutOfMemory errors, but we’re getting closer, I can feel it.
Now, let’s take a moment to think about the broadcasting process. At which step does it consume the most memory? Exactly, it’s during the creation of the hash table in the driver. And what data does this hash table contain? The values are the rows of the table to be broadcast, and the keys are the columns used in the join condition. If we perform the join using multiple fields instead of just one, the keys will become larger, resulting in more memory consumption.
So, the solution is simple: adding more fields to the join criteria and a little bit of HOCUS-POCUS!
Note: In the real scenario on Cloudera, the join condition involved 5 fields.
When A Small Table Becomes Big (Local Environment)
Understanding the internal workings of Spark’s Catalyst Optimizer sheds light on how join strategies are selected. Spark utilizes the class SparkStrategy to make these decisions. Typically, the Broadcast Hash Join emerges as the most cost-efficient strategy. Hence, in the absence of developer hints, Spark naturally leans towards employing it whenever feasible. To determine if broadcasting is viable, Spark ultimately invokes the canBroadcastBySizes method from this class. Below is the relevant code snippet from this class:
Here we can observe how Spark determines when to use broadcast based on the criteria for join compatibility and the need for at least one side to be “broadcastable”. In our previous tests, we used inner joins, which are fully compatible with broadcast. Therefore, Spark’s decision not to select the broadcast strategy in our first test was likely due to the sizes of the tables exceeding the autoBroadcastJoinThreshold property. In fact, they were significantly larger.
Spark estimates these sizes by examining the Logical Plan generated during its transformation stage. This plan acts as a blueprint for optimizing and executing the computation. Consequently, the query plan (or Physical Plan) executed directly depends on this preceding Logical Plan. By using the explain method on a dataframe, we can inspect its Physical Plan, and for a more detailed view, we can opt for the extended, verbose version. For instance, in our first test, executing joinDF.explain(extended=true) yields the following initial Logical Plan:
A LogicalRDD represents a node in Spark’s logical execution plan, corresponding to an RDD derived from an existing distributed dataset within Spark’s processing framework. This encompasses RDDs created from external data sources or existing RDDs within Spark.
An ExternalRDD, on the other hand, represents RDDs specifically created from external data sources. These sources may include files stored in HDFS, S3, databases, or any other external storage systems supported by Spark.
Both classes, LogicalRDD and ExternalRDD, can be found in the file ExistingRDD.scala. Interestingly, they both implement exactly the same computeStats method:
HOLY COW!!! No wonder whenever a dataframe is created using these nodes ends up getting always the maximum value.
When A Small Table Becomes Big (Cloudera Environment)
In the previous section, we observed how Spark assigns Long.MaxValue as the size for our small table. However, does this behavior still hold true in the Cloudera environment? Well, not quite. In the application submitted to the Cloudera cluster, no dataframes are created using methods like parallelize or createDataFrame. Instead, it loads all tables from the Hive Catalog using getTable. Despite this difference, Spark still returns Long.MaxValue for the size of the small table with zero rows. But why?
Well, it seems that when we call the getTable method to obtain a dataframe corresponding to a Hive table, Spark invokes HiveExternalCatalog, which in turn employs HiveClientImpl to retrieve the size of the table from its Hive metadata. HiveClientImpl contains the following code snippet to fetch the size of the table:
Here’s the thing, in this scenario, the small table is external and also has zero rows (other external tables with rows have a totalSize greater than zero, as opposed to what the comment in the method states, btw). Since both totalSize and rawDataSize are zero, Spark doesn’t create the statistics and ends up executing the apply method from the class DetermineTableStats. In this method, the size is set again as defaultSizeInBytes (which defaults to Long.MaxValue) unless the property spark.sql.statistics.fallBackToHdfs is set to true. If this property is enabled, Spark retrieves the size from HDFS and no OutOfMemory is raised, because Spark gets the right sizes.
Spark 3.5 Behaves The Same Way?
As stated before, all this analysis has been conducted using Spark 2.4.7. Does all this still applies to later versions of Spark? It seems so.
Here are the results of executing the same tests using Spark 3.5.0:
- FirstTry: Still performs a SortMergeJoin and the sizes of the two dataframes are also Long.MaxValue.
- SecondTry: Also performs a BroadcastHashJoin and gets the same sizes.
- ThirdTry: The same OutOfMemory error is thrown.
Conclusion
When I started investigating this issue, I already knew how to fix it: either disabling the auto broadcast mechanism for the whole application (setting spark.sql.autoBroadcastJoinThreshold to -1) or hinting the Catalyst Optimizer to broadcast the small table (using broadcast()). That’s unusual, but I wanted to understand how Spark works under the covers and why something naively so simple could lead to an OutOfMemory. What I found, not only enlightened me, but also made me more aware about the internals of Spark.
While there are numerous resources available on broadcast joins in Spark, many of them provide only limited explanations about how Spark selects the proper strategy and tend to overlook the details of how table sizes are calculated. By gaining insight into the somewhat sloppy mechanism by which Spark determines sizes, we’ve become better prepared to tackle potential OutOfMemory errors that may arise when using broadcast joins.
I hope you found this article as enjoyable to read as it was for me to write. Looking forward to the next installment!
Did you like this article? Continue reading other interesting episodes from the Apache Spark WTF??? series:
- The Deadly Filter: You’ll never see filters the same way.
Sick and tired of debugging some error or performance issue? I might help you! Send me your case! I’m genuinely passionate about intriguing errors and performance challenges — those are the things that keep me up at night and also thinking while walking my dog. I’ve been solving interesting cases for the last 15 years. so … don’t hesitate to reach me out at https://www.linkedin.com/in/angel-alvarez-pascua .