Are You Ready To Learn The Most Expensive Operation In PySpark With Me?
What's that operation? Let's get into it.
UDF
You might have heard what a UFO is!
But what does UDF mean?
User Defined Functions (UDFs)
• A User Defined Function is a custom function defined to perform data transformation operations on PySpark DataFrames.
• UDFs are used to extend the built-in functions of the framework and to re-use these functions on multiple DataFrames.
• A UDF can also serve as an alternative to collecting data and looping over it in Python, since Spark applies the UDF in parallel across partitions.
Why do we need a UDF?
UDFs extend the built-in functions of the framework and let you re-use the same logic on multiple DataFrames and in SQL expressions.
For example:
• Suppose we want to convert the first letter of every word in a name string to a capital letter. (PySpark's built-in initcap function actually does this, but it makes a simple illustration of a custom UDF.)
• So we can create a UDF for it.
• We can then re-use it as needed on many DataFrames.
Remember:
When you create UDFs, design them very carefully; otherwise you will run into optimization and performance issues, because Spark treats a UDF as a black box and cannot optimize the code inside it.
Now let's learn how to create a PySpark UDF…

Create PySpark UDF
1. Create a PySpark DataFrame
2. Create a Python Function
• Let's create a function convertCase().
• It takes a string parameter and converts the first letter of every word to a capital letter.
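A sketch of convertCase(), matching the complete example at the end of the post; note that it leaves a trailing space after the last word:

```python
def convertCase(s):
    # Capitalize the first letter of each space-separated word.
    resStr = ""
    for word in s.split(" "):
        resStr = resStr + word[0:1].upper() + word[1:] + " "
    return resStr

print(convertCase("john jones"))  # John Jones (with a trailing space)
```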
3. Convert the Python function to a PySpark UDF
To do this, first import the udf() function from PySpark SQL (the Python counterpart of Scala's org.apache.spark.sql.functions.udf):
from pyspark.sql.functions import udf
# The default return type of udf() is StringType
convertUDF = udf(lambda z: convertCase(z))
Moving one step forward…
Using UDF with DataFrame
1. Using UDF with PySpark DataFrame select()
We can use convertUDF() on a DataFrame column like a regular built-in function:
df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name")) \
  .show(truncate=False)
Output: the Name column is returned with every word capitalized (John Jones, Tracey Smith, Amy Sanders).
2. Using UDF with PySpark DataFrame withColumn()
• You can also apply a UDF with the DataFrame withColumn() function.
• Let's create another function, upperCase(), which converts the input string to upper case.
• Then convert the upperCase() Python function to a UDF and use it with withColumn().
3. Registering a PySpark UDF & using it in SQL
To use the convertCase() function in PySpark SQL, you first need to register the function with PySpark using spark.udf.register().
Creating a UDF using an annotation
Previously we learned that creating a UDF is a two-step process:
• first, create a Python function
• second, convert the function to a UDF using the SQL udf() function
Now let's learn to create a UDF in a single step using the @udf annotation.
Complete PySpark UDF Example
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)
def convertCase(s):
    resStr = ""
    for word in s.split(" "):
        resStr = resStr + word[0:1].upper() + word[1:] + " "
    return resStr
""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z))
df.select(col("Seqno"), \
convertUDF(col("Name")).alias("Name") ) \
.show(truncate=False)
def upperCase(s):
    return s.upper()

upperCaseUDF = udf(lambda z: upperCase(z), StringType())

df.withColumn("Created Name", upperCaseUDF(col("Name"))) \
    .show(truncate=False)
""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
.show(truncate=False)
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \
"where Name is not null and convertUDF(Name) like '%John%'") \
.show(truncate=False)
""" null check """
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders"),
('4',None)]
df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")
spark.udf.register("_nullsafeUDF", lambda s: convertCase(s) if s is not None else "", StringType())
spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
.show(truncate=False)
spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
" where Name is not null and _nullsafeUDF(Name) like '%John%'") \
.show(truncate=False)
Evaluation order and null checking
• Spark SQL does not guarantee the order of evaluation of subexpressions.
• The inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order.
For example, logical AND and OR expressions do not have left-to-right "short-circuiting" semantics.
• So it is dangerous to rely on side effects or the order of evaluation of Boolean expressions, or on the order of WHERE and HAVING clauses.
• Such expressions and clauses can be reordered during query optimization and planning.
• In particular, if a UDF relies on short-circuiting semantics in SQL for null checking, there is no guarantee that the null check will happen before the UDF is invoked.
Example: a WHERE clause such as "s is not null and strlen(s) > 1" does not guarantee that the strlen UDF is invoked only after nulls are filtered out.
• To perform proper null checking, we recommend that you do either of the following:
  - make the UDF itself null-aware and do the null checking inside the UDF, or
  - use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch.
Example:
Hope you got an idea of UDFs…
See you in the next blog…
Cheers!
Ramya R ♥
RESOURCES:
https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/