Databricks Delta — Partitioning best practice

gregzrichardson · Published in Nintex Developers · Mar 17, 2020 · 5 min read

Partitioning is good… and bad

Partitioning your Delta data has an obvious benefit — your data is split into separate partitions (folders in blob storage), and when you query the store you only need to load data from the partitions you are interested in.

Avoiding loading data you don’t need with a simple partition filter sounds like a clear win, but having too many partitions causes trouble. Too many partitions results in too many small data files, which in turn means too much metadata, and all of that metadata needs to be loaded into driver memory whenever a stream reads from the Delta store. This is a cause of driver failures that are difficult to diagnose; in our case the only symptom was streams getting stuck on ‘Stream initializing…’
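To make the upside concrete, here is a minimal sketch of a batch read that only touches the folders matching a partition filter (the path and the eventDate partition column are hypothetical):

df = (spark.read
      .format("delta")
      .load("/mnt/bronze/events")          # assume this table is partitioned by eventDate
      .where("eventDate = '2020-03-01'"))  # partition filter: only that folder is listed and read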

How many files?

Data in Spark is ideally stored in a small number of large files, between 128MB and 1GB in size. This allows the driver and workers to operate efficiently. Having the data fragmented into many small files will slow down reading of the Delta store and will overload the driver memory as it attempts to load metadata for many small files into memory at once.
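A quick way to gauge fragmentation is to look at the file count and total size Delta reports for the table. A sketch, reusing the table name from the ALTER TABLE example later on:

detail = spark.sql("DESCRIBE DETAIL myDatabase.myTable").select("numFiles", "sizeInBytes").first()
avg_file_size_mb = detail["sizeInBytes"] / detail["numFiles"] / (1024 * 1024)
print(avg_file_size_mb)   # ideally this lands somewhere in the 128MB-1GB range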

There are two causes of file fragmentation — unoptimized updates and excessive partitioning.

Unoptimized updates

In a continuously streaming Delta stream, data is added in small chunks over time as it arrives in a series of micro-batches. With the default configuration this will create a huge number of small files.
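For example (paths and trigger interval are hypothetical, and events stands for the incoming streaming DataFrame), a stream written with default settings commits its own set of parquet files on every trigger, so a frequent trigger quietly adds up to thousands of files a day:

(events.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/bronze/_checkpoints/events")
    .trigger(processingTime="1 minute")    # ~1,440 commits per day, each writing new small files
    .start("/mnt/bronze/events"))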

Excessive partitioning

If a data column with high cardinality (many discrete values) is chosen as a partition column, the Delta store can end up with thousands of partitions. This makes the data look tidy in the file store, but causes each micro-batch of data to be split into many small files.

The deadly combination

If a stream has unoptimized updates and excessive partitioning, then the two factors multiply. A Delta store organised this way can easily end up with millions of small fragmented files (the number of partitions times the number of tiny updates per partition).

Why is this so bad?

From discussions with Databricks engineers, Databricks currently (March 2020) has an issue in the implementation of Delta streaming — while the data is neatly partitioned into separate folders, the metadata isn’t. According to their engineers it is on the Databricks roadmap to apply partition filters to the metadata, so this may not be an issue in the future.

The result is that when a stream reads from a particular offset of a Delta store, the metadata for the entire source table must be checked, even if the reading stream only needs a small subset of the source data. In the case of excessive partitioning combined with unoptimized updates, that can mean millions of files, which can overwhelm the driver.

Auto-optimize to the rescue?

Yes… and no. Auto-optimize helps by merging the output of all executors in a batch before writing it to each partition. This helps, but if there are many partitions or many batches there will still be many output parquet files. Auto-compact helps a lot more by compacting the small new files into larger ones after each write, but it does nothing about having too many partitions.

But unless you have a tsunami of data coming in, turn them on:

ALTER TABLE myDatabase.myTable SET TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact = true
)
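If you would rather not alter every table individually, the same behaviour can be switched on for all new writes in a session. A sketch, assuming the session-level Databricks settings for these two features:

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")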

Jamming up

We had an issue where a stream reading from our bronze Delta store jammed up. It just got stuck on ‘Stream initializing…’. The bronze table had a daily optimization job running but did not yet have auto-optimize enabled, and it had a lot of partitions: we had partitioned by customerId, and there were thousands of customers.

# how the bronze table was being written (bronzeData stands for the incoming streaming DataFrame)
bronzeData.writeStream
  .format("delta")
  .partitionBy("platform", "product", "customerId")
  ...
  .start(...)

To make matters more confusing, if we restarted the silver stream from a fresh checkpoint it worked fine. It would only fail when starting from an existing checkpoint.

What was happening was that the daily optimize was working well, compacting a huge number of files that had appeared during the last 24 hours into a very small number of files. When a silver stream without an existing checkpoint ran, it would use these new combined files and work fine.

When a stream with an existing checkpoint ran, it knew which small file it was up to, but Delta can only checkpoint by entire file, not within a file. So even if a partition had thousands of files optimized into one, the stream would still need to work through the remaining small files to get to where the next large file started. Those thousands of files were then multiplied by the number of partitions (several thousand in our case). The result was millions of small files to be processed, which caused the driver for the silver stream cluster to examine so many files that it ran out of memory and failed.

The fix

So of course the fix was twofold: enable auto-optimize and auto-compact to prevent the build-up of small files in the first place, and reduce the number of partitions.
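Reducing the number of partitions on an existing Delta table means rewriting it, since the partition columns are baked into the layout. A hedged sketch of that step (path and columns follow our example, not a prescription):

(spark.read.format("delta").load("/mnt/bronze/events")
    .write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")     # needed when the partition columns change
    .partitionBy("platform", "product")    # customerId no longer a partition column
    .save("/mnt/bronze/events"))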

How to choose partitions

Reporting (sink) tables

Tables that are to be used only for reporting via Spark SQL can be partitioned more heavily — but only create partitions whose values are used as WHERE filters in the SQL queries.

In this case the partitions are very helpful for limiting the data that Delta has to fetch to process the query, and they are not a detriment because a plain SQL query does not have to walk the metadata for the entire Delta store the way a stream evaluating its checkpoint does. There are no checkpoints involved in a SQL query.
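A sketch of the reporting pattern (the table name and query are illustrative): the partition column appears in the WHERE clause, so Delta only reads the matching folders.

report = spark.sql("""
    SELECT product, count(*) AS events
    FROM myDatabase.silverReporting
    WHERE customerId = 'some-customer-id'
    GROUP BY product
""")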

Source tables

Tables that are to be used as a source for streaming should have minimal partitioning. Even if the streaming query uses the partitions, the stream will still need to process metadata for all partitions to evaluate its checkpoints.
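In that situation, any narrowing by customer is better done as an ordinary filter inside the stream than as a partition on the source table. A sketch with hypothetical paths:

goldSource = (spark.readStream
    .format("delta")
    .load("/mnt/silver/events")          # partitioned by platform and product only
    .where("customerId = 'acme-corp'"))  # narrowing by customer is a filter, not a partition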

It depends

So in our case we need to partition our silver tables differently — for those that are a final sink of data and used directly for SQL reporting, we include the customer ID as a partition (thousands of values). For those that are used as streaming sources for gold tables, we do not include the customer ID as a partition, as that would leave them too heavily partitioned.
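Put together, the two silver write paths end up differing only in their partitioning. A sketch with hypothetical paths, where events again stands for the incoming streaming DataFrame:

# Reporting sink: queried via SQL with WHERE customerId = ..., so heavy partitioning helps.
(events.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/silver/_checkpoints/reporting")
    .partitionBy("platform", "product", "customerId")
    .start("/mnt/silver/reporting"))

# Streaming source for gold tables: read again downstream, so keep partitioning minimal.
(events.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/silver/_checkpoints/source")
    .partitionBy("platform", "product")
    .start("/mnt/silver/source"))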

And the driver is once again happy.
