Data joins in distributed systems.

Understand how joins are handled by modern data warehouses and data processing tools.

Shailav Shrestha
8 min read · Jan 27, 2022

“I doubled my nodes, why is my join still so slow!!?”

As someone who often works with large amounts of data, one problem that frequently bugs me is slow data joins. Most of the time, the problem is not the number of nodes but rather that the nodes are not being utilized appropriately. In my quest to optimize data joins, one thing became clear: unless I understood how joins work, there was little I could do to optimize them.

In this story, I will cover the basics of what happens under the hood when you join two distributed datasets. I will also cover the different approaches to joins in a distributed architecture. Most of my illustrations use tables (relations), but the ideas extend well beyond them.

As is typical in my blogs, rather than focusing on theoretical aspects, I will focus on examples to hit the main ideas.

Preamble: How is data stored in a distributed system?

Let’s start with a scenario. Say you have a dataset that does not fit into memory (or on disk). No matter what you do, it is impossible to process, let alone store, that data on your computer. What do you do?

A large table partitioned into 2 smaller tables using user_id as partition column

In such a scenario, a reasonable approach is to break the data down into two smaller datasets, store them on two different machines, and then process them separately.

We call this approach partitioning. A partition key is generated from each record in a dataset and then based on the key, the dataset is divided.

In the example, you can observe the large dataset being partitioned based on user_id. The example shows two partitions, but in general we can choose the number of partitions to suit our requirements (for example, based on the number of nodes).
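To make this concrete, here is a minimal sketch of partitioning in plain Python. The `partition_by_key` helper, the sample rows, and the modulo rule are my own assumptions for illustration; real systems may use hashing or range-based schemes, and the diagram may assign rows differently.

```python
def partition_by_key(records, key, num_partitions):
    """Split records into num_partitions lists based on the partition key."""
    partitions = [[] for _ in range(num_partitions)]
    for record in records:
        # Assumed rule: integer key modulo the number of partitions.
        index = record[key] % num_partitions
        partitions[index].append(record)
    return partitions


# Hypothetical sample rows (not taken from the diagram).
users = [
    {"user_id": 1, "age": 16},
    {"user_id": 2, "age": 15},
    {"user_id": 3, "age": 18},
    {"user_id": 4, "age": 15},
]

user_partitions = partition_by_key(users, "user_id", 2)
for index, rows in enumerate(user_partitions):
    print(index, [row["user_id"] for row in rows])
```

Changing the last argument changes the number of partitions, which is the knob you would typically tie to the number of nodes.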

Joins on distributed datasets — Setup

When our dataset is partitioned across several nodes, handling a data join becomes trickier. It is not as straightforward as it would be on a single machine. There are a handful of approaches that are generally used for data joins. To illustrate them, we will use the following scenario.

Scenario :

As illustrated in the diagram, we work with two relations.

  1. USER relation that contains information about the age of a user. The relation is partitioned by user_id across 2 nodes
  2. GIFT relation that contains information about what gift to provide for each age. The GIFT relation is partitioned by age across 2 nodes

Goal :

We want to see all the gifts based on user_id (instead of age). This requires a join between the USER and GIFT relations with age as the join column.

Scenario Illustration. We want to join datasets that are partitioned across multiple nodes

While there are many other approaches, we will discuss only two of them. Many other join approaches are derived from similar ideas, so once you understand the following two, the others are relatively easy to pick up.

  1. Sort-merge joins (which can be categorized as a Reduce-Side Join)
  2. Broadcast hash Joins (which can be categorized as a Map-Side Join)

The primary approach in both of these joins is similar — bring all of the related data to the same place and then process the joins locally. We call this “co-locating” the pertinent data.

In the scenario above, the primary problem is that the records which need to be joined (based on our join column, age) are not all present on the same node. For example, take age = 15. All the records in the GIFT table with age = 15 are on node 1, so that’s fine. However, observe that for the USER table, one record is on node 1 (user_id = 2, age = 15) and the other is on node 2 (user_id = 4, age = 15). Since these records are on different nodes, we cannot join them efficiently.
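The co-location problem can be checked with a small sketch. The node layout below is an assumption: only the two age = 15 users and the position of the age = 15 gift come from the scenario, while the remaining rows, the gift names, and the `nodes_holding` helper are made up for illustration.

```python
# Hypothetical layout: node id -> rows stored on that node.
user_nodes = {
    1: [{"user_id": 1, "age": 16}, {"user_id": 2, "age": 15}, {"user_id": 3, "age": 17}],
    2: [{"user_id": 4, "age": 15}, {"user_id": 5, "age": 18}, {"user_id": 6, "age": 20}],
}
gift_nodes = {
    1: [{"age": 15, "gift": "book"}, {"age": 16, "gift": "watch"}, {"age": 17, "gift": "bag"}],
    2: [{"age": 18, "gift": "phone"}, {"age": 19, "gift": "camera"}, {"age": 20, "gift": "laptop"}],
}


def nodes_holding(nodes, column, value):
    """Return the sorted ids of nodes storing at least one row where column == value."""
    return sorted(
        node_id
        for node_id, rows in nodes.items()
        if any(row[column] == value for row in rows)
    )


print(nodes_holding(gift_nodes, "age", 15))  # [1]
print(nodes_holding(user_nodes, "age", 15))  # [1, 2]
```

The USER rows for age = 15 live on two nodes, so no single node can produce the complete join result for that age on its own.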

Below, we will discuss how the two joins tackle this problem and how the data join finally occurs.

Joins on distributed datasets — Sort-merge Joins

In sort-merge joins, colocation of related data is done by repartitioning the data on the basis of the join key/column. In essence, we ensure that all the records in both of the relations with the same join column value will be sent to the same node.

In the example, observe that in both the GIFT table and USER table, all the records with age in [15, 17, 19 (odd values)] are partitioned to one node and all the records with age in [16, 18, 20 (even values)] are partitioned to another node. After repartitioning we can be assured that node 1 will not require any data present in node 2 and vice versa.
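Here is a minimal sketch of that repartitioning step. The odd/even routing rule follows the example, but the `target_node` and `repartition` helpers and the sample rows are assumptions for illustration.

```python
def target_node(age):
    """Assumed routing rule from the example: odd ages -> node 1, even ages -> node 2."""
    return 1 if age % 2 == 1 else 2


def repartition(nodes, join_key):
    """Move every row to the node chosen by its join-key value."""
    repartitioned = {1: [], 2: []}
    for rows in nodes.values():
        for row in rows:
            repartitioned[target_node(row[join_key])].append(row)
    return repartitioned


# Hypothetical starting layout: USER split by user_id, GIFT split by age range.
user_nodes = {
    1: [{"user_id": 1, "age": 16}, {"user_id": 2, "age": 15}, {"user_id": 3, "age": 17}],
    2: [{"user_id": 4, "age": 15}, {"user_id": 5, "age": 18}, {"user_id": 6, "age": 20}],
}
gift_nodes = {
    1: [{"age": 15, "gift": "book"}, {"age": 16, "gift": "watch"}, {"age": 17, "gift": "bag"}],
    2: [{"age": 18, "gift": "phone"}, {"age": 19, "gift": "camera"}, {"age": 20, "gift": "laptop"}],
}

users_by_age = repartition(user_nodes, "age")
gifts_by_age = repartition(gift_nodes, "age")


def ages_on(rows):
    """Collect the distinct ages present in a list of rows."""
    return {row["age"] for row in rows}


print(ages_on(users_by_age[1]), ages_on(gifts_by_age[1]))
print(ages_on(users_by_age[2]), ages_on(gifts_by_age[2]))
```

After repartitioning, every age value appears on exactly one node in both relations, which is what lets each node join independently.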

Sort-merge join. Illustrates the behavior across map, shuffle and reducer phases

Sort-merge join operates in the following steps :

  • On each node, a mapper function is called. The mapper function generates a map key based on the value of the join key. A map key does not have to be unique; rather, all records with the same join column value should generate the same map key. In our example, for simplicity, we use the join key (age) directly as the map key, but know that in practice some deterministic function of the join key is used.
  • A global shuffle step then occurs where (typically) a driver node decides which nodes should handle which partitions. It then instructs the nodes to exchange data among themselves so that each node holds an exclusive copy of all the partitions it is supposed to handle. After the global shuffle, each node can be sure that no other node holds the partitions it is working on. This provides useful guarantees such as global uniqueness and no data redundancy. In our example, observe how the shuffle step takes in the keys generated by the mapper function and redistributes them between the nodes such that node 1 only deals with odd map keys and node 2 only deals with even map keys.
  • Finally, on each node, a reducer function is called. The reducer function organizes the data by relation, sorts the data by the join key, and then merges the two sorted sides to obtain the final join result (hence the name sort-merge). Since the actual join occurs on the reducer side, it is categorized as a reduce-side join. Note that sorting before joining gives a significant performance advantage: after the sort, all records with the same join key value are adjacent, which allows an efficient merge. (If you are familiar with the merge sort algorithm, the idea is similar to its merge step, where both sub-arrays are pre-sorted and can therefore be merged efficiently.)
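The three phases above can be sketched as a single-process simulation. This is not how any particular engine implements it: the function names, the `key % num_nodes` routing rule, and the sample rows are all assumptions made for illustration.

```python
from collections import defaultdict


def map_phase(rows, relation, join_key):
    """Tag each row with its map key (here the join-key value) and its source relation."""
    return [(row[join_key], relation, row) for row in rows]


def shuffle_phase(mapped_per_node, num_nodes):
    """Route every (key, relation, row) triple to the node that owns its key."""
    shuffled = defaultdict(list)
    for mapped in mapped_per_node:
        for key, relation, row in mapped:
            shuffled[key % num_nodes].append((key, relation, row))  # assumed routing rule
    return shuffled


def reduce_phase(triples, join_key):
    """Split by relation, sort both sides on the join key, then merge them."""
    left = sorted((r for _, rel, r in triples if rel == "USER"), key=lambda r: r[join_key])
    right = sorted((r for _, rel, r in triples if rel == "GIFT"), key=lambda r: r[join_key])
    joined, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][join_key], right[j][join_key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on each side and emit their cross product.
            i_end, j_end = i, j
            while i_end < len(left) and left[i_end][join_key] == lk:
                i_end += 1
            while j_end < len(right) and right[j_end][join_key] == lk:
                j_end += 1
            for l_row in left[i:i_end]:
                for r_row in right[j:j_end]:
                    joined.append({**l_row, **r_row})
            i, j = i_end, j_end
    return joined


# Hypothetical input partitions (one inner list per node).
user_nodes = [
    [{"user_id": 1, "age": 16}, {"user_id": 2, "age": 15}, {"user_id": 3, "age": 17}],
    [{"user_id": 4, "age": 15}, {"user_id": 5, "age": 18}, {"user_id": 6, "age": 20}],
]
gift_nodes = [
    [{"age": 15, "gift": "book"}, {"age": 16, "gift": "watch"}, {"age": 17, "gift": "bag"}],
    [{"age": 18, "gift": "phone"}, {"age": 19, "gift": "camera"}, {"age": 20, "gift": "laptop"}],
]

mapped = [map_phase(rows, "USER", "age") for rows in user_nodes]
mapped += [map_phase(rows, "GIFT", "age") for rows in gift_nodes]
shuffled = shuffle_phase(mapped, num_nodes=2)
result = [row for node in sorted(shuffled) for row in reduce_phase(shuffled[node], "age")]
for row in result:
    print(row)
```

Because both sides are sorted, the merge loop only ever moves forward through each list, which is where the efficiency of the reduce step comes from.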

Sort-merge is probably the most commonly used approach to distributed joins, and for good reasons! As long as we can scale out and have sufficient network bandwidth, we can handle data at any scale. It is the default approach in tools like Apache Spark. (I am going to conveniently gloss over the possibility of data skew. However, note that a few problems can arise in sort-merge joins. I will cover more about this in a future blog.)

Joins on distributed datasets — Broadcast Hash Joins

There is one problem with sort-merge joins. A shuffle is an extremely expensive process. It consumes a lot of compute resources and network bandwidth. So, as a rule of thumb (in most cases, not always!), if we can avoid a shuffle, we should.

Can we perform a join without a shuffle?

If both of the relations we deal with are extremely large, then it’s difficult to avoid a shuffle (unless we use partitioned hash joins, which make certain assumptions about the data partitions).

However, if we are confident that at least one of the relations is small, we can avoid a shuffle completely. The only caveat is that this small table must be small enough to fit in the memory of every node. (Otherwise, data may spill to disk and things might backfire!)

Broadcast and hash structures for efficient joins

The broadcast hash join approaches the distributed join problem by avoiding a shuffle completely. The basic idea is that instead of shuffling the data across nodes, we “broadcast” the small table itself so that each node participating in the join has a copy of the entire table. By storing the small table in an in-memory hash structure, each node can then efficiently perform the required joins locally.

In a broadcast hash join, the mapper takes care of the join and there is no reducer step. Hence this join is categorized as a map-side join.

Broadcast Hash Joins

In the above diagram, we can be confident that GIFT is a small table (since there is only one record for each age). Hence the GIFT table is broadcast to each node. Each node then stores the broadcast table in an efficient hash structure, and the mapper function takes in both the GIFT table and the node’s partition of the USER table to produce the final join output.
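A minimal single-process sketch of this idea follows. The `broadcast_hash_join` helper and the sample rows are assumptions for illustration; each loop iteration stands in for one node's mapper working on its own copy of the small table.

```python
def broadcast_hash_join(large_partitions, small_table, join_key):
    """Join each partition of the large table against a full copy of the small table."""
    results = []
    for partition in large_partitions:  # one iteration per node
        # Each node builds its own in-memory hash table from the broadcast copy.
        lookup = {}
        for row in small_table:
            lookup.setdefault(row[join_key], []).append(row)
        # Probe the hash table with the node's local rows; nothing is shuffled.
        for row in partition:
            for match in lookup.get(row[join_key], []):
                results.append({**row, **match})
    return results


# Hypothetical data: USER stays partitioned by user_id, GIFT is the small table.
user_partitions = [
    [{"user_id": 1, "age": 16}, {"user_id": 2, "age": 15}, {"user_id": 3, "age": 17}],
    [{"user_id": 4, "age": 15}, {"user_id": 5, "age": 18}, {"user_id": 6, "age": 20}],
]
gifts = [
    {"age": 15, "gift": "book"}, {"age": 16, "gift": "watch"}, {"age": 17, "gift": "bag"},
    {"age": 18, "gift": "phone"}, {"age": 19, "gift": "camera"}, {"age": 20, "gift": "laptop"},
]

joined = broadcast_hash_join(user_partitions, gifts, "age")
for row in joined:
    print(row)
```

Note that the USER rows never leave their original partition. The cost is that every node holds the whole GIFT table in memory, which is why the small-table requirement matters.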

Joins on distributed datasets — Final Comments

The approaches discussed above are only a small subset of the approaches available. Depending on how the data is partitioned and structured, we can use that information to perform more efficient joins. For example, a partitioned hash join takes advantage of the fact that the tables are pre-partitioned on the join column, while a map-side merge join assumes that the tables are not only partitioned but also sorted on the join key. Generally, these approaches can be categorized as either map-side or reduce-side joins, depending on whether it is the mapper or the reducer that performs the join.
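As a rough sketch of the partitioned hash join idea, assume both relations already arrive partitioned on age with the same rule, so each node can join its local partitions without moving any rows. The `partitioned_hash_join` helper, the odd/even layout, and the sample rows are assumptions for illustration.

```python
def partitioned_hash_join(left_nodes, right_nodes, join_key):
    """Join two relations that are already partitioned the same way on join_key."""
    results = []
    for node_id, left_rows in left_nodes.items():
        # Build a hash table from this node's partition of the right relation only.
        lookup = {}
        for row in right_nodes.get(node_id, []):
            lookup.setdefault(row[join_key], []).append(row)
        # Probe with this node's partition of the left relation; no rows cross nodes.
        for row in left_rows:
            for match in lookup.get(row[join_key], []):
                results.append({**row, **match})
    return results


# Assumed layout: both relations pre-partitioned by age (odd -> node 1, even -> node 2).
users_by_age = {
    1: [{"user_id": 2, "age": 15}, {"user_id": 3, "age": 17}, {"user_id": 4, "age": 15}],
    2: [{"user_id": 1, "age": 16}, {"user_id": 5, "age": 18}, {"user_id": 6, "age": 20}],
}
gifts_by_age = {
    1: [{"age": 15, "gift": "book"}, {"age": 17, "gift": "bag"}, {"age": 19, "gift": "camera"}],
    2: [{"age": 16, "gift": "watch"}, {"age": 18, "gift": "phone"}, {"age": 20, "gift": "laptop"}],
}

joined = partitioned_hash_join(users_by_age, gifts_by_age, "age")
for row in joined:
    print(row)
```

Unlike the broadcast variant, each node only hashes its own slice of the second table, so neither table needs to fit in a single node's memory. The catch is the assumption itself: if the two tables are not partitioned identically on the join column, matches will be silently missed.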

There is no single approach that performs best in all cases. Rather, it is a choice that the data engineer has to make to solve the problem at hand. Most query optimizers today abstract away this choice by using detailed table statistics, but they are not always perfect. As data engineers, we must be in a position to identify, reason about, and resolve any problems that arise from inefficient data joins.

Thank you for reading.
