User Defined function in PySpark

Vaishali S
6 min read · May 8, 2022


Hi Everyone!!! In this blog we will learn about user-defined functions in PySpark.

Spark SQL provides the PySpark udf() (User Defined Function), which is used to define a new Column-based function.

Let’s look at it in detail…

What is UDF?

  • A PySpark UDF (User Defined Function) is used to create a reusable, Column-based function in Spark.
  • Once a UDF is created, it can be re-used on multiple DataFrames and in SQL (after registering).
  • The default return type of udf() is StringType.
  • You need to handle nulls explicitly, otherwise you will see side effects.

Why do we need a Spark UDF?

⯮ UDFs are used to extend the functionality of the framework, and the resulting function can be re-used on several DataFrames.

⯮ For example, if you want to convert the first letter of every word in a sentence to capital case, Spark’s built-in functions don’t provide this, so you can create it as a UDF and reuse it as needed on many DataFrames.

Create PySpark UDF

⋙ Before we jump into creating a UDF, let’s first create a PySpark DataFrame.

Create a DataFrame:

Example:

from pyspark.sql import SparkSession

# create a SparkSession (the entry point for DataFrame operations)
spark = SparkSession.builder.appName("pyspark-udf-example").getOrCreate()

columns = ["Seqno", "Name"]
data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")]

df = spark.createDataFrame(data=data, schema=columns)

df.show(truncate=False)

Output:

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+

Create a Python Function

⋙ The code below creates a function convertCase(), which takes a string parameter and converts the first letter of every word to a capital letter. UDFs take parameters of your choice and return a value.

Example:

def convertCase(str):
    resStr = ""
    arr = str.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

Convert a Python function to PySpark UDF:

➥ Here we will convert the function convertCase() to a UDF by passing it to the PySpark SQL udf() function.

➥ In PySpark, udf() is available in the pyspark.sql.functions module. Make sure you import it before using it.

➥ The PySpark SQL udf() function returns a UserDefinedFunction object, which can then be applied to Columns like any built-in function.

# converting function to UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

convertUDF = udf(lambda z: convertCase(z), StringType())

◙ The default return type of udf() is StringType, hence you can also write the above statement without specifying the return type.

# converting function to UDF; StringType() is the default, hence not required
convertUDF = udf(lambda z: convertCase(z))

Using UDF with DataFrame

1. withColumn():

⯮ Here, we will create another function, upperCase(), which converts the input string to upper case.

def upperCase(str):
    return str.upper()

⯮ Let’s convert the upperCase() Python function to a UDF and then use it with DataFrame withColumn(). The example below converts the values of the “Name” column to upper case and creates a new column, “Curated Name”.

from pyspark.sql.functions import col

upperCaseUDF = udf(lambda z: upperCase(z), StringType())

df.withColumn("Curated Name", upperCaseUDF(col("Name"))) \
    .show(truncate=False)

Output:

+-----+------------+-------------+
|Seqno|Name        |Curated Name |
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

2. select():

⯮ Now use convertUDF() on a DataFrame column inside select(), just like a regular built-in function.

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name")) \
    .show(truncate=False)

Output:

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

3. Registering PySpark UDF & use it on SQL:

⯮ In order to use the convertCase() function in PySpark SQL, you need to register it with PySpark using spark.udf.register().

spark.udf.register("convertUDF", convertCase, StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
    .show(truncate=False)

Output:

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

4. Creating UDF using annotation

◙ In the previous sections, we learned that creating a UDF is a two-step process:

First, you create a Python function.

Second, you convert that function to a UDF using the SQL udf() function.

◙ You can avoid these two steps and create the UDF in a single step by using the @udf annotation.

@udf(returnType=StringType())
def upperCase(str):
    return str.upper()

df.withColumn("Cureated Name", upperCase(col("Name"))) \
.show(truncate=False)

Output:

+-----+------------+-------------+
|Seqno|Name        |Curated Name |
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

Evaluation Order and null checking

➨ PySpark SQL does not guarantee the order of evaluation of subexpressions. The inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. For example, logical AND and OR expressions do not have left-to-right “short-circuiting” semantics.

➨ Therefore, it is unsafe to depend on the order of evaluation of a Boolean expression, or on the order of WHERE and HAVING clauses, since such expressions and clauses can be reordered during query optimization and planning. If a UDF depends on short-circuiting semantics (order of evaluation) in SQL for null checking, there is no guarantee that the null check will happen before the UDF is invoked. The safest approach is to handle nulls inside the UDF itself, as sketched below.
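➨ A minimal, illustrative sketch of a null-tolerant UDF (the sample data and names below are assumptions, not part of the earlier example):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("null-safe-udf").getOrCreate()

# assumed sample data containing a null in the Name column
df_nulls = spark.createDataFrame([("1", "john jones"), ("2", None)], ["Seqno", "Name"])

# instead of relying on a "WHERE Name is not null" filter being evaluated first,
# make the UDF itself handle None input
nullSafeUDF = udf(lambda s: s.upper() if s is not None else "", StringType())

df_nulls.withColumn("Upper Name", nullSafeUDF(col("Name"))).show(truncate=False)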

Primitive type Outputs

➥ Let’s consider a function square() that squares a number, and register this function as a Spark UDF.

def square(x):
    return x**2

➥ Now we convert it into a UDF. While registering, we have to specify the return data type using pyspark.sql.types. The problem with the Spark UDF is that it doesn’t convert an integer to a float, whereas the plain Python function works for both integer and float values. A PySpark UDF will return a column of NULLs if the returned value doesn’t match the declared output data type.

◘ Let’s consider the following program:
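(A minimal sketch; the integers/floats DataFrame below is an assumption for illustration, with square() registered using LongType as the return type.)

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# assumed sample DataFrame with an integer column and a float column
df_num = spark.createDataFrame([(1, -1.0), (2, 0.5), (3, 2.7)], ["integers", "floats"])

# squaring the float column returns floats, which do not match the
# declared LongType, so that column comes back as null
square_udf_int = udf(lambda x: square(x), LongType())

df_num.select("integers", "floats",
              square_udf_int("integers").alias("int_squared"),
              square_udf_int("floats").alias("float_squared")).show()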

Output:

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|          1|         null|
|       2|   0.5|          4|         null|
|       3|   2.7|          9|         null|
+--------+------+-----------+-------------+

Register the UDF with a float type output

Example:
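A minimal sketch, continuing the example above with FloatType declared as the return type:

from pyspark.sql.types import FloatType

# same square() function, now declared with FloatType as the return type:
# the float column is squared correctly, but the integer column returns
# null because Python ints do not match the declared FloatType
square_udf_float = udf(lambda x: square(x), FloatType())

df_num.select("integers", "floats",
              square_udf_float("integers").alias("int_squared"),
              square_udf_float("floats").alias("float_squared")).show()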

Output:

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|       null|          1.0|
|       2|   0.5|       null|         0.25|
|       3|   2.7|       null|         7.29|
+--------+------+-----------+-------------+

Specifying float type output using the Python function
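A minimal sketch: cast to float inside the Python function so the returned value matches the declared FloatType for both columns (square_float and square_float_udf are assumed names for illustration).

def square_float(x):
    # cast to float so the result matches FloatType for both int and float input
    return float(x**2)

square_float_udf = udf(lambda x: square_float(x), FloatType())

df_num.select("integers", "floats",
              square_float_udf("integers").alias("int_squared"),
              square_float_udf("floats").alias("float_squared")).show()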

Output:

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|        1.0|          1.0|
|       2|   0.5|        4.0|         0.25|
|       3|   2.7|        9.0|         7.29|
+--------+------+-----------+-------------+

Composite Type Output

⯮ If the output of the Python function is a list, then the return type must also be declared as a list, by specifying the element type inside ArrayType() when registering the UDF.
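
⯮ A minimal sketch (the sample data and function names below are assumptions for illustration):

from pyspark.sql.types import ArrayType, FloatType

# assumed sample data: a single column of integer arrays
df_arr = spark.createDataFrame([([1, 2],), ([3, 4, 5],), ([6, 7, 8, 9],)],
                               ["integer_arrays"])

def square_list(arr):
    return [float(val) ** 2 for val in arr]

# declare the element type inside ArrayType() since the UDF returns a list
square_list_udf = udf(lambda arr: square_list(arr), ArrayType(FloatType()))

df_arr.select("integer_arrays", square_list_udf("integer_arrays")).show()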

Output:

+--------------+--------------------+
|integer_arrays|    (integer_arrays)|
+--------------+--------------------+
|        [1, 2]|          [1.0, 4.0]|
|     [3, 4, 5]|   [9.0, 16.0, 25.0]|
|  [6, 7, 8, 9]|[36.0, 49.0, 64.0...|
+--------------+--------------------+

Let’s catch up in the next blog… Any questions? Please ping me in the comment section and I will get back to you :)


Resources:

https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
