Using PySpark to read data from BigQuery

Nitit Taepant
CJ Express Tech (TILDI)
4 min readMay 25, 2023

การทำ Data Pipeline เราต้องยุ่งเกี่ยวกับข้อมูลหลายรูปแบบ และพยายามปรับเปลี่ยนข้อมูลให้อยู่ในรูปแบบที่เหมาะสมกับการใช้งาน บางครั้งเราต้องทำการสร้าง data model จากข้อมูลที่อยู่ในรูปแบบต่างกัน และคนละที่ BigQuery ก็อาจจะเป็นหนึ่งในนั้น

ในทีม Data Engineer TILDI เราใช้ Airflow ในการทำ Data Pipeline และใช้ PySpark เป็นตัวหลักในการ Processing ต่างๆ ทั้งการ transforming และ serving data รวมถึงงานอื่นๆเช่น Data Validation และ Data Reconciliation ถ้าอยากรู้ว่าเราทำอะไรบ้างใน Data Platform ดูที่นี่

เราพยายามจะหาวิธีที่ดีที่สุดในการเลือกใช้งาน Tools ให้เหมาะกับงานและ Data โดยคำนึงถึงทั้ง Costs และ Performances เราจึงจะมาลองใช้วิธีการอ่านข้อมูลใน BigQuery ด้วย PySpark ดูว่าจะช่วยอะไรใน Platform ของเราได้บ้าง

วิธีอ่านข้อมูลจาก BigQuery มี 3 วิธี

  1. BigQuery REST API
  2. BigQuery Extract หรือ Export วิธีนี้จะอ่านข้อมูลไปเก็บลง Cloud Storage ใน format ต่างๆ เช่น json, csv และ avro
  3. BigQuery Storage Read API

เราอาจจะพอเห็นวิธีที่ 1 กับ 2 บ่อยๆในการใช้อ่านข้อมูลจาก BigQuery แต่เราจะไม่พูดถึงในบทความนี้

BigQuery Storage Read API

ในการอ่าน BigQuery ด้วย PySpark เราจะใช้ spark-bigquery-connector ที่จะทำให้เราสามารถอ่านข้อมูลบน BigQuery ได้ และการอ่านของมันจะเรียกใช้งาน BigQuery Storage Read API ในการอ่าน

BigQuery Storage Read API คืออะไร?

เป็น API ที่ทำให้เราใช้งาน BigQuery-managed storage ได้อย่างรวดเร็วด้วย rpc-based protocol (gRPC).

จุดเด่นคือ

  1. สามารถอ่านข้อมูลหลายเซ็ตของแถวข้อมูลได้ในหนึ่ง session. ซึ่งจะช่วยในส่วนของการทำงานแบบ distributed processing ได้ดี.
  2. เนื่องจาก BigQuery เก็บข้อมูลในรูปแบบ Columnar Datastore และ API นี้สามารถทำการ filter columns ได้จึงสามารถลดขนาดของข้อมูลที่จะอ่านมาได้ ถ้า Table มีหลาย columns มากๆ.
  3. สามารถใส่ Simple Filter Predicates ได้เพื่อทำการ filter ข้อมูลบน server ก่อนจะส่งมาทาง client.
  4. ใช้ snapshot isolation model consumers ทุกตัวจะอ่านข้อมูล ณ จุดๆหนึ่งของเวลา ทำให้ข้อมูลที่ streaming มามีความ consistency

มาลองใช้ spark-bigquery-connector กัน

Setup

เราต้องทำการเปิดใช้งาน API ตามนี้ Enabling BigQuery Storage API

Download .jar file ไปเก็บที $SPARK_HOME/jars โดยเราต้องเลือก version ให้ตรงกับของ Spark ซึ่งโหลดจาก GCS หรือ Maven ก็ได้

spark = SparkSession.builder \
.master("local[*]") \
.appName('spark-bigquery-demo').getOrCreate()

หรือเราจะโหลดตอนที่สร้าง session ก็ได้

from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[*]") \
.appName('spark-bigquery-demo') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-3.1-bigquery:0.30.0') \\
.getOrCreate()

Reading data from table

Read directly from BigQuery storage

อ่านข้อมูลผ่าน BigQuery storage table ผ่าน API ซึ่งจะอ่านแบบ Streaming ใน format Arrow or Avro มาเป็น Dataframe โดยตรง

# Setting table location
project_id = ""
dataset_id = ""
table_id = ""
# Load data from BigQuery.
df = spark.read.format("bigquery") \
.option('project', project_id) \
.option('dataset', dataset_id) \
.option('table', table_id) \
.load()
# Or this way
table = f"{project_id}.{dataset_id}.{table_id}"
df = spark.read.format("bigquery").option("table",table).load()
# Or
df = spark.read.format("bigquery").load(table)

Read from query

เราสามารถอ่านข้อมูลผ่านการ query SQL บน BigQuery ได้ด้วย

การที่เราจะใช้การ Query SQL ต้องเซ็ต properties ดังนี้

  • viewsEnabled เป็น true
  • materializationDataset ต้องเซ็ตเป็น dataset ที่ GCP user หรือ credential มี permission การสร้าง table.
  • materializationProject (optional) เซ็ตเป็น project ที่จะสร้าง table.

Tables ใน query จะต้องอยู่ที่เดียวกับmaterializationDataset แต่ถ้าอยู่คนละที่ และไม่ได้อยู่ใน project เดียวกันให้ทำการเขียนที่อยู่แบบเต็ม [project].[dataset].[table] แทน

การอ่านข้อมูลด้วยวิธีนี้จะทำการ query บน BigQuery แล้วสร้าง temporary table บน materializationDataset (expiration time default=24hrs) หลังจากนั้น Spark ก็จะอ่านข้อมูลจาก table นี้ ซึ่งการอ่านด้วยวิธีนี้อาจจะทำให้มีการคิด Cost เพิ่มในการ Query และสร้าง temporary table

query = f"""
SELECT
*
from
`{project_id}.{dataset_id}.{table_id}`
limit
1000
"""

df = spark.read.format('bigquery') \
.option("viewsEnabled","true") \
.option("materializationProject", project_id) \
.option("materializationDataset", dataset_id) \
.option("query", query) \
.load()

# or
# set globally spark config
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationProject", project_id)
spark.conf.set("materializationDataset", dataset_id)

# then read in this way
df = spark.read.format("bigquery").load(query)
# Another way is
df = spark.read.format("bigquery").option("query", sql).load()

ข้อดีของการใช้การ Query ก็คือในกรณีที่ join หลายๆ tables ก่อนที่จะอ่านข้อมูลมาใส่ Dataframe เพราะ BigQuery สามารถทำการ join tables ได้ดีกว่า การ join ใน Spark รวมถึงทำให้ข้อมูลที่จะต้องผ่าน Network น้อยลงได้ด้วย

Filtering

Projection Pushdown

เนื่องจาก BigQuery เป็นแบบ columnar storage จึงมีประสิทธิภาพมากที่สุดถ้าเลือก columns เฉพาะที่จะใช้งานและ spark-bigquery-connector ก็รองรับการ filter columns แบบนี้ด้วยการ select columns

df.select("column1", "column2", "column3")

Predicate Pushdown

Partitions Pruning

BigQuery table นั้นสามารถทำ partitions ได้ ทำให้เราสามารถอ่านข้อมูลเท่าที่จำเป็นโดยการอ่านข้อมูลแค่บาง partitions จากการ filter เช่นเรามี partition เป็น date column จะใช้วิธีดังนี้

df = spark.read.format('bigquery') \
.option('project', project_id) \
.option('dataset', dataset_id) \
.option('table', table_id) \\
.option('filter', "<date column name> >= 'yyyy-mm-dd'") \
.load()

Predicate filter

เรายังสามารถ filter row ของข้อมูลเพิ่มได้อีกโดยส่วนนี้จะถูกไป filter บน BigQuery เราสามารถใช้ได้ทั้ง option filter หรือ where ก็ได้

df = spark.read.format('bigquery') \
.option('project', project_id) \
.option('dataset', dataset_id) \
.option('table', table_id) \
.option('filter', "<date column name> >= 'yyyy-mm-dd'") \
.option('filter', "<column> = <value>") \
.load()
# or
df.select("*").where("<column> = <value>")

Note:

ใน Spark ยังมีปัญหาในการ pushdown filter ของ nested fields เช่น address.city = "Sunnyvale" อย่างนี้จะไม่ถูก pushdown ไปที่ BigQuery

Cost

ในส่วนของ cost Streaming reads (BigQuery Storage Read API) จะคิดเป็น $1.1 per TB read ซึ่งจะมี free tier อยู่ที่ 300 TB/month ซึ่งถูกมาก แต่ยังมีในส่วน Network Egress สามารถดูเพิ่มเติมได้ ที่นี่

และถ้าหากเราใช้การอ่านแบบ Query ก็อาจจะโดนคิดในส่วน Query เพิ่มด้วย

ในกรณีปกติการคิด cost ของ API โดย default แล้วจะถูกคิดบน project ที่ credential หรือ service account ใช้งานอยู่ ถ้าอยากให้ cost ส่วนนี้เกิดขึ้นที่ project อื่น ต้องเซ็ต

spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

หรือเซ็ตที่ read operation โดยเพิ่ม

.option("parentProject", "<BILLED-GCP-PROJECT>").

df = spark.read.format('bigquery') \
.option('parentProject', "<BILLED-GCP-PROJECT>") \
.load(table)

cost อีกหนึ่งส่วนที่อาจต้องคำนึงถึง เนื่องจากเราใช้ Spark ในการอ่านและทำงานกับข้อมูลส่วนนั้นต่อ เราจึงต้องคำนวน cost ในส่วนของ processing resources ของ Spark ด้วย

Useful Properties

  • materializationExpirationTimeInMinutes ไว้สำหรับกำหนดเวลา temporary table ที่สร้างจาก query จะหมดอายุในหน่วยนาที (Defaults to 1440, or 24 hours)
  • maxParallelism จำนวนสูงสุดในการแบ่ง partition data ในการอ่าน
  • preferredMinParallelism จำนวน partition data ต่ำสุดที่อยากให้ทำการ partition ซึ่งอาจจะน้อยกว่าที่เซ็ตไว้ถ้าข้อมูลน้อย
  • และยังมี Properties อื่นๆ

Conclusion

การอ่านข้อมูลบน BigQuery มาด้วย PySpark ผ่าน spark-bigquery-connector นั้นสามารถทำได้ง่ายๆรวมถึงยังสามารถใช้จุดเด่นของ BigQuery Storage API ได้ค่อนข้างครบ และเหมาะกับใช้งานร่วมกับ Spark ด้วยการอ่านข้อมูลแบบ Streaming ทำให้ใช้งานแบบ Parallel ได้ดีทำให้เราสามารถ optimize การ processing ได้อย่างมีประสิทธิภาพ แต่เราอาจจะต้องคำนึงเรื่อง cost มากขึ้นเพราะอาจจะ track cost ส่วนต่างๆค่อนข้างยาก

References

--

--