Apache Hudi Timeline: Foundational pillar for ACID transactions

Sivabalan Narayanan
Jul 9, 2023 · 11 min read


Hudi maintains a timeline of all actions performed on a given table to support efficient retrieval of data for read queries in an ACID-compliant manner. The timeline is also constantly consulted during writes and table services, which is key to the regular functioning of the table. If any of the timeline actions goes haywire (due to fat-fingering, or multiple writers without configured lock providers), it could lead to data consistency issues (data loss or data duplication) or end in unrecoverable errors. So, let’s take a deep dive into the timeline’s nuances to assist in operating Apache Hudi tables.

Instants

All actions performed on the table are represented as instants in the Hudi timeline. You can find a directory called “.hoodie” under your table base path where these instants are maintained. A Hudi instant consists of the following components:

  • Instant action: the type of action performed on the table.
  • Instant time: a millisecond-granularity timestamp (e.g. 20230705155904980, i.e. yyyyMMddHHmmssSSS) that serves as the identifier for an operation on the timeline.
  • State: the current state of the instant. There are 3 different states: Requested, Inflight and Completed. A given instant is in exactly one of these states at any point in time. Every operation starts in the “requested” state, moves to “inflight”, and finally reaches the “completed” state, at which point the entire operation is deemed complete. Until an operation reaches the “completed” state, it is deemed pending, and read queries are guarded against reading any data from such operations.

Hudi guarantees that the actions performed on the timeline are atomic & timeline consistent based on the instant time.
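
To make the naming concrete, here is a minimal Java sketch (our own illustration, not Hudi’s internal parser) that decodes a timeline file name into the three components above. Note that a completed instant carries no state suffix:

```java
// Illustrative sketch: decoding timeline file names like
// "20230705155904980.commit.requested" into instant time, action and state.
public class InstantParser {

    record Instant(String time, String action, String state) {}

    static Instant parse(String fileName) {
        String[] parts = fileName.split("\\.");
        String time = parts[0];       // e.g. 20230705155904980
        String action = parts[1];     // e.g. commit, clean, rollback
        // A missing third token means the instant is completed,
        // e.g. "20230705155904980.commit".
        String state = parts.length > 2 ? parts[2] : "completed";
        return new Instant(time, action, state);
    }

    public static void main(String[] args) {
        System.out.println(parse("20230705155904980.commit.requested"));
        System.out.println(parse("20230705155904980.commit.inflight"));
        System.out.println(parse("20230705155904980.commit"));
    }
}
```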

Actions

Quite a few different actions happen on an Apache Hudi table, each serving a different purpose: regular writers ingesting data, table services like compaction and clustering, cleaning and archival, and so on. Let’s take a look at each of these actions.

For most of this blog, we will assume a single-writer model, since the focus is to illustrate the timeline events. But where it helps, we will go over some multi-writer sequences as well.

Commit

The “commit” action represents a write to a COW table. Whenever a new batch is ingested into a table, a new commit time is generated and the operation enters the requested state. You can find “tN.commit.requested” in the Hudi timeline, for example 20230705155904980.commit.requested (where “20230705155904980” is the commit time for this operation). Generally, the requested file marks the completion of the planning phase; for regular writes, there is not much to do in the planning phase. The execution phase starts by adding the “inflight” instant, and once the execution completes, you will see the “completed” commit file in the timeline.

| — 20230705155904980.commit.requested

| — 20230705155904980.commit.inflight

| — 20230705155904980.commit

So, until 20230705155904980.commit appears, read queries will not read any data partially written by this commit. Once completion is marked by the addition of 20230705155904980.commit to the timeline, any new reads that hit the table will see the data committed by this commit of interest.

Delta commit

A delta commit represents a write to a MOR table. It could result in either log files or base parquet files, but “delta commit” refers to any regular write to a MOR table. The sequence is similar to the “commit” we saw above.

| — 20230707081934362.deltacommit.requested

| — 20230707081934362.deltacommit.inflight

| — 20230707081934362.deltacommit

Both commits and delta commits only result in the addition of new files. The completed file lists all meta information about the files added, along with stats like bytes written, records written, records updated, etc.
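
Completed commit files are serialized as JSON (Hudi’s HoodieCommitMetadata). As a hedged illustration, the following Jackson-based sketch sums a couple of those stats; the field names (“partitionToWriteStats”, “totalWriteBytes”, “numWrites”) are assumptions based on typical Hudi versions, so verify them against the files your version produces:

```java
// Hedged sketch: inspecting a completed commit file, which Hudi serializes
// as JSON. Field names below are assumptions -- check them against the
// actual .commit files produced by your Hudi version.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;

public class CommitStats {
    public static void main(String[] args) throws Exception {
        JsonNode root = new ObjectMapper()
                .readTree(new File(".hoodie/20230705155904980.commit"));
        long bytes = 0, records = 0;
        // Each partition maps to a list of per-file write stats.
        for (JsonNode statList : root.get("partitionToWriteStats")) {
            for (JsonNode stat : statList) {
                bytes += stat.get("totalWriteBytes").asLong();
                records += stat.get("numWrites").asLong();
            }
        }
        System.out.println("bytes written: " + bytes + ", records written: " + records);
    }
}
```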

Clean commit

Hudi adds new versions of files, called file slices, on any updates to existing file groups. Older versions of file slices are cleaned up (removed) by the cleaner as per the cleaner configurations. Unlike regular writes (commits and delta commits), the cleaner also goes through a planning phase, which results in a tX.clean.requested file containing the clean plan. It tracks all files that need to be deleted as part of the clean.

The major reason to serialize the plan into the requested file is to ensure idempotency. To be resilient to mid-way crashes while cleaning, we want to ensure that a clean plan, once created, is taken to completion without fail. It also means the completed clean shows exactly which files were deleted as part of the clean commit, and not just a partial list, irrespective of how many times the clean was re-attempted. The same rationale applies to clustering, compaction and restore plans as well.

| — 20230708091954360.clean.requested

| — 20230708091954360.clean.inflight

| — 20230708091954360.clean

You can find info about all the files removed by the cleaner in the completed “20230708091954360.clean” file. Let’s walk through a simple example to understand what the cleaner does.

T1.commit:

  • Inserts new data
  • Adds a new file fg1_fs1 (fg refers to file group and fs refers to file slice)

T2.commit:

  • Updates the same set of data.
  • Adds a new file slice fg1_fs2 to existing filegroup fg1.

T3.commit:

  • Updates the same set of data.
  • Adds a new file slice fg1_fs3 to existing filegroup fg1.

Now the cleaner gets triggered, with the cleaner config for number of commits to retain set to “2”. So, any file slices created earlier than the latest 2 commits are eligible to be cleaned. The cleaner will therefore add fg1_fs1 to the clean plan and later delete it during execution, leaving only fg1_fs2 and fg1_fs3 in storage.

T4.clean:

  • Cleans up fg1_fs1

This cycle repeats. For example, T5.commit will add fg1_fs4, and T6.clean will delete fg1_fs2, and so on. You can read more about cleaning here.
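
Here is a small simulation of the retention logic from the walkthrough above (our own sketch, not Hudi’s cleaner code), with “commits to retain” set to 2:

```java
// Minimal simulation of "retain latest N commits" style retention: file
// slices created before the earliest retained commit become clean candidates.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CleanerSim {
    public static void main(String[] args) {
        // file slice -> commit time that created it (from the walkthrough)
        Map<String, Integer> slices = new LinkedHashMap<>();
        slices.put("fg1_fs1", 1); // T1.commit
        slices.put("fg1_fs2", 2); // T2.commit
        slices.put("fg1_fs3", 3); // T3.commit

        int retainCommits = 2;
        List<Integer> commits = List.of(1, 2, 3);
        int earliestRetained = commits.get(commits.size() - retainCommits); // 2

        slices.forEach((slice, commit) -> {
            if (commit < earliestRetained) {
                System.out.println("clean plan: delete " + slice); // fg1_fs1
            } else {
                System.out.println("retain: " + slice);            // fg1_fs2, fg1_fs3
            }
        });
    }
}
```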

Replace commit

Unlike regular writes, which result in commits and delta commits, some operations replace certain data files. For example, clustering and insert_overwrite operations add new data files, but also replace certain existing data files. Most of these are async in the sense that the replaced files are not deleted synchronously; they are just marked as replaced. At some later point in time, when the cleaner comes around, it takes care of deleting those files.

| — 20230707081954360.replacecommit.requested

| — 20230707081954360.replacecommit.inflight

| — 20230707081954360.replacecommit

Let’s say 4 data files were written to the table using 4 commits: t1.commit (file1), t2.commit (file2), t3.commit (file3) and t4.commit (file4). Each file here represents a different file group in Hudi. Let’s suppose we trigger clustering to batch the small files into larger ones.

T1.commit:

  • Inserts new data fg1_fs1

T2.commit:

  • Inserts new data fg2_fs1

T3.commit:

  • Inserts new data fg3_fs1

T4.commit:

  • Inserts new data fg4_fs1

t5.replacecommit will create a new file, file5, replacing the 4 files created by the previous commits.

T5.replacecommit:

  • Creates a new file group fg5_fs1, replacing file groups fg1 to fg4.

Until t5.replacecommit (the completed timeline file) is added to the timeline, read queries will read data from the 4 original files; once the completed t5.replacecommit is added, any new read queries will read just file5 and ignore file1 to file4. The completed t5.replacecommit contains all the information about which files were added and which ones were replaced.

Another difference between commits and replace commits is that there isn’t much involved in the planning phase for regular commits. But in the case of a replace commit, planning involves going through existing file groups; based on the clustering plan strategy and configs, Hudi determines which file groups to cluster and how to bin-pack them into different clustering operations. So, for very large tables, even planning could take a non-trivial amount of time. There is also a chance that at the end of the planning phase no clustering plan is generated, and hence no “.replacecommit.requested” file appears. This means there is nothing to cluster at this time, and clustering planning will be re-attempted at some later time. You can read more about clustering here.
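
To illustrate the bin-packing idea, here is a hedged sketch of the kind of grouping a clustering plan performs; the 128 MB target and the greedy strategy are illustrative assumptions, not Hudi’s actual planner:

```java
// Illustrative sketch of clustering-plan bin packing: group small files
// into clustering operations capped at a target output size. An empty plan
// means no replacecommit.requested file is written.
import java.util.ArrayList;
import java.util.List;

public class ClusteringPlanSketch {
    record FileGroup(String id, long sizeBytes) {}

    static List<List<FileGroup>> plan(List<FileGroup> smallFiles, long targetGroupBytes) {
        List<List<FileGroup>> operations = new ArrayList<>();
        List<FileGroup> current = new ArrayList<>();
        long currentSize = 0;
        for (FileGroup fg : smallFiles) {
            // Close the current group if adding this file would exceed the target.
            if (currentSize + fg.sizeBytes() > targetGroupBytes && !current.isEmpty()) {
                operations.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(fg);
            currentSize += fg.sizeBytes();
        }
        if (!current.isEmpty()) operations.add(current);
        return operations;
    }

    public static void main(String[] args) {
        List<FileGroup> small = List.of(
                new FileGroup("fg1", 40L << 20), new FileGroup("fg2", 60L << 20),
                new FileGroup("fg3", 50L << 20), new FileGroup("fg4", 30L << 20));
        plan(small, 128L << 20).forEach(op -> System.out.println("cluster together: " + op));
    }
}
```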

Clustering is one such example, but other operations also result in replace commit actions: insert_overwrite, insert_overwrite_table, and delete_partition.

Compaction commit

Compaction refers to the process of compacting a base file and its associated log files in a MOR table into a new base file. You can read more about compaction here. Similar to clustering, this also goes through a planning phase, and based on the compaction strategy, a compaction plan may optionally be generated, tracking the list of log files and the base file to be compacted. If a plan is generated, it results in a compaction.requested file in the timeline; this marks the end of the planning phase. During the execution phase, an inflight file is created, and eventually, once the compaction completes, a completed file is added to the timeline (note that this is a “.commit” file, since a completed compaction produces new base files just like a regular commit).

| — 20230707091954370.compaction.requested

| — 20230707091954370.compaction.inflight

| — 20230707091954370.commit

Again, similar to clean and clustering, once the plan is serialized (in other words, once the requested file is written out), Hudi is resilient to any number of crashes and re-attempts, and will eventually take the compaction to completion, ensuring all partially failed attempts are properly cleaned up and only the data files from the final successful attempt are left intact. This is critical when you are operating a very large table and have to compact a large number of file groups. Also, other operations on the table progress assuming the planned compaction will eventually complete. So, we can never go back in time and revert a planned compaction; it has to be completed at any cost once more writes have gone into the table. This is one of the key designs by which Hudi supports async compaction. If you see a timeline with the following sequence, it is a valid sequence of events.

| — t100.compaction.requested

| — t110.deltacommit.requested

| — t110.deltacommit.inflight

| — t100.compaction.inflight

| — t110.deltacommit

| — t100.commit

If you use Deltastreamer in continuous mode, this is the usual sequence of timeline events you will see.
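
The following sketch replays the sequence above and shows why the interleaving is valid: since readers only consider completed instants, the snapshot they see stays consistent no matter which of the two operations finishes first:

```java
// Sketch: progressive visibility of the interleaved compaction (t100) and
// delta commit (t110). Readers only see instants with a completed file.
import java.util.LinkedHashSet;
import java.util.Set;

public class TimelineVisibility {
    public static void main(String[] args) {
        String[] events = {
            "t100.compaction.requested",
            "t110.deltacommit.requested",
            "t110.deltacommit.inflight",
            "t100.compaction.inflight",
            "t110.deltacommit",          // t110 completes before t100
            "t100.commit"                // the compaction completes last
        };
        Set<String> completed = new LinkedHashSet<>();
        for (String e : events) {
            String[] parts = e.split("\\.");
            if (parts.length == 2) {     // no state suffix -> completed
                completed.add(parts[0]); // instant time, e.g. t110
            }
            System.out.println("after " + e + " readers see: " + completed);
        }
    }
}
```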

Rollbacks

Any partially failed writes are rolled back using the “rollback” action. In single-writer mode, rollbacks are eager: whenever a new commit starts, Hudi checks for any pending commits and triggers a rollback for them. Among all the operations supported in Hudi, only clean, rollback and restore remove files; none of the other operations remove any data files. A replace commit can mark certain files as replaced, but will not delete them.

The rollback planning phase finds all files that were added as part of the partially failed commit and adds them to the rollback plan. As we saw before, the plan gets serialized into the rollback.requested file. Execution starts by creating an inflight file in the timeline, and eventually, when the rollback completes, a completed rollback file is added to the timeline.

Let’s say this is the timeline just before a crash.

| — t10.commit.requested

| — t10.commit.inflight

| — t10.commit

| — t20.commit.requested

| — t20.commit.inflight

Just after this, the process crashed. So, the user restarts the pipeline, and a rollback is triggered since t20 is deduced to be pending.

| — t10.commit.requested

| — t10.commit.inflight

| — t10.commit

| — t20.commit.requested

| — t20.commit.inflight

| — t25.rollback.requested

At the end of the rollback, Hudi removes the commit meta files of the commit being rolled back. In this case, all timeline files pertaining to commit t20 will be deleted. Here is what the timeline might look like once the rollback is complete.

| — t10.commit.requested

| — t10.commit.inflight

| — t10.commit

| — t25.rollback.requested

| — t25.rollback.inflight

| — t25.rollback

With multi-writers, Hudi also introduces lazy rollbacks, which use a heartbeat-based rollback mechanism. We can take a deeper look at the rollback algorithm in a future blog.

Similar to clustering and compaction, rollback is also designed to be idempotent. We serialize the plan into the requested file, so even if the rollback crashes mid-way, it can be re-attempted without any issues. Hudi ensures the same rollback instant time is re-used to roll back a given commit. The completed rollback file lists all files that were deleted as part of the rollback. A rollback in COW deletes the partially written files; but in MOR, if the partially failed commit added a log file, the rollback appends another log file with a rollback block and does not delete the original log file. This is one of the key designs of the MOR table to keep all writes as appends. We can also look at the log file design in a future blog.
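
Here is a small sketch of how a pending commit is deduced from the timeline files, mirroring the t20 example above (this is not Hudi’s rollback code): an instant with a requested or inflight file but no completed file is a rollback target:

```java
// Sketch: deducing pending instants from timeline file names. Instants that
// were started but never completed are candidates for rollback.
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class PendingInstants {
    public static void main(String[] args) {
        List<String> timeline = List.of(
            "t10.commit.requested", "t10.commit.inflight", "t10.commit",
            "t20.commit.requested", "t20.commit.inflight");

        Set<String> started = new TreeSet<>(), completed = new TreeSet<>();
        for (String f : timeline) {
            String[] parts = f.split("\\.");
            // Two tokens (time.action) means the instant completed.
            (parts.length == 2 ? completed : started).add(parts[0]);
        }
        started.removeAll(completed);
        System.out.println("pending, to be rolled back: " + started); // [t20]
    }
}
```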

Savepoint

To assist in disaster recovery scenarios, Hudi provides two operations called savepoint and restore. Adding a savepoint to a commit ensures that the cleaner and archival will not touch anything related to the savepointed commit. This means the user can restore the table to the savepointed commit of interest on a need basis. You are allowed to add a savepoint to a commit only if it has not been cleaned up already.

Savepoint has only two states, inflight and completed; there is no requested state, as there is no planning phase. During the execution phase, Hudi finds all files that are required to serve a read query as of the commit time of interest. These files are added to the tX.savepoint.inflight file, and immediately afterwards a completed savepoint file is added to the timeline.

| — t10.commit.requested

| — t10.commit.inflight

| — t10.commit

| — t10.savepoint.inflight

| — t10.savepoint

You can add a savepoint at a later stage as well, as long as the cleaner has not cleaned up the files yet. For example, your table could have commits from t10 to t200 (one every 10 seconds). At time t210, you are allowed to add a savepoint to commit t50 if the cleaner has cleaned up data files only until t30.
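
That eligibility rule can be sketched as a one-line check (a simplification, using the toy commit times from the example above):

```java
// Sketch of the savepoint eligibility rule: a commit can be savepointed
// only if the cleaner has not yet cleaned past it.
public class SavepointCheck {
    static boolean canSavepoint(long commitTime, long earliestRetainedCommit) {
        // Files for commits at or after earliestRetainedCommit still exist.
        return commitTime >= earliestRetainedCommit;
    }

    public static void main(String[] args) {
        long earliestRetained = 40; // cleaner has cleaned up files until t30
        System.out.println(canSavepoint(50, earliestRetained)); // true
        System.out.println(canSavepoint(20, earliestRetained)); // false
    }
}
```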

Restore

Restore is used to bring the entire table back to some older point in time. In case bad data landed in the table, or data got corrupted, or for other valid reasons, if the user wishes to take the table back to how it was 10 hours ago, the restore operation comes in handy. Users can add a savepoint to one of the commits from 10 hours back and trigger a restore to it. Restore technically means rolling back N commits in reverse chronological order. For example, say the table has commits t10, t20, t30, t40, t50, t60, t70, t80, t90 and t100, and the user prefers to restore the table to t40. Hudi will roll back t100, followed by t90, then t80, and so on, until the rollback of t50 completes.
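
The rollback sequence for a restore can be sketched as follows, using the toy commit times from the example above:

```java
// Sketch: restore-as-reverse-rollbacks. Restoring to t40 rolls back
// t100, t90, ..., t50, newest first.
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class RestorePlan {
    static List<Integer> rollbackSequence(List<Integer> commits, int restoreTo) {
        return commits.stream()
                .filter(t -> t > restoreTo)        // commits after the target
                .sorted(Comparator.reverseOrder()) // newest first
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> commits = List.of(10, 20, 30, 40, 50, 60, 70, 80, 90, 100);
        System.out.println(rollbackSequence(commits, 40));
        // -> [100, 90, 80, 70, 60, 50]
    }
}
```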

Restore goes through a state transition similar to other table services. A requested plan is generated to track all commits that need to be rolled back; then, during execution, an inflight file is created, and eventually, once complete, a completed restore file is added to the timeline.

| — t10.commit.requested

| — t10.commit.inflight

| — t10.commit

| — t10.savepoint.inflight

| — t10.savepoint

| — t20.commit.requested

| — t20.commit.inflight

| — t20.commit

.

.

| — t100.commit.requested

| — t100.commit.inflight

| — t100.commit

After the restore, here is how the timeline might look.

| — t10.commit.requested

| — t10.commit.inflight

| — t10.commit

| — t10.savepoint.inflight

| — t10.savepoint

| — t120.restore.requested

| — t120.restore.inflight

| — t120.restore

Indexing

Hudi supports adding various indexes to improve read and write latencies. A couple of such indexes are the column stats partition and the bloom filter partition of the metadata table. Initializing these indexes for the first time on a large table cannot be allowed to block the ingestion writer, as it could take a lot of time. So, Hudi introduced an AsyncIndexer to initialize these partitions asynchronously.

| — t200.indexing.requested

| — t200.indexing.inflight

| — t200.indexing

This goes through a typical state transition like any other operation. We will cover async indexing in detail in a separate blog.

Active and Archival timeline

Hudi splits the entire timeline into an active and an archived timeline. Any instants you see under the “.hoodie” directory belong to the active timeline, while archived instants go into the “.hoodie/archived” folder. You can read more about the archived timeline here. The rationale behind the split is to keep a maximum bound on the timeline metadata (timeline instants), so reads do not see increased latencies as the timeline keeps growing.
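
As a rough illustration of that bounding idea (our own sketch, not Hudi’s actual archival code, and the bound here is a hypothetical number):

```java
// Illustrative sketch: once the active timeline exceeds a configured
// maximum, the oldest completed instants move to the archived timeline.
import java.util.ArrayList;
import java.util.List;

public class ArchivalSketch {
    public static void main(String[] args) {
        List<String> active = new ArrayList<>(List.of(
                "t10.commit", "t20.commit", "t30.commit", "t40.commit", "t50.commit"));
        List<String> archived = new ArrayList<>();
        int maxActiveInstants = 3; // hypothetical bound

        while (active.size() > maxActiveInstants) {
            archived.add(active.remove(0)); // oldest instants go to .hoodie/archived
        }
        System.out.println("active:   " + active);   // [t30, t40, t50]
        System.out.println("archived: " + archived); // [t10, t20]
    }
}
```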

Hudi CLI

The Hudi CLI has commands to view the timeline of a table. We will go over them with examples in other blogs, but the command is “timeline” if you want to give it a try.

Conclusion

The timeline plays a crucial role in serving the right data with ACID compliance. Understanding the different timeline events will be very beneficial in managing Apache Hudi tables in any organization, and will assist in carrying out investigations on a need basis.
