Building a Data Lakehouse with AWS EMR and Apache Hudi: A CDC Use Case — part2/2

Amar_Kumar
8 min read · Sep 14, 2023

In the ever-evolving landscape of data storage and processing, three distinct solutions have emerged as game-changers: Data Lakes, Data Lakehouses, and Delta Lakes. While each of these technologies holds its own promise, the choice between them can significantly impact how you handle your data. To shed light on the matter, we will explore these data storage paradigms and dive into a specific use case related to Change Data Capture (CDC).

Table of contents:

· Today’s Focus: Apache Hudi — MOR Table Type
· Merge On Read (MOR) Table Type in Hudi
· Implementation of MOR table
1. Generating a dummy dataset
2. Use Spark SQL to read the MOR table
3. Let’s perform a batch update operation
4. Let’s perform a batch insert operation
5. Read the read-optimized and real-time data into DataFrames
6. Time Travel
· Code
· Conclusion

…Continuation of the previous part (2a). Do refer back to the previous post, which builds the Apache Hudi foundation for Change Data Capture (CDC).

In our previous post, we laid the groundwork for implementing Change Data Capture (CDC) using Apache Hudi. Today, we’re diving deeper into the world of Apache Hudi with a specific focus on the MOR (Merge on Read) table type, which is particularly well-suited for CDC use cases.

The Journey So Far

Before we proceed, let’s recap our journey. In our last post, we set the stage for harnessing Apache Hudi’s power for CDC operations. We introduced the concept of Data Lakehouses, explored the role of Apache Hudi as a data management framework, and discussed its key features like ACID compliance, incremental processing, and time travel.

Today’s Focus: Apache Hudi — MOR Table Type

Today, our focus shifts to Apache Hudi’s MOR (Merge on Read) table type. The MOR approach is pivotal when dealing with real-time data changes and CDC. It ensures that data is efficiently merged and maintained as new changes arrive.

What to Expect

In this post, we’ll walk you through the practical implementation of an Apache Hudi MOR table. We’ll guide you step by step on how to set up and use this table type for CDC operations. By the end of this journey, you’ll have a clear understanding of how MOR tables play a vital role in capturing real-time changes.

So, buckle up and get ready to delve into the world of MOR tables with Apache Hudi!

Stay with us for the continuation of this exciting data-driven journey. In the sections that follow, we’ll take you through the practical steps of setting up an Apache Hudi MOR table for your CDC needs.

Merge On Read (MOR) Table Type in Hudi

The default table type is Copy-On-Write (COW), which is best suited for read-heavy workloads with modest writes. On every write, Copy-On-Write rewrites the affected files so that each new file version holds the original data plus the new changes. This increases write latency, but reads are faster because nothing has to be merged at query time.

For near real-time applications that need quick upserts, the MERGE_ON_READ table type is better suited. A MOR table stores the incoming upserts for each file group in a row-based delta log (in Avro file format). This log is merged with the existing Parquet file on the fly when the table is read, and a compactor periodically folds it into a new Parquet file.
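The write and read paths described above can be pictured with a toy model. This is only a sketch in plain Python, assuming a single file group held in memory; the class and method names (MorFileGroup, read_merged, compact) are made up for illustration and are not part of Hudi.

```python
# Toy model of one file group in a MOR table; not Hudi's real storage code.
class MorFileGroup:
    def __init__(self, base_records):
        # "Base file": compacted records keyed by primary key (stands in for Parquet).
        self.base = {r["trip_id"]: r for r in base_records}
        # "Delta log": append-only list of upserts (stands in for the Avro log).
        self.log = []

    def upsert(self, record):
        # Writes are cheap: append to the log and leave the base file untouched.
        self.log.append(record)

    def read_merged(self):
        # Merge on read: replay the log over the base file at query time.
        merged = dict(self.base)
        for record in self.log:
            merged[record["trip_id"]] = record
        return merged

    def compact(self):
        # Compaction folds the log into a new base file and empties the log.
        self.base = self.read_merged()
        self.log = []


group = MorFileGroup([{"trip_id": 1, "destination": "Seattle"}])
group.upsert({"trip_id": 1, "destination": "noida"})
print(group.base[1]["destination"])           # value still stored in the base file
print(group.read_merged()[1]["destination"])  # value seen by a merged read
group.compact()
print(group.base[1]["destination"], len(group.log))
```

A Copy-On-Write table would instead pay the merge cost inside upsert, which is why it favours readers over writers.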

Implementation of MOR table

In the previous post we covered how to configure Hudi on an EMR notebook and set the Hudi parameters, so let’s jump directly to the implementation.

config = {
"table_name": "hudi_mor_trips_table",
"table_name_rt": "hudi_mor_trips_table_rt",
"target": "s3:******************/hudi_mor/",
"primary_key": "trip_id",
"sort_key": "ts",
"commits_to_retain": "2",
}

### Besides the parameters from the previous post, a Hudi MOR table needs a few extra ones
STORAGE_TYPE_OPT_KEY="hoodie.datasource.write.storage.type"
COMPACTION_INLINE_OPT_KEY="hoodie.compact.inline"
COMPACTION_MAX_DELTA_COMMITS_OPT_KEY="hoodie.compact.inline.max.delta.commits"
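As an optional convenience, the MOR settings can be gathered into one dictionary and passed to the writer in a single call. The helper below is a hypothetical sketch (build_mor_write_options is not from the original code); it assumes the standard Hudi option keys for record key, precombine field and operation, and reuses the storage-type key shown above.

```python
def build_mor_write_options(config, operation="upsert", max_delta_commits=20):
    """Hypothetical helper: collect the MOR write settings in one dict."""
    return {
        "hoodie.table.name": config["table_name"],
        "hoodie.datasource.write.recordkey.field": config["primary_key"],
        "hoodie.datasource.write.precombine.field": config["sort_key"],
        "hoodie.datasource.write.operation": operation,
        # Same key as STORAGE_TYPE_OPT_KEY in this post.
        "hoodie.datasource.write.storage.type": "MERGE_ON_READ",
        # Inline compaction stays off, as in the write calls of this post.
        "hoodie.compact.inline": "false",
        "hoodie.compact.inline.max.delta.commits": str(max_delta_commits),
    }


sample_config = {
    "table_name": "hudi_mor_trips_table",
    "primary_key": "trip_id",
    "sort_key": "ts",
}
opts = build_mor_write_options(sample_config)
for key, value in opts.items():
    print(key, "=", value)

# With a DataFrame at hand, the dict could be applied in one call, for example:
# df.write.format("org.apache.hudi").options(**opts).mode("append").save(config["target"])
```

Keeping every value a string mirrors how the options are passed in the rest of this post.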

1. Generating a dummy dataset

A timestamp column is required because Hudi uses it as the precombine field (config["sort_key"], here ts): when several records share the same key, the one with the latest timestamp wins.

from datetime import datetime

dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]

def get_json_data(start, count, dest):
    # One timestamp (millisecond precision) shared by every record in the batch
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    data = [{"trip_id": i, "ts": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Create a Spark DataFrame
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))

# Insert a large dataset
df_mor = create_json_df(spark, get_json_data(0, 20000000, dest))
df_mor.show(20, False)

+-------------+--------+-------+-----------------------+
|destination |route_id|trip_id|ts |
+-------------+--------+-------+-----------------------+
|Seattle |A |0 |2023-05-16 04:58:18.557|
|New York |B |1 |2023-05-16 04:58:18.557|
|New Jersey |C |2 |2023-05-16 04:58:18.557|
|Los Angeles |D |3 |2023-05-16 04:58:18.557|
|Las Vegas |E |4 |2023-05-16 04:58:18.557|
|Tucson |F |5 |2023-05-16 04:58:18.557|
|Washington DC|G |6 |2023-05-16 04:58:18.557|
|Philadelphia |H |7 |2023-05-16 04:58:18.557|
|Miami |I |8 |2023-05-16 04:58:18.557|
|San Francisco|J |9 |2023-05-16 04:58:18.557|
|Seattle |A |10 |2023-05-16 04:58:18.557|
|New York |B |11 |2023-05-16 04:58:18.557|
|New Jersey |C |12 |2023-05-16 04:58:18.557|
|Los Angeles |D |13 |2023-05-16 04:58:18.557|
|Las Vegas |E |14 |2023-05-16 04:58:18.557|
|Tucson |F |15 |2023-05-16 04:58:18.557|
|Washington DC|G |16 |2023-05-16 04:58:18.557|
|Philadelphia |H |17 |2023-05-16 04:58:18.557|
|Miami |I |18 |2023-05-16 04:58:18.557|
|San Francisco|J |19 |2023-05-16 04:58:18.557|
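Because ts is later passed as the precombine field, it helps to see what that field decides. The sketch below mimics the idea in plain Python: among records with the same trip_id, keep the one with the largest ts. The function name precombine and the second timestamp are made up for illustration, and Hudi's own payload classes may treat ties or deletes differently.

```python
def precombine(records, key_field="trip_id", order_field="ts"):
    """Keep one record per key: the one with the largest ordering value."""
    latest = {}
    for rec in records:
        seen = latest.get(rec[key_field])
        # On a tie the later arrival wins here; Hudi's behaviour may differ.
        if seen is None or rec[order_field] >= seen[order_field]:
            latest[rec[key_field]] = rec
    return list(latest.values())


# Timestamps in 'YYYY-MM-DD HH:MM:SS.mmm' form compare correctly as strings.
batch = [
    {"trip_id": 7, "ts": "2023-05-16 04:58:18.557", "destination": "Philadelphia"},
    {"trip_id": 7, "ts": "2023-05-16 05:10:02.120", "destination": "noida"},
    {"trip_id": 8, "ts": "2023-05-16 04:58:18.557", "destination": "Miami"},
]
for rec in precombine(batch):
    print(rec["trip_id"], rec["destination"])
```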

### Saving the data in target location
(df_mor.write.format(HUDI_FORMAT)
.option(PRECOMBINE_FIELD_OPT_KEY,config["sort_key"])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
.option(BULK_INSERT_PARALLELISM, 3)
.option(S3_CONSISTENCY_CHECK, "true")
.option(HIVE_TABLE_OPT_KEY,config['table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
.option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
.option(STORAGE_TYPE_OPT_KEY,"MERGE_ON_READ")
.option(COMPACTION_INLINE_OPT_KEY,"false")
.option(COMPACTION_MAX_DELTA_COMMITS_OPT_KEY,"20")
.mode("Overwrite")
.save(config['target'])
)

2. Use Spark SQL to read the MOR table

Two tables are created for a MOR table: read-optimized (_ro) and real-time (_rt).
The _ro table reads only the compacted Parquet base files, so queries are fast but may not reflect the latest changes still sitting in the delta logs.
The _rt table merges the base files with the delta logs at query time, so it returns the latest data at the cost of slower reads.

spark.sql("show tables").show(10,False)

+---------+-----------------------+-----------+
|namespace|tableName |isTemporary|
+---------+-----------------------+-----------+
|default |hudi_mor_trips_table_ro|false |
|default |hudi_mor_trips_table_rt|false |
|default |hudi_trips_table |false |
+---------+-----------------------+-----------+

Hudi adds extra metadata columns for its internal bookkeeping, which help streamline its processing. Refer to the Hudi documentation for details:

_hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name

df2_mor=spark.read.format(HUDI_FORMAT).load(config['target'] + "/*")
df2_mor.show(20,False)

+-------------------+----------------------+------------------+----------------------+-------------------------------------------------------------------------+-------------+--------+-------+-----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |destination |route_id|trip_id|ts |
+-------------------+----------------------+------------------+----------------------+-------------------------------------------------------------------------+-------------+--------+-------+-----------------------+
|20230516095126885 |20230516095126885_0_2 |0 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|Seattle |A |0 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_5 |1 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|New York |B |1 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_6 |2 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|New Jersey |C |2 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_7 |3 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|Los Angeles |D |3 |2023-05-16 09:51:14.378|

spark.sql("select * from " + config['table_name'] + "_ro").show(20,False)
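The metadata values in the output above are plain strings, so they can be unpacked without Spark. The following is a small sketch: the function names are hypothetical, the 17-digit commit time is assumed to be yyyyMMddHHmmssSSS, and the three-part file name layout (file id, write token, commit time) is my reading of the sample rows rather than something stated in this post.

```python
from datetime import datetime, timedelta


def parse_commit_time(instant):
    """Turn '20230516095126885' into a datetime (assumes yyyyMMddHHmmssSSS)."""
    base = datetime.strptime(instant[:14], "%Y%m%d%H%M%S")
    millis = int(instant[14:17] or 0)  # tolerate 14-digit instants without millis
    return base + timedelta(milliseconds=millis)


def split_file_name(file_name):
    """Split a base file name into its three underscore-separated parts."""
    stem, extension = file_name.rsplit(".", 1)
    file_id, write_token, instant = stem.split("_")
    return {
        "file_id": file_id,
        "write_token": write_token,
        "commit_time": instant,
        "extension": extension,
    }


print(parse_commit_time("20230516095126885"))
parts = split_file_name(
    "997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet"
)
print(parts["file_id"], parts["commit_time"])
```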

3. Let’s perform a batch update operation

# Changing data for upsert
upsert_dest_mor= ["noida" for i in range (0,100)]
df3_mor=create_json_df(spark, get_json_data(1000000,100, upsert_dest_mor))
df3_mor.show(20,False)

(df3_mor.write.format(HUDI_FORMAT)
.option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 3)
.option(S3_CONSISTENCY_CHECK, "true")
.option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
.option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
.option(HIVE_TABLE_OPT_KEY,config['table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
.option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
.option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
.option(COMPACTION_INLINE_OPT_KEY, "false")
.option(COMPACTION_MAX_DELTA_COMMITS_OPT_KEY, "20")
.mode("Append")
.save(config['target']))

df4_mor=spark.read.format(HUDI_FORMAT).load(config['target'] + "/*")

4. Let’s perform a batch insert operation

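# The initial load holds trip_ids 0 to 19,999,999, so start at 20,000,000 to get genuinely new keys
insert_dest=["ghaziabad" for i in range(0,100)]
df5_mor=create_json_df(spark,get_json_data(20000000,100,insert_dest))
df5_mor.show()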

(df5_mor.write.format(HUDI_FORMAT)
.option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 3)
.option(S3_CONSISTENCY_CHECK, "true")
.option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
.option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
.option(HIVE_TABLE_OPT_KEY,config['table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
.option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
.option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
.option(COMPACTION_INLINE_OPT_KEY, "false")
.option(COMPACTION_MAX_DELTA_COMMITS_OPT_KEY, "20")
.mode("Append")
.save(config['target']))

df6_mor=spark.read.format(HUDI_FORMAT).load(config['target'] + "/*")
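Steps 3 and 4 both use the upsert operation; whether a record ends up as an update or an insert depends only on whether its key already exists in the table. A minimal sketch of that decision, using a hypothetical classify_upsert function and small made-up key ranges:

```python
def classify_upsert(existing_keys, batch_keys):
    """Split a batch into keys that would update and keys that would insert."""
    updates = [k for k in batch_keys if k in existing_keys]
    inserts = [k for k in batch_keys if k not in existing_keys]
    return updates, inserts


# Made-up example: the table holds keys 0..999, the batch straddles the end.
existing = range(0, 1000)
updates, inserts = classify_upsert(existing, range(995, 1005))
print("updates:", updates)
print("inserts:", inserts)
```

Running a check like this against the real key range before a write is a cheap way to confirm that a batch meant as an insert does not silently overwrite existing rows.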

5. Read the read-optimized and real-time data into DataFrames

# Read the read-optimized (RO) view as a DataFrame.
# The RO view reads only the columnar base files, which are immutable and optimized for read-heavy workloads.
# Base files are rewritten by compaction and contain a full snapshot of the data as of that compaction.
# The RO view can be queried using standard SQL engines like Presto, Athena, and Spark SQL.
# A real-time read of a Hudi MOR table scans both the base files and the delta log files to retrieve the latest data.
# Base files hold the compacted history, while the log files hold the latest changes.

# Query type "read_optimized" returns only what is already in the base files
ro_df = spark.read.format("org.apache.hudi") \
    .option("hoodie.table.name", "hudi_mor_trips_table") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(config['target'] + "/*")

ro_df.show(5,False)

+-------------------+----------------------+------------------+----------------------+-------------------------------------------------------------------------+-------------+--------+-------+-----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |destination |route_id|trip_id|ts |
+-------------------+----------------------+------------------+----------------------+-------------------------------------------------------------------------+-------------+--------+-------+-----------------------+
|20230516095126885 |20230516095126885_0_2 |0 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|Seattle |A |0 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_5 |1 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|New York |B |1 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_6 |2 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|New Jersey |C |2 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_7 |3 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|Los Angeles |D |3 |2023-05-16 09:51:14.378|
# Read the real-time (RT) view as a DataFrame.
# Delta log files hold the mutable, most recent changes and are optimized for write-heavy workloads.
# They are appended to as new data is ingested into the Hudi dataset.
# Log files are small and cheap to append to, allowing for fast writes.
# Query type "snapshot" merges the base files with the log files at read time
rt_df = spark.read.format("org.apache.hudi") \
    .option("hoodie.table.name", "hudi_mor_trips_table") \
    .option("hoodie.datasource.query.type", "snapshot") \
    .load(config['target'] + "/*")
rt_df.show(5,False)

+-------------------+----------------------+------------------+----------------------+-------------------------------------------------------------------------+-------------+--------+-------+-----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |destination |route_id|trip_id|ts |
+-------------------+----------------------+------------------+----------------------+-------------------------------------------------------------------------+-------------+--------+-------+-----------------------+
|20230516095126885 |20230516095126885_0_2 |0 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|Seattle |A |0 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_5 |1 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|New York |B |1 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_6 |2 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|New Jersey |C |2 |2023-05-16 09:51:14.378|
|20230516095126885 |20230516095126885_0_7 |3 | |997f83b5-e22b-4bfb-9df5-11227aa3b185-0_0-2126-0_20230516095126885.parquet|Los Angeles |D |3 |2023-05-16 09:51:14.378|

6. Time Travel

Time travel is used to query a previous commit through incremental and point-in-time queries.

Incrementally query data: an incremental query reads only the records written after a given commit time, which can speed up retrieval.

commits = spark.sql("select distinct(_hoodie_commit_time) from hudi_trips_table order by _hoodie_commit_time").collect()
print("commits: ")
for i in commits:
    print(i)

commits:
Row(_hoodie_commit_time='20230516073344246')
Row(_hoodie_commit_time='20230516073449459')
Row(_hoodie_commit_time='20230516073620516')



# incrementally query data
# collect() returns Row objects, so take the first field to get the commit time as a string
beginTime = commits[-3][0]
incViewDF = spark.read.format(HUDI_FORMAT) \
    .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL) \
    .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime) \
    .load(config["target"])
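To make the effect of the begin time concrete, here is a plain-Python sketch of the filter an incremental query applies. It assumes, as I understand the Hudi behaviour, that the begin time is exclusive and an optional end time is inclusive; incremental_view is a hypothetical name, and the records are made up around the three commit times printed above.

```python
def incremental_view(records, begin_time, end_time=None):
    """Records committed after begin_time and, if given, up to end_time."""
    selected = []
    for rec in records:
        commit = rec["_hoodie_commit_time"]
        # Commit times are fixed-width digit strings, so string comparison
        # orders them chronologically.
        if commit > begin_time and (end_time is None or commit <= end_time):
            selected.append(rec)
    return selected


commit_times = ["20230516073344246", "20230516073449459", "20230516073620516"]
records = [
    {"trip_id": i, "_hoodie_commit_time": c} for i, c in enumerate(commit_times)
]

# Incremental: everything after the first commit.
print(incremental_view(records, commit_times[0]))
# Point in time: the same window, capped at the second commit.
print(incremental_view(records, commit_times[0], commit_times[1]))
```

Passing the oldest commit as the begin time therefore skips the rows written by that commit itself and returns only later changes.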

Code: You can find the full code on my GitHub repository

Conclusion

Building a Data Lakehouse with AWS EMR, Apache Hudi, and S3 empowers you to harness the advantages of a modern data architecture while efficiently managing CDC use cases. You can capture and process real-time data changes with ease, ensuring data quality, consistency, and accessibility.

In this post we implemented a Hudi MOR table. This completes the Hudi implementation on AWS EMR.

In our upcoming blog post, we’ll redirect our attention to Delta Lake, delving into the intricacies of this powerful data storage solution. You’ll explore how Delta Lake bridges the gap between Data Lakes and data warehouses, offering a comprehensive platform for organizing, managing, and analyzing your data with unmatched reliability and security. Stay tuned for an enlightening dive into the world of Delta Lake — it’s an opportunity you won’t want to miss!
