อ่านข้อมูลจาก databases ด้วย PySpark JDBC ทำยังไง?

Nitit Taepant
CJ Express Tech (TILDI)
4 min readJan 11, 2023
Spark JDBC

ในหลายๆโอกาสเราอยากจะใช้งานข้อมูลจาก databases ต่างๆ แต่ข้อมูลที่เราจะใช้มันเยอะมาก แล้วอาจจะต้องเอามาประมวลผลอีกหลายต่อ สิ่งที่ช่วยเราได้ก็คือ Apache Spark นั่นเอง

แล้วเราจะใช้ Spark อ่านข้อมูลจาก Databases ยังไงดีล่ะ?

ในที่นี้เราใช้ Python ในการทำงานเป็นหลักก็เลยจะใช้ PySpark ในการใช้งานฟังก์ชันต่างๆของ Apache Spark การที่จะเชื่อมต่อกับ Database ด้วย Spark เราสามารถใช้ JDBC (Java Database Connectivity) ในการเชื่อมต่อกับ Databases ชนิดต่างๆได้ ถ้าอยากจะดู official document ก็นี่เลย Pyspark JDBC

Table of contents

  • Spark session
  • Database connection
  • JDBC read options ที่น่าสนใจ
  • Optimizing performance
  • สิ่งที่ต้องคำนึงในฝั่ง Database
  • สรุป

Spark session

สร้าง spark session และโหลด connection jar packages ต่างๆ

เช่น

  • org.postgresql:postgresql:version สำหรับ connect กับ Postgresql database
  • mysql:mysql-connector-java:version สำหรับ connect กับ Mysql database
spark = SparkSession.builder.appName("My App").master("local[*]") \\
.config("spark.jars.packages", "org.postgresql:postgresql:42.1.1,mysql:mysql-connector-java:8.0.28") \\
.getOrCreate()

Database connection

ทำการกำหนด configs connection กับ MYSQL และ PostgreSQL

MYSQL

database_name = ""
table = "example_table"
driver = "com.mysql.cj.jdbc.Driver" # connection driver
host = "localhost"

df = spark.read.format("jdbc") \
.option("driver", driver) \
.option("url", f"jdbc:mysql://{host}:3306/{database_name}") \
.option("dbtable", table) \
.option("user", "user") \
.option("password", "password") \
.load()

PostgreSQL

database_name = ""
table = "example_table"
driver = "org.postgresql.Driver" # connection driver
host = "localhost"

df = spark.read.format("jdbc") \
.option("driver", driver) \
.option("url", f"jdbc:postgresql://{host}:5432/{database_name}") \
.option("dbtable", table) \
.option("user", "user") \
.option("password", "password") \
.load()

ในส่วนของ dbtable option จะถูกนำเอาไปใส่ใน query statement ดังนี้

SELECT * FROM example_table

เรายังสามารถใส่ dbtable option เป็น subquery ได้ด้วยเช่น

dbtable:”(SELECT * FROM example_table WHERE date ≥ ‘2023–01–01’) as temp” จะเป็น

SELECT * FROM (SELECT * FROM example_table WHERE date ≥ '2023-01-01') as temp # need alia

JDBC options ที่น่าสนใจ

partitionColumn, lowerBound, upperBound, numPartitions

options 4 ตัวนี้จำเป็นต้องมีทั้งหมดถ้าจะใช้ตัวใดตัวหนึ่ง

  • partitionColumn ชื่อ column ที่จะใช้สำหรับทำ partitions โดย column datatype ต้องเป็น numeric, date, หรือ timestamp
  • lowerBound, upperBound เป็นตัวคำนวณเพื่อกำหนด partition stride เราต้องรู้ value range ของ partition column เพื่อทำให้สามารถแบ่ง partitions ให้พอดี
  • numPartitions ตัวกำหนดจำนวน partitions สูงสุดที่จะใช้อ่านเขียนแบบ parrellel ซึ่งก็จะเป็นตัวกำหนด concurrent ของ connection database ด้วย แต่จะถูกจำกัดด้วยจำนวน executors อีกที
ref. https://luminousmen.com/post/spark-tips-optimizing-jdbc-data-source-reads

ตัวอย่าง code

database_name = ""
table = "example_table"
driver = "org.postgresql.Driver" # connection driver
host = "localhost"

# numeric partition
df = spark.read.format("jdbc") \
.option("driver", driver) \
.option("url", f"jdbc:postgresql://{host}:5432/{database_name}") \
.option("dbtable", table) \
.option("user", "user") \
.option("password", "password") \
.option("partitionColumn", "number") \
.option("numPartitions", 10) \
.option("lowerBound", 0) \
.option("upperBound", 10000) \
.load()

# date partition
df = spark.read.format("jdbc") \
.option("driver", driver) \
.option("url", f"jdbc:postgresql://{host}:5432/{database_name}") \
.option("dbtable", table) \
.option("user", "user") \
.option("password", "password") \
.option("partitionColumn", "date") \
.option("numPartitions", 10) \
.option("lowerBound", "2020-01-01") \
.option("upperBound", "2023-01-01") \
.load()

query ที่เกิดขึ้นของ numeric partition

SELECT * FROM example WHERE number <= 1000
SELECT * FROM example WHERE number BETWEEN 1000 and 2000
...
SELECT * FROM example WHERE number > 9000

fetchsize

จำนวนแถวข้อมูลที่จะดึงมาต่อรอบของการ fetch ข้อมูลจาก database ถ้ากำหนดค่าต่ำเกินไปจะทำให้เกิด query เข้าคิวดึงข้อมูลจาก database เต็มไปหมด และทำให้ได้ข้อมูลช้าเนื่องจากมี latency ของ internet หน่วงการทำงาน ส่วนถ้าเยอะเกินไปก็จะทำให้ GC (Garbage Collection) ทำงานหนักจนทำให้หยุดทำงาน แล้วทำให้เกิด OOM error (Out Of Memory error) ได้เพราะข้อมูลที่ได้กลับมาเยอะเกินไป ดังนั้นการจะทำให้การดึงข้อมูลมีประสิทธิภาพมากขึ้น เราต้องหาตัวเลขที่พอดีมาใช้

สิ่งที่ต้องคำนึงในการหาตัวเลขที่พอดี ก็คือ size ของข้อมูลที่ query เช่น

  • query ไปกี่ columns
  • มี data types อะไรบ้าง
  • string ใน column มีความยาวแค่ไหน

ตัวอย่าง code

database_name = ""
table = "example_table"
driver = "org.postgresql.Driver" # connection driver
host = "localhost"

df = spark.read.format("jdbc") \
.option("driver", driver) \
.option("url", f"jdbc:postgresql://{host}:5432/{database_name}") \
.option("dbtable", table) \
.option("user", "user") \
.option("password", "password") \
.option("fetchSize", 100) \
.load()

pushDownPredicate

ถ้าตั้งค่าเป็น “true” Spark จะพยายามเอา filter ไปใช้ทางฝั่ง source ให้มากที่สุด แต่ถ้าเป็น “false” filter ทั้งหมดจะถูกมาคำนวณบน Spark แทน ในส่วนนี้ต้องพิจารณาว่าฝั่งไหนจะทำงานดีกว่าในการใช้ filter

Optimizing performance

On computation

ในส่วนของการจัดการ performance ในการคำนวณ เราจะคิดถึงจำนวน Spark tasks โดยจะเท่ากับ JDBC connection partitions และถูกกำหนดด้วย numPartitions ซึ่งจำนวนที่ดีคือควรจะเท่ากับจำนวน executors บน cluster ทำให้ partitions ถูกกระจายไปทำงานทุก executors และไม่ต้อง write/read partition ใหม่ในกรณีที่ partitions มากกว่า executors

On memory

ถ้าเกิดว่ามีการเกิด OOM error (Out Of Memory error) เราสามารถแบ่ง partitions ให้มากขึ้นเพื่อหั่น data ให้เล็กลงในแต่ละ partitions ได้ หรือจะลดขนาด fetchsize เพื่อลดขนาดของการดึงข้อมูลจาก database ในแต่ละรอบ

On data

ถ้าเกิดว่า data ของเรามี distribution แบบ skewed ทำให้เวลาทำ partition บนคอลัมน์นั้นเกิด partition ที่มีบางอันที่ใหญ่กว่าอันอื่นมาก เราอาจจะต้องพิจารณาการเลือกค่า lowerBound, upperBound, partitionColumn หรือทำการแบ่งข้อมูลด้วยการทำ subquery แยกออกมาในแต่ละส่วนของข้อมูลมาใช้งานแทน

On query

ในด้านของการ query เราก็สามารถเพิ่มความเร็วได้โดยที่เราควรจะใช้ partitionColumn เป็น column ที่ทางฝั่ง database ทำเป็น partition อยู่แล้วหรือมีการทำ indexing ไว้ รวมถึงการใช้ subquery ในการดึงเฉพาะข้อมูลที่จำเป็นในการเอามาใช้งาน

สิ่งที่ต้องคำนึงในฝั่ง Database

ถ้าเรากำหนดจำนวน partitions สูงจำนวน transactions ที่เกิดขึ้นก็จะเพิ่มตาม ทำให้เราต้องคำนึงถึงจำนวน transactions ที่ database รับไหวด้วย

ในส่วนของ fetchsize ถ้าเรากำหนดต่ำมากๆก็อาจจะทำให้ transactions per second ลดลงแต่จริงๆแล้วจำนวน transactions จะเพิ่มขึ้น ซึ่งอาจจะเกิดจาก latency ของ network อีกเหตุการณ์ที่เกิดขึ้นได้คือ worker ของฝั่ง database ไม่พอทำให้ transaction ถูกเข้าคิวไว้ใน scheduler ของ database ในส่วนนี้ต้องระวังที่จะทำให้ database ไม่สามารถทำงานปกติได้ เพราะ requests อื่นต้องไปต่อคิวจนกว่า worker จะว่างทำงาน

สรุป

Spark JDBC สามารถทำให้เราสามารถดึงข้อมูลจาก database ต่างๆมาคำนวณต่อใน Spark ได้อย่างง่ายดาย รวมถึงมี options ให้ใช้สำหรับ tuning performance ในการดึงข้อมูลทั้งเพิ่มความเร็วให้ query และ รวมถึงการจัดการกับ query ให้เหมาะกับ resources ที่มีอยู่

References:

--

--