Spark User Defined Functions (UDFs)

Matthew Powers
Dec 27, 2017

Simple UDF example

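import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
// spark.createDF comes from the spark-daria library
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._

// Assumes a SparkSession named spark, as in spark-shell

// Lowercase a string and remove every whitespace character
def lowerRemoveAllWhitespace(s: String): String = {
  s.toLowerCase().replaceAll("\\s", "")
}

// Wrap the Scala function in a Spark UDF: udf[ReturnType, InputType]
val lowerRemoveAllWhitespaceUDF = udf[String, String](lowerRemoveAllWhitespace)

val sourceDF = spark.createDF(
  List(
    (" HI THERE "),
    (" GivE mE PresenTS ")
  ), List(
    ("aaa", StringType, true)
  )
)

sourceDF.select(
  lowerRemoveAllWhitespaceUDF(col("aaa")).as("clean_aaa")
).show()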

+--------------+
|     clean_aaa|
+--------------+
|       hithere|
|givemepresents|
+--------------+

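Because a UDF only wraps an ordinary Scala function, the string logic can be checked without a SparkSession. The sketch below assumes a plain Scala 2 setup; the object name LowerRemoveAllWhitespaceCheck is hypothetical. It also shows that \s matches tabs and newlines, not just spaces.

```scala
object LowerRemoveAllWhitespaceCheck {
  // Same body as the function wrapped by lowerRemoveAllWhitespaceUDF
  def lowerRemoveAllWhitespace(s: String): String =
    s.toLowerCase().replaceAll("\\s", "")

  def main(args: Array[String]): Unit = {
    // The two sample values from sourceDF
    println(lowerRemoveAllWhitespace(" HI THERE "))         // hithere
    println(lowerRemoveAllWhitespace(" GivE mE PresenTS ")) // givemepresents
    // \s also matches tabs and newlines
    println(lowerRemoveAllWhitespace("A\tB\nC"))            // abc
  }
}
```

Testing the plain function first makes it easier to tell a logic bug apart from a Spark-specific problem.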
The UDF fails when the column contains a null value:

val anotherDF = spark.createDF(
  List(
    (" BOO "),
    (" HOO "),
    (null)
  ), List(
    ("cry", StringType, true)
  )
)

anotherDF.select(
  lowerRemoveAllWhitespaceUDF(col("cry")).as("clean_cry")
).show()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$2: (string) => string)
Caused by: java.lang.NullPointerException
Cause: org.apache.spark.SparkException: Failed to execute user defined function(anonfun$2: (string) => string)
Cause: java.lang.NullPointerException

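The NullPointerException is thrown by the Scala function itself: for the null row, the function calls toLowerCase() on a null reference. This sketch reproduces the failure without Spark; the object name NullInputCheck and the throwsOnNull helper are hypothetical.

```scala
import scala.util.{Failure, Success, Try}

object NullInputCheck {
  // Same body as lowerRemoveAllWhitespace; there is no null guard
  def lowerRemoveAllWhitespace(s: String): String =
    s.toLowerCase().replaceAll("\\s", "")

  // True when calling the function with null throws a NullPointerException
  def throwsOnNull(): Boolean =
    Try(lowerRemoveAllWhitespace(null)) match {
      case Failure(_: NullPointerException) => true
      case Failure(_)                       => false
      case Success(_)                       => false
    }

  def main(args: Array[String]): Unit =
    println(s"NullPointerException on null input: ${throwsOnNull()}")
}
```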
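Returning an Option[String] lets the function handle null input, and Spark writes None out as null:

def betterLowerRemoveAllWhitespace(s: String): Option[String] = {
  // Return None right away when the input is null
  val str = Option(s).getOrElse(return None)
  Some(str.toLowerCase().replaceAll("\\s", ""))
}

val betterLowerRemoveAllWhitespaceUDF = udf[Option[String], String](betterLowerRemoveAllWhitespace)
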
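The early return inside getOrElse works, but it relies on a non-local return. The same null handling can also be written by mapping over the Option. This sketch compares the two styles on the sample values. It is an alternative style, not the code used above; mappedLowerRemoveAllWhitespace and the object name are hypothetical.

```scala
object OptionStyles {
  // Style used above: bail out with `return None` when s is null
  def betterLowerRemoveAllWhitespace(s: String): Option[String] = {
    val str = Option(s).getOrElse(return None)
    Some(str.toLowerCase().replaceAll("\\s", ""))
  }

  // Alternative: Option(null) is None, so map only runs for non-null input
  def mappedLowerRemoveAllWhitespace(s: String): Option[String] =
    Option(s).map(_.toLowerCase().replaceAll("\\s", ""))

  def main(args: Array[String]): Unit = {
    val inputs = List(" BOO ", " HOO ", null)
    inputs.foreach { in =>
      println(s"${betterLowerRemoveAllWhitespace(in)} ${mappedLowerRemoveAllWhitespace(in)}")
    }
  }
}
```

Either function can be passed to udf[Option[String], String], and both return None for a null input.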
val anotherDF = spark.createDF(
  List(
    (" BOO "),
    (" HOO "),
    (null)
  ), List(
    ("cry", StringType, true)
  )
)

anotherDF.select(
  betterLowerRemoveAllWhitespaceUDF(col("cry")).as("clean_cry")
).show()
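
+---------+
|clean_cry|
+---------+
|      boo|
|      hoo|
|     null|
+---------+

The physical plan shows the call only as UDF(cry#15). Catalyst treats the UDF as a black box and cannot optimize the logic inside it:

anotherDF.select(
  betterLowerRemoveAllWhitespaceUDF(col("cry")).as("clean_cry")
).explain()

== Physical Plan ==
*Project [UDF(cry#15) AS clean_cry#24]
+- Scan ExistingRDD[cry#15]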

Using Column Functions

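The same logic can be written with Spark's built-in lower and regexp_replace functions, which already return null for null input:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lower, regexp_replace}

def bestLowerRemoveAllWhitespace()(col: Column): Column = {
  lower(regexp_replace(col, "\\s+", ""))
}
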
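The column function uses \s+ while the UDF uses \s. Both replace their matches with an empty string, so they remove the same characters. Spark's regexp_replace uses Java regular expressions, so a plain Scala check is a reasonable stand-in. This is a sketch, and the object name RegexEquivalence is hypothetical.

```scala
object RegexEquivalence {
  // \s removes one whitespace character per match; \s+ removes a whole run per match
  def singleMatch(s: String): String = s.replaceAll("\\s", "")
  def runMatch(s: String): String = s.replaceAll("\\s+", "")

  def main(args: Array[String]): Unit = {
    val samples = List(" BOO ", " HOO ", " GivE  mE\tPresenTS ")
    samples.foreach { s =>
      println(s"${singleMatch(s)} ${runMatch(s)} ${singleMatch(s) == runMatch(s)}")
    }
  }
}
```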
val anotherDF = spark.createDF(
  List(
    (" BOO "),
    (" HOO "),
    (null)
  ), List(
    ("cry", StringType, true)
  )
)

anotherDF.select(
  bestLowerRemoveAllWhitespace()(col("cry")).as("clean_cry")
).show()

+---------+
|clean_cry|
+---------+
|      boo|
|      hoo|
|     null|
+---------+

This time the plan contains the actual lower and regexp_replace expressions instead of an opaque UDF:

anotherDF.select(
  bestLowerRemoveAllWhitespace()(col("cry")).as("clean_cry")
).explain()

== Physical Plan ==
*Project [lower(regexp_replace(cry#29, \s+, )) AS clean_cry#38]
+- Scan ExistingRDD[cry#29]
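If spark-daria is not available, the same column function can be run on a DataFrame built with spark.implicits._. This is a minimal sketch that assumes Spark 2.x or later is on the classpath and uses a local master. The object name ColumnFunctionSketch and the run helper are hypothetical.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lower, regexp_replace}

object ColumnFunctionSketch {
  // Same column function as above, built only from Spark SQL functions
  def bestLowerRemoveAllWhitespace()(c: Column): Column =
    lower(regexp_replace(c, "\\s+", ""))

  // Builds the sample DataFrame without spark-daria and returns the cleaned values
  def run(spark: SparkSession): List[String] = {
    import spark.implicits._
    val anotherDF = Seq(" BOO ", " HOO ", null).toDF("cry")
    anotherDF
      .select(bestLowerRemoveAllWhitespace()(col("cry")).as("clean_cry"))
      .collect()
      .map(_.getString(0)) // null rows come back as null
      .toList
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("column-function-sketch")
      .getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```

The null row needs no special handling here because lower and regexp_replace pass null through.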

Conclusion
