How to solve the “large number of small files” problem in Spark
Solutions for the "large number of small files" problem
Big Data is no longer a buzzword, because everyone knows the power of data and what it can do. Spark is a robust framework for processing Big Data. It has many advantages over traditional data processing frameworks, but it also has some limitations. One of the most significant is that it tends to store its output as many small files when writing to distributed and object storage systems such as HDFS and AWS S3. This happens because Spark is a parallel processing framework: each parallel task writes its own output file for every partition it touches. So if your Spark application runs 200 parallel tasks and writes into a table with 10 partitions, it can potentially produce 200 * 10 = 2,000 files. Running this job multiple times a day creates even more files in storage. This is known as the "large number of small files" problem in Spark.
In this article, I will explain different ways to solve this problem and help Data Engineers optimize how their Spark applications store output.
Why do we consider "a large number of small files" a problem?
Below are some reasons why a large number of small files is an issue.
- NameNode overhead
In HDFS, files are split into blocks that are stored and replicated on the Data Nodes. Each block is 128 MB by default, and every block has metadata associated with it. The HDFS namespace tree and the associated metadata are maintained as objects in the NameNode's main memory, each occupying approximately 150 bytes. The NameNode has to manage all of these objects and serve the read/write requests for them. A large number of small files means a large number of small blocks, which increases the NameNode's overhead, and when a Spark application reads data from HDFS, the NameNode has to serve many more block read requests.
- Increased metadata overhead on query execution
Writing small files to object storage is straightforward, but queries over them run much slower, or may even fail to complete. Querying many small files incurs overhead to read the metadata, perform a non-contiguous disk seek, open the file, close the file, and repeat. The overhead is only a few milliseconds per file, but those milliseconds add up when you're querying millions of files. Hive is an excellent tool for querying files stored in HDFS. It is a data warehousing solution built on top of HDFS that supports partitioning and indexing to speed up queries, but neither helps when HDFS holds a large number of small files.
- Inadequate query performance because of skewed data
Data skewness can also hamper query performance. Spark tries to distribute the workload evenly across all tasks when a query is executed, but some tasks may end up with many small files while the rest get a few big ones. For example, suppose 200 tasks each process 3 to 4 big files while 2 tasks each process 1,000 small files. Those two tasks take far longer to finish than the other 200 because of the per-file overhead, and the query is only as fast as its slowest tasks.
Solutions to "the large number of small files" issue
We have covered how a large number of small files can burden the cluster and hurt application performance. There are different ways to solve this problem.
- We can reduce the parallelism of an application to reduce the number of small files.
- Also, we can optimize the dataframe before writing it to object storage.
Let's see how we can implement these solutions…
Reduce parallelism
When a Spark application is executed, Spark builds a DAG. The DAG Scheduler then creates stages, which are further divided into tasks. A task is a unit of work that is sent to an executor, and the Task Scheduler schedules task execution on the executors. Each stage contains one task per shuffle partition, so the same work is performed over the different partitions of the RDD, with each task processing one shuffle partition.
The number of shuffle partitions depends on two properties:
- spark.default.parallelism
- spark.sql.shuffle.partitions
The property spark.default.parallelism was introduced with RDDs, so it only applies to RDDs. It defines the default number of partitions created when an RDD is created. By default it is set to the total number of cores on all executor nodes in the cluster; if you are running Spark locally, it is set to the number of cores on your machine. You can set this property as below:
spark.default.parallelism = <<integer value>>
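For example, here is a minimal sketch of setting it while building the session (the application name and the value 100 are placeholders; tune the value for your cluster):
from pyspark.sql import SparkSession

# Hypothetical example: set spark.default.parallelism at session build time.
spark = SparkSession.builder \
    .appName("small-files-demo") \
    .config("spark.default.parallelism", 100) \
    .getOrCreate()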
The property spark.sql.shuffle.partitions controls the number of shuffle partitions created after a wide transformation on a dataframe. Whenever wide transformations such as join(), agg(), etc., are executed by a Spark application, Spark generates N shuffle partitions, where N is the value set by the spark.sql.shuffle.partitions property, and it can therefore create up to N files in the output directory. If you are storing a dataframe in a partitioned table, the total number of files can reach N * <number of table partitions>. The default value for this property is 200. You should set the number of shuffle partitions according to your data size and the resources available in your cluster; ideally it should be a multiple of the total number of executor cores so that partitions can be distributed evenly across tasks. You can set this property as below:
spark.sql.shuffle.partitions = <<integer value>>
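You can also change it at runtime on an existing session; a minimal sketch (the value 50 is just an example):
# Hypothetical example: override the shuffle partition count for subsequent queries.
spark.conf.set("spark.sql.shuffle.partitions", 50)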
Reducing the shuffle partition count reduces the number of small files because the same volume of data is spread across fewer, and therefore larger, partitions.
Reduce shuffle partitions before writing a dataframe
The simplest and most commonly used technique is to reduce the shuffle partitions of the dataframe before saving it to the table. For repartitioning, Spark has two functions: repartition() and coalesce(). We can apply either of them to reduce the number of partitions. The coalesce() function works much faster when reducing the number of partitions because it simply merges adjacent input partitions, but it doesn't guarantee uniform data distribution. The repartition() function re-shuffles data across the shuffle partitions, providing a better distribution of data, but it involves far more shuffling than coalesce(). Providing a column name along with the partition count to the repartition() function distributes the data based on the values of that column.
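For instance, here is a minimal sketch (assuming an existing dataframe df and a hypothetical output path) of reducing the partition count just before the write:
# Fewer, larger output files; cheap because existing partitions are merged.
df.coalesce(10).write.mode("overwrite").parquet("/tmp/output_coalesce")  # hypothetical path

# More evenly sized output files, at the cost of a full shuffle.
df.repartition(10).write.mode("overwrite").parquet("/tmp/output_repartition")  # hypothetical path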
Let's see the difference between PySpark repartition() and coalesce().
We will first create an RDD having 20 integer values.
rdd = spark.sparkContext.parallelize(range(0,20))
rdd.collect() #show records in RDD
Let's use the getNumPartitions() function to check the number of partitions of the RDD.
print("Initial number of partitions: " + str(rdd.getNumPartitions()))
Now, we will print the data in each partition. We can use the glom() function to check the data present in each partition.
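A quick sketch of how that check could look (fine for a small RDD like ours, but avoid collecting large data to the driver):
print(rdd.glom().collect())  # each inner list holds the records of one partition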
In the output, you can see how the RDD's data is distributed across the different partitions.
Let's use the repartition() function on the RDD we just created.
rdd2 = rdd.repartition(4)
print("Number of partitions after repartition: " + str(rdd2.getNumPartitions()))
Now, we will print data in each partition.
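Again, a quick glom() sketch:
print(rdd2.glom().collect())  # inspect how the records landed after repartition()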
Let's use the coalesce() function on the first RDD we created.
rdd3 = rdd.coalesce(4)
print("Number of partitions after coalesce: " + str(rdd3.getNumPartitions()))
Let's print data in each partition.
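As before, a quick glom() sketch:
print(rdd3.glom().collect())  # inspect how the records landed after coalesce()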
You can see the difference between the records in the partitions after using the repartition() and coalesce() functions. Data is shuffled more when we use the repartition() function, whereas the coalesce() function does less shuffling (it just merges data into the nearest partitions). As a result, repartition() gives a more even distribution of data across partitions than coalesce().
Repartition on a derived column
This is an extension of the previous solution. With the previous approach, an even distribution of records across shuffle partitions is not guaranteed. Here we create a derived column and pass it to the repartition() function so that data is redistributed based on that column. This solution applies when we don't have any column whose values are evenly distributed; in simple terms, when our dataframe has skewed columns.
If you apply the repartition() function to a dataframe simply to reduce the shuffle partitions, and hence the number of small files, you may observe that your Spark application slows down. To resolve the small files issue without compromising performance, you need to create a derived column whose values are highly distributed and pass it, along with the partition count, to repartition(). The repartition() function will then distribute records evenly across the shuffle partitions, and the small files issue is solved without compromising performance.
In the PySpark example given below, I am processing 2 GB of data. I have a dataframe called df. I want to repartition it and then store it in a table. This can be achieved in three steps, as given below.
1. Create a variable number_of_files and assign an integer value to it. Depending on the data, you will need to tweak the value of this variable; I have set number_of_files = 10. Then create a column with unique values, as shown below. I created the _unique_id column by using the monotonically_increasing_id() function.
from pyspark.sql import functions as F
number_of_files = 10
df = df.withColumn('_unique_id', F.monotonically_increasing_id())
2. After this, apply the salting technique to distribute records across partitions. In Spark, salting is a technique that adds (pseudo-)random values to spread partition data evenly. Here, we create a derived column by taking the modulo of the _unique_id column created above and number_of_files; it is not really a random number, but it works well. I have created a column called _salted_key in this example.
df = df.withColumn('_salted_key', \
F.col('_unique_id') % number_of_files)
3. Repartition the dataframe based on the _salted_key column.
df = df.repartition(number_of_files, '_salted_key') \
.drop('_unique_id', '_salted_key')
After the above three steps, our dataframe is repartitioned into number_of_files partitions. We can save it to the target table or write it out as files.
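For example, a minimal sketch of the final write (the table name and path below are hypothetical placeholders):
# Save to a managed table...
df.write.mode("overwrite").saveAsTable("target_db.target_table")  # hypothetical table name
# ...or write out Parquet files instead:
# df.write.mode("overwrite").parquet("/path/to/output")  # hypothetical path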
Conclusion
In this article, I covered different techniques by which we can solve the "large number of small files" issue in Spark. There's no silver bullet in configuring Spark parameters for optimizing storage or application performance. It all depends on your data as well as the code. It is advised to perform some analysis on the source data beforehand to understand the data better so that you can try different techniques to optimize your Spark application's storage and performance.
Happy learning!