
PySpark Data Skew in 5 Minutes

5 min read · May 10, 2022


Photo by John Bakator on Unsplash

There are lots of overly complex posts about data skew, a deceptively simple topic. In this post, we will cover the necessary basics in 5 minutes.

The primary source for this post was Spark: The Definitive Guide and here’s the code.

Let’s dive in…

What is Data Skew?

In Spark, data are split into chunks of rows, which are then stored on worker nodes as shown in figure 1.

Figure 1: example of how data partitions are stored in spark. Image by author.

Each individual “chunk” of data is called a partition and a given worker can have any number of partitions of any size. However, it’s best to evenly spread out the data so that each worker has an equal amount of data to process.
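For example (a minimal sketch with throwaway demo data, not the post's dataset), you can ask Spark how many partitions a DataFrame has been split into:

from pyspark.sql import SparkSession

# Build a small demo DataFrame and check how many partitions Spark created
spark = SparkSession.builder.appName('skew-demo').getOrCreate()
demo_df = spark.range(1_000_000)
print(demo_df.rdd.getNumPartitions())

On a local machine this typically defaults to the number of available cores.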

When the data are not balanced between workers, we call the data “skewed.”

If our cluster has 8 worker nodes instead of the 3 shown above, a perfectly distributed dataset would have the same number of rows on each, as shown on the left in figure 2. A skewed dataset, on the other hand, would have a lot of data on some cores and very little data on others.

Figure 2: even (left) vs. uneven (right) data distribution. Image by author.

Why do I care?

Great, we know what skew is, but how does it impact our app? Some common results of data skew are…

  • Slow-Running Stages/Tasks: certain stages or tasks take much longer than others because a given worker has too much data to process.
  • Spilling Data to Disk: if data does not fit in memory on a worker, it is written to disk, which takes much longer.
  • Out-of-Memory Errors: if a worker runs out of memory (and can no longer spill to disk), an error is thrown.

Skewed data means uneven utilization of compute and memory resources.

How can I tell if my data are skewed?

If you’re experiencing some of the above symptoms and suspect your data are skewed, you can use the following methods to diagnose the problem.

import pyspark.sql.functions as F
# Count how many rows land in each partition
df.groupBy(F.spark_partition_id()).count().show()

The above code counts the number of rows in each partition of the data frame. The key that assigns rows to partitions can be a set of columns in the dataset, the default Spark HashPartitioner, or a custom partitioner.

Let’s take a look at the output…

Figure 3: number of rows per spark_partition_id. Image by author.

In figure 3 we can see that the demo data created exhibits no skew — all row counts are identical in each partition. Great, but what if I want to see the data in each partition?

Well, to do this we’ll access the underlying RDD and pull data by partition…

# Pull the Row() objects of every partition into the driver as a list of lists
df.rdd.glom().collect()

.glom() returns a list of lists. The first axis corresponds to a given partition and the second corresponds to the Row() objects in that partition. In figure 4 we’ve printed the first 2 Row() objects in each partition — printing all 125 Row() objects over 8 partitions isn’t easy to read.

Figure 4: output of df.rdd.glom().collect() truncated to the first 2 rows in each partition. Image by author.

Pretty cool, right?

One word of caution: when using the .glom() method, you can easily overload your memory. If you’re working with large datasets, be sure to downsample so that whatever you’re collecting can fit into RAM.
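If all you need is the size of each partition rather than its contents, a lighter-weight variant (a sketch, not from the original post) collects only the per-partition row counts:

# Collect one row count per partition instead of every Row() object
partition_sizes = df.rdd.glom().map(len).collect()
print(partition_sizes)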

How do I correct data skew?

Despite the above example, in the real world, perfect data distributions are rare. Often when reading data, we are pulling from pre-partitioned files or ETL pipelines, which may not distribute the data as evenly.

There are two main ways to fix this…

1. Repartition by Column(s)

The first solution is to logically re-partition your data based on the transformations in your script. In short, if you’re grouping or joining, partitioning by the groupBy/join columns can improve shuffle efficiency.

df = df.repartition(<n_partitions>, '<col_1>', '<col_2>',...)
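For instance, if two DataFrames are joined on a customer_id column (hypothetical table and column names, purely for illustration), repartitioning both sides on that key before the join can reduce shuffle work:

# Hypothetical DataFrames and join key, for illustration only
orders = orders.repartition(8, 'customer_id')
customers = customers.repartition(8, 'customer_id')
joined = orders.join(customers, on='customer_id', how='inner')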

2. Salt

If you’re not sure which columns would lead to an even workload in your app, you can use a random salt to evenly distribute data across cores. All we do is create a column with a random value and then partition by that column…

import pyspark.sql.functions as F
df = df.withColumn('salt', F.rand())
df = df.repartition(8, 'salt')
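Since the salt column exists only to drive the repartition, it can be dropped afterwards (a small follow-up, not part of the original snippet):

# The salt has served its purpose once the rows are evenly spread out
df = df.drop('salt')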

To check if our salt worked, we can use the same groupBy as above…

df.groupBy(F.spark_partition_id()).count().show()
Figure 5: example distribution from salted keys. Image by author.

As you can see, there is some variation in the number of rows per partition; however, the data are fairly evenly distributed.

Conclusion

Now, all those other complex posts do have valuable information: correctly partitioning data can have a dramatic impact on app performance. However, with the above information you (hopefully) have a framework to google your way to solutions.

That being said, here are some more complex quick tips:

  • You can check the distribution of task speed in the Spark UI. This is very helpful!
  • It’s often best to have the number of partitions be a multiple of the number of workers.
  • Here’s a quick overview of spark operations hierarchy.
  • A shuffle causes the data to be repartitioned. A good partition will minimize the amount of data movement needed by the program.
  • If you want a super practical and advanced resource on partition sizing, check this video.

Thanks for reading! I’ll be writing 12 more posts that bring academic research to the DS industry. Check out my comment for links to the main source for this post and some useful resources.
