Managing Partitions with Spark

Irem Ertürk
6 min read · Apr 3, 2022


Partitioning in workspace by Cartoonstock

If you ever wonder why everyone moved from Hadoop to Spark, I highly recommend understanding the differences between memory-based and disk-based operations. I will not dive deep into that topic here, as there is already a good set of resources that you can watch and read through (my favorite is this YouTube video).

To fully take advantage of Apache Spark's in-memory data strategy, it is necessary to understand how partitions are managed in Spark. If we understand how partitioning happens within Spark, it becomes easier to keep computations in memory without falling back to disk (in other words, shuffles). Here, I will focus on best practices for partitioning data and on the various Spark partitioning methods.

The main idea of partitioning is to optimize job performance. Good job performance is obtained by ensuring the workload is distributed equally across the Spark executors. Unfortunately, there is no single rule of thumb for that, but there are a couple of things we need to keep an eye on.

First, we need to ensure that no executor sits without work and that none of the executors becomes a bottleneck due to an unbalanced work assignment.

  • Avoid having big files: each Spark executor core can process only one partition at a time, so if we have fewer partitions than executor cores, the remaining cores stay idle and we cannot take full advantage of the existing resources (a quick check for this is shown after the list).
  • Avoid having lots of small files: they require more network communication to access each small file sitting on the data lake (such as AWS S3, Google Cloud Storage, etc.), and the computation may require lots of shuffling of data on disk.
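As a quick sanity check for the first point, we can compare a data frame's partition count with the parallelism available in the cluster. This is only a sketch: spark is assumed to be the active SparkSession and df is whatever data frame you are working with.

# Sketch: compare partition count with the cluster's available parallelism
num_partitions = df.rdd.getNumPartitions()
available_slots = spark.sparkContext.defaultParallelism  # rough proxy for executor cores
if num_partitions < available_slots:
    print(f"Only {num_partitions} partitions for {available_slots} cores: some executors will stay idle")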

On the other hand, job performance is also highly dependent on the computation itself. As mentioned above, Spark is designed to benefit from in-memory calculations, but if our computation needs to access data from other partitions, the work is done through a shuffle that goes via disk. Therefore we need to consider our calculation needs and partition the data in a way that reduces shuffling.

  • Do not partition by columns with high cardinality.
  • Partition by the specific columns that are mostly used during filter and groupBy operations.
  • Even though there is no single best number, it is recommended to keep each partition file size between 256 MB and 1 GB (a rough way to turn this into a partition count is sketched after this list).
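As a rough illustration of the last point, one way to apply the 256 MB–1 GB guideline is to divide the total dataset size by a target partition size. The 64 GB dataset size and the 512 MB target below are made-up example values, not numbers taken from this article.

# Example values only: a made-up 64 GB dataset and a ~512 MB target partition size
total_size_bytes = 64 * 1024**3
target_partition_bytes = 512 * 1024**2
num_partitions = max(1, total_size_bytes // target_partition_bytes)
print(num_partitions)  # 128 with these example values
repartitioned_df = df.repartition(num_partitions)  # df: any data frame loaded earlier (hypothetical)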

Let's start by understanding the different methods that exist in PySpark (the Python interface for Spark):

  • repartition(numPartitions, *cols): with the numPartitions argument, the number of partitions can be specified explicitly; with the cols argument, all rows that share the same combination of column values end up in the same partition. It always performs a full shuffle.
  • coalesce(numPartitions) is optimized for reducing the number of partitions (it avoids a shuffle but does not guarantee an even distribution), so whenever the partition count only needs to be reduced, coalesce should be used.
  • partitionBy(cols) is used to define the folder structure of the written data; however, it gives no direct control over how many partition files are going to be created.

Let's do some experiments using the different partitioning methods and investigate the partition counts, file sizes, and folder structures. For this exercise, the green taxi datasets of the TLC Trip Record Data between 2019 and 2020 are used and loaded into the green_df data frame (a sketch of the loading step is shown below).
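The exact loading code is in the notebook; a minimal sketch might look like the following. The data/green_taxi/ path, the CSV format, and the lpep_pickup_datetime column used to derive pickup_year/pickup_month are assumptions for illustration, not details taken from the notebook.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("spark-partitioning").getOrCreate()

# Assumed location of the downloaded 2019-2020 green taxi files
green_df = spark.read.csv("data/green_taxi/*.csv", header=True, inferSchema=True)

# pickup_year / pickup_month (used in the partitionBy examples later) are
# presumably derived from the pickup timestamp
green_df = green_df \
    .withColumn("pickup_year", F.year("lpep_pickup_datetime")) \
    .withColumn("pickup_month", F.month("lpep_pickup_datetime"))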

Coalesce vs Repartition

# Reduce green_df to eight partitions without a full shuffle and write them out
df_coalesce = green_df.coalesce(8)
df_coalesce \
    .write \
    .mode("overwrite") \
    .csv("data/partitions/coalesce_8.csv", header=True)
print(df_coalesce.rdd.getNumPartitions())

The code snippet above creates eight partitions of green_df and saves them under data/partitions/coalesce_8.csv. Below you can see the created partitions and the corresponding file sizes. It is important to notice that the file sizes vary between partitions, because coalesce does not shuffle data between partitions in order to keep the advantage of fast, in-memory processing.
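If you want to confirm the imbalance without looking at the output files, one quick check is to count the rows in each in-memory partition. Note that glom() materializes each partition as a single list on its executor, so it can be memory-hungry for very large partitions; this snippet is a sketch added for illustration, not part of the original notebook.

# Number of rows in each of the eight coalesced partitions; expect uneven counts
rows_per_partition = df_coalesce.rdd.glom().map(len).collect()
print(rows_per_partition)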

On the other hand, if we use the repartition function to create the same number of partitions, we will see that the partition files' sizes are quite similar.

# Redistribute green_df into eight evenly sized partitions via a full shuffle
df_repartition8 = green_df.repartition(8)
print(df_repartition8.rdd.getNumPartitions())
df_repartition8 \
    .write \
    .mode("overwrite") \
    .csv("data/partitions/repartition_8.csv", header=True)

Repartition by Columns

We can use the repartition function to partition by column values as well. In the code snippet below the partitioning happens based on the payment_type column, and one partition file will be created for each unique payment_type. This approach ensures that all data associated with a specific payment_type resides in one partition file.

# Shuffle the data so that all rows with the same payment_type land in one partition
df_repartition_paymenttype = green_df.repartition("payment_type")
print(df_repartition_paymenttype.rdd.getNumPartitions())
df_repartition_paymenttype \
    .write \
    .mode("overwrite") \
    .csv("data/partitions/repartition_col_v1.csv", header=True)

As seen below, the sizes of the partition files may vary a lot, depending on how the data is distributed over the selected column.

If we check the content of one of the partition files, we will only see rows that belong to a single payment type.

# Read one of the written part files back and list the payment types it contains
df_sample = spark \
    .read \
    .csv('data/partitions/repartition_col_v1.csv/part-00003-a5c7301a-5575-41d0-a2f8-7f3a8ca6c790-c000.csv', header=True)
df_sample.select('payment_type').distinct().collect()

PartitionBy

Different from the coalesce and repartition functions, partitionBy affects the folder structure and has no direct effect on the number of partition files that are going to be created, nor on the partition sizes. It only ensures that the folder structure is created and that the data is split accordingly, based on the specified column combination. As seen below, at the top level we have years, then months, and finally the actual partition files.

# Write the data into a pickup_year/pickup_month folder hierarchy
green_df \
    .write \
    .partitionBy("pickup_year", "pickup_month") \
    .mode("overwrite") \
    .csv("data/partitions/partitionBy.csv", header=True)

Repartition and PartitionBy

As seen in the previous example, partitionBy gives no control over how many partition files are created. But we might want to have a single partition file for each column combination. That is possible by using repartition and partitionBy together over the same columns.

# One partition (and hence one file) per pickup_year, written into per-year folders
green_df \
    .repartition("pickup_year") \
    .write \
    .partitionBy("pickup_year") \
    .mode("overwrite") \
    .csv("data/partitions/repartion_partionBy_col.csv", header=True)

As seen in the figure below, repartition ensures that a single partition file is created for each value of the specified column combination (here only the year), and partitionBy ensures the folder structure.

But what if we specify the partition number with repartition and the folder structure with partitionBy? In that case, repartition ensures that each folder contains at most the specified number of partition files (a quick way to verify this is shown after the snippet).

# Two partitions in total, so each pickup_year folder gets at most two files
green_df \
    .repartition(2) \
    .write \
    .partitionBy("pickup_year") \
    .mode("overwrite") \
    .csv("data/partitions/repartion_partionBy_num.csv", header=True)

I hope this helps you understand the different usages of the partitioning methods. You can find the Jupyter notebook created for these experiments on GitHub.

Further Resources

