How to Unittest PySpark UDFs

Albert Franzi
Feb 1, 2019

Test the smallest things to guarantee the right functionality of the big distributed ones.

This short post covers how to test PySpark UDFs without having to launch a Spark session.

Defining a PySpark UDF

Since Spark 1.3 we have had the udf() function, which lets us extend the native Spark SQL vocabulary for transforming DataFrames with Python code.

In PySpark, UDFs can be defined in one of two ways: by calling the udf() function, or, thanks to Python's syntactic sugar, by applying it as a decorator.
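A minimal sketch of both styles (the to_upper_udf name and the array-of-strings return type are illustrative choices of mine; to_upper_list is the function referred to later in the post):

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Way 1: call udf() explicitly, here wrapping a lambda
to_upper_udf = udf(lambda values: [v.upper() for v in values],
                   ArrayType(StringType()))

# Way 2: apply udf as a decorator
@udf(ArrayType(StringType()))
def to_upper_list(values):
    return [v.upper() for v in values]
```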

Unittest the PySpark UDFs

Once we have defined our UDFs, we proceed to unit test them and ensure that the associated Spark transformers behave as expected.

In our first example, where we created the UDF from a lambda function, one would expect the UDF to be callable like a regular function, and hence be able to test its behavior directly.
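Something along these lines, continuing from the to_upper_udf definition above (a sketch; the assertion shows the intent rather than what actually happens):

```python
def test_to_upper_udf_naive():
    # to_upper_udf is a UserDefinedFunction, not a plain callable: invoking it
    # needs the JVM to build a Column expression, so it does not simply
    # execute our lambda and hand back a Python list.
    result = to_upper_udf(["aa", "bb"])
    assert result == ["AA", "BB"]  # we never get a plain Python list here
```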

In the second case, we create the UDF from an existing function. This at least allows us to test the function on its own, without the UDF wrapper.
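A sketch of that second style, with an illustrative upper_list function (the name is mine, not from the original post):

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def upper_list(values):
    return [v.upper() for v in values]

# The UDF is built separately, so the plain function stays importable.
upper_list_udf = udf(upper_list, ArrayType(StringType()))

def test_upper_list():
    # No Spark session needed: we exercise the Python logic directly.
    assert upper_list(["aa", "bb"]) == ["AA", "BB"]
```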

That’s great! However, let’s see what happens when we resort to the udf decorator, which leads to shorter and more readable code.
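For example, the to_upper_list function written with the decorator, repeating the form shown in the definitions above (the return type is again an assumption of mine):

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

@udf(ArrayType(StringType()))
def to_upper_list(values):
    return [v.upper() for v in values]

# The decorator replaces the name to_upper_list with the wrapped UDF,
# so calling it in a test behaves exactly like the first case again.
```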

In this case, the udf() decorator behaves just like the first case: it replaces our function ‘to_upper_list’ with a wrapper that expects its input parameters as Columns.

So, if we want to test UDFs created with the first or the third approach, we have to mock the udf decorator.

Mocking the udf decorator in PySpark

I know, I know… why doesn’t PySpark provide a good way to test UDFs without mocking its own behavior?

Well, I know the next solution is far from ideal, but if we want to use the udf() decorator and still test our functions without launching a Spark session, we have to mock it.

The first approach uses pytest’s monkeypatch fixture; however, it forces us to import our UDF package inside each test method, because the decorator is applied when the module is loaded. So, as soon as we import the our_package.spark.udfs module, the decorator runs.
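A sketch of that first approach, assuming a simple pass-through replacement for udf() (the fake_udf helper is mine; our_package.spark.udfs and to_upper_list are the module and function discussed above):

```python
def fake_udf(f=None, returnType=None):
    # Pass-through stand-in for pyspark.sql.functions.udf: return the original
    # Python function whether udf is used as udf(f, returnType) or as a
    # @udf(returnType) decorator.
    return f if callable(f) else (lambda func: func)

def test_to_upper_list(monkeypatch):
    monkeypatch.setattr("pyspark.sql.functions.udf", fake_udf)
    # The decorator runs at import time, so the import has to happen here,
    # after the patch (and assumes the module was not imported earlier).
    from our_package.spark.udfs import to_upper_list
    assert to_upper_list(["aa", "bb"]) == ["AA", "BB"]
```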

A neater approach is to call mock.patch before importing our UDFs module. This second approach doesn’t require importing the module in each test case; however, it relies on us keeping the imports in the proper order.
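A sketch of that second approach; the key point is that mock.patch(...).start() runs before our UDFs module is imported:

```python
from unittest import mock

# Replace udf() with a pass-through before the UDFs module is imported, so the
# decorator applied at import time leaves the plain Python functions untouched.
mock.patch("pyspark.sql.functions.udf",
           lambda f=None, returnType=None: f if callable(f) else (lambda func: func)).start()

from our_package.spark.udfs import to_upper_list  # noqa: E402 - imported after the patch, on purpose

def test_to_upper_list():
    assert to_upper_list(["aa", "bb"]) == ["AA", "BB"]
```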

Nicer and cleaner, right? Not so fast, my friend. If you have other tests that exercise a real UDF, or DataFrame transformation logic that uses a Spark session, you will hit the same issue, since the UDFs module may already have been loaded.

If that’s the case, we need the following solution, where we start the patch and reload the UDFs module inside the setUpClass method. The reload applies the newly patched udf decorator to our UDFs, so we are ready to test them properly.

Afterwards, remember to stop the patch and reload the UDFs module so the original udf decorator is restored and other tests are not affected.
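Putting the last two paragraphs together, one way this could look (fake_udf and the test class name are mine; the module path is the one mentioned above):

```python
import importlib
from unittest import TestCase, mock

import our_package.spark.udfs as udfs


def fake_udf(f=None, returnType=None):
    # Pass-through stand-in for pyspark.sql.functions.udf.
    return f if callable(f) else (lambda func: func)


class ToUpperListTest(TestCase):

    @classmethod
    def setUpClass(cls):
        # Start the patch, then reload the module so its decorators are
        # re-applied with the patched udf, even if it was imported earlier.
        cls.udf_patch = mock.patch("pyspark.sql.functions.udf", fake_udf)
        cls.udf_patch.start()
        importlib.reload(udfs)

    @classmethod
    def tearDownClass(cls):
        # Restore the real udf() and reload again so other tests see the
        # original, properly wrapped UDFs.
        cls.udf_patch.stop()
        importlib.reload(udfs)

    def test_to_upper_list(self):
        self.assertEqual(udfs.to_upper_list(["aa", "bb"]), ["AA", "BB"])
```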

Summary

To wrap up, I’m not proud of patching Spark code to test custom UDFs, even though it’s always possible to define a plain function first and then wrap it inside udf().

Having said this, I find the udf decorator really useful, since it keeps the code readable and makes it obvious which functions are being used as UDFs.

I’m more used to coding with Scala Spark, but sometimes project and team requirements mean I also code against the PySpark API. That makes me wonder how other people are testing their PySpark UDFs.

