Building a Data Lakehouse with AWS EMR and Apache Hudi: A CDC Use Case — part1/2

Amar_Kumar
11 min readSep 14, 2023

--

In the ever-evolving landscape of data storage and processing, three distinct solutions have emerged as game-changers: Data Lakes, Data Lakehouses, and Delta Lakes. While each of these technologies holds its own promise, the choice between them can significantly impact how you handle your data. To shed light on the matter, we will explore these data storage paradigms and dive into a specific use case related to Change Data Capture (CDC)

Table of contents :

Overview:

· Understanding Data Lakehouses(Brief Recap)
· Meet Apache Hudi
· Specify the table type In Hudi: Copy on Write Vs. Merge on Read
· HUDI Copy on Write (COW)
· HUDI MERGE ON READ(MOR)
· Implementing CDC using AWS EMR and Apache HUDI
Prerequisites
· Lets get started with the implementation
1. Setting up the HUDI parameters for the use case.
2. Generating a dummy dataset
3. Copy on Read (COW) implementation
4. Use Spark SQL to read the COW table
5. Lets perform batch update operation
6. Lets perform batch insert operation
7. Lets perform delete operation
Code
· Conclusion

Understanding Data Lakehouses(Brief Recap)

A Data Lakehouse is a modern data architecture that combines the best of both Data Lakes and data warehouses. It offers the ability to store vast amounts of raw data while providing structure and organization for efficient querying and analytics. Data Lakehouses address the challenges of data quality, consistency, and performance that Data Lakes sometimes encounter.

Key features of Data Lakehouses:

  • Schema on Read: Like Data Lakes, Data Lakehouses allow you to store data without a predefined schema, enabling flexibility in data ingestion.
  • Structured Querying: Data Lakehouses introduce structured tables and indexes, facilitating SQL-based querying and making data more accessible for analysts and data scientists.
  • CDC Integration: Data Lakehouses are well-suited for Change Data Capture (CDC) use cases, where real-time data changes must be tracked and processed.

Simply put: Data Lake + Data Warehouse = Data Lakehouse.

Data Lakehouse Overview

Meet Apache Hudi

Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open-source data management framework that simplifies the process of managing large-scale, incremental data. It provides support for both real-time and batch data processing and is particularly valuable for CDC operations.

Key features of Apache Hudi:

  • ACID Compliance: Hudi ensures Atomicity, Consistency, Isolation, and Durability (ACID) compliance for data operations, critical for maintaining data integrity during CDC.
  • Incremental Processing: Hudi allows for efficient incremental data ingestion and processing, making it suitable for real-time data changes.
  • Time Travel: The framework offers time-travel capabilities, enabling you to access data at different points in time, essential for historical analysis and auditing.

There Is An excellent post explaining the which table type to be used in Apache Hudi. Do refer to this AWS blog to understand the intricacies of both the table and understand the key difference

https://aws.amazon.com/blogs/big-data/part-1-build-your-apache-hudi-data-lake-on-aws-using-amazon-emr/

Specify the table type In Hudi: Copy on Write Vs. Merge on Read

HUDI Copy on Write (COW)

This is table type where commit table are fully merged into main table during write operation.

  • For updates, the latest version of the that file id, is rewritten once, with new values for all records that have changed.
  • For inserts, the records are first packed onto the smallest file in each partition path, until it reaches the configured maximum size.
Hudi COW Table type

2 Types of Queries Possible:

  1. Real Time/Snapshot queries — As soon as merge happens
  2. Incremental Queries — Based on Commit_time

When to Use?

1. COW is the default storage type and is preferred for simple read-heavy use cases

2. Tables with a lower ingestion rate and use cases without real-time ingestion

3. Use cases requiring the freshest data with minimal read latency because merging cost is taken care of at the write phase

4. Append-only workloads where existing data is immutable

Process FLOW: HUDI DATA LAKE — COPY on Write Approach

HUDI MERGE ON READ(MOR)

· In this table type, records written to the source file are quickly first written to log-files which are later merged with the source file using compaction action on the timeline.

· DELTA_COMMIT: Each new write creates incremental log files for updates, which are associated with the base Parquet files. For inserts, it creates a new version of the base file like CoW. Each write adds a delta commit action to the timeline.

· At a high level, MOR writer goes through same stages as COW writer in ingesting data. The updates are appended to latest log (delta) file belonging to the latest file slice without merging.

HUDI MERGE ON READ(MOR)

3 types of queries are possible:

  1. Read Optimized — Read-optimized queries show the latest compacted data, which doesn’t include the freshest updates in the not yet compacted log files.
  2. Real Time/Snapshot queries — As soon as merge happens
  3. Incremental Queries — Based on Commit_time

When to Use?

  1. MoR is recommended for tables with write-heavy and update-heavy use cases.
  2. Faster ingestion requirements and real-time ingestion use cases.
  3. Varying or bursty write patterns (for example, ingesting bulk random deletes in an upstream database) due to the zero-merge cost for updates during write time.
  4. Streaming use cases.
  5. Mix of downstream consumers, where some are looking for fresher data by paying some additional read cost, and others need faster reads with some trade-off in data freshness.
Process FLOW: HUDI DATA LAKE — MERGE on READ Approach

Implementing CDC using AWS EMR and Apache HUDI

Prerequisites

Configure AWS EMR for Apache Hudi

  1. So when you install spark on EMR cluster it comes with hudi already installed but it is local filesystem we need to copy it hdfs file system using the following step after that only we can run it in notebooks while you configure EMR :

In EC2 master instances connect it using SSH and implement the following commands to copy Hudi Jar file from its local filesystem to hdfs file system.

## Make a lib folder inside the app folder on HDFS
hdfs dfs -mkdir -p /apps/hudi/lib

## Copy the Hudi Jar from local to HDFS
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar

## Copy the spark-avro jar from local to HDFS
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar

Configure on the EMR notebook to use HUDI

%%configure -f
{ "conf": {
"spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}}

Lets get started with the implementation

1. Setting up the HUDI parameters for the use case.

Configuring the parameters can be tricky and need to have the understanding so it can be changed accordingly . Please refer to below documentation once to have a quick overview of parameter's and to take maximum advantage HUDI

Link: https://hudi.apache.org/docs/configurations

# General Constants - Basic Configuration
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.common.model.EmptyHoodieRecordPayload"

# Hive Constants - Needed
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants - For big dataset
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.view.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"

2. Generating a dummy dataset

Timestamp will be required as HUDI uses it for its internal computation based on Commit time.

from datetime import datetime
dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
def get_json_data(start, count, dest):
time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
data = [{"trip_id": i, "ts": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
return data

#Create a spark dataframe
def create_json_df(spark, data):
sc = spark.sparkContext
return spark.read.json(sc.parallelize(data))

#Inserting dataset of large size
df = create_json_df(spark, get_json_data(0, 20000000, dest))

+-------------+--------+-------+-----------------------+
|destination |route_id|trip_id|ts |
+-------------+--------+-------+-----------------------+
|Seattle |A |0 |2023-05-16 04:58:18.557|
|New York |B |1 |2023-05-16 04:58:18.557|
|New Jersey |C |2 |2023-05-16 04:58:18.557|
|Los Angeles |D |3 |2023-05-16 04:58:18.557|
|Las Vegas |E |4 |2023-05-16 04:58:18.557|
|Tucson |F |5 |2023-05-16 04:58:18.557|
|Washington DC|G |6 |2023-05-16 04:58:18.557|
|Philadelphia |H |7 |2023-05-16 04:58:18.557|
|Miami |I |8 |2023-05-16 04:58:18.557|
|San Francisco|J |9 |2023-05-16 04:58:18.557|
|Seattle |A |10 |2023-05-16 04:58:18.557|
|New York |B |11 |2023-05-16 04:58:18.557|
|New Jersey |C |12 |2023-05-16 04:58:18.557|
|Los Angeles |D |13 |2023-05-16 04:58:18.557|
|Las Vegas |E |14 |2023-05-16 04:58:18.557|
|Tucson |F |15 |2023-05-16 04:58:18.557|
|Washington DC|G |16 |2023-05-16 04:58:18.557|
|Philadelphia |H |17 |2023-05-16 04:58:18.557|
|Miami |I |18 |2023-05-16 04:58:18.557|
|San Francisco|J |19 |2023-05-16 04:58:18.557|
+-------------+--------+-------+-----------------------+

3. Copy on Read (COW) implementation

## Configuring the table and other parameter 
config = {
"table_name": "hudi_trips_table",
"target": "s3://********************/***************/*****",
"primary_key": "trip_id",
"sort_key": "ts",
"commits_to_retain": "2"
}

### Writing the data to the S3 bucket in the target

(df.write.format(HUDI_FORMAT)
.option(PRECOMBINE_FIELD_OPT_KEY,config["sort_key"])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
.option(BULK_INSERT_PARALLELISM, 3)
.option(S3_CONSISTENCY_CHECK, "true")
.option(HIVE_TABLE_OPT_KEY,config['table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
.option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
.mode("Overwrite")
.save(config['target']))

4. Use Spark SQL to read the COW table

## Check the table created 
spark.sql("show tables").show()

+---------+----------------+-----------+
|namespace| tableName|isTemporary|
+---------+----------------+-----------+
| default|hudi_trips_table| false|
+---------+----------------+-----------+

## Reading the Metadata
spark.sql("show create table "+config['table_name']).show(10,truncate=False)
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE default.hudi_trips_table (\n _hoodie_commit_time STRING,\n _hoodie_commit_seqno STRING,\n _hoodie_record_key STRING,\n _hoodie_partition_path STRING,\n _hoodie_file_name STRING,\n destination STRING,\n route_id STRING,\n trip_id BIGINT,\n ts STRING)\nUSING hudi\nOPTIONS (\n 'hoodie.query.as.ro.table' = 'false')\nLOCATION 's3://emrpocgigaforce/Output/hudi_data/hudi'\nTBLPROPERTIES (\n 'bucketing_version' = '2',\n 'last_commit_time_sync' = '20230516073344246',\n 'last_modified_by' = 'hive',\n 'last_modified_time' = '1684213183')\n|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

4. Hudi Creates extra features for its internal consumption and make process streamline . Do refer the above documentation for it.

_hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,_hoodie_partition_path,_hoodie_file_name

## Reading the data from S3 in dataframe
# Read the table from s3
df2=spark.read.format(HUDI_FORMAT).load(config["target"]+"/*")
df2.show()
+-------------------+--------------------+------------------+----------------------+--------------------+-------------+--------+-------+--------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| destination|route_id|trip_id| ts|
+-------------------+--------------------+------------------+----------------------+--------------------+-------------+--------+-------+--------------------+
| 20230516073344246|20230516073344246...| 0| |b43666a0-e433-42d...| Seattle| A| 0|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 1| |b43666a0-e433-42d...| New York| B| 1|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 2| |b43666a0-e433-42d...| New Jersey| C| 2|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 3| |b43666a0-e433-42d...| Los Angeles| D| 3|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 4| |b43666a0-e433-42d...| Las Vegas| E| 4|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 5| |b43666a0-e433-42d...| Tucson| F| 5|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 6| |b43666a0-e433-42d...|Washington DC| G| 6|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 7| |b43666a0-e433-42d...| Philadelphia| H| 7|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 8| |b43666a0-e433-42d...| Miami| I| 8|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 9| |b43666a0-e433-42d...|San Francisco| J| 9|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 10| |b43666a0-e433-42d...| Seattle| A| 10|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 11| |b43666a0-e433-42d...| New York| B| 11|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 12| |b43666a0-e433-42d...| New Jersey| C| 12|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 13| |b43666a0-e433-42d...| Los Angeles| D| 13|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 14| |b43666a0-e433-42d...| Las Vegas| E| 14|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 15| |b43666a0-e433-42d...| Tucson| F| 15|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 16| |b43666a0-e433-42d...|Washington DC| G| 16|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 17| |b43666a0-e433-42d...| Philadelphia| H| 17|2023-05-16 04:58:...|
| 20230516073344246|20230516073344246...| 18| |b43666a0-e433-42d...| Miami| I| 18|2023-05-16 04:58:...|

df2.printSchema()

root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- destination: string (nullable = true)
|-- route_id: string (nullable = true)
|-- trip_id: long (nullable = true)
|-- ts: string (nullable = true)

5. Lets perform batch update operation

upsert_dest= ["boston" for i in range (0,100)]
df3=create_json_df(spark, get_json_data(1000000,100, upsert_dest))

# Update the changes in s3 now

(df3.write.format(HUDI_FORMAT)
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 4)
.option(S3_CONSISTENCY_CHECK, "true")
.option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
.option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
.option(HIVE_TABLE_OPT_KEY,config['table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
.option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
.mode("Append") ## We will use append here to append it
.save(config['target']))

# Checking the unique commit time - This shows the data has been changed
spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_table order by commitTime").show(20, False)
+-----------------+
|commitTime |
+-----------------+
|20230516073344246|
|20230516073449459|
+-----------------+

6. Lets perform batch insert operation

# Inserting new data

insert_dest=["chennai" for i in range(0,100)]
df4=create_json_df(spark,get_json_data(2000000,100,insert_dest))
df4.show()

+-----------+--------+-------+--------------------+
|destination|route_id|trip_id| ts|
+-----------+--------+-------+--------------------+
| chennai| A|2000000|2023-05-16 07:36:...|
| chennai| B|2000001|2023-05-16 07:36:...|
| chennai| C|2000002|2023-05-16 07:36:...|
| chennai| D|2000003|2023-05-16 07:36:...|
| chennai| E|2000004|2023-05-16 07:36:...|
| chennai| F|2000005|2023-05-16 07:36:...|
| chennai| G|2000006|2023-05-16 07:36:...|
| chennai| H|2000007|2023-05-16 07:36:...|
| chennai| I|2000008|2023-05-16 07:36:...|
| chennai| J|2000009|2023-05-16 07:36:...|
| chennai| A|2000010|2023-05-16 07:36:...|
| chennai| B|2000011|2023-05-16 07:36:...|
| chennai| C|2000012|2023-05-16 07:36:...|
| chennai| D|2000013|2023-05-16 07:36:...|
| chennai| E|2000014|2023-05-16 07:36:...|
| chennai| F|2000015|2023-05-16 07:36:...|
| chennai| G|2000016|2023-05-16 07:36:...|


# Insert the changes in s3 now - same config of hudi options as upsert. Since it is new data it will be inserted directly
(df4.write.format(HUDI_FORMAT)
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 4)
.option(S3_CONSISTENCY_CHECK, "true")
.option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
.option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
.option(HIVE_TABLE_OPT_KEY,config['table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
.option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
.mode("Append")
.save(config['target']))

# Check the data
spark.sql("select * from " + config['table_name'] +" where trip_id between 2000000 and 2000100").show(50,False)

7. Lets perform delete operation

Hudi supports implementing two types of deletes on data stored in Hudi tables, by enabling the user to specify a different record payload implementation.

For delete neither extra file not commit is added

It can’t be rollback since data has been deleted from hudi table

Soft Deletes : Retain the record key and just null out the values for all the other fields. This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.

Hard Deletes : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.

Using DataSource, set OPERATION_OPT_KEY to DELETE_OPERATION_OPT_VAL. This will remove all the records in the DataSet being submitted.

Using DataSource, set PAYLOAD_CLASS_OPT_KEY to “org.apache.hudi.EmptyHoodieRecordPayload”. This will remove all the records in the DataSet being submitted.

Using DataSource or DeltaStreamer, add a column named _hoodie_is_deleted to DataSet. The value of this column must be set to true for all the records to be deleted and either false or left null for any records which are to be upserted.

# Soft Delete
df6=df5.filter(col("trip_id").between(2000050,2000100))

(df6.write.format(HUDI_FORMAT)
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 4)
.option(S3_CONSISTENCY_CHECK, "true")
.option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
.option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
.option(HIVE_TABLE_OPT_KEY,config['table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
.option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
.option(PAYLOAD_CLASS_OPT_KEY,EMPTY_PAYLOAD_CLASS_OPT_VAL) # Deleting the record so this option needs to be added
.mode("Append")
.save(config['target']))

## Check the hooide commit time to see the operations performed
spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_table order by commitTime").show(20, False)

Code: You can find the full code on my GitHub repository

Conclusion

Building a Data Lakehouse with AWS EMR, Apache Hudi, and S3 empowers you to harness the advantages of a modern data architecture while efficiently managing CDC use cases. You can capture and process real-time data changes with ease, ensuring data quality, consistency, and accessibility.

In this post we have implemented the HUDI- COW implementation .

Next post will the implementaion of HUDI- MOR.

--

--