Apache HUDI vs Delta Lake
A tale of two ACID platforms for Data Lakes
Author: Vibhor Goyal
Of late, ACID compliance on Hadoop-based Data Lakes has gained a lot of traction, and Databricks' Delta Lake and Uber's Hudi have been the major contributors and competitors. Both solve a major problem by providing different flavors of abstraction on the "parquet" file format, so it is hard to pick one as a better choice over the other. In this blog, we will use a very basic example to understand how these tools work under the hood, and leave it to the readers to weigh the functionalities as pros or cons.
We are following a reverse approach here: in the next article in this series, we will discuss the importance of a Hadoop-like Data Lake, why the need for systems like Delta/Hudi arose in the first place, and how Data Engineers used to build siloed and error-prone ACID systems for their lakes.
Environment Setup
Source Database : AWS RDS MySQL
CDC Tool : AWS DMS
Hudi Setup : AWS EMR 5.29.0
Delta Setup : Databricks Runtime 6.1
Object/File Store : AWS S3
The above toolset was chosen for the demo based on preference and infrastructure availability; the following alternatives can also be used:
Source Database : Any traditional/cloud-based RDBMS
CDC Tool : Attunity, Oracle Golden Gate, Debezium, Fivetran, Custom Binlog Parser
Hudi Setup : Apache Hudi on Open Source/Enterprise Hadoop
Delta Setup : Delta Lake on Open Source/Enterprise Hadoop
Object/File Store : ADLS/HDFS
Data Preparation Steps:
create database demo;
use demo;

create table hudi_delta_test
(
pk_id integer,
name varchar(255),
value integer,
updated_at timestamp default now() on update now(),
created_at timestamp default now(),
constraint pk primary key(pk_id)
);

insert into hudi_delta_test(pk_id,name,value) values(1,'apple',10);
insert into hudi_delta_test(pk_id,name,value) values(2,'samsung',20);
insert into hudi_delta_test(pk_id,name,value) values(3,'dell',30);
insert into hudi_delta_test(pk_id,name,value) values(4,'motorola',40);
Now let's load this data to a location in S3 using DMS, and identify that location with a folder named full_load. For the sake of adhering to the title, we are going to skip the DMS setup and configuration. Here is the screenshot from S3 after the full load.
Now let’s perform some Insert/Update/Delete operations in the MySQL table.
insert into hudi_delta_test(pk_id,name,value) values(5,'motorola',40);
update hudi_delta_test set value = 201 where pk_id=2;
delete from hudi_delta_test where pk_id=3;
Let's again skip the DMS magic and have the CDC data loaded to S3 as shown below.
NOTE: DMS populates an extra field named "Op" (for Operation), with values I, U and D for inserted, updated and deleted records respectively. The below screenshot shows the content of the CDC data only. The screenshot is taken from a Databricks notebook just for convenience, not as a requirement.
df=spark.read.parquet('s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test')
df.show()
Now let's begin with the real game. While DMS continuously ships the CDC events to S3, for both Hudi and Delta Lake this S3 location becomes the data source instead of MySQL. As the end state for both tools, we aim to get a consistent, consolidated view like [1] above in MySQL.
Using Apache HUDI
Hudi handles UPSERTS in 2 ways [1]:
- Copy on Write (CoW): Data is stored in columnar format (Parquet) and updates create a new version of the files during writes. This storage type is best used for read-heavy workloads because the latest version of the dataset is always available in efficient columnar files.
- Merge on Read (MoR): Data is stored with a combination of columnar (Parquet) and row-based (Avro) formats; updates are logged to row-based “delta files” and compacted later creating a new version of the columnar files. This storage type is best used for write-heavy workloads because new commits are written quickly as delta files, but reading the data set requires merging the compacted columnar files with the delta files.
Open up a Spark shell with the following configuration and import the relevant libraries:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
Using CoW:
val inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"
val hudiTableName = "hudi_cow"
val hudiTablePath = "s3://development-dl/demo/hudi-delta-demo/hudi_cow"

val hudiOptions = Map[String,String](
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk_id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "created_at",
  HoodieWriteConfig.TABLE_NAME -> hudiTableName,
  DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "updated_at",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "created_at",
  DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)

val temp = spark.read.format("parquet").load(inputDataPath)
val fullDF = temp.withColumn("Op", lit("I"))
fullDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
A table named "hudi_cow" will be created in Hive, as we have used the Hive auto-sync configurations in the Hudi options. The table is registered with the Parquet SerDe and Hudi's input format.
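Since Hive sync registers the table in the metastore, it can be queried straight from the same Spark shell. Below is a minimal sketch, assuming the table landed in the default Hive database:

// Query the Hive-synced CoW table; Hudi adds bookkeeping columns such as
// _hoodie_commit_time and _hoodie_record_key alongside the source columns.
spark.sql("select _hoodie_commit_time, pk_id, name, value, Op from hudi_cow").show()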
Here is the content of the table:
val updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test")
updateDF.write.format("org.apache.hudi").options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
The same Hive table "hudi_cow" will be populated with the latest UPSERTED data, as in the below screenshot.
As stated in the CoW definition, when we write updateDF in Hudi format to the same S3 location, the upserted data is copied on write, and a single table serves both the snapshot and the incremental data.
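The CoW data can also be read back directly from S3 through the Hudi datasource, without going through Hive. A minimal sketch, assuming the single-level partitioning on created_at used above (so one glob level for the partition directories):

// Snapshot read of the CoW table straight from its S3 path.
// The trailing /*/* covers <partition directory>/<parquet file>.
val cowSnapshotDF = spark.read
  .format("org.apache.hudi")
  .load(hudiTablePath + "/*/*")
cowSnapshotDF.select("pk_id", "name", "value", "Op").show()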
Using MoR:
val inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"
val hudiTableName = "hudi_mor"
val hudiTablePath = "s3://development-dl/demo/hudi-delta-demo/hudi_mor"

val hudiOptions = Map[String,String](
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk_id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "created_at",
  HoodieWriteConfig.TABLE_NAME -> hudiTableName,
  DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "updated_at",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "created_at",
  DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)

val temp = spark.read.format("parquet").load(inputDataPath)
val fullDF = temp.withColumn("Op", lit("I"))
fullDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
Two tables named "hudi_mor" and "hudi_mor_rt" will be created in Hive. hudi_mor is a read-optimized table and holds the compacted snapshot data, while hudi_mor_rt serves the incremental, real-time merged data. The data is compacted and made available to hudi_mor at regular compaction intervals. hudi_mor_rt leverages the Avro format to store the incremental data. As the name Merge on Read implies, data read via hudi_mor_rt is merged on the fly. This suits a source table with a high update rate, while still providing a consistent, though not fully up-to-date, read-optimized table.
NOTE: Both "hudi_mor" and "hudi_mor_rt" point to the same S3 location but are defined with different input formats.
The content of both tables is the same after full load and is shown below:
val updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test")
updateDF.write.format("org.apache.hudi").options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
The table hudi_mor keeps the old content for only a very short time (as the data set is small for this demo and it gets compacted soon), while the table hudi_mor_rt gets populated with the latest data as soon as the merge command exits successfully.
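To see the difference between the two views from the same shell, both Hive-synced tables can be queried side by side. A minimal sketch, assuming both tables were synced to the default Hive database and the Hudi bundle is on the classpath:

// Read-optimized view: only the compacted columnar (parquet) base files.
spark.sql("select pk_id, name, value, Op from hudi_mor").show()

// Real-time view: base files merged on the fly with the Avro delta/log files.
spark.sql("select pk_id, name, value, Op from hudi_mor_rt").show()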
Now let's take a look at what's happening in S3 for these Hudi-formatted tables. While the underlying storage format remains parquet, ACID is managed via logs. Typically, the following types of files are produced:
- hoodie_partition_metadata: a small file containing information about the partitionDepth and the last commitTime in the given partition.
- hoodie.properties: the table name and type are stored here.
- commit and clean files: file stats and information about the new file(s) being written, along with fields like numWrites, numDeletes, numUpdateWrites, numInserts and other related audit fields. These files are generated for every commit.
The above three file types are common to both CoW and MoR tables. For MoR tables, however, there are additional Avro-formatted log files created for the partitions that are UPSERTED. The first file in the below screenshot is such a log file, which is not present in the CoW table.
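To poke around this metadata without leaving the Spark shell, the .hoodie folder can be listed through the Hadoop FileSystem API. Below is a minimal sketch using the demo's CoW table path (the same works for the MoR path):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// List the commit/clean/properties files under the table's .hoodie folder.
val metaPath = "s3://development-dl/demo/hudi-delta-demo/hudi_cow/.hoodie"
val fs = FileSystem.get(new URI(metaPath), spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(metaPath)).foreach(s => println(s.getPath.getName))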
Using Delta Lake:
Using the below code snippet, we read the full-load data in parquet format and write it in delta format to a different location.
from pyspark.sql.functions import *
inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"
deltaTablePath = "s3://development-dl/demo/hudi-delta-demo/delta_table"
fullDF = spark.read.format("parquet").load(inputDataPath)
fullDF = fullDF.withColumn("Op",lit('I'))
fullDF.write.format("delta").mode("overwrite").save(deltaTablePath)
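As a side note, the Delta path can already be read back directly at this point, without registering any table. A minimal sketch (shown in Scala, e.g. in a %scala cell, to match the Hudi examples above):

// Read the Delta table straight from its S3 path; the _delta_log folder
// tells the reader which parquet files make up the current version.
val deltaDF = spark.read.format("delta")
  .load("s3://development-dl/demo/hudi-delta-demo/delta_table")
deltaDF.show()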
Using the below command in the SQL interface of the Databricks notebook, we can create a Hive external table; the "using delta" keyword carries the definition of the underlying SerDe and file format, so they need not be specified explicitly.
%sql
create table delta_table
using delta
location 's3://development-dl/demo/hudi-delta-demo/delta_table'
The DDL of the table is shown below.
%sql
show create table delta_table
As expected, the table contains all the records present in the full-load file.
%sql
select * from delta_table
Use the below command to read the CDC data and register it as a temporary view in Spark:
updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test")
updateDF.createOrReplaceTempView("temp")
The MERGE command: below is the MERGE SQL that does the UPSERT magic. For convenience, it has been executed in a SQL cell; it can equally well be executed via a spark.sql() method call.
%sql
MERGE INTO delta_table target
USING
(SELECT Op,latest_changes.pk_id,name,value,updated_at,created_at
FROM temp latest_changes
INNER JOIN (
SELECT pk_id, max(updated_at) AS MaxDate
FROM temp
GROUP BY pk_id
) cm ON latest_changes.pk_id = cm.pk_id AND latest_changes.updated_at = cm.MaxDate) as source
ON source.pk_id == target.pk_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
Here is the content of delta_table in Hive after the MERGE. It is updated!
%sql
select * from delta_table
As with Hudi, the underlying file storage format is parquet in the case of Delta Lake as well. Delta provides its ACID capability through a transaction log and versioning. Let's see what's happening in S3 after the full load and the CDC merge.
The Delta log contains JSON-formatted commit files that record the schema and the set of latest files after each commit.
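Those commit files are plain JSON, so they can be inspected directly. A minimal sketch (again in Scala for consistency), reading the very first commit file, which by Delta's naming convention is a zero-padded version number:

// Each action (metaData, add, remove, commitInfo) appears as a separate JSON line.
val firstCommit = spark.read.json(
  "s3://development-dl/demo/hudi-delta-demo/delta_table/_delta_log/00000000000000000000.json")
firstCommit.printSchema()
firstCommit.show(false)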
In the case of the CDC merge, multiple records can be inserted, updated or deleted, so the content of the initial parquet file is split into multiple smaller parquet files and those smaller files are rewritten. If the table were partitioned, only the CDC data corresponding to the updated partitions would be affected. The initial parquet file still exists in the folder but is removed from the new log file; it can be physically removed by running VACUUM on the table. The smaller files can also be compacted using the OPTIMIZE command [6].
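A rough sketch of those two maintenance steps (shown in Scala; OPTIMIZE is a Databricks SQL command, and the default 7-day retention applies to VACUUM unless configured otherwise):

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(spark, "s3://development-dl/demo/hudi-delta-demo/delta_table")

// Physically remove files that are no longer referenced by the log
// and are older than the retention threshold (in hours; 168 = 7 days).
deltaTable.vacuum(168)

// Compact the small files produced by the merge (Databricks Runtime feature).
spark.sql("OPTIMIZE delta_table")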
After the merge, the Delta log is appended with another JSON-formatted commit file that stores the schema and file pointers to the latest files.
In both examples, I have kept the deleted record as-is, identifiable by Op='D'; this has been done intentionally to show the capability of DMS. The references below show how to convert this soft delete into a hard delete with minimal effort.
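For the Delta side, one hedged sketch of such a hard delete is to let the MERGE delete matched rows whose latest change is Op = 'D' instead of updating them, using the same temp view as the source (shown with the Delta Scala API for consistency with the rest of this post):

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(spark, "s3://development-dl/demo/hudi-delta-demo/delta_table")
// Simplified: in a real run, first reduce the CDC data to one latest record per pk_id,
// as done by the sub-select in the MERGE SQL above.
val latestChanges = spark.table("temp")

deltaTable.as("target")
  .merge(latestChanges.as("source"), "source.pk_id = target.pk_id")
  .whenMatched("source.Op = 'D'").delete()          // hard-delete soft-deleted rows
  .whenMatched().updateAll()                        // apply updates
  .whenNotMatched("source.Op != 'D'").insertAll()   // insert new rows, skip deletes
  .execute()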
Hope this is a useful comparison and helps in making an informed decision when picking either of the available toolsets for your data lake. I am more biased towards Delta because Hudi doesn't support PySpark as of now.
About the author:
Vibhor Goyal is a Data Engineer at Punchh, where he works on building a Data Lake and its applications to cater to multiple Product and Analytics requirements.
References:
- https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/
- https://databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dms.html
- https://hudi.apache.org/
- https://docs.delta.io/
- https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
- https://docs.databricks.com/delta/optimizations/index.html