Apache Hudi — The Basics

Parth Gupta
Oct 11, 2021

Features

Hudi stands for Hadoop Upserts, Deletes and Incrementals.

In a datalake, we use file-based formats (parquet, ORC) to store data in a query-optimized columnar layout.

However, these file-based formats do not offer a way to handle updates and deletes efficiently, nor the ACID properties we get from an RDBMS.

Apart from enabling update and delete capabilities in our datalake, Apache Hudi also provides data versioning and rollback capabilities that are especially needed to streamline datalake operations in production.

The detailed challenges that we face in our batch and streaming processes are highlighted in their corresponding blogs (next in the series).

However, before we tell you how we have used Apache Hudi to solve these challenges, let us first understand its core concepts as we embark on this voyage of Hudi-fying our datalake.

Upserts

Conceptually, rewriting the entire partition can be viewed as a highly inefficient upsert operation, which ends up writing way more than the amount of incoming changes.

Apache Hudi also needs to re-write some data files in order to provide upsert/delete capabilities to a file based system like parquet.

However, it rewrites only the part files containing the updated records, not the entire partition. This also saves significantly on write amplification in systems where some past data history/manifest versions are stored to provide operational robustness to the datalake.

Once you are comfortable with the basic concepts of Apache Hudi, I highly recommend that you read this blog on how Hudi indexes records and how it technically provides the update and delete functionality whilst saving on write amplification.

A sample dataframe upsert operation looks like this:

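import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

// Upsert the daily increment into the existing Hudi table.
dailyIncrementalDf.write
  .format("org.apache.hudi")
  // Column that uniquely identifies a record.
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "primary_key")
  // Column used to pick the latest record among duplicates.
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "updated_at")
  .option(HoodieWriteConfig.TABLE_NAME, "sample_hudi_table")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save("s3://my-bucket/snapshots/sample_hudi_table/")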

This block upserts all records in dailyIncrementalDf into the base Hudi table located at s3://my-bucket/snapshots/sample_hudi_table/.

Primary Key

Apache Hudi relies on our primary key to provide RDBMS-like update and delete functionality.

Whether or not we are using bloom filter based indexing, it is recommended (but optional) to create a primary key prefixed with an “immutable” timestamp value, which automatically enables range partitioning of bloom filter checks.

E.g. we created primary keys from event_ts + transaction_id.

Partition Field

Our datalake is designed to have datasets partitioned primarily on a stringified date value.

However, if the base data has a timestamp instead of a date, make sure to remove the time part from the partition column. An operation like this might help: cast(to_date(txn_started_at) as string)
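A minimal sketch of how the key and partition columns from the Primary Key and Partition Field sections could be derived with Spark before the write. The column names event_ts, transaction_id and txn_started_at come from the examples in this post; the object name HudiKeys, the output column names primary_key and txn_date, and the "_" separator are assumptions made purely for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, to_date}

object HudiKeys {
  // Hypothetical helper: adds a composite record key and a date-only partition column.
  def withHudiKeys(df: DataFrame): DataFrame =
    df
      // Timestamp-prefixed key: event_ts first, then transaction_id.
      .withColumn(
        "primary_key",
        concat_ws("_", col("event_ts").cast("string"), col("transaction_id").cast("string")))
      // Drop the time-of-day part so that each day maps to exactly one partition value.
      .withColumn("txn_date", to_date(col("txn_started_at")).cast("string"))
}
```

The partition column would then be handed to the writer with .option("hoodie.datasource.write.partitionpath.field", "txn_date"), alongside the record key and pre-combine options.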

Pre-Combine Key

It is used to pick the latest record in case we get multiple records with the same primary key. We have used “updated_date” as the pre-combine key.
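The effect of a pre-combine key can be sketched in plain Scala, without Spark or Hudi. This is not Hudi's implementation; it only mimics the outcome of keeping the record with the greatest pre-combine value per primary key. The Txn case class, its fields and the sample values are hypothetical.

```scala
// Hypothetical record shape, for illustration only.
final case class Txn(primaryKey: String, updatedDate: String, amount: Double)

// Keep one record per primary key: the one with the greatest pre-combine value.
// ISO-8601 date strings (yyyy-MM-dd) compare correctly as plain strings.
def preCombine(batch: Seq[Txn]): Seq[Txn] =
  batch
    .groupBy(_.primaryKey)
    .values
    .map(_.maxBy(_.updatedDate))
    .toSeq
    .sortBy(_.primaryKey)

val batch = Seq(
  Txn("t1", "2021-10-01", 10.0),
  Txn("t1", "2021-10-03", 12.5), // later update for the same key
  Txn("t2", "2021-10-02", 7.0)
)

// Only the 2021-10-03 version of t1 survives, together with t2.
val deduped = preCombine(batch)
```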

Incremental consumption

Although upserts can solve the problem of publishing new data to a partition quickly, downstream consumers do not know what data has changed since a point in the past. Typically, consumers learn this by scanning the entire partition/table and recomputing everything, which can take a lot of time and resources. Thus, we also need a mechanism to more efficiently obtain the records that have changed since the last time the partition was consumed.

Do read this blog by Hudi's creator on the concept of incremental consumption.

Hudi Commit Timeline

Hudi maintains a timeline of all actions performed on the table at different instants of time.

When a (mini-)batch of data is inserted into or updated in a Hudi table, it adds a commit to the .hoodie/ folder that is prefixed with the commit time in yyyyMMddHHmmss format.

These commits contain information about the part-files that got inserted or re-written as part of the upsert. This commit information forms the core of incremental consumption and data versioning.

Once an incremental query is fired with a specified commit time, it pulls in all the records that got updated or inserted since the specified commit time.

Commit Timeline

To make this happen, only those records that are a part of the commits after the start time are scanned by the query.
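An incremental read along these lines could look like the sketch below. It uses Hudi's string config keys as we understand them for releases from around the time of writing; please check them against the configuration reference of your Hudi version. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object IncrementalRead {
  // Returns the records written by commits after `beginInstant`,
  // where `beginInstant` is a commit time taken from the table's timeline.
  def changedSince(spark: SparkSession, basePath: String, beginInstant: String): DataFrame =
    spark.read
      .format("org.apache.hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", beginInstant)
      .load(basePath)
}
```

A call such as IncrementalRead.changedSince(spark, "s3://my-bucket/snapshots/sample_hudi_table/", "20211001000000") would then fetch everything upserted after that (made-up) commit time.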

For carrying out rollbacks, whenever an updated part-file is added and the corresponding old part-file is removed from the active data manifest, the old part-files are not actually removed. Rather, they are stored alongside the updated part-file so that rolling back to the old state of a Hudi table becomes as easy as changing the pointers from new part-files to old.

Hudi gives us the option to configure how many old commits (or versions) of data we want to keep for a particular table. The more historic commits we keep, the further back in time we can travel for incremental consumption, rollback and point-in-time queries.
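As a rough sketch, retention is controlled through write-side options like the ones below. The keys are Hudi configuration names as we understand them; the numbers are illustrative only and the name retentionOptions is hypothetical, so please verify both against the configuration reference of your Hudi version.

```scala
// Illustrative values only; tune them to how far back you need to travel.
val retentionOptions: Map[String, String] = Map(
  // Cleaner: keep the file versions needed by the last 10 commits.
  "hoodie.cleaner.commits.retained" -> "10",
  // Archival: lower and upper bounds on the commits kept in the active timeline.
  // The minimum is kept above the cleaner's retained commits.
  "hoodie.keep.min.commits" -> "20",
  "hoodie.keep.max.commits" -> "30"
)
```

These can be passed to the writer in one go with .options(retentionOptions) next to the other write options.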

Query types

Hudi supports the following query types [doc]:

  • Snapshot Queries : Queries see the latest snapshot of the table. It provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features.
  • Incremental Queries : Queries only see new data written to the table, since a given commit time. This effectively provides change streams to enable incremental data pipelines.

Hudi Table Types

  • Copy On Write : Stores data using exclusively columnar file formats (e.g. parquet). Updates simply version & rewrite the files by performing a synchronous merge during write [doc].
  • Merge On Read : Stores data using a combination of columnar (e.g. parquet) + row based (e.g. avro) file formats. Updates are logged to delta files & later compacted to produce new versions of columnar files synchronously or asynchronously [doc].

For the sake of simplicity and easy onboarding of folks to the world of Apache Hudi, we are only considering Copy-On-Write (COW) type tables to explore most of the Hudi concepts.

The additional functionality offered by the Merge-On-Read (MOR) table type goes beyond most of our use cases, and we practically do not need MOR tables until we want to move our focus from Near-RealTime to RealTime use cases.

Write Operations

The possible values of “hoodie.datasource.write.operation” are upsert, insert, bulk_insert (for converting a parquet table to a Hudi table) and delete, where upsert is the default operation.

Some code samples for carrying out these operations can be found in this blog post.
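For instance, a delete could be sketched like this, mirroring the upsert sample from earlier in this post but with the string form of the config keys. The object and method names are hypothetical, and the field and table names are the same sample values used above.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object HudiDelete {
  // `toDelete` holds the rows to remove; it needs at least the record key
  // and pre-combine columns (plus the partition column for partitioned tables).
  def deleteRecords(toDelete: DataFrame, basePath: String): Unit =
    toDelete.write
      .format("org.apache.hudi")
      .option("hoodie.datasource.write.recordkey.field", "primary_key")
      .option("hoodie.datasource.write.precombine.field", "updated_at")
      .option("hoodie.table.name", "sample_hudi_table")
      // Only the operation differs from the upsert sample.
      .option("hoodie.datasource.write.operation", "delete")
      .mode(SaveMode.Append)
      .save(basePath)
}
```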

Payload class

A payload class defines functions that control how we merge the new and old records while updating a record.

We can choose to keep the incoming record (default) or choose between old and new record based on pre-combine key column value.

Also, we can write our own custom payload class if we want to support our unique use case.

The default payload class ignores the pre-combine key while de-duping records with the same primary key and picks the record that it encounters last, i.e. it sorts/de-dupes based on the natural order of incoming events/records.

However, we can override this behaviour by specifying a pre-packaged custom payload class.

Setting .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.DefaultHoodieRecordPayload") while writing/upserting data to a Hudi dataset makes Hudi use the values in the pre-combine key to sort/de-dupe the events/records.

Interestingly, we have built our own custom payload class that lets us update partial records based on the partial new information that our pipeline may get [doc]. This is especially useful when we are creating Near-RealTime OLAPs using Apache Hudi’s incremental consumption and upsert capabilities. In many cases it eliminates the need for a slow/batch layer to incorporate updates. More details on this in later blogs.
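To give a feel for the idea, here is a plain-Scala sketch of a partial merge. It is not our actual payload class and it does not implement Hudi's payload interface; it only shows the field-by-field rule of "take the incoming value when present, otherwise keep the stored one". The Order case class and the sample values are hypothetical.

```scala
// Hypothetical record: None means "this update did not carry the field".
final case class Order(id: String, status: Option[String], amount: Option[Double])

// Field-by-field merge: prefer the incoming value when present, else keep the stored one.
def mergePartial(stored: Order, incoming: Order): Order =
  Order(
    id = stored.id,
    status = incoming.status.orElse(stored.status),
    amount = incoming.amount.orElse(stored.amount)
  )

val stored = Order("o1", Some("CREATED"), Some(99.0))
val incoming = Order("o1", Some("SHIPPED"), None) // partial update: no amount

// The status is updated while the stored amount is preserved.
val merged = mergePartial(stored, incoming)
```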

Next?

Head over to our next blog to catch some exciting action on how we have applied Apache Hudi to optimize de-duplication and data versioning in our daily batch ingestion pipelines.

Series

Apache Hudi — The Basics https://medium.com/@parth09/apache-hudi-the-basics-5c1848ca12e0

Hudifying the Datalake (Daily Batch) https://medium.com/@parth09/hudifying-the-datalake-daily-batch-e7b3d7ec8229

Hourly OLAPs & Infinite playback Event Streams https://medium.com/@parth09/hourly-olaps-infinite-playback-event-streams-62777aefa8b0
