In this post, we’ll revisit a few details about partitioning in Apache Spark — from reading Parquet files to writing the results back. None of them is too surprising when you think about it, but they are often forgotten.
You are implementing event ingestion. A streaming pipeline reads from Kafka and writes files to a landing location. Since streaming must run fast, the files it creates are small and contain different types of events mixed together. Once a day, you want to compact the events into a few large files, separated by event type. The simple code below looks easy and seems to solve the problem.
But if you think about it, quite a number of questions arise:
- What determines the number of tasks when reading the source?
- What is the difference between
- How is the processing of partitions allocated to executors?
- How many files will be written?
Let’s try to answer these questions.
Parquet read partitioning
If we read 200 input files in the Parquet format, how many tasks does Spark use? One guess is 200, one per file. Another guess, remembering that Parquet files are composed of row groups, is one task per row group. In practice, there’s another complication, as shown below. First, each file is split into blocks of a fixed size (configured by the spark.sql.files.maxPartitionBytes option, 128MiB by default), and then the blocks are assigned to tasks.
In the example above, we’re reading 2 files, which are split into 5 pieces, and therefore 5 tasks will be created to read them. If you wish, you may refer to the actual splitting code and the bin-packing code, which is slightly more complicated than the description above.
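The two steps can be modeled in a few lines of plain Scala. This is a simplified sketch of the planning described above, not Spark's actual code: it assumes the defaults of 128MiB for spark.sql.files.maxPartitionBytes and 4MiB for spark.sql.files.openCostInBytes, and parallelism is a hypothetical stand-in for the cluster's default parallelism. File names and sizes are made up.

```scala
// Simplified model of Parquet read planning in Spark (not Spark's actual code).
// Assumed defaults: maxPartitionBytes = 128 MiB, openCostInBytes = 4 MiB.
final case class Split(file: String, start: Long, length: Long)

val MiB: Long = 1024L * 1024L

// Effective split size: capped by maxPartitionBytes, but smaller when the total
// input is small relative to the (hypothetical) parallelism.
def maxSplitBytes(fileSizes: Seq[Long], maxPartitionBytes: Long,
                  openCostInBytes: Long, parallelism: Int): Long = {
  val totalBytes = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / parallelism
  math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
}

// Step 1: cut each file into byte ranges of at most splitBytes,
// without looking at row group boundaries.
def splitFiles(files: Seq[(String, Long)], splitBytes: Long): Seq[Split] =
  files.flatMap { case (name, size) =>
    (0L until size by splitBytes).map(start => Split(name, start, math.min(splitBytes, size - start)))
  }

// Step 2: sort the splits by size, largest first, and pack them greedily into tasks.
// Every split also "costs" openCostInBytes, which limits how many tiny files share a task.
def packIntoTasks(splits: Seq[Split], splitBytes: Long, openCostInBytes: Long): Seq[Vector[Split]] = {
  val tasks = Vector.newBuilder[Vector[Split]]
  var current = Vector.empty[Split]
  var currentSize = 0L
  for (s <- splits.sortBy(-_.length)) {
    if (current.nonEmpty && currentSize + s.length > splitBytes) {
      tasks += current
      current = Vector.empty[Split]
      currentSize = 0L
    }
    current = current :+ s
    currentSize += s.length + openCostInBytes
  }
  if (current.nonEmpty) tasks += current
  tasks.result()
}

// Two hypothetical files, 300 MiB and 200 MiB, with parallelism 2.
val exampleFiles = Seq("a.parquet" -> 300 * MiB, "b.parquet" -> 200 * MiB)
val exampleSplitBytes = maxSplitBytes(exampleFiles.map(_._2), 128 * MiB, 4 * MiB, parallelism = 2)
val exampleSplits = splitFiles(exampleFiles, exampleSplitBytes)
val exampleTasks = packIntoTasks(exampleSplits, exampleSplitBytes, 4 * MiB)
exampleTasks.foreach(t => println(t.map(s => s"${s.file}@${s.start / MiB}MiB+${s.length / MiB}MiB").mkString(", ")))
```

Under these assumptions the five pieces end up in four tasks, because the two small tails (72MiB and 44MiB) fit together under the 128MiB limit. Note that the effective split size is also capped by the total input divided by the parallelism, so a larger maxPartitionBytes only takes effect when there is enough data.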
It is important that splitting ignores row group boundaries. Only when a task is executed does it read the footer of the corresponding Parquet file (where the metadata is stored), find the row group boundaries, and process the row groups that start inside the byte range assigned to the task (see code). Below you can see which tasks read which row groups.
Note that one task (the red one) reads two row groups, while the yellow one has nothing to do. If this complicated scheme can create a suboptimal plan, why is it used, and why can’t we just create a task per row group? Of course, we can, but that requires the driver to read the footer of every file, which might be slow with both HDFS and S3 storage.
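The per-task row group selection can be sketched the same way. This toy model follows the rule as described above (a task handles the row groups that start inside its byte range); the file layout is invented, and the Parquet reader's exact rule may differ in detail.

```scala
// Toy model: which row groups does each read task process?
// Rule as described in the text: a task takes the row groups whose first byte
// falls inside its assigned byte range. Offsets and sizes are invented.
final case class RowGroup(id: Int, start: Long, length: Long)
final case class ByteRange(start: Long, end: Long) // end is exclusive

def rowGroupsFor(range: ByteRange, rowGroups: Seq[RowGroup]): Seq[Int] =
  rowGroups.filter(rg => rg.start >= range.start && rg.start < range.end).map(_.id)

// A 400-byte file with three uneven row groups, split into 100-byte ranges.
val groups = Seq(RowGroup(0, 0, 150), RowGroup(1, 150, 40), RowGroup(2, 190, 210))
val ranges = (0L until 400L by 100L).map(s => ByteRange(s, s + 100))
val assignment = ranges.map(r => r -> rowGroupsFor(r, groups))

// The second task reads two row groups (250 bytes); the last two tasks read nothing.
assignment.foreach { case (r, ids) =>
  println(s"bytes ${r.start}-${r.end}: row groups ${ids.mkString("[", ", ", "]")}")
}
```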
When using a “classic” Hadoop setup, where compute and storage are on the same nodes, distributing the planning works pretty well. Each task is executed on the node with the data, quickly reads the Parquet file footer, and decides what to do.
When using S3 storage, things can be worse. To decide if there’s anything to do, each task must make an additional S3 request that can have 100ms of latency, often comparable to the time it takes to read the data itself. Worse, this can happen several times, e.g., once in Spark code and once in Parquet code. This latency issue is not easy to solve. For example, AWS EMR basically gives up and downloads entire objects to a local SSD.
In open-source Spark, our best option is to keep partitions large. In the earlier post, I recommended making sure Parquet files are around 1GiB in size, and the row group size is also large. Given the above details, I’d add another recommendation — increase the maxPartitionBytes option to be more than 1GiB.
Consider this code
Logically, this requests that further processing of the data be done using 16 parallel tasks and that these tasks receive rows separated according to the value of the device_id column. Ideally, the number of tasks is high enough to benefit from parallelism without spending too much time on coordination. The partitioning columns are selected so that rows that must be processed together end up in the same task (e.g., for groupBy or join), or simply to distribute data in approximately equal parts.
Physically, the operation that takes one dataframe and creates a differently partitioned dataframe is called a shuffle. It is commonly described as reading the rows of a dataframe, taking a hash of the partitioning columns, taking the remainder modulo the desired number of partitions, and sending the row to the appropriate server.
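The hash-then-modulo step can be sketched in plain Scala. Spark uses its own Murmur3-based hash of the column values, so MurmurHash3.stringHash from the Scala standard library is only a stand-in here, and the partition ids will not match Spark's. The device ids are hypothetical.

```scala
import scala.util.hashing.MurmurHash3

// Stand-in for hash partitioning: hash the partitioning column, then take a
// non-negative remainder modulo the number of partitions.
def partitionFor(deviceId: String, numPartitions: Int): Int = {
  val h = MurmurHash3.stringHash(deviceId)
  // h may be negative; a plain % could then yield a negative partition id,
  // so use floorMod, which always returns a value in [0, numPartitions).
  Math.floorMod(h, numPartitions)
}

val deviceIds = (1 to 1000).map(i => s"device-$i")
val rowsPerPartition = deviceIds.groupBy(partitionFor(_, 16)).map { case (p, ids) => p -> ids.size }
println(rowsPerPartition.toSeq.sorted.mkString(", "))
```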
Let’s now fill in some details.
Specifying partition number
Suppose we want to aggregate a dataframe
In order to compute the order count for a city, a task must be able to combine all data for that city, and therefore the data must be repartitioned by city. Spark is smart enough to do that automatically. Likewise, if you join two dataframes by a column, Spark will automatically repartition both dataframes by that column. Then why would we ever need to repartition by hand and specify the number of partitions?
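Why partitioning by city is enough can be shown with plain Scala collections standing in for Spark partitions. In this sketch the orders are made up and String.hashCode stands in for Spark's hash; every row for a city lands in one partition, so each partition is aggregated on its own and the partial results never overlap.

```scala
// Plain Scala collections standing in for Spark partitions and tasks.
final case class Order(id: Int, city: String)

// "Shuffle": route each order by a hash of its city.
def partitionByCity(orders: Seq[Order], numPartitions: Int): Map[Int, Seq[Order]] =
  orders.groupBy(o => Math.floorMod(o.city.hashCode, numPartitions))

// "Task": count orders per city using only the rows of one partition.
def countPerPartition(part: Seq[Order]): Map[String, Int] =
  part.groupBy(_.city).map { case (city, os) => city -> os.size }

val cities = Vector("Berlin", "Paris", "Tokyo", "Lima")
val orders = (1 to 100).map(i => Order(i, cities(i % cities.size)))
val partials = partitionByCity(orders, 3).values.toSeq.map(countPerPartition)

// No city appears in two partitions, so combining results is a plain union of maps.
val cityCounts = partials.foldLeft(Map.empty[String, Int])(_ ++ _)
```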
One common case is where the default number of partitions, defined by spark.sql.shuffle.partitions, is suboptimal. For a concrete example, consider the r5d.2xlarge instance in AWS. It has 8 cores (so it can run 8 tasks in parallel) and 64GiB of memory, so each task gets 8GiB of memory. As a rule of thumb, the memory required to store data is 10 times the input size in Parquet. So, if each task reads 1GiB, it needs 10GiB of memory that we just don’t have. And note that we did not account for any other memory demands. While Spark can use disk for scratch data, it is considerably slower. Often, increasing the number of partitions, and therefore decreasing the data size per partition, can eliminate disk usage and improve performance.
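The arithmetic above can be turned into a rough partition-count estimate. This is a back-of-the-envelope sketch using the 10x rule of thumb and the r5d.2xlarge numbers; like the text, it ignores other memory demands, and the 100GiB input size is hypothetical.

```scala
// Rough estimate of how many shuffle partitions keep each task's data in memory.
// Assumes in-memory size ~ expansionFactor x Parquet size (10x rule of thumb)
// and that memory is split evenly between concurrently running tasks.
val GiB: Long = 1024L * 1024L * 1024L

def partitionsNeeded(inputBytes: Long, memoryBytes: Long, cores: Int,
                     expansionFactor: Long = 10): Long = {
  val memoryPerTask = memoryBytes / cores             // e.g. 64 GiB / 8 cores = 8 GiB
  val memoryNeeded = inputBytes * expansionFactor     // in-memory size of all the data
  (memoryNeeded + memoryPerTask - 1) / memoryPerTask  // ceiling division
}

// 100 GiB of Parquet input on r5d.2xlarge-sized executors (64 GiB, 8 cores).
val examplePartitions = partitionsNeeded(100 * GiB, 64 * GiB, 8) // 125
```

By the same estimate, a task reading 1GiB (about 10GiB in memory) should be split in two; a number computed this way could then be passed to an explicit repartition.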
Of course, one can increase spark.sql.shuffle.partitions for the entire job, but it is often better to adjust just the specific parts of the job using an explicit repartition. For aggregation, one might do
and for the join, one must repartition both sides.
.join(dim_device.repartition(512, $"device_id"), $"device_id")
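The reason both sides must be repartitioned can be sketched with plain Scala collections. Assuming both sides use the same hash and the same partition count on device_id (here Int.hashCode, not Spark's hash), matching rows meet in the same partition index, so each simulated task only joins its own partition. The Event and Device types and their contents are hypothetical.

```scala
// Both sides must be partitioned with the same function and partition count.
final case class Event(deviceId: Int, kind: String)
final case class Device(deviceId: Int, model: String)

def partitionOf(deviceId: Int, n: Int): Int = Math.floorMod(deviceId.hashCode, n)

def partitionedJoin(events: Seq[Event], devices: Seq[Device], n: Int): Seq[(Event, Device)] = {
  val eventParts = events.groupBy(e => partitionOf(e.deviceId, n))
  val deviceParts = devices.groupBy(d => partitionOf(d.deviceId, n))
  // Each "task" p sees only partition p of both sides.
  (0 until n).flatMap { p =>
    val devicesById = deviceParts.getOrElse(p, Seq.empty[Device]).groupBy(_.deviceId)
    for {
      e <- eventParts.getOrElse(p, Seq.empty[Event])
      d <- devicesById.getOrElse(e.deviceId, Seq.empty[Device])
    } yield (e, d)
  }
}

val events = Seq(Event(1, "click"), Event(2, "view"), Event(1, "view"), Event(7, "click"))
val devices = Seq(Device(1, "A"), Device(2, "B"), Device(7, "C"))
val joined = partitionedJoin(events, devices, 4)
```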
Spark 3 introduced adaptive query execution, which can often perform such repartitioning automatically. However, for performance-critical jobs, an explicit repartition is often beneficial.
The takeaway: keep track of input data size per task, and use explicit repartition to adjust it.
Specifying partition columns
Recall that our motivating example was to merely combine a lot of small files into a few large ones. There is no aggregation or join. If we know that 16 is a good number of files, can we just specify the number of partitions?
It will work, but it has two subtle issues.
First, since there are no columns to determine the destination partition, this uses round-robin partitioning: the first row goes to partition 0, the second row goes to partition 1, and so forth. But what is “the first row”, given that the DataFrame API does not guarantee any specific order of rows? If a task fails and is retried, its input can arrive in a different order, so rows can be sent to different partitions than in the first attempt, leaving some rows duplicated and others missing, which means incorrect results. To prevent this, Spark additionally sorts each partition on all columns. This additional pass is not the end of the world, but it is better avoided.
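The order dependence can be seen in a simplified model of round-robin assignment. The sketch omits details of Spark's implementation (for instance, Spark starts each task at a position derived from its partition id rather than at 0) and uses made-up rows; it only shows that sorting first makes the assignment independent of input order.

```scala
// Simplified round-robin: row i of a task's input goes to partition i % n.
// (Spark starts each task at an offset derived from its partition id; omitted.)
def roundRobin[T](rows: Seq[T], n: Int): Map[Int, Set[T]] =
  rows.zipWithIndex
    .groupBy { case (_, i) => i % n }
    .map { case (p, tagged) => p -> tagged.map(_._1).toSet }

// Sorting first makes the assignment depend only on the row values, not their order.
def sortedRoundRobin[T: Ordering](rows: Seq[T], n: Int): Map[Int, Set[T]] =
  roundRobin(rows.sorted, n)

// The same rows as they might arrive in a first attempt and in a retry.
val firstAttempt = Seq("e", "b", "d", "a", "c", "f")
val retryAttempt = Seq("a", "b", "c", "d", "e", "f")

println(roundRobin(firstAttempt, 2)) // partition contents differ between attempts
println(roundRobin(retryAttempt, 2))
println(sortedRoundRobin(firstAttempt, 2) == sortedRoundRobin(retryAttempt, 2)) // true
```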
Second, repartitioning by an id column is often better for compression and further processing. Events for a given device often share the same values in many columns, and if they always end up in the same partition, entropy is reduced and the Parquet encoder does a better job.
Therefore, when you repartition a dataframe, specify columns if possible.
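The compression argument can be illustrated by counting how many distinct values of a per-device column each output file would contain, a rough proxy for how well dictionary encoding can work. This is not Parquet's actual encoder; the 200 devices, their firmware strings, and Int.hashCode as the hash are all assumptions for the example.

```scala
// Proxy for compressibility: distinct values of a per-device column in each output file.
final case class DeviceEvent(deviceId: Int, firmware: String)

// 200 hypothetical devices with 50 events each; firmware is constant per device.
val deviceEvents = for {
  d <- 0 until 200
  _ <- 0 until 50
} yield DeviceEvent(d, s"fw-$d")

def maxDistinctFirmwarePerFile(files: Map[Int, Seq[DeviceEvent]]): Int =
  files.values.map(_.map(_.firmware).distinct.size).max

// Repartition by device id: each file sees only the devices hashing to it.
val byDevice = deviceEvents.groupBy(e => Math.floorMod(e.deviceId.hashCode, 16))
// Round-robin: each device's events are spread over all 16 files.
val roundRobinFiles = deviceEvents.zipWithIndex
  .groupBy { case (_, i) => i % 16 }
  .map { case (p, tagged) => p -> tagged.map(_._1) }

println(s"by device: ${maxDistinctFirmwarePerFile(byDevice)}, round-robin: ${maxDistinctFirmwarePerFile(roundRobinFiles)}")
```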
Physical push vs logical pull
It is common to say that shuffle (the physical operation behind repartition) decides which partition a row belongs to and sends it to the appropriate server. That could work on a static cluster with a single highly predictable job. That can’t work if executors are dynamically scaled and preempted all the time. So instead, shuffle works as two loosely coupled stages. The picture below assumes we have 3 input partitions and 2 output partitions.
The first stage processes partitions of the input dataframe. Each row is tagged with its destination partition. Then, the rows are sorted using the destination partition as a key and stored on a local disk. Each partition is processed by a separate task, and the Spark scheduler decides which executor runs that task, which implicitly defines where the data is stored. Generally, no attempt is made to balance across executors/servers: a task is scheduled on the first available one. If you are interested in learning more, the entry point to this stage is the ShuffleMapTask class, while the specific logic for sorting data according to destination partition is in the ShuffleExchangeExec class.
The second stage processes the repartitioned data. Each task contacts the executors that have the data designated for it, fetches it locally and then begins processing. Again, the executor/server for each task is determined by the Spark scheduler. It generally does not matter how the data is distributed after the first stage. In the example above, the second task of the first stage has a lot of green data. It might sound reasonable to execute the second task of the second stage on the same server, to minimize data transfer. However, such perfect scheduling is a very hard problem and is not attempted in typical cases.
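The pull model can be sketched with plain Scala. In this toy version (MapOutput, writeMapOutput and fetch are illustrative names, not Spark classes), each map task writes its rows sorted by destination partition together with an index of where each destination's block starts, similar in spirit to the data and index files of Spark's sort-based shuffle; each reducer then pulls only its own slice from every map output. Three inputs and two outputs mirror the setup in the picture; the rows are invented.

```scala
// Toy model of a shuffle's two stages; names are illustrative, not Spark classes.
final case class MapOutput(rows: Vector[(Int, String)], offsets: Vector[Int])

// Stage 1 (map side): tag each row with its destination partition, sort by it,
// and record where each destination's block starts (like a data file plus an index).
def writeMapOutput(rows: Seq[String], numReducers: Int, partitioner: String => Int): MapOutput = {
  val sorted = rows.map(r => partitioner(r) -> r).sortBy(_._1).toVector
  // offsets(p) is the first row for reducer p; offsets(numReducers) is the row count.
  val offsets = (0 to numReducers).map { p =>
    val i = sorted.indexWhere(_._1 >= p)
    if (i == -1) sorted.size else i
  }.toVector
  MapOutput(sorted, offsets)
}

// Stage 2 (reduce side): a reducer pulls only its own slice from every map output.
def fetch(reducer: Int, outputs: Seq[MapOutput]): Seq[String] =
  outputs.flatMap(o => o.rows.slice(o.offsets(reducer), o.offsets(reducer + 1)).map(_._2))

// Three input partitions and two output partitions: red rows -> 0, green rows -> 1.
val color: String => Int = r => if (r.startsWith("green")) 1 else 0
val inputs = Seq(Seq("red-1", "green-1"), Seq("green-2", "green-3", "red-2"), Seq("red-3"))
val mapOutputs = inputs.map(writeMapOutput(_, 2, color))
val reducer0 = fetch(0, mapOutputs) // red-1, red-2, red-3
val reducer1 = fetch(1, mapOutputs) // green-1, green-2, green-3
```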
The first practical consequence is that repartitioning is not just sending data over the network. It involves writing data to disk in one stage, reading the sorted shuffle data, sending it over the network, and writing it to disk on another machine. Disk I/O can be 3x the input data size in our simple example. If we use network disks, we also add 3x network traffic. So, prefer instances with local disks, especially given that for many cloud providers they might cost roughly the same as network disks.
The second consequence is that disk usage can also be higher than the data size. In general, Spark tries to process the shuffle data in memory, but it can be stored on a local disk if the blocks are too large, or if the data must be sorted and we run out of execution memory. It is easy to have double the disk usage, or even worse. If you run out of disk, the executor is killed and its tasks are retried, which is very expensive. Therefore, make sure you detect out-of-disk conditions.
Parquet write partitioning
Now let’s focus on the writing part of our example:
The semantics of this code is that each row of the input data will be written to a subdirectory with the corresponding name. The picture below shows the physical execution.
The top half of the picture is identical to the partitioning case. In particular, each task sorts the data according to the partitioning columns. But there are important differences in the other half:
- With partitionBy, there is no repartition in the Spark execution plan. In other words, the red and green rows are never collected by Spark executors — they are only placed nearby in the storage
- The number of the written objects is considerably higher than we’d expect
The last point is worth reiterating. Recall our original example
If we have 16 partitions by device_id and 500 different event types, every task is likely to see events of every type and write a file for each, so the code writes 8000 objects. Ouch, that’s hardly the compaction we were after. In our case, we’d better use the same column for both dataframe and write partitioning:
In the example above, we repartition dataframe by type, and therefore each write task will only receive events of a single type (or, sometimes, several types) and write one or a few files, just like we wanted. Therefore, make sure your DataFrame partitioning always matches write partitioning.
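The object counts can be checked with a small model. It assumes each write task produces one file per distinct type value it receives (ignoring file-size limits such as maxRecordsPerFile), uses 160 hypothetical devices that each emit all 500 event types, and lets Int and String hashCode stand in for Spark's hash.

```scala
// Counts files produced by a partitioned write, assuming one file per distinct
// partition value per write task (file-size limits such as maxRecordsPerFile ignored).
final case class TypedEvent(deviceId: Int, eventType: String)

// 160 hypothetical devices, each emitting one event of each of 500 types.
val allEvents = for {
  d <- 0 until 160
  t <- 0 until 500
} yield TypedEvent(d, s"type-$t")

// key: the dataframe partitioning column, hashed into numTasks write tasks.
def filesWritten(events: Seq[TypedEvent], numTasks: Int, key: TypedEvent => Any): Int =
  events
    .groupBy(e => Math.floorMod(key(e).hashCode, numTasks)) // rows per write task
    .values
    .map(_.map(_.eventType).distinct.size) // one file per type directory per task
    .sum

val byDeviceFiles = filesWritten(allEvents, 16, _.deviceId) // 8000: every task sees all 500 types
val byTypeFiles = filesWritten(allEvents, 16, _.eventType)  // 500: each type lives in one task
```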
That is still not perfect: there are almost always event types with a lot of data and event types with almost none, and in the scheme above, the hot event types might have too much data for a single task to handle. However, addressing that issue will take a blog post of its own, and the code above is often good enough.
We went through a few details of Spark partitioning:
- How partitions are determined when reading Parquet files
- Why you would need an explicit repartition, why you should specify columns, and how it works
- What write partitioning is, how it differs from dataframe partitioning, and what its hidden gotchas are
If you found this post useful, you might want to also read Spark partitioning: full control, where I describe a technique for precisely controlling partitioning.