Hourly Incremental OLAPs & Infinite Playback Event Streams with Apache Hudi

Parth Gupta
Oct 11, 2021

In this blog we will discuss how we leveraged three of Hudi’s most incredible superpowers while building our streaming data platform.

  1. Incremental consumption — processing data every 30 minutes and creating the first hourly OLAP within our organization
  2. Infinite playback of event streams — storing 10 days’ worth of an event stream (imagine a heavy Kafka topic with 10-day retention) in super cheap cloud object storage (like AWS S3) by exploiting Hudi’s commit timeline
  3. A custom Hudi payload class featuring partial record updates

Current State

Problem Statement

For most of the use cases where the business needs manual intervention to decide the next set of actions by looking at KPIs and data trends, as well as for other use cases that are not strictly real-time, we need near-real-time systems and processes that are cost-efficient and performant.

However, the data that lands in our data lake usually arrives at a daily batch granularity of D-1. Even if we run these daily batch data processing systems more than once a day to get fresher data for the current day D, the inherent limitations of these batch systems do not help us in solving the near-real-time business use cases.

Challenges

While ingesting batch data into our data lake, our S3-backed datasets are partitioned on daily updated_date partitions. Even if we run these batch systems multiple times a day, the latest batches that we pull from an upstream Kafka topic or RDBMS application DB are appended into the current date’s partition of our S3 datasets.

When a downstream system wants to fetch the latest records from our S3 dataset, it needs to re-process the whole current day, as there is no way for the downstream process to distinguish already-processed records from incrementally added ones without scanning the entire data partition.

Furthermore, even if we partition our S3 datasets on an hourly basis (instead of daily partitions), that only moves the partition granularity down to an hourly interval. Any downstream job trying to fetch the latest updates at a granularity finer than an hour (say the last x minutes) will still have to re-process whole hourly data partitions on every run. In other words, these batch sources miss the crucial incremental data consumption capability needed to solve the near-real-time use cases.

Infinite Playback Event Streams

Coming back to the features of Apache Hudi that help us solve these challenges, let’s first try to understand how commits and the commit timeline affect incremental consumption and event stream retention/playback.

Hudi maintains a timeline of all actions performed on the table at different instants of time. These commits contain information about the part-files that were inserted or re-written as part of an upsert. We call this Hudi’s commit timeline.

For each Hudi table, we have the option to specify how many historical commits we want to keep. The default number of commits to keep is 10, i.e. after 10 commits, the 11th commit will additionally trigger a cleaner process that cleans up the history of the 1st commit.

When a commit is cleaned, the cleaner simply removes the outdated versions of the part-files corresponding to that commit. No data is lost, because everything in the outdated part-files is anyway present in the newer-versioned part-files. The important point is that we can still fire a snapshot query to fetch the latest state of the data, but we can no longer run an incremental query over cleaned commits to get incremental data.

In short, once a commit is cleaned, we lose the ability to play back the event stream from that commit. However, we can still play back the event stream from any commit that has not yet been cleaned.

In our case, we have configured our Hudi table to keep 10K commits, giving us 10 days’ worth of incremental read capability (similar to a Kafka topic with 10-day retention).

The more historical commits we keep, the further back in time we can go to replay the event stream.
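
To make this concrete, here is a minimal sketch (Spark/Scala) of replaying the event stream from an older, still-uncleaned commit via an incremental query; the table path and the begin instant below are placeholders, not our production values.

val replayDF = spark.read
  .format("hudi")
  // ask for an incremental view instead of the default snapshot view
  .option("hoodie.datasource.query.type", "incremental")
  // replay everything committed after this instant (yyyyMMddHHmmss), e.g. ~10 days back
  .option("hoodie.datasource.read.begin.instanttime", "20211001000000")
  .load("s3://my-bucket/hudi/events_table")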

Hourly OLAPs

Let me quickly give an overview of what our end-to-end hourly OLAP computation pipeline with a 10-day event stream looks like.

[Figure: Hourly OLAP computation pipeline]

In the Kafka layer, we have our Kafka input sources, each with a topic retention of 1 day.

In the ingestion layer, we have Spark structured streaming jobs reading from the Kafka sources and writing micro-batches to an S3-backed Hudi table. This is where we configure Hudi to keep 10K commits and thereby enable 10 days of event stream playback.

.option("hoodie.cleaner.commits.retained", 10000)
.option("hoodie.keep.max.commits", 10002)
.option("hoodie.keep.min.commits", 10001)

The compute layer consists of batch Spark jobs that currently run every 30 minutes and reprocess all events ingested into the Hudi tables in the last 60 minutes. The hourly OLAP job reads two transactional tables and, optionally, N dimension tables, and joins them all to prepare the incremental dataframe for our OLAP.

We process 60 minutes’ worth of data every 30 minutes to improve table join consistency, as sketched below.
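
As an illustration, the “last 60 minutes” window is just an incremental read whose begin instant is derived from the current time; the table path below is a placeholder, and we assume here that the writer’s commit timeline is in UTC.

import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneOffset}

// Hudi commit instants are timestamps of the form yyyyMMddHHmmss, so "everything
// committed in the last 60 minutes" is an instant computed from the clock.
val beginInstant = LocalDateTime.now(ZoneOffset.UTC)
  .minusMinutes(60)
  .format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))

// Incrementally read one of the transactional tables; the second one is read the same way.
val txnA = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", beginInstant)
  .load("s3://my-bucket/hudi/transaction_table_a")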

Interestingly, a 1-day Kafka retention is usually not recommended for production systems. However, we were able to make this trade-off and save some SSD and Kafka broker costs, since we anyway have 10 days of event stream playback capability thanks to our S3-backed Hudi table.

Partial Record Updates

The pipeline above shows how we create the hourly incremental OLAP by reading and merging two incremental upstream data sources.

However, this incremental data processing has its own set of challenges. It may happen that, for a given primary key, we get an update in one of the two upstream tables but not in the other. We call this the mis-matched transaction problem.

The illustration below helps explain this challenge and the solution we implemented.

Here, tables A and B both have some matching transactions and some mis-matched transactions. Using an inner join would simply drop the mis-matched transactions, which might then never flow into our base OLAP. Using an outer join instead incorporates the mis-matched transactions into our hourly incremental data load.

However, the outer join fills the missing column values with nulls, and these nulls need separate handling.
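
A tiny sketch of that join choice, assuming txnA and txnB are the two incrementally read transaction frames and order_id is an illustrative primary key:

// Full outer join keeps the mis-matched transactions from both sides; the columns of
// the side that did not change arrive as null and are resolved later by the payload class.
val hourlyIncrementalDF = txnA.join(txnB, Seq("order_id"), "full_outer")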

If we upsert this hourly incremental data into the base Hudi OLAP using the default payload class, it will simply overwrite the records in the base Hudi OLAP with the new records from the hourly incremental data we have prepared. In doing so, we would lose information that is already present in the existing record whenever we overwrite it with the null column values of the incoming record.

Thus, to solve this problem, we supply our own partial-row-update payload class while upserting the outer-joined hourly incremental data into the base Hudi OLAP.

A payload class defines the functions that control how the new and old versions of a record are merged during an update.

Our custom payload class compares all columns of the stored and incoming records and returns a new record by overlaying the null columns of one record with the non-null columns of the other.

Thus, even if only one of the upstream tables has got an update, this partially available new information is utilised by our custom payload class, which returns a completely up-to-date record with the partial update incorporated into it.
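
Conceptually, the merge inside that payload class is a field-by-field null overlay. The sketch below shows only that core logic over two Avro records; the wiring into Hudi’s payload interface (preCombine / combineAndGetUpdateValue) is omitted, and the method name is our own.

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import scala.collection.JavaConverters._

// For every field, prefer the incoming value and fall back to the stored value
// when the incoming one is null, so a partial update never erases existing data.
def mergePartial(stored: GenericRecord, incoming: GenericRecord, schema: Schema): GenericRecord = {
  val merged = new GenericData.Record(schema)
  schema.getFields.asScala.foreach { field =>
    val incomingValue = incoming.get(field.name())
    val value = if (incomingValue != null) incomingValue else stored.get(field.name())
    merged.put(field.name(), value)
  }
  merged
}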

Since the primary key and partition key of the stored record and the partially updated record are the same, Hudi’s upsert operation automatically updates the old record, giving us a de-duplicated and consistent view of the base OLAP.
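
Plugging the class in is just another write option on the upsert; the class name, key fields and table path below are placeholders.

// Upsert the outer-joined hourly increment into the base OLAP, letting the custom
// payload class decide how the stored and incoming versions of a record are merged.
hourlyIncrementalDF.write
  .format("hudi")
  .option("hoodie.table.name", "base_olap")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "order_id")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.datasource.write.payload.class", "com.example.PartialRowUpdatePayload")
  .mode("append")
  .save("s3://my-bucket/hudi/base_olap")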

More technical details on how to write your own payload class can be found in the Apache Hudi documentation.

Closing Notes

Combining these three concepts, namely incremental consumption, incremental hourly OLAP processing and a custom partial-row-update payload class, we have built a robust stream processing platform for our unicorn startup to scale all the way to becoming a hectocorn organisation.

Series

Part 1: Apache Hudi — The Basics https://medium.com/@parth09/apache-hudi-the-basics-5c1848ca12e0

Part 2: Hudifying the Datalake (Daily Batch) https://medium.com/@parth09/hudifying-the-datalake-daily-batch-e7b3d7ec8229

Part 3: Hourly OLAPs & Infinite playback Event Streams https://medium.com/@parth09/hourly-olaps-infinite-playback-event-streams-62777aefa8b0

Kudos to the team: Sanket Duhoon, Robin Gahlot & Ritu Parno Behera.
