Expedia Group Technology — Data
Turbocharging Efficiency & Slashing Costs: Mastering Spark & Iceberg Joins with Storage-Partitioned Joins
Optimizing batch data processing workflows with high-performance, cost-effective join techniques
Introduction
With the rise of cloud computing, optimizing data pipelines has become synonymous with cost savings. In recent years, Apache Spark has stood out as the leading framework for big data processing.
Moreover, new table formats like Iceberg, Delta, and Hudi have changed how we manage large datasets.
At Expedia Group™, we use Spark and Iceberg to improve our data processing workflows. One feature that promises to greatly improve our performance is the storage-partitioned join (SPJ), introduced in Spark 3.3 together with Iceberg support.
In the Analytics Data Engineering (ADE) team, one of our main challenges is to efficiently process large amounts of data. We are always looking for new ways to boost performance and optimize costs, and SPJ promises to be highly effective in achieving those goals.
In this article, we will delve deeply into this feature, as it deserves more recognition than it currently receives.
Introduction to Spark
Apache Spark is an open-source data processing framework. It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general execution graphs.
While Spark is versatile and supports both batch and streaming data processing, the most interesting aspect for us is Spark SQL, which enables querying structured data with SQL.
Introduction to Iceberg: Understanding why it’s a game-changer compared to Hive
Apache Iceberg is a high-performance table format for large analytics datasets. It was designed to improve on the limitations of Hive tables by providing features such as:
- ACID transactions: Iceberg provides full support for ACID transactions, ensuring data consistency and reliability.
- Schema evolution: Iceberg allows for the seamless evolution of schemas without the need for costly table rewrites.
- Partition evolution: Unlike Hive, Iceberg supports the evolution of partitioning schemas over time, making it easier to manage and optimize.
- Hidden partitioning: Iceberg derives partition values from column data and prunes partitions automatically, without exposing that complexity to the end user.
These features position Iceberg as a transformative force in the big data ecosystem. For more information, please refer to Apache Iceberg’s official documentation.
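To make the evolution features concrete, here is a minimal sketch. It assumes a spark-shell session with the Iceberg runtime and SQL extensions enabled, plus a hypothetical table db.events with a timestamp column ts and an existing partition field named ts_day; adapt the names to your catalog.

// Schema evolution: adding a column is a metadata-only change, no rewrite
spark.sql("ALTER TABLE db.events ADD COLUMNS (user_agent STRING)")

// Partition evolution: move from daily to hourly partitioning;
// existing files keep the old spec, new writes use the new one
spark.sql("ALTER TABLE db.events REPLACE PARTITION FIELD ts_day WITH hour(ts)")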
Road to SPJ: theory
Joins in a distributed system: data shuffle is the wolf behind the scenes
In distributed systems, non-broadcast joins are notoriously expensive operations. For such joins, data needs to be moved between different nodes in the cluster to align the keys being joined.
This process, known as shuffling, involves significant network I/O and can drastically impact performance. It is not unique to joins: it affects all wide transformations, such as groupBy and reduceByKey.
The diagram above illustrates how data shuffling works in a distributed system like Spark. Data is distributed over the network across nodes, which are different physical machines in a cluster.
Each node contains multiple partitions, and data from these partitions is shuffled and redistributed to ensure that data related to the same key is collocated in the same partition.
This process allows for efficient parallel processing but is network I/O intensive.
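You can see this shuffle for yourself with a minimal sketch (assuming a spark-shell session where spark is the active SparkSession): joining two DataFrames on a key makes Spark insert Exchange hashpartitioning nodes on both sides of the join.

// Two synthetic DataFrames sharing the join key "id"
val orders = spark.range(1000000L).selectExpr("id", "id % 100 AS amount")
val users  = spark.range(10000L).selectExpr("id", "concat('user_', id) AS name")

// Disable broadcast so Spark cannot sidestep the shuffle
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// The physical plan prints Exchange hashpartitioning(id, ...) on both sides:
// every row is sent to the partition that owns its id
orders.join(users, "id").explain()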
Deep dive into Spark execution plans
Below, we’ll explore how Spark execution plans work. Understanding execution plans is essential for reasoning about performance, and mastering them is crucial for optimization.
Analyzing these plans helps us see what happens internally. Even if the plan isn’t the final one from Adaptive Query Execution (AQE), it still provides valuable insights.
Note: To ensure the discussion remains focused and accessible, we will not discuss features like Merge-on-Read (MoR) and Copy-on-Write (CoW), or runtime filters. These topics are already brilliantly covered by numerous articles and can be addressed separately.
- First, let’s create a target table using Iceberg, partitioned by bucket(N, id):
CREATE TABLE target_table (
id INT,
description STRING,
some_column1 STRING,
some_column2 INT
) USING ICEBERG
PARTITIONED BY (bucket(N, id));
- We will use a query that performs a full outer join between two large tables with the same partition schema:
INSERT OVERWRITE target_table
SELECT
a.id,
b.description,
b.some_column1,
b.some_column2
FROM
table_a a
FULL JOIN
table_b b ON a.id = b.id;
Sort-merge join: the standard method for handling non-broadcasted joins
The sort-merge join is the default strategy for handling non-broadcasted joins in Spark. It involves the following steps:
- Scan table A and B: Spark scans both tables, applying optimizations such as partition filters and filter pushdown at this level.
- Exchange (id): Data is exchanged at the key level, in this case id. The data is distributed across shuffle partitions, with the number of partitions determined by spark.sql.shuffle.partitions and the target partition computed as hash(id) % spark.sql.shuffle.partitions. This step is very time-consuming, especially in terms of network I/O, due to the data movement between cluster nodes.
- Local sort (id): Each partition locally sorts the data on the id. This is a computationally intensive step.
- Sort-merge join: The sorted datasets are merged together, based on the join keys. This step ensures that matching keys from both datasets are joined together.
- Project: After the join, a projection operation is performed. This step involves selecting the required columns from the joined dataset. It helps reduce the amount of data processed in subsequent steps by eliminating unnecessary columns.
- Exchange (bucket(N, id)): Data is shuffled according to the target table’s partitioning schema. This step ensures that the data is correctly partitioned for writing.
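You can verify each of these steps by asking Spark for the physical plan without running the insert; a minimal sketch, reusing the tables from above (with N replaced by a concrete bucket count):

spark.sql("""
  EXPLAIN FORMATTED
  INSERT OVERWRITE target_table
  SELECT a.id, b.description, b.some_column1, b.some_column2
  FROM table_a a
  FULL JOIN table_b b ON a.id = b.id
""").show(truncate = false)

// Expect to see: Exchange hashpartitioning(id, ...) -> Sort -> SortMergeJoin,
// then a Project and a final Exchange for the bucket(N, id) write layout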
Enable write fanout
write.spark.fanout.enabled default value triggers a sort before write
By default, Iceberg sets write.spark.fanout.enabled to false, which forces a local sort before writing the data. This setting ensures that the data is clustered, which can improve write performance.
However, in scenarios where local sorting is not necessary, you can skip this step by enabling the fanout writer.
To enable the fanout writer, which does not require data to be clustered but does use more memory, set the property to true at the table level using the following code:
ALTER TABLE target_table SET TBLPROPERTIES ('write.spark.fanout.enabled'='true');
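If you’d rather not change the behavior for every writer of the table, Iceberg also exposes a per-write option; treat the exact option name below as an assumption to verify against your Iceberg version:

// Hypothetical DataFrame holding the join result; fanout applies to this write only
joinResult.writeTo("target_table")
  .option("fanout-enabled", "true")
  .overwritePartitions()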
More details are available through the Iceberg official documentation.
Shuffle-hash join: an improvement removing two expensive operations
Shuffle-hash join improves performance over sort-merge join by eliminating the CPU-intensive sorting steps. Instead, it hashes the join keys and distributes the data across the cluster based on these hashes. The key difference is the removal of the two sorts, which reduces computational overhead and leads to significant performance gains.
However, Spark uses sort-merge join as its default strategy for good reason: it is more robust. Unlike shuffle-hash join, it does not require a partition of the build side to fit into memory, which makes it more suitable for scenarios where data sizes are unpredictable or exceed the available memory.
The steps involved are:
- Hashing: The join keys are hashed.
- Shuffling: Data is redistributed across the cluster based on the hash values.
- Joining: The data is joined based on the hashed keys.
To trigger shuffle-hash join, the following option must be set:
spark.conf.set("spark.sql.join.preferSortMergeJoin", value = false)
SPJ: the successor of bucketed join
The storage-partitioned join is a new approach built on the concept of bucketed joins, a feature of Hive tables. However, it’s important to note that bucketed joins are not available for v2 data sources like Delta, Hudi and Iceberg.
In a bucketed join, the data is pre-partitioned and stored in buckets, based on the join keys. This allows the join to be performed more efficiently by reducing the amount of data shuffled across the network.
Storage-partitioned join takes this a step further by leveraging the partitioning capabilities of modern table formats like Iceberg.
Data is partitioned and stored in a way that aligns with the join keys, enabling highly efficient joins with minimal data movement.
This approach significantly reduces the overhead associated with traditional joins and can lead to substantial performance improvements.
For further insights into SPJ implementation, I recommend reading its well-crafted design paper available here.
SPJ requirements
Triggering SPJ requires the following:
- The partition schema of the two joined Iceberg tables must be exactly the same.
- To eliminate table source sorts, spark.sql.join.preferSortMergeJoin must be set to false.
- The following configurations must be applied:
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", value = true)
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", value = false)
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", value = true)
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", value = true)
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", value = true)
Results: achieving 45% to 70% savings on queries involving joins
Configuration
- Environment: AWS EMR with S3 storage, Spark 3.4.1 on EMR version 6.14, Iceberg 1.5.0.
- Cluster: driver on an r7gd.4xlarge and 12 worker nodes on r7gd.16xlarge, totaling 768 worker cores. The AWS catalog price for the cluster is $66.68 per hour. To simplify cost calculation, storage costs are not included.
- Spark configuration
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1024m") # Spark partition size after AQE
spark.conf.set("spark.sql.shuffle.partitions", 7680) # Multiple of number of cores and let AQE do its job
Scenarios
The following scenarios are based on real-world data. Data size refers to the size of the data once deserialized in memory:
- Scenario 1: Join between two medium tables (Table A: 2 TB, Table B: 50 GB)
- Scenario 2: Join between two big tables (Table A: 10 TB, Table B: 1 TB)
- Scenario 3: Join between three medium tables (Table A: 4 TB, Table B: 250 GB, Table C: 90 GB) with wide transformations later in the data pipeline
Performance comparison of join strategies
As you can see, the results are outstanding. The different join strategies demonstrate significant variations in both execution time and cost, highlighting the importance of choosing the appropriate join type for each scenario.
For instance, in Scenario 1, using a storage-partitioned join reduces the duration to just 6 minutes at a cost of only $6.70, compared to 11 minutes and $12.20 for a sort-merge join.
Even better, Scenario 3 shows a dramatic reduction in execution time and cost when using the storage-partitioned join. These optimizations are not only efficient but also translate into substantial cost savings.
By combining these three use cases, we anticipate saving $5,000 for our next data pipeline.
Merge statement: the big winner
One of the biggest beneficiaries of the storage-partitioned join (SPJ) is the merge statement. Behind the scenes, a merge statement typically involves multiple join operations.
By leveraging SPJ, the merge statement can optimize these operations by removing expensive shuffles and sorts. This makes it a powerful feature for data pipelines that rely heavily on merge operations.
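For instance, an upsert like the sketch below (assuming a hypothetical staging table updates_table that shares target_table's bucket(N, id) partition spec) is rewritten by Iceberg into joins between target and source; with SPJ active, those joins run without any shuffle:

// A classic upsert: update matching ids, insert the rest
spark.sql("""
  MERGE INTO target_table t
  USING updates_table u
  ON t.id = u.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")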
When not to use SPJ
While storage-partitioned join (SPJ) offers significant performance improvements for many scenarios, it's important to recognize when it may not be the best choice.
For equi-joins, a broadcast join is often still the better option when one side is small enough. Since Spark 3, Adaptive Query Execution (AQE) automatically selects a broadcast join when feasible, even if it wasn't part of the initial physical plan. Additionally, SPJ is not applicable to non-equi-joins.
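And if AQE's size estimates are off, you can still force the broadcast yourself with a hint; a small sketch:

// Ship table_b to every executor; the large table_a is never shuffled
spark.sql("""
  SELECT /*+ BROADCAST(b) */
    a.id, b.description
  FROM table_a a
  JOIN table_b b ON a.id = b.id
""")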
Conclusion
By combining Spark and Iceberg, we’ve unlocked the potential of storage-partitioned joins, delivering outstanding performance and significantly reducing costs. This is a major step forward in optimizing query execution.
However, storage-partitioned joins, like bucketed joins, require both tables to follow the same partitioning schema, which can be challenging and may not suit all scenarios.
It’s important to remember that many factors influence performance, and focusing on just one component might overlook key aspects of the system. Additionally, while SPJ is a powerful feature within Spark with Iceberg, other tools in the broader data lake ecosystem may not include this feature. As these tools continue to develop, they might introduce similar functionalities, but they could face different challenges.
To truly understand the benefits and limitations of SPJ, it’s essential to carefully consider the specific needs of each use case.
Learn more
- Spark project improvement proposal: link
- Explore SPJ configurations: Storage Partitioned Joins in Spark SQL
- Data + AI summit by Databricks session