Are You Ready To Learn The Most Expensive Operation In PySpark With Me?

R RAMYA
7 min read · May 8, 2022


What's that operation? Let's get into it.

UDF

You might have heard what a UFO is!!!

But what does UDF mean???

User Defined Functions (UDFs)

🔎 A User Defined Function is a custom function defined to perform data transformation operations on PySpark DataFrames.

🔎 UDFs are used to extend the functions of the framework and to reuse those functions on multiple DataFrames.

They can also be used as an alternative to for-loops, with better performance.

Why do we need a UDF?

UDFs let you write custom logic once and reuse it on multiple DataFrames and in SQL expressions.

For example:

→ Say we want to convert the first letter of every word in a name string to a capital letter, using our own logic.

→ We can create a UDF for this.

→ We can then reuse it as needed on many DataFrames.

---

Remember 📌

When you create UDFs, you need to design them very carefully, otherwise you will run into optimization and performance issues: a UDF is a black box to Spark's query optimizer, and a Python UDF also pays serialization costs moving each row between the JVM and the Python worker. That is what makes it the most expensive operation in PySpark.

---

"Now let's learn how to create a PySpark UDF…"

Create PySpark UDF

  1. Create a PySpark DataFrame
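Here's a minimal sketch of this step (the same code appears in the complete example at the end of this post):

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno", "Name"]
data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")]

df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)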

Output:
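+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+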

2. Create a Python Function

→ Let's create a function convertCase(), as sketched below.

→ It takes a string parameter and converts the first letter of every word to a capital letter.
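A sketch of the function (it also appears in the complete example below); note that it leaves a trailing space after the last word:

def convertCase(s):
    resStr = ""
    arr = s.split(" ")
    for x in arr:
        # upper-case the first character, keep the rest as-is
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr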

3. Convert a Python function to a PySpark UDF

To do this, first import the udf function from pyspark.sql.functions.

Function: PySpark SQL udf()

from pyspark.sql.functions import udf

# the default return type of udf() is StringType
convertUDF = udf(lambda z: convertCase(z))

"Moving On to One Step Forward"

Using UDF with DataFrame

1. Using UDF with PySpark DataFrame select()

We can use convertUDF() on a DataFrame column like a regular built-in function.

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name")) \
  .show(truncate=False)

Output:
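+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+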

2. Using UDF with PySpark DataFrame withColumn()

→ We can also use a UDF with the DataFrame withColumn() function.

→ Let's create another function, upperCase(), which converts the input string to upper case.

→ Now convert the upperCase() Python function to a UDF and use it with withColumn(), as sketched below.
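A sketch of both steps, matching the complete example at the end:

def upperCase(s):
    return s.upper()

upperCaseUDF = udf(lambda z: upperCase(z), StringType())

df.withColumn("Curated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)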

Output:
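+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+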

3. Registering a PySpark UDF & using it in SQL

In order to use the convertCase() function in PySpark SQL, you need to register the function with Spark using spark.udf.register(), as sketched below.
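A sketch of the registration and the SQL query, matching the complete example at the end:

spark.udf.register("convertUDF", convertCase, StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
    .show(truncate=False)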

Output:
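+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+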

Creating a UDF using annotation

Previously, we learned that creating a UDF is a two-step process:

→ first, create a Python function

→ second, convert the function to a UDF using the SQL udf() function

Now we're gonna learn to create a UDF in a single line of code using annotations, as sketched below.
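A minimal sketch using the @udf decorator; it reuses the upperCase() logic from the previous step, and the decorated function can be used directly on DataFrame columns:

@udf(returnType=StringType())
def upperCase(s):
    return s.upper()

df.withColumn("Curated Name", upperCase(col("Name"))) \
  .show(truncate=False)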

Output:
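+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+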

Complete PySpark UDF Example

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# create the sample DataFrame
columns = ["Seqno", "Name"]
data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")]

df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)

# capitalize the first letter of every word (note: appends a trailing space)
def convertCase(s):
    resStr = ""
    arr = s.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z))

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name")) \
  .show(truncate=False)

def upperCase(s):
    return s.upper()

upperCaseUDF = udf(lambda z: upperCase(z), StringType())

df.withColumn("Curated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)

""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase, StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
    .show(truncate=False)

spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \
    "where Name is not null and convertUDF(Name) like '%John%'") \
    .show(truncate=False)

""" null check """
columns = ["Seqno", "Name"]
data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders"),
        ("4", None)]

df2 = spark.createDataFrame(data=data, schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

# null-safe wrapper around convertCase()
spark.udf.register("_nullsafeUDF",
                   lambda s: convertCase(s) if s is not None else "",
                   StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
    .show(truncate=False)

spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
    "where Name is not null and _nullsafeUDF(Name) like '%John%'") \
    .show(truncate=False)

Evaluation order and null checking

→ Spark SQL does not guarantee the order of evaluation of subexpressions.

→ The inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order.

---

For example:

Logical AND and OR expressions do not have left-to-right "short-circuiting" semantics.

---

→ So it is dangerous to rely on the side effects or order of evaluation of Boolean expressions, or on the order of WHERE and HAVING clauses.

→ Such expressions and clauses can be reordered during query optimization and planning.

→ Specifically, if a UDF relies on short-circuiting semantics in SQL for null checking, there's no guarantee that the null check will happen before the UDF is invoked.

---

Example:

---
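A minimal sketch of the problem, assuming a hypothetical table test1 with a string column s and a strlen UDF registered like this:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1")  # no guarantee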

This WHERE clause does not guarantee that the strlen UDF is invoked after nulls are filtered out.

→ To perform proper null checking, we recommend that you do either of the following:

  • Make the UDF itself null-aware and do the null check inside it
  • Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch (see the example below)

---

Example:
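A sketch of both options, again assuming the hypothetical test1 table:

# option 1: the UDF itself checks for null
spark.udf.register("strlen_nullsafe", lambda s: len(s) if s is not None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1")

# option 2: an IF expression guards the UDF call
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")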

---

Hope you got an idea of UDFs 💻…

See you in the next blog…

Cheers 🎗…

Ramya R ♥

RESOURCES:

https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
