Apache Hudi Compaction

Sivabalan Narayanan
6 min read · Oct 17, 2022

Hudi supports two table types, namely COW (Copy-On-Write) and MOR (Merge-On-Read). COW has only base files, whereas MOR has base parquet files plus delta changes stored in avro format.

MOR storage layout

Data is laid out in terms of file groups in Hudi. Please take a look at Hudi's documentation on file layouts if you are not familiar with file groups. Each file group in a MOR table consists of a base parquet file and a few log files. These log files store records belonging to incremental commits. Since MOR is designed to be write optimized, on new writes, after index tagging is complete, Hudi just appends the records pertaining to each file group as log blocks in its log files. This means there is no synchronous merge happening during the write, which saves a lot of write latency.

Fig1: Hudi's MOR table file layout showing different file groups and their constituents
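
To make the write path concrete, here is a minimal sketch of writing to a MOR table through the Spark datasource (Scala, spark-shell). The input path, record key, precombine field, table name, and table path are placeholder assumptions, not from the original post:

```scala
import org.apache.spark.sql.SaveMode

// Placeholder input; any DataFrame containing the fields referenced below works.
val df = spark.read.json("s3://bucket/input/")

df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ"). // MOR: new writes land in log files
  option("hoodie.datasource.write.recordkey.field", "uuid").     // assumed record key field
  option("hoodie.datasource.write.precombine.field", "ts").      // assumed ordering field
  option("hoodie.table.name", "trips_mor").                      // assumed table name
  mode(SaveMode.Append).
  save("s3://bucket/trips_mor/")
```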

While serving a read query (snapshot read), for each file group, records in the base file and all its constituent log files are merged together and served. Hence the read latency for a MOR snapshot query might be higher compared to a COW table, since no merge is involved in the case of COW.
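
For reference, a snapshot query is just the default read of the table path; a sketch continuing the write example above:

```scala
// Snapshot read: merges each file group's base file with its log files on the fly.
val snapshotDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "snapshot"). // the default; shown for clarity
  load("s3://bucket/trips_mor/")

snapshotDF.createOrReplaceTempView("trips_mor_snapshot")
spark.sql("select count(*) from trips_mor_snapshot").show()
```

If read latency matters more than data freshness, MOR tables also support read-optimized queries (`hoodie.datasource.query.type` set to `read_optimized`) that scan only base files and skip the merge.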

Hudi employs a table service called COMPACTION to compact these base files and log files into a new version of the file slice which contains only a base file. Compaction has to happen at regular intervals to bound the growth of log files and to ensure read latencies do not spike up.

Fig2: Pictorial representation of Apache Hudi’s Compaction on a given file group.

Ways to trigger Compaction

There are different ways to trigger compaction; a configuration sketch for each mode follows the list.

  • Inline: Compaction can be scheduled and executed by the regular writer at the end of each commit.
  • Asynchronous execution within the same process: In streaming ingestion write models like DeltaStreamer and Spark streaming, one can enable compaction to be executed asynchronously without blocking regular writers. This is one of the under-appreciated features of Hudi, in my opinion. Running compaction for large datasets can be operationally challenging if you have to schedule and execute it in a separate process altogether. Hudi eases this by running compaction alongside regular writes without blocking them, which is the best of both worlds.
  • Scheduling and execution by a separate process: We don't want to block regular writes with long-running table services. So, users can both schedule and execute compaction in a completely different process altogether. This way, regular writers don't need to worry about scheduling or executing table services like compaction. It also means users can give more compute resources to the compaction process and leaner compute resources to the regular ingestion jobs, thus reducing cost. Please do ensure you have configured lock providers for all jobs (both writer jobs and table service jobs).
  • Scheduling inline and executing async: With Spark datasource writers, it is also possible to just schedule compaction inline, but let a separate process take care of executing it. In this mode, if the metadata table is not enabled, there is not even a need for lock providers, due to how Hudi manages these table services. But if you have the metadata table enabled, please do ensure you have lock providers configured for all jobs.
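
Below is a rough sketch of how these four modes map to configuration, continuing the earlier example. The thresholds and paths are illustrative, and the HoodieCompactor flags can vary across Hudi versions:

```scala
// 1) Inline: schedule and execute compaction as part of the write itself.
//    (record key / precombine / table type options from the first sketch omitted for brevity)
df.write.format("hudi").
  option("hoodie.compact.inline", "true").
  option("hoodie.compact.inline.max.delta.commits", "4"). // compact after every 4 delta commits
  option("hoodie.table.name", "trips_mor").
  mode(SaveMode.Append).
  save("s3://bucket/trips_mor/")

// 2) Async within the same process: for Spark structured streaming sinks the knob is
//    "hoodie.datasource.compaction.async.enable", so compaction runs alongside the stream.

// 3) Scheduling and execution by a separate process, via the HoodieCompactor utility:
//
//      spark-submit \
//        --class org.apache.hudi.utilities.HoodieCompactor hudi-utilities-bundle.jar \
//        --base-path s3://bucket/trips_mor \
//        --table-name trips_mor \
//        --mode scheduleAndExecute
//
//    Remember lock providers for all jobs, e.g. "hoodie.write.lock.provider" set to
//    "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider".

// 4) Schedule inline, execute async: flip these two options on the regular writer and
//    let a separate process handle execution.
//      option("hoodie.compact.inline", "false")
//      option("hoodie.compact.schedule.inline", "true")
```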

Different strategies for Compaction

There are two strategies to define for compaction: one is the trigger strategy and the other is the compaction strategy.

Compaction Trigger Strategy:

The trigger strategy determines how often scheduling of compaction is triggered. The different strategies available are:

  • NUM_COMMITS
  • NUM_COMMITS_AFTER_LAST_REQUEST
  • TIME_ELAPSED
  • NUM_AND_TIME
  • NUM_OR_TIME

NUM_COMMITS: Compaction will be triggered once N delta commits have accumulated. Use `hoodie.compact.inline.max.delta.commits` to define the value of N.

NUM_COMMITS_AFTER_LAST_REQUEST: Compaction will be triggered based on the number of delta commits after the last compaction request. Again, you can use `hoodie.compact.inline.max.delta.commits` to define the value of N.

TIME_ELAPSED: Compaction will be triggered based on the time elapsed since the last compaction. You can define the time using the `hoodie.compact.inline.max.delta.seconds` config.

NUM_AND_TIME: This strategy combines both the number of commits and time; compaction is triggered only when both thresholds are met.

NUM_OR_TIME: If either N delta commits have accumulated or the configured time has elapsed, compaction will get triggered.
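
Putting the trigger knobs together, a sketch continuing the earlier example (the strategy choice and values are illustrative):

```scala
df.write.format("hudi").
  option("hoodie.compact.inline", "true").
  option("hoodie.compact.inline.trigger.strategy", "NUM_OR_TIME").
  option("hoodie.compact.inline.max.delta.commits", "5").    // N for the NUM_* strategies
  option("hoodie.compact.inline.max.delta.seconds", "3600"). // window for the TIME_* strategies
  option("hoodie.table.name", "trips_mor").
  mode(SaveMode.Append).
  save("s3://bucket/trips_mor/")
```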

Compaction Strategy:

The trigger strategy that we just saw only dictates when to trigger the scheduling of compaction. What exactly will get compacted (e.g., which file groups) is determined by the compaction strategy.

Hudi provides various strategies out of the box; they are covered below.

LogFileSizeBasedCompactionStrategy: Orders compactions based on the total log file size. It filters for file groups whose total log file size is greater than the threshold and limits compaction within a configured IO bound.

Configs of interest:

`hoodie.compaction.logfile.size.threshold`
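A sketch of wiring this strategy in, continuing the earlier example; the 512 MB threshold is an illustrative value, not a recommendation:

```scala
df.write.format("hudi").
  option("hoodie.compaction.strategy",
    "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy").
  option("hoodie.compaction.logfile.size.threshold", (512L * 1024 * 1024).toString). // bytes
  option("hoodie.table.name", "trips_mor").
  mode(SaveMode.Append).
  save("s3://bucket/trips_mor/")
```

The strategies below are wired the same way: swap the class name in `hoodie.compaction.strategy` and set that strategy's config of interest.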

LogFileNumBasedCompactionStrategy: Orders compactions based on the total log file count. It filters for file groups whose log file count is greater than the threshold and limits compactions within a configured IO bound.

Configs of interest:

`hoodie.compaction.logfile.num.threshold`

BoundedIOCompactionStrategy: A compaction strategy which looks at the total IO to be done for the compaction (read + write) and limits the list of compactions to be under a configured IO limit.

Configs of interest:

`hoodie.compaction.target.io`
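A sketch continuing the earlier example, with the IO budget purely illustrative (the config is expressed in MB):

```scala
df.write.format("hudi").
  option("hoodie.compaction.strategy",
    "org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy").
  option("hoodie.compaction.target.io", "512000"). // MB of read+write IO allowed per compaction
  option("hoodie.table.name", "trips_mor").
  mode(SaveMode.Append).
  save("s3://bucket/trips_mor/")
```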

DayBasedCompactionStrategy: This is applicable only when the dataset is partitioned based on date. This strategy orders compactions in reverse order of creation of hive partitions. It helps to compact data in the latest partitions first, and then older ones, capped at the total IO allowed.

Configs of interest:

`hoodie.compaction.daybased.target.partitions`
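A sketch continuing the earlier example, assuming day-partitioned data; the partition count is illustrative:

```scala
df.write.format("hudi").
  option("hoodie.compaction.strategy",
    "org.apache.hudi.table.action.compact.strategy.DayBasedCompactionStrategy").
  option("hoodie.compaction.daybased.target.partitions", "2"). // compact the 2 most recent day partitions
  option("hoodie.table.name", "trips_mor").
  mode(SaveMode.Append).
  save("s3://bucket/trips_mor/")
```

Note that BoundedPartitionAwareCompactionStrategy, covered next, reads the same `hoodie.compaction.daybased.target.partitions` config.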

BoundedPartitionAwareCompactionStrategy: This is applicable only when the dataset is partitioned based on date. This strategy ensures that the last N partitions are picked up even if there are more partitions created for the table, where lastNPartitions is defined as the N partitions before the currentDate. For example, if currentDay = 2018/01/01 and the table has partitions for 2018/02/02 and 2018/03/03 beyond the currentDay, this strategy will pick up the following partitions for compaction: (2018/01/01, allPartitionsInRange[(2018/01/01 - lastNPartitions) to 2018/01/01), 2018/02/02, 2018/03/03).

Configs of interest:

`hoodie.compaction.daybased.target.partitions`

UnBoundedCompactionStrategy: This strategy will not change the ordering or filter out any compactions. It is a pass-through and will compact all base files which have log files. This usually means no intelligence is applied to compaction.

UnBoundedPartitionAwareCompactionStrategy: This strategy is a custom unbounded strategy. It filters out all the partitions that would be picked up by a BoundedPartitionAwareCompactionStrategy and returns the rest. This is done so that a long-running UnBoundedPartitionAwareCompactionStrategy does not step over partitions handled by a shorter-running BoundedPartitionAwareCompactionStrategy. Essentially, it compacts the inverse of the partitions chosen by BoundedPartitionAwareCompactionStrategy.

Self-Managed Service

One area where Hudi stands tall compared to other similar systems is its self-managed table services like compaction. You get to configure the trigger strategy, the compaction strategy, and the mode of operation, and Hudi takes care of managing them for you. This greatly reduces the operational burden on data engineers, which could otherwise be a nightmare when managing the storage layouts of thousands of tables in a large organization.

I am also really excited about active work going on to introduce a centrally managed table service which can manage table services for any number of tables in a large organization. This again would reduce the management overhead for table services in a large organization. With it, one can assign higher priority to core tables, less compute to smaller tables, etc., and everything can be managed centrally by a single table service without impacting any core ingestion writers/services.

Hudi has always been designed to be a platform (lakehouse) rather than just a table format, as we feel these services are essential for managing your data lakes in the long run.

Conclusion

Unlike clustering, compaction is not an optional table service for MOR Hudi tables. It is a mandatory one, and everyone should familiarize themselves with how to operationalize it. I hope this blog helped you understand what compaction is and how one can go about compacting their MOR tables based on their needs. And I hope you appreciate the self-management capabilities that Hudi offers for such table services, reducing the DIY burden for everyone.
