Photo by Shahadat Rahman on Unsplash

PySpark (Pandas) UDF?

Watcharee Skr
CJ Express Tech (TILDI)
3 min readNov 29, 2023

--

บางครั้งเราก็อยาก process อะไรบางอย่างบน PySpark เช่นการ encrypt ข้อมูล หรือแปลงข้อมูลแบบแปลก ๆ ด้วย PySpark เราก็เลยไปลองค้นใน document ดูด้วยความคาดหวังว่าน่าจะมีคนทำให้เราแล้วแหละ แล้วก็ต้องพบความจริงว่า ไม่มี ดังนั้น เพื่อทำงานต่อได้เลยต้องสร้าง function ขึ้นมาเอง คำถามที่ตามมาคือยังไงล่ะ คำตอบก็คือ UDF

Table of Contents
- What is pyspark UDF
- Types of UDf
- Why we used pyspark UDF
- Performance comparing
- Conclusion
- References

What is PySpark UDF

UDF หรือ User Defined Function คือ การที่เราเขียน custom function ขึ้นมาเพื่อใช้ในการจัดการกับข้อมูลของเราในกรณีที่ไม่สามารถใช้ built-in function ที่มีอยู่แล้วมาจัดการได้ และเราสามารถนำ udf ใช้งานได้กับหลาย ๆ dataframe โดยสามารถใช้ได้กับเป็น spark function หรือใช้ใน SQL expression ได้เช่นกัน

หลักการทำงานของ UDF นี้คือจะทำงานที่ละ row โดยการ ส่งข้อมูลจาก JVM ไปประมวลผลในส่วนของ python runtime แล้วจึงส่งข้อมูลกลับมาให้ JVM ต่อทีละแถว ๆ แน่นอนว่าการต้องส่งข้อมูลกลับไปกลับมารวมไปถึงการค่อย ๆ ทำงานทีละแถว ทุกคนน่าจะพอนึกออกว่าจะเกิดอะไรตามมาบ้าง นั่นก็คือ performace ของ UDF นั้นไม่ดีเอาซะเลย

ดังนั้น ขอให้ทุกคนพึงระลึกไว้ก่อนเลยว่า ให้ใช้ UDF เป็นทางเลือกสุดท้ายในการ process บน Spark

Types of UDF

ตัว UDF มีด้วยกันหลายประเภทใน PySpark ขอยกมาเปรียบเทียบ 3 ประเภท

  1. Spark Scala UDF: ตรงตามชื่อ คือ UDF ที่เขียนด้วย Scala ซึ่งสามารถทำงานกับ JVM ได้เลยไม่ต้องส่งข้อมูลกันไปมา ทำให้ทำงานได้เร็วที่สุด แต่ว่าจำเป็นต้องเขียนด้วย Scala
  2. PySpark UDF: เป็น UDF ที่เขียนด้วย python ดังนั้นในการทำงานจำเป็นต้องแปลงจาก Java/Scala → Python → Java/Scala ทำให้ทำงานช้า
  3. PySpark Pandas UDF: เป็น UDF ที่เขียนด้วย Python เช่นกันแต่เบื้องหลังมีการใช้ Apache Arrow มาช่วยในการส่งผ่านข้อมูลระหว่าง JVM กับ Python runtime ทำให้สามารถทำงานได้เร็วขึ้น โดยแทนที่จะทำทีละแถว ๆ ก็จะแบ่งไปทำทีละ chunk แทน โดยจำนวนแถวที่จะแบ่งไปทำงานในแต่ละครั้งสามารถกำหนดได้ผ่าน parameter ที่ชื่อว่า spark.sql.execution.arrow.maxRecordsPerBatch มี default อยู่ที่ 10,000 rows

Why we used PySpark UDF

คำตอบง่าย ๆ ก็คือเราไม่มี built-in function ที่สามารถใช้งานได้ สิ่งที่เราต้องการ function ในการ encrypt และ decrypt ข้อมูลของเรา

จากหลังจากข้อมูลถูก encrypt ใน landing โดยใช้ tink ของ google ซึ่งสิ่งที่เรา encrypt ไม่ได้มีแค่ column ที่เป็น sensitive เช่น ชื่อ นามสกุล แต่รวมไปถึง Foreign Key ที่ข้อมูลอื่นสามารถมา Join ได้ ดังนั้นเมื่อมาถึงส่วนของ serving หรือก็คือส่วนที่ทำ data model จึงจำเป็นต้องมีการ decrypt column ที่เป็น FK และ encrypt กลับบหลังจากทำ data model ที่ process ด้วย spark แล้ว

ซึ่งจริง ๆ แล้วตัว spark ที่ เวอร์ชัน ≥ 3.5.0 ก็มี built-in function มานั่นก็คือ aes_encrypt และ aes_decrypt เราก็เลยไปลองใช้ดูบ้าง แต่พบว่าตัว function นี้ใช้ไม่ได้เพราะ

ตอนที่ลองพบว่า Key ที่ได้มาจาก tink มีความยาวไม่ตรงกับที่ built-in function รองรับ ดังนั้นเลยต้องใช้ UDF นั่นเอง

org.apache.spark.SparkRuntimeException: [INVALID_PARAMETER_VALUE.AES_KEY_LENGTH] The value of parameter(s) `key` in `aes_encrypt`/`aes_decrypt` is invalid: expects a binary value with 16, 24 or 32 bytes, but got 108 bytes.
at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidAesKeyLengthError(QueryExecutionErrors.scala:2449)
at org.apache.spark.sql.errors.QueryExecutionErrors.invalidAesKeyLengthError(QueryExecutionErrors.scala)

เนื่องจากหาข้อมูลแล้วยังไม่พบว่าทำไมความยาวของ key ที่สร้างขึ้นด้วย tink จึงเป็น 108 bytes ถ้าใครมีคำแนะนำสามารถบอกได้เลยนะคะ

UDF Performance

ในเมื่อเราจำเป็นต้องใช้ UDF อย่างหลีกเลี่ยงไม่ได้ แล้วเราจะเลือกใช้ UDF ตัวไหนดี คำตอบคือถ้าสามารถเขียนด้วย Scala ได้ ให้ใช้ Scala UDF เลยเพราะสามารถใช้ใน JVM ได้โดยตรง จะเร็วและมีประสิทธิภาพที่สุด แต่ถ้าเราต้องใช้ python ล่ะ จะเลือกอันไหนดี เราก็เลยลองเปรียบเทียบระหว่าง Pyspark UDF และ Pyspark Pandas UDF ให้ดู

จำนวนเวลาที่เรารับได้คือ 2 นาทีในการ transform ข้อมูล

โดยสิ่งที่เราใช้เปรียบเทียบกันคือ function ที่ใช้ในการ encrypt ข้อมูล โดยมีต้นแบบมาจาก medium นี้

ข้อมูลที่ใช้คือข้อมูลเดียวกัน แต่มีขนาดไม่เท่ากัน

result of PySpark UDF

จากผลด้านบน เริ่มการเปรียบเทียบด้วยจำนวน row น้อยๆ ก่อนเพราะรู้ว่า UDF จะทำงานช้าอยู่แล้ว แต่เราต้องการ optimize ส่วนของ code ที่เราเขียนด้วยว่าสามารถทำให้เร็วขึ้นได้เท่าไหน ดังนั้นเริ่มจาก encrypt 1 column และข้อมูลขนาด 29 row เราใช้เวลาไปแค่ 6 วินาทีเท่านั้น พอจะใจชื้นขึ้นมาบ้างว่า code เราน่าจะพอได้อยู่แหละ

ดังนั้นเราเลยลองเพิ่มขนาดของข้อมูลเป็น 4000 row ดูบ้าง

p.s. ที่เลือกใช้ขนาด 4000 เพราะข้อมูลจริงที่จะผ่าน UDF มีขนาดประมาณ 4000

พบว่าใช้เวลาเพิ่มขึ้นมาเป็น 2 นาทีนิด ๆ ซึ่งยังอยู่ในเกณฑ์ที่เรารับได้

เราเลยลองเพิ่มจำนวน column ที่จะต้อง encrypt ดู เพราะในแต่ละชุดข้อมูลอาจจะมีหลาย column ที่เป็น sensitive เช่น ข้อมูลลูกค้าไม่ได้มีแค่ ชื่อลูกค้า แต่ยังมีเบอร์โทรศัพท์ หรือที่อยู่ด้วย เราลองเพิ่มจำนวน column ในการ encrypt ดู เจอว่าใช้เวลาเพิ่มขึ้นจาก 2 นาทีเป็น 12 นาที!!

เราเลยต้องหาทางปรับ จนไปเจอว่าตอนที่เรียกใช้ UDF เพื่อ encrypt เราเรียกใช้ for loop วนทีละ column ซึ่งเป็นสิ่งที่ไม่ควรใช้ใน spark เพราะ for loop ไม่ทำให้ distribute งาน ดังนั้นเราเลยเปลี่ยนมาใช้ functools เช่น map reduce แทน for loop ปรากฎว่าลดลงเหลือ 4 เกือบ 5 นาที ซึ่งเร็วขึ้นแต่ยังช้ากว่าที่เราคาดหวังไว้

ดังนั้นเรามาลองเทียบเร็วที่สุดของ PySpark UDF แต่เปลี่ยนมาใช้ PySpark Pandas UDF เพื่อเทียบกันดู

result of PySpark Pandas UDF

จะเห็นว่าพอเปลี่ยนจาก PySpark UDF เป็น PySpark Pandas UDF ใช้เวลาเหลือแค่ 3.1 วินาที ไม่ถึง 1 นาทีด้วยซ้ำ เราจึงเลือกใช้ PySpark Pandas UDF ตัวนี้

ข้อควรระวัง การใช้ PySpark Pandas UDF จะเกิดการแปลงข้อมูลเป็นรูปแบบ arrow ก่อน execute function อาจเกิดเหตุการณ์ OOM ได้ ดังนั้นต้องระวังเรื่อง resource และการ config maxRecordsPerBatch ให้ดี

Conclusion

จะเห็นว่าเราสามารถทำอะไรก็ได้เพื่อ process ข้อมูลใน PySpark ด้วย UDF แต่พึงระลึกไว้เสมอว่าการใช้ UDF นั้นการทำงานจะดีหรือไม่ดีขึ้นอยู่กับว่าเราเขียน code มาได้ดีขนาดไหน และต่อให้เขียนมาได้ดีมากขนาดไหนก็อย่าคาดหวังว่าจะดีได้เท่ากับ built-in function ของ Spark เอง ดังนั้นถ้าสุดท้ายไม่มีทางอื่นแล้วค่อยมาทาง UDF และเลือกใช้ UDF ตามที่เห็นสมควรได้เลยค่ะ

References

  • https://medium.com/analytics-vidhya/user-defined-functions-udf-in-pyspark-928ab1202d1c
  • https://www.youtube.com/watch?app=desktop&v=cvdHw4s4fa8
  • https://blog.knoldus.com/concept-of-udf-in-spark-user-defined-function/
  • https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#setting-arrow-batch-size
  • https://medium.com/@suffyan.asad1/an-introduction-to-pandas-udfs-in-pyspark-a0a512bd00e2
  • https://blog.devgenius.io/pyspark-udf-1b52b9311641
  • https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.aes_encrypt.html#:~:text=Returns%20an%20encrypted%20value%20of,'%2C%20'PKCS').

--

--