Spark - Higher Order Functions

Unleash their power!

Rohit Tayde
Globant
7 min read · Aug 1, 2023



It is common to encounter complex data types such as struct, map, and array when working with semi-structured formats in big data processing, especially in Apache Spark. As the demand for scalable and efficient data processing continues to rise, understanding and leveraging higher-order functions in Spark becomes essential for developers and data engineers. Higher-order functions (HOFs) are well suited to processing nested data such as arrays. In this article, we will look at what higher-order functions are and how they can be used in Spark SQL. We will demonstrate their usage through practical examples and explore the underlying mechanisms.

What are HOFs?

A Higher Order Function (HOF) is a concept in functional programming where a function can take one or more functions as arguments and return a function as its result. In other words, HOFs treat functions as first-class citizens, just like any other data type.
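
As a quick, plain-Python illustration (not Spark-specific), the sketch below defines apply_twice, a function that accepts another function as an argument, which is exactly the pattern Spark's HOFs build on:

# A plain-Python higher-order function: apply_twice takes a function
# as an argument and applies it to the value twice.
def apply_twice(func, value):
    return func(func(value))

def increment(x):
    return x + 1

print(apply_twice(increment, 5))  # Output: 7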

HOFs provide several advantages:

  • Abstraction: By passing functions as arguments or returning them as results, HOFs allow developers to abstract away specific implementations and focus on the overall behavior or transformation. This promotes code reusability and modularity.
  • Flexibility: HOFs provide a way to parameterize behavior, allowing functions to be customized or extended with different behaviors. This enables greater flexibility in handling complex or varying requirements.
  • Composition: HOFs can be combined and composed together to create more complex functionality. By chaining multiple functions, developers can build pipelines or sequences of operations, enhancing code readability and maintainability.
  • Encapsulation: HOFs encapsulate behavior, making code more concise and self-contained. This reduces code duplication and promotes cleaner, more readable code.

Support for processing complex data types (struct, map, and array) improved in Spark 2.4 with the release of HOFs. In this article, we will take a look at how the following HOFs can be used efficiently:

  • transform()
  • filter()
  • exists()
  • reduce()

To demonstrate these functions, let's create a sample dataset that holds a column name of StringType and a column numbers of ArrayType:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField('name', StringType(), True),
    StructField('numbers', ArrayType(LongType()), True)
])

# Sample DataFrame with an array column
data = [("Alice", [1, 2, 3]), ("Bob", [4, 5, 6]), ("Charlie", [7, 8, 9])]
df = spark.createDataFrame(data, schema=schema)

# create view
df.createOrReplaceTempView("view_v")

# show sample data
spark.sql("""select * from view_v""").show()

# Output:
+-------+---------+
|   name|  numbers|
+-------+---------+
|  Alice|[1, 2, 3]|
|    Bob|[4, 5, 6]|
|Charlie|[7, 8, 9]|
+-------+---------+

Let's say we want to achieve the below independent tasks:

  1. Find the square of each element in numbers and store it in the new array column squared_number.
  2. Find an even number from the numbers column and store it in the new array column even_number.
  3. Flag records as true if numbers contains an even prime number and store the flag in the new column prime_even_number (Hint: 2 is the only even prime number).
  4. Sum the elements in numbers; if the sum is an even number, mark it as true, else false, and store the result in the new column even_sum.

Transform()

Its signature is transform(array<T>, function<T, U>): array<U>.

  • array<T>: This parameter represents the input array on which the transformation will be applied. The elements in the array are of type T. It could be an array of integers, strings, or custom objects.
  • function<T, U>: This parameter is a unary function that takes an element from the input array (T) and performs a transformation on it. The function returns an element of a different type U. It defines the logic for how each array element should be transformed.
  • array<U>: The return type of the transform function is a new array containing the transformed elements of type U. The transformed array will have the same length as the input array, with each element transformed based on the provided function.

To achieve task #1, we can use transform to iterate over each element in numbers and apply the transformation with the lambda expression x -> x * x, as below:

# Task-1
# Find the square of each element in numbers and store it in the new array
# column squared_number

spark.sql("""
select
  name,
  numbers,
  transform(numbers, x -> x * x) as squared_number
from view_v
""").show()

# Output:
+-------+---------+--------------+
|   name|  numbers|squared_number|
+-------+---------+--------------+
|  Alice|[1, 2, 3]|     [1, 4, 9]|
|    Bob|[4, 5, 6]|  [16, 25, 36]|
|Charlie|[7, 8, 9]|  [49, 64, 81]|
+-------+---------+--------------+
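
If you prefer the DataFrame API over SQL, the same HOF is exposed as pyspark.sql.functions.transform in Spark 3.1 and later. A minimal sketch against the df created above:

from pyspark.sql import functions as F

# DataFrame API equivalent of the SQL transform() call (Spark 3.1+)
df.withColumn("squared_number", F.transform("numbers", lambda x: x * x)).show()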

Filter()

Its signature is filter(array<T>, function<T, Boolean>): array<T>.

  • array<T>: This parameter represents the input array from which elements will be filtered. The elements in the array are of type T. It could be an array of integers, strings, or custom objects.
  • function<T, Boolean>: This parameter is a unary function that takes an element from the input array (T) and evaluates a condition on it. The function returns a Boolean value (true or false) based on whether the element satisfies the condition. Elements that return true are included in the resulting filtered array.
  • array<T>: The return type of the filter function is a new array that contains the elements from the input array that satisfy the condition defined by the function. The order of the elements in the resulting array will match their order in the input array.

So here, to achieve task #2, we can use filter with the lambda expression x % 2 = 0; elements matching this expression are returned in a new array, as below:

# Task-2
# Find an even number from the numbers column and store it in the
# new array column even_number

spark.sql("""
select
  name,
  numbers,
  filter(numbers, x -> x % 2 = 0) as even_number
from view_v""").show()

# Output:
+-------+---------+-----------+
|   name|  numbers|even_number|
+-------+---------+-----------+
|  Alice|[1, 2, 3]|        [2]|
|    Bob|[4, 5, 6]|     [4, 6]|
|Charlie|[7, 8, 9]|        [8]|
+-------+---------+-----------+
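
The DataFrame API offers the same capability through pyspark.sql.functions.filter (Spark 3.1+); a minimal sketch against the df created above:

from pyspark.sql import functions as F

# DataFrame API equivalent of the SQL filter() call (Spark 3.1+)
df.withColumn("even_number", F.filter("numbers", lambda x: x % 2 == 0)).show()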

Exists()

Its signature is exists(array<T>, function<T, Boolean>): Boolean.

  • array<T>: This parameter represents the input array on which the existence check will be performed. The elements in the array are of type T. It could be an array of integers, strings, or custom objects.
  • function<T, Boolean>: This parameter is a unary function that takes an element from the input array (T), evaluates a condition on it, and returns a Boolean value. It defines the condition that needs to be satisfied by at least one element in the array.
  • Boolean: The return type of the exists function is a Boolean value. It indicates whether there exists at least one element in the array that satisfies the specified condition.

To achieve task #3, we can use exists with the lambda expression x = 2; records whose array contains a matching element are flagged as true, otherwise false. Note that since 2 is the only even prime number, for simplicity we simply compare each element of the array with 2, as below:

# Task-3
# Flag records to true if numbers column contains an even prime number
# and store it in the new array column prime_even_number

spark.sql("""
select
  name,
  numbers,
  exists(numbers, x -> x = 2) as prime_even_number
from view_v""").show()

# Output:
+-------+---------+-----------------+
|   name|  numbers|prime_even_number|
+-------+---------+-----------------+
|  Alice|[1, 2, 3]|             true|
|    Bob|[4, 5, 6]|            false|
|Charlie|[7, 8, 9]|            false|
+-------+---------+-----------------+
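
The same check is available in the DataFrame API as pyspark.sql.functions.exists (Spark 3.1+); a minimal sketch:

from pyspark.sql import functions as F

# DataFrame API equivalent of the SQL exists() call (Spark 3.1+)
df.withColumn("prime_even_number", F.exists("numbers", lambda x: x == 2)).show()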

Reduce()

Its signature differs slightly from the other HOFs because it requires an accumulator (buffer) to hold the aggregated value: reduce(array<T>, B, function<B, T, B>, function<B, R>): R.

  • array<T>: This parameter represents the input array on which the reduction operation will be performed. The elements in the array are of type T. For example, it could be an array of integers, strings, or custom objects.
  • B: This parameter represents the initial value, also known as the accumulator, of type B. The accumulator is an intermediate value that gets updated iteratively during the reduction process. Its type can be different from the input array elements (T).
  • function<B, T, B>: This parameter is a binary function that takes two arguments, the accumulator (B) and an element from the input array (T). It performs a computation using these two arguments and returns an updated value for the accumulator (B). The function defines the logic for how the reduction operation should be performed.
  • function<B, R>: This parameter is a function that takes the final value of the accumulator (B) and transforms it into a result of type R. This function provides the flexibility to convert the final accumulator value into a different type or perform additional computations on it.
  • R: This represents the return type of the HOF, which is the final result of the reduction operation. It can be any desired type based on the requirement.

To accomplish the last task #4, we can use the reduce function, which iterates over each element of the array and adds it to the accumulator (acc), which initially holds 0L. Based on the final value of the accumulator, the finish lambda acc -> if(acc % 2 = 0, 'true', 'false') returns true or false, as below:

# Task-4
# Sum each element in numbers, if the sum is an even number,
# mark it as true else false and store it in the new column even_sum.

spark.sql("""
select
  name,
  numbers,
  reduce(numbers, 0L,
         (acc, x) -> acc + x,
         acc -> if(acc % 2 = 0, 'true', 'false')) as even_sum
from view_v""").show()

# Output:
+-------+---------+--------+
|   name|  numbers|even_sum|
+-------+---------+--------+
|  Alice|[1, 2, 3]|    true|
|    Bob|[4, 5, 6]|   false|
|Charlie|[7, 8, 9]|    true|
+-------+---------+--------+
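
In the DataFrame API, the SQL reduce() function is exposed as pyspark.sql.functions.aggregate (Spark 3.1+). A minimal sketch; note that the finish lambda here returns a boolean column rather than the string 'true'/'false' used in the SQL version:

from pyspark.sql import functions as F

# DataFrame API equivalent of the SQL reduce() call (Spark 3.1+).
# The initial value is cast to long to match the element type of numbers.
df.withColumn(
    "even_sum",
    F.aggregate(
        "numbers",
        F.lit(0).cast("long"),
        lambda acc, x: acc + x,
        lambda acc: acc % 2 == 0
    )
).show()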

Conclusion

In this article, we have seen examples of HOFs that allow us to transform, filter, check for existence, and aggregate elements of an array. Before HOFs were released, most of these problems had to be solved using user-defined functions (UDFs). It is easier to employ HOFs than to write UDFs; read the Databricks blog on SQL higher-order functions for more information on the whys. The HOF approach is also more efficient in terms of performance, since it avoids the serialization overhead that UDFs, particularly Python UDFs, incur.
