Solving the small file problem in Spark Structured Streaming: a versioning approach

Rahul
3 min read · Jul 17, 2020

Streaming jobs usually create too many small files, which hurts the performance of the jobs and queries that read them. There is no universally accepted solution for avoiding small files in Spark streaming.

Existing approaches: One approach is to write a separate compaction job that aggregates many small files into larger files elsewhere, deletes the old files and then moves the consolidated files to the correct path. The compaction approach is very prone to data loss and data duplication. Another approach is to append the new data to existing files in every micro-batch. However, most columnar file formats, like ORC and Parquet, do not support appends.

In this post, I discuss a solution that solves the small file issue using versioning. This approach works in Spark 2.4 and above.

Use case: Write a structured streaming job that fetches data from Kafka every 5 minutes and stores it in HDFS with day partitions. The data fetched belongs to the current day or to the previous 2–3 days.

The usual way to write this job is:

df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)  # Kafka brokers, assumed to be defined elsewhere
    .option("subscribe", "topicxx")
    .load())
# parsing of the Kafka value into columns such as "date" is omitted here

(df.repartition("date")
    .writeStream
    .format("parquet")
    .option("path", "/prod/data/cdr_data/")
    .option("checkpointLocation", checkpointLocation)
    .partitionBy("date")
    .outputMode("append")
    .trigger(processingTime="5 minutes")
    .start())

After a few micro-batches have executed, we will have multiple small files in each date partition.

/prod/mobility/cdr_data/date=01-01-2010/file1.parquet
/prod/mobility/cdr_data/date=01-01-2010/file2.parquet
/prod/mobility/cdr_data/date=01-01-2010/file3.parquet
/prod/mobility/cdr_data/date=01-01-2010/file4.parquet

Versioning Approach

In every micro-batch, data is written into a new inner partition of the date partition, named version, whose value is the current Unix time, e.g.

/prod/mobility/cdr_data/date=01-01-2010/version=12345/file1.parquet
/prod/mobility/cdr_data/date=01-02-2010/version=12345/file1.parquet

The idea is to read the date partitions of the existing HDFS data that are also present in the current streaming dataset, union them with the new dataset, and write the consolidated data back to those date partitions under a new version. Then, delete the old versions of the updated partitions. Thus, after every micro-batch, each date partition contains a single version with a single file that is larger than the previous one.

The following steps are performed in every micro-batch:

  • Read new data from Kafka and create the streaming dataset.
  • Read the existing data from HDFS and filter it down to the date partitions that are also present in the streaming dataset.
  • Union both datasets, append a version column whose value is the current timestamp, and write the result to HDFS.
  • Delete the old versions of the updated date partitions.

One challenge here is that a union of a streaming dataset and a static dataset is not allowed in Spark. To solve this we use the foreachBatch sink, which is available in Spark 2.4 and above. foreachBatch hands each micro-batch to a function as a static dataset.

A pseudo-code snippet of the solution is as follows:

import time
from pyspark.sql import functions as F

def foreach_batch_function(new_dataset, epoch_id):
    # Read existing data and keep only the date partitions present in this micro-batch
    old_dataset = spark.read.parquet("/prod/data/cdr_data/")
    dates_in_batch = [row["date"] for row in new_dataset.select("date").distinct().collect()]
    filtered_old_dataset = old_dataset.filter(F.col("date").isin(dates_in_batch)).drop("version")

    # Union old and new data and stamp it with a new version (current Unix time)
    output = filtered_old_dataset.unionByName(new_dataset).withColumn("version", F.lit(int(time.time())))
    (output.repartition("date")
        .write.partitionBy("date", "version")
        .mode("append")
        .parquet("/prod/data/cdr_data/"))

    # code to delete old versions of date partitions....
    # ...........

df = spark.readStream.format("kafka").option("subscribe", "topicx").load()
(df.writeStream
    .option("checkpointLocation", checkpointLocation)
    .trigger(processingTime="5 minutes")
    .foreachBatch(foreach_batch_function)
    .start())

We can use the batch id (epoch_id) passed to the foreachBatch function to maintain the exactly-once semantics of Spark Structured Streaming. A rank window function can be used to find and delete the old versions.
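For illustration, here is a minimal sketch of how the old versions could be cleaned up after a successful write. The helper name delete_old_versions and its parameters (dates_in_batch, latest_version) are hypothetical, and it assumes the version=<unix_time> directory layout described above and access to the Hadoop FileSystem API through the Spark session's JVM gateway.

# Minimal sketch (not the exact production code) of deleting stale versions
def delete_old_versions(spark, base_path, dates_in_batch, latest_version):
    # Access HDFS through the Hadoop FileSystem API exposed via the Spark JVM gateway
    hadoop_conf = spark._jsc.hadoopConfiguration()
    Path = spark._jvm.org.apache.hadoop.fs.Path
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
    for date in dates_in_batch:
        date_dir = Path("%s/date=%s" % (base_path, date))
        for status in fs.listStatus(date_dir):
            version_dir = status.getPath()
            # Sub-directories are named version=<unix_time>; keep only the latest one
            version = int(version_dir.getName().split("=")[1])
            if version < latest_version:
                fs.delete(version_dir, True)  # recursive delete

Calling such a helper at the end of foreach_batch_function, with the version written in that batch, would leave exactly one version per updated date partition; any versions left behind by a failed batch are handled by the latest-version reader shown below.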

We might also want to keep multiple versions if needed. In addition, a streaming micro-batch can fail after writing the data but before deleting the old versions, which can leave multiple versions of a date partition behind.

This can easily be handled by implementing a Spark reader that reads only the latest version of each date partition using a rank window function. A code snippet for the same is as follows:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.parquet("/prod/data/raw_cdr")
# Rank the versions within each date partition and keep only the latest one
w = Window.partitionBy("date").orderBy(F.col("version").desc())
df = (df.withColumn("version_rank", F.rank().over(w))
    .filter("version_rank = 1")
    .drop("version_rank"))
