What is Apache Iceberg: Iceberg คืออะไร ทำอะไรได้บ้าง ?

Burasakorn Sabyeying
Mils’ Blog
Published in
12 min readJun 24, 2023

บทความนี้เราจะมาเล่าถึง Apache Iceberg กัน

แต่พอพูดถึง Apache Iceberg แล้ว จะไม่เกริ่นถึง concept เรื่อง Data Lakehouse เลยก็ไม่ได้

Data Lakehouse หรือ Open table format คืออะไร ?

Open table format แปลตรงตัวเลย คือการเก็บข้อมูลแบบ tabular (table) แต่เก็บในรูปแบบไฟล์ เช่น ไฟล์นั้นก็เก็บใน object/blob storage อย่าง S3 หรือ Google Cloud Storage

แล้วมันดีกว่าปกติยังไง? มันดีกว่าตรงที่สามารถทำ versioning, indexing และ ACID ได้ ทำการจัดการเรื่อง metadata ของข้อมูล table ได้ นำความสามารถของ Data Warehouse มาผสมเข้ากับ Data Lake

ถ้าใครอยากอ่านแบบละเอียด เราแนะนำอ่านบทความนี้ก่อน:

ใครทำเรื่อง Data Lakehouse ได้บ้าง ?

ในฝั่งที่เป็น opensource ตอนนี้มีอยู่ 3 เจ้าหลัก

  1. Delta Lake สร้างโดย Databricks
  2. Apache Hudi สร้างโดย Uber (อันนี้เราก็เขียนบทความไว้เหมือนกัน )
  3. Apache Iceberg สร้างโดย Netflix

บทความนี้เลยจะยกตัว Apache Iceberg ให้อ่านกัน

Apache Iceberg คืออะไร

“Iceberg is a high-performance format for huge analytic tables. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables, at the same time.”

นี่คือนิยามที่ Iceberg เขานิยามตัวเองไว้

สิ่งที่ Iceberg ชูตัวเองคือ ไม่ว่าคุณจะใช้ engine ไหน ก็สามารถต่อกับเจ้า Iceberg table ได้พร้อมๆกันแบบไม่มีปัญหา ซึ่งเป็นข้อดีที่เราหยิบมาลองใช้ เนื่องจากมี usecase การใช้งาน concurrent write > 1

งั้นเราจะทำให้เห็นภาพยิ่งขึ้น เราจะลองสร้าง Iceberg table โดยสร้างผ่าน Spark ให้ดู

สร้าง spark session ขึ้นมา

# Spark session with Hudi package

spark = (
SparkSession.builder.appName("eieiei")
.master("local")
.config(
"spark.jars.packages",
"""org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.1""",
) .config(
"spark.jars.excludes", "javax.jms:jms,com.sun.jdmk:jmxtools,com.sun.jmx:jmxri"
)
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hive")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hadoop")
.config("spark.sql.catalog.local.warehouse", "/home/iceberg/warehouse")
.config("spark.sql.defaultCatalog", "local")
.getOrCreate()
)

.config("spark.sql.catalog.local.warehouse", "/home/iceberg/warehouse") จะระบุว่า Iceberg นี้จะเก็บอยู่ที่ path ไหน สำหรับเราคือฟีลมันเหมือน path ไป database โดยที่ข้างในมีหลาย table ด้วยกัน

ทีนี้เราสร้าง Iceberg table

tableName = "trips_cow"
schema = StructType(
[
StructField("rowId", StringType(), True),
StructField("partitionId", StringType(), True),
StructField("preComb", LongType(), True),
StructField("name", StringType(), True),
StructField("versionId", StringType(), True),
StructField("intToLong", IntegerType(), True),
]
)

# Create data
data1 = [
Row("row_1", "part_0", 0, "bob", "v_0", 0),
Row("row_2", "part_0", 0, "john", "v_0", 0),
Row("row_3", "part_0", 0, "tom", "v_0", 0),
]

dfFromData1 = spark.createDataFrame(data1, schema)

dfFromData1.writeTo(tableName).create()

df = spark.table(tableName).show()

จะได้ผลลัพท์คือ

+-----+-----------+-------+----+---------+---------+
|rowId|partitionId|preComb|name|versionId|intToLong|
+-----+-----------+-------+----+---------+---------+
|row_1| part_0| 0| bob| v_0| 0|
|row_2| part_0| 0|john| v_0| 0|
|row_3| part_0| 0| tom| v_0| 0|
+-----+-----------+-------+----+---------+---------+

เราได้ผลลัพท์เป็น Iceberg table ละ

อ้าวแล้วที่บอกว่ามันเก็บเป็นรูปแบบไฟล์ล่ะ?

นี่ไงอยู่นี่เลย มันจะอยู่ใน path ที่เรากำหนดใน Spark session "spark.sql.catalog.local.warehouse" ซึ่งเราเก็บไว้ที่ "/home/iceberg/warehouse"

ซึ่งใน path นี้ จะเป็นรายชื่อ table ทั้งหมด โฟกัสไปที่ table ที่เราเพิ่งสร้างคือ “trips_cow” นั่นเอง

ข้างใน trips_cow จะมี 2 folders

  1. data — เก็บเนื้อข้อมูลของ table
  2. metadata — เก็บ metadata และ version ของ table นั้น

โครงสร้างของ Iceberg ไม่ซับซ้อนเลยเนอะ

ซึ่งในพาร์ทแรกเราจะมาโฟกัสที่พาร์ท data กันว่าสร้าง Iceberg table ยังไง หน้าตายังไงได้บ้าง

สร้าง Iceberg table ผ่าน Spark ยังไงได้บ้าง

ขอแบ่งออกเป็น 2 แบบ

  1. แบบ Dataframe
  2. แบบ Spark SQL

1) สร้างผ่าน Dataframe

เหมือนเมื่อกี้เลย เราใช้ PySpark สร้างผ่าน dataframe และใช้ writeTo(’table_name’).create() ในการสร้าง table ขึ้นมา

tableName = "trips_cow"
schema = StructType(
[
StructField("rowId", StringType(), True),
StructField("partitionId", StringType(), True),
StructField("preComb", LongType(), True),
StructField("name", StringType(), True),
StructField("versionId", StringType(), True),
StructField("intToLong", IntegerType(), True),
]
)

# Create data
data1 = [
Row("row_1", "part_0", 0, "bob", "v_0", 0),
Row("row_2", "part_0", 0, "john", "v_0", 0),
Row("row_3", "part_0", 0, "tom", "v_0", 0),
]

dfFromData1 = spark.createDataFrame(data1, schema)

dfFromData1.writeTo(tableName).create()

df = spark.table(tableName).show()

output:

+-----+-----------+-------+----+---------+---------+
|rowId|partitionId|preComb|name|versionId|intToLong|
+-----+-----------+-------+----+---------+---------+
|row_1| part_0| 0| bob| v_0| 0|
|row_2| part_0| 0|john| v_0| 0|
|row_3| part_0| 0| tom| v_0| 0|
+-----+-----------+-------+----+---------+---------+

ไหนๆ ลอง append data เพิ่มซิ

schema = spark.table(tableName).schema

data = [
Row("row_2", "part_0", 5, "john", "v_3", 3),
Row("row_5", "part_0", 5, "maroon", "v_2", 2),
Row("row_9", "part_0", 5, "michael", "v_2", 2),
]
df = spark.createDataFrame(data, schema)
df.writeTo(tableName).append()
df = spark.table(tableName).show()

output:

+-----+-----------+-------+-------+---------+---------+
|rowId|partitionId|preComb| name|versionId|intToLong|
+-----+-----------+-------+-------+---------+---------+
|row_1| part_0| 0| bob| v_0| 0|
|row_2| part_0| 0| john| v_0| 0|
|row_2| part_0| 5| john| v_3| 3|
|row_3| part_0| 0| tom| v_0| 0|
|row_5| part_0| 5| maroon| v_2| 2|
|row_9| part_0| 5|michael| v_2| 2|
+-----+-----------+-------+-------+---------+---------+

2) สร้างผ่าน Spark SQL

เราสร้างอีก table นึง ชื่อว่า taxis โดยสร้าง table เปล่าๆระบุว่า USING Iceberg และค่อย insert ข้อมูลเพิ่ม

spark.sql(
"""
CREATE OR REPLACE TABLE demo.nyc.taxis
(
vendor_id bigint,
trip_id bigint,
trip_distance float,
fare_amount double,
store_and_fwd_flag string
) USING iceberg
PARTITIONED BY (vendor_id);

"""
)

spark.sql(
"""
INSERT INTO demo.nyc.taxis
VALUES (1, 1000371, 1.8, 15.32, 'N'), (2, 1000372, 2.5, 22.15, 'N'), (2, 1000373, 0.9, 9.01, 'N'), (1, 1000374, 8.4, 42.13, 'Y');

"""
)

ไหนลองดู table ซิ

spark.sql("""
select * from demo.nyc.taxis
"""
).show()

output:

มาถึงตรงนี้เราว่าทุกคนน่าจะพอเห็นภาพแล้วล่ะว่า Iceberg เป็นยังไง

ทั้งๆที่เป็น file แต่สร้าง query ได้เหมือนเป็น database เลยเนอะ อยากดูข้อมูล table ไหนก็ SELECT * FROM table ได้เลย

ดังนั้นส่วนต่อไป เราจะขอเล่าฟีเจอร์เทพๆของ Iceberg บ้าง

Iceberg มีฟีเจอร์อะไรอีกบ้าง

เราขอแบ่งออกเป็น 4 เรื่อง

  1. Upsert data
  2. Schema Evolution
  3. Partition evolution
  4. Time Travel and Rollback

จริงๆมีเรื่อง Hidden partition และอื่นๆอีกมากมายแต่เดี๋ยวบทความจะยาวไป5555 เอาแค่นี้ก่อน

1) Upsert data

Upsert ก็คือการ “update” and “insert” ซึ่งเป็น operation หนึ่งใน database ที่จะ

  • update existing row ใน row ที่มีอยู่แล้ว
  • insert new row ถ้าไม่มีค่านั้นมาก่อน

วิธีของ Iceberg ในการ upsert data นั้น ตอนนี้ยังมีแค่ผ่าน Spark SQL อย่างเดียว

เราลองสร้างข้อมูลชุดใหม่

spark.sql(
"""
CREATE OR REPLACE TABLE demo.nyc.new_data
(
vendor_id bigint,
trip_id bigint,
trip_distance float,
fare_amount double,
store_and_fwd_flag string
) USING iceberg
PARTITIONED BY (vendor_id);

"""
)

spark.sql(
"""
INSERT INTO demo.nyc.new_data
(
VALUES (1, 1000371, 2, 20, 'N'), (2, 1000372, 4, 39, 'Y'));

"""
)
spark.sql(
"""
SELECT * FROM demo.nyc.new_data
""").show()

ข้อมูลชุดใหม่ หรือ source ตอนนี้

ข้อมูลชุดเดิมที่เราสร้างไปเมื่อตะกี้ หรือ target

spark.sql(
"""
SELECT * FROM demo.nyc.taxis

""").show()

เราลอง Upsert data 2 ชุดนี้เข้าด้วยกัน ทำได้ด้วยวิธี Merge Into

spark.sql("""
MERGE INTO demo.nyc.taxis target
USING (SELECT * FROM demo.nyc.new_data) source
ON target.trip_id = source.trip_id
WHEN MATCHED
THEN UPDATE SET target.trip_distance = source.trip_distance, target.fare_amount = source.fare_amount, target.store_and_fwd_flag = source.store_and_fwd_flag
WHEN NOT MATCHED
THEN INSERT *;
""").show()

output:

จะเห็นชัดว่า ข้อมูลชุดใหม่จะมีการ update row เดิม

ซึ่งถ้าใครเคยอ่าน Hudi มา อาจจะรู้สึกรำคาญใจนิดหน่อย เพราะ Hudi ไม่ต้องเขียน upsert เองเลย แค่กำหนด configuration เท่านั้น ในขณะที่ Iceberg ต้องมาเขียน SQL เอง

2) Schema Evolution

Schema Evolution คือการปรับเปลี่ยน schema ได้โดย apply ได้กับ table ได้เลยโดยที่ไม่ต้องลบแล้วสร้าง table ใหม่ เช่น

  • Add new column
  • Drop existing column
  • Rename existing column
  • Update type of column
  • Reorder columns

เรามาลองสร้าง table กันอีกรอบ

spark.sql(
"""
CREATE OR REPLACE TABLE demo.nyc.taxis
(
vendor_id bigint,
trip_id bigint,
trip_distance float,
fare_amount double,
store_and_fwd_flag string
) USING iceberg
PARTITIONED BY (vendor_id);

"""
)

spark.sql(
"""
INSERT INTO demo.nyc.taxis
VALUES (1, 1000371, 1.8, 15.32, 'N'), (2, 1000372, 2.5, 22.15, 'N'), (2, 1000373, 0.9, 9.01, 'N'), (1, 1000374, 8.4, 42.13, 'Y');

"""
).show()

table:

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
| 1|1000371| 1.8| 15.32| N|
| 1|1000374| 8.4| 42.13| Y|
| 2|1000372| 2.5| 22.15| N|
| 2|1000373| 0.9| 9.01| N|
+---------+-------+-------------+-----------+------------------+

2.1) Rename table

เราจะเปลี่ยนชื่อจาก fare_amount เป็น fare เฉยๆ

spark.sql("""
ALTER TABLE demo.nyc.taxis RENAME COLUMN fare_amount TO fare
""")
df = spark.table("demo.nyc.taxis").show()

result:

+---------+-------+-------------+-----+------------------+
|vendor_id|trip_id|trip_distance| fare|store_and_fwd_flag|
+---------+-------+-------------+-----+------------------+
| 1|1000371| 1.8|15.32| N|
| 1|1000374| 8.4|42.13| Y|
| 2|1000372| 2.5|22.15| N|
| 2|1000373| 0.9| 9.01| N|
+---------+-------+-------------+-----+------------------+

คราวนี้เปลี่ยนจาก trip_distance มาเป็น trip เฉยๆ

spark.sql(""" 
ALTER TABLE demo.nyc.taxis RENAME COLUMN trip_distance TO distance
""")

df = spark.table("demo.nyc.taxis").show()

result:

+---------+-------+--------+-----+------------------+
|vendor_id|trip_id|distance| fare|store_and_fwd_flag|
+---------+-------+--------+-----+------------------+
| 1|1000371| 1.8|15.32| N|
| 1|1000374| 8.4|42.13| Y|
| 2|1000372| 2.5|22.15| N|
| 2|1000373| 0.9| 9.01| N|
+---------+-------+--------+-----+------------------+

2.2) Add Column comment

spark.sql(""" 
ALTER TABLE demo.nyc.taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'
""")
spark.sql( """ 
DESCRIBE EXTENDED demo.nyc.taxis
""")

result:

2.3) Change Data Type

เราจะเปลี่ยนให้ distance ที่เดิมเป็น float มาเป็น double

spark.sql(
"""
ALTER TABLE demo.nyc.taxis ALTER COLUMN distance TYPE double;
""")

แล้วเราลองอ่าน table ดูแบบ describe extended

spark.sql(
"""
DESCRIBE EXTENDED demo.nyc.taxis
""")

จะเห็นได้ว่าตอนนี้ distance เปลี่ยนเป็น double แล้ว

คราวนี้ ลองทดสอบปรับ double มาเป็น float ซิ

spark.sql(
"""
ALTER TABLE demo.nyc.taxis ALTER COLUMN distance TYPE float;
""")

result:

ระเบิดจ้า เราสามารถปรับ float -> double ได้ แต่ double -> float ไม่ได้ อันนี้ขึ้นอยู่กับเงื่อนไขของ Iceberg ด้วย

AnalysisException: Cannot update local.demo.nyc.taxis field distance: double cannot be cast to float; line 2 pos 0;
AlterColumn resolvedfieldname(StructField(distance,DoubleType,true)), FloatType
+- ResolvedTable org.apache.iceberg.spark.SparkCatalog@3156de98, demo.nyc.taxis, local.demo.nyc.taxis, [vendor_id#284L, trip_id#285L, distance#286, fare#287, store_and_fwd_flag#288]

2.4) Change order

ปรับให้ distance มาอยู่หลัง fare

spark.sql(
"""
ALTER TABLE demo.nyc.taxis ALTER COLUMN distance AFTER fare;
""")

result:

+---------+-------+-----+------------------+------------------+
|vendor_id|trip_id| fare| distance|store_and_fwd_flag|
+---------+-------+-----+------------------+------------------+
| 1|1000371|15.32|1.7999999523162842| N|
| 1|1000374|42.13| 8.399999618530273| Y|
| 2|1000372|22.15| 2.5| N|
| 2|1000373| 9.01|0.8999999761581421| N|
+---------+-------+-----+------------------+------------------+

2.5) Add Column

เราจะเพิ่ม column ใหม่ทื่ชื่อ fare_per_distance_unit เข้ามา

spark.sql(
"""
ALTER TABLE demo.nyc.taxis
ADD COLUMN fare_per_distance_unit float AFTER distance
""")

เมื่อ read table ใหม่

df = spark.table("demo.nyc.taxis").show()
+---------+-------+-----+------------------+----------------------+------------------+
|vendor_id|trip_id| fare| distance|fare_per_distance_unit|store_and_fwd_flag|
+---------+-------+-----+------------------+----------------------+------------------+
| 1|1000371|15.32|1.7999999523162842| null| N|
| 1|1000374|42.13| 8.399999618530273| null| Y|
| 2|1000372|22.15| 2.5| null| N|
| 2|1000373| 9.01|0.8999999761581421| null| N|
+---------+-------+-----+------------------+----------------------+------------------+

ตอนนี้ fare_per_distance_unit จะยังไม่มีค่าอะไร ดังนั้นเราจะเติมค่าให้สักหน่อย โดยจะคำนวนจาก fare หาร distance

spark.sql(
"""
UPDATE demo.nyc.taxis
SET fare_per_distance_unit = fare/distance
""")

result:

+---------+-------+-----+------------------+----------------------+------------------+
|vendor_id|trip_id| fare| distance|fare_per_distance_unit|store_and_fwd_flag|
+---------+-------+-----+------------------+----------------------+------------------+
| 1|1000371|15.32|1.7999999523162842| 8.511111| N|
| 1|1000374|42.13| 8.399999618530273| 5.015476| Y|
| 2|1000372|22.15| 2.5| 8.86| N|
| 2|1000373| 9.01|0.8999999761581421| 10.011111| N|
+---------+-------+-----+------------------+----------------------+------------------+

2.6. Delete column

before:

spark.sql("""
select * from demo.nyc.taxis
"""
).show()
+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
| 1|1000371| 1.8| 15.32| N|
| 1|1000374| 8.4| 42.13| Y|
| 2|1000372| 2.5| 22.15| N|
| 2|1000373| 0.9| 9.01| N|
+---------+-------+-------------+-----------+------------------+

after:

spark.sql("""
ALTER TABLE demo.nyc.taxis DROP COLUMN store_and_fwd_flag
"""
).show()

result:

+---------+-------+-------------+-----------+
|vendor_id|trip_id|trip_distance|fare_amount|
+---------+-------+-------------+-----------+
| 1|1000371| 1.8| 15.32|
| 1|1000374| 8.4| 42.13|
| 2|1000372| 2.5| 22.15|
| 2|1000373| 0.9| 9.01|
+---------+-------+-------------+-----------+

3) Partition evolution

ณ เวลาที่ผู้เขียนเขียนอยู่นี้ สิ่งที่ Iceberg ทำได้มากกว่า Hudi คือการทำ Partition Evolution

การทำ Parition evolution นั้นดียังไง คือเมื่อมีการแก้ partition ปุ๊บ เราก็ไม่ต้องเสียเวลามาสร้าง table ใหม่ แต่ทำแค่แก้แล้วใช้ table นั้นต่อได้เลย

เหตุผลที่ Iceberg เคลมว่าสามารถแก้ Partition กับดาต้าเดิมได้ เพราะเวลา query มันไม่กระทบกับ partition values โดยตรง

เราจะทดลอง Update Partitioning column ดู

spark.sql(
"""
ALTER TABLE demo.nyc.taxis
ADD PARTITION FIELD trip_id
""")
spark.sql(
"""
DESCRIBE EXTENDED demo.nyc.taxis
""")

result:

จะเห็นว่ามีตรง partitioning จะมี Part 1 เป็น trip_id จากเดิมที่เราเซ็ต partitioning column เป็น vendor_id

4) Time Travel and Rollback

ก่อนจะไปถึงการ Time travel เรามาแอบดูการเก็บ metadata ของ Iceberg กัน

ภาพนี้คือ table ที่ชื่อ demo2_ja.nyc.taxis โดยมี structure คือ data และ metadata (สีฟ้าและสีเหลือง)

Iceberg table จะมีโครงสร้างหลักๆ 2 อย่างคือ data และ metadata อย่างที่เล่าไปตอนต้นเนอะ

แต่หากเล่าให้ละเอียดขึ้นคือ

  1. ฝั่ง data จะเก็บเป็นไฟล์ parquet — ภายใน data นั้นมีการทำ partition โดย vendor_id เลยถูกแบ่ง folder ตาม vendor_id
  2. ฝั่ง metadata จะเก็บเป็นไฟล์ avro

แต่พอทุกคนมองดูแล้ว อ้าว ! ฝั่ง metadata มีทั้งไฟล์ json และ .text ด้วยล่ะ

เหตุผลเพราะว่าเวลา Iceberg อ่าน table มันจะพยายามวิ่งไปหา version-hint.txt เพื่อดูว่าตอนนี้ table อยู่ที่ version อะไร

ถ้าเราลองเปิดไฟล์นี้ ไส้ในมันเขียนว่า

4

ซึ่งหมายความว่า ตอนนี้ table นี้ใช้ version ที่ 4 อยู่ ซึ่งสังเกตว่ามีไฟล์ชื่อ v4.metadata.json โดยรายละเอียดทั้งหมดเกี่ยวกับไฟล์จะอยู่ในนี้

ตัวอย่างไฟล์ vXmetadata.json ← X เป็นตัวเลข (อันนี้เอาแบบมี 2 version ให้ดูพอ เดี๋ยวยาวไป)

{
"format-version" : 1,
"table-uuid" : "bf337a89-f818-473f-a9e2-fab0931fee06",
"location" : "/home/iceberg/warehouse/trips_cow_30",
"last-updated-ms" : 1686716015244,
"last-column-id" : 6,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "rowId",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "partitionId",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "preComb",
"required" : false,
"type" : "long"
}, {
"id" : 4,
"name" : "name",
"required" : false,
"type" : "string"
}, {
"id" : 5,
"name" : "versionId",
"required" : false,
"type" : "string"
}, {
"id" : 6,
"name" : "intToLong",
"required" : false,
"type" : "int"
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "rowId",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "partitionId",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "preComb",
"required" : false,
"type" : "long"
}, {
"id" : 4,
"name" : "name",
"required" : false,
"type" : "string"
}, {
"id" : 5,
"name" : "versionId",
"required" : false,
"type" : "string"
}, {
"id" : 6,
"name" : "intToLong",
"required" : false,
"type" : "int"
} ]
} ],
"partition-spec" : [ ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ ]
} ],
"last-partition-id" : 999,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "root"
},
"current-snapshot-id" : 8786062751122209149, #<---- #current snapshot_id
"refs" : {
"main" : {
"snapshot-id" : 8786062751122209149,
"type" : "branch"
}
},
"snapshots" : [ {
"snapshot-id" : 564428130807534033,
"timestamp-ms" : 1686562330984,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1686554434738",
"added-data-files" : "3",
"added-records" : "3",
"added-files-size" : "5167",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "5167",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/iceberg/warehouse/trips_cow_30/metadata/snap-564428130807534033-1-2b320e7c-acd7-458c-ad2f-b5c486dcce71.avro",
"schema-id" : 0
}, {
"snapshot-id" : 8786062751122209149,
"timestamp-ms" : 1686716015244,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1686554434738",
"added-data-files" : "3",
"added-records" : "3",
"added-files-size" : "5223",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "5223",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/iceberg/warehouse/trips_cow_30/metadata/snap-8786062751122209149-1-c16f7b7f-e104-4986-93c4-5ffe5d6a58bf.avro",
"schema-id" : 0
} ],
"statistics" : [ ],
"snapshot-log" : [ {
"timestamp-ms" : 1686716015244,
"snapshot-id" : 8786062751122209149
} ],
"metadata-log" : [ {
"timestamp-ms" : 1686562330984,
"metadata-file" : "/home/iceberg/warehouse/trips_cow_30/metadata/v1.metadata.json"
} ]
}

เห็นว่าจะมีการเก็บ snapshots ท้ังหมดใน table เราสามารถดูได้ว่า snapshot เป็น id อะไรจาก current-snapshot-id

"current-snapshot-id" : 8786062751122209149, #<---- #current snapshot_id
{
"snapshot-id" : 8786062751122209149,
"timestamp-ms" : 1686716015244,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1686554434738",
"added-data-files" : "3",
"added-records" : "3",
"added-files-size" : "5223",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "5223",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "/home/iceberg/warehouse/trips_cow_30/metadata/snap-8786062751122209149-1-c16f7b7f-e104-4986-93c4-5ffe5d6a58bf.avro",
"schema-id" : 0
}

หากดูใน snapshot ทั้งหมด เราจะ scope เห็นว่ามีรายละเอียดของ snapshot ปัจจุบัน มีการบอกว่าตอนนี้มี total-records, file size เท่าไร operation อะไรมา

แต่ละ snapshot ก็จะชี้ไปที่ manifest file ด้วยว่าเก็บในไฟล์อะไร โดยจะเก็บเป็นไฟล์ Avro นั่นเอง

ในขณะเดียวกัน เราสามารถ query ผ่าน SQL เพื่อดูว่า table นี้มี snapshot_id และ manifest_file อะไรก็ได้นะ ด้วยการ query จาก <table_name>.snapshots

spark.sql(
"""
SELECT snapshot_id, manifest_list
FROM demo.nyc.taxis.snapshots
""")

result:

Rollback to snapshot

table ในแต่ Iceberg มีการเก็บ version ไว้ ทำให้เราสามารถย้อนเวลาได้เหมือนโดเรม่อน

table นี้มีการเล่นอะไรไปเยอะละ มีทั้งหมด 6 columns

+---------+-------+-----+------------------+----------------------+------------------+
|vendor_id|trip_id| fare| distance|fare_per_distance_unit|store_and_fwd_flag|
+---------+-------+-----+------------------+----------------------+------------------+
| 1|1000371|15.32|1.7999999523162842| 8.511111| N|
| 1|1000374|42.13| 8.399999618530273| 5.015476| Y|
| 2|1000372|22.15| 2.5| 8.86| N|
+---------+-------+-----+------------------+----------------------+------------------+

เราดู history ของ table นั้นได้เช่นกัน ด้วยการ <table>.history

spark.sql(
"""
SELECT *
FROM demo.nyc.taxis.history
""")

result:

คราวนี้ เราจะย้อนเวลา table ด้วยลิ้นชักของโนบิตะ เอ้ย! query ตามด้านล่างเลย โดยบอกว่า table ที่เราต้องการคืออะไรและ เลข snapshot_id คือที่อยากจะไป

spark.sql(f"""
CALL system.rollback_to_snapshot('demo.nyc.taxis', 8188641127357900350)
""")
spark.sql(
"""
SELECT *
FROM demo.nyc.taxis
""")

สรุป

Iceberg คือหนึ่งในเครื่องมือในการทำ Data Lakehouse หรือ Open table format ที่มีสามารถทำได้หลายอย่าง เช่น

  • ทำให้หลายๆ engine สามารถต่อ Iceberg table ได้พร้อมกันได้ และสามารถต่อผ่าน Spark ได้
  • Upsert data
  • Schema Evolution
  • Partition evolution
  • Time Travel and Rollback

หากชื่นชอบบทความ tech แบบนี้สามารถติดตามได้ในช่องทาง
facebook: https://www.facebook.com/milsblog

หรือกด follow medium กันได้นะคะ https://mesodiar.medium.com/

--

--

Burasakorn Sabyeying
Mils’ Blog

Data engineer at CJ Express. Women Techmakers Ambassador. GDG Cloud Bangkok team. Moved to Mesodiar.com