Apache Spark SQL User-Defined Functions…!

PySpark SQL & the concept of UDFs

Sandhiya M
5 min read · May 9, 2022

Hey everyone…! Today let’s look at Spark SQL and user-defined functions. Come on, let’s get into it…

⚫ Apache Spark is one of the most successful projects of the Apache Software Foundation and is designed for fast, large-scale data processing.

⚫ PySpark SQL is a module in Spark that integrates relational processing with Spark’s functional programming API. It lets us extract data with SQL queries, written just as we would in standard SQL.

PySpark SQL

Features of PySpark SQL

1) Consistent Data Access

Consistent data access means Spark SQL supports a shared way to access a variety of data sources such as Hive, Avro, Parquet, JSON, and JDBC.
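For example, the same DataFrame reader interface covers several of these sources. The snippet below is a minimal sketch: the paths and table name are placeholders, reading Avro assumes the spark-avro package is on the classpath, and the Hive query assumes Hive support is enabled.

# The unified reader API; paths and names below are hypothetical placeholders
parquet_df = spark.read.parquet("/data/events.parquet")
json_df = spark.read.json("/data/events.json")
avro_df = spark.read.format("avro").load("/data/events.avro")  # needs the spark-avro package
hive_df = spark.sql("SELECT * FROM my_hive_table")             # needs Hive support enabled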

2) Integration with Spark

PySpark SQL queries are integrated with Spark programs, so we can run SQL queries directly inside our Spark code.
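As a small sketch (the DataFrame, view, and column names here are made up for illustration), a DataFrame can be registered as a temporary view, queried with SQL, and the result processed further with the DataFrame API:

# Mix SQL queries with regular DataFrame code in the same program
people_df.createOrReplaceTempView("people")                      # hypothetical DataFrame
adults = spark.sql("SELECT name, age FROM people WHERE age >= 18")
adults.groupBy("age").count().show()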

3) Standard Connectivity

PySpark SQL provides connectivity through JDBC and ODBC, the two industry standards for connecting business intelligence tools.
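For example, a table can be read over JDBC as sketched below; the connection details are hypothetical and the matching JDBC driver must be on the classpath.

# Reading a database table over JDBC (hypothetical connection details)
jdbc_df = (spark.read.format("jdbc")
           .option("url", "jdbc:postgresql://dbhost:5432/sales")
           .option("dbtable", "public.orders")
           .option("user", "report_user")
           .option("password", "secret")
           .load())
jdbc_df.show()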

4) User-Defined Functions

PySpark SQL supports language-integrated User-Defined Functions (UDFs). A UDF defines a new column-based function that extends the vocabulary of Spark SQL’s DSL for transforming DataFrames.

Now let’s have a closer look at the last feature (i.e. Spark SQL User-Defined Functions).

Here, I will explain what a UDF is, why we need it, and how to create and use one on a DataFrame, with some examples.

➤ What is a UDF…?

In Spark, you create a UDF by writing a function in the language you prefer to use with Spark.

For example, if you are using Spark with Scala, you write the function in Scala and then either wrap it with the udf() function to use it on DataFrames or register it as a UDF to use it in SQL.

Why do we need UDFs…?


➱ We can reuse the same function on several DataFrames.

➱ UDFs extend the built-in functionality of the framework.

CREATING A PYSPARK UDF…

First, let’s create a PySpark DataFrame.

Example:

from pyspark.sql import SparkSession

# Create a SparkSession (skip this if one already exists, e.g. in the pyspark shell)
spark = SparkSession.builder.appName("UDFExample").getOrCreate()
columns = ["Seqno", "Name"]
data = [("1", "john jones"),
        ("2", "tracey smith"),
        ("3", "amy sanders")]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)

Output:

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+

Create a Python Function

This code creates a function convertCase() which takes a string parameter and converts the first letter of every word to a capital letter.

UDFs take parameters of your choice and return a value.

Example:

def convertCase(str):
    resStr = ""
    arr = str.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

Convert a Python function to PySpark UDF:

➥ Here we will convert the function convertCase() to a UDF by passing it to the PySpark SQL udf() function.

➥ The PySpark SQL udf() function returns an object of the org.apache.spark.sql.expressions.UserDefinedFunction class.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Converting the Python function to a UDF
convertUDF = udf(lambda z: convertCase(z), StringType())

The default return type of udf() is StringType, hence you can also write the above statement without specifying a return type.

# Converting the function to a UDF; StringType() is the default, hence not required
convertUDF = udf(lambda z: convertCase(z))

Using UDF with DataFrame

1. withColumn():

⯮ Here, we will create another function, upperCase(), which converts the input string to upper case.

def upperCase(str):
    return str.upper()

⯮ Let’s convert the upperCase() Python function to a UDF and then use it with the DataFrame withColumn() method. The example below converts the values of the “Name” column to upper case and creates a new column “Curated Name”.

from pyspark.sql.functions import col
upperCaseUDF = udf(lambda z: upperCase(z), StringType())
df.withColumn("Curated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)

Output:

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+

2. select():

⯮ Now let’s use convertUDF() on a DataFrame column just like a regular built-in function.

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name")) \
  .show(truncate=False)

Output:

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |John Jones  |
|2    |Tracey Smith|
|3    |Amy Sanders |
+-----+------------+

3. Registering a PySpark UDF & using it in SQL:

⯮ In order to use the convertCase() function in PySpark SQL, you need to register it with PySpark by using spark.udf.register().

spark.udf.register("convertUDF", convertCase, StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
    .show(truncate=False)

Output:

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |John Jones  |
|2    |Tracey Smith|
|3    |Amy Sanders |
+-----+------------+

4. Creating a UDF using annotations

➤ In the previous sections, we learned that creating a UDF is a two-step process:

First, you create a Python function.

Second, you convert the function to a UDF using the PySpark SQL udf() function.

➤ You can combine these two steps into one by creating the UDF with an annotation (the @udf decorator).

@udf(returnType=StringType())
def upperCase(str):
    return str.upper()

df.withColumn("Curated Name", upperCase(col("Name"))) \
  .show(truncate=False)

Output:

+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+

Evaluation Order and null checking

➤ PySpark SQL does not guarantee that subexpressions are evaluated in any particular order. For example, logical AND and OR expressions do not have left-to-right “short-circuiting” semantics.

➤ Therefore, it is quite unsafe to depend on the order of evaluation of a Boolean expression. If a UDF depends on short-circuiting semantics (order of evaluation) in SQL for null checking, there is no guarantee that the null check will happen before the UDF is invoked, so it is safer to make the UDF itself null-aware, as sketched below.
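The sketch below reuses convertCase() from above; convertUDF_safe is just an illustrative name, not part of the original examples.

# Null-safe UDF: check for None inside the function instead of relying on
# SQL short-circuiting to skip null rows
convertUDF_safe = udf(lambda z: convertCase(z) if z is not None else None, StringType())
df.withColumn("Name", convertUDF_safe(col("Name"))).show(truncate=False)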

I will catch you all in the next blog…!
