Hudifying the Datalake (Apache Hudi for Batch Data Processing)

Parth Gupta
5 min read · Oct 11, 2021


Current State

Datalake ingestion & Compute process — Handling updates

  1. In our use cases, 1–10% of the data consists of updates to historical records.
  2. When a record is updated, we need to delete the previous entry from its older updated_date partition and add the entry to the latest partition.
  3. In the absence of delete & upsert capabilities, we have to re-read the whole historic table partition -> de-duplicate the data -> overwrite the complete table partition with the new de-duplicated data (a sketch of this flow follows below).
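For reference, here is a minimal PySpark sketch of that pre-Hudi overwrite flow. The paths, table and column names (order_id, updated_date) are hypothetical and only illustrate the pattern.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("legacy-dedup-overwrite").getOrCreate()

# Hypothetical paths and column names, for illustration only.
HISTORY_PATH = "s3://datalake/orders_history/"
DAILY_INCREMENT_PATH = "s3://datalake/orders_daily_increment/"
STAGING_PATH = "s3://datalake/orders_history_staging/"

history_df = spark.read.parquet(HISTORY_PATH)
increment_df = spark.read.parquet(DAILY_INCREMENT_PATH)

# Union history with the daily increment, then keep only the latest
# version of every record (by updated_date).
latest_first = Window.partitionBy("order_id").orderBy(F.col("updated_date").desc())
deduped_df = (
    history_df.unionByName(increment_df)
    .withColumn("rn", F.row_number().over(latest_first))
    .filter("rn = 1")
    .drop("rn")
)

# The complete historic table is re-written every day, even though only
# 1-10% of the records actually changed. The result is written to a staging
# path first, since Spark cannot overwrite a path it is still reading from.
deduped_df.write.mode("overwrite").partitionBy("updated_date").parquet(STAGING_PATH)
```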

Challenges in the current batch process

This process works but has its own share of flaws:

  1. Time & Money — The whole historic table needs to be overwritten every day.
  2. Data Versioning — No out-of-the-box data & manifest versioning (rollbacks, concurrent reads & writes, point-in-time queries, time travel and related features are not available).
  3. Write Amplification — External (or self-managed) data versioning in a daily historical-data-overwrite scenario increases write amplification, thereby consuming far more S3 storage.

With Hudi, we expected to find a better-optimized solution for data de-duplication and data versioning while ingesting data into the datalake.

HUDI-fying the Datalake — Query Patterns

When we began our journey of implementing Apache Hudi (we call it Hudifying the datalake), we classified the tables into two classes based on the query pattern of the primary user of the table.

  1. ETL Facing: This refers to most of the raw/base snapshot tables that we ingest from various production systems into the datalake. If a table is used extensively by ETL jobs, we kept the daily data partitioned on updated_date, so that downstream jobs can simply read the latest updated_date partition and (re)process the data.
  2. Analyst Facing: This generally includes dimension tables and most of the computed OLAPs that are queried by Business Analysts. BAs usually need to see data based on the transaction (or event) created_date and are not much concerned with updated_date.

Here is a sample ecommerce order data flow, from ingestion into the datalake to OLAP creation and finally to a Business Analyst querying it.

Hierarchical view of ETL optimized & Analyst Optimized tables in Datalake

As the date partition column is different for the two types of tables, we employed different strategies to solve these two use cases.

Analyst Facing Tables/OLAPs (partitioned by created_date)

In Hudi, we need to specify the partition column and the primary key column so that Hudi can handle updates and deletes for us.

Below is the logic we used to handle updates and deletes in analyst-facing tables:

  1. Read the D-n updated_date partitions of the upstream data.
  2. Apply data transformations. Now this data will contain only the new inserts and a few updated records.
  3. Issue a Hudi “upsert” operation to upsert the processed data into the destination Hudi table (see the configuration sketch below).

As both the primary key and created_date remain the same for the existing and the incoming record, Hudi locates the existing record’s partition and part-file path using the created_date and primary-key columns of the incoming record.
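A minimal sketch of what such an upsert looks like with the Hudi Spark datasource, assuming hypothetical table/path names and an order_id primary key (option keys may vary slightly across Hudi versions):

```python
# Hypothetical table name, path and columns; transformed_df is the output of step 2.
hudi_options = {
    "hoodie.table.name": "orders_olap",
    "hoodie.datasource.write.recordkey.field": "order_id",          # primary key
    "hoodie.datasource.write.partitionpath.field": "created_date",  # partition column
    "hoodie.datasource.write.precombine.field": "updated_date",     # latest record wins
    "hoodie.datasource.write.operation": "upsert",
}

(
    transformed_df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3://datalake/olap/orders_olap/")
)
```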

ETL Facing Tables (partitioned by updated_date)

When we started with Hudi, after reading many blogs and documentation, it seemed logical to partition ETL-facing tables on created_date.

Also, Hudi provides incremental consumption capabilities, which allow us to partition our tables on created_date and still fetch only those records which were upserted (inserted or updated) on D-1 or D-n.
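For example, an incremental read with the Hudi Spark datasource looks roughly like this (the path and begin instant are hypothetical; option names may differ slightly between Hudi versions):

```python
# Fetch only the records committed after the given instant (e.g. D-1).
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20211010000000")
    .load("s3://datalake/raw/orders/")
)
```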

Challenges with “created_date” partitioning

This approach works fine in theory, but while retrofitting incremental consumption into legacy daily batch processes, it posed a different set of challenges:

Hudi maintains a timeline of all actions performed on the table at different instants of time. These commits contain information about the part-files that got inserted or re-written as part of an upsert. We call this the Hudi table’s commit timeline.

The important point to note here is that an incremental query is based on the commit timeline and does not depend on the actual updated/created date information present within the data records.

  1. Cold Start: When we migrate an existing upstream table to Hudi, a D-1 Hudi incremental query fetches the complete table instead of just the D-1 updates. This happens because, at the start, the whole table is created with either a single initial commit or multiple commits falling within the D-1 commit window, so it lacks true incremental commit information.
  2. Historic Data Re-ingest: In every regular incremental D-1 pull, we expect only the records that got updated on D-1 as output. However, in case of a historical data re-ingest, a problem similar to the cold-start problem described above occurs, and the downstream job can also fail with an OOM error.

As a workaround for ETL-facing jobs, we tried to keep the data partitioned on updated_date itself. However, this approach came with its own set of challenges.

Challenges with “updated_date” partitioning

With a Hudi table’s local index, Hudi relies on the index to fetch the row-to-part-file mapping, and that mapping is scoped to each data partition. Therefore, it is not possible for Hudi to automatically de-duplicate a record across partitions when our table is partitioned on updated_date.

Hudi’s global index strategy requires us to maintain an internal or external index to keep data de-duplicated across partitions. With large data volumes, to the tune of ~200M records/day, this approach is either very slow or simply fails with an OOM error.
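For context, switching to a global index is essentially a configuration change along these lines (a sketch only; exact option names and defaults depend on the Hudi version), but at our scale it did not hold up:

```python
# Sketch of global-index write options (assumed configuration, not our final setup).
global_index_options = {
    "hoodie.index.type": "GLOBAL_BLOOM",
    # Move a record to its new partition path when its partition value changes.
    "hoodie.bloom.index.update.partition.path": "true",
}
```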

Therefore, to solve this data-duplication challenge with updated_date partitioning, we came up with a new de-duplication strategy that is also performant.

The “New” De-Duplication Strategy

  1. Find Updates — From the daily incremental load, filter out only the updates (1–10% of the DI data, where updated_date > created_date). This is a fast, map-only operation.
  2. Find Stale Updates — Broadcast-join these “updates” with the downstream Hudi base table. A performant broadcast join is possible because we are only taking the updated records (just 1–10% of the daily incremental). This gives us all the existing records in the base Hudi table corresponding to the updated records.
  3. Delete Stale Updates — Issue a Hudi delete command for these “stale updates” on the base Hudi table path.
  4. Insert DI — Issue a Hudi insert command for the complete daily incremental load on the base Hudi table path.
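Below is a minimal PySpark sketch of these four steps. The table name, path and columns (order_id, updated_date, created_date) are hypothetical, and daily_incremental_df stands in for the day’s load.

```python
from pyspark.sql import functions as F

BASE_PATH = "s3://datalake/raw/orders_hudi/"  # hypothetical base Hudi table path

# 1. Find Updates: only the 1-10% of the daily load that are updates.
updates_df = daily_incremental_df.filter(F.col("updated_date") > F.col("created_date"))

# 2. Find Stale Updates: broadcast-join the small updates set against the base
#    table to fetch the existing (now stale) versions of those records.
base_df = spark.read.format("hudi").load(BASE_PATH).drop(
    "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
    "_hoodie_partition_path", "_hoodie_file_name",
)
stale_df = base_df.join(F.broadcast(updates_df.select("order_id")), "order_id", "inner")

common_options = {
    "hoodie.table.name": "orders_hudi",
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.partitionpath.field": "updated_date",
    "hoodie.datasource.write.precombine.field": "updated_date",
}

# 3. Delete Stale Updates: the stale rows still carry their original
#    updated_date, so the delete lands in the correct (old) partition.
(
    stale_df.write.format("hudi")
    .options(**common_options)
    .option("hoodie.datasource.write.operation", "delete")
    .mode("append")
    .save(BASE_PATH)
)

# 4. Insert DI: write the complete daily incremental load into the latest partition.
(
    daily_incremental_df.write.format("hudi")
    .options(**common_options)
    .option("hoodie.datasource.write.operation", "insert")
    .mode("append")
    .save(BASE_PATH)
)
```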

Further optimisation: populate the _hoodie_is_deleted column with true in the stale updates and union them with the daily incremental load, then issue a single upsert command with this data over the base Hudi table path. It performs both the insert and the delete in a single operation (and a single commit).
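A sketch of that optimisation, reusing the hypothetical names from the previous snippet; _hoodie_is_deleted is Hudi’s boolean soft-delete marker column and must be present in the written dataframe:

```python
from pyspark.sql import functions as F

# Mark stale rows for deletion and union them with the daily load, so a
# single upsert (one commit) performs both the delete and the insert.
to_write_df = (
    daily_incremental_df.withColumn("_hoodie_is_deleted", F.lit(False))
    .unionByName(stale_df.withColumn("_hoodie_is_deleted", F.lit(True)))
)

(
    to_write_df.write.format("hudi")
    .options(**common_options)
    .option("hoodie.datasource.write.operation", "upsert")
    .mode("append")
    .save(BASE_PATH)
)
```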

Apache Hudi Advantages

  1. Time & Cost — Hudi doesn’t overwrite the whole table while de-duplicating data. It re-writes only the part-files that received updates, resulting in much smaller upsert jobs.
  2. Data Versioning — Hudi preserves table versions (commit history) and hence provides as-of-time queries (time travel) and table-version rollback capabilities (a short read sketch follows below).
  3. Write Amplification — Since only the changed part-files are re-written and preserved for data/manifest versioning, we don’t need to keep versions of the complete data. Hence, the overall write amplification is minimal.

As an additional benefit of data versioning, the concurrent read & write issue is solved: versioning makes it possible for a concurrent reader to read a versioned copy of the data files rather than hit a FileNotFoundException when a concurrent writer overwrites the same partition with new data files.
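For instance, a point-in-time (time travel) read with the Hudi Spark datasource can look roughly like this (hypothetical path; the as.of.instant option is available in recent Hudi releases):

```python
# Read the table as it existed at the given instant.
as_of_df = (
    spark.read.format("hudi")
    .option("as.of.instant", "2021-10-10 00:00:00")
    .load("s3://datalake/raw/orders_hudi/")
)
```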

Series

Part 1: Apache Hudi — The Basics https://medium.com/@parth09/apache-hudi-the-basics-5c1848ca12e0

Part 2: Hudifying the Datalake (Daily Batch) https://medium.com/@parth09/hudifying-the-datalake-daily-batch-e7b3d7ec8229

Part 3: Hourly OLAPs & Infinite playback Event Streams https://medium.com/@parth09/hourly-olaps-infinite-playback-event-streams-62777aefa8b0

Kudos to the team Sanket Duhoon, Robin Gahlot & Ritu Parno Behera
