Spark join optimization on skewed data using bin packing

Introduction:

Apache Spark is a powerful distributed framework for various operations on big data. In such a system, one of the developer's most important goals is to distribute tasks evenly across all executors.

Spark's low-level data structure is the RDD, which is split into partitions. To get the most out of Spark, the partitions should be of equal size.

Skewed data means that some keys occur in the dataset far more often than others. In a join operation, this causes uneven partitions by the join key.

This article covers my experiment in reducing the running time by roughly 80% and making the job much more stable, by identifying that we had a skewed data join issue and fixing it with bin packing. I created an open source project on GitHub and also uploaded the package to PyPI.

How to identify the skew data issue in a join?

1. Job Monitoring

  • Although the Spark resource manager/Spark UI is not very clear (at least to me), it gives a good indication of problems and helps with troubleshooting. If you see that most tasks finish in a short time and only a small number of tasks (1–3) keep running for most of the job's running time (much longer than the other tasks), that can be the first hint.
Figure 1. Spark UI: 2 tasks are still running
  • The scenario above stays the same even if you increase the cluster size: the running time does not improve linearly as the cluster grows. Monitoring CPU, memory and network can show that, even after a significant increase in cluster size, the heavy work is done by a small number of nodes/executors (you can use the Ganglia Monitoring System).

2. Split a “big” job into a “chain of smaller” jobs

If you have multiple transformations that all accumulate into one action, I recommend splitting the transformations across more than one action so that it is easier to find the bottleneck operation (use a write as the action that acts as a checkpoint).

More than that, consider breaking the “big” job into a chain of “smaller” ones. Of course, doing so can easily affect the total running time, but having “checkpoints”/“a chain of small jobs” has the following advantages:

  • Easier debugging and a sharper focus on bottlenecks and failures (also call spark.sparkContext.setJobGroup before each action to track the checkpoints in the Spark UI).
  • By splitting into small jobs, each Spark job can have its own customized spark-submit settings (for example custom executor-memory, executor-cores and so on).
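
As an illustration of such a checkpoint, here is a minimal sketch. It assumes `spark` is an existing SparkSession and that Parquet is an acceptable intermediate format; the helper name `run_stage` and the output path are hypothetical and not part of the project described later.

```python
def run_stage(spark, df, stage_name, output_path):
    """Materialize df as one tracked stage and return it re-read from disk.

    Assumptions: spark is an active SparkSession and df is a PySpark
    DataFrame. run_stage and output_path are illustrative names only.
    """
    # Label every job triggered below so the stage is easy to find in the Spark UI.
    spark.sparkContext.setJobGroup(stage_name, "checkpoint stage: " + stage_name)
    # The write is the action: it forces all pending transformations to run here.
    df.write.mode("overwrite").parquet(output_path)
    # Reading the result back means the next stage starts from the files on disk.
    return spark.read.parquet(output_path)
```

Each stage of the chain can then be timed and inspected on its own, at the cost of the extra write and read.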

3. Analyze the input dataset

This is probably the most important stage in analyzing whether the data is skewed. You can run a query that groups the data by the join key and counts the rows for each key.

Of course, if the join is based on more than one column, we need to run the weight query on the combination of all columns (or concatenate all the columns into one column).
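
The original query is not reproduced here, so the following is only a sketch of what such a count query could look like in PySpark. The function names are hypothetical; `groupBy`, `count` and `orderBy` are standard DataFrame methods. The second helper is an extra, assumed way to put a single number on the skew.

```python
def count_join_keys(df, join_cols):
    """Rows per join key, heaviest first (df is assumed to be a PySpark DataFrame)."""
    # groupBy accepts several columns, so a multi-column key needs no concat.
    return df.groupBy(*join_cols).count().orderBy("count", ascending=False)


def skew_ratio(counts):
    """Heaviest key count divided by the (upper) median key count."""
    ordered = sorted(counts)
    median = ordered[len(ordered) // 2]
    return ordered[-1] / median
```

For example, `df_join_key_count = count_join_keys(df_left, ["user_id"])`, where `df_left` and `user_id` are placeholder names. A ratio close to 1 suggests a flat distribution, while a very large ratio points to a few dominant keys.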

This data frame (df_join_key_count) should be examined carefully. At this stage you need to understand how the join key is spread over the dataframe (based on one column or multiple columns).

With respect to the join key, the dataset can come in a variety of forms. We assume that its shape can be symmetric or asymmetric.

To take advantage of Spark parallelism, the goal should be a join key with a flat, symmetric distribution: the count of each join key should be close to equal.

In my scenario the join keys were spread asymmetrically: I had 6 join keys with a count of ~4.25M each, followed by a long tail of keys whose counts decrease down to 1:

The Y axis is the weight (count) of a key, and the X axis is the number of keys that have this count.

Figure 2. Weight by repeated keys

How do we fix the skewed join key?

Spark is built to work with equal-sized partitions, and the goal is to spread the partitions across executors as evenly as possible. Hash partitioning tries to spread the data as evenly as possible over all the partitions, based on the join keys.

The dataframes are shuffled before the join because they are repartitioned by the join keys. My solution is based on custom partitions built with bin packing, in the following steps:
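
To see why hash partitioning alone cannot balance skewed keys, consider this small pure-Python simulation. It is a sketch only: `zlib.crc32` stands in for Spark's real hash function, and the key names and sizes are made up.

```python
from collections import Counter
import zlib


def partition_sizes(keys, num_partitions):
    """Rows per partition when each row goes to hash(key) % num_partitions."""
    sizes = Counter()
    for key in keys:
        # crc32 is a deterministic stand-in for Spark's hash (an assumption).
        sizes[zlib.crc32(str(key).encode()) % num_partitions] += 1
    return [sizes[p] for p in range(num_partitions)]


# 1,000 distinct keys with one row each, plus one hot key with 10,000 rows.
keys = ["key_%d" % i for i in range(1000)] + ["hot_key"] * 10000
sizes = partition_sizes(keys, 8)
print(sizes)
```

All rows of the hot key hash to the same partition, so one partition holds at least 10,000 rows while the remaining seven share at most 1,000 rows between them, no matter how many partitions are added.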

1. Generate Key <-> Weight List

Run the count/weight query on the left df to get the count of each join key.

Convert this DF to a Python list (collecting to the master is not a recommended action, but after the group by we have a much smaller df).
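
A possible sketch of this conversion, assuming the grouped dataframe has a single join key column and the `count` column that `groupBy(...).count()` produces. The function name is hypothetical.

```python
def to_key_weight_list(df_join_key_count, join_col):
    """Collect the (small) grouped DataFrame to the driver as (key, count) pairs."""
    # collect() is acceptable here only because the group-by left one row
    # per distinct join key, which is far smaller than the original data.
    return [(row[join_col], row["count"]) for row in df_join_key_count.collect()]
```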

2. Bin Packing

Use bin packing logic to assign a bin_id to each object in the Python list. After calling this method, the total count of keys in most bins will be equal, or as close to equal as possible. A join key cannot be split between 2 bins.
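
The text does not say which bin packing heuristic is used, so the sketch below substitutes a common greedy one: sort the keys by weight, heaviest first, and always put the next key into the currently lightest bin. The function name and the choice of a fixed number of bins are assumptions.

```python
import heapq


def pack_keys_into_bins(key_weights, num_bins):
    """Assign each key to a bin so that the bin totals are roughly balanced.

    Greedy heuristic (an assumption, not necessarily the project's logic):
    heaviest keys first, each into the currently lightest bin.
    """
    # Min-heap of (total weight so far, bin_id): the lightest bin is on top.
    bins = [(0, bin_id) for bin_id in range(num_bins)]
    heapq.heapify(bins)
    assignment = {}
    for key, weight in sorted(key_weights, key=lambda kw: kw[1], reverse=True):
        load, bin_id = heapq.heappop(bins)
        assignment[key] = bin_id  # a key goes to exactly one bin, never split
        heapq.heappush(bins, (load + weight, bin_id))
    return assignment


key_weights = [("a", 10), ("b", 9), ("c", 4), ("d", 3), ("e", 2), ("f", 2)]
assignment = pack_keys_into_bins(key_weights, num_bins=2)
```

With these made-up weights, both bins end up with a total weight of 15. Because a key is never split, the heaviest single key sets a lower bound on the size of the largest bin.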

3. Prepare DF for join

Expand the source dataframes with the new bin_id column (left and right).
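
One way to attach the bin_id column is sketched below. It assumes a single join key column, an active SparkSession passed in as `spark`, and a key-to-bin dictionary produced by the bin packing step. The function name is hypothetical, and the project may do this differently.

```python
def add_bin_id(spark, df, join_col, assignment):
    """Return df with an extra bin_id column looked up from assignment.

    Assumptions: spark is an active SparkSession, df is a PySpark DataFrame,
    and assignment maps each join key to its bin_id.
    """
    # Small lookup table with one row per distinct join key.
    mapping_df = spark.createDataFrame(list(assignment.items()), [join_col, "bin_id"])
    # The mapping is small, so hint Spark to broadcast it instead of shuffling df.
    # Keys that are missing from the mapping get a null bin_id.
    return df.join(mapping_df.hint("broadcast"), on=join_col, how="left")
```

The same function can be applied to both the left and the right dataframe. Right-side keys that never occur on the left get a null bin_id, which does no harm in a left join because those rows would not match anyway.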

4. Join Operation

Each df now has a new bin_id key, and this key is part of the join condition.

Figure 3. Server load distribution in the join operation, from Ganglia
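
The original join query is not reproduced here. As a rough sketch, a left join that includes bin_id could look like the following, assuming both dataframes already carry a bin_id column. The function name is hypothetical. Spark still hashes bin_id, so two bins can land in the same partition, and whether this partitioning is reused by the join depends on the Spark version and configuration.

```python
def left_join_on_bins(left_df, right_df, join_cols, num_bins):
    """Left join on the original key columns plus bin_id.

    Assumption: both inputs are PySpark DataFrames with a bin_id column.
    """
    keys = list(join_cols) + ["bin_id"]
    # Repartition both sides by bin_id so rows of the same bin are co-located.
    left_part = left_df.repartition(num_bins, "bin_id")
    right_part = right_df.repartition(num_bins, "bin_id")
    # bin_id is part of the join key, alongside the original join columns.
    return left_part.join(right_part, on=keys, how="left")
```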

How to use this code?

Open source project:

github: https://github.com/ijan10/pyspark_handle_skewed_data.git

Installation: %>pip install join_skew_data

To use this code, please note the 2 methods below:

  1. left_join_with_skew_key
  2. repartition_dfs_for_join

The first one is for the case where you wish to do a simple join without an additional expression; the function returns the transformed dataframe of the join results.

The second one is for cases where you wish to add an expression or customize the join. In that case the method returns the dataframe after repartition with bin_id. Please keep in mind that bin_id is now part of the join. We tested this solution with “big” and “small” tables, with a broadcast hint.

In my scenario, we succeeded in reducing the running time of the job from 6 hours to a chain of jobs totaling 1 hour (on the same type of cluster). The job also became much more stable.