การเก็บข้อมูลของ Apache Iceberg

Nitit Taepant
CJ Express Tech (TILDI)
11 min readJul 12, 2024

ในบทความนี้เราจะมาอธิบายโดยละเอียดของการเก็บข้อมูลโดยการใช้ Apache Iceberg ซึ่งเราก็จะครอบคลุมเนื้อหาดังนี้

  • Apache Iceberg คืออะไร
  • Apache Iceberg Architecture
  • การทำงานและการจัดเก็บข้อมูลโดย Apache Iceberg

มาเริ่มกันเลย

Apache Iceberg คืออะไร

Apache Iceberg คือ Open Table Format, OFT โดยเป็น Open Source ที่สร้างโดย Netflix และนำเข้า Apache Software Foundation ในปี 2018

OFT เป็น DataLake table format โดยประกอบขึ้นจาก Data file และ metadata ด้วยกันเพื่อบอกว่า table มีลักษณะและประกอบด้วยอะไรบ้าง ทำให้สามารถที่จะเพิ่มความสามารถต่างๆให้การเก็บไฟล์ เช่น ACID, Schema Evolution, Time Tarvel, Multiple writer/reader และอื่นๆ ซึ่งช่วยให้เราจัดการและเพิ่มประสิทธิภาพการใช้งานข้อมูลได้ดีขึ้น

Apache Iceberg Architecture

https://iceberg.apache.org/spec/#overview

การเก็บข้อมูลของ Iceberg มี 3 layers นั่นคือ Data Layer, Metadata Layer และ Catalog Layer

Data Layer

เป็น Layer ที่ใช้เก็บข้อมูลจริงๆซึ่งเราสามารถกำหนด file type ที่จะใช้ได้เพื่อเก็บข้อมูลได้เหมาะสมกับการใช้งาน ไม่ว่าจะเป็น Parquet, ORC หรือ Avro และยังสามารถใช้เก็บไว้ Distribute file system ได้เช่น HDFS หรือ Cloud object storage.

Metadata Layer

Layer นี้จะเก็บชุดของข้อมูลเกี่ยวกับสิ่งที่เกิดขึ้นกับ Table ทั้งการสร้างการเปลี่ยนของข้อมูลรวมถึงการเปลี่ยนของ metadata เอง

โดยประกอบด้วย

  1. Manifest file: จะเก็บชุดของ data file รวมถึงสถิติต่างของ file นั้นๆ
  2. Manifest list: จะเก็บข้อมูลของ snapshot id หนึ่งๆ โดยจะเก็บว่า snapshot นั้นประกอบไปด้วย Manifest file อะไรบ้างและไฟล์อยู่ที่ไหน partition คืออะไร รวมถึงข้อมูลสถิติอื่นๆเช่น ระยะของ partition ที่แต่ละไฟล์ชี้ข้อมูลไปหา
  3. Metadata file: จะเก็บข้อมูลของ Manifest list ที่ใช้งานได้ของไว้ รวมถึงข้อมูลการเปลี่ยนแปลงต่างๆของ snapshot นั้นๆ และเก็บข้อมูลของ metadata สำหรับ table นั้นๆเช่น schema, partition, snapshot id ปัจจุบันสำหรับ version ของไฟล์นี้

Catalog Layer

โดยจะเป็นตัว pointer สำหรับการบอกที่อยู่ของ metadata file ของ table ต่างๆ ซึ่งในแต่ backend ที่ใช้สำหรับการชี้ metadata ก็จะมี interface ต่างกันตัวอย่างของ backend เช่น Hadoop, AWS S3, Hive เป็นต้น

การทำงานและการจัดเก็บข้อมูลโดย Apache Iceberg

เราลองมาเล่น Apache Iceberg เพื่อดูสิ่งที่เกิดขึ้นในแต่ละขั้นตอนแบบละเอียดกัน

เริ่มจาก Catalog Layer

สิ่งที่เราต้องกำหนดสำหรับ Catalog Layer มีดังนี้

กำหนดให้ Catalog ใช้งาน Apache Iceberg โดยเราสามารถใช้ config อย่างใดอย่างหนึ่งข้างล่างนี้

  • spark.sql.catalog.<catalog-name> เป็นค่า org.apache.iceberg.spark.SparkCatalog
  • spark.sql.catalog.spark_catalog เป็นค่า org.apache.iceberg.spark.SparkSessionCatalog

ต่อไปจะเป็น type ของการ implementation เราสามารถกำหนดได้ทั้ง catalog catalog-impl และ I/O io-impl ซึ่งเราจะใช้แบบที่ iceberg มีให้ใช้อยู่แล้วก็ได้โดย config spark.sql.catalog.<catalog-name>.type เป็น hive, hadoop เป็นต้น หรือถ้าจะใช้เป็นแบบ custom อื่นๆเช่น AWS Glue Catalog ก็กำหนดผ่าน spark.sql.catalog.<catalog-name>.catalog-impl ได้

และสิ่งที่จำเป็นอย่างสุดท้ายก็คือ warehouse เพื่อที่เราจะกำหนด root ของ catalog storage ซึ่งทุก table path ที่สร้างและใช้งานจะอยู่ภายใต้ path ของ root นี้ สามารถกำหนดโดย config spark.sql.catalog.<catalog-name>.warehouse เป็น path ที่เราจะใช้งาน

และยังมี config อื่นๆอีกมากมาย ดูได้ที่ Catalog configuration

โดยตัวอย่างที่เราใช้จะทำตาม code ข้างล่างนี้เพื่อสร้าง Catalog Layer บนเครื่องเรา

from pyspark.sql import SparkSession

spark = SparkSession.builder
spark_config = {
"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2,org.apache.iceberg:iceberg-spark-extensions-3.3_2.12:1.5.2",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.spark_catalog.type": "hadoop",
"spark.sql.defaultCatalog": "local",
"spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.local.type": "hadoop",
"spark.sql.catalog.local.warehouse": "./warehouse"
}
for config_key in spark_config:
spark = spark.config(config_key, spark_config.get(config_key))
spark = spark.appName("iceberg-example").master("local[2]").getOrCreate()

ต่อมาเราจะลองสร้าง table

โดยเราจะสร้าง table ที่ชื่อ iceberg.example จาก code ด้านล่างนี้

ddl = """CREATE OR REPLACE table iceberg.example (customer_id string, group string, values int, created_at timestamp) 
USING iceberg
PARTITIONED BY (days(created_at));"""

spark.sql(ddl)

spark.sql("select * from iceberg.example").show()

จะได้ table หน้าตาอย่างนี้

จะมี 4 columns โดยที่ customer_id จะเป็น unique key และกำหนดให้ทำ partition data ด้วย column group

เมื่อเราดู directory tree จะเห็น warehouse path ที่เรากำหนดและมี folder ชื่อ table ของเรา iceberg/example และข้างใต้จะมี folder metadata โดยจะมี 2 files

  1. version-hint (text) ข้างในจะเป็นเลข version ที่บอกว่า version ของ table นี้คือเท่าไหร่
  2. metadata file (json)

เราลองมาดูกันว่าภายใต้ v1.metadata.json มีอะไรบ้าง

{
"format-version" : 1,
"table-uuid" : "88a3f003-1e78-4b23-8c95-842e0b73d977",
"location" : "./warehouse/iceberg/example",
"last-updated-ms" : 1720685066572,
...
}

ส่วนแรกนี้จะบอกลักษณะของ table โดยบอกว่า ใช้ iceberg format version อะไร table id และที่อยู่ รวมถึงเวลาที่อัพเดท table คือเมื่อไหร่

{
...
"last-column-id" : 4,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "customer_id",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "group",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "values",
"required" : false,
"type" : "int"
}, {
"id" : 4,
"name" : "created_at",
"required" : false,
"type" : "timestamptz"
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "customer_id",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "group",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "values",
"required" : false,
"type" : "int"
}, {
"id" : 4,
"name" : "created_at",
"required" : false,
"type" : "timestamptz"
} ]
} ],
...
}

ส่วนต่อมาจะบอกว่า schema ของ table ปัจจุบันเป็นอย่างไร และที่ผ่านมามี schema ยังไงมาบ้าง id อะไรบ้าง

{
...
"partition-spec" : [ {
"name" : "group",
"transform" : "identity",
"source-id" : 2,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "group",
"transform" : "identity",
"source-id" : 2,
"field-id" : 1000
} ]
} ],
"last-partition-id" : 1000,
...
}

ส่วนนี้จะบอกถึง partition ของ table ในลักษณะเดียวกับ schema ข้างบน

{
...
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "Name",
},
...
}

ในส่วนนี้จะเป็นการบอก default column ที่ใช้ในการ sort table นี้ และอีกส่วนคือ properties ของ table ซึ่งเรากำหนดได้หลายอย่างมาก ดูได้ที่ Table Property

{
...
"current-snapshot-id" : -1,
"refs" : { },
"snapshots" : [ ],
"statistics" : [ ],
"snapshot-log" : [ ],
"metadata-log" : [ ]
}

ส่วนสุดท้าย จะเป็นการบอก snapshot id ปัจจุบันที่ table นี้ใช้ รวมถึงการเปลี่ยนแปลง snapshot และ metadata files ต่างๆ และ Statistics ที่ใช้งานร่วมกับ Puffin files

จะเห็นว่าในไฟล์ metadata นี้เก็บข้อมูลเกี่ยวกับการอ้างอิงข้อมูลสำหรับ metadata version นี้ เพื่อบอกลักษณะและการใช้งาน table ไว้

จะมีอะไรเกิดขึ้นถ้าเราใส่ข้อมูลเข้าไป

โดยเราจะลอง mock data ขึ้นมา และทำการใส่ข้อมูล 2 operations ดังนี้

  1. insert data ซึ่งมี unique row 100 rows โดยมี id ตั้งแต่ 1–99 และมีข้อมูล group A,B และ C
  2. upsert data ซึ่งมี unique row 100 rowsโดยมี id ตั้งแต่ 50–149 และมีข้อมูล group B,C และ D

Insert data ซึ่งมี unique row 100 rows โดยมี id ตั้งแต่ 1–99 และมีข้อมูล group A,B และ C
ข้อมูลที่จะ Insert หน้าตาแบบนี้

และเรานำข้อมูลเข้า table ตาม code ข้างล่างนี้

df.coalesce(1).sortWithinPartitions(['group']).createOrReplaceTempView("data")
spark.sql("""INSERT INTO iceberg.example SELECT * FROM data""")

เพื่อความง่ายในการสังเกตุไฟล์ที่เกิดขึ้นเราจะกำหนดให้ข้อมูลใน Dataframe เหลือแค่ 1 partition สำหรับการเขียนไฟล์ ซึ่งจะเห็นการเปลี่ยนแปลงใน directory ดังนี้

เมื่อเราสังเกตุสิ่งที่เปลี่ยนจะเห็นว่า

  • data folder เพิ่มขึ้นมา โดยข้อมูลจะถูกแบ่งไปตาม partition ที่กำหนดไว้ จะเห็นว่า iceberg เขียนข้อมูลลง partition ในแต่ละ group A,B และ C และ file type default จะเป็น parquet
  • file metadata version 2 ถูกเพิ่มเข้ามา (v2.metadata.json)
  • snapshot file หรือ manifest list ถูกสร้างขึ้นมา 1 file (snap-1820674555431279225–1-c4a62a24–63c1–4ceb-bea7-ae9e0c557daf.avro)
  • manifest file ถูกสร้างขึ้นมา 1 file ชื่อว่า c4a62a24–63c1–4ceb-bea7-ae9e0c557daf-m0.avro
  • และข้อมูลข้างใน version-hint.text ถูกเปลี่ยนเป็น ‘2’

และใน v2.metadata.json จะมี records ของ snapshot ใหม่เพิ่มเข้ามาใน fields ที่ว่างๆก่อนหน้า

{
...
"current-snapshot-id" : 1820674555431279225,
"refs" : {
"main" : {
"snapshot-id" : 1820674555431279225,
"type" : "branch"
}
},
"snapshots" : [ {
"snapshot-id" : 1820674555431279225,
"timestamp-ms" : 1720685076972,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1720685062883",
"added-data-files" : "3",
"added-records" : "99",
"added-files-size" : "4821",
"changed-partition-count" : "3",
"total-records" : "99",
"total-files-size" : "4821",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "warehouse/iceberg/example/metadata/snap-1820674555431279225-1-c4a62a24-63c1-4ceb-bea7-ae9e0c557daf.avro",
"schema-id" : 0
} ],
"statistics" : [ ],
"snapshot-log" : [ {
"timestamp-ms" : 1720685076972,
"snapshot-id" : 1820674555431279225
} ],
"metadata-log" : [ {
"timestamp-ms" : 1720685066572,
"metadata-file" : "warehouse/iceberg/example/metadata/v1.metadata.json"
} ]
...
}

สิ่งที่ควรสังเกตุดูคือ

  • current-snapshot-id เปลี่ยนเป็น snapshot id (1820674555431279225) ที่ใช้สำหรับ table ปัจจุบัน
  • snapshots มีข้อมูลของ snapshot-id ที่พึ่งเพิ่มเข้ามา และที่น่าสนใจคือมี summary การเปลี่ยนแปลงของ snapshot นั้น เช่น operation ที่ใช้เปลี่ยนแปลง table รวมถึง จำนวนไฟล์, แถว และอื่นๆที่เป็น overviews stats ของการเปลี่ยนแปลง
  • manifest list location บอกถึงที่อยู่ของ manifest list ของ snapshot นั้น

คราวนี้เราจะลองมาสำรวจดูว่า manifest list file snap-1820674555431279225–1-c4a62a24–63c1–4ceb-bea7-ae9e0c557daf.avro เก็บอะไรไว้บ้าง

+-------------------------------------------------------------------------------+---------------+-----------------+-------------------+----------------------+-------------------------+------------------------+----------------------+----------------+-------------------+------------------+
|manifest_path |manifest_length|partition_spec_id|added_snapshot_id |added_data_files_count|existing_data_files_count|deleted_data_files_count|partitions |added_rows_count|existing_rows_count|deleted_rows_count|
+-------------------------------------------------------------------------------+---------------+-----------------+-------------------+----------------------+-------------------------+------------------------+----------------------+----------------+-------------------+------------------+
|warehouse/iceberg/example/metadata/c4a62a24-63c1-4ceb-bea7-ae9e0c557daf-m0.avro|6282 |0 |1820674555431279225|3 |0 |0 |[{false, false, A, C}]|99 |0 |0 |
+-------------------------------------------------------------------------------+---------------+-----------------+-------------------+----------------------+-------------------------+------------------------+----------------------+----------------+-------------------+------------------+

สิ่งที่ manifest list เก็บไว้ก็คือ list ของ manifest file สำหรับ snapshot นี้ โดยแต่ละ file ก็จะมีข้อมูลเพิ่มเติมคือ

  • partition spec ที่ใช้ ของ manifest file
  • lower/upper bound ของ partitions ที่ manifest file ชี้ไปหา data files
  • และอื่นๆ

แล้วใน manifest file c4a62a24–63c1–4ceb-bea7-ae9e0c557daf-m0.avro เก็บอะไรไว้บ้างล่ะ

+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+
|status| snapshot_id| data_file|
+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+
| 1|1820674555431279225|{warehouse/iceberg/example/data/group=A/00000-204-74c03ce3-4a86-44c9-9b86-0e522cb2e5d9-00001.parquet, PARQUET, {A}, 34, ...|
| 1|1820674555431279225|{warehouse/iceberg/example/data/group=B/00000-204-74c03ce3-4a86-44c9-9b86-0e522cb2e5d9-00002.parquet, PARQUET, {B}, 33, ...|
| 1|1820674555431279225|{warehouse/iceberg/example/data/group=C/00000-204-74c03ce3-4a86-44c9-9b86-0e522cb2e5d9-00003.parquet, PARQUET, {C}, 32, ...|
+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+

จะเห็นว่าสิ่งที่ manifest file นี้เก็บไว้คือ snapshot ไหนใช้ data file ไหนบ้าง และยังมีข้อมูลประกอบต่างๆของแต่ละ data file โดยยกตัวอย่างของ row แรกดังนี้

{
"data_file": {
"file_path": "warehouse/iceberg/example/data/group=A/00000-204-74c03ce3-4a86-44c9-9b86-0e522cb2e5d9-00001.parquet",
"file_format": "PARQUET",
"partition": {
"group": "A"
},
"record_count": 34,
"file_size_in_bytes": 1659,
"block_size_in_bytes": 67108864,
"column_sizes": [
{
"key": 1,
"value": 120
},
{
"key": 2,
"value": 95
},
{
"key": 3,
"value": 174
},
{
"key": 4,
"value": 262
}
],
"value_counts": [
{
"key": 1,
"value": 34
},
{
"key": 2,
"value": 34
},
{
"key": 3,
"value": 34
},
{
"key": 4,
"value": 34
}
],
"null_value_counts": [
{
"key": 1,
"value": 0
},
{
"key": 2,
"value": 0
},
{
"key": 3,
"value": 0
},
{
"key": 4,
"value": 0
}
],
"nan_value_counts": [],
"lower_bounds": [
{
"key": 1,
"value": "MTA="
},
{
"key": 2,
"value": "QQ=="
},
{
"key": 3,
"value": "AQAAAA=="
},
{
"key": 4,
"value": "ADb7tR4cBgA="
}
],
"upper_bounds": [
{
"key": 1,
"value": "OTk="
},
{
"key": 2,
"value": "QQ=="
},
{
"key": 3,
"value": "YQAAAA=="
},
{
"key": 4,
"value": "wCQNGlkcBgA="
}
],
"split_offsets": [
4
],
"sort_order_id": 0
}
}

โดยสิ่งที่ข้อมูลนี้บอกก็คือ

  • ที่อยู่และชนิดของ file
  • partition ที่ไฟล์นี้อยู่
  • stat ต่างๆของ file เช่น ขนาด, จำนวนแถว, จำนวน null ของ column ต่างๆ, lower/upper bound ของแต่ละ column และอื่นๆ

ต่อไปเราลองเพิ่มอัพเดทข้อมูลเพิ่มโดยใช้การ Upsert

Upsert data ซึ่งมี unique row 100 rows โดยมี id ตั้งแต่ 50–149 และมีข้อมูล group B,C และ D ดังนี้

เราจะใช้ code ข้างล่างนี้ upsert data เข้าไป

df.coalesce(1).sortWithinPartitions(['group']).createOrReplaceTempView("data")

spark.sql("""
MERGE INTO iceberg.example t
USING data s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN UPDATE SET t.group = s.group, t.values = s.values
WHEN NOT MATCHED THEN INSERT *
""")

โดย directory ก็จะมีหน้าตาเปลี่ยนไปเป็นข้างล่างนี้

สิ่งที่เปลี่ยนแปลงไปที่น่าสังเกตุคือ

  • ในแต่ละ partition มี file เพิ่มขึ้นมา และมี group D เพิ่มขึ้นมาด้วยจากข้อมูลใหม่
  • data file group A มี data file เพิ่มขึ้นมา น่าสงสัยไหมว่าทำไมเราไม่ได้เพิ่มข้อมูล group A แต่มีข้อมูลเพิ่มขึ้นมา เดี๋ยวเราจะไปดูกัน
  • มี manifest list file ของ snapshot อันใหม่เพิ่มขึ้นมา (snap-1102376363750356649–1-fccd777b-a610–4778-ad81-b68b45ad4821.avro)
  • มี manifest file เพิ่มขึ้นมา 2 file (fccd777b-a610–4778-ad81-b68b45ad4821-m0.avro และ fccd777b-a610–4778-ad81-b68b45ad4821-m1.avro)
  • และสุดท้ายมี v3.metadata.json เพิ่มขึ้นมา

เราลองมาดูที่ v3.metadata.json ก่อนว่ามีอะไรเพิ่มขึ้นมาบ้าง

{
...
"current-snapshot-id" : 1102376363750356649,
"snapshots" : [
...,
{
"snapshot-id" : 1102376363750356649,
"parent-snapshot-id" : 1820674555431279225,
"timestamp-ms" : 1720688608968,
"summary" : {
"operation" : "overwrite",
"spark.app.id" : "local-1720685062883",
"added-data-files" : "4",
"deleted-data-files" : "3",
"added-records" : "149",
"deleted-records" : "99",
"added-files-size" : "6590",
"removed-files-size" : "4821",
"changed-partition-count" : "4",
"total-records" : "149",
"total-files-size" : "6590",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "warehouse/iceberg/example/metadata/snap-1102376363750356649-1-fccd777b-a610-4778-ad81-b68b45ad4821.avro",
"schema-id" : 0
}
} ],
...
}

ข้างบนเป็นส่วนหนึ่งของไฟล์ที่ต่างจาก v2.metadata.json จะเห็นว่ามีการเปลี่ยน current-snapshot-id และเพิ่ม record ของ snapshot-id ใหม่เข้ามาใน list ของ field snapshots เพื่อบอกว่า table v3 มีการ track snapshot อะไรบ้าง รวมถึงบอก summary การเปลี่ยนแปลงของ snapshot นี้ด้วย

ต่อมาเรามาดูที่ manifest list กัน ไฟล์ชื่อ snap-1102376363750356649–1-fccd777b-a610–4778-ad81-b68b45ad4821.avro

+-------------------------------------------------------------------------------+---------------+-----------------+-------------------+----------------------+-------------------------+------------------------+----------------------+----------------+-------------------+------------------+
|manifest_path |manifest_length|partition_spec_id|added_snapshot_id |added_data_files_count|existing_data_files_count|deleted_data_files_count|partitions |added_rows_count|existing_rows_count|deleted_rows_count|
+-------------------------------------------------------------------------------+---------------+-----------------+-------------------+----------------------+-------------------------+------------------------+----------------------+----------------+-------------------+------------------+
|warehouse/iceberg/example/metadata/fccd777b-a610-4778-ad81-b68b45ad4821-m1.avro|6341 |0 |1102376363750356649|4 |0 |0 |[{false, false, A, D}]|149 |0 |0 |
|warehouse/iceberg/example/metadata/fccd777b-a610-4778-ad81-b68b45ad4821-m0.avro|6283 |0 |1102376363750356649|0 |0 |3 |[{false, false, A, C}]|0 |0 |99 |
+-------------------------------------------------------------------------------+---------------+-----------------+-------------------+----------------------+-------------------------+------------------------+----------------------+----------------+-------------------+------------------+

จะเห็นว่ามี Manifest file 2 อันในการประกอบข้อมูลของ table snapshot นี้

ไฟล์ fccd777b-a610-4778-ad81-b68b45ad4821-m1.avro จะเป็นข้อมูลที่มีการเพิ่ม row มา โดยจะมี 149 rows

และไฟล์ fccd777b-a610-4778-ad81-b68b45ad4821-m0.avro จะเป็นข้อมูลที่ถูกลบออกไป โดยลบไป 99 rows

จะสังเกตุเห็นว่า เรา upsert ข้อมูลไป 100 row id 50–149 แต่มีไฟล์เพิ่มขึ้นมา 149 rows และลบไป 99 rows มันเกิดอะไรขึ้น โดยปลายทางของ table ควรจะมีข้อมูล id 1–149 และไม่ซ้ำกันจากการ upsert โดยคำตอบ เราจะสังเกตุเห็นจาก Manifest file

เรามาดูไฟล์ Manifest fccd777b-a610-4778-ad81-b68b45ad4821-m1.avro กันว่าเก็บอะไร

+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+
|status| snapshot_id| data_file|
+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+
| 1|1102376363750356649|{warehouse/iceberg/example/data/group=A/00000-228-ecb2d2c5-c8e1-42ab-8707-50f7959c0ef5-00001.parquet, PARQUET, {A}, 18, ...|
| 1|1102376363750356649|{warehouse/iceberg/example/data/group=B/00000-228-ecb2d2c5-c8e1-42ab-8707-50f7959c0ef5-00002.parquet, PARQUET, {B}, 51, ...|
| 1|1102376363750356649|{warehouse/iceberg/example/data/group=C/00000-228-ecb2d2c5-c8e1-42ab-8707-50f7959c0ef5-00003.parquet, PARQUET, {C}, 40, ...|
| 1|1102376363750356649|{warehouse/iceberg/example/data/group=D/00000-228-ecb2d2c5-c8e1-42ab-8707-50f7959c0ef5-00004.parquet, PARQUET, {D}, 40, ...|
+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+

จะเห็นว่า Manifest ไฟล์นี้ชี้ไปที่ไฟล์ data ที่เกิดขึ้นมาใหม่ในแต่ละ group path

ต่อไปเรามาดูที่ไฟล์ Manifest fccd777b-a610-4778-ad81-b68b45ad4821-m0.avro กัน

+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+
|status| snapshot_id| data_file|
+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+
| 2|1102376363750356649|{warehouse/iceberg/example/data/group=A/00000-204-74c03ce3-4a86-44c9-9b86-0e522cb2e5d9-00001.parquet, PARQUET, {A}, 34, ...|
| 2|1102376363750356649|{warehouse/iceberg/example/data/group=B/00000-204-74c03ce3-4a86-44c9-9b86-0e522cb2e5d9-00002.parquet, PARQUET, {B}, 33, ...|
| 2|1102376363750356649|{warehouse/iceberg/example/data/group=C/00000-204-74c03ce3-4a86-44c9-9b86-0e522cb2e5d9-00003.parquet, PARQUET, {C}, 32, ...|
+------+-------------------+---------------------------------------------------------------------------------------------------------------------------+

คล้ายๆกัน แต่ Manifest จะไฟล์นี้จะชี้ไปที่ไฟล์ data ที่มีมาก่อนหน้านั้นในแต่ละ group path

จากการสังเกตุดูก็พอจะเห็นว่า

ในไฟล์แรก fccd777b-a610-4778-ad81-b68b45ad4821-m1.avro จะเป็นข้อมูลที่ถูกรวมข้อมูลที่เพิ่มและอัพเดทของ id ที่เรา upsert เข้าไป เขียนเป็นไฟล์ใหม่ทั้งหมด (เนื่องจาก defualt เป็นการเขียนแบบ COW) เช่นข้อมูลของ group A ที่ถูก update ไปที่ group อื่น 16 ids และ 18 ids ที่เหลืออยู่ iceberg จะเขียนเป็นไฟล์ใหม่ลง group A

ในไฟล์ที่สอง fccd777b-a610-4778-ad81-b68b45ad4821-m0.avro ไฟล์นี้จะบอกว่าข้อมูลไฟล์ไหนถูกลบไป หรือก็คือถ้า snapshot อ้างอิง manifest file นี้หมายความว่าข้อมูล files ในนี้ไม่เกี่ยวข้องกับ snapshot นั่นเอง

เพิ่มเติม เรายังสังเกตุเห็นว่าใน Manifest file นี้มี status ซึ่ง status 1=การเพิ่ม, 2=การลบ, 0=มีข้อมูลนี้อยู่ในไฟล์นี้อยู่แล้ว การเก็บข้อมูลอย่างนี้ทำให้ลดขนาดข้อมูลที่ควรจะเก็บเพิ่มเท่านั้น ไม่ได้ทำการ duplicate data โดยไม่จำเป็น

สิ่งที่เราทำและเกิดขึ้นในบทความนี้

สรุปแล้ว Apache Iceberg เก็บข้อมูลยังไง

Catalog Layer

กำหนดวิธีการเข้าถึงข้อมูล โดยเราต้องชี้หาที่อยู่ของข้อมูลใน Catalog Layer และกำหนดวิธีที่จะใช้งาน Catalog นั้นซึ่งในที่นี้เราจะใช้ Apache Iceberg

Metadata Layer

ใน Metadata Layer จะถูกใช้งานหรือสร้างขึ้นมา สำหรับ อ่าน เขียน หรือลบ ประสานไปกับ Data Layer โดยประกอบด้วยข้อมูลต่างๆดังนี้

  • Version hint file เพื่อบอก version ของข้อมูลปัจจุบันของ table
  • Metadata json file ของแต่ละ version เพื่อบอก snapshot-id ที่มีอยู่ของ version นั้นๆ และที่อยู่ในการไปหา Manifest List file ของ snapshot ต่างๆ
  • Manifest List ที่ถูกชี้มาก็จะบอกถึงที่อยู่ของ Manifest file ต่างๆที่จะประกอบขึ้นมาเป็น snapshot นั้นๆ
  • Manifest file ก็จะบอกว่า data file ภายใต้ Manifest นี้ประกอบด้วยอะไรบ้างและ ถูกเพิ่ม,ลบ,หรือคงเดิมที่มีอยู่แล้ว

Data Layer

Layer นี้จะเก็บข้อมูลตัวจริง และถูกอ้างอิงโดย Metadata Layer อีกที ซึ่งวิธีการเก็บข้อมูลอาจจะแตกต่างออกไปในแต่ละ version ที่ Table มีการเปลี่ยนแปลงลักษณะการเก็บ

ประโยชน์ของการเก็บข้อมูลลักษณะนี้

การเก็บข้อมูลแบบนี้ทำให้ Iceberg สามารถเพิ่ม Features ในการทำงานกับข้อมูลอีกมากมายเช่น

  • Optimize Query Planning จากการใช้ข้อมูล metadata
  • ACID transaction
  • DML, DDL operation
  • Schema Evolution
  • Partition Evolution
  • Transform Partition
  • Time Travel and Rollback
  • etc.

แล้วมีข้อเสียบ้างไหม?

จากการลองเล่นของเราจะเห็นว่าในแต่ละครั้งของการอัพเดทข้อมูลจะมีทั้ง metadata และ data เพิ่มขึ้นมาเต็มไปหมด ทำให้มีความซับซ้อนมากขึ้นในการเก็บข้อมูล เราจึงต้องใช้เวลาในการเข้าใจ ดูแล และ optimize การเก็บ data ของ Apache Iceberg รวมถึงเพิ่ม cost ในการจัดการไฟล์พวกนี้ด้วย แต่ Apache Iceberg ก็มีเครื่องมือในการ Optimize ข้อมูลให้เรามากมายซึ่งเราจะมีการเขียนในบทความต่อไป

หวังว่าบทความนี้จะมีประโยชน์การทำความเข้าใจ การเก็บข้อมูลใน Apache Iceberg นะครับ

หากอยากติดตามบทความอื่นๆที่ทาง CJ Express (Tildi) ได้เขียนไว้ติดตามได้ที่

และมีบทความอื่นๆที่เกี่ยวข้องกับ Apache Iceberg จากทีมงานเราอีกดังนี้

References:

--

--