Data Lakehouse: Building the Next Generation of Data Lakes using Apache Hudi

An introduction to Data Lakehouses, their benefits, and a guide to building your own

Ryan D'Souza & Brandon Stanley
Slalom Build
14 min read · Mar 1, 2021


Summary

In this article, we introduce a modern data architecture paradigm known as the Data Lakehouse. The Data Lakehouse provides various advantages over traditional data lakes. We illustrate how the challenges of scalability, data quality, and latency faced by a client were addressed by modernizing their data platform and incorporating a lakehouse into their architecture. By the end of this article, you will be equipped with the fundamental knowledge to implement a Data Lakehouse using Apache Hudi.

Preface

With the evolution of IoT, cloud applications, social media, and machine learning in the past decade, the volume of data being collected by companies has increased exponentially. Simultaneously, the demand for high-quality data has shifted from frequencies of days and hours to a matter of minutes and seconds.

For several years, data lakes have served their purpose as a repository for storing raw and enriched data. However, as they’ve matured, enterprises have realized that the process of maintaining high quality, up-to-date, and consistent data is onerous. In addition to the complexity of ingesting incremental data, populating a data lake also requires business context and heavy reliance on interdependent batch processes. The following are key challenges of modern data lakes:

  1. Query-based Change Data Capture: The most common approach for extracting incremental source data is a query that relies on a defined filtering condition. This approach breaks down when a table has no valid field for pulling data incrementally, when the query adds unforeseen load on the source database, or when the query doesn’t capture every database change. Query-based CDC also misses deleted records because a query cannot easily determine whether a record has been deleted. Log-based CDC is the preferred approach and addresses these challenges; it is discussed later in this article.
  2. Incremental data processing in a data lake: The ETL process responsible for updating the data lake must read all the existing files in the data lake, apply the changes, and rewrite the entire dataset as new files, because there is no easy way to locate and update only the specific file in which an updated or deleted record resides.
  3. Lack of support for ACID transactions: The inability to enforce ACID compliance can lead to inconsistent results when there are concurrent readers and writers.
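
To make the first challenge concrete, here is a minimal sketch in plain Python of a query-based extract that filters on a last-modified column. The table contents, the `updated_at` column, and the watermark value are hypothetical and exist only for illustration. The update is captured, but the delete leaves no row behind for the query to find.

```python
from datetime import datetime


def query_based_extract(table, watermark):
    """Return rows modified after the watermark (the 'defined filtering condition')."""
    return [row for row in table.values() if row["updated_at"] > watermark]


# Hypothetical source table, keyed by primary key.
source = {
    1: {"id": 1, "name": "alice", "updated_at": datetime(2021, 1, 1)},
    2: {"id": 2, "name": "bob", "updated_at": datetime(2021, 1, 1)},
}
watermark = datetime(2021, 1, 2)  # time of the previous extraction run

# Between two runs: row 1 is updated and row 2 is deleted.
source[1] = {"id": 1, "name": "alice-renamed", "updated_at": datetime(2021, 1, 3)}
del source[2]

changes = query_based_extract(source, watermark)
captured_ids = [row["id"] for row in changes]
print(captured_ids)  # only the update is visible; the delete of id 2 is silently missed
```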

The challenges above are further complicated by the increase in data volume and the frequency at which this data needs to be kept up to date. Efforts from Uber, Databricks, and Netflix have resulted in solutions aiming to address the challenges that data engineers face. Apache Hudi (Uber), Delta Lake (Databricks), and Apache Iceberg (Netflix) are incremental data processing frameworks meant to perform upserts and deletes in the data lake on a distributed file system, such as S3 or HDFS.

Together, these efforts have produced the next generation of data lakes, meant to deliver up-to-date data in a scalable, adaptable, and reliable manner: the Data Lakehouse.

What is a Data Lakehouse?

The evolution from Data Warehouses to Data Lakehouses

Simply put: Data Lake + Data Warehouse = Data Lakehouse.

Traditional data warehouses are designed to provide a platform for storing historical data that has been transformed/aggregated for specific use cases/data domains to be used in conjunction with BI tools to derive insights. Typically, data warehouses contain only structured data, are not cost-effective, and are loaded using batch ETL jobs.

Data Lakes were introduced to overcome some of these limitations by supporting structured, semi-structured, and unstructured data with low-cost storage, as well as enabling both batch and streaming pipelines. In comparison to data warehouses, data lakes contain raw data in multiple storage formats, which can be used for current and future use cases. However, data lakes still have limitations, including a lack of transaction support (which makes it difficult to keep data lakes up to date) and a lack of ACID compliance (which prevents safe concurrent reads and writes).

Data lakehouses reap the low-cost storage benefits of data lakes, such as S3, GCS, Azure Blob Storage, etc., along with the data structures and data management capabilities of a data warehouse. They overcome the limitations of data lakes by supporting ACID transactions and ensuring consistency of the data as it is concurrently read and updated. Additionally, lakehouses enable data consumption at lower latency and higher velocity than a traditional data warehouse because the data can be queried directly from the data lakehouse.

Key features of a data lakehouse (referenced from What is a Data Lakehouse?):

  • Transaction support
  • Schema enforcement and governance
  • BI support
  • Storage is decoupled from compute
  • Openness
  • Support for diverse data types ranging from unstructured to structured data
  • Support for diverse workloads
  • End-to-end streaming

In order to build a data lakehouse, an incremental data processing framework, such as Apache Hudi, is required.

What is Apache Hudi?

Apache Hudi, which stands for Hadoop Upserts Deletes and Incrementals, is an open-source framework developed by Uber in 2016 that manages the storage of large datasets on distributed file systems, such as cloud stores, HDFS, or any other Hadoop FileSystem compatible storage. It brings ACID (atomicity, consistency, isolation, and durability) transactions to a data lake.

Hudi’s transactional model is based on a timeline containing all actions performed on a table at different instants in time. It provides the following features (referenced from Apache Hudi):

  • Upsert support with fast, pluggable indexing.
  • Atomic publish with rollback, save points.
  • Snapshot isolation between writer & queries.
  • Manages file sizes, layout using statistics.
  • Asynchronous compaction of row & columnar data.
  • Timeline metadata to track lineage.

Use Cases

  1. Upserting/Deleting Target Data using Change Data Capture

Change data capture (CDC) refers to the process of identifying and capturing changes made in a source database. It replicates changes from a source database to a destination, in this instance a data lakehouse. This is especially important for capturing insert, update, and delete operations into the target tables.

The following are the three most commonly used CDC methods:

CDC Methods: Pros & Cons

Based on our experience, log-based CDC provides the best results for replicating changes from the source databases into the data lakehouse. One of the primary reasons is its ability to capture deleted records, which query-based CDC cannot capture unless the source data carries a deletion flag.

Hudi can process change data capture by finding the appropriate files in an existing dataset and rewriting them to include any changes. Also, it provides the ability to query a dataset based on a particular point in time and the option to revert to a previous version.

With the prevalence of near real-time data ingestion using CDC tools, such as Oracle GoldenGate, Qlik Replicate (formerly Attunity Replicate), and AWS DMS, the ability to apply these changes to existing datasets is essential.

  2. Privacy Regulations

Recent privacy regulations, such as GDPR, require companies to have the ability to perform record-level updates and deletes to satisfy a person’s right to be forgotten. With support for deletions in Hudi datasets, the process of updating or deleting information for a specific user or within a specific timeframe is simplified significantly.

Types of Tables: Copy on Write vs. Merge on Read

Copy on Write: Data is stored in Parquet file format (columnar storage), and each update creates a new version of files during a write. This storage type is most suitable for read-heavy batch workloads as the latest version of the dataset is always available.

Merge on Read: Data is stored as a combination of Parquet (columnar storage) and Avro (row-based storage) file formats. Updates are logged to row-based delta files until compaction, which produces new versions of the columnar files. This storage type is better suited for write-heavy streaming workloads because commits are written as delta files; the trade-off is that reading the latest data requires merging the Parquet and Avro files, either on the fly at query time or through compaction.

A general rule of thumb: For tables that are only updated via batch ETL jobs, use Copy on Write. For tables that are updated via streaming ETL jobs, use Merge on Read. For more details, refer to How do I choose a storage type for my workload from Hudi’s documentation.
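
The difference between the two table types can be illustrated with a toy model in plain Python. This is not how Hudi is implemented; the class and method names are hypothetical, dictionaries stand in for Parquet base files, and a list stands in for the Avro delta log. It only shows where the cost of an update lands in each case.

```python
class CopyOnWriteTable:
    """Toy model: every write rewrites the base file as a new version."""

    def __init__(self, rows):
        self.versions = [dict(rows)]  # each entry stands in for one Parquet file version

    def upsert(self, key, value):
        new_version = dict(self.versions[-1])  # copy the latest file,
        new_version[key] = value               # apply the change,
        self.versions.append(new_version)      # and publish a new version

    def read(self):
        return self.versions[-1]  # readers always see a fully merged file


class MergeOnReadTable:
    """Toy model: writes append to a row-based log; compaction folds it into the base."""

    def __init__(self, rows):
        self.base = dict(rows)  # stands in for the Parquet base file
        self.log = []           # stands in for the Avro delta log

    def upsert(self, key, value):
        self.log.append((key, value))  # cheap append, nothing is rewritten

    def compact(self):
        for key, value in self.log:
            self.base[key] = value
        self.log = []

    def read_snapshot(self):
        merged = dict(self.base)  # merge base and log on the fly
        for key, value in self.log:
            merged[key] = value
        return merged

    def read_optimized(self):
        return dict(self.base)  # base file only, may lag behind the log


cow = CopyOnWriteTable({"a": 1})
cow.upsert("a", 2)

mor = MergeOnReadTable({"a": 1})
mor.upsert("a", 2)
```

In this sketch the Copy on Write write path pays for a full file rewrite, while the Merge on Read write path is a cheap append whose cost is deferred to snapshot reads or to compaction.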

Types of Queries: Snapshot vs. Incremental vs. Read-Optimized

Snapshot: The latest snapshot of the table as of a given commit/compaction action. For Merge on Read tables, a snapshot query merges the base files and the delta files on the fly, so higher query latency is expected.

Incremental: The changes in a table since a given commit/compaction.

Read-optimized: The latest snapshot of the table as of a given commit/compaction action. For Merge on Read tables, read optimized queries return a view that only contains the data in the base files without merging delta files.
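
As a rough sketch, the query type is typically selected through a read option on the Spark DataFrame reader. The helper names below are hypothetical, and the option keys (`hoodie.datasource.query.type` and `hoodie.datasource.read.begin.instanttime`) should be checked against the configuration reference for the Hudi version in use, since key names have changed between releases.

```python
QUERY_TYPES = {"snapshot", "incremental", "read_optimized"}


def hudi_read_options(query_type, begin_instant=None):
    """Build reader options for one of the three query types (hypothetical helper)."""
    if query_type not in QUERY_TYPES:
        raise ValueError(f"unknown query type: {query_type!r}")
    options = {"hoodie.datasource.query.type": query_type}
    if query_type == "incremental":
        if begin_instant is None:
            raise ValueError("incremental queries need a begin instant time")
        # Only changes committed after this instant on the timeline are returned.
        options["hoodie.datasource.read.begin.instanttime"] = begin_instant
    return options


def read_hudi(spark, base_path, options):
    """Load a Hudi table with the given options; `spark` is an existing SparkSession."""
    return spark.read.format("hudi").options(**options).load(base_path)


# Assumed instant time in yyyyMMddHHmmss form, for illustration only.
incremental_options = hudi_read_options("incremental", "20210301000000")
```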

Advantages of Hudi (over custom implementations):

  • Addresses data quality issues like duplicate records, missed updates, etc., which are commonly found in traditional incremental batch ETL pipelines.
  • Provides support for real-time pipelines, enabling use cases such as anomaly detection, machine learning, and real-time suggestions.
  • Creates a mechanism (timeline) that can be used to track changes.
  • Provides native support for querying via Hive and Presto.

Equipped with an incremental data processing framework to implement a data lakehouse, we set forth on designing a solution to overcome the key challenges faced by a client looking to enhance their data platform.

Problem

We engaged with a client who needed an improved, cost-effective, and scalable data platform that would enable their data scientists to make near real-time predictions using the most current data, at lower latency, to derive better insights. Their existing batch ETL processes ran on a delayed schedule and were compute-intensive because they required reprocessing the entire dataset to hydrate the data lake. In addition, these processes used a query-based CDC method, which could not capture every source system change. The combination of stale and inaccurate data resulted in a lack of trust in the data platform, eliminating any immediate value that could have been derived from the data.

Solution

Using a log-based CDC tool (Oracle GoldenGate), Apache Kafka, and an incremental data processing framework (Apache Hudi running on AWS), we built a data lakehouse on AWS S3 to reduce latency, improve data quality, and support ACID transactions.

Target Architecture

Data Lakehouse Target Architecture

Environment

Oracle GoldenGate was used as the log-based CDC tool to extract data (i.e., transactions) from source system logs due to the client’s existing adoption of the Oracle GoldenGate product family. The logs are replicated to Kafka in near real-time, from where the messages are read and merged into the data lakehouse in Hudi format.

Apache Hudi was selected as the incremental data processing framework due to its integration with AWS EMR and Athena, making it the ideal candidate for this particular solution.

Implementation Steps

Step 1: Replicate source data using Oracle GoldenGate

As previously discussed, log-based CDC is the optimal solution since it serves both batch and streaming use cases. There is no longer the need to have separate ingestion patterns for batch and streaming sources. Traditionally, batch workloads would leverage a SQL statement, which would run on some desired frequency. Instead, log-based CDC enables capturing any change, which is then replayed to the desired destination (i.e., Kafka). The decoupling of the extraction from ingestion introduces the flexibility to ingest the incremental data based on how frequently the data needs to be updated within the data lakehouse. This can minimize costs because the data can be retrieved from Kafka within the defined retention period.

Oracle GoldenGate (OGG) is a data replication tool used to capture transactions from source systems and replicate them into a target, such as Kafka topics or another database. It works by leveraging the database transaction log, which records everything that happens within a database. OGG reads the transactions from the log and pushes them to the specified target. GoldenGate supports several relational databases, including Oracle, MySQL, DB2, SQL Server, and Teradata.

In this solution, the changes are streamed from the source databases into Kafka using Oracle GoldenGate, which follows a three-step process:

  1. Extract the data from source database trail logs via Oracle GoldenGate 12c (classic version): Transactions occurring against the source databases are extracted in real time and stored in an intermediate log format (trail log).
  2. Pump the trail logs to a secondary remote trail log: The extracted trail logs are pumped to another trail log (managed by an Oracle GoldenGate for Big Data 12c instance).
  3. Replicate the trail logs to Kafka via Oracle GoldenGate for Big Data 12c using the Kafka Connect Handler: The pumped transactions are received and replicated as Kafka messages. This process serializes the Kafka messages (with or without Schema Registry) and performs type conversions (if required) on the messages replayed from the transaction logs before publishing to Kafka.

Oracle GoldenGate Replication, Source: https://dzone.com/articles/creates-a-cdc-stream-from-oracle-database-to-kafka

Note: By default, updated records only contain the columns that are updated when replicated via GoldenGate. To ensure that incremental records can be merged into the data lakehouse with minimal transformation (i.e., the entire record with all columns is replicated), Supplemental Logging must be enabled. This includes the “before” and “after” images for each record.

GoldenGate replication includes an “op_type” field that indicates the type of database operation from the source trail file: I for insert, U for update, D for delete. This field is beneficial in determining how to upsert/delete a record in the data lakehouse.

The following is a sample insert record:

Oracle GoldenGate: Sample Insert Record

Note: The GoldenGate insert record contains a null before image and a non-null after image.

Sample update record

Note: The GoldenGate update record contains a non-null before image and a non-null after image.

Sample delete record

Note: The GoldenGate delete record contains a non-null before image and a null after image.
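
The three notes above amount to a simple invariant that can be checked before records are merged. The following is a minimal sketch in plain Python; the `before` and `after` field names and all record values are assumptions for illustration, so adjust them to the actual message layout produced by your handler configuration.

```python
# (has before image, has after image) expected for each op_type.
EXPECTED_IMAGES = {
    "I": (False, True),   # insert: null before, non-null after
    "U": (True, True),    # update: non-null before, non-null after
    "D": (True, False),   # delete: non-null before, null after
}


def images_match_op_type(record):
    """Return True when the before/after images fit the record's op_type."""
    has_images = (record.get("before") is not None, record.get("after") is not None)
    return EXPECTED_IMAGES.get(record.get("op_type")) == has_images


# Hypothetical records with made-up values.
insert_record = {"op_type": "I", "before": None, "after": {"id": 1, "name": "alice"}}
update_record = {"op_type": "U", "before": {"id": 1, "name": "alice"}, "after": {"id": 1, "name": "alicia"}}
delete_record = {"op_type": "D", "before": {"id": 1, "name": "alicia"}, "after": None}
malformed_record = {"op_type": "D", "before": None, "after": None}
```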

Step 2: Capture the replicated data within Kafka

The target for the GoldenGate replication is Kafka. Since GoldenGate for Big Data replicates the records to Kafka via the Kafka Connect Handler, schema evolution and the additional features offered through Schema Registry are supported.

Why Kafka? There are two primary reasons why Kafka serves as the intermediary layer between the CDC tool and the data lakehouse.

The first reason is that GoldenGate cannot directly replicate the CDC data from the source databases to the lakehouse in Apache Hudi format, because Hudi tables are written through a Spark-based processing engine. The existing integration between Kafka and Spark Structured Streaming makes it ideal to stage the incremental records in Kafka, from where they can be processed and written in Hudi format.

The second reason is to address consumers that require near real-time latency, such as detecting and avoiding the loss of subscribers to a service based on a set of transactions.

Step 3: Read the data from Kafka and write to S3 in Hudi format

The Spark Structured Streaming jobs perform the following operations:

  1. Read the records from Kafka.

Sample code for reading records from a Kafka topic.
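
A minimal sketch of this step, assuming PySpark with the Spark Kafka connector on the classpath. The helper names, broker address, and topic name are hypothetical; the option keys are the standard ones for Spark's Kafka source.

```python
def kafka_source_options(bootstrap_servers, topic, starting_offsets="earliest"):
    """Build options for Spark's Kafka source (hypothetical helper)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def read_kafka_stream(spark, options):
    """Return a streaming DataFrame; `spark` is an existing SparkSession.

    The Kafka source exposes key, value, topic, partition, offset,
    and timestamp columns, with key and value as binary.
    """
    return spark.readStream.format("kafka").options(**options).load()


# Placeholder broker and topic names, for illustration only.
options = kafka_source_options("broker-1:9092", "ogg.customers")
```

The `value` column is still serialized at this point, which is why the next step is needed before any transformation.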

  2. Deserialize the records using Schema Registry.

Note: Any data in the Kafka topics serialized using Confluent Avro format cannot be deserialized natively using Spark APIs, preventing any downstream processing of that data required to populate the data lakehouse. This was the case for records replicated using GoldenGate. ABRiS is a Spark library that makes it possible to deserialize Kafka records in Confluent Avro format against a schema in Schema Registry. The version of ABRiS used in this solution is 3.2. The following video (@12:34) discusses this in further detail:

Sample code for deserializing Confluent Avro serialized records from Kafka.
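
The sketch below is not the ABRiS code used in the solution. It is a plain Python illustration of why a generic Avro decoder cannot read these messages directly: the Confluent wire format prefixes the Avro payload with a magic byte (0) and a 4-byte big-endian schema ID that must be resolved against Schema Registry. The function name and the sample bytes are hypothetical.

```python
import struct


def split_confluent_frame(message: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(message) < 5:
        raise ValueError("message is too short to carry a Confluent header")
    # '>' means big-endian with no padding: 1 signed byte + 4-byte unsigned int.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Made-up frame: magic byte 0, schema ID 42, then placeholder payload bytes.
frame = b"\x00" + (42).to_bytes(4, "big") + b"avro-bytes"
schema_id, payload = split_confluent_frame(frame)
```

A library such as ABRiS handles this header, fetches the matching schema from Schema Registry, and decodes the remaining bytes into Spark columns.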

  3. Extract the required before/after image based on the Oracle GoldenGate “op_type” & write the records to the data lakehouse in Hudi format.

The Spark code uses the “op_type” field from the GoldenGate record to split the batch of incoming records into two groups: one containing inserts/updates and the other containing deletes. This is done so that the Hudi write operation configuration can be set accordingly. Further transformations then extract the appropriate before or after image. The final step is to set the appropriate Hudi properties mentioned below and then write the upserts and deletes in Hudi format to the desired location in S3 via the foreachBatch Spark Structured Streaming API, in either streaming or batch mode.
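
The split described above can be sketched as follows. The first function shows the logic on plain Python dictionaries; the second shows one possible shape of a foreachBatch writer in PySpark. All names are hypothetical, the `before` and `after` column names are assumed to be structs on the deserialized DataFrame, and the sketch does not handle empty groups or errors.

```python
def split_by_op_type(records):
    """Return (upserts, deletes), keeping the image that matters for each group."""
    upserts, deletes = [], []
    for record in records:
        op_type = record["op_type"]
        if op_type in ("I", "U"):
            upserts.append(record["after"])    # new state of the row
        elif op_type == "D":
            deletes.append(record["before"])   # last known state, carries the key
        else:
            raise ValueError(f"unexpected op_type: {op_type!r}")
    return upserts, deletes


def make_batch_writer(base_path, hudi_options):
    """Build a function suitable for DataStreamWriter.foreachBatch (sketch only)."""

    def write_batch(batch_df, batch_id):
        upserts = batch_df.filter("op_type IN ('I', 'U')").select("after.*")
        deletes = batch_df.filter("op_type = 'D'").select("before.*")
        for frame, operation in ((upserts, "upsert"), (deletes, "delete")):
            # Same table settings for both groups; only the write operation differs.
            options = {**hudi_options, "hoodie.datasource.write.operation": operation}
            frame.write.format("hudi").options(**options).mode("append").save(base_path)

    return write_batch


# Made-up records, for illustration only.
batch = [
    {"op_type": "I", "before": None, "after": {"id": 1, "name": "alice"}},
    {"op_type": "U", "before": {"id": 2, "name": "bob"}, "after": {"id": 2, "name": "robert"}},
    {"op_type": "D", "before": {"id": 3, "name": "carol"}, "after": None},
]
upserts, deletes = split_by_op_type(batch)
```

With a streaming DataFrame `stream`, the writer would be attached with something like `stream.writeStream.foreachBatch(make_batch_writer(path, options)).start()`.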

Important Hudi Properties

hoodie.datasource.write.precombine.field: The precombine field for a table is a mandatory configuration and cannot be nullable (i.e., it must be present for every record) within a table. This may cause issues when a data source does not contain a valid field that can serve as a tie-breaker. If a data source does not meet this requirement, it may be worth implementing custom deduplication logic for these tables.
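
Conceptually, the precombine field decides which record wins when several records in one batch share a key. The plain Python sketch below illustrates the idea and could serve as a starting point for custom deduplication; the function name and sample values are hypothetical, and keeping the later arrival on ties is a choice of this sketch rather than a statement about Hudi's behavior.

```python
def precombine(records, key_field, precombine_field):
    """Keep one record per key: the one with the largest precombine value."""
    latest = {}
    for record in records:
        if record.get(precombine_field) is None:
            raise ValueError(f"record is missing precombine field {precombine_field!r}")
        key = record[key_field]
        current = latest.get(key)
        # Ties keep the later arrival (an assumption of this sketch).
        if current is None or record[precombine_field] >= current[precombine_field]:
            latest[key] = record
    return list(latest.values())


# Made-up batch: three versions of id 1 arriving out of order, one version of id 2.
batch = [
    {"id": 1, "ts": 1, "name": "v1"},
    {"id": 1, "ts": 3, "name": "v3"},
    {"id": 1, "ts": 2, "name": "v2"},
    {"id": 2, "ts": 1, "name": "only"},
]
deduplicated = precombine(batch, "id", "ts")
```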

hoodie.datasource.write.keygenerator.class: Set this value to org.apache.hudi.keygen.ComplexKeyGenerator for tables containing a composite key or partitioned by more than one column.

Set this value to org.apache.hudi.keygen.NonpartitionedKeyGenerator for non-partitioned tables.

hoodie.datasource.hive_sync.partition_extractor_class: Set this value to org.apache.hudi.hive.MultiPartKeysValueExtractor for creating a Hive table partitioned by more than one column.

Set this value to org.apache.hudi.hive.NonPartitionedExtractor for creating a non-partitioned Hive table.

hoodie.index.type: By default, this is set to BLOOM, which will only enforce the uniqueness of a key within a single partition. Use GLOBAL_BLOOM to enforce uniqueness across all partitions. Hudi will compare incoming records to files across the entire dataset to ensure a recordKey is only present in a single partition. Expect latency for very large datasets.

hoodie.bloom.index.update.partition.path: Ensure this is set to False for delete operations (if the GLOBAL_BLOOM index is used).
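
The practical difference between the two index scopes can be shown with a toy model in plain Python. This is not Hudi's index implementation; nested dictionaries stand in for partitions, and the global branch assumes the record is updated in the partition where its key already lives, which is the behavior this sketch associates with leaving the partition path update disabled.

```python
def upsert(table, record, global_index):
    """Upsert into `table`, a dict of partition -> {key: record} (toy model)."""
    key, target = record["id"], record["partition"]
    if global_index:
        # Search every partition for the key; this is the costly part at scale.
        for existing_partition, rows in table.items():
            if key in rows:
                target = existing_partition
                break
    table.setdefault(target, {})[key] = record


def partitions_containing(table, key):
    return sorted(partition for partition, rows in table.items() if key in rows)


# Made-up record whose partition value changes between two writes.
first = {"id": 1, "partition": "2021-01", "status": "open"}
second = {"id": 1, "partition": "2021-02", "status": "closed"}

per_partition_table, global_table = {}, {}
for record in (first, second):
    upsert(per_partition_table, record, global_index=False)
    upsert(global_table, record, global_index=True)
```

With the per-partition lookup the key ends up in both partitions, while the global lookup keeps a single copy.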

hoodie.datasource.hive_sync.use_jdbc: Set this value to False to sync the table to the Glue Data Catalog (if required).

Refer to the Apache Hudi configurations page for the full list of configurations and the Apache Hudi FAQ Page for any other inquiries.
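
The properties above can be assembled with a small helper like the following. This is a sketch rather than a definitive configuration: the function name, its parameters, and the example table and column names are hypothetical, and the keys should be verified against the configuration page for the Hudi version in use.

```python
def hudi_write_options(table_name, record_keys, partition_fields, precombine_field,
                       operation="upsert", global_index=False, sync_to_glue=False):
    """Assemble Hudi write options following the guidance in this section."""
    options = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": ",".join(record_keys),
        "hoodie.datasource.write.partitionpath.field": ",".join(partition_fields),
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.datasource.write.operation": operation,
        "hoodie.index.type": "GLOBAL_BLOOM" if global_index else "BLOOM",
    }
    if not partition_fields:
        options["hoodie.datasource.write.keygenerator.class"] = (
            "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
        options["hoodie.datasource.hive_sync.partition_extractor_class"] = (
            "org.apache.hudi.hive.NonPartitionedExtractor")
    else:
        if len(record_keys) > 1 or len(partition_fields) > 1:
            options["hoodie.datasource.write.keygenerator.class"] = (
                "org.apache.hudi.keygen.ComplexKeyGenerator")
        if len(partition_fields) > 1:
            options["hoodie.datasource.hive_sync.partition_extractor_class"] = (
                "org.apache.hudi.hive.MultiPartKeysValueExtractor")
    if global_index and operation == "delete":
        options["hoodie.bloom.index.update.partition.path"] = "false"
    if sync_to_glue:
        options["hoodie.datasource.hive_sync.enable"] = "true"
        options["hoodie.datasource.hive_sync.use_jdbc"] = "false"
    return options


# Hypothetical table and column names.
simple = hudi_write_options("orders", ["order_id"], ["region"], "op_ts")
composite = hudi_write_options("order_lines", ["order_id", "line_id"], ["region", "day"],
                               "op_ts", operation="delete", global_index=True,
                               sync_to_glue=True)
unpartitioned = hudi_write_options("lookup", ["code"], [], "op_ts")
```

The resulting dictionary can be passed to a DataFrame writer with `.options(**options)`.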

Note (for using Apache Hudi with AWS Glue)

The hudi-spark-bundle_2.11-0.5.3.jar available on Maven will not work as-is with AWS Glue. Instead, a custom jar needs to be created by altering the original pom.xml.

  1. Download and update the contents of the pom.xml.

a) Remove the following line from the <includes> tag:

b) Add the following lines to the <relocations> tag:

  2. Build the JAR:

The JAR built using the command above (located in “target/hudi-spark-bundle_2.11-0.5.3.jar” where the command was executed) can then be passed in as a Glue job parameter.

Once these three steps are in place, the data lakehouse is ready for use. The data can be consumed from the Raw S3 bucket using one of the query methods available through the Apache Hudi APIs mentioned above.

Conclusion

The solution addresses the challenges faced by traditional data lakes:

  1. Log-based CDC is a more reliable mechanism for capturing database transactions/events.
  2. Apache Hudi adopts the responsibility (previously owned by data platform owners) of updating the target data within a data lakehouse by managing the indexes and pertinent metadata required to hydrate a data lakehouse at scale.
  3. The support of ACID transactions removes the concern surrounding concurrent operations because Apache Hudi APIs will handle multiple readers and writers without producing inconsistent results.

As more enterprises adopt data platforms and enhance their data analytics/machine learning capabilities, the underlying CDC tools and pipelines serving the data must evolve to address some of the most commonly faced challenges. The improvements in scalability, data latency, and the overall quality of the data delivered to downstream consumers showcase that the data lakehouse paradigm is the next generation of data platforms. This will serve as the foundation through which enterprises will derive greater value and insights from their data.

We hope this article has equipped you with the toolkit to build your own data lakehouse using Apache Hudi. Good luck, keep building, and let us know how it turns out!
