Fixing small files performance issues in Apache Spark using DataFlint

Meni Shmueli
5 min read · Dec 31, 2023

TLDR

DataFlint is an open source performance monitoring library for Apache Spark.
DataFlint has a more human-readable UI for Spark that alerts you to performance issues, such as small files IO issues, and can even suggest fixes!

DataFlint Demo:

The data engineer experience

The first time I developed a big data application with Apache Spark, my Spark job couldn't finish because I partitioned the data incorrectly and accidentally wrote millions of extremely small files to S3.

As time went on, I saw that this kind of development experience was very common in the big data world. I learned to use tools like the Spark UI, but also discovered its (many) limitations, mainly that it's just not very human readable.

A Real World Example

Let’s try to recreate this exact scenario with a simple PySpark script:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Sales Filterer") \
    .master("local[1]") \
    .getOrCreate()

df = spark.read.load("~/data/store_sales")

df_filtered = df.filter(df.ss_quantity > 1)

df_filtered.write \
    .mode("overwrite") \
    .partitionBy("ss_quantity") \
    .parquet("/tmp/store_sales")

spark.stop()

In this simple example we read some sales-related data[1], filter for items with a quantity bigger than 1, and then save the data partitioned by the quantity[2].

By statically analyzing the code, you wouldn't find any performance issue, but on my machine[3] this simple script takes around 1 minute to run!

Enter DataFlint

To find and fix this performance issue, we can use DataFlint — an open source performance monitoring library for Apache Spark.

To install it, all we need to do is add these 2 lines:

.config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \

to our Python application:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Sales Filterer") \
    .config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
    .master("local[1]") \
    .getOrCreate()

df = spark.read.load("~/data/store_sales")

df_filtered = df.filter(df.ss_quantity > 1)

df_filtered.write \
    .mode("overwrite") \
    .partitionBy("ss_quantity") \
    .parquet("/tmp/store_sales")

spark.stop()

(for more installation options, see the docs)

Now when we open the Spark UI, we have a new button that opens DataFlint:

Spark UI tab selection, with the new DataFlint option

Identifying the performance issue

Once we enter DataFlint for our Spark job, we can see that DataFlint identified a "reading small files" issue in real time, during the query run.

And when the query ends, we can see that DataFlint also identified a "writing small partitioned files" issue:

And finally, this really simple query, which filters only ~273MB of data, takes 51 seconds to complete.

So what exactly is going on? And how can we fix it?

Small files IO in big data

Big data engines are usually optimized to work with files in the 128MB-1GB size range. Working with really small files, as in our case (DataFlint calculated an average of 85KB per file read), can cause all sorts of performance problems[5], some less obvious than others.

In our case, for each small file Spark reads, it applies the filter, partitions the remaining data by quantity, and saves each partition of that small file back to storage, resulting in even smaller files being written!

reading small files + partitioning = writing even smaller files
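
To make the symptom concrete, here is a minimal sketch in plain Python (the output path and the 128MB rule of thumb are just illustrative; this helper is not part of DataFlint) that counts the files a job wrote and reports their average size:

import os

def summarize_output(path):
    # Walk the output directory and collect the size of every Parquet data file.
    sizes = []
    for root, _dirs, files in os.walk(os.path.expanduser(path)):
        for name in files:
            if name.endswith(".parquet"):
                sizes.append(os.path.getsize(os.path.join(root, name)))

    if not sizes:
        print("no parquet files found under " + path)
        return

    avg_mb = sum(sizes) / len(sizes) / (1024 * 1024)
    print(f"{len(sizes)} files, average size {avg_mb:.2f} MB")
    # Rule of thumb from above: big data engines prefer files of roughly 128MB-1GB.
    if avg_mb < 128:
        print("warning: average file size is far below the ~128MB sweet spot")

summarize_output("/tmp/store_sales")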

So now that we understand what is happening, how can we fix it?

Fixing the writing small files issue

In this scenario we will focus on fixing the writing small files problem, as our source table file sizes might not be in our control[4].
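
(As a side note on the read side: even when the input files themselves are tiny, Spark can pack many of them into a single read task. This is controlled by standard Spark SQL options; the sketch below just shows their Spark 3.x defaults and is not something DataFlint configures for you.)

# Spark groups small files into read partitions of up to this size (default is 128MB).
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)
# Estimated cost of opening a file; raising it packs more small files into each task (default is 4MB).
spark.conf.set("spark.sql.files.openCostInBytes", 4 * 1024 * 1024)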

Luckily for us, DataFlint identified a "writing small partitioned files" issue and even suggests a fix, telling Spark to repartition the data in memory by our table partition key:

Now let’s apply this fix in our code:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Sales Filterer") \
    .config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
    .master("local[1]") \
    .getOrCreate()

df = spark.read.load("~/data/store_sales")

df_filtered = df.filter(df.ss_quantity > 1)
df_repartitioned = df_filtered.repartition("ss_quantity")  # THE FIX

df_repartitioned.write \
    .mode("overwrite") \
    .partitionBy("ss_quantity") \
    .parquet("/tmp/store_sales")

spark.stop()

And see the updated query plan in DataFlint:

The fixed query, notice there is no more alert over the writing stage

We can see that our query has 2 new steps: a repartition by quantity hash into 200 partitions (200 is Spark's default for repartitioning), and a Spark optimizer step that reduced it to 4 partitions.

By hash partitioning by quantity, we ensure that each Spark partition has all the data needed to write exactly 1 file to the corresponding table partition, meaning all the records with the same quantity value end up together.

Spark behavior after repartition: writing 1 file for each write partition!
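
For context, the 200 comes from Spark's default number of shuffle partitions, and the reduction to 4 is most likely Spark's Adaptive Query Execution (AQE) coalescing nearly-empty shuffle partitions at runtime. A minimal sketch of the relevant settings (these are standard Spark options, and the values shown are the Spark 3.x defaults, not something the fix changes):

# repartition("ss_quantity") initially shuffles into this many partitions (default 200).
spark.conf.set("spark.sql.shuffle.partitions", 200)
# AQE merges small shuffle partitions at runtime; both default to true in recent Spark versions.
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)

# Alternatively, pick the shuffle partition count explicitly when repartitioning:
# df_repartitioned = df_filtered.repartition(4, "ss_quantity")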

Now the fixed query takes only 15 seconds instead of 51 seconds, more than 3x faster!

The Spark query after applying the repartition fix

Before you go

If you're using Apache Spark, I encourage you to give DataFlint a try for yourself:

If you have any questions or suggestions, you can contact me via the DataFlint Slack community (details in the README) or on LinkedIn: https://www.linkedin.com/in/meni-shmueli-developer/

Notes

[1] The TPC-DS standard dataset, generated at a low scale factor to simulate real-world cases of reading small files. You can generate the dataset via the TPC-DS data gen tool or download the sales table from here: https://file.io/U69UdtEvieSF
[2] The choice of partition key here is arbitrary; in the case of TPC-DS, the quantity field has low cardinality, so it makes sense to partition by it for this example.
[3] A 2023 16-inch MacBook Pro. When running on all cores, the job finishes too fast to inspect the Spark/DataFlint UI, so I added the local[1] configuration to limit Spark to a single core.
[4] In addition, this performance problem can occur without reading small files. For example, if you read a lot of big files but most of the data gets filtered out.
[5] For further reading, see https://siraj-deen.medium.com/small-file-large-impact-addressing-the-small-file-issue-in-spark-2e431ed5c91b
