Compaction / Merge of parquet files

Chris Finlayson
Published in bigspark
Jan 12, 2020

Optimising size of parquet files for processing by Hadoop or Spark

The small file problem

One of the challenges in maintaining a performant data lake is to ensure that files are optimally sized for the storage medium.

As a general rule of thumb:

Many small files == bad
A small number of large files == better
(…although we can get a bit more scientific on the ideal file size)

Persisting large numbers of small files is a particular issue on HDFS, as the NameNode takes the strain by holding metadata in memory for every file in the current snapshot.

An example of small files in a single data partition

Small files are often generated by a streaming process, for example when the rate of data received by an application is low relative to how frequently the application writes out to storage. They can also be the result of incremental updates into a table partition.

Aside from memory strain, small files also carry a major performance hit for reads, as the consuming process has to open and close many more files than is optimal.

To handle this, it is good practice to run a compaction job on directories that contain many small files to ensure storage blocks are filled efficiently. It is common to do this type of compaction with MapReduce or on Hive tables / partitions; here we will walk through a simple example of remediating the issue using Spark.

It also helps not to over-partition your data. Shallow and wide is a better strategy for storing compacted files than deep and narrow.

Optimal file size for HDFS

In the case of HDFS, the ideal file size is as close as possible to the configured block size (dfs.blocksize), which is often left at its default of 128MB.

Avoid file sizes that are smaller than the configured block size. An average size below the recommended size adds more burden to the NameNode, can cause heap/GC issues, and makes storage and processing inefficient.

Files larger than the block size are potentially wasteful. For example, a 130MB file would extend over 2 blocks, which carries additional I/O time.
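The block arithmetic above can be sketched in a few lines of plain Python. This is an illustration only: the helper names blocks_spanned and last_block_fill are hypothetical, and the 128MB default simply mirrors the dfs.blocksize value mentioned earlier.

```python
import math

MB = 1024 * 1024


def blocks_spanned(file_size_bytes, block_size_bytes=128 * MB):
    """Number of HDFS blocks a file of the given size extends over."""
    if file_size_bytes <= 0:
        return 0
    return math.ceil(file_size_bytes / block_size_bytes)


def last_block_fill(file_size_bytes, block_size_bytes=128 * MB):
    """Fraction of the file's final block that actually holds data."""
    if file_size_bytes <= 0:
        return 0.0
    remainder = file_size_bytes % block_size_bytes
    return 1.0 if remainder == 0 else remainder / block_size_bytes


# Compare a small file, a block-sized file and a slightly oversized file.
for size_mb in (1, 128, 130):
    size = size_mb * MB
    print(size_mb, blocks_spanned(size), last_block_fill(size))
```

For the 130MB case the sketch reports 2 blocks, with the second block holding only 2MB of data.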

Optimal file size for S3

For S3, there is a configuration parameter we can refer to, fs.s3a.block.size, but this is not the full story. File listing on S3 is slow, so a common view is to optimise for a larger file size. 1GB is a widely used default, although you can feasibly go up to the 4GB file maximum before splitting.

The penalty for handling larger files is that processes such as Spark will partition based on files: if you have more cores available than partitions, the surplus cores will sit idle. Two 1GB files in a partition can only be operated on by 2 cores simultaneously, whereas 16 files of 128MB could be processed by 16 cores in parallel.
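To make the trade-off concrete, here is a minimal sketch of the core utilisation described above. It assumes the simplified model used in this post, where one file maps to one task; the function names are hypothetical and nothing here calls Spark.

```python
import math


def busy_cores(num_files, num_cores):
    """Cores that can work at once when each file is one task."""
    return min(num_files, num_cores)


def processing_waves(num_files, num_cores):
    """How many rounds of tasks are needed to get through every file."""
    if num_files == 0:
        return 0
    return math.ceil(num_files / num_cores)


cores = 16
for files in (2, 16):
    active = busy_cores(files, cores)
    print(files, "files:", active, "busy,", cores - active, "idle")
```

With 16 cores, the 2 x 1GB layout leaves 14 cores idle, while the 16 x 128MB layout keeps every core busy in a single wave.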

The solution

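We can control the split (file size) of resulting files, so long as the files remain splittable. Parquet compressed with snappy qualifies, because the compression is applied to the pages inside the file rather than to the file as a whole.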

Let’s walk through an example of optimising a poorly compacted table partition on HDFS. Essentially we will read in all files in a directory using Spark, repartition to the ideal number and re-write.

Consider an HDFS directory containing 200 x ~1MB files and a configured dfs.blocksize of 128MB.

Let us generate some parquet files to test:

from pyspark.sql.functions import lit

df = spark.range(100000).cache()
df2 = df.withColumn("partitionCol", lit("p1"))
df2.repartition(200).write.partitionBy("partitionCol").saveAsTable("db.small_files")

And review the outputs (using a customised version of a tool called hdfs-shell)

200 files created in p1 partition
207.8MB raw file size generated

It is straightforward for us to calculate the optimal number of files as:

(Total file size / Configured blocksize ) = Ideal number of files

In this example:

(207.8 / 128) ≈ 1.62

We take the rounded up value as our repartition coefficient (2).

A simple pyspark snippet for this:

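import math

def get_repartition_factor(dir_size):
    # The Hadoop configuration returns a string; this assumes the value is in plain bytes
    block_size = int(sc._jsc.hadoopConfiguration().get("dfs.blocksize"))
    return math.ceil(dir_size / block_size)  # returns 2 for 217894092 bytes and a 128MB block size

df = spark.read.parquet("/path/to/source")
# Wrap the chained calls in parentheses so they can span several lines
(df.repartition(get_repartition_factor(217894092))
   .write
   .parquet("/path/to/output"))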

The above code will read the 200 files into a dataframe, repartition based on the optimal calculated value and output 2 equally sized files.
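One caveat on the block size lookup: Hadoop allows dfs.blocksize to be written with a case-insensitive size suffix (for example 128m or 1g) as well as in plain bytes. The helper below is a hedged sketch of how such a value could be normalised before dividing; parse_block_size is a hypothetical name, not part of Spark or Hadoop, and it assumes binary (1024-based) multipliers.

```python
import math
import re

# Assumed binary multipliers for the suffixes Hadoop documents (k, m, g, t, p, e).
_SUFFIX_MULTIPLIERS = {
    "": 1,
    "k": 1024,
    "m": 1024 ** 2,
    "g": 1024 ** 3,
    "t": 1024 ** 4,
    "p": 1024 ** 5,
    "e": 1024 ** 6,
}


def parse_block_size(value):
    """Convert a block size such as '134217728' or '128m' to bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([kmgtpe]?)\s*", str(value).lower())
    if match is None:
        raise ValueError("Unrecognised block size: {!r}".format(value))
    number, suffix = match.groups()
    return int(number) * _SUFFIX_MULTIPLIERS[suffix]


# The directory size from the example, against a block size written as '128m'.
print(math.ceil(217894092 / parse_block_size("128m")))
```

The parsed value can then be used in place of the raw configuration string when calculating the repartition factor.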

It's also possible to use .coalesce() instead, which gives a more performant write operation because it avoids a full shuffle. However, the resulting files will not be equally sized and are therefore less optimal for further processing.

2 equally sized files are generated

The resulting files are 113MB each, which is approaching the dfs.blocksize optimum.

Summary

It is becoming an increasingly important data management exercise to routinely compact persisted files. This is key for maintaining optimal read performance and reducing metadata management overheads.

With a minor development effort, a crawler process could be created to incrementally walk through all tables and their partitions in a data lake, pulling the total size of each directory, calculating the ideal number of files relative to the storage block size, and taking action if the composition is not optimal.
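As a rough illustration of that crawler idea, the sketch below covers only the decision step. It assumes the directory listing has already been collected into a dictionary of file sizes per directory; how that listing is obtained (HDFS client, S3 listing, metastore) is left open, and the names CompactionTask and plan_compaction and the example paths are hypothetical.

```python
import math
from typing import Dict, List, NamedTuple


class CompactionTask(NamedTuple):
    directory: str
    current_files: int
    ideal_files: int


def plan_compaction(file_sizes_by_dir: Dict[str, List[int]],
                    block_size: int = 128 * 1024 * 1024) -> List[CompactionTask]:
    """Flag directories that hold more files than their total size warrants."""
    tasks = []
    for directory, sizes in sorted(file_sizes_by_dir.items()):
        total_size = sum(sizes)
        if total_size == 0:
            continue  # nothing to compact in an empty directory
        # Same calculation as earlier: total size / block size, rounded up.
        ideal_files = max(1, math.ceil(total_size / block_size))
        if len(sizes) > ideal_files:
            tasks.append(CompactionTask(directory, len(sizes), ideal_files))
    return tasks


# Hypothetical listing: one badly fragmented partition and one healthy one.
listing = {
    "/db/small_files/partitionCol=p1": [217894092 // 200] * 200,
    "/db/healthy_table/partitionCol=p1": [120 * 1024 * 1024],
}
for task in plan_compaction(listing):
    print(task)
```

Each flagged directory could then be passed to a Spark job like the one above, using ideal_files as the repartition factor.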

Chris is a co-founder of bigspark. He likes referring to himself in the third person and creating data management solutions. Please reach out on LinkedIn if you want to get in touch!
