Cost of Data Warehouse vs Data Lakehouse

Tanakrit Kongneing
CJ Express Tech (TILDI)
11 min readJul 4, 2024

disclaimer : บทความนี้จะใช้ผลิตภัณฑ์ของ google เป็นหลัก ไม่ว่าจะเป็น bigquery, cloud storage, dataproc เป็นต้น เพราะฉะนั้นขออภัยในความไม่สะดวกมา ณ ที่นี้ครับ

Table of contents

  • Data warehouse คืออะไร
  • Data lake คืออะไร
  • Data lakehouse คืออะไร
  • Demo แบบจับมือพาทำ
  • Conclusion สรุปสาระสำคัญ

Data warehouse คืออะไร

Full Definition : https://cloud.google.com/learn/what-is-a-data-warehouse

Data warehouse หรือ คลังข้อมูล เป็นระบบที่เก็บข้อมูลจากแหล่งต่าง ๆ ขององค์กร ไม่ว่าจะเป็น ข้อมูลยอดขายจากเครื่อง POS (Point of sales), ข้อมูลสมาชิกจากระบบ CRM (Customer relationship management), ข้อมูลสินค้าจาก ERP (Enterprise resource planning) หรือจากแหล่งอื่นๆ ไว้ในที่เดียวกัน โดยจะเก็บข้อมูลมีโครงสร้าง​ (Structered data ) อาทิ เช่นข้อมูลที่อยู่ในรูปแบบของตารางหรือมาจากdatabase ที่เป็น relational database หรือเก็บข้อมูลกึ่งมีโครงสร้าง (Semi structured data) อาทิ เช่น ข้อมูลที่เป็น key-values เก็บเป็น json หรือข้อมูลจาก apiโดยจะมีการ transform แปลงข้อมูลเพื่อให้อยู่ในรูปแบบที่พร้อมใช้งานสำหรับการทำวิเคราะห์ข้อมูล (Data analytics) โดยที่สามารถใช้ BI Tools ต่าง ๆ เชื่อมต่อกับข้อมูลบน Data warehouse ได้โดยตรง เพื่อออกแบบพัฒนา/ปรับปรุง Report/Dashboard ได้อย่างมีประสิทธิภาพ ตอบโจทย์ตามที่องค์กรต้องการ

ข้อดี

  • Query ข้อมูลไว: สำหรับข้อมูลที่มีจำนวนข้อมูลเยอะๆ (columns เยอะและ rows เยอะ) หากข้อมูลอยู่ใน data warehouse การคำนวณและการประมวลผลของ query 1 ครั้งอาจใช้เวลาเพียงไม่กี่วินาที (ขึ้นอยู่กับ spec ถ้าอยู่บน cloud ก็ไวมาก)
  • เหมาะสำหรับ Analytics: data warehouse ถูกออกแบบมาเพื่อตอบโจทย์การทำ การวิเคราะห์ข้อมูล ไม่ว่าจะเป็น การสร้าง datamodel ต่างๆ จนนำไปแสดงผล ออกมาเป็น report/dashboard ต่างๆ

ข้อเสีย

  • ค่าใช้จ่ายสูง: ทั้งในแง่การจัดเก็บข้อมูลและการนำไปใช้ ถ้าใช้งานไม่ระมัดระวัง ค่าใช้จ่ายอาจพุ่งสูงปรี๊ดได้เลยทีเดียว (เช่น การ query ข้อมูลใน table แบบไม่ใส่ where condition)
Simple solution for report/dashboard with bigquery

Data lake คืออะไร

Full Definition : https://cloud.google.com/learn/what-is-a-data-lake

Data lake หรือ ทะเลสาบข้อมูล เป็นระบบที่ใช้เก็บข้อมูลจากแหล่งต่าง ๆ ภายในองค์กรไว้ในที่เดียว โดยมีความสามารถในการเก็บรวบรวมข้อมูลทุกประเภท ไม่ว่าจะเป็นข้อมูลตั้งต้น, รูปภาพ, วิดีโอ, เสียง และอื่น ๆ ที่มีคุณลักษณะแตกต่างกันไป เป็นตัวเลือกที่ดีสำหรับถังพักข้อมูล เพื่อใช้ในการประมวลผลข้อมูลในขั้นตอนต่อไป

ข้อดี

  • รองรับข้อมูลหลายประเภท: Data lake สามารถเก็บข้อมูลที่มีโครงสร้าง, ข้อมูลที่กึ่งมีโครงสร้าง และข้อมูลที่ไม่มีโครงสร้าง ตั้งแต่ไฟล์ excel, csv , pdf , รูปภาพ, วิดิโอ, เสียง เก็บได้ทั้งหมดเลย โดยที่ไม่ต้องมีการตั้งค่า schema ใดๆ
  • สามารถจัดการกับข้อมูลที่มีปริมาณมาก: Data lake สามารถเก็บข้อมูลได้เท่าไหร่ก็ได้ ถ้าคุณมีข้อมูลอยู่ 10PB (Petabyte) Data lake ก็สามารถจัดเก็บให้ได้ โดยที่ไม่ต้องตั้งค่าอะไรเพิ่มเติมเลย
  • ค่าใช้จ่ายถูกมาก: ถ้าใช้ Data lake ที่อยู่บน cloud แล้วใช้เก็บข้อมูลอย่างเดียว ไม่ได้เข้าถึงไฟล์บ่อยมากนัก จะมีราคาถูกมากๆ ตัวอย่างเช่น cloud storage ของ google จะมี 4 tier (Standard/Nearline/Coldline/Archive อันนี้ต้องตั้งค่าตอนสร้าง bucket) แบ่งตามความถี่ในการเข้าถึงไฟล์ (อ่าน/เขียน/แก้ไขไฟล์) ถ้าเข้าถึงไฟล์บ่อยใช้ Standard storage จะอยู่ที่ 0.02$/GB/Month หรือเพียงเดือนละ 0.74 บาท ต่อGBต่อเดือน

ข้อเสีย

  • Query ข้อมูลช้า: การ query ข้อมูลใน Data lake จะช้ากว่าการ query ข้อมูลจาก database แบบปกติ เพราะ data lake ถูกออกมาแบบเพื่อจัดเก็บไฟล์มากกว่าการ query ดึงข้อมูลออกไปใช้ โดยเฉพาะอย่างยิ่ง ข้อมูลที่จัดเก็บแบบไม่ได้ออกแบบตอนที่จะนำไปใช้งานต่อ
  • ต้องมีการดูแลและการจัดการที่ดี: ถ้าขาดการกำกับดูแลที่เหมาะสม data lake จะกลายเป็นแหล่งเก็บข้อมูลที่เละเทะ ไม่เป็นระเบียบ และยากที่จะนำไปใช้งานต่อได้
ราคา Storage บน Google Cloud Storage

Data lakehouse คืออะไร

Full Definition : https://cloud.google.com/discover/what-is-a-data-lakehouse

Data lakehouse หรือ บ้านทะเลสาบข้อมูล เป็นระบบระบบที่เก็บข้อมูลจากแหล่งต่าง ๆ ขององค์กรไว้ในที่เดียว โดยเกิดมาเพื่อแก้ปัญหาของ Data lake และ Data warehouse โดยเกิดผสมข้อดีของทั้งสองอย่าง เข้าไว้ในสิ่งเดียวกัน

ผู้อ่านสามารถอ่านเพิ่มเติมเกี่ยวกับ Data warehouse, Data lake, Data lakehouse ได้ที่บทความด้านล่าง พี่มิว Burasakorn Sabyeying สรุปไว้ให้แล้ว

Demo แบบจับมือพาทำ

วันนี้ผู้เขียนจะพาเปรียบเทียบค่าใช้จ่ายของการใช้ data warehouse และ data lakehouse บน google cloud platform โดย google product ที่เราจะใช้กันในวันนี้ก็จะมีทั้งหมด 3 ตัว ได้แก่ cloud storage, bigquery, dataproc และจะใช้ apache iceberg สำหรับ data lakehouse ในวันน้ี โดยขั้นตอนจะมีประมาณนี้

Focus point in this demo

เราจะโฟกัส 3 หัวข้อหลักดังนี้

  1. Init table การสร้าง table เริ่มต้น
  2. Delete & Insert การลบข้อมูลเก่าและเพิ่มข้อมูลใหม่
  3. Query data การดึงข้อมูล

โดยขั้นตอนจะมีประมาณนี้

1. Init table การสร้าง table เริ่มต้น

1.1) สร้าง Mockup data จำนวน 10 สำหรับใช้ในการสร้าง table เริ่มต้น โดยข้อมูลเริ่มต้นจะเป็นไฟล์ parquet

เราจะ gen ข้อมูลที่ประกอบไปด้วย id, email, name, point_balance, version

Example mock-up data
  • id คือ primary key ของ record นั้นๆ
  • email คือ email ของลูกค้า
  • name คือ ชื่อ-นามสกุล ของลูกค้า
  • point_balance คือ จำนวนคะแนนคงเหลือของลูกค้า
  • version คือ document version ของข้อมูลที่เรารับได้ข้อมูล

โดย code ที่เราใช้สร้างข้อมูล mock up จะมีหน้าตาประมาณนี้

import pandas as pd
from faker import Faker
import hashlib
import random
from tqdm import tqdm

fake = Faker()

num_records = 10000000

def generate_consistent_data(num_records):
data = {
'id': [hashlib.md5(fake.unique.uuid4().encode()).hexdigest() for _ in tqdm(range(num_records), desc="Generating IDs")],
'email': [fake.user_name() + "@" + fake.domain_name() for _ in tqdm(range(num_records), desc="Generating Emails")],
'name': [fake.name() for _ in tqdm(range(num_records), desc="Generating Names")],
'point_balance': [fake.random_int(min=0, max=10000) for _ in tqdm(range(num_records), desc="Generating Point Balances")]
}
return pd.DataFrame(data)

def generate_old_data(data, num_records):
version_list = []
point_balance_list = []

with tqdm(total=num_records, desc="Generating old data") as pbar:
for _ in range(num_records):
old_version = random.choice([1, 2])

if old_version == 1:
version = 1
point_balance = data['point_balance'].iloc[_]
elif old_version == 2:
version = 2
point_balance = data['point_balance'].iloc[_]
else:
raise ValueError("Invalid old_version provided.")

version_list.append(version)
point_balance_list.append(point_balance)
pbar.update(1)

data['version'] = version_list
data['point_balance'] = point_balance_list

return data

def generate_new_data(data, num_records):
version_list = []
point_balance_list = []

with tqdm(total=num_records, desc="Generating new data") as pbar:
for _ in range(num_records):
old_version = data['version'].iloc[_]

if old_version == 1:
new_version = random.choices([1, 2], weights=[1, 9])[0]
if new_version == 1:
version = 1
point_balance = data['point_balance'].iloc[_]
else:
version = 2
point_balance = fake.random_int(min=0, max=10000)
elif old_version == 2:
new_version = random.choices([2, 3], weights=[1, 9])[0]
if new_version == 2:
version = 2
point_balance = data['point_balance'].iloc[_]
else:
version = 3
point_balance = fake.random_int(min=0, max=10000)
else:
raise ValueError("Invalid old_version provided.")

version_list.append(version)
point_balance_list.append(point_balance)
pbar.update(1)

data['version'] = version_list
data['point_balance'] = point_balance_list

return data

df_consistent = generate_consistent_data(num_records)

df_old = generate_old_data(df_consistent.copy(), num_records)

print("Old Data:")
print(df_old.head())

df_new = generate_new_data(df_old.copy(), num_records)

print("\nNew Data:")
print(df_new.head())

old_data_file = 'old_data.parquet'
new_data_file = 'new_data.parquet'

df_old.to_parquet(old_data_file, index=False, engine='pyarrow')
df_new.to_parquet(new_data_file, index=False, engine='pyarrow')

print(f"\nOld data saved to {old_data_file}")
print(f"New data saved to {new_data_file}")

1.2) นำไฟล์ parquet ที่สร้างขึ้นมาจาก ขั้นตอนข้อที่ 1 ไปวางไว้ใน cloud storage

init data
up-coming data

1.3) สร้าง dataset ใน bigquery โดยไปที่ Bigquery กดที่ จุดสามจุด หลัง project id จะมีหน้าต่างตามรูปด้านล่าง ให้เลือก Create dataset จากนั้นตั้งค่า Dataset ID และเลือก Region ตามรูปด้านล่าง

create bigquery dataset

1.4) สร้าง bigquery native table จาก ไฟล์ parquet โดยไปที่ dataset ที่สร้างจากข้อก่อนหน้า แล้วกดที่ จุดสามจุด จากนั้น

  • กด Create table
  • เลือก Create table from Google cloud storage
  • เลือก bucket และ folder ที่ไฟล์ parquet นี้อยู่ (ขั้นตอนข้อที่ 2)
  • เลือก dataset ที่จะให้ table นี้อยู่ (ขั้นตอนข้อที่ 3)
  • ตั้งชื่อ table ในทีนี้ผู้เขียนตั้งเป็น customer_point_balance
  • Table type เลือกเป็น Native table
  • กด Create table
create bigquery native table from parquet file
customer_point_balance table detail + example data

ซึ่งค่าใช้จ่ายในการสร้าง bigquery native table ตรงส่วนนี้จะฟรี ไม่มีค่าใช้จ่าย เนื่องจาก operation ที่เกิดขึ้นจะเป็น Type LOAD ซึ่ง google ไม่ได้คิดเงินใน operation นี้ครับ

job type and job detail of creating bigquery native table from parquet
bigquery free operation

source : https://cloud.google.com/bigquery/pricing#free

1.5) สร้าง iceberg table โดย pyspark batch jobs ไปยัง dataproc

ต่อไปเราจะใช้วิธีการ submit batch pyspark jobs ไปยัง dataproc โดยวิธีการนี้เราไม่ต้องเตรียม resource เอง จะเป็นแบบ serverless เราแค่เตรียม code และนำไปรันก็พอ

โดย code ที่เราใช้สำหรับการสร้าง iceberg table จะมีหน้าตาประมาณนี้

import argparse
from pyspark.sql import SparkSession


def get_argument():
parser = argparse.ArgumentParser()
parser.add_argument("--project_id", help="gcp project id.")
parser.add_argument("--source_path", help="source gcs path.")
parser.add_argument("--warehouse_path", help="iceberg warehouse path")
parser.add_argument("--table_name", help="iceberg table name")
args = parser.parse_args()
if args.project_id:
project_id = args.project_id
if args.source_path:
source_path = args.source_path
if args.warehouse_path:
warehouse_path = args.warehouse_path
if args.table_name:
table_name = args.table_name

return project_id, source_path, warehouse_path, table_name


def get_spark_session(app_name, project_id, warehouse_path):
spark = SparkSession.builder
spark_config = {
"spark.ui.enabled": "false",
"spark.sql.caseSensitive": "true",
"spark.sql.autoBroadcastJoinThreshold": "-1",
"spark.sql.session.timeZone": "Asia/Bangkok",
"fs.gs.project.id": project_id,
"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.4.2,org.apache.iceberg:iceberg-spark-extensions-3.3_2.12:1.4.2",
"spark.jars.excludes": "javax.jms:jms,com.sun.jdmk:jmxtools,com.sun.jmx:jmxri",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
"spark.sql.catalog.spark_catalog.type": "hive",
"spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.local.type": "hadoop",
"spark.sql.defaultCatalog": "local",
"spark.sql.catalog.local.warehouse": warehouse_path,
}
for config_key in spark_config:
spark = spark.config(config_key, spark_config.get(config_key))
spark = spark.appName(app_name).getOrCreate()
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set(
"fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)
# spark
print(f"Spark version = {spark.version}")

# hadoop
print(
f"Hadoop version = {spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
)

return spark


def create_iceberg_table(spark: SparkSession, source_path, table_name):
data_df = spark.read.parquet(source_path)
data_df.printSchema()

data_df.writeTo(f"local.{table_name}").create()

iceberg_df = spark.read.format("iceberg").load(f"local.{table_name}")
iceberg_df.show()



def main():
project_id, source_path, warehouse_path, table_name = get_argument()
print(f">>> now transform {source_path} to {warehouse_path}.{table_name}")
spark = get_spark_session("customer_point_balance", project_id, warehouse_path)

create_iceberg_table(
spark=spark, source_path=source_path, table_name=table_name
)
spark.stop()


if __name__ == "__main__":
main()

โดยนำ code ด้านบน save เป็นไฟล์ .py แล้วนำไปวางไว้ใน cloud storage โดยตั้งชื่อไฟล์ create_iceberg.py

เตรียมไฟล์ jar ไปวางไว้ใน cloud storage โดยในครั้งนี้เราจะใช้

  • iceberg-spark-runtime-3.3_2.13–1.4.2.jar
  • iceberg-spark-extensions-3.3_2.13–1.4.2.jar
jar files

จากนั้นไปที่ dataproc แบบ serverless

create batch jobs

จากนั้นกด Create

Config batch jobs parameter ตามนี้

  • Batch ID: ชื่อ job ครั้งนี้
  • Region: asia-southeast1
  • Batch Type: Pyspark
  • Runtime version: 2.0 (Spark 3.3, Java 17, Scala 2.13)
  • Main python file: gcs path ไปยัง create_iceberg.py
  • Jar files: gcs path ไปยัง jar files ทั้งสองอัน
  • Arguments: table_name=customer_point_balance, warehouse_path= gcs path ของ warehouse, source_path = gcs path ของข้อมูลเริ่มต้น (old_data), project_id = gcp project id
  • Service account: เลือก service account ที่จะใช้
config 1
config 2
  • จากนั้นกด Submit
job succeeded
log
example data

โดยในการรันสร้าง table นี้ใช้ resource ไปทั้งหมด

  • 1.545 DCU-hours
  • 0.204 GB-month

คิดเป็นค่าใช้จ่ายประมาณ

cost

การคำนวณค่าใช้จ่ายของ dataproc แบบ serverless (ในที่นี่เราใช้แบบ tier standard, singapore) ซึ่งจะคิด 2 ส่วน คือ ฝั่งประมวลผลและฝั่งเก็บข้อมูล

source : https://cloud.google.com/dataproc-serverless/pricing

1. DCU Usage Cost:

  • Approximate DCU usage: 1.545 DCU-hours
  • DCU Cost per hour: $0.074015
  • Total DCU Cost: 1.545 DCU-hours * $0.074015/DCU-hour ≈ $0.1143

DCU = Data Compute Units จำนวนหน่วยประมวลผลข้อมูลที่ใช้

2. Shuffle Storage Cost:

  • Approximate shuffle storage usage: 0.204 GB-months
  • Shuffle Storage Cost per GB-month: $0.044
  • Total Shuffle Storage Cost: 0.204 GB-months *$0.044/GB-month≈ $0.008976

ค่าใช้จ่ายทั้งหมด = DCU Usage Cost + Shuffle Storage Cost

= $0.1143 +$0.008976 ≈ $0.123276 หรือประมาณ​ 4.54 บาท

2. Delete & Insert การลบข้อมูลเก่าและเพิ่มข้อมูลใหม่

2.1) นำข้อมูลใหม่เข้า bigquery native table โดยใช้วิธี delete + insert ข้อมูล

โดยข้อมูลใหม่จะถูกสร้างเป็น bigquery native table ไว้แล้ว

โดยพฤติกรรมของข้อมูลชุดนี้จะเป็นการเก็บจำนวนแต้มของลูกค้า โดยจะดูที่ version ของ record ล่าสุด หากจำนวนแต้มมีการเปลี่ยนแปลง ตัวเลขของ version ก็จะเพิ่มขึ้น

โดยเราจะใช้ DML ( Data Manipulation Language) ในการ delete ข้อมูลเก่า และ insert ข้อมูลใหม่เข้าไปยัง target table

รัน query ด้านล่างเพื่อ delete ข้อมูลเก่าออกจาก target table

DELETE
FROM
{gcp_project_id}.medium_demo.customer_point_balance AS t1
WHERE
EXISTS (
SELECT
1
FROM
{gcp_project_id}.medium_demo.customer_point_balance_new_data AS t2
WHERE
t1.id = t2.id
AND t1.version < t2.version );

หลังจากรัน query ด้านบนจะได้ผลลัพธ์ดังนี้

bigquery delete job detail

รัน query ด้านล่างเพื่อ insert ข้อมูลใหม่ไปยัง target table

INSERT INTO {gcp_project_id}.medium_demo.customer_point_balance
SELECT
t2.*
FROM
{gcp_project_id}.medium_demo.customer_point_balance_new_data AS t2
WHERE
NOT EXISTS (
SELECT
1
FROM
{gcp_project_id}.medium_demo.customer_point_balance AS t1
WHERE
t1.id = t2.id );
bigquery insert job detail

โดยทั้ง 2 operation delete และ insert ใช้ bytebilled ไปทั้งหมด 1.24GB + 897MB ~ 2.14GB

source : https://cloud.google.com/bigquery/pricing#on_demand_pricing

คิดจากราคา bigquery on demand (singapore) จะอยู่ที่ $8.44/TB ดังนั้น cost ของการทำ delete + insert จะอยู่ที่ (2.14/1024)*8.44 =$0.0176 ~ 0.65 บาท

แต่ bigquery ก็มีค่า storage ด้วย แต่ในที่นี้ข้อมูลเข้าเงื่อนไขฟรี ไม่มีค่าใช้จ่าย

2.2 ) นำข้อมูลใหม่เข้า iceberg table โดยใช้วิธี delete + insert ข้อมูล

โดย code ที่เราใช้สำหรับการสร้าง iceberg table จะมีหน้าตาประมาณนี้

import argparse
from pyspark.sql import SparkSession


def get_argument():
parser = argparse.ArgumentParser()
parser.add_argument("--project_id", help="gcp project id.")
parser.add_argument("--source_path", help="source gcs path.")
parser.add_argument("--warehouse_path", help="iceberg warehouse path")
parser.add_argument("--table_name", help="iceberg table name")
args = parser.parse_args()
if args.project_id:
project_id = args.project_id
if args.source_path:
source_path = args.source_path
if args.warehouse_path:
warehouse_path = args.warehouse_path
if args.table_name:
table_name = args.table_name

return project_id, source_path, warehouse_path, table_name


def get_spark_session(app_name, project_id, warehouse_path):
spark = SparkSession.builder
spark_config = {
"spark.ui.enabled": "false",
"spark.sql.caseSensitive": "true",
"spark.sql.autoBroadcastJoinThreshold": "-1",
"spark.sql.session.timeZone": "Asia/Bangkok",
"fs.gs.project.id": project_id,
"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.4.2,org.apache.iceberg:iceberg-spark-extensions-3.3_2.12:1.4.2",
"spark.jars.excludes": "javax.jms:jms,com.sun.jdmk:jmxtools,com.sun.jmx:jmxri",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
"spark.sql.catalog.spark_catalog.type": "hive",
"spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.local.type": "hadoop",
"spark.sql.defaultCatalog": "local",
"spark.sql.catalog.local.warehouse": warehouse_path,
}
for config_key in spark_config:
spark = spark.config(config_key, spark_config.get(config_key))
spark = spark.appName(app_name).getOrCreate()
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set(
"fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)
# spark
print(f"Spark version = {spark.version}")

# hadoop
print(
f"Hadoop version = {spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
)

return spark


def create_iceberg_table(spark: SparkSession, source_path, table_name):
new_df = spark.read.parquet(source_path)
new_df.printSchema()
new_df.createOrReplaceTempView("new_df")

sql = f"""
DELETE FROM {table_name} AS t1
WHERE EXISTS (
SELECT 1
FROM new_df AS t2
WHERE t1.id = t2.id AND t1.version < t2.version
);
"""

spark.sql(sql).show(1)

sql = f"""INSERT INTO {table_name}
SELECT t2.*
FROM new_df AS t2
WHERE NOT EXISTS ( SELECT 1
FROM {table_name} AS t1
WHERE t1.id = t2.id );
"""
spark.sql(sql).show(1)

def main():
project_id, source_path, warehouse_path, table_name = get_argument()
print(f">>> now transform {source_path} to {warehouse_path}.{table_name}")
spark = get_spark_session("customer_point_balance", project_id, warehouse_path)

create_iceberg_table(
spark=spark, source_path=source_path, table_name=table_name
)
spark.stop()


if __name__ == "__main__":
main()

โดย config ของ dataproc จะมีหน้าตาเหมือนกันกับตอนที่เราสร้าง table เริ่มต้นเลย จะเปลี่ยนแค่ path ของ python ไฟล์ใน gcs และ path ของ data ใหม่ ที่เป็น parquet

หลัง submit เสร็จจะได้ผลประมาณนี้

โดย metadata ของ iceberg จะมีการเปลี่ยนแปลง 2 ครั้งคือ ตอนที่ delete และตอนที่ insert ซึ่งมีการเก็บข้อมูล หลังจบ operation นั้นๆไว้เสมอ

ค่าใช้จ่ายสำหรับ delete + insert

DCU Usage Cost:

  • Approximate DCU usage: 2.298 DCU-hours
  • DCU Cost per hour: $0.074015
  • Total DCU Cost: 2.298 DCU-hours * $0.074015/DCU-hour ≈ $0.1700

Shuffle Storage Cost:

  • Approximate shuffle storage usage: 0.303 GB-months
  • Shuffle Storage Cost per GB-month: $0.044
  • Total Shuffle Storage Cost: 0.303 GB-months *$0.044/GB-month≈ $0.013332

ค่าใช้จ่ายทั้งหมด = DCU Usage Cost + Shuffle Storage Cost

= $0.1700 +$0.013332 ≈ $0.183332 หรือประมาณ​ 6.76 บาท

3. Query data การดึงข้อมูล

3.1 ) สร้าง biglake table สำหรับ iceberg table

เราจะลอง query data จาก Data lakehouse ที่อยู่ใน cloud storage กัน โดยจะใช้ bigquery query ผ่าน biglake connection โดยจะสร้าง biglake connection ตามนี้

  • ไปที่ Bigquery แล้วกด ADD
  • กด Connections to external data sources
  • Config external data source
  • Connection type = Vertex AI remote models, remote functions and Biglake (Cloud Resource)
  • Connection ID = medium_demo (กำหนดได้ตามใจชอบ)
  • Region = Singapore
  • กด Create

จะได้ connection ของ biglake ตามนี้

อย่าลืมให้ permission storage object viewer แก่ service account ของ connection biglake นี้ใน bucket ที่มี iceberg table เก็บอยู่ไม่งั้น อาจ query ข้อมูลไม่ได้

อ่านเพิ่มเติมเกี่ยวกับ Biglake ได้ที่นี่ เขียนโดย น้องนอร์ท Nawaphon Thiandusit

จากนั้นสร้าง external table โดยใช้คำสั่งด้านล่างนี้

CREATE OR REPLACE EXTERNAL TABLE {gcp_project_id}.medium_demo.customer_point_balance_iceberg
WITH CONNECTION `projects/{gcp_project_id}/locations/asia-southeast1/connections/medium_demo`
OPTIONS (
format = 'ICEBERG',
uris = ["gs://{bucket_id}/transforming/customer_point_balance/metadata/v3.metadata.json"]
)

โดย path uris ต้องชี้ไปยัง metadata version ล่าสุดของ iceberg table

3.2 ) query ข้อมูลด้วยเงื่อนไขเดียวกัน เพื่อดู cost ของ query ของ Data warehouse และ Data lakehouse

SELECT * 
FROM `{gcp_project_id}.medium_demo.customer_point_balance`
warehouse query cost

ค่าใช้จ่ายการ query ของ Data warehouse อยู่ที่ 0.26 บาทต่อครั้ง

SELECT * 
FROM `{gcp_project_id}.medium_demo.customer_point_balance_iceberg`
lakehouse query cost

ค่าใช้จ่ายการ query ของ Data lakehouse อยู่ที่ 0.26 บาทต่อครั้ง

ค่าใช้จ่ายในการ query เท่ากัน แต่ Data lakehouse ใช้เวลาในการ query มากกว่านิดหน่อย

3.3 ) สรุปผลค่าใช้จ่ายที่เกิดขึ้นทั้งหมดของทั้ง Data warehouse และ Data lakehouse

ค่าใช้จ่ายที่เกิดทั้งหมดฝั่ง Data warehouse หากใช้ bigquery ในขั้นตอนการประมวลผล จะอยู่ที่ 0.65 บาทต่อการอัพเดตข้อมูล (Load data เข้าเพื่อสร้าง table ฟรี) ค่า query ต่อครั้งจะอยู่ที่ 0.26 บาทต่อครั้ง (หาก full scan) และอาจจะมีค่า bigquery storage หาก table มีขนาดใหญ่มากๆ แต่ในที่นี้ขอไม่พูดถึง เพราะขนาดของข้อมูลไม่ถึงขั้น tier ที่ต้องเสียเงิน

ค่าใช้จ่ายที่เกิดทั้งหมดฝั่ง Data lakehouse หากใช้ dataproc serverless ในขั้นตอนการประมวลผล จะอยู่ที่ 6.64 บาทต่อการอัพเดตข้อมูล (ค่าสร้าง table ตอนเริ่มต้น 4.42 บาท) ค่า query ต่อครั้งจะอยู่ที่ 0.26 บาทต่อครั้ง (หาก full scan )และอาจมีค่า cloud storage หากมีการแก้ไขไฟล์/เขียนไฟล์ เพิ่มเติม หาก table มีขนาดใหญ่มากๆ ต้องมีการ maintain iceberg table ต่อ แต่ในที่นี้ขอไม่พูดถึง เพราะขนาดของข้อมูลไม่ถึงขั้น tier ที่ต้องเสียเงิน

cost summary

Conclusion สรุปสาระสำคัญ

บทความนี้เขียนขึ้นมาเพื่อเปรียบเทียบค่าใช้จ่ายของการ update ข้อมูลแบบ delete + insert โดยเปรียบเทียบระหว่าง Data warehouse (bigquery) และ Data lakehouse (iceberg + dataproc + biglake) เพื่อดูว่าแท้จริงการทำงานแบบเดียวกัน วิธีไหนยากหรือง่ายกว่ากัน และมี trade-off ระหว่างทางอะไรบ้าง

Data lakehouse นั้นเกิดมาเพื่อแก้ปัญหาบางอย่างที่ Data warehouse ไม่สามารถทำได้ โดยรวมข้อดีของ Data lake เข้ามาด้วย เช่นเรื่องการ no-fixed schema , การจัดการ metadata เป็นต้น ซึ่งผู้อ่านอาจจะต้องชั่งน้ำหนักดูว่า งานที่กำลังทำอยู่นั้น Solution ไหนตอบโจทย์มากกว่ากัน ในระยะสั้น Data warehouse อาจจะใช้งานได้ง่ายกว่า ในระยะยาวช่วงที่กำลังเติบโต Data lakehouse อาจจะตอบโจทย์องค์กร ถ้ามีการวางแผนที่เตรียมพร้อมมาเป็นอย่างดี

ถ้าคุณชอบบทความสามารถกดแชร์ ปรบมือ เพื่อเป็นกำลังใจให้ผู้เขียนด้วยนะครับ

หากสนใจอ่านบทความอื่นๆ เพิ่มเติม คลิกที่นี่

ติดตาม Facebook พวกเรา คลิกที่นี่

สนใจร่วมงานกับ CJ Express คลิกที่นี่

Reference

[1] https://cloud.google.com/learn/what-is-a-data-warehouse

[2] https://cloud.google.com/learn/what-is-a-data-lake

[3] https://cloud.google.com/discover/what-is-a-data-lakehouse

[4] https://mesodiar.com/2022/04/01/data-warehouse-data-lake-data-lakehouse/

[5] https://cloud.google.com/bigquery/pricing#free

[6] https://cloud.google.com/storage/pricing#asia

[7] https://cloud.google.com/bigquery/pricing#on_demand_pricing

[8] https://iceberg.apache.org/docs/1.5.0/

[9] https://medium.com/cj-express-tech-tildi/biglake-%E0%B9%81%E0%B8%9A%E0%B8%9A%E0%B8%88%E0%B8%B1%E0%B8%9A%E0%B8%A1%E0%B8%B7%E0%B8%AD%E0%B8%97%E0%B8%B3-e0f062f6ff2e

--

--

Analytics Engineer At CJ Express Group, Google Cloud Professional Data Engineer