A deeper look into Spark User Defined Functions

Suffyan Asad
10 min read · Dec 4, 2022


This article provides a basic introduction to UDFs and to using them to manipulate complex, nested array, map, and struct data, with code examples in PySpark.


Introduction to Spark User Defined Functions (UDFs)

User Defined Functions in Apache Spark allow extending the functionality of Spark and Spark SQL by adding custom logic.

The official Spark documentation describes User Defined Function as:

User-Defined Functions (UDFs) are user-programmable routines that act on one row. This documentation lists the classes that are required for creating and registering UDFs. It also contains examples that demonstrate how to define and register UDFs and invoke them in Spark SQL.

Following is the link to the Spark documentation about UDFs:

Another property of UDFs is that they act on one row and are therefore invoked for every row in the dataset. This should be kept in mind when writing UDF logic, because any logic that takes a long time per execution is paid for once per row.

The custom logic inside UDFs can be used to implement functionality that is not present in the selected version of Apache Spark. Although the list of generic functionality not available in Spark shrinks with every release, there are still many cases where a User Defined Function is helpful, especially when writing jobs for Spark 2.x, which is still quite popular.

Basic User Defined Function

The simplest user-defined function is one that takes no input and returns a random number. This basic UDF can be defined as a Python function with the udf decorator.

Simple User Defined Function (UDF)
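A minimal sketch of such a function (the name random_number is illustrative):

```python
import random

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


@udf(returnType=IntegerType())
def random_number():
    # No inputs; returns a pseudo-random integer between 1 and 10 (inclusive).
    return random.randint(1, 10)
```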

This User Defined Function has no inputs, and its output type is int. It returns a random number between 1 and 10.

The @udf decorator converts it into a User Defined Function, so it can be used as a column definition in a Spark DataFrame as:
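For example (a sketch, assuming the SparkSession named spark that is created in the next section):

```python
# The UDF is invoked once per row to populate the new column.
df = spark.range(5)
df_with_random = df.withColumn("random", random_number())
df_with_random.show()
```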

Code repository and boilerplate code

Before going forward, here is a slight detour. Link to the repository containing example code is:

Following is some boilerplate code to create the SparkSession that will be used in all examples:
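A typical version of such boilerplate might look like this (the app name and master setting are illustrative):

```python
from pyspark.sql import SparkSession


def create_spark_session() -> SparkSession:
    # Local SparkSession shared by all examples; adjust master and app name as needed.
    return (
        SparkSession.builder
        .master("local[*]")
        .appName("spark-udf-examples")
        .getOrCreate()
    )


spark = create_spark_session()
```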

pyspark.sql.functions.udf decorator

The @udf decorator used here is pyspark.sql.functions.udf, and following is the link to its documentation:

It takes the function itself (or a lambda) and a return type as parameters. The return type parameter is optional, and it can be either a string or a DataType. It can be used to define the return schema, and will be used in later examples.

The aforementioned random number UDF can also be defined and used as:
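For example, a sketch using udf as a plain function with a lambda and an explicit return type:

```python
import random

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# The same random-number UDF, defined without the decorator syntax.
random_number_udf = udf(lambda: random.randint(1, 10), IntegerType())

spark.range(5).withColumn("random", random_number_udf()).show()
```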

Notes on non-deterministic result from the UDF

This user defined function is non-deterministic: each execution of the function returns a different number because of the way random.Random works. This can be an issue if randomness is needed between rows but, once evaluated, the result for each row needs to be preserved, or if the requirement is to ensure that re-evaluations of the UDF return the same result.

This is demonstrated in the following example: the UDF is used to define a column called random once, but the DataFrame is printed twice using the .show() function. Each call to .show() gives different values for the same rows:
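A sketch of this demonstration, using the random_number UDF defined earlier:

```python
df = spark.range(5).withColumn("random", random_number())

# No caching, so the UDF can be re-evaluated for each action; the two
# outputs can show different values in the random column for the same rows.
df.show()
df.show()
```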

Following is the result of the code above:

It might be tempting to use the .cache() function to cache the DataFrame after calling a non-deterministic function, and it might work as desired during testing. But if the function has to be executed more than once, either to recreate lost data or as a result of an optimization decision by Spark, the same rows will get different values despite caching in memory. If such scenarios arise, the best solution is to checkpoint, or to write the data immediately to persistent storage as an intermediate result and begin a new step, after calling the non-deterministic function.
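For example, checkpointing materializes the computed rows so that later re-computations reuse them (a sketch; the checkpoint directory is illustrative):

```python
# Checkpointing writes the materialized rows to the checkpoint directory,
# so the random values are fixed from this point onward.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

df_fixed = df.checkpoint()
df_fixed.show()
df_fixed.show()  # Shows the same values as the previous call.
```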

Handling nested data columns in User Defined Functions (UDFs)

User defined functions (UDFs) can handle complex data columns containing nested array, map, or struct (StructType) data. Most of the time I have to write UDFs, it is either to create such data to meet the expectations of downstream systems, or to filter or calculate results using values present within such data. Here I will present some ways to handle such data in UDFs, with the intention of making this article serve as a reference when you encounter similar requirements.

Processing Array data — Parse a column containing array of date string to get minimum date

If the data is an array, it is passed to the UDF as a Python list, and all the operations that can be performed on a list are available. The following example will demonstrate this.

Consider a scenario where the source data has an array of dates as strings that are not in order, and the objective is to determine the minimum date. The UDF will parse each date string to datetime.date, and determine the minimum by calling the min() function. The return type of the function is datetime.date, and is interpreted by Spark as DateType.
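A sketch of such a UDF (the name min_date and the date format are illustrative):

```python
import datetime
from typing import List, Optional

from pyspark.sql.functions import udf
from pyspark.sql.types import DateType


@udf(returnType=DateType())
def min_date(dates: List[str]) -> Optional[datetime.date]:
    # Parse each "YYYY-MM-DD" string to datetime.date and return the earliest one.
    if not dates:
        return None
    parsed = [datetime.datetime.strptime(d, "%Y-%m-%d").date() for d in dates]
    return min(parsed)
```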

Here, we are using the return type argument of the udf decorator function to specify that the return type is a date.

The following code generates the test data, and runs the UDF on it to populate the minimum_date column:
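A sketch of the test data and the UDF call (the sample dates are made up for illustration):

```python
from pyspark.sql import Row

test_df = spark.createDataFrame([
    Row(id=1, dates=["2022-03-01", "2022-01-15", "2022-02-28"]),
    Row(id=2, dates=["2021-12-31", "2022-06-01"]),
])

result_df = test_df.withColumn("minimum_date", min_date(test_df["dates"]))
result_df.show(truncate=False)
result_df.printSchema()
```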

The code produces the following result:

The minimum_date column has the data type date, as can be seen in the printed schema.

When data passed to UDF doesn’t match the data type expected by the function

When there is a mismatch between the data type of the column passed to the UDF and the data type expected by the UDF, or when Spark cannot convert the Spark data types to Python data types, the UDF returns null values. For example, passing the above dates column to the following UDF, which expects a list of integers, puts null in the minimum_date column for all rows:
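A sketch of such a mismatched UDF (the name min_of_integers is illustrative):

```python
from typing import List, Optional

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


@udf(returnType=IntegerType())
def min_of_integers(numbers: List[int]) -> Optional[int]:
    # Expects a list of integers; when given strings, the declared IntegerType
    # return value cannot be produced, so Spark fills the column with null.
    return min(numbers) if numbers else None


test_df.withColumn("minimum_date", min_of_integers(test_df["dates"])).show()
```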

Using two columns in a UDF — Parsing a column containing array of date strings to get minimum date after a date in another column

This is a slightly more complex example. Consider a case where, as in the previous example, the task is to find the minimum date from an array of date strings, but with an additional requirement: find the minimum date that is after a date mentioned in another column. The following example data should produce the result shown in the minimum_date column:

The UDF will take two columns as input, the dates column and the minimum_after column:
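A sketch of the two-argument UDF (the name and the date format are illustrative):

```python
import datetime
from typing import List, Optional

from pyspark.sql.functions import udf
from pyspark.sql.types import DateType


@udf(returnType=DateType())
def min_date_after(dates: List[str], minimum_after: datetime.date) -> Optional[datetime.date]:
    # Keep only the dates falling after minimum_after, then return the earliest one.
    if minimum_after is None:
        return None
    parsed = [datetime.datetime.strptime(d, "%Y-%m-%d").date() for d in dates]
    after = [d for d in parsed if d > minimum_after]
    return min(after) if after else None
```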

The UDF takes two inputs: the first is an array of strings, which contains the list of dates to determine the minimum from. The second input is a parameter of type datetime.date, which is the date after which the minimum date needs to be found (otherwise None is returned). The code inside the UDF first filters the dates that are greater than the minimum_after parameter, and then, if any date remains in the array, returns the minimum, or returns None otherwise. The following code creates the example data, and then runs the UDF on it to get the result:
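A sketch of the example data and the call (the values are made up for illustration):

```python
import datetime

from pyspark.sql import Row

test_df = spark.createDataFrame([
    Row(id=1,
        dates=["2022-03-01", "2022-01-15", "2022-02-28"],
        minimum_after=datetime.date(2022, 2, 1)),
    Row(id=2,
        dates=["2021-12-31", "2022-06-01"],
        minimum_after=datetime.date(2022, 7, 1)),  # No date qualifies -> null
])

result_df = test_df.withColumn(
    "minimum_date", min_date_after(test_df["dates"], test_df["minimum_after"])
)
result_df.show(truncate=False)
```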

The following result is obtained by running the code:

Handling Map Data — Aggregating list of maps to a single map

In PySpark, map type (pyspark.sql.types.MapType) data can be manipulated as dictionaries in User Defined Functions.

The example to demonstrate working with map data will take a list of maps, each representing the sales of a car in different cities on the day given in the date column. Our objective is to group the dataset by car make and model, and calculate the total sales in each city. The following UDF and code implement it:
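A sketch of the UDF and the aggregation flow, assuming a DataFrame cars_df with make, model, date, and a map column sales_by_city (the test-data function that creates it is shown a little further below):

```python
from typing import Dict, List

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, MapType, StringType


@udf(returnType=MapType(StringType(), IntegerType()))
def aggregate_maps(maps: List[Dict[str, int]]) -> Dict[str, int]:
    # Merge a list of city -> sales maps into one map, summing the values per city.
    result: Dict[str, int] = {}
    for sales_by_city in maps:
        for city, sales in sales_by_city.items():
            result[city] = result.get(city, 0) + sales
    return result


aggregated_df = (
    cars_df
    .groupBy("make", "model")
    .agg(F.collect_list("sales_by_city").alias("sales_by_city_list"))
    .withColumn("total_sales_by_city", aggregate_maps(F.col("sales_by_city_list")))
)
aggregated_df.show(truncate=False)
```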

The UDF aggregate_maps takes a list of dictionaries and uses two for-loops to aggregate them into a single dictionary, adding the values if a key already exists, and inserting the key with its value if it doesn't already exist in the result dictionary.

The complete flow first groups the dataset by the make and model columns, and then aggregates the sales_by_city map column using the collect_list function, which collects all the values of the group into an array. The UDF is then called on the array to produce the final result.

The function to create test data is:
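A sketch of such a function (the makes, models, cities, and figures are made up):

```python
from pyspark.sql import DataFrame, Row


def create_car_sales_data() -> DataFrame:
    # Each row: sales of one car model in several cities on one date.
    return spark.createDataFrame([
        Row(make="Toyota", model="Corolla", date="2022-11-01",
            sales_by_city={"New York": 10, "Chicago": 5}),
        Row(make="Toyota", model="Corolla", date="2022-11-02",
            sales_by_city={"New York": 3, "Boston": 7}),
        Row(make="Honda", model="Civic", date="2022-11-01",
            sales_by_city={"Chicago": 4}),
    ])


cars_df = create_car_sales_data()
```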

And when executed on the test data, the following result is returned:

Manipulating Structs — Aggregating a list of maps into a Struct column

Struct (StructType) data can be created in a UDF by returning the result of each execution as a pyspark.sql.types.Row, and by defining the StructType schema as the return type of the UDF. A struct can have fields containing different types of data, which can be an advantage over maps, which allow only one data type for keys and one for values. However, the fields of a struct need to be predetermined, while there is no such restriction on maps.

Consider a modified version of the car sales data used in the previous example. We now have, for each car and date, a map containing the number of online sales and the number of sales made at dealerships. The requirement is to first create a Struct column that represents sales at dealerships and online sales. In addition to this, there is another field to be added to the struct, called did_sell_online, which is true if online sales are greater than 0.

The UDF to convert the map column sales to StructType is:
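A sketch of such a UDF; the field names are illustrative, and the StructType schema is passed as the return type:

```python
from typing import Dict

from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType, IntegerType, StructField, StructType

sales_struct_schema = StructType([
    StructField("dealership_sales", IntegerType()),
    StructField("online_sales", IntegerType()),
    StructField("did_sell_online", BooleanType()),
])


@udf(returnType=sales_struct_schema)
def sales_map_to_struct(sales: Dict[str, int]) -> Row:
    # Convert the {"dealership": n, "online": m} map into a struct (Row),
    # adding the derived did_sell_online flag. The keyword arguments are
    # given in the same order as the fields of the schema above.
    dealership = sales.get("dealership", 0)
    online = sales.get("online", 0)
    return Row(
        dealership_sales=dealership,
        online_sales=online,
        did_sell_online=online > 0,
    )
```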

In this case, correctly specifying the return type schema to the udf decorator is very important. If it is not provided, or is incorrect, Spark throws an exception: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict.

Next, the UDF that manipulates the list of sales to produce the final result is:
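A sketch of such an aggregating UDF; it reuses the struct schema above and receives each element of the collected list as a Row:

```python
from typing import List

from pyspark.sql import Row
from pyspark.sql.functions import udf


@udf(returnType=sales_struct_schema)
def aggregate_sales(sales_structs: List[Row]) -> Row:
    # Sum dealership and online sales across the collected structs; the fields
    # of each incoming Row are read with the square bracket notation.
    total_dealership = sum(s["dealership_sales"] for s in sales_structs)
    total_online = sum(s["online_sales"] for s in sales_structs)
    return Row(
        dealership_sales=total_dealership,
        online_sales=total_online,
        did_sell_online=total_online > 0,
    )
```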

A StructType field is passed to the UDF as a pyspark.sql.types.Row. Fields in the struct can be accessed using the square bracket notation. The UDF above accesses the two types of sales, calculates did_sell_online, and returns the result as a new instance of pyspark.sql.types.Row. The following code creates test data and executes the two UDFs to get the final result:
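A sketch of the test data and the two UDF calls (the values are made up for illustration):

```python
from pyspark.sql import Row, functions as F

sales_df = spark.createDataFrame([
    Row(make="Toyota", model="Corolla", date="2022-11-01",
        sales={"dealership": 5, "online": 2}),
    Row(make="Toyota", model="Corolla", date="2022-11-02",
        sales={"dealership": 3, "online": 0}),
    Row(make="Honda", model="Civic", date="2022-11-01",
        sales={"dealership": 4, "online": 0}),
])

result_df = (
    sales_df
    .withColumn("sales_struct", sales_map_to_struct(F.col("sales")))
    .groupBy("make", "model")
    .agg(F.collect_list("sales_struct").alias("sales_structs"))
    .withColumn("total_sales", aggregate_sales(F.col("sales_structs")))
)
result_df.show(truncate=False)
result_df.printSchema()
```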

Executing the code gives the following result:

Drawbacks of using User Defined Functions

Although this point is mentioned repeatedly, I want to restate that implementing User Defined Functions should not be the first option. This is mainly due to the performance implications of UDFs. Jacek Laskowski mentions the following about User Defined Functions in his online documentation on Spark:

Use the higher-level standard Column-based functions (with Dataset operators) whenever possible before reverting to developing user-defined functions since UDFs are a blackbox for Spark SQL and it cannot (and does not even try to) optimize them.

Following is the link to the reference:

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-udfs.html

This point is further explained with examples in the article:

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-udfs-blackbox.html

To summarize, User Defined Functions (UDFs) should be the last choice, and as much as possible, built-in functions in Spark should be used. UDFs suffer from many problems related to performance and optimization, such as:

  • UDFs are a black box for the optimizer, and therefore cannot be properly optimized by Spark. For example, optimizations such as filter pushdown to the source data are not applied if those filters are defined in UDFs.
  • There is additional overhead in using UDFs in PySpark, because data structures native to the JVM environment that Spark runs in have to be converted to Python data structures to be passed to UDFs, and the results of UDFs have to be converted back.

Implementing the "Parse a column containing array of date strings to get minimum date" example without UDFs

The previously presented example uses UDF to parse the date strings and to find the minimum. The transformation to date can be done using the pyspark.sql.functions.transform function, and the minimum can then be found using the pyspark.sql.functions.array_min function. The following code implements this:
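A sketch of this approach, assuming the same test_df with a dates column of date strings (the lambda form of transform requires PySpark 3.1 or later):

```python
from pyspark.sql import functions as F

# Convert each string in the array to a date, then take the smallest one.
result_df = test_df.withColumn(
    "minimum_date",
    F.array_min(F.transform(F.col("dates"), lambda d: F.to_date(d, "yyyy-MM-dd"))),
)
result_df.show(truncate=False)
```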

The transform function applies custom transformations, expressed with PySpark SQL functions, to each element of an array. In this case, the pyspark.sql.functions.to_date function is called on each date string to convert it to a date. This code will produce the same result as the UDF min_number_udf.

My previous article on Spark SQL Aggregate and Transform functions provides some examples of how these functions can be used to manipulate complex nested data instead of using UDFs:

Performance test — Compare execution time of processing 500,000 rows of date string array example

To compare the performance of a UDF with Spark SQL functions, data containing 500,000 rows, each holding a list of date strings, will be used. The data will be processed to calculate the minimum date for each row, and then grouped by the minimum date to produce the final result. The calculation of the minimum date for each row will be done first using the UDF, and then using Spark SQL functions. The output looks like the following:
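A sketch of how such a comparison could be set up; the data generator, row count, and timing helper are illustrative, and the exact numbers will vary by environment:

```python
import time

from pyspark.sql import functions as F


def time_it(label: str, df) -> None:
    # Force full evaluation with an action and report the elapsed wall-clock time.
    start = time.time()
    df.groupBy("minimum_date").count().collect()
    print(f"{label}: {time.time() - start:.2f} s")


# 500,000 rows, each with an array of three date strings.
big_df = spark.range(500_000).withColumn(
    "dates",
    F.expr(
        "array(date_format(date_add('2022-01-01', cast(id % 365 as int)), 'yyyy-MM-dd'), "
        "'2022-06-15', '2022-03-01')"
    ),
)

time_it("UDF", big_df.withColumn("minimum_date", min_date(F.col("dates"))))
time_it(
    "Spark SQL functions",
    big_df.withColumn(
        "minimum_date",
        F.array_min(F.transform(F.col("dates"), lambda d: F.to_date(d, "yyyy-MM-dd"))),
    ),
)
```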

The execution times of data containing 500,000 rows are:

This small-scale test demonstrates that, even for a small dataset containing 500,000 narrow rows and only one operation before grouping, using the built-in Spark SQL functions is 3 to 4 seconds faster than using the UDF. This difference will increase in real-world scenarios with larger data and more manipulations. Therefore, when considering User Defined Functions, their performance implications should be evaluated and tested.
