Spark - Higher Order Functions
Unleash their power!
It is common to encounter complex data types such as struct, map, and array when working with semi-structured formats in big data processing, especially in Apache Spark. As the demand for scalable and efficient data processing continues to rise, understanding and leveraging higher-order functions in Spark becomes essential for developers and data engineers. Higher-order functions (HOFs) are great for processing nested data such as array columns within Spark. In this article, we will look at what higher-order functions are and how they can be used in Spark SQL. We will demonstrate their usage through practical examples and explore the underlying mechanism.
What are HOFs?
A Higher Order Function (HOF) is a concept in functional programming where a function can take one or more functions as arguments and return a function as its result. In other words, HOFs treat functions as first-class citizens, just like any other data type.
HOFs provide several advantages:
- Abstraction: By passing functions as arguments or returning them as results, HOFs allow developers to abstract away specific implementations and focus on the overall behavior or transformation. This promotes code reusability and modularity.
- Flexibility: HOFs provide a way to parameterize behavior, allowing functions to be customized or extended with different behaviors. This enables greater flexibility in handling complex or varying requirements.
- Composition: HOFs can be combined and composed together to create more complex functionality. By chaining multiple functions, developers can build pipelines or sequences of operations, enhancing code readability and maintainability.
- Encapsulation: HOFs encapsulate behavior, making code more concise and self-contained. This reduces code duplication and promotes cleaner, more readable code.
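To make these ideas concrete, here is a minimal plain-Python sketch (the helper names apply_twice and compose are illustrative, not part of any library) showing a function being passed as an argument and a function being returned as a result:

```python
# A HOF that takes a function as an argument and applies it twice.
def apply_twice(f, x):
    return f(f(x))

# A HOF that returns a new function: the composition f(g(x)).
def compose(f, g):
    return lambda x: f(g(x))

increment = lambda x: x + 1
double = lambda x: x * 2

print(apply_twice(increment, 3))      # increment(increment(3)) -> 5
print(compose(double, increment)(3))  # double(increment(3)) -> 8
```

The composed function built by compose illustrates the "composition" point above: small, reusable behaviors are chained into a pipeline without rewriting either piece.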
Support for processing complex data types (struct, map, and array) has improved since Spark 2.4 with the release of HOFs. In this article, we will take a look at how the following HOFs can be used efficiently: transform(), filter(), exists(), and reduce().
To demonstrate these functions, let's create a sample dataset that holds a column name as StringType and a column numbers as ArrayType:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
schema = StructType([
    StructField('name', StringType(), True),
    StructField('numbers', ArrayType(LongType()), True)
])
# Sample DataFrame with an array column
data = [("Alice", [1, 2, 3]), ("Bob", [4, 5, 6]), ("Charlie", [7, 8, 9])]
df = spark.createDataFrame(data, schema=schema)
# create view
df.createOrReplaceTempView("view_v")
# show sample data
spark.sql("""select * from view_v""").show()
# Output:
+-------+---------+
| name| numbers|
+-------+---------+
| Alice|[1, 2, 3]|
| Bob|[4, 5, 6]|
|Charlie|[7, 8, 9]|
+-------+---------+
Let's say we want to achieve the below independent tasks:
- Find the square of each element in numbers and store it in the new array column squared_number.
- Find the even numbers in the numbers column and store them in the new array column even_number.
- Flag records as true if numbers contains an even prime number and store the result in the new column prime_even_number (hint: 2 is the only even prime number).
- Sum the elements in numbers; if the sum is an even number, mark it as true, else false, and store the result in the new column even_sum.
Transform()
Its signature is transform(array&lt;T&gt;, function&lt;T, U&gt;): array&lt;U&gt;.
- array&lt;T&gt;: This parameter represents the input array on which the transformation will be applied. The elements in the array are of type T; it could be an array of integers, strings, or custom objects.
- function&lt;T, U&gt;: This parameter is a unary function that takes an element of the input array (T), performs a transformation on it, and returns an element of type U. It defines the logic for how each array element should be transformed.
- array&lt;U&gt;: The return type of transform is a new array containing the transformed elements of type U. The transformed array has the same length as the input array, with each element transformed by the provided function.
To achieve task #1, we can use transform to iterate over each element in numbers and perform the transformation using the lambda expression x -> x * x as below:
# Task-1
# Find the square of each element in numbers and store it in the new array
# column squared_number
spark.sql("""
select
name,
numbers,
transform(numbers, x -> x * x) as squared_number
from
view_v
""").show()
# Output:
+-------+---------+--------------+
| name| numbers|squared_number|
+-------+---------+--------------+
| Alice|[1, 2, 3]| [1, 4, 9]|
| Bob|[4, 5, 6]| [16, 25, 36]|
|Charlie|[7, 8, 9]| [49, 64, 81]|
+-------+---------+--------------+
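Conceptually, transform applies a unary function to every element of each row's array. The same semantics can be sketched in plain Python (the function name transform and the sample rows here are illustrative, not Spark API):

```python
# Plain-Python analogue of SQL transform(numbers, x -> x * x):
# apply a unary function to every element of the array.
def transform(arr, f):
    return [f(x) for x in arr]

rows = [("Alice", [1, 2, 3]), ("Bob", [4, 5, 6]), ("Charlie", [7, 8, 9])]
squared = [(name, transform(nums, lambda x: x * x)) for name, nums in rows]
print(squared)
```

Note that, like the SQL version, the result always has the same length as the input array.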
Filter()
Its signature is filter(array&lt;T&gt;, function&lt;T, Boolean&gt;): array&lt;T&gt;.
- array&lt;T&gt;: This parameter represents the input array from which elements will be filtered. The elements in the array are of type T; it could be an array of integers, strings, or custom objects.
- function&lt;T, Boolean&gt;: This parameter is a unary function that takes an element of the input array (T) and evaluates a condition on it, returning a Boolean value (true or false). Elements that return true are included in the resulting filtered array.
- array&lt;T&gt;: The return type of filter is a new array containing the elements of the input array that satisfy the condition defined by the function. The order of elements in the result matches their order in the input array.
So, to achieve task #2, we can use filter with the lambda expression x % 2 = 0; elements matching this expression will be returned in an array as below:
# Task-2
# Find an even number from the numbers column and store it in the
# new array column even_number
spark.sql("""
select
name,
numbers,
filter(numbers, x -> (x % 2 = 0) ) as even_number
from view_v""").show()
# Output:
+-------+---------+-----------+
| name| numbers|even_number|
+-------+---------+-----------+
| Alice|[1, 2, 3]| [2]|
| Bob|[4, 5, 6]| [4, 6]|
|Charlie|[7, 8, 9]| [8]|
+-------+---------+-----------+
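The same predicate-based selection can be sketched in plain Python (the function name filter_array and the sample rows are illustrative, not Spark API):

```python
# Plain-Python analogue of SQL filter(numbers, x -> x % 2 = 0):
# keep only the elements for which the predicate returns True.
def filter_array(arr, predicate):
    return [x for x in arr if predicate(x)]

rows = [("Alice", [1, 2, 3]), ("Bob", [4, 5, 6]), ("Charlie", [7, 8, 9])]
evens = [(name, filter_array(nums, lambda x: x % 2 == 0)) for name, nums in rows]
print(evens)
```

Unlike transform, the result array can be shorter than the input, but the surviving elements keep their original order.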
Exists()
Its signature is exists(array&lt;T&gt;, function&lt;T, Boolean&gt;): Boolean.
- array&lt;T&gt;: This parameter represents the input array on which the existence check will be performed. The elements in the array are of type T; it could be an array of integers, strings, or custom objects.
- function&lt;T, Boolean&gt;: This parameter is a unary function that takes an element of the input array (T) and evaluates a condition on it, returning a Boolean value. It defines the condition that needs to be satisfied by at least one element in the array.
- Boolean: The return type of exists is a Boolean value indicating whether at least one element in the array satisfies the specified condition.
To achieve task #3, we can use exists with the lambda expression x = 2; rows whose array contains a matching element are flagged as true, otherwise false. Note that since 2 is the only even prime number, for simplicity we compare each element of the array with 2 as below:
# Task-3
# Flag records to true if numbers column contains an even prime number
# and store it in the new array column prime_even_number
spark.sql("""
select
name,
numbers,
exists(numbers, x -> x = 2 ) as prime_even_number
from view_v""").show()
# Output:
+-------+---------+-----------------+
| name| numbers|prime_even_number|
+-------+---------+-----------------+
| Alice|[1, 2, 3]| true|
| Bob|[4, 5, 6]| false|
|Charlie|[7, 8, 9]| false|
+-------+---------+-----------------+
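In plain Python, exists corresponds to the built-in any() applied over a predicate (the function name exists and the sample rows are illustrative, not Spark API):

```python
# Plain-Python analogue of SQL exists(numbers, x -> x = 2):
# True if at least one element satisfies the predicate.
def exists(arr, predicate):
    return any(predicate(x) for x in arr)

rows = [("Alice", [1, 2, 3]), ("Bob", [4, 5, 6]), ("Charlie", [7, 8, 9])]
flags = [(name, exists(nums, lambda x: x == 2)) for name, nums in rows]
print(flags)
```

Like any(), the check can stop at the first matching element; the result is a single Boolean per row rather than an array.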
Reduce()
Its signature is slightly different from the other HOFs; it requires an accumulator (buffer) to hold the aggregated value: reduce(array&lt;T&gt;, B, function&lt;B, T, B&gt;, function&lt;B, C&gt;): C.
- array&lt;T&gt;: This parameter represents the input array on which the reduction will be performed. The elements in the array are of type T; for example, it could be an array of integers, strings, or custom objects.
- B: This parameter represents the initial value of the accumulator, of type B. The accumulator is an intermediate value that is updated iteratively during the reduction. Its type can differ from that of the input array elements (T).
- function&lt;B, T, B&gt;: This parameter is a binary function that takes the accumulator (B) and an element of the input array (T), performs a computation on them, and returns an updated accumulator (B). It defines the logic of the reduction.
- function&lt;B, C&gt;: This parameter is a finishing function that takes the final value of the accumulator (B) and transforms it into a result of type C. It provides the flexibility to convert the final accumulator value into a different type or perform additional computations on it.
- C: The return type of the HOF, which is the final result of the reduction. It can be any desired type based on the requirement.
To accomplish the last task #4, we can use the reduce function, which iterates over each element of the array and adds it to the accumulator (acc), which initially holds 0L. Based on the final value of the accumulator, it returns true or false using the finishing lambda expression acc -> if(acc % 2 = 0, 'true', 'false') as below:
# Task-4
# Sum each element in numbers, if the sum is an even number,
# mark it as true else false and store it in the new column even_sum.
spark.sql("""
select
name,
numbers,
reduce(numbers, 0L, (acc, number) -> acc + number,
       acc -> if(acc % 2 = 0, 'true', 'false')) as even_sum
from view_v""").show()
# Output:
+-------+---------+--------+
| name| numbers|even_sum|
+-------+---------+--------+
| Alice|[1, 2, 3]| true|
| Bob|[4, 5, 6]| false|
|Charlie|[7, 8, 9]| true|
+-------+---------+--------+
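The fold-then-finish shape of reduce maps onto functools.reduce plus one extra call in plain Python (the function name reduce_array and the sample rows are illustrative, not Spark API):

```python
from functools import reduce as py_reduce

# Plain-Python analogue of SQL reduce(numbers, 0L, (acc, x) -> acc + x, finish):
# fold the array into an accumulator, then apply a finishing function.
def reduce_array(arr, start, merge, finish):
    return finish(py_reduce(merge, arr, start))

rows = [("Alice", [1, 2, 3]), ("Bob", [4, 5, 6]), ("Charlie", [7, 8, 9])]
even_sum = [
    (name, reduce_array(nums, 0, lambda acc, x: acc + x, lambda acc: acc % 2 == 0))
    for name, nums in rows
]
print(even_sum)
```

The merge step (acc, x) -> acc + x collapses the array into a single sum, and the finishing step converts that sum into the Boolean the task asks for, mirroring the two lambdas in the SQL call.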
Conclusion
In this article, we have seen examples of HOFs that allow us to transform, filter, check for existence, and aggregate elements in an array. Before HOFs were released, most of these problems had to be solved with user-defined functions (UDFs). It is easier to employ HOFs than to write UDFs, and the HOF approach is also more efficient in terms of performance. Read the Databricks blog on SQL higher-order functions for more information on the whys.