เปิดยุคใหม่ของ data lake ด้วย delta lake

Pongthep Vijite
DAMAGeek
Published in
5 min readJul 27, 2021

หมายเหตุ ผู้อ่านสามารถดู table of contents ของ Data Engineering from Noob to Newbie ได้ที่ http://bit.ly/2P7isEw

อ้างอิงรูปจาก docs.delta.io

ก่อนหน้านี้หากจะพูดถึง lake ใน data lake platform หลายๆท่านอาจจะนึกถึงการจัดเก็บข้อมูลทั้งแบบ structured และ unstructured data ที่อยู่ในรูปแบบ file ตาม file format ต่างๆ และ file format ที่ data engineer มักจะต้องจัดการ ผมเชื่อว่า parquet file format ต้องเป็นตัวเลือกอันดับต้นๆแน่นอน ทั้งในแง่ของการ compression และประสิทธิภาพการอ่านในลักษณะ columnar

ถึง parquet file จะมีข้อดีหลากหลายแต่ก็มีข้อเสียใหญ่ๆอยู่ 1 อย่างคือไม่รองรับการแก้ไข file เดิม กล่าวคือเมื่อจัดเก็บข้อมูลในรูปแบบ parquet เราสามารถเลือกว่าจะเพิ่ม row ข้อมูลแบบ append หรือเขียนทับทั้ง file แบบ overwrite ได้เท่านั้น ซึ่งการเขียนทับแบบ overwrite ก็ไม่สามารถอ่านและเขียนลงใน location เดียวกันได้ (อ้างจากการใช้งานผ่านทาง apache spark) ซึ่งหลายๆบริษัทก็อาจจะมีวิธีการจัดการปัญหาตรงนี้แตกต่างกันไป ไม่ว่าจะเป็นเขียนลงใน temp location จากนั้นถึงย้ายมาทับ location หลักอีกขั้นตอน แต่ไม่ว่าทางไหนอาจจะเกิดจังหวะ blocking การใช้งานหรือการจัดการที่ซับซ้อนตามมาได้ แต่ปัญหาเหล่านี้จะหมดด้วยการมาของ framework ที่ชื่อว่า delta lake ซึ่งเราจะมาดูกันในบทความนี้ว่าตัว delta lake จะแก้ไข plain point ของ parquet file เดิมได้อย่างไร

ทำ delta lake lab ผ่านทาง jupyter notebook

บทความนี้เราจะใช้ aws emr version 6.3.0 พร้อมด้วย apache spark 3.1.1 และ jupyter hub 1.2.0

เรามาเริ่มต้นด้วยการลงมือเขียน delta lake table เพื่อให้เห็นการทำงานเบื้องต้นกันนะครับ ซึ่งหลังจากเปิด pyspark notebook ขึ้นมา เราต้องทำการ config เพื่อให้เราสามารถใช้งาน delta lake ได้ภายใน notebook (อ้างอิง https://docs.delta.io/latest/quick-start.html#pyspark-shell)

config delta lake ใน notebook

หลังจากนั้นเรามาลองสร้าง table ง่ายๆตามตัวอย่างจาก doc ของ delta lake กัน โดยใช้คำสั่ง range เพื่อสร้าง dataframe ที่มี 1 column ชื่อว่า id

สร้าง dataframe ด้วย spark.range

ขั้นตอนต่อไปเราจะทำการ save dataframe ดังกล่าวลงใน aws s3 โดยจะทำการเขียนลง 2 locations แยกตาม file format ซึ่งจะมีทั้งแบบ parquet และ delta และตั้งชื่อ table นี้ว่า lab_table

เขียน dataframe ลงใน aws s3 ทั้งแบบ parquet และ delta
parquet file ของ lab_table
delta file ของ lab_table

จากรูปข้างต้นข้อมูลใน s3 ไม่ว่าจะเขียนโดยกำหนด format เป็น parquet หรือ delta ตัว file ข้อมูลจะถูกเขียนเป็น file extension เดียวกันนั้นคือ parquet แต่ถ้าสังเกตดีๆเราจะพบว่าในการเขียน delta table จะมี directory ที่ชื่อว่า _delta_log เกิดขึ้นมาด้วย ซึ่ง directory ดังกล่าวเป็นหัวใจสำคัญของการจัดการข้อมูลด้วย delta lake

Delta transaction log

เมื่อเราเข้ามาภายใน _delta_log เราจะพบ json file พร้อมชื่อ file ที่เป็นตัวเลขทั้งหมด ซึ่ง file ดังกล่าวคือ transaction log ของ delta lake โดยชื่อ file แสดงถึง version ของ table โดยทุกๆการแปลี่ยนแปลงของ table จะทำให้เกิด json file พร้อมด้วยเลขชื่อ file ที่เพิ่มขึ้น กล่าวคือ 1 commit, 1 version

ภายใน _delta_log directory
ข้อมูลภายใน delta table version 0

เมื่อเราทำการเปิด json file ของ delta table version 0 เราจะพบข้อมูลซึ่งประกอบไปด้วย 4 กลุ่มใหญ่ๆ โดยบรรทัดแรก จะเป็น commitInfo

{
"commitInfo":{
"timestamp":1627369514021,
"operation":"WRITE",
"operationParameters":{
"mode":"ErrorIfExists",
"partitionBy":"[]"
},
"isBlindAppend":true,
"operationMetrics":{
"numFiles":"4",
"numOutputBytes":"1860",
"numOutputRows":"5"
}
}
}

ข้อมูลภายใน commitInfo จะระบุถึงรายละเอียดของการ commit ที่เกิดขึ้นประกอบไปด้วย

  • timestamp เวลาที่ commit
  • operation คือ operation ที่กระทำภายในการ commit นั้น
  • operationMetrics ที่ระบุว่า operation นี้มีการเขียน file ด้วยกัน 4 files รวมถึงขนาดข้อมูลและ row ที่เกิดขึ้น
{
"protocol":{
"minReaderVersion":1,
"minWriterVersion":2
}
}

ชุดข้อมูลที่ 2 จะเป็นข้อมูลเกี่ยวกับ protocol ซึ่งจะประกอบไปด้วยกัน 2 ค่าคือ minReaderVersion บอกถึง version ตํ่าสุดที่สามารถอ่านได้จาก table นี้ และ minWriterVersion ที่บอกถึง version ตํ่าสุดที่สามารถเขียนสำหรับ table นี้ได้ ซึ่งจะเห็นว่าตอนนี้เรามีอยู่แค่ version เดียว ถ้าหากมีใครเรียกใช้งาน table นี้เค้าก็จะสามารถเลือกอ่าน version เก่าสุดก็แค่ version นี้ แต่สำหรับกรณีหากมีคนต้องการเขียนข้อมูลใหม่ลง table นี้ version ที่เค้าจะสามารถเขียนได้ต้องไปเริ่มที่ version 2

{
"metaData":{
"id":"e83cde0a-620a-4fcb-9986-8013002df9ee",
"format":{
"provider":"parquet",
"options":{

}
},
"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns":[

],
"configuration":{

},
"createdTime":1627369513368
}
}

ชุดข้อมูลถัดจาก protocol จะเป็นข้อมูลข้อมูล meta data ของ table ประกอบไปด้วย

  • format คือ table นี้จัดเก็บด้วย file format ประเภทใด
  • schemaString คือ schema ของ table ซึ่งใน table ตัวอย่างของเราประกอบไปด้วย 1 column ที่ชื่อว่า id มี data type เป็น long
  • partitionColumns คือ column ของ table ที่เราใช้ในการทำ partition
{"add":{"path":"part-00000-e257a25e-5a49-44fd-a72f-a3fcc7aabccc-c000.snappy.parquet","partitionValues":{},"size":463,"modificationTime":1627369514000,"dataChange":true}}
{"add":{"path":"part-00001-872cf6dd-d4f8-43ed-b197-b34b2ab832eb-c000.snappy.parquet","partitionValues":{},"size":463,"modificationTime":1627369514000,"dataChange":true}}
{"add":{"path":"part-00002-18374cf7-29be-4e40-b408-1e3c1d2051d4-c000.snappy.parquet","partitionValues":{},"size":463,"modificationTime":1627369514000,"dataChange":true}}
{"add":{"path":"part-00003-7db1dedc-4aaf-4b1d-9b83-4b36047cf77c-c000.snappy.parquet","partitionValues":{},"size":471,"modificationTime":1627369514000,"dataChange":true}}

กลุ่มข้อมูลสุดท้ายจะเป็น detail ของ file ที่เกิดจาก transaction โดยใน transactionนี้เป็นการสร้าง table ทำให้เกิด file ข้อมูลทั้งหมด 4 files ซึ่งแต่ละครั้งการเพิ่ม file ข้อมูลจะมีการระบุชื่อ file และขนาดของ file รวมถึงการระบุว่า operation นั้นทำให้เกิดการเปลี่ยนแปลงของ data หรือไม่ ผ่านทาง property dataChange

เราจะมาลองลบ row ภายใน delta table กันเพื่อดูการเปลี่ยนแปลงที่เกิดขึ้นกับ transaction log โดยเราจะใช้คำสั่ง DeltaTable.forPath เพื่อ load ข้อมูลและสร้าง delta table object จากนั้นทำการลบ row ที่มี id เท่ากับ 1

ลบ row ที่มี id เท่ากับ 1 จาก delta table

เมื่อเราลบ row เรียบร้อยแล้วเราลองไปดูความเปลี่ยนแปลงที่เกิดขึ้นใน directory ที่เก็บ delta table กันนะครับ

delta table directory หลังจากลบ row
_delta_log directory หลังจากลบ row

จะเห็นว่าเป็นเรื่องน่าแปลกมากที่เราทำการลบ row ข้อมูลออกจาก delta table แต่แทนที่ file ข้อมูลจะลด แต่กลับเกิด file ใหม่เพิ่มขึ้น 1 file พร้อมกับ version ของ delta table ที่เปลี่ยนไป ทีนี้เราลองมาดูการเปลี่ยนแปลงใน transaction log กันนะครับ

ข้อมูลภายใน delta table version 1

ภายใน transaction log ของ delta table version 1 มีข้อมูลอยู่ด้วยกัน 2 กลุ่มคือ commitInfo และ step ของเปลี่ยนแปลงใน delta table

{
"commitInfo":{
"timestamp":1627375311810,
"operation":"DELETE",
"operationParameters":{
"predicate":"[\"(`id` = 1L)\"]"
},
"readVersion":0,
"isBlindAppend":false,
"operationMetrics":{
"numRemovedFiles":"1",
"numCopiedRows":"0",
"executionTimeMs":"1796",
"numDeletedRows":"1",
"scanTimeMs":"1294",
"numAddedFiles":"1",
"rewriteTimeMs":"501"
}
}
}

ในส่วนของ commitInfo จะมีการระบุในส่วนของ operation ว่าเป็นการ delete ข้อมูลและในส่วนของ operationParameters จะบอกถึง condition การ delete ดังกล่าว ใน operationMetrics จะมีการอธิบายชัดเจนว่ามี file ข้อมูลถูกลบ 1 file (numRemovedFiles) และมีการลบข้อมูลออก 1 row (numDeletedRows) ซึ่งที่หน้าสังเกตคือมีการเพิ่ม file ขึ้นมาเพิ่มเติมอีก 1 file (numAddedFiles) ด้วยเช่นกัน ซึ่งเราสามารถตั้งข้อสังเกตได้ว่า row ที่เราจะทำการลบนั้นอยู่ใน file ที่ถูก delete พร้อมทั้งข้อมูล row อื่นที่อาจจะอยู่ใน file เดียวกัน ซึ่งเมื่อทำการลบ row แล้วข้อมูลที่อยู่ใน file เดียวกับ row ที่ถูก delete จะถูกนำมาเขียนเป็น file ใหม่ทำให้เกิดการเพิ่ม file ข้อมูลขึ้นมา

เราสามารถดู detail ของการเพิ่มลด file ข้อมูลของ delta table ได้จากส่วนที่อยู่ถัดจาก commitInfo ใน transaction log ของ delta table version 1 ซึ่งมีอยู่ด้วยกัน 2 บรรทัดคือ remove และ add

{
"remove":{
"path":"part-00001-872cf6dd-d4f8-43ed-b197-b34b2ab832eb-c000.snappy.parquet",
"deletionTimestamp":1627375311806,
"dataChange":true,
"extendedFileMetadata":true,
"partitionValues":{

},
"size":463
}
}
{
"add":{
"path":"part-00000-8c66d73b-13ed-49e5-8889-5344cd0329d9-c000.snappy.parquet",
"partitionValues":{

},
"size":296,
"modificationTime":1627375312000,
"dataChange":true
}
}

การ remove มีการระบุชื่อ file ที่ถูก mark ว่า remove เวลาและขนาดของ file ดังกล่าว ซึ่งเมื่อไปดูใน directory ข้อมูลของ delta table จะเห็นว่า แม้ file ดังกล่าวจะถูก mark ว่า remove แล้วแต่ตัว file จริงๆจะยังคงอยู่ใน directory ซึ่งเราจากเอกสารของ delta lake (https://docs.delta.io/latest/delta-utility.html#remove-files-no-longer-referenced-by-a-delta-table) ระบุว่าเราสามารถใช้คำสั่ง VACUUM เพื่อลบ file ที่ไม่ได้ใช้งานแล้วแต่ file ดังกล่าวต้องมีอายุเกิด default retention threshold ซึ่งก็คือ 7 วันด้วย ส่วนข้อมูลการ add file ใหม่นั้นเป็นการเพิ่ม file ข้อมูลใหม่ที่ไม่มี row ที่ผมทำการ delete เข้าสู่ delta table

การป้องกันการชนกันของการแก้ไข delta table

โดยปกติการจัดการข้อมูลในรูปแบบ file นั้นมีโอกาสเกิดการชนกันของการแก้ไขข้อมูล เนื่องจากการเชื่อมต่อไปยัง location ของ file นั้นไม่มีการจัดการ transaction แบบใน database software ทั่วไป แต่โดยการใช้งาน delta lake ซึ่งถูกพัฒนาโดยใช้หลักการ mutual exclusion ซึ่งการันตีว่าจะไม่เกิดการชนกันของการ update ข้อมูลแม้ว่าจะมาจากคนละ transaction โดยทั่วไปแล้วลำดับการทำงานในส่วนนี้จะมีดังนี้

อ้างอิงจาก https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
  1. เริ่มแรกทุก transaction จะเริ่มจากการอ่าน delta table version ปัจจุบันจากรูปข้างต้น user 1 และ user 2 เริ่มต้นอ่านที่ version 0 เหมือนๆกัน
  2. จากนั้นแต่ละ transaction ก็จัดการ delta table
  3. user 1 ได้ทำการ commit การเปลี่ยนแปลง delta table และทำให้เกิด version ที่เพิ่มขึ้นเป็น version 1
  4. user 2 ซึ่งต้องการ commit การเปลี่ยนแปลง delta table เช่นกันแต่ด้วย version ปัจจุบันของ delta table ที่ user 2 รับรู้ในตอนนี้คือ version 0 ดังนั้น user 2 จึงเข้าใจการเปลี่ยนที่กำลังจะ commit นั้นจะต้องทำให้เกิดเป็น version 1 ของ delta table แต่เมื่อกำลังตรวจสอบกับ transaction log แล้วพบว่าไม่สามารถเขียน version 1 ได้เพราะ version ดังกล่าวถูกเขียนโดย user 1 แล้ว ดังนั้น user2 จึงต้องเพิ่มเขียนเป็น version ใหม่ที่ต่อจาก user 1 นั้นก็คือ version 2 แทน

ขั้นตอนดังกล่าวจะเกิดขึ้นแบบอัตโนมัติโดย delta lake ซึ่งจะหาก user2 ได้ทำการแก้ไขข้อมูลในจุดเดียวกับ user1 จนทำให้เกิดการชนกันในระดับข้อมูล ในกรณีนี้ user2 จะได้รับแจ้ง error ถึงการชนกันดังกล่าว

สุดท้ายนี้ถ้าท่านใดมีข้อสงสัยหรือคำชี้แนะใดๆสามารถฝากข้อความได้ที่ https://www.facebook.com/coeffest/ นะครับ ขอบคุณมากครับที่ติดตาม

สำหรับคนที่สนใจ Data Engineer สามารถเข้ามาแชร์ข้อมูลกันที่ได้ที่ https://www.facebook.com/groups/369157183664760/

อ้างอิง

--

--