ความเหมือนที่แตกต่างของจิ๊กซอว์ใน lakehouse อย่าง apache hudi และ apache iceberg

Pongthep Vijite
DAMAGeek
Published in
5 min readMar 12, 2022

หมายเหตุ ผู้อ่านสามารถดู table of contents ของ Data Engineering from Noob to Newbie ได้ที่ http://bit.ly/2P7isEw

เมื่อกล่าวถึงการทำ lakehouse หลายๆคนมักจะนึกถึงการใช้งาน delta lake ของ databricks ซึ่งในความเป็นจริงแล้วในกลุ่มของ framework สำหรับ lakehouse ยังมีทางเลือกที่ fully open source มากกว่า delta lake อีก 2 ตัวอย่าง apache hudi และ apache iceberg และแม้ทั้ง 2 frameworks นี้จะสามารถใช้ทำ lakehouse ได้เหมือนแต่ก็มีความแตกต่างในตัวเองอยู่ ซึ่งการที่เราสามารถทำความเข้าใจถึงความต่างดังกล่าวได้นั้นจะช่วยทำให้เราเลือก framework ได้เหมาะกับงานที่เรากำลังทำส่งผลทำให้การจัดการระบบดังกล่าวง่ายขึ้นด้วย โดยในบทความนี้เราจะมาพูดถึงจุดเด่นของทั้ง hudi และ iceberg กันครับ

Apache Hudi

hudi เป็น project ที่ตั้งต้นพัฒนาโดย Uber และเป็น component หลักของ data platform (Generation 3) ที่ Uber ตั้งแต่ปี 2017 จากนั้นถูกปล่อยออกมาเป็น open source project ภายใต้โครงการ apache license ในเวลาต่อมา โดยที่จุดประสงค์หลักของ hudi นั้นพุ่งเป้าไปที่การจัดการ data streaming หรือ realtime ingestion ใน data lake โดยมี feature เกี่ยวกับ ACID transaction ซึ่งเป็น feature หลักอันนึงของการ implement lakehouse

การสร้าง table ของ hudi นั้นมีให้เลือกด้วยกัน 2 แบบเพื่อรองรับการใช้งานที่แตกต่างกัน โดยประกอบไปด้วย table แบบ Copy On Write (COW) ซึ่งเหมาะสำหรับการใช้งานที่เน้นงานเขียน และ Merge On Read (MOR) ที่เหมาะกับงานเน้นการอ่าน

ด้วยจุดเด่นของ hudi ด้าน data streaming ทำให้การจัดการข้อมูลแบบ change data capture (CDC) ใน data lake นั้นทำได้ง่ายมากเมื่อเทียบกับการจัดการแบบดั้งเดิมอย่างการใช้ spark sql เพื่อ join DataFrame ในการ maintain ให้ table ใน data lake นั้นเปลี่ยนแปลงไปตามข้อมูลที่มีการเปลี่ยนแปลงที่ data source หรือแม้แต่การใช้งาน merge table ของ delta lake (https://docs.databricks.com/_static/notebooks/merge-in-cdc.html) แต่สำหรับ hudi นั้นเราสามารถจัดการได้เพียงแค่ config ไม่กี่บรรทัดเท่านั้น

pyspark เพื่อทำการ upsert ข้อมูลจาก CDC

โดย config หลักๆนั้นมีดังนี้

  • recordkey.field: ทำหน้าที่เสมือนเป็น pk ของ table เพื่อช่วยให้เวลาเราทำการ upsert
  • partitionpath.field: ระบุ column ที่เราจะใช้เพื่อทำ partition
  • precombine.field: ระบุ column ที่เป็น timestamp ใช้เพื่อดึง record ที่เป็น update มาใช้ก่อนจะ upsert ลง table ภายใน data lake

เพื่อให้เห็นภาพมากขึ้นเรามาลองดูตัวอย่างการ update ของ master table ตามการเปลี่ยนแปลงของข้อมูล CDC กันครับ โดยเราจะเริ่มจาก master table ใน data lake ซึ่งมีข้อมูลดังนี้

master table ใน data lake

และหากเราทำ realtime ingestion ลง lake และ partition ใน landing แบบ daily ตัว path ของ lake จะออกมาประมาณนี้ /datalake/master_table/date=02–01–2022/*.json จากนั้นเราใช้ pyspark ดึงข้อมูลและ prepared ให้อยู่ในรูปของ dataframe

new cdc ใน landing

จากรูปข้างต้นจะเห็นว่า user_id หมายเลข 2 มีการ update เข้ามา 2 ครั้งใน 1 วันซึ่งสิ่งที่เราต้องการคือข้อมูลล่าสุดเท่านั้น ในส่วนนี้เมื่อเรากำหนก precombine.field ให้เป็น column ชื่อ last_updated ตัว hudi จะทำการดึงเฉพาะข้อมูลที่มีการเปลี่ยนแปลงล่าสุดอ้างอิงกับ column ที่ชื่อ last_updated ก็จะได้ dataframe ใหม่ดังนี้

เมื่อใช้งาน precombine.field เป็น last_updated

ถึงตรงนี้ dataframe ของข้อมูล cdc พร้อมที่จะทำการ upsert เข้ากับ master table แล้ว โดย hudi จะใช้ recordkey.field เป็น key ในการทำงาน ซึ่งก็จะได้ final ของ master table หลัง upsert แล้วตามรูปด้านล่างนี้

master table หลัง upsert

เนื่องจาก hudi เน้นงาน data streaming จึงได้ออกตัวช่วยในการ stream data ที่ชื่อว่า DeltaStreamer ซึ่งเป็นการต่อยอด spark streaming เพื่อใช้ในการ sync data จาก kafka แบบไม่ต้องเขียนเองขึ้นมาใหม่ ซึ่งนอกจาก DeltaStreamer ที่อิงอยู่กับ spark ตัว hudi ยังมี Kafka Connect Sink ให้ hudi sink connector

feature สุดท้ายใน hudi ที่เราจะทำการพูดถึงกันคือ schema evolution ซึ่งจากที่เทียบกันระหว่าง 3 frameworks พบว่า hudi รองรับ case การเปลี่ยนแปลง schema ได้น้อยสุดซึ่งรองรับแค่การเพิ่ม column ต่อท้ายและการเปลี่ยน datatype เท่านั้นทั้งๆที่ทั้ง delta lake และ iceberg ทำได้ถึงการ rename column ของ table ใน data lake

Apache Iceberg

iceberg เป็น project ที่ตั้งต้นพัฒนาโดย Netflix ประมาณปี 2018 ซึ่งบ้างแหล่งข้อมูลอ้างอิงว่าในช่วงต้นนั้น Apple ก็มีส่วนร่วมในการพัฒนาเช่นกัน ก่อนจะถูกปล่อยเป็น open source project ภายใต้โครงการ apache license เช่นเดียวกับ hudi แต่จุดประสงค์หลักของ iceberg นั้น พุ่งไปที่ performance การของการทำ analytic กับข้อมูลขนาดใหญ่(จำนวน partition ที่เยอะ) ซึ่งถ้าจะอ้างอิงแบบชี้ชัดไปกว่านั้นคือแก้ไขปัญหาการ query ข้อมูลขนาดใหญ่ของ hive metastore ซึ่งในด้านการใช้งานเพื่อทำ lakehouse ตัว iceberg ก็มี feature รองรับงาน ACID transaction เช่นกัน

หากจะพูดถึงการทำ upsert ที่เรามักใช้เพื่อ sync dataใน lakehouse นั้นตัว iceberg จะใช้วิธี merge into แบบเดียวกับ delta lake รวมถึงการพูดถึงการงานด้าน data streaming ก็มี feature รองรับแต่อาจจะไม่ครบเครื่องเมื่อเทียบกับ hudi

มาถึงตรงนี้หลายท่านอาจจะกำลังคิดว่างั้น hudi ก็ดูดีกว่า iceberg ละสิ ซึ่งผมอยากพาย้อนกลับไปในส่วนของจุดประสงค์ตั้งต้นของ iceberg อีกครั้งว่าจุดเด่นของการพัฒนาแต่เดิมเค้าไม่ได้เน้นเรื่องการทำ upsert หรือ data streaming แต่จะเน้นไปที่การทำ analytic เป็นหลัก ทีนี้เราลองมาดูกันว่า feature เด่นๆในด้านนี้ของ iceberg มีอะไรบ้าง

เริ่มต้นที่เรื่องของ schema evolution นอกจาก iceberg จะรองรับการ add/rename/delete ของ column แล้วยังสามารถทำ sort order ของ table รวมถึง reorder ตัว table ได้โดยไม่มีผลกระทบใดๆ ตัวอย่างเช่นหากเรามี table ชื่อ product และภายใน table ดังกล่าวมี column ชื่อ price และ category และด้วยเหตุผลในด้าน query performance เราอยาก sort order ของ column price และ category ให้เรียงจากมากไปหาน้อย ซึ่งหากภายหลังเราพบว่าควรปรับการเรียงของ column category จากน้อยไปมากแทน เราก็สามารถปรับเปลี่ยนในจุดนี้ได้อิสระ

feature ต่อมาของ iceberg ที่ผมขอยกให้เป็น killer feature เลยนั้นก็คือ partitioning ซึ่งโดยทั่วไปหากเราจะทำ partitioning ของ data lake นั้นเราต้องทำ column ขึ้นมารองรับ ตัวอย่างเช่นถ้าเราอยาก partition ด้วย date เพื่อให้ path ของ data lake มีรูปร่างแบบนี้ /datalake/master_table/date=02–01–2022/*.json ก็ต้องมี column นึงที่ชื่อว่า date และ value เป็น format dd-MM-yyyy ซึ่งหากใน source ของ dataframe เรามี column ที่เก็บข้อมูล datetime เป็น format MM/dd/yyyy HH:mm:ss ในกรณีนี้เราก็จำเป็นต้อง transform column ดังกล่าวให้อยู่ในรูปแบบ final ที่เราต้องการก่อนที่จะมีการ persist ลง data lake เพื่อให้ได้ partition แบบที่เราอยากให้เป็น ซึ่งใน point นี้มีในส่วนของ partition transforms ตัวอย่างเช่นเราสร้อง table ชื่อ sample และมี ts เป็น column timestamp เราสามารถระบุตอนสร้าง table ได้เลยว่าอยาก partition เป็น year,month และ days โดยที่ไม่ต้อง prepared column ในส่วนนี้แต่อย่างใด

CREATE TABLE prod.db.sample (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (years(ts),months(ts),days(ts))

ซึ่งในอนาคตหากเราต้องการทำ partition เพิ่มในระดับชั่วโมงก็สามารถแก้ไข table ได้โดยไม่มีผลกระทบ

ALTER TABLE prod.db.sample ADD PARTITION FIELD hour(ts)

นอกจากนี้การที่ iceberg มี hidden-partitioning ยังทำให้การ query ข้อมูลโดยใช้ partition เป็นเรื่องง่ายขึ้น ตัวอย่างเช่นหากเราไม่ใช้ iceberg แล้ว partition เรามีรูปแบบนี้ /datalake/master_table/date=02–01–2022/*.json เพื่อให้การ query มีประสิทธิภาพเราต้องระบุ partition ใน where clause ของเราให้ตรง format ว่า where date=02–01–2022 ซึ่งในจุดนี้หากเราใช้ iceberg ตามรูปด้านล่างไม่ว่าเราจะ partition ข้อมูลเราเป็นแบบไหนหรือเคยเปลี่ยน partition จาก month เป็น day ตอนเรา query เราก็ยังระบุ date ตามที่ต้องการแบบที่ไม่ต้องเจาะจง partition

อ้างอิง https://iceberg.apache.org/docs/latest/evolution/#partition-evolution

เปรียบเทียบระหว่าง hudi และ iceberg ในด้านอื่นๆ

  • การทำงานคู่กับ processing framework

ทั้ง hudi และ iceberg สามารถใช้งานร่วมกับ spark และ flink ได้ทั้งคู่

  • การใช้กับ lake แบบ git style

ในด้านของ iceberg นั้นสามารถใช้งานรวมกับ nessie project ได้แบบ native ซึ่งหากเราเลือกใช้ hudi ตัวเลือกให้การทำ lake ในแบบ git-like ก็จะเหลือแค่ lakeFS เท่านั้น

  • การใช้งานร่วมกับ query engine อย่าง Trino

ในด้านของ iceberg ชัดเจนมาก ว่าสามารถใช้งานร่วมกับ Trino ได้ ดูได้จากหน้าเว็บที่เป็น official ของ Trino ว่าเป็น 1 ใน connector ที่ Trino ซึ่งในจุดนี้แม้ว่า hudi จะยังไม่มีอยู่ใน list ของ connector แต่หากดูจาก web ของ hudi ก็ได้จัดเตรียม connector ไว้ใช้กับ Trino เช่นกัน https://hudi.apache.org/docs/query_engine_setup#trino

  • query performance

อาจจะเพราะ iceberg นั้นมีจุดเด่นที่การทำ analytic กับข้อมูลที่มี partioin จำนวนมากๆ อยู่แล้วทำให้เอกสารและข้อมูลที่เกี่ยวกับ performance การ query ออกมาเยอะกว่า ตัวอย่างเช่นใน video นี้ (https://www.youtube.com/watch?v=nWwQMlrjhy0) แต่ในด้านของ hudi หากดูจากเอกสารของระบบก็น่าจะไม่ด้อยไปกว่ากันเพราะมี metadata table ซึ่งเทียบได้กับ manifest files ของ iceberg ที่ช่วยให้การ query ข้อมูลสามารถ jump ไปยัง file system ที่มีข้อมูลที่เราต้องการอยู่ได้เลย อีกทั้ง hudi ยังมี mode การใช้งาน table ที่เน้นไปที่งาน query อย่าง Merge On Read ซึ่งมีการใช้ file ข้อมูล parquet และ avro คู่กันเพื่อทำให้เราสามารถ query ข้อมูลได้แบบ near real-time ซึ่งในด้านนี้อาจจะต้องรอไปอีกสะระยะเราถึงจะได้เห็นเอกสารฝั่ง hudi เรื่อง query peromance กัน

  • multiple concurrent writes

แม้ทั้งคู่จะรองรับ multiple concurrent writes แต่ด้าน iceberg นั้นสามารถใช้งาน feature นี้ได้แบบ native แบบไม่ต้องพึ่งพา tool อื่นแต่อย่างใด (https://iceberg.apache.org/docs/latest/reliability/#concurrent-write-operations) แต่ในด้านของ hudi นั้นเราจำเป็นต้องพึ่ง tool อื่นเข้ามาช่วยในการจัดการซึ่งเราสามารถเลือกได้ว่าจะใช้ Zookeeper, HiveMetastore หรือ Amazon DynamoDB https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing

  • snapshot info

การ query เพื่อดู detail ของ snapshot ที่เกิดขึ้นกับ table นั้นในฝั่งของ iceberg ทำได้ดีกว่าซึ่งหากดูจาก https://iceberg.apache.org/docs/latest/spark-queries/#snapshots จะเห็นว่า iceberg ได้บันทึก operator ที่กระทำกับ table ด้วยว่าเป็นการ append, delete หรือ overwrite โดยในส่วนนี้หากนำ dataframe ที่ได้จาก query ดังกล่าวมาส่งออกไปทาง spark streaming ก็อาจจะสามารถทำสิ่งที่คลายกับ CDC บน lakehouse สำหรับด้านของ hudi นั้นก็มี feature การ query แบบ snapshot หรือ time travel แต่ info ที่ได้กลับไม่มีในส่วนของ operator มาให้ แต่ถึงอย่างนั้น hudi ก็มี feature บ้างอย่างที่น่าสนใจซึ่งอาจจะมาตอบโจทย์ตรงนี้ได้อย่าง Kafka callback

สรุป

จากที่เราได้กล่าวมาทั้งหมดนี้แม้ว่าทั้ง hudi และ iceberg จะมี features ในการทำ lakehouse เหมือนๆกัน แต่ด้วยจุดประสงค์ของการพัฒนาที่แตกต่างกัน การเลือก tools ให้เหมาะกับงานจะช่วยทำให้การทำงานของ data engineer ง่ายขึ้นได้อย่างแน่นอน ซึ่งในการใช้งานเชิงลึกนั้น ผมเชื่อว่ายังมีรายละเอียดในด้านของความต่างของทั้งคู่อีกมากและหวังว่าเราคงได้นำมาพูดคุยในโอกาสต่อๆไป

สุดท้ายนี้ถ้าท่านใดมีข้อสงสัยหรือคำชี้แนะใดๆสามารถฝากข้อความได้ที่ https://www.facebook.com/coeffest/ นะครับ ขอบคุณมากครับที่ติดตาม

สำหรับคนที่สนใจ Data Engineer สามารถเข้ามาแชร์ข้อมูลกันที่ได้ที่ https://www.facebook.com/groups/dataengineerth

Refer:

--

--