Expedia Group Technology — Data

Turbocharging Efficiency & Slashing Costs: Mastering Spark & Iceberg Joins with Storage-Partitioned Joins

Optimizing batch data processing workflows with high-performance, cost-effective join techniques

Samy Gharsouli
Expedia Group Technology


Camels in the desert
Photo by Sergey Pesterev on Unsplash

Introduction

With the rise of cloud computing, optimizing data pipelines has become synonymous with cost savings. In recent years, Apache Spark has stood out as the leading framework for big data processing.

Moreover, new table formats like Iceberg, Delta, and Hudi have changed how we manage large datasets.

At Expedia Group™, we use Spark and Iceberg to improve our data processing workflows. One feature that promises to greatly improve our performance is the storage-partitioned join (SPJ), introduced in Spark 3.3 and supported by Iceberg.

In the Analytics Data Engineering (ADE) team, one of our main challenges is to efficiently process large amounts of data. We are always looking for new ways to boost performance and optimize costs, and adopting SPJ has proved highly effective in achieving those goals.

In this article, we will delve deeply into this feature, as it deserves more recognition than it currently receives.

Introduction to Spark

Apache Spark is an open-source data processing framework. It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general execution graphs.

While Spark is versatile and supports both batch and streaming data processing, the most interesting aspect for us is Spark SQL, which enables querying structured data with SQL.
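
For example, here is a minimal Spark SQL session as a sketch (the application name is arbitrary):

import org.apache.spark.sql.SparkSession

// Entry point for Spark SQL: build (or reuse) a session
val spark = SparkSession.builder().appName("spark-sql-demo").getOrCreate()

// Run plain SQL and display the result as a DataFrame
spark.sql("SELECT 1 AS id, 'hello' AS message").show()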

Introduction to Iceberg: Understanding why it’s a game-changer compared to Hive

Apache Iceberg is a high-performance table format for large analytics datasets. It was designed to improve on the limitations of Hive tables by providing features such as:

  • ACID transactions: Iceberg provides full support for ACID transactions, ensuring data consistency and reliability.
  • Schema evolution: Iceberg allows for the seamless evolution of schemas without the need for costly table rewrites.
  • Partition evolution: Unlike Hive, Iceberg supports the evolution of partitioning schemas over time, making it easier to manage and optimize.
  • Hidden partitioning: Iceberg automatically optimizes data layout and partitioning without exposing the complexity to the end user.

These features position Iceberg as a transformative force in the big data ecosystem. For more information, please refer to Apache Iceberg’s official documentation.
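
To make hidden partitioning and partition evolution concrete, here is a minimal sketch (table and column names are hypothetical; the partition-evolution DDL requires Iceberg's Spark SQL extensions to be enabled):

// Hidden partitioning: readers filter on event_ts directly, and Iceberg
// prunes partitions without exposing a dedicated partition column.
spark.sql("""
  CREATE TABLE events (
    id BIGINT,
    event_ts TIMESTAMP,
    payload STRING
  ) USING ICEBERG
  PARTITIONED BY (days(event_ts))
""")

// Partition evolution: change the spec later without rewriting existing data
spark.sql("ALTER TABLE events ADD PARTITION FIELD bucket(16, id)")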

Road to SPJ: theory

Joins in a distributed system: data shuffle is the wolf behind the scenes

In distributed systems, non-broadcast joins are notoriously expensive operations. For such joins, data needs to be moved between different nodes in the cluster to align the keys being joined.

This process, known as shuffling, involves significant network I/O and can drastically impact performance. Shuffling is not unique to joins: it occurs for all wide transformations, such as groupBy and reduceByKey.

Data shuffling across nodes

The diagram above illustrates how data shuffling works in a distributed system like Spark. Data is distributed over the network across nodes, which are different physical machines in a cluster.

Each node contains multiple partitions, and data from these partitions is shuffled and redistributed to ensure that data related to the same key is collocated in the same partition.

This process allows for efficient parallel processing but is network I/O intensive.
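
Conceptually, each row is routed to a shuffle partition by hashing its key. The sketch below is illustrative only: Spark actually uses a Murmur3-based hash, but the modulo logic captures the idea.

// Rows with the same key always hash to the same shuffle partition,
// which is what collocates matching keys on the same node.
def targetPartition(key: Any, numPartitions: Int): Int = {
  val h = key.hashCode
  ((h % numPartitions) + numPartitions) % numPartitions // keep the result non-negative
}

targetPartition(42, 200) // deterministic for a given key and partition count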

Deep dive into Spark execution plans

Below, we’ll explore how Spark execution plans work. Understanding execution plans is essential for grasping performance, and mastering them is crucial for optimization.

Analyzing these plans helps us see what happens internally. Even if the plan isn’t the final one from Adaptive Query Execution (AQE), it still provides valuable insights.

Note: To ensure the discussion remains focused and accessible, we will not discuss features like Merge-on-Read (MoR) and Copy-on-Write (CoW), or runtime filters. These topics are already brilliantly covered by numerous articles and can be addressed separately.

  • First, let’s create a target table using Iceberg and partitioned by bucket(N, id):
CREATE TABLE target_table (
    id INT,
    description STRING,
    some_column1 STRING,
    some_column2 INT
) USING ICEBERG
PARTITIONED BY (bucket(N, id));
  • We will use a query that performs a full outer join between two large tables with the same partition schema (we will inspect its execution plan just below):
INSERT OVERWRITE target_table
SELECT
    a.id,
    b.description,
    b.some_column1,
    b.some_column2
FROM table_a a
FULL JOIN table_b b
    ON a.id = b.id;
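
To follow along, you can print the physical plan of the join without executing it, for example with Spark's EXPLAIN (a sketch; EXPLAIN FORMATTED separates the plan tree from operator details):

spark.sql("""
  EXPLAIN FORMATTED
  SELECT a.id, b.description, b.some_column1, b.some_column2
  FROM table_a a
  FULL JOIN table_b b ON a.id = b.id
""").show(truncate = false)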

Sort-merge join: the standard method for handling non-broadcasted joins

Sort-merge join execution plan with two batches of data

The sort-merge join is Spark's default strategy for handling non-broadcast joins. It involves the following steps (a simplified plan excerpt follows the list):

  1. Scan tables A and B: Spark scans both tables, applying optimizations such as partition filters and pushed-down filters at this level.
  2. Exchange (id): Data is exchanged at the key level — in this case, id. The data is distributed across shuffle partitions, with the number of partitions determined by spark.sql.shuffle.partitions. The distribution is achieved through the formula hash_key(id) % spark.sql.shuffle.partitions. This step is very time-consuming, especially in terms of network I/O, due to the data movement between cluster nodes.
  3. Local sort (id): Each partition locally sorts the data on the id. This is a computationally intensive step.
  4. Sort-merge join: The sorted datasets are merged together, based on the join keys. This step ensures that matching keys from both datasets are joined together.
  5. Project: After the join, a projection operation is performed. This step involves selecting the required columns from the joined dataset. It helps reduce the amount of data processed in subsequent steps by eliminating unnecessary columns.
  6. Exchange (bucket(N, id)): Data is shuffled according to the target table’s partitioning schema. This step ensures that the data is correctly partitioned for writing.
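
For reference, a simplified plan excerpt for the join portion of this query looks like the following. It is illustrative, not verbatim output, and the final write-side exchange from step 6 is omitted; note the two Exchange and two Sort operators feeding the SortMergeJoin:

Project [id, description, some_column1, some_column2]
+- SortMergeJoin [id], [id], FullOuter
   :- Sort [id ASC NULLS FIRST]
   :  +- Exchange hashpartitioning(id, 7680)
   :     +- BatchScan table_a
   +- Sort [id ASC NULLS FIRST]
      +- Exchange hashpartitioning(id, 7680)
         +- BatchScan table_b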

Enable write fanout

The default value of write.spark.fanout.enabled triggers a sort before the write

By default, Iceberg sets write.spark.fanout.enabled to false, which forces a local sort before writing the data. This setting ensures that the data is clustered, which can improve write performance.

However, in scenarios where local sorting is not necessary, you can skip this step by enabling the fanout writer.

To enable the fanout writer, which does not require data to be clustered but does use more memory, set the property to true at the table level using the following code:

ALTER TABLE target_table SET TBLPROPERTIES ('write.spark.fanout.enabled'='true');

More details are available through the Iceberg official documentation.

Execution plan with write.spark.fanout.enabled set to true

Shuffle-hash join: an improvement removing two expensive operations

Shuffle-hash join removes two expensive CPU-intensive steps

Shuffle-hash join improves performance over sort-merge join by eliminating the CPU-intensive sorting steps. Instead, it hashes the join keys and distributes the data across the cluster based on these hashes. The key difference is the removal of the two sorts, which reduces computational overhead and leads to significant performance gains.

However, Spark uses sort-merge join as the default join strategy for a reason: it is more robust. Unlike shuffle-hash join, it does not require the hashed side of each partition to fit in memory, which makes it more suitable for scenarios where data sizes are unpredictable or exceed the available memory.

The steps involved are:

  1. Hashing: The join keys are hashed.
  2. Shuffling: Data is redistributed across the cluster based on the hash values.
  3. Joining: The data is joined based on the hashed keys.

By removing the need for sorting, shuffle-hash join reduces computational overhead and can lead to significant performance gains. To trigger shuffle-hash join, the following option must be set:

spark.conf.set("spark.sql.join.preferSortMergeJoin", value = false)
A shuffle-hash join execution plan is still dominated by the two exchanges

SPJ: the successor of bucketed join

Storage-partitioned join execution plan removes the two exchanges and two sorts

The storage-partitioned join is a new approach built on the concept of bucketed joins, a feature of Hive tables. However, it’s important to note that bucketed joins are not available for v2 data sources like Delta, Hudi and Iceberg.

In a bucketed join, the data is pre-partitioned and stored in buckets, based on the join keys. This allows the join to be performed more efficiently by reducing the amount of data shuffled across the network.

Storage-partitioned join takes this a step further by leveraging the partitioning capabilities of modern table formats like Iceberg.

Data is partitioned and stored in a way that aligns with the join keys, enabling highly efficient joins with minimal data movement.

This approach significantly reduces the overhead associated with traditional joins and can lead to substantial performance improvements.

For further insights into SPJ implementation, I recommend reading its well-crafted design paper available here.

SPJ requirements

Triggering SPJ requires the following:

  1. The partition schema of the two joined Iceberg tables must be exactly the same.
  2. To eliminate table source sorts, spark.sql.join.preferSortMergeJoin must be set to false.
  3. The following configurations must be applied (a complete sketch putting everything together follows this list):
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", value = true)
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", value = false)
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", value = true)
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", value = true)
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", value = true)

Results: achieving 45% to 70% savings on queries involving joins

Configuration

  • Environment: AWS EMR with S3 storage, Spark 3.4.1 on EMR version 6.14, Iceberg 1.5.0.
  • Cluster: driver on r7gd.4xlarge and 12 worker nodes on r7gd.16xlarge, totaling 768 worker cores. The AWS list price for the cluster is $66.68 per hour. To simplify cost calculations, storage costs are not included.
  • Spark configuration
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1024m")  # Spark partition size after AQE
spark.conf.set("spark.sql.shuffle.partitions", 7680) # Multiple of number of cores and let AQE do its job

Scenarios

The following scenarios are based on real-world data. Data sizes refer to the deserialized size of the data in memory:

  1. Scenario 1: Join between two medium tables (Table A: 2 TB, Table B: 50 GB)
  2. Scenario 2: Join between two big tables (Table A: 10 TB, Table B: 1 TB)
  3. Scenario 3: Join between three medium tables (Table A: 4 TB, Table B: 250 GB, Table C: 90 GB) with wide transformations later in the data pipeline

Performance comparison of join strategies

SPJ is a cost-breaker

As you can see, the results are outstanding. The different join strategies demonstrate significant variations in both execution time and cost, highlighting the importance of choosing the appropriate join type for each scenario.

For instance, in Scenario 1, using a storage-partitioned join reduces the duration to just six minutes, at a cost of only $6.70, compared with 11 minutes and $12.20 for a sort-merge join.

Even better, Scenario 3 shows a dramatic reduction in execution time and cost when using the storage-partitioned join. These optimizations are not only efficient but also translate into substantial cost savings.

Across these three use cases, we anticipate saving $5,000 on our next data pipeline.

Merge statement: the big winner

One of the biggest beneficiaries of the storage-partitioned join (SPJ) is the merge statement. Behind the scenes, a merge statement typically involves multiple join operations.

By leveraging SPJ, the merge statement can optimize these operations by removing expensive shuffles and sorts. This makes it a powerful feature for data pipelines that rely heavily on merge operations.
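
As an illustration, a typical merge that benefits from SPJ looks like the sketch below. The table names are hypothetical; both tables would share the same partition spec, the SPJ configurations above would be set, and MERGE INTO requires Iceberg's Spark SQL extensions:

// Upsert rows from a staging table into the target;
// each WHEN clause is resolved through joins on the id key.
spark.sql("""
  MERGE INTO target_table t
  USING updates_table u
  ON t.id = u.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")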

When not to use SPJ

While storage-partitioned join (SPJ) offers significant performance improvements in many scenarios, it's important to understand when it may not be the best choice.

For equi-joins, a broadcast join is still the better option whenever one side is small enough to broadcast. Since Spark 3, Adaptive Query Execution (AQE) automatically selects a broadcast join when feasible, even if it wasn't part of the initial physical plan. Additionally, SPJ is not applicable to non-equi-joins.
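
If the optimizer's size estimates are off, you can still force the broadcast explicitly with a hint. A minimal sketch, reusing the earlier table names:

// The BROADCAST hint ships table_b to every executor,
// avoiding a shuffle of the large side entirely.
spark.sql("""
  SELECT /*+ BROADCAST(b) */ a.id, b.description
  FROM table_a a
  JOIN table_b b ON a.id = b.id
""")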

Conclusion

By combining Spark and Iceberg, we’ve unlocked the potential of storage-partitioned joins, delivering outstanding performance and significantly reducing costs. This is a major step forward in optimizing query execution.

However, storage-partitioned joins, similar to the bucketed join, require both tables to follow the same partitioning schema, which can be challenging and may not suit all scenarios.

It’s important to remember that many factors influence performance, and focusing on just one component might overlook key aspects of the system. Additionally, while SPJ is a powerful feature within Spark with Iceberg, other tools in the broader data lake ecosystem may not include this feature. As these tools continue to develop, they might introduce similar functionalities, but they could face different challenges.

To truly understand the benefits and limitations of SPJ, it’s essential to carefully consider the specific needs of each use case.

Learn more

Learn about life at Expedia Group
