Hudi Syncing Feature - Query Hudi Data Lakes ผ่าน BigQuery

Sojirath Thunprateep
CJ Express Tech (TILDI)
5 min readSep 16, 2022

Hudi Syncing เป็นอีก Feature หนึ่งที่น่าสนใจของการสร้าง Data Lakes ด้วย Apache Hudi เพราะจะทำให้เราสามารถ Query data จาก Hudi table ผ่าน BigQuery ได้อย่างสะดวกในรูปแบบของ External table

ถ้าใครลองไปอ่าน Doc ของ Feature นี้(ver 0.12.0) อาจจะเจอแค่ตัวอย่างcommand สั้นๆ ที่อาจจะงงว่ามันใช้ยังไง เราไปลองเล่นมาแล้ว เลยสรุปมาอยู่ในบทความนี้ว่ามันคืออะไร ใช้ยังไง รวมถึงปัญหาที่เราเจอกับวิธีแก้ที่ลองมาแล้ว และการเตรียม Environment ให้พร้อมใช้งาน

Part 1: Sync Hudi table
Part 2: Set up environment
Part 3: Run Sync Hudi Table
3.1. Run Hudi Sync ผ่าน Spark-submit
3.2. Run Hudi Sync บน Kubernetes SparkOperator
Part 4: ข้อจำกัดของ Hudi Sync Version 0.12.0

Part 1: Sync Hudi table

หลักๆแล้ว hudi-utilities-bundle_2.12–0.11.1.jar จะถูกใช้เป็น Application file ควบคู่กับ Configs ต่างๆ เพื่อทำการ Sync Hudi Table ไป BigQuery
ในที่นี้เราจะยกตัวอย่างจากการ Sync Hudi Table ที่ถูกสร้างอยู่บน GCS

ซึ่งการทำงานของ Hudi Sync จะแบ่งเป็น 2 ส่วนใหญ่ๆ คือ
1. Sync Hudi Table จาก GCS path ตั้งต้นที่ Hudi Table ของเราถูกสร้างไว้อยู่ ไปที่ Synced path ตามที่ Config
2. สร้าง External Table ที่ BigQuery จาก GCS Target path
ซึ่งเราจะได้ 2 tables และ 1 view บน Dataset ของเรา

  • Versions Table: External table ของ Data ที่ถูก Sync มา ซึ่งจะเป็นเทเบิลที่ถูกสร้างบน Partition path ของตัวเนื้อไฟล์ Data
  • Manifest Table: External table ของ Hoodie files name เป็นส่วนของ Manifest ที่มาจาก Metadata เพื่อให้ View สามารถใช้ในการเลือก Data ออกมาโชว์ผ่าน BigQuery ได้
  • View: User สามารถ Query Hudi Table ที่ถูก Sync มาอยู่บน BQ ผ่าน View นี้

เบื้องหลังของ View สร้างจาก Script แบบตัวอย่างข้างล่างนี้ นั่นหมายถึง View นี้อ่านข้อมูลมาจาก Files ผ่าน External versions table ซึ่งเลือกเฉพาะไฟล์ที่ถูกกำหนดให้มาโชว์ จากการกำหนดของ External manifest table ทำให้สุดท้ายเราสามารถอ่านข้อมูลที่เกิดจากการ Update ล่าสุดผ่าน Source Hudi table ได้

SELECT * 
FROM `project_id.dataset.tablename_versions`
WHERE _hoodie_file_name IN (
SELECT filename
FROM `project_id.dataset.tablename_manifest`
)

Part 2: Set up environment

2.1. Pre-requisite Libraries

Hudi Syncing เป็นการทำงานของ HoodieDeltaStreamer กับ org.apache.hudi.gcp.bigquery.BigQuerySyncTool เพราะฉะนั้นทำให้มี Libraries ที่เราต้องเตรียมเพื่อให้สามารถ Sync Hudi Table ได้ตามที่ลิสต์อยู่ข้างล่างนี้

ในที่นี้เรารัน Hudi Sync ผ่าน Spark เราเลยเตรียม Env โดยการโหลด Libraries ทั้งหมดนี้ไปวางใน jars folder ของ Spark Home
แต่ถ้าใครจะลองรันผ่าน PySpark on Local ก็สามารถนำ jar files ที่โหลดมาไปวางใน jars folder ของ pyspark ได้เหมือนกัน

สำหรับตัว guava-28.0-jre.jar โดยปกติใน jars path อาจจะมี guava lib มาให้แล้ว แต่ถ้าเป็นเวอร์ชันเก่าอาจจะทำให้เกิด Error Caused by: java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, long, long)'
แนะนำให้ลบตัวนั้นออก แล้วใช้ guava-28.0 ขึ้นไปแทน

guava-28.0-jre.jar
gcs-connector-hadoop2-latest.jar
failureaccess-1.0.1.jar
hudi-spark-common_2.12-0.11.1.jar
spark-avro_2.12-3.1.1.jar
hudi-gcp-bundle-0.11.1.jar
hudi-client-common-0.11.1.jar
hudi-spark-client-0.11.1.jar
hudi-hive-sync-0.11.1.jar
hudi-spark-bundle_2.12-0.11.1.jar
hudi-utilities-bundle_2.12-0.11.1.jar
hudi-sync-common-0.11.1
gax-2.7.0.jar
gax-httpjson-0.99.0.jar
google-auth-library-credentials-1.7.0.jar
google-auth-library-oauth2-http-1.7.0.jar
google-cloud-core-2.7.0.jar
google-http-client-1.41.8.jar
api-common-2.2.1.jar
google-cloud-core-http-2.7.0.jar
grpc-context-1.27.0.jar
google-api-client-2.0.0.jar
google-api-client-gson-2.0.0.jar
google-api-services-bigquery-v2-rev20220716-2.0.0.jar
opencensus-api-0.31.0.jar
opencensus-contrib-http-util-0.31.0.jar
google-http-client-jackson2-1.41.1.jar
google-http-client-appengine-1.42.2.jar
google-http-client-gson-1.41.8.jar
threetenbp-1.6.0.jar

2.2. Java

การใช้งาน Hudi จะรองรับเฉพาะ Java 8 ถ้า Java บนเครื่องมี Java version สูงกว่านั้นจะต้อง Downgrade ลงมา
(https://hudi.apache.org/contribute/developer-setup/)

2.3. Application File

hudi-utilities-bundle_2.12-0.11.1.jar ไว้ใช้เป็น Application file

2.4.Service Account Credential File

Credential file ใช้สำหรับ Authentication เพื่อเช้าถึง GCS และ BigQuery

Part 3: Run Sync Hudi Table

หลังจากที่เราเตรียม Env แล้วเราจะต้องสร้าง Dataset บน BQ เตรียมไว้ เพื่อพร้อมสำหรับการสร้าง Table ที่ Sync ไป

3.1. Run Hudi Sync ผ่าน Spark-submit

ถ้าไปอ่าน Official Doc ของ Apache Hudi เราจะเห็นตัวอย่างการรันด้วย spark-submit เราเลยจะมาอธิบาย Config ผ่าน Spark-submit commandในแต่ละส่วนกันก่อน

ตัวอย่าง Config ต่อไปนี้ จะสมมติให้ Hudi table ตั้งต้นชื่อ “table” และเทเบิลที่จะทำการ Sync ไปชื่อ “synced_table” มี Partition เป็น “date” และทำงานอยู่บน GCS Bucket ชื่อ playground

  • Spark Submit Command
    Set master เป็น Local เนื่องจากเราจะลองรันบนเครื่องเราเอง
spark-submit --master local
  • Spark Configs (--conf)

ส่วนแรกคือการ Set Spark config เพื่อให้ Session นี้รู้จัก Path ของ GCS ที่เราสามารถใช้ Path ที่ขึ้นต้น gs:// ได้เลย

--conf spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS 
--conf
spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem

ส่วนที่สองคือ Authentication ในที่นี้เราใช้ Service account file

--conf spark.hadoop.fs.gs.project.id=<<project_id>>
--conf spark.hadoop.google.cloud.auth.type=SERVICE_ACCOUNT_JSON_KEYFILE
--conf spark.hadoop.google.cloud.auth.service.account.json.keyfile=<<service_account.json>>
  • Packages (--packages)
    Config package เสริมที่จะโหลดก่อนรัน ซึ่งเป็นเหมือนกับการ Config
--packages com.google.cloud:google-cloud-bigquery:2.10.4,com.google.cloud.bigdataoss:gcs-connector:hadoop2–2.0.1
  • Jars (--jars)
    เพื่อให้ jars file ที่ config นี้ไปอยู่ใน Driver และ Executor class path
--jars /opt/apps/hudi_sync/hudi-utilities-bundle_2.12-0.11.1.jar
  • Class (--class)
    Class ที่อยู่ใน hudi-utilities-bundle ที่ถูกรันเพื่อ Sync table
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/apps/hudi_sync/hudi-utilities-bundle_2.12-0.11.1.jar \
  • Application File
    เป็นการกำหนดว่ารันคำสั่ง Hudi Sync ผ่าน Application file ไหน ซึ่งในที่นี้เราใช้
/opt/apps/hudi_sync/hudi-utilities-bundle_2.12-0.11.1.jar
  • Target Table Details
    พาร์ทนี้จะกำหนด ชื่อ, ประเภท และ Path ของเทเบิลที่เราจะไปสร้างบน BQ จากการ Sync
--target-base-path gs://playground/synced_table \
--table-type COPY_ON_WRITE \
--target-table synced_table\
  • Enable Sync Command
    เป็น Command ให้ Hudi Sync ได้ทำงาน
--enable-sync
  • Sync Tool Class

เนื่องจากเรา Sync Hudi Table ไปยัง BQ เราจึงต้องใช้ BigQuerySyncTool และกำหนดว่า Source ของเราเป็น Source class เป็น Parquet

--sync-tool-classes org.apache.hudi.gcp.bigquery.BigQuerySyncTool
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource
  • Hoodie Configs (--hoodie-conf)

- Delta Streamer Config
source.dfs.root คือ Data path ของ Synced table (เป็น Config เดียวใน Hudi Sync ที่ mention ถึง table ตั้งต้น)

--hoodie-conf hoodie.deltastreamer.source.dfs.root=gs://playground/table/

- Table Config
Part นี้เป็น Config ของ Synced table ที่เราไปสร้างบน BigQuery
ในส่วน Config hoodie.datasource.write จะต้องเหมือนกับเทเบิลตั้งต้น

--hoodie-conf hoodie.gcp.bigquery.sync.project_id=<<project_id>>
--hoodie-conf hoodie.gcp.bigquery.sync.dataset_name=<<dataset_name>>
--hoodie-conf hoodie.gcp.bigquery.sync.dataset_location<<dataset_location>>
--hoodie-conf hoodie.gcp.bigquery.sync.table_name=synced_table
--hoodie-conf hoodie.gcp.bigquery.sync.base_path=gs://playground/synced_table
--hoodie-conf hoodie.gcp.bigquery.sync.partition_fields=date
--hoodie-conf hoodie.gcp.bigquery.sync.source_uri=gs://playground/synced_table/date=*
--hoodie-conf hoodie.gcp.bigquery.sync.source_uri_prefix=gs://playground/synced_table/
--hoodie-conf hoodie.gcp.bigquery.sync.use_file_listing_from_metadata=true
--hoodie-conf hoodie.gcp.bigquery.sync.assume_date_partitioning=false
--hoodie-conf hoodie.datasource.hive_sync.database=<<dataset_name>>
--hoodie-conf hoodie.datasource.hive_sync.table=synced_table
--hoodie-conf hoodie.datasource.hive_sync.partitioned_by=date
--hoodie-conf hoodie.datasource.hive_sync.use_jdbc=false
--hoodie-conf hoodie.datasource.hive_sync.skip_ro_suffix=true
--hoodie-conf hoodie.datasource.write.recordkey.field=id
--hoodie-conf hoodie.datasource.write.drop.partition.columns=true
--hoodie-conf hoodie.payload.ordering.field=id
--hoodie-conf hoodie.datasource.write.precombine.field=ts
--hoodie-conf hoodie.datasource.write.keygenerator.type=SIMPLE
--hoodie-conf hoodie.datasource.write.partitionpath.field=date
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true
--hoodie-conf hoodie.partition.metafile.use.base.format=true
--hoodie-conf hoodie.metadata.enable=true

3.2. Run Hudi Sync บน Kubernetes SparkOperator

โดยเราต้องเตรียม Docker Image ที่มีประกอบด้วย Pre-requisites ที่ลิสต์ไว้ใน Part 1 จากนั้น Config Hudi Sync เหมือนกับ spark-submit command

apiVersion: “sparkoperator.k8s.io/v1beta2”
kind: SparkApplication
metadata:
name: application-name
namespace: spark-operator
spec:
type: Java
mode: cluster
image: gcr.io/<<project_id>>/<< image_name>>",
imagePullPolicy: IfNotPresent
mainClass:
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
mainApplicationFile: “local:///opt/apps/hudi_sync/hudi-utilities-bundle_2.12–0.11.1.jar”
sparkVersion: “3.1.1”
timeToLiveSeconds: 180
arguments:
- --target-base-path
- gs://playground/synced_table
- --table-type
- COPY_ON_WRITE
- --target-table
- synced_table
- --enable-sync
- --sync-tool-classes
- org.apache.hudi.gcp.bigquery.BigQuerySyncTool
- --source-class
- org.apache.hudi.utilities.sources.ParquetDFSSource
- --hoodie-conf
- hoodie.auto.adjust.lock.configs=true
- --hoodie-conf
- hoodie.metadata.clean.async=true
- --hoodie-conf
- hoodie.deltastreamer.source.dfs.root=gs://playground/table/
- --hoodie-conf
- hoodie.gcp.bigquery.sync.project_id=<<project_id>>
- --hoodie-conf
- hoodie.gcp.bigquery.sync.dataset_name=<<dataset_name>>
- --hoodie-conf
- hoodie.gcp.bigquery.sync.dataset_location=<<dataset_location>>
- --hoodie-conf
- hoodie.gcp.bigquery.sync.table_name=synced_table
- --hoodie-conf
- hoodie.gcp.bigquery.sync.base_path=gs://playground/synced_table
- --hoodie-conf
- hoodie.gcp.bigquery.sync.partition_fields=date
- --hoodie-conf
- hoodie.gcp.bigquery.sync.source_uri=
gs://playground/synced_table/date=*
- --hoodie-conf
- hoodie.gcp.bigquery.sync.source_uri_prefix=
gs://playground/synced_table/
- --hoodie-conf
- hoodie.gcp.bigquery.sync.use_file_listing_from_metadata=true
- --hoodie-conf
- hoodie.gcp.bigquery.sync.assume_date_partitioning=false
- --hoodie-conf
- hoodie.datasource.hive_sync.database=<<dataset_name>>
- --hoodie-conf
- hoodie.datasource.hive_sync.table=synced_table
- --hoodie-conf
- hoodie.datasource.hive_sync.partitioned_by=date
- --hoodie-conf
- hoodie.datasource.hive_sync.use_jdbc=false
- --hoodie-conf
- hoodie.datasource.hive_sync.skip_ro_suffix=true
- --hoodie-conf
- hoodie.datasource.write.recordkey.field=id
- --hoodie-conf
- hoodie.datasource.write.drop.partition.columns=true
- --hoodie-conf
- hoodie.payload.ordering.field=ts
- --hoodie-conf
- hoodie.datasource.write.precombine.field=ts
- --hoodie-conf
- hoodie.datasource.write.keygenerator.type=SIMPLE
- --hoodie-conf
- hoodie.index.type=GLOBAL_BLOOM
- --hoodie-conf
- hoodie.datasource.write.partitionpath.field=date
- --hoodie-conf
- hoodie.datasource.write.hive_style_partitioning=true
- --hoodie-conf
- hoodie.partition.metafile.use.base.format=true
- --hoodie-conf
- hoodie.metadata.enable=true
restartPolicy:
type: Never
sparkConf:
“spark.eventLog.enabled”: “true”
“spark.eventLog.dir”: “gs://<< logging_bucket >>/spark”
“spark.driver.extraJavaOptions”: “-Divy.cache.dir=/tmp -Divy.home=/tmp”
“spark.kubernetes.driver.secrets.google-sa-key”: “/cred”
“spark.kubernetes.executor.secrets.google-sa-key”: “/cred”
“spark.kubernetes.driverEnv.GOOGLE_APPLICATION_CREDENTIALS”: “service_account.json”
“spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS”: “service_account.json”
“spark.debug.maxToStringFields”: “100”
hadoopConf:
“fs.gs.project.id”: “<<project_id>>}”
“fs.AbstractFileSystem.gs.impl”: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
“fs.gs.impl”: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
“google.cloud.auth.type”: “SERVICE_ACCOUNT_JSON_KEYFILE”
“google.cloud.auth.service.account.json.keyfile”: “service_account.json”
driver:
cores: 1
coreLimit: “1200m”
memory: “512m”
labels:
version: 3.1.1
serviceAccount: sparkoperator
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: “service_account.json”
executor:
cores: 1
instances: 1
memory: “512m”
labels:
version: 3.1.1
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: “service_account.json”

Part 4: ข้อจำกัดของ Hudi Sync Version 0.12.0

  1. Document บอกไว้อย่างชัดเจนว่า จะสามารถ Sync ได้แค่กับ Hudi Table ที่มี Hive-partitioned tables และ มี Table type เป็น Copy-On-Write
    ซึ่งถ้าเทเบิลเราไม่มี Partition จริงๆ แล้วฝืนอยากใช้ Hudi Sync อยู่เราก็ยังสามารถทำได้โดยการให้ partition.field=“” แต่ Data ของเราจะถูกเก็บไปอยู่ใน Folder path default เช่น gs://playground/synced_table/=default
  2. จากการทดลองใช้ Hudi Sync มาสักพักใน version 0.12.0 เราเจอว่ามันจะตามหา “ts” column ตลอด เพราะ Precombine field โดย Default จะเป็นคอลัมน์ชื่อ “ts”
    เราลองแก้ปัญหานี้อยู่ เช่น Set precombine field เป็นคอลัมน์อื่น และ Set combine.before.upsert = False ซึ่งเป็นวิธีที่เคยทำตอนสร้าง Table แล้วแก้ปัญหา Error ts field not found ได้ แต่กับ Hudi sync เราไม่สามารถทำแบบนั้นได้
    ตอนนี้ใน Hudi version 0.12.0 ส่วนตัวเราเลยสรุปว่าถ้าจะใช้ Hudi Sync เราต้องมี “ts” field ที่เป็น timestamp เพื่อให้ไม่เกิด Error นี้

สรุป

  • Hudi Sync BigQuery เป็น feature ที่ทำให้เราสามารถ Query Hudi table ผ่าน BQ ได้ โดยการสร้าง External table
  • Hudi Version 0.12.0 ยังมีข้อจำกัดบางอย่างในการใช้งาน Syncing feature อยู่ เช่น Hive-partitioned tables และ Copy-On-Write เราคงต้องมารอดูกันว่า Version ต่อๆไปจะมีการพัฒนาให้เราสามารถใช้ Feature นี้อย่าง Flexible มากขึ้นขนาดไหน

--

--