## EXPEDIA GROUP TECHNOLOGY — SOFTWARE

# Deep dive into Apache Spark Window Functions

## Window functions operate on groups of data and return values for each record or group

In this blog post, we’ll take a deep dive into Apache Spark window functions. You may also be interested in my earlier posts on Apache Spark.

- Start Your Journey with Apache Spark — Part 1
- Start Your Journey with Apache Spark — Part 2
- Start Your Journey with Apache Spark — Part 3
- Deep Dive into Apache Spark DateTime Functions
- Working with JSON in Apache Spark
- Deep Dive into Apache Spark Array Functions
- Apache Spark Structured Streaming

First, let’s look at what window functions are and when we should use them. We use various functions in Apache Spark like `month` (returns the month from a date), `round` (rounds off a value), and `floor` (gives the floor value for a given input), which are applied to each record and return a value for each record. Then we have various aggregate functions that are applied to a group of data and return a single value for each group, like `sum`, `avg`, `min`, `max`, and `count`. But what if we would like to perform an operation on a group of data and get a single value/result for each record? We can use window functions in such cases. They can define the ranking for records, cumulative distribution, moving average, or identify the records prior to or after the current record.

Let’s use some Scala API examples to learn about the following window functions:

- **Aggregate**: `min`, `max`, `avg`, `count`, and `sum`.
- **Ranking**: `rank`, `dense_rank`, `percent_rank`, `row_number`, and `ntile`.
- **Analytical**: `cume_dist`, `lag`, and `lead`.
- **Custom boundary**: `rangeBetween` and `rowsBetween`.

For your easy reference, a Zeppelin notebook exported as a JSON file and also a Scala file are available on GitHub.

# Create Spark DataFrame

Now let’s create a sample Spark DataFrame which we will use throughout this blog. First, let’s load the required libraries.

Now we will create the `DataFrame` with some dummy data, which we will use to discuss various window functions.

This is what our `DataFrame` looks like:
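The original notebook code is not in this export, so here is a minimal sketch of the setup. The rows are reconstructed from the outputs shown later in the post; names like `df` are my assumption, not necessarily the original notebook’s:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("WindowFunctions")
  .master("local[*]") // local mode for experimentation
  .getOrCreate()
import spark.implicits._

// (depName, empNo, salary): reconstructed from the outputs shown below
val df = Seq(
  ("sales",     1, 5000),
  ("personnel", 2, 3900),
  ("sales",     3, 4800),
  ("sales",     4, 4800),
  ("personnel", 5, 3500),
  ("develop",   7, 4200),
  ("develop",   8, 6000),
  ("develop",   9, 4500),
  ("develop",  10, 5200),
  ("develop",  11, 5200)
).toDF("depName", "empNo", "salary")

df.show()
```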

# Window aggregate functions

Let’s look at some aggregated window functions to see how they work.

First, we need to define the specification of the window. Let’s say we would like to get the aggregated data based on the department. So we will define our window based on the department name (column: `depname`) in this example.

## Create a window specification for aggregate function
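The original gist is missing from this export; a sketch of what the specification likely looked like (the name `winSpecAgg` is mine):

```scala
import org.apache.spark.sql.expressions.Window

// Partition rows by department; no ordering is needed for plain aggregates
val winSpecAgg = Window.partitionBy("depName")
```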

## Apply aggregate function on window

Now within the department (column: `depname`), we can apply various aggregate functions. So let's try to find the max and min salary in each department. Here we have selected only the desired columns (`depName`, `max_salary`, and `min_salary`) and removed the duplicate records.
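A sketch of this step, assuming the sample `df` from earlier (the spec is repeated here so the snippet stands alone):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, min}

val winSpecAgg = Window.partitionBy("depName")

df.withColumn("max_salary", max("salary").over(winSpecAgg))
  .withColumn("min_salary", min("salary").over(winSpecAgg))
  .select("depName", "max_salary", "min_salary")
  .dropDuplicates()
  .show()
```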

Output:

```
+---------+----------+----------+
|  depname|max_salary|min_salary|
+---------+----------+----------+
|  develop|      6000|      4200|
|    sales|      5000|      4800|
|personnel|      3900|      3500|
+---------+----------+----------+
```

Now let’s see how it works. We have partitioned the data on department name. When we perform the aggregate function, it is applied to each partition and returns the aggregated value (`min` and `max` in our case).

*Note: Available aggregate functions are `max`, `min`, `sum`, `avg`, and `count`.*

# Window ranking functions

In this section, we will discuss several types of ranking functions.

## Create window specification for ranking function

Now let's say we would like to rank the employees based on their salary within the department. The employee with the highest salary will have rank 1, and the one with the least salary will rank last. Here we will partition the data based on department (column: `depname`), and within the department we will sort the data based on salary in descending order.
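The original gist is missing; a sketch of the ranking window specification (the name `winSpecRank` is mine):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// Partition by department, highest salary first within each partition
val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)
```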

For each department, records are sorted based on salary in descending order.

## 1. Rank function: `rank`

This function will return the rank of each record within a partition and skip the subsequent rank following any duplicate rank:
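A sketch of this step, with the window spec repeated so the snippet stands alone:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("rank", rank().over(winSpecRank)).show()
```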

Output:

```
+---------+-----+------+----+
|  depName|empNo|salary|rank|
+---------+-----+------+----+
|  develop|    8|  6000|   1|
|  develop|   11|  5200|   2|
|  develop|   10|  5200|   2|
|  develop|    9|  4500|   4|
|  develop|    7|  4200|   5|
|    sales|    1|  5000|   1|
|    sales|    4|  4800|   2|
|    sales|    3|  4800|   2|
|personnel|    2|  3900|   1|
|personnel|    5|  3500|   2|
+---------+-----+------+----+
```

Here we can see that some ranks are duplicated and some ranks are missing. For example, in the `develop` department we have 2 employees with `rank = 2` and no employee with `rank = 3`, because the `rank` function keeps the same rank for the same value and skips the subsequent ranks accordingly.

## 2. Dense rank function: `dense_rank`

This function will return the rank of each record within a partition but will not skip any rank.
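A sketch of this step, again with the spec repeated so the snippet stands alone:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("dense_rank", dense_rank().over(winSpecRank)).show()
```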

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|dense_rank|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         2|
|  develop|    9|  4500|         3|
|  develop|    7|  4200|         4|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         2|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+
```

Here we can see that some ranks are duplicated, but no ranks are missing, unlike when we used the `rank` function. For example, in the `develop` department we have 2 employees with rank = 2. The `dense_rank` function keeps the same rank for the same value but does not skip the subsequent ranks.

## 3. Row number function: `row_number`

This function assigns a row number within the window. If 2 rows have the same value for the ordering column, it is non-deterministic which row number will be assigned to each of them.
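A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("row_number", row_number().over(winSpecRank)).show()
```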

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|row_number|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         3|
|  develop|    9|  4500|         4|
|  develop|    7|  4200|         5|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         3|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+
```

## 4. Percent rank function: `percent_rank`

This function returns the relative (percentile) rank within the partition, calculated as `(rank - 1) / (number of rows in partition - 1)`.
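A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("percent_rank", percent_rank().over(winSpecRank)).show()
```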

Output:

```
+---------+-----+------+------------+
|  depName|empNo|salary|percent_rank|
+---------+-----+------+------------+
|  develop|    8|  6000|         0.0|
|  develop|   10|  5200|        0.25|
|  develop|   11|  5200|        0.25|
|  develop|    9|  4500|        0.75|
|  develop|    7|  4200|         1.0|
|    sales|    1|  5000|         0.0|
|    sales|    3|  4800|         0.5|
|    sales|    4|  4800|         0.5|
|personnel|    2|  3900|         0.0|
|personnel|    5|  3500|         1.0|
+---------+-----+------+------------+
```

## 5. N-tile function: `ntile`

This function can further sub-divide the window into *n* groups based on a window specification or partition. For example, if we need to divide each department further into, say, three groups, we can specify `ntile` as 3.
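A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, ntile}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("ntile", ntile(3).over(winSpecRank)).show()
```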

Output:

```
+---------+-----+------+-----+
|  depName|empNo|salary|ntile|
+---------+-----+------+-----+
|  develop|    8|  6000|    1|
|  develop|   10|  5200|    1|
|  develop|   11|  5200|    2|
|  develop|    9|  4500|    2|
|  develop|    7|  4200|    3|
|    sales|    1|  5000|    1|
|    sales|    3|  4800|    2|
|    sales|    4|  4800|    3|
|personnel|    2|  3900|    1|
|personnel|    5|  3500|    2|
+---------+-----+------+-----+
```

# Window analytical functions

Next, we will discuss the analytical functions like cumulative distribution, lag, and lead.

## 1. Cumulative distribution function: `cume_dist`

This function gives the cumulative distribution of values for the window/partition.

Define the window specification and apply the `cume_dist` function to get the cumulative distribution.
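A sketch of this step. Note the ascending ordering, which matches the output shown below:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.cume_dist

val winSpecDist = Window.partitionBy("depName").orderBy("salary")

df.withColumn("cume_dist", cume_dist().over(winSpecDist)).show()
```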

Output:

```
+---------+-----+------+------------------+
|  depName|empNo|salary|         cume_dist|
+---------+-----+------+------------------+
|  develop|    7|  4200|               0.2|
|  develop|    9|  4500|               0.4|
|  develop|   10|  5200|               0.8|
|  develop|   11|  5200|               0.8|
|  develop|    8|  6000|               1.0|
|    sales|    4|  4800|0.6666666666666666|
|    sales|    3|  4800|0.6666666666666666|
|    sales|    1|  5000|               1.0|
|personnel|    5|  3500|               0.5|
|personnel|    2|  3900|               1.0|
+---------+-----+------+------------------+
```

## 2. Lag function: `lag`

This function returns the value from `count` rows prior to the current row in the DataFrame.

The `lag` function takes 3 arguments (`lag(col, count = 1, default = None)`):

- `col`: the column on which the function is to be applied.
- `count`: how many rows to look back.
- `default`: the default value to return when there is no row to look back to.
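A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val winSpecLag = Window.partitionBy("depName").orderBy("salary")

// Salary from 2 rows before the current row; null where no such row exists
df.withColumn("lag", lag("salary", 2).over(winSpecLag)).show()
```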

Output:

```
+---------+-----+------+----+
|  depName|empNo|salary| lag|
+---------+-----+------+----+
|  develop|    7|  4200|null|
|  develop|    9|  4500|null|
|  develop|   10|  5200|4200|
|  develop|   11|  5200|4500|
|  develop|    8|  6000|5200|
|    sales|    4|  4800|null|
|    sales|    3|  4800|null|
|    sales|    1|  5000|4800|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+
```

For example, let’s look for the salary 2 rows prior to the current row.

- For **depname** = `develop`, **salary** = `4500`: there is no row 2 rows prior to this one, so we get null.
- For **depname** = `develop`, **salary** = `6000`: if we go 2 rows prior, we get `5200` as the salary.

## 3. Lead function: `lead`

This function returns the value from `count` rows after the current row in the DataFrame.

The `lead` function takes 3 arguments (`lead(col, count = 1, default = None)`):

- `col`: the column on which the function is to be applied.
- `count`: how many rows to look forward from the current row.
- `default`: the default value to return when there is no row to look forward to.
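A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead

val winSpecLead = Window.partitionBy("depName").orderBy("salary")

// Salary from 2 rows after the current row; null where no such row exists
df.withColumn("lead", lead("salary", 2).over(winSpecLead)).show()
```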

Output:

```
+---------+-----+------+----+
|  depName|empNo|salary|lead|
+---------+-----+------+----+
|  develop|    7|  4200|5200|
|  develop|    9|  4500|5200|
|  develop|   10|  5200|6000|
|  develop|   11|  5200|null|
|  develop|    8|  6000|null|
|    sales|    3|  4800|5000|
|    sales|    4|  4800|null|
|    sales|    1|  5000|null|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+
```

Let’s try to look for the salary 2 rows forward/after the current row.

- For **depname** = `develop`, **salary** = `4500`: if we go 2 rows forward, we get `5200` as the salary.
- For **depname** = `personnel`, **salary** = `3500`: there is no row 2 rows after this one in the partition, so we get null.

# Custom window definition

By default, the boundaries of the window are defined by the partition column, and we can specify the ordering via the window specification. As an example, for the `develop` department, the start of the window is the minimum value of salary, and the end of the window is the maximum value of salary.

But what if we would like to change the boundaries of the window? The following functions can be used to define the window within each partition.

## 1. `rangeBetween`

Using the `rangeBetween` function, we can define the boundaries explicitly.

For example, let’s define the start as 100 and the end as 300 units from the current salary and see what that means. A start of 100 means the window will begin at (current value + 100), and an end of 300 means the window will end at (current value + 300); both the start and end values are inclusive.

## Define window specification
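A sketch of the specification (the original gist is missing; `winSpecRange` is my name):

```scala
import org.apache.spark.sql.expressions.Window

// Window covers salaries from (current + 100) to (current + 300), inclusive
val winSpecRange = Window.partitionBy("depName")
  .orderBy("salary")
  .rangeBetween(100L, 300L)
```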

The `L` after the start and end values denotes that the value is a Scala `Long` type.

## Apply custom window specification
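A sketch of this step, with the spec repeated so the snippet stands alone:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val winSpecRange = Window.partitionBy("depName")
  .orderBy("salary")
  .rangeBetween(100L, 300L)

df.withColumn("max_salary", max("salary").over(winSpecRange)).show()
```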

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      null|
|  develop|   10|  5200|      null|
|  develop|   11|  5200|      null|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      5000|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      null|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+
```

Let’s try to understand the output now.

- For **depname** = `develop`, **salary** = `4200`, the start of the window will be (current value + start), which is 4200 + 100 = 4300. The end of the window will be (current value + end), which is 4200 + 300 = 4500.

Since there is only one salary value in the range 4300 to 4500 inclusive for the `develop` department, which is `4500`, we got `4500` as max_salary for `4200` (check output above).

- Similarly, for **depname** = `develop`, **salary** = `4500`, the window will be (`start: 4500 + 100 = 4600, end: 4500 + 300 = 4800`). But there are no salary values in the range 4600 to 4800 inclusive for the `develop` department, so the max value will be null (check output above).

There are some special boundary values which can be used here:

- `Window.currentRow`: to specify the current value in a row.
- `Window.unboundedPreceding`: to have an unbounded start for the window.
- `Window.unboundedFollowing`: to have an unbounded end for the window.

For example, suppose we need to find the max salary that is more than 300 above each employee's salary. So we'll define the start value as 300L and the end value as `Window.unboundedFollowing`:
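A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val winSpecUnbounded = Window.partitionBy("depName")
  .orderBy("salary")
  .rangeBetween(300L, Window.unboundedFollowing)

df.withColumn("max_salary", max("salary").over(winSpecUnbounded)).show()
```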

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      6000|
|  develop|    9|  4500|      6000|
|  develop|   10|  5200|      6000|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      null|
|    sales|    4|  4800|      null|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+
```

So, for **depname** = `personnel`, **salary** = `3500`, the window will be (`start: 3500 + 300 = 3800, end: unbounded`). So the maximum value in this range is 3900 (check output above).

Similarly, for **depname** = `sales`, **salary** = `4800`, the window will be (`start: 4800 + 300 = 5100, end: unbounded`). Since there are no salary values of 5100 or more for the `sales` department, the result is `null`.

## 2. `rowsBetween`

With `rangeBetween`, we defined the start and end of the window using the value of the ordering column. However, we can also define the start and end of the window with the relative row position.

For example, we would like to create a window where the start of the window is one row prior to the current row and the end is one row after the current row.

## Define custom window specification
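A sketch of the specification (the original gist is missing; `winSpecRows` is my name):

```scala
import org.apache.spark.sql.expressions.Window

// One row before through one row after the current row
val winSpecRows = Window.partitionBy("depName")
  .orderBy("salary")
  .rowsBetween(-1, 1)
```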

## Apply custom window specification
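A sketch of this step, with the spec repeated so the snippet stands alone:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val winSpecRows = Window.partitionBy("depName")
  .orderBy("salary")
  .rowsBetween(-1, 1)

df.withColumn("max_salary", max("salary").over(winSpecRows)).show()
```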

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      5200|
|  develop|   10|  5200|      5200|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      6000|
|    sales|    3|  4800|      4800|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      5000|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      3900|
+---------+-----+------+----------+
```

Let’s try to understand the output now.

- For **depname** = `develop`, **salary** = `4500`, a window will be defined with one row prior to and one row after the current row. So the salaries within the window are (`4200, 4500, 5200`), and the max is `5200` (check output above).

- Similarly, for **depname** = `sales`, **salary** = `5000`, a window will be defined with one row prior to and one after the current row. Since there are no rows after this row, the window will only have 2 rows, with salaries (`4800, 5000`), and the max is `5000` (check output above).

We can also use the special boundaries `Window.unboundedPreceding`, `Window.unboundedFollowing`, and `Window.currentRow`, as we did previously with `rangeBetween`.

*Note: Ordering is not necessary with `rowsBetween`, but I have used it to keep the results consistent on each run.*

# Summary

I hope you have enjoyed learning about window functions in Apache Spark. In this blog, we discussed how to use window functions to perform operations on a group of data and get a single value/result for each record. We also discussed the various types of window functions (aggregate, ranking, and analytical), including how to define custom window boundaries. You can find a Zeppelin notebook exported as a JSON file and also a Scala file on GitHub. In my next blog, I will cover the various array functions available in Apache Spark.