Data Lakehouse — Apache Hudi คืออะไร ? ใช้ยังไง ?

Sojirath Thunprateep
CJ Express Tech (TILDI)
5 min readSep 15, 2022

หลายๆคนที่เคยจัดการ Data Lakes คงคุ้นเคยกับการเก็​บ Data ด้วย Parquet file และคงเจอความซับซ้อน เมื่อ Data มีการ Update เพราะการจัดเก็บ ด้วยเหตุผลนี้เราเลยหา Tools มาช่วยให้เราสามารถจัดการ Data Lakes ได้ง่ายขึ้น จริงๆในตลาดก็มี Open Source Tools อยู่หลายตัวให้เราได้เลือกใช้ เช่น Apache Iceberg และ Apache Hudi

ในบทความนี้เราขอเล่าถึง Hudi ที่เราได้ไปลองใช้มา ว่า Hudi คืออะไร ใช้งานยังไง อะไรที่ควรระวังบ้าง

Table of contents

Part 1: Hudi Basics
Part2: Hudi Configurations
Part 3: Environment Setting Up
Part 4: Hudi with Pyspark

Part 1: Hudi Basics

Hudi จริงๆแล้วเป็นคำย่อมาจาก Hadoop Updates Deletes and Inserts โดยความหมายก็ตรงตัวเลย Hudi จะช่วยให้เราสามารถทำการ Update และ Delete ได้คล้ายกับ RDBMS ทั่วไป

Hudi ใช้ Concept MVCC Design โดยการเอา Data files อย่าง Parquet file และ Log files มาประกอบเข้าด้วยกันในการจัดการดาต้า เพื่อให้เราได้ Output ที่ถูกต้องเมื่อเราอ่านจาก Hudi Table

ดังนั้นทุกครั้งที่เรามีการ Update, Insert, Delete ข้อมูลที่ Hudi Table จะนับเป็นการเขียน Commit 1 ครั้ง (คล้ายๆกับการ Commit ใน git ) ซึ่งจะมีการสร้าง File Commit ขึ้น ทำให้ Hudi มีความสามารถในการ Rollback ได้อีกด้วย

Part2: Hudi Configurations

การที่ Hudi ทำให้เราใช้งานได้ง่ายขนาดนี้ อาจจะต้องแลกมากับการ Config เยอะนิดหน่อย เราจะมาดูกันว่า Hudi Config หลักๆที่จำเป็นในการสร้าง Hudi Table มีอะไรบ้าง

  • Write operation
hoodie.datasource.write.operation

ความสามารถของ Hudi ทำให้มี Write operations ที่ทำงานได้คล้ายๆ RDBMS ดังนี้
1. Upsert: Insert และ Update data เมื่อใน Table มีข้อมูลที่มี Key นั้นอยู่แล้ว
2. Insert
3. Bulk Insert: เหมาะสำหรับดาต้าขนาดใหญ่ อย่างหลายๆ TB ขึ้นไป
4. Delete: เราสามารถเลือกให้ทำเป็น Soft หรือ Hard Deletes ได้

  • Table type
hoodie.datasource.write.table.type

Hudi มี Table Types อยู่ 2 ประเภทที่มี Mechanism ในการ Update ที่ต่างกัน ซึ่งเราสามารถเลือกใช้ได้ตามความเหมาะสมกับการใช้งาน
1. Copy on Write (COW)
Table ประเภทนี้มีกลไกการ Update เหมือนกับ RDBMS B-Tree หรือพูดง่ายๆว่าจะอัพเดท Data ในระหว่างการเขียนลง Table ไปเลย ทำให้อาจจะใช้เวลานานในการเขียนค่อนข้างมา
2. Merge on Read (MOR) การ Update Data ของ Table ประเภทนี้จะเป็นการเขียน Log ลงใน Delta files แล้วจะทำการ Compact ให้กลายเป็น Version ล่าสุดในตอนหลัง ซึ่งเราสามารถตั้งค่าได้ว่าจะให้มีการ Compact data หลังจากที่มีการ Commit หรือ Update Data ไปแล้วครั้ง ดังนั้นการเขียน Data ลง Table จะค่อนข้างเร็ว แต่ต้องแลกมากับ Cost ที่ต้องเก็บ Data ขนาดใหญ่มากขึ้น

  • Record Key
hoodie.datasource.write.recordkey.field

เปรียบได้กับ Primary key ของ Table ซึ่งเราสามารถกำหนดให้มีมากกว่า 1 คอลัมน์ เป็นแบบ Multiple keys ได้

  • Partition Field
hoodie.datasource.write.partitionpath.field
hoodie.datasource.write.hive_style_partitioning
  • การกำหนด Partition ทั่วไปของ Hudi Table
    โดยเราสามารถเลือกให้ Partition path เป็นแบบ Hive table ก็ได้ เช่น year=2022/ โดยกาารเซตค่า hive_style_partitioning ให้เป็น True
  • Precombine Field
hoodie.datasource.write.precombine.field
hoodie.combine.before.upsert

เมื่อ Data ที่เรากำลังจะอัพเดทเข้าไป เกิดมี Rows ที่มี Record key ซ้ำกัน
Hudi จะทำการ Deduplicate ให้ โดยเลือก Row ที่มีค่า Precombine field สูงที่สุดเพียง Row เดียว ของ Key นั้นเพื่ออัพเดทเข้าเทเบิล
ซึ่งหากเราไม่ต้องการ Deduplicate นี้เราสามารถปิดได้ด้วยการตั้งค่า hoodie.combine.before.[operation] เป็น False
แต่ยังไงเราก็ต้องเซตค่า precombine.field ไว้นะ ไม่งั้นจะเจอ error เพราะ Hudi จะไปตามหา ts column ซึ่งเป็น default precombine field

  • Key Generator Type
hoodie.datasource.write.keygenerator.class

โดยปกติ Hudi จะสร้าง Unique key (Hoodie Key) สำหรับ 1 Record โดยเกิดจากการของ Record key + Partition path ดังนั้นเราควรเลือก Key generator typeให้เหมาะกับลักษณะของข้อมูล

  • Index Type
hoodie.index.type

โดย Default index คือ BLOOM เราสามารถเปลี่ยนเป็น Index Type อื่น เพื่อให้เหมาะกับการใช้งานดาต้าได้ เช่น SIMPLE, BLOOM, HBASE เป็นต้น

ทั้ง SIMPLE และ BLOOM ก็มี Options แบบ GLOBAL (SIMPLE_BLOOM และ GLOBAL_BLOOM) ความต่างก็คือ Index แบบ GLOBAL จะทำให้ Index มีความ Unique ข้าม Partition ก็คือหากดาต้าของเรามีการอัพเดทเปลี่ยน Partition ดาต้าก็จะถูกลบออกจาก Partition เก่า และเพิ่มเข้าไปใน Partition ใหม่ได้อย่างถูกต้อง ไม่มีข้อมูลซ้ำกันเกิดขึ้น

Part 3: Environment Setting Up

เนื่องจากในบทความนี้เราจะยกตัวอย่างการใช้งาน Apache Hudi ผ่าน PySpark เราจะอธิบายถึงขั้นตอนแรกเตรียม Environment สำหรับการใช้งานผ่าน PySpark ก่อน

1. Doc ของ Hudi แนะนำไว้ว่าให้ใช้ Java ถ้าใครใช้ Java ver อื่นที่สูงไป จะเกิด Error ได้ตอนใช้งานได้

2. Pre-requisite Libraries
เราต้องโหลด Jar files ที่ลิสต์ไว้ข้างล่างนี้ไปวางใน Path ของ pyspark/jars
เช่น ถ้าเราลองบน Local และใช้ Python Virtual Env เราต้องไปวาง Libs เหล่านี้ใน Path ของ venv /env_hudi_pyspark/lib/python3.8/site-packages/pyspark/jars

https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.jar
https://repo1.maven.org/maven2/com/google/guava/guava/28.0-jre/guava-28.0-jre.jar
https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.1/failureaccess-1.0.1.jar
https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar
  • hudi-spark3.1.2-bundle_2.12–0.10.1 — เป็นตัวสำคัญให้เราใช้งาน Hudi Feature เขียนหรืออ่าน Hudi Table ได้
  • guava-28.0-jre — โดยปกติ PySpark อาจจะมี guava lib มาให้แล้ว แต่เป็น Version ที่เก่าเกินไป จะทำให้เจอ Error Caused by: java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, long, long)' เราเลยต้องลบตัวเก่า แล้วใช้ guava-28.0 ขึ้นไปแทน
  • gcs-connector-hadoop2 — เอาไว้สำหรับถ้าเราอยากเขียน Hudi Table บน GCS มันจะทำให้ Spark session รู้จัก path gs://

Part 4: Hudi with Pyspark

หลังจากที่เรารู้จัก Hudi Config ประมาณนึงแล้ว และเตรียม Env เรียบร้อย
เราจะมาลองสร้าง Hudi Table ด้วย Pyspark กัน ในที่นี้เราจะลองเขียนลง Local บนเครื่องของเราเอง

1. สร้าง Spark session

from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("hudi").master("local")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()

2. สร้าง Dataframe เพื่อเขียน Hudi Table

input_df = spark.createDataFrame(
[("100", "a", "a_1", "2022-01-01T12:14:58.597216Z"),
("100", "a", "a_1", "2022-01-01T13:51:42.519832Z"),
("100", "b", "b_1", "2022-05-01T13:51:40.417052Z"),
("101", "c", "c_1", "2022-01-01T13:51:40.519832Z"),
("102", "c", "c_2", "2022-02-01T12:15:00.512679Z"),
("103", "c", "c_3", "2022-02-01T13:51:42.248818Z"),]
["id", "type", "sub_type", "update_time"]
)

3. Config Hudi Options

hudi_options = {
"hoodie.datasource.write.operation": "upsert",
"hoodie.table.name": "hudi_table_test",
"hoodie.datasource.write.table.Type": "COPY_ON_WRITE",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.partitionpath.field": "type,sub_type",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.write.precombine.field": "update_time",
"hoodie.combine.before.upsert": "True",
"hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.index.type":"GLOBAL_BLOOM"
}

4. สร้าง Hudi Table
ในที่นี้เราเขียนเทเบิลลง Local ดังนั้น Path file เราต้องให้ขึ้นต้นด้วย “file://”
(overwrite mode ใช้เมื่อสร้างเทเบิลครั้งแรก เปลี่ยน mode เป็น “append” เมื่อจะอัพเดทหรือเพิ่มดาต้า)

input_df.write.format(“hudi”).options(**hudi_options)
.mode(“append”).save(“file:////datalake/hudi_table_test”)

5. Hudi Table สร้าง Metadata และ Data Files
อย่างที่บอกว่า Hudi ใช้ Concept MVCC Design เราเลยมี 2 ส่วนหลักๆ คือ
- Metadata ของ Table จะถูกเก็บไว้ใน .hoodie folder ที่จะมีทั้ง Commit files, log files, properties และ ไฟล์ต่างๆที่เกี่ยวกับ Metadata
- Data files ที่จะถูกเขียนเป็น Parquet file และถูกแบ่งเก็บอยู่ภายใต้ Partition path อย่างในเคสนี้ เราได้ Config ให้ type และ subtype เป็น partition fields เราจึงได้ Data files ที่ถูกเก็บอย่างในภาพข้างล่างนี้

6. อ่าน Data from Hudi Table
เราสามารถใช้ Spark read จาก Data path ได้ หรือจะไปสร้างเป็น Temporary View เพื่อให้ Spark SQL อ่านดาต้าออกมาก็ได้

load_df = spark.read.format(“hudi”).load(“file:///datalake/hudi_table_test”)
load_df.show()
load_df.createOrReplaceTempView(“hudi_tempview”)
spark.sql(“refresh table hudi_tempview”)
data = spark.sql(“select id, type, sub_type from hudi_tempview”)
print(data)

7. Hudi Table Contents
จาก Hudi configs ที่เราตั้งค่าให้ Record Key เป็น id และ Precombine Field เป็น update_time จะเห็นได้ว่าจะมีแค่ 1 Record ต่อ 1 id เท่านั้น เพราะมีการ Deduplicate ก่อนจะสร้าง Table นี้
โดย Hudi ได้เลือก Record id = 100 ที่มี update_time ล่าสุดไว้เพียง Record เดียว

8. Update data
เราจะลอง Upsert data id = 100 โดยให้ Type เป็น “new_type”

update_df = spark.createDataFrame(
[("100", "new_type", "a_1", "2022-09-01T12:14:58.597216Z")]
["id", "type", "sub_type", "update_time"])
update_df.write.format("hudi")
.option("hoodie.datasource.write.operation", "upsert")
.options(**hudi_options)
.mode("append")
.save("file:///Users/sojirath.thu/Documents/gain/POC/hudi/datalake/hudi_table_test"))

จะเห็นได้ว่า id = 100 ยังคงมีเพียงแค่ 1 Record อยู่ โดย type และ partition path ก็ถูกอัพเดทให้เป็น “new_type” ได้อย่างถูกต้อง ไม่มีดาต้าที่ซ้ำซ้อนข้าม Partitions เนื่องจากเราใช้ GLOBAL index จึงช่วยให้ไม่เกิดปัญหาดาต้าซ้ำขึ้น

9. Hudi on GCS
Hudi Table นี้เราสามารถนำไปเขียนบน Google Storage ได้ โดยอาศัย GCS Connector (เพิ่ม Lib gcs-connector-hadoop2-latest.jar) และการ Config Authentication ด้วย Service Account ที่ Spark session
จากนั้นเราก็สามารถแทน Local file path ได้ด้วย GCS path เลย

### ใช้ Spark session ที่เราสร้างจากข้างบนมา config เพิ่มconf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("google.cloud.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE")
conf.set("google.cloud.auth.service.account.json.keyfile"
,"service_account.json") ## ใช้ service account json file
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl",
"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
### เปลี่ยน File path ให้กลายเป็น GCS path ที่ต้องการ
### ก็จะสามารถเขียนและอ่านเทเบิลได้ปกติ
input_df.write.format("hudi").options(**hudi_options).mode("overwrite").save("gs://playground/datalake/hudi_table_test")spark.read.format("hudi").load("gs://playground/datalake/hudi_table_test").show()

สรุป

Apache Hudi ทำให้เราสามารถจัดการ Data Lakes ได้ง่ายขึ้น สามารถ Update และ Delete ได้ คล้ายกับ RDBMS

เนื่องจาก Hudi ให้ Concept MVCC Design ที่รวม Data files (Parquet) และ Logs ในการงานร่วมกัน เพื่อให้แสดงผลเป็น Hudi Data Lakes ที่ใช้งานได้อย่าง Powerful เพียงแต่ต้องการ Config ที่ค่อนข้างละเอียด เช่น Table type, Record key, Precombine field, Partition path, Index และ Key generator

Apache Hudi มี Features อีกเยอะมากให้เราใช้งาน ในบทความต่อไปเราจะพูดถึงอีก Feature หนึ่งที่น่าสนใจคือ Hudi Sync ที่จะทำให้ให้ User สามารถ Query Table ผ่าน Big Query ได้

--

--