Optimize data usage with Apache Iceberg

Nitit Taepant
CJ Express Tech (TILDI)
7 min readJul 17, 2024

จากบทความก่อนหน้า การเก็บข้อมูลของ Apache Iceberg เราก็พอจะเข้าใจวิธีการเก็บข้อมูลและ metadata ของ Apache Iceberg มาบ้างแล้ว การเก็บข้อมูลแบบนี้ทำให้เกิด Features ขึ้นมามากมาย แต่ก็นำมาซึ่งการเก็บข้อมูลเพิ่มเติม และต้องจัดการไฟล์เพิ่มขึ้นมาอีกระดับนึงด้วย ดังนั้นเราจึงจะมาทำความเข้าใจให้มากขึ้นกันว่าเราจะใช้ประโยชน์ และจัดการดูแลมันยังไง

โดยเราจะแบ่งหัวข้อดังนี้

  • ปัญหาที่มาพร้อมกับการเก็บข้อมูลบน Apache Iceberg
  • โจทย์ที่ใช้ Apache Iceberg ช่วยได้
  • เครื่องมือที่ Iceberg มีให้ใช้
  • เราจะใช้เครื่องมือพวกนี้แก้ปัญหาที่เจอยังไงได้บ้าง

ปัญหาที่เกิดจากการเก็บข้อมูลบน Apache Iceberg

Too many snapshots, metadata and versioning

ในการเปลี่ยนแปลงข้อมูลใน Apache Iceberg จะทำให้เกิด snapshot และ metadata version ใหม่ขึ้นมา และเมื่อ snapshot มากเกินไปทำให้ metadata.json จะใหญ่ขึ้นเรื่อยๆ และ storage ของเราจะมีข้อมูลมากเกินไปเพราะต้องเก็บข้อมูลที่อยู่ภายใต้ snapshot ต่างๆแม้ว่าข้อมูล snapshot นั้นจะผ่านมานานมากๆและเราอาจจะไม่ได้ใช้แล้วก็ตาม ทำให้เราต้องเสียค่าใช้จ่ายในการเก็บไฟล์เหล่านั้นไว้โดยไม่จำเป็น รวมถึงเวลาที่เราจะหา snapshot ที่ถูกต้องสำหรับการใช้งานจะเสียเวลาในการ scan เพื่อหาไฟล์ที่จำเป็น

Too many small files

ในการจะอ่านข้อมูลขึ้นมานั้นเราต้องการที่จะอ่านข้อมูลที่เล็กและจำเป็นขึ้นมาเท่านั้น เพื่อที่จะลดขนาดข้อมูลในการ process ต่างๆให้มีประสิทธิภาพและใช้ทรัพยากรณ์เท่าที่จำเป็น แต่การเปิดไฟล์เพื่ออ่านข้อมูลหลายๆไฟล์นั้นค่อนข้างที่จะเป็นคอขวด ยิ่งถ้ามีข้อมูลหลายๆไฟล์เล็กๆเพื่อที่จะรวมเป็นข้อมูลชุดเดียวเพื่อ process แล้วยิ่งทำให้การทำงานประสิทธิภาพแย่ลง

สิ่งนี้เกิดขึ้นเมื่อการทำงานของ Spark อาจจะมีการแบ่ง Partition ข้อมูลเพื่อทำงานขนานกันไปในหลายๆ Tasks และเขียนข้อมูลลง Disk ไปโดยไม่ได้ทำการ Repartition ให้ข้อมูลมีขนาดที่เหมาะสมก่อนเขียนไฟล์ และยิ่งจะถูกแบ่งไฟล์ไปอีกเมื่อมีการทำ Partition บนข้อมูล ยกตัวอย่างเช่น บน Spark Partition มี 100 Partitions และต้องเขียนข้อมูลลง Partition ของข้อมูลอีก 10 Partitions โดยรวมแล้วจะต้องมีไฟล์เกิดขึ้น 1000 Files เลยยิ่งทำให้ไฟล์เล็กลงและเยอะขึ้นไปอีก

โจทย์ที่ใช้ Apache Iceberg แก้ปัญหาได้

Big table, small size query

ข้อมูลมีขนาดใหญ่แต่เวลา Query ใช้งานอาจจะต้องการข้อมูลที่เฉพาะเจาะจงบางส่วนเล็กๆที่ปรากฏในบางไฟล์ แต่การ query อาจจะต้อง query มาทุกไฟล์แล้วมา filter ออกทีหลังจนเหลือข้อมูลที่จำเป็น ทำให้การ query ค่อนข้างไม่มีประสิทธิภาพเพราะอ่านข้อมูลเกินจำเป็น

Timestamp partition evolving over time

เราต้องการใช้ Timestamp เป็น Partition ด้วยวันที่ แต่เมื่อนานๆไปข้อมูลใหญ่ขึ้นในแต่ละชั่วโมงจึงต้องการเปลี่ยนแปลง Partition ให้ละเอียดขึ้นเป็นรายชั่วโมง เพื่อลดขนาดข้อมูลในแต่ละ partition การทำสิ่งนี้จะเกิดปัญหาสองส่วนเมื่อเราใช้การเก็บข้อมูลแบบ Hive คือ ทำให้เกิด column ใหม่ทั้ง column timestamp ต้นทาง และ partition column ที่แปลงมาเกิดขึ้นบน table อาจทำให้ผู้ใช้งานสับสนว่าต้องใช้ column ไหนในการ filter ข้อมูล ปัญหาส่วนที่สองคือการเปลี่ยน partition จากวันที่เป็นชั่วโมง จะต้อง migrate และเปลี่ยนที่อยู่ของข้อมูล

Big table, high-cardinality partition column, medium records update, and many partitions need to update

ในข้อมูลที่มีขนาดใหญ่และต้องใช้ High-cardinality partition column ในการแบ่งข้อมูลเพื่อ optimize การ query จากผู้ใช้งาน แต่ข้อมูลมีการอัพเดทไม่เยอะมากในแต่ละรอบอัพเดท แต่การอัพเดทต้องไปแตะข้อมูลหลายๆ Partition ทำให้เวลาจะอัพเดทข้อมูลต้องกวาดข้อมูลเกือบทุก partition มาเทียบและอัพเดทข้อมูล

ก่อนอื่นเรามาดูเครื่องมือที่ Apache Iceberg มีให้ใช้สำหรับแก้ปัญหากัน

เครื่องมือที่ Apache Iceberg มีให้เลือกใช้นั้นมีเต็มไปหมด แต่เราจะยกตัวอย่างเฉพาะเครื่องมือที่เห็นว่าสำคัญๆสำหรับการใช้แก้ปัญหาที่ยกตัวอย่างมาให้

Data Definition Language (DDL)

เราสามารถที่จะสร้างและเปลี่ยนแปลง Table ได้โดยใช้ Spark SQL ง่ายๆผ่านคำสั่ง DDL ยกตัวอย่างเช่น

Create a Table with Partitions

CREATE TABLE <TABLE_NAME> (<[COLUMNS,]>)
USING iceberg
PARTITIONED BY (<[COLUMNS,]>);

Add หรือ Drop partition

ALTER TABLE <TABLE_NAME> [DROP or ADD] PARTITION FIELD <[COLUMNS,]>;

และยังสามารถ replace partition ได้ด้วย

ALTER TABLE <TABLE_NAME> REPLACE PARTITION FIELD <COLUMN> WITH <COLUMN>;

Alter a Table writing default order

-- use optional ASC/DEC keyword to specify sort order of each field (default ASC)
-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)
ALTER TABLE <TABLE_NAME> WRITE ORDERED BY <COLUMN> ASC NULLS LAST, <COLUMN> DESC NULLS FIRST

Create a Table with table properties configuration

CREATE OR REPLACE TABLE <TABLE_NAME> (<[COLUMN,]>) 
USING iceberg
TBLPROPERTIES ('write.distribution.mode'='none',
'write.merge.distribution-mode'='hash',
'write.target-file-size-bytes'='536870912');

Alter a Table property

-- 536870912 bytes = 512MB
ALTER TABLE <TABLE_NAME>
SET TBLPROPERTIES ('write.target-file-size-bytes'='536870912')

Table properties

เราสามารถใช้งาน Table properties สำหรับการกำหนดรูปแบบการเก็บข้อมูลของ Table ได้โดยมีดังนี้

  • format-version กำหนด Version ของ Apache Iceberg Format
  • write.format.default กำหนด type ของ data ว่าจะเก็บเป็น format อะไรให้เหมาะกับงาน เช่น Parquet ก็จะเหมาะกับ Analytic Table หรือ Avro ก็จะเหมาะกับการเก็บข้อมูล Streaming
  • write.metadata.previous-versions-max และ write.metadata.delete-after-commit.enabled จะกำหนดว่าเราจะเก็บ metadata file ไว้กี่ version และจะลบ metadata file ที่เก่าที่สุดหลังจาก commit การเปลี่ยนแปลงหรือไม่
  • write.target-file-size-bytes และ write.parquet.row-group-size-bytes กำหนดขนาดของไฟล์ในการเขียนต่อ 1 ไฟล์ของข้อมูล โดย Apache Iceberg จะพยายามเขียนให้ถึงขนาดที่กำหนด ยกตัวอย่างในการเก็บไฟล์แบบ Parquet นั้น ขนาดไฟล์ควรจะอยู่ที่ประมาณ 512 MB และ row-group ควรจะอยู่ที่ 128 MB แต่ก็จะมีเงื่อนไขอื่นๆที่ทำให้ข้อมูลที่เขียนมีขนาดเล็กกว่าที่กำหนดไว้เช่นกัน เช่น Spark shuffle partitions, ขนาดของ data ที่เขียน และ ขนาดของ memory ที่รับไหวในการ process ข้อมูลบน Spark เพราะการที่จะได้ข้อมูลที่ถูกบีบอัดแล้วเขียนลง disk ในขนาดที่กำหนดอาจจะต้องใช้ memory ขนาดใหญ่กว่ามาก
  • write.distribution-mode, write.delete.distribution-mode, write.update.distribution-mode, และ write.merge.distribution-mode โดยในแต่ละโหมดก็จะมีลักษณะการใช้ที่ต่างกัน ดูเพิ่มเติมได้ ที่นี่
    โดยจะมีรูปแบบดังนี้
    - none จะไม่ทำอะไรเพิ่มเติมก่อนเขียนข้อมูล ทำให้เราต้อง sort ข้อมูลบน partition เองก่อนที่จะเขียนข้อมูล
    - hash จะทำการ shuffle data โดยใช้ hash-based exchange และกระจายไปตาม Partition ก่อนที่เขียนข้อมูล สามารถทำความเข้าใจเพิ่มเติมได้ ที่นี่
    - range จะทำการ shuffle data โดยใช้ range based exchange ก่อนที่จะเขียนข้อมูล ซึ่งวิธีนี้จะใช้การคำนวนที่มากกว่าแบบ hash โดยที่แบ่งเป็น 2 stages ซึ่ง stage แรกจะสุ่มข้อมูลเพื่อทำการแบ่งช่วงข้อมูลโดยใช้ partition และ sort columns จะได้ range ออกมา และ stage ต่อมาจะ shuffle ข้อมูลตาม range นั้นแบ่งไปในแต่ละ task เพื่อเขียนข้อมูลอีกที
  • write.delete.mode, write.update.mode, และ write.merge.mode เพื่อกำหนด mode ในการเขียนข้อมูลใหม่ซึ่งจะมี copy-on-write หรือ merge-on-read ซึ่งจะใช้ได้เฉพาะ Iceberg format v2 การเลือกใช้จะขึ้นอยู่ use case ของลักษณะข้อมูล ทำความเข้าใจเพิ่มเติมได้ ที่นี่
  • commit.manifest.min-count-to-merge และ commit.manifest-merge.enabled สอง properties นี้คือการเซตว่าจะ merge manifest file อัตโนมัติตอนที่เขียนข้อมูลหรือไม่และจะ merge เมื่อมีกี่ไฟล์
  • history.expire.max-snapshot-age-ms และ history.expire.min-snapshots-to-keep เป็นการ set default ของเวลาและจำนวน snapshot ในขณะที่จะทำการ expire ผ่าน Call Procedure โดยที่จำนวนขั้นต่ำที่จะเก็บจะถูกจัดลำดับความสำคัญไว้ก่อนอายุของ snapshot ดูเพิ่มเติมได้ ที่นี่

Call Procedure

ต่อมามาดูที่ Call Procedure ในส่วนของ Metadata Management ซึ่งจะเกี่ยวข้องกับการ Maintenance Apache Iceberg สามารถดูบทความที่ทาง Tildi เขียนไว้ได้

  • expire_snapshots ในการเขียนข้อมูลไม่ว่า write, update, delete, upsert, หรือการ compact files จะทำให้เกิด snapshot และข้อมูลใหม่ขึ้นมา โดยที่ expire_snapshots จะทำการลบ snapshot เก่าๆและข้อมูลที่ไม่จำเป็นแล้วออกไป เพื่อทำให้ขนาดของ metadata และ data ที่จำเป็นต้องเก็บน้อยลง โดยที่เราสามารถกำหนด parameters ต่างๆได้ ยกตัวอย่างเช่น older_than และ retain_last เพื่อกำหนดเวลาที่จะทำการ expire shanpshot ที่นับว่าได้ว่าเก่าแล้ว และจะเก็บ snapshot ไว้อย่างน้อยกี่ไฟล์ล่าสุด ซึ่งถ้าไม่กำหนดก็จะใช้ค่า default ที่กำหนดไว้ที่ Table properties
    โดยวิธีการใช้งานผ่าน Spark SQL เป็นดังนี้
CALL <catalog_name>.system.expire_snapshots(
table => '<table_name>',
older_than => TIMESTAMP '<expire_snapshot_ts>',
retain_last => 10)
  • remove_orphan_files ลบไฟล์ที่ไม่ได้ถูกอ้างอิงโดย table metadata แล้ว ซึ่งสาเหตุหนึ่งอาจเกิดจากการที่เราใช้งาน Spark หรือ distributed processing engines อื่นๆ สร้าง job หรือ task ที่ไม่สำเร็จขึ้นมา ทำให้อาจจะเหลือไฟล์บางไฟล์ไว้แล้วไม่ได้จัดการลบออกไป โดยไฟล์พวกนี้อาจจะไม่ได้ถูกอ้างอิงใน table metadata ทำให้การ expire snapshot อาจจะไม่ได้ลบไฟล์พวกนี้ไปด้วย
    โดยเราสามารถกำหนด parameters เช่น older_than กำหนดอายุของไฟล์ที่พบว่าเป็น orphan แล้วจะลบออก และ location เพื่อกำหนดที่อยู่ของไฟล์ในการหา orphan file
CALL <catalog_name>.system.remove_orphan_files(
table => '<table_name>')

ข้อควรระวังในการใช้งานคำสั่งนี้ก็คือ ไม่ควรจะตั้งเวลาการใช้งานในแต่ละรอบให้สั้นกว่าเวลาที่ข้อมูลจะอัพเดทเสร็จ เพราะในระหว่างที่อัพเดทข้อมูล อาจจะมีข้อมูลใหม่บางไฟล์ที่ถูกคิดว่าเป็น orphan fileโดยวิธีการใช้งานผ่าน Spark SQL เป็นดังนี้

  • rewrite_data_files ในการอัพเดทข้อมูลอาจจะมีการเขียนข้อมูลเล็กๆหลายๆไฟล์ประกอบเป็นข้อมูลหนึ่ง snapshot ซึ่งอาจจะรวมเป็นไฟล์ใหญ่หนึ่งไฟล์ได้เพื่อเพิ่มประสิทธิภาพในการอ่านข้อมูล การ rewrite data file เป็นเครื่องมือหนึ่งที่ใช้ช่วยตรงนี้ได้ โดยสามารถทำความเข้าใจเพิ่มเติมได้จากบทความนี้ และสามารถดู parameters ต่างๆที่ใช้งานสำหรับ procedure นี้ได้ ที่นี่ ตัวอย่างการใช้งานผ่าน Spark SQL เป็นดังนี้
CALL <catalog_name>.system.rewrite_data_files(
table => '<table_name>',
options => map('min-input-files','2'))
  • rewrite_manifests เมื่อเวลาผ่านไปนานๆข้อมูลได้มีการ update และเกิด Manifest file ขึ้นมาอ้างอิงไฟล์ข้อมูลต่างๆมากมาย รวมถึงอาจจะมีการเปลี่ยนแปลงรูปแบบการจัดเก็บข้อมูล ทำให้เวลาทำ Query Planning อาจจะได้เวลาในการ process ข้อมูลที่ช้าลง เราสามารถที่จะใช้ procedure นี้ในการ optimize manifest files ได้ โดยจะเขียน Manifest ใหม่อ้างอิงกับ Partition Spec id ปัจจุบันของ Table หรือจะกำหนดผ่าน Parameter spec_id ก็ได้
    ตัวอย่างการใช้งานผ่าน Spark SQL เป็นดังนี้
CALL <catalog_name>.system.rewrite_manifests(table => '<table_name>')

Hidden Partitioning

ในการทำ Partition บนข้อมูลจะช่วยจับกลุ่มข้อมูลตอนเขียนข้อมูล ทำให้เราสามารถที่จำกัดขนาดในการอ่านข้อมูลในแต่ละครั้งได้อย่างมีประสิทธิภาพ

โดยปกติเราอาจจะใช้ Hive Partition ในการจัดการ storage แต่การใช้งาน Hive Partition ก็ยังมีข้อจำกัดในการใช้งานอยู่เช่น

  • Partition ต้องเป็น partition ที่ตรงกับบน physical storage จริงๆ ทำให้การจะใช้ query ผ่าน partition ผู้ใช้งานจำเป็นต้องรู้ว่า partition เก็บในลักษณะอย่างไร เช่นข้อมูลถูกเก็บโดย partition เป็นวันที่รายวัน ซึ่งข้อมูลมีจะ column ‘timestamp’ และ ‘date’ แยกกันโดย ‘timestamp’ คือข้อมูลเวลาของข้อมูลจริงๆ แต่ ‘date’ เป็น partition column ที่เราสร้างขึ้นมาเพื่อทำ partition ถ้าเราจะ query ข้อมูลภายใน range ของวันที่ที่ต้องการ เราจำเป็นต้องรู้ว่าเราต้อง filter column ‘date’ เพื่อให้มีการใช้งาน partition ที่ถูกต้อง ไม่สามารถที่จะ query filter ผ่าน ‘timestamp’ ได้ รวมถึงเราต้องใช้ format ของ date ที่ถูกต้องในการ filter ด้วย เช่น YYYY-MM-DD หรือ DD-MM-YYYY รวมถึง time zone ของวันที่
  • การเปลี่ยนแปลง partition จำเป็นต้อง migrate data ไปที่ path ใหม่ที่มีการจัดการ partition ใหม่

ซึ่ง Apache Iceberg เข้ามาแก้ปัญหาตรงนี้รวมถึงเพิ่ม Feature ในการ transform ข้อมูลก่อนที่จะทำ Partition เพื่อใช้งานกับ use case ต่างๆด้วย

Apache Iceberg ใช้ Metadata ในการจัดการ partition เพื่อชี้ไปหา partition ของข้อมูลที่ถูกต้อง โดยที่ไม่ต้องให้ user มา maintain หรือรู้เกี่ยวกับ Partition จริงๆว่าเก็บอย่างไรทำให้สามารถที่จะซ่อน partition บนข้อมูลจริงๆได้จึงเรียกว่า Hiddle Partition ซึ่งการทำงานของมันจะทำงานทุกครั้งเมื่อเรา query ข้อมูลบน Apache Iceberg table โดย query จะถูกนำไปสร้าง partition ที่ถูกต้องเพื่อ scan หาข้อมูลที่เกี่ยวข้องสำหรับ query นั้นๆ เพื่อทำให้ query เร็วขึ้นและใช้ partition ที่ถูกต้อง

การที่ Apache Iceberg แยก physical และ logical ออกจากกันทำให้สามารถที่จะเปลี่ยนแปลง partition เมื่อใดก็ได้ โดย partition จะถูกจัดการที่ Metadata Layer และค่อยๆเปลี่ยน Data Layer เท่าที่จำเป็นตามที่มีข้อมูลอัพเดทเข้ามาทำให้เราลดเวลาและ cost ของการ migration ได้

ซึ่งการใช้งาน Hidden Partition ทำให้ Apache Iceberg ทำการ Transform Partition เพื่อให้ตอบโจทย์ต่างๆได้โดยจะถูกแบ่งเป็นประเภทของการ Partitioning Strategies ได้ดังนี้

Range Partitioning

โดยข้อมูลจะถูกแบ่งเป็นช่วงๆบนข้อมูลของ column ที่เป็น continuous range ซึ่งที่ Apache Iceberg มีให้ตอนที่เขียนจะเป็นเกี่ยวกับ Time-series โดยเราสามารถ transform timestamp column ให้เป็น partition รูปแบบต่างๆได้ดังนี้ year(YYYY), month(YYYY-MM-01), day or date(YYYY-MM-DD), hour or date_hour(YYYY-MM-DD HH:00:00)

ตัวอย่างการสร้าง table partition

CREATE OR REPLACE table <TABLE_NAME> (<[COLUMN,]>) 
PARTITIONED BY (day(<column_ts>));

Bucket Partitioning

Partitioning แบบนี้ข้อมูลจะถูกแบ่งเป็นถังๆ โดยจะถูกกำหนดจำนวนถังเอาไว้ ซึ่งข้อมูลจะถูกทำการ hash แล้วจับใส่แต่ละถัง ซึ่งวิธีการทำ Partition อย่างนี้จะช่วยในการกระจายข้อมูลให้แต่ละถังมีจำนวนพอๆกัน ทำให้เวลาอ่านข้อมูลขึ้นมาในแต่ละ Partition จะมีข้อมูลพอๆกัน การ process ข้อมูลก็จะมีประสิทธิ์ภาพมากขึ้น

ตัวอย่างการสร้าง table partition

CREATE OR REPLACE table <TABLE_NAME> (<[COLUMN,]>) 
PARTITIONED BY (bucket(<n_bucket>,<column>));

Truncate Partitioning

ข้อมูลจะถูกตัดเป็นช่วงของความกว้างตามจำนวนที่กำหนด โดยที่ string type จะถูกตัดให้เหลือ character ตามขนาดที่กำหนด และพวก numeric type จะถูกทำ binning เป็นช่วงของข้อมูลขนาดตามที่กำหนด เช่น

สมมติเรากำหนดให้ truncate ขนาดเท่ากับ 3 บน column ที่มีข้อมูล “abcdef” จะได้ partition เป็น “column=abc” หรือ column ที่มีข้อมูลเป็นตัวเลขจะใช้สูตร v — (v % W) v=ข้อมูล W=width=3 ถ้าข้อมูลคือ 2 ข้อมูลจะอยู่ partition เป็น “column=0” และถ้าข้อมูลคือ 3 จะได้ partition เป็น “column=3” เรียงตามลำกับ 0,3,6,9,…. ไปเรื่อยๆ

การทำ Truncate นั้นเหมาะกับข้อมูลที่มี unique value เยอะแต่มีลักษณะของข้อมูลที่เป็น pattern ที่สามารถจับกลุ่มได้เช่น URLs, SKUs หรือ code บางอย่างที่มี pattern ทำให้เราสามารถลดการใช้งาน partition ที่เล็กๆเยอะเพื่อจับกลุ่มไฟล์ให้ใหญ่ขึ้นเพื่อเพิ่มประสิทธิ์ภาพในการอ่านเขียนข้อมูลได้

ตัวอย่างการสร้าง table partition

CREATE OR REPLACE table <TABLE_NAME> (<[COLUMN,]>) 
PARTITIONED BY (truncate(<n_width>,<column>));

สามารถดูตัวอย่างการใช้งานและตัวอย่างของ hidden partition ได้ที่นี่

มาถึงตรงนี้เราก็พอจะเห็นเครื่องมือที่ Apache Iceberg ให้มาในการใช้งานและดูแลข้อมูลของเราบ้างแล้ว ซึ่งก็เป็นเครื่องมือสำคัญหลักๆที่เราจะใช้ในการตอบโจทย์สำหรับปัญหาที่เราจะยกตัวอย่างเท่านั้น ซึ่ง Apache Iceberg ยังมีเครื่องมืออื่นๆที่อาจจะช่วยเราในเคสที่เฉพาะเจาะจงอื่นๆได้อีกด้วย

แล้วเราจะใช้เครื่องมือพวกนี้แก้ปัญหาที่เราพูดถึงข้างบนๆยังไงได้บ้าง

คิดว่าหลังจากที่เราเข้าใจเครื่องมือที่ยกตัวอย่างไป ก็คงจะคิดออกบ้างแล้วว่าปัญหาที่จะเกิดขึ้นต้องแก้ปัญหายังไง แต่เดี๋ยวจะลองอธิบายแนวทางวิธีแก้ปัญหาต่างให้เห็นชัดๆกัน

ปัญหาที่เกิดจาก การเก็บข้อมูลบน Apache Iceberg

Too many snapshots, metadata and versioning

ปัญหานี้ทาง Apache Iceberg ก็ให้แนวทางและเครื่องมือมาพร้อมนั่นคือให้เรา Remove old metadata files โดยที่กำหนดค่า table properties write.metadata.previous-versions-max และ write.metadata.delete-after-commit.enabled ให้เหมาะสม และ Expire Snapshots โดยใช้ Call Procedure expire_snapshots และ remove_orphan_files เป็นระยะๆ เท่านี้เราจะจัดการทั้ง metadata และ data files ที่เก่าและไม่จำเป็นออกไปได้

และยังสามารถ optimize metadata ได้อีกโดยการกำหนด table properties commit.manifest.min-count-to-merge และ commit.manifest-merge.enabled เพื่อ merge manifest เวลาที่มีเยอะเกินที่กำหนด หรือใช้ Call procedure rewrite_manifestsเพื่อเขียน manifest ใหม่ให้ optimal ขึ้น

Too many small files

ปัญหานี้เกิดขึ้นได้จากหลายๆอย่าง ตัวอย่างเช่น

  • การอัพเดทข้อมูลเล็กๆบ่อยๆ
  • การที่ทิ้งข้อมูลไว้หลาย version แล้วไม่ได้ Maintenance
  • การ design ในการเก็บข้อมูลรวมถึง optimize การเขียนในแต่ละการอัพเดท

วิธีแก้ปัญหานี้ เราสามารถใช้เครื่องมือเหล่านี้ได้

  • Data Writing Mode สามารถใช้ copy-on-write หรือ merge-on-read โดยกำหนดผ่าน write.delete.mode, write.update.mode, และ write.merge.mode ได้
  • Distribution mode ในการเขียนไฟล์ สามารถกำหนดผ่าน write.distribution-mode, write.delete.distribution-mode, write.update.distribution-mode, และ write.merge.distribution-mode ได้
  • Design partition สำหรับข้อมูลให้เหมาะกับการเขียนและอ่าน โดยใช้ประโยชน์จาก Hiddle Partition
  • กำหนด File size ในการเขียน โดยใช้ write.target-file-size-bytes
  • การ compact files โดยใช้ call procedure rewrite_data_files เพื่อรวมไฟล์เล็กให้เป็นขนาดไฟล์ที่ต้องการ
  • กำหนด Spark Dataframe partition ก่อนการเขียนไฟล์ โดย coalesce หรือ repartition หรือปรับการใช้งาน Spark AQE

โจทย์ที่ใช้ Apache Iceberg แก้ปัญหาได้

Big table, small size query

ในการ query โดยใช้ Apache Iceberg มีการ query planning เพื่อ pruning file โดยจะพยายามหาไฟล์ที่จำเป็นในการอ่านเท่านั้น ซึ่งทำได้โดยอัตโนมัติจากการอ่านข้อมูลบน metadata

Timestamp partition evolving over time

การแก้ปัญหานี้เราสามารถใช้ column ที่เป็น timestamp ในการเป็น partition โดยผ่านการ transfrom ด้วย day function และเมื่อเราต้องเป็นเป็นในระดับชั่วโมงก็เปลี่ยนเป็น hour โดยใช้ DDL ALTER ได้เลย โดยที่ไม่ต้อง migrate table และผู้ใช้ไม่จำเป็นต้องเปลี่ยนการ query สามารถ query โดยใช้ column timestamp นั้นเหมือนเดิมได้เลย เพราะ Apache Iceberg จะจัดการ partition โดยใช้ hidden partition ให้อัตโนมัติ

Big table, high-cardinality partition column, medium records update, and many partitions need to update

ปัญหานี้แก้ได้โดยการ Design การเก็บข้อมูลและ Maintenance เป็นระยะๆ

  • Design

เนื่องจากข้อมูลจำเป็นต้องใช้ partition เป็นแบบ High-cardinality column ทำให้ถ้าเราจะใช้เป็น partition ตรงๆอาจจะเกิด small files เต็มไปหมดเพราะจะมี unique value ของ partition แยกไฟล์เต็มไปหมด เราสามารถใช้งาน partition transform ในรูปแบบ bucket ในการ grouping ข้อมูลและ distribute ข้อมูลลง bucket โดยที่ต้อง balance ขนาดไฟล์ และลักษณะการ query บน table ก็จะสามารถช่วยให้การเก็บและใช้ข้อมูลมีประสิทธิภาพมากขึ้นแล้ว

  • Maintenance

ถึงแม้การทำ partition bucket ไว้จะช่วยได้เยอะแล้วแต่เนื่องจากการอัพเดทข้อมูลในแต่ละรอบเมื่อกระจายไปแต่ละ bucket อาจจะเกิดเป็นไฟล์เล็กๆในแต่ละรอบขึ้นมาได้ ดังนั้นเราจะต้องกำหนดช่วงเวลาใน rewrite data file เป็นระยะ เพื่อ optimize ขนาด data file และรวมถึงการ Maintenance อื่นๆด้วย

Conclusion

จะเห็นว่า Apache Iceberg มีการเพิ่มข้อมูลในการจัดเก็บต่างๆมากขึ้น แต่ก็มาพร้อมกับ Features มากมายเพื่อให้เราสามารถ Optimize การจัดการข้อมูลและเพิ่มประสิทธิภาพในการอ่านเขียนข้อมูลมากขึ้น รวมถึงมีเครื่องมือในการ Maintenance ตัวเองด้วย ซึ่งการใช้ Apache Iceberg ในการเก็บข้อมูลจำเป็นต้องเข้าใจการทำงานของมัน และเข้าใจว่าข้อมูลที่เราจะเก็บมีลักษณะอย่างไรเพื่อที่จะทำให้การเก็บข้อมูลมีประสิทธิภาพสูงสุด และไม่เกิดปัญหาตามมาภายหลัง

และ Apache Iceberg ยังมีความสามารถอีกมากมาย และรายละเอียดของ concepts อื่นๆที่ไม่ได้พูดถึงในบทความนี้ แต่หวังว่าบทความนี้จะทำให้เห็นภาพของการใช้งานชัดเจนมากขึ้นเพื่อนำไปต่อยอดและศึกษาต่อไป

ถ้าเกิดอยากติดตามบทความอื่นๆที่น่าสนใจ ก็สามารถติดตามได้ทาง CJ Express (TILDI)

References:

--

--