Spark UDFs, how to write them and some gotchas?

UDFs are one exciting aspect of spark which has evolved tremendously over the spark releases. We will try to cover different aspects of it.

Spark UDFs are awesome!!

It is pretty straight forward and easy to create it in spark. Let's say we have this customer data from Central Perk. If you look at the country data, it has a lot of discrepancies but we kinda know its the right country, it's just that the way it is entered is not typical. Let’s say we need to normalize it to the USA that is similar with the help of a known dictionary.

val userData = spark.createDataFrame(Seq(
(1, "Chandler", "Pasadena", "US"),
(2, "Monica", "New york", "USa"),
(3, "Phoebe", "Suny", "USA"),
(4, "Rachael", "St louis", "United states of America"),
(5, "Joey", "LA", "Ussaa"),
(6, "Ross", "Detroit", "United states")
)).toDF("id", "name", "city", "country")
val allUSA = Seq("US", "USa", "USA", "United states", "United states of America")
//All possible combinations of US that could be misspelled

It is easy to clean this data if userData were a Scala collection and we had a List of all possible combinations of US that could be misspelled data.

def cleanCountry = (country: String) => {
val allUSA = Seq("US", "USa", "USA", "United states", "United states of America")
if (allUSA.contains(country)) {
"USA"
}
else {
"unknown"
}
}

It would be really nice and easy if we were to use this cleanCountry function inside spark, isn’t it? That’s where UDF’s come into the picture.

Now that we have the cleanCountry function. We can go ahead and register it with our spark session.

val normaliseCountry = spark.udf.register("normalisedCountry",cleanCountry)
//spark is the SparkSession here.Available by default in spark-shell

Now we can use normaliseCountry scala function as if it’s spark SQL functions.

userData.withColumn("normalisedCountry",normaliseCountry(col("country"))).show
+---+--------+--------+------------------------+----------------+
|id |name |city |country |normalisedCountry|
+---+--------+--------+------------------------+----------------+
|1 |Chandler|Pasadena|US |USA |
|2 |Monica |New york|USa |USA |
|3 |Phoebe |Suny |USA |USA |
|4 |Rachael |St louis|United states of America|USA |
|5 |Joey |LA |Ussaa |unknown |
|6 |Ross |Detroit |United states |USA |
+---+--------+--------+------------------------+----------------+

There are multiple ways to register UDFs actually.

When using in Spark SQL, the only difference is that we don’t need the UserDefinedFunction val normaliseCountry as we did in the above example.

userData.createOrReplaceTempView("user_data")
spark.udf.register("cleanCountry", cleanCountry)
spark.sql("select * ,cleanCountry(country) as normalisedCountry from user_data")
scala> spark.sql("select * ,cleanCountry(country) as normalisedCountry from user_data").show(false)
+---+--------+--------+------------------------+-----------------+
|id |name |city |country |normalisedCountry|
+---+--------+--------+------------------------+-----------------+
|1 |Chandler|Pasadena|US |USA |
|2 |Monica |New york|USa |USA |
|3 |Phoebe |Suny |USA |USA |
|4 |Rachael |St louis|United states of America|USA |
|5 |Joey |LA |Ussaa |unknown |
|6 |Ross |Detroit |United states |USA |
+---+--------+--------+------------------------+-----------------+

We can also use the udf method from sql.functions

scala> import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.functions.{col, udf}
scala> val cleanCountryUdf = udf(cleanCountry)
cleanCountryUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> userData.withColumn("normalisedCountry",cleanCountryUdf(col("country"))).show(false)
+---+--------+--------+------------------------+-----------------+
|id |name |city |country |normalisedCountry|
+---+--------+--------+------------------------+-----------------+
|1 |Chandler|Pasadena|US |USA |
|2 |Monica |New york|USa |USA |
|3 |Phoebe |Suny |USA |USA |
|4 |Rachael |St louis|United states of America|USA |
|5 |Joey |LA |Ussaa |unknown |
|6 |Ross |Detroit |United states |USA |
+---+--------+--------+------------------------+-----------------+

To list all the UDFs:

scala> spark.sql("SHOW USER FUNCTIONS").collect
res79: Array[org.apache.spark.sql.Row] = Array([cleancountry], [normalisecountry])

Caveats of Using Spark UDFs:

Spark UDFs are not good but why??

1)When we use UDFs we end up losing all the optimization Spark does on our Dataframe/Dataset. When we use a UDF, it is as good as a Black box to Spark’s optimizer. Let’s consider an example of a general optimization when reading data from Database or columnar format files such as Parquet is PredicatePushdown. What it does is minimize the amount of data that is read from the source itself.

Let's persist the above userData df as parquet file and read the dataframe back as df1 to simulate data that is read from the parquet file.

userData.write.mode("overwrite").parquet("/data/parquetdata/userData.parquet")
val df1: DataFrame = spark.read.parquet("/data/parquetdata/userData.parquet")

Let’s say we want to filter just the data with name Joey as we all love him! If we check the query that is executed for that.

df1.where('name === "Joey").queryExecution.executedPlanres5: org.apache.spark.sql.execution.SparkPlan =
*(1) Project [id#20, name#21, city#22, country#23]
+- *(1) Filter (isnotnull(name#21) && (name#21 = Joey))
+- *(1) FileScan parquet [id#20,name#21,city#22,country#23] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/parquetdata/userData.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Joey)], ReadSchema: struct<id:int,name:string,city:string,country:string>

Don’t worry about the clutter around the execution plan but this is what is executed when we try to filter the data. This is equivalent to having select * from table where name = ‘Joey’ instead of doing select * from table and then filtering out inside the spark cluster.

But say we do the same operation using a UDF instead.

val  isJoey = udf((name:String) => name == "Joey")df1.where(isJoey('name)).queryExecution.executedPlan
res6: org.apache.spark.sql.execution.SparkPlan =
*(1) Filter UDF(name#21)
+- *(1) FileScan parquet [id#20,name#21,city#22,country#23] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/parquetdata/userData.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,name:string,city:string,country:string>

So the moral of the story is that there are many cool optimizations that spark does it for us and we will be shying away from that if we end up using UDFs. So it is always suggested to avoid UDFs as long as it is inevitable.

2) One more caveat is the way null values are handled. It is the responsibility of the programmer to make sure that the UDFs are handled gracefully. Let’s say we have a UDF that converts strings to camelCase string.

val stringsData: DataFrame = Seq("hello world", "welcome to spark",null).toDF("strings")val camelCase = spark.udf.register("convertToCamelCase", (s: String) =>  s.split(" ").map(x => x.capitalize).mkString(" "))

This UDF will error out :

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string) => string)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1066)
Caused by: java.lang.NullPointerException
at $anonfun$1.apply(<console>:25)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:108)

Hence we should make sure that null values are handled gracefully.

val nullSafeCamelCase = spark.udf.register("nullSafeconvertToCamelCase", (s:String) => if (s!= null)s.split(" ").map(x => x.capitalize).mkString(" ") else "null found")

I will be discussing how UDFs play with non-JVM languages especially pySpark, tradeoffs of using UDFs in Spark in Scala vs Python and about some new features such as vectorized UDFs in the next blog. Happy learning! As always, Thanks for reading! Please do share the article, if you liked it. Any comments or suggestions are welcome! Check out my other articles here.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store