EXPEDIA GROUP TECHNOLOGY — SOFTWARE

Deep dive into Apache Spark Window Functions

Window functions operate on groups of data and return values for each record or group

Neeraj Bhadani
Expedia Group Technology



In this blog post, we’ll take a deep dive into Apache Spark window functions. You may also be interested in my earlier posts on Apache Spark.

First, let’s look at what window functions are and when we should use them. Apache Spark provides functions such as month (returns the month from a date), round (rounds off a value), and floor (returns the floor of a given input), which are applied to each record and return a value for each record. It also provides aggregate functions such as sum, avg, min, max, and count, which are applied to a group of data and return a single value for each group. But what if we would like to perform an operation on a group of data and still get a value for each record? We can use window functions in such cases. They can define the ranking of records, the cumulative distribution, or a moving average, or identify the records before or after the current record.

Let’s use some Scala API examples to learn about the following window functions:

  • Aggregate: min, max, avg, count, and sum
  • Ranking: rank, dense_rank, percent_rank, row_number, and ntile
  • Analytical: cume_dist, lag, and lead
  • Custom boundary: rangeBetween and rowsBetween

For your easy reference, a Zeppelin notebook exported as a JSON file and also a Scala file are available on GitHub.


Create Spark DataFrame

Now let’s create a sample Spark DataFrame which we will use throughout this blog. First, let’s load the required libraries.

Now we will create the DataFrame with some dummy data which we will use to discuss various window functions.
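The listings for these two steps are not reproduced in this text, so here is a minimal sketch of what they could look like. The rows are taken from the output tables later in this post; the variable name `empsalary`, the app name, and the local master setting are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window // used by the window specifications in later steps
import org.apache.spark.sql.functions._         // rank, lag, max, and the other functions used later

// Assumption: a local session for experimenting. In spark-shell or Zeppelin
// a session named `spark` already exists and getOrCreate() simply reuses it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("window-functions")
  .getOrCreate()
import spark.implicits._

// Sample rows: (department, employee number, salary)
val empsalary = Seq(
  ("sales", 1, 5000),
  ("personnel", 2, 3900),
  ("sales", 3, 4800),
  ("sales", 4, 4800),
  ("personnel", 5, 3500),
  ("develop", 7, 4200),
  ("develop", 8, 6000),
  ("develop", 9, 4500),
  ("develop", 10, 5200),
  ("develop", 11, 5200)
).toDF("depName", "empNo", "salary")

empsalary.show()
```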

This is what our DataFrame looks like:

Tabular data showing the sample DataFrame, including the columns department, employee number, and salary.

Window aggregate functions

Let’s look at some aggregate window functions to see how they work.

First, we need to define the specification of the window. Let’s say we would like to get the aggregated data based on the department. So we will define our window based on the department name (column: depname) in this example.

Create a window specification for aggregate function

Apply aggregate function on window

Now within the department (column: depname) we can apply various aggregate functions. So let's try to find the max and min salary in each department. Here we have selected only the desired columns (depname, max_salary, and min_salary) and removed the duplicate records.
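As a rough sketch of both steps (defining the window specification and applying the aggregates), assuming a DataFrame `df` with `depName` and `salary` columns. The helper name `salaryRangeByDepartment` is made up for illustration and is not from the original listing.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, min}

// Hypothetical helper: `df` is assumed to have `depName` and `salary` columns.
def salaryRangeByDepartment(df: DataFrame): DataFrame = {
  // Window specification: one partition per department, no ordering needed.
  val byDepName = Window.partitionBy("depName")

  df.withColumn("max_salary", max("salary").over(byDepName))
    .withColumn("min_salary", min("salary").over(byDepName))
    .select("depName", "max_salary", "min_salary") // keep only the desired columns
    .dropDuplicates()                              // one row per department
}
```

Calling `salaryRangeByDepartment(df).show()` on the sample DataFrame should give one row per department.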

Output:

+---------+----------+----------+
|  depname|max_salary|min_salary|
+---------+----------+----------+
|  develop|      6000|      4200|
|    sales|      5000|      4800|
|personnel|      3900|      3500|
+---------+----------+----------+

Now let’s see how it works. We have partitioned the data on department name:

Logical representation of the “develop” partition.

Now when we apply the aggregate function, it is evaluated over each partition and returns the aggregated value (min and max in our case).

Minimum and maximum values in each partition: develop, personnel, and sales.

Note: Available aggregate functions are max, min, sum, avg and count.


Window ranking functions

In this section, we will discuss several types of ranking functions.

Create window specification for ranking function

Now let's say we would like to rank the employees based on their salary within the department. The employee with the highest salary will have rank 1, and the one with the least salary will rank last. Here we will partition the data based on department (column: depname) and within the department, we will sort the data based on salary in descending order.

For each department, records are sorted based on salary in descending order.


1. Rank function: rank

This function will return the rank of each record within a partition and skip the subsequent rank following any duplicate rank:
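A minimal sketch of the ranking window and the rank call, assuming a DataFrame `df` with `depName` and `salary` columns (the helper name `withRank` is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}

// Hypothetical helper: adds a `rank` column per department.
def withRank(df: DataFrame): DataFrame = {
  // Partition by department and sort each partition by salary, highest first.
  val winSpec = Window.partitionBy("depName").orderBy(col("salary").desc)
  df.withColumn("rank", rank().over(winSpec))
}
```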

Output:

+---------+-----+------+----+
|  depName|empNo|salary|rank|
+---------+-----+------+----+
|  develop|    8|  6000|   1|
|  develop|   11|  5200|   2|
|  develop|   10|  5200|   2|
|  develop|    9|  4500|   4|
|  develop|    7|  4200|   5|
|    sales|    1|  5000|   1|
|    sales|    4|  4800|   2|
|    sales|    3|  4800|   2|
|personnel|    2|  3900|   1|
|personnel|    5|  3500|   2|
+---------+-----+------+----+

Here we can see that some ranks are duplicated and some are missing. For example, in the develop department we have 2 employees with rank = 2 and no employee with rank = 3, because the rank function assigns the same rank to equal values and then skips the following ranks accordingly.

2. Dense rank: dense_rank

This function will return the rank of each record within a partition but will not skip any rank.
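The call could be sketched as follows, assuming a DataFrame `df` with `depName` and `salary` columns and the same descending-salary window used for ranking (the helper name `withDenseRank` is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

// Hypothetical helper: adds a `dense_rank` column per department.
def withDenseRank(df: DataFrame): DataFrame = {
  val winSpec = Window.partitionBy("depName").orderBy(col("salary").desc)
  // Equal salaries share a rank and the next distinct salary gets the next integer.
  df.withColumn("dense_rank", dense_rank().over(winSpec))
}
```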

Output:

+---------+-----+------+----------+
|  depName|empNo|salary|dense_rank|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         2|
|  develop|    9|  4500|         3|
|  develop|    7|  4200|         4|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         2|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+

Here we can see that some ranks are duplicated, but no ranks are missing as they were with the rank function. For example, in the develop department we have 2 employees with rank = 2, and the next employee gets rank = 3. The dense_rank function assigns the same rank to equal values but does not skip the following ranks.

3. Row number function: row_number

This function assigns a sequential row number within the window. If 2 rows have the same value for the ordering column, it is non-deterministic which of them gets the lower row number.
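A minimal sketch, assuming a DataFrame `df` with `depName`, `empNo`, and `salary` columns (the helper name `withRowNumber` is hypothetical). Note that the secondary sort on `empNo` is my addition, not necessarily part of the original listing: it is one way to make the numbering of tied salaries repeatable.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical helper: adds a `row_number` column per department.
def withRowNumber(df: DataFrame): DataFrame = {
  // Assumption: `empNo` is unique, so it breaks ties between equal salaries.
  val winSpec = Window
    .partitionBy("depName")
    .orderBy(col("salary").desc, col("empNo"))
  df.withColumn("row_number", row_number().over(winSpec))
}
```

Dropping `col("empNo")` from `orderBy` gives the plain salary ordering, at the cost of an arbitrary order among ties.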

Output:

+---------+-----+------+----------+
|  depName|empNo|salary|row_number|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         3|
|  develop|    9|  4500|         4|
|  develop|    7|  4200|         5|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         3|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+

4. Percent rank function: percent_rank

This function returns the relative (percentile) rank of each record within the partition, computed as (rank - 1) / (number of rows in the partition - 1).
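Sketched under the same assumptions as the other ranking examples (a DataFrame `df` with `depName` and `salary` columns, and a hypothetical helper name `withPercentRank`):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank}

// Hypothetical helper: adds a `percent_rank` column per department.
def withPercentRank(df: DataFrame): DataFrame = {
  val winSpec = Window.partitionBy("depName").orderBy(col("salary").desc)
  // The first row of each partition gets 0.0 and the last distinct value gets 1.0.
  df.withColumn("percent_rank", percent_rank().over(winSpec))
}
```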

Output:

+---------+-----+------+------------+
|  depName|empNo|salary|percent_rank|
+---------+-----+------+------------+
|  develop|    8|  6000|         0.0|
|  develop|   10|  5200|        0.25|
|  develop|   11|  5200|        0.25|
|  develop|    9|  4500|        0.75|
|  develop|    7|  4200|         1.0|
|    sales|    1|  5000|         0.0|
|    sales|    3|  4800|         0.5|
|    sales|    4|  4800|         0.5|
|personnel|    2|  3900|         0.0|
|personnel|    5|  3500|         1.0|
+---------+-----+------+------------+

5. N-tile function: ntile

This function divides the ordered rows of each partition into n groups that are as equal in size as possible and returns the group number for each row. For example, if we need to divide each department further into, say, three groups, we can specify ntile as 3.
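A minimal sketch with three groups, assuming a DataFrame `df` with `depName` and `salary` columns (the helper name `withNtile` is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, ntile}

// Hypothetical helper: splits each department into 3 salary-ordered groups.
def withNtile(df: DataFrame): DataFrame = {
  val winSpec = Window.partitionBy("depName").orderBy(col("salary").desc)
  // When the rows do not divide evenly, the earlier groups get the extra rows.
  df.withColumn("ntile", ntile(3).over(winSpec))
}
```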

Output:

+---------+-----+------+-----+
|  depName|empNo|salary|ntile|
+---------+-----+------+-----+
|  develop|    8|  6000|    1|
|  develop|   10|  5200|    1|
|  develop|   11|  5200|    2|
|  develop|    9|  4500|    2|
|  develop|    7|  4200|    3|
|    sales|    1|  5000|    1|
|    sales|    3|  4800|    2|
|    sales|    4|  4800|    3|
|personnel|    2|  3900|    1|
|personnel|    5|  3500|    2|
+---------+-----+------+-----+

Window analytical functions

Next, we will discuss the analytical functions like cumulative distribution, lag, and lead.

1. Cumulative distribution function: cume_dist

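This function gives the cumulative distribution of values within the window/partition: for each row, the fraction of rows in the partition whose ordering value is less than or equal to the current row’s value.

Define the window specification and apply the cume_dist function to get the cumulative distribution.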
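As a sketch of this step, assuming a DataFrame `df` with `depName` and `salary` columns and an ascending salary order, which is what the output below suggests (the helper name `withCumeDist` is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.cume_dist

// Hypothetical helper: adds a `cume_dist` column per department.
def withCumeDist(df: DataFrame): DataFrame = {
  // Ascending order, so the lowest salary gets the smallest fraction.
  val winSpec = Window.partitionBy("depName").orderBy("salary")
  df.withColumn("cume_dist", cume_dist().over(winSpec))
}
```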

Output:

+---------+-----+------+------------------+
|  depName|empNo|salary|         cume_dist|
+---------+-----+------+------------------+
|  develop|    7|  4200|               0.2|
|  develop|    9|  4500|               0.4|
|  develop|   10|  5200|               0.8|
|  develop|   11|  5200|               0.8|
|  develop|    8|  6000|               1.0|
|    sales|    4|  4800|0.6666666666666666|
|    sales|    3|  4800|0.6666666666666666|
|    sales|    1|  5000|               1.0|
|personnel|    5|  3500|               0.5|
|personnel|    2|  3900|               1.0|
+---------+-----+------+------------------+

2. Lag function: lag

This function returns the value from the row that is a given number of rows (the offset) before the current row in the partition.

The lag function takes 3 arguments (lag(col, count = 1, default = None)):
col: the column to which the function is applied.
count: how many rows to look back.
default: the value to return when there is no such row (null if not specified).
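A minimal sketch with an offset of 2, assuming a DataFrame `df` with `depName` and `salary` columns sorted by ascending salary (the helper name `withLag` is hypothetical). In the Scala API the offset is passed positionally, and the default is left out here, so missing rows yield null.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// Hypothetical helper: adds the salary found 2 rows before the current row.
def withLag(df: DataFrame): DataFrame = {
  val winSpec = Window.partitionBy("depName").orderBy("salary")
  // No default value is given, so the first 2 rows of each partition get null.
  df.withColumn("lag", lag("salary", 2).over(winSpec))
}
```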

Output:

+---------+-----+------+----+
|  depName|empNo|salary| lag|
+---------+-----+------+----+
|  develop|    7|  4200|null|
|  develop|    9|  4500|null|
|  develop|   10|  5200|4200|
|  develop|   11|  5200|4500|
|  develop|    8|  6000|5200|
|    sales|    4|  4800|null|
|    sales|    3|  4800|null|
|    sales|    1|  5000|4800|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+

For example, let’s look for the salary 2 rows prior to the current row.

  • For depname = develop, salary = 4500. There is no row 2 rows prior to this row, so it gets null.
Shows records in the “develop” partition with salary = 4500.
  • For depname = develop, salary = 6000 (highlighted in blue). If we go 2 rows prior, we get 5200 as the salary (highlighted in green).
With lag = 2 in the “develop” partition, the lag of the row with salary = 6000 is the row with salary = 5200 (highlighted in green).

3. Lead function: lead

This function returns the value from the row that is a given number of rows (the offset) after the current row in the partition.

The lead function takes 3 arguments (lead(col, count = 1, default = None)):
col: the column to which the function is applied.
count: how many rows to look forward from the current row.
default: the value to return when there is no such row (null if not specified).
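The same idea sketched for lead with an offset of 2, assuming a DataFrame `df` with `depName` and `salary` columns sorted by ascending salary (the helper name `withLead` is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead

// Hypothetical helper: adds the salary found 2 rows after the current row.
def withLead(df: DataFrame): DataFrame = {
  val winSpec = Window.partitionBy("depName").orderBy("salary")
  // No default value is given, so the last 2 rows of each partition get null.
  df.withColumn("lead", lead("salary", 2).over(winSpec))
}
```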

Output:

+---------+-----+------+----+
|  depName|empNo|salary|lead|
+---------+-----+------+----+
|  develop|    7|  4200|5200|
|  develop|    9|  4500|5200|
|  develop|   10|  5200|6000|
|  develop|   11|  5200|null|
|  develop|    8|  6000|null|
|    sales|    3|  4800|5000|
|    sales|    4|  4800|null|
|    sales|    1|  5000|null|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+

Let’s try to look for the salary 2 rows after the current row.

  • For depname = develop, salary = 4500 (highlighted in blue). If we go 2 rows forward, we get 5200 as the salary (highlighted in green).
With lead = 2 in the “develop” partition, the lead of the row with salary = 4500 is the row with salary = 5200 (highlighted in green).
  • For depname = personnel, salary = 3500. There is no row 2 rows after this row in this partition, so we get null.
With lead = 2 in the “personnel” partition, there is no lead row for salary = 3500.

Custom window definition

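By default, the boundaries of the window are defined by the partition column, and we can specify the ordering via the window specification. With no ordering, the window covers the whole partition; for the develop department, for example, it runs from the min value of salary to the max value of salary. With an ordering and no explicit boundaries, Spark uses a window that runs from the start of the partition up to the current row.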

But what if we would like to change the boundaries of the window? The following functions can be used to define the window within each partition.

1. rangeBetween

Using the rangeBetween function, we can define the boundaries explicitly.
For example, let’s define the start as 100 and the end as 300 units from the current salary and see what it means. The window then starts at the current value + 100 and ends at the current value + 300 (both start and end values are inclusive).

Define window specification

The L suffix on the start and end values marks them as Scala Long literals.

Apply custom window specification
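A minimal sketch covering both steps, assuming a DataFrame `df` with `depName` and a numeric `salary` column (the helper name `withMaxInRange` is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// Hypothetical helper: max salary among rows whose salary lies in
// [current salary + 100, current salary + 300] within the same department.
def withMaxInRange(df: DataFrame): DataFrame = {
  val winSpec = Window
    .partitionBy("depName")
    .orderBy("salary")          // rangeBetween offsets apply to this ordering column
    .rangeBetween(100L, 300L)   // both boundaries are inclusive
  df.withColumn("max_salary", max("salary").over(winSpec))
}
```

Rows with no salary in that range get null, since max over an empty window is null.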

Output:

+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      null|
|  develop|   10|  5200|      null|
|  develop|   11|  5200|      null|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      5000|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      null|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+

Let’s try to understand the output now.

  • For depname = develop, salary = 4200, the start of the window will be (current value + start), which is 4200 + 100 = 4300. The end of the window will be (current value + end), which is 4200 + 300 = 4500.

Since there is only one salary value in the range 4300 to 4500 inclusive, which is 4500 for the develop department, we got 4500 as max_salary for 4200 (check output above).

In the “develop” partition, for the row with salary = 4200, the max salary in the range (4300, 4500) is 4500.
  • Similarly, for depname = develop, salary = 4500, the window will be (start: 4500 + 100 = 4600, end: 4500 + 300 = 4800). But there are no salary values in the range 4600 to 4800 inclusive for the develop department, so the max value will be null (check output above).
In the “develop” partition, for the row with salary = 4500, the salary range is (4600, 4800) and there are no rows with a salary in this range.

There are some special boundary values which can be used here.

  • Window.currentRow: refers to the current row.
  • Window.unboundedPreceding: gives the window an unbounded start.
  • Window.unboundedFollowing: gives the window an unbounded end.

For example, suppose we need to find the max salary among the salaries that are at least 300 more than the employee’s salary. So we’ll define the start value as 300L and the end value as Window.unboundedFollowing:
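Sketched under the same assumptions as before (a DataFrame `df` with `depName` and a numeric `salary` column; the helper name `withMaxAbove` is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// Hypothetical helper: max salary among rows whose salary is at least
// 300 above the current row's salary, within the same department.
def withMaxAbove(df: DataFrame): DataFrame = {
  val winSpec = Window
    .partitionBy("depName")
    .orderBy("salary")
    .rangeBetween(300L, Window.unboundedFollowing) // no upper bound on the range
  df.withColumn("max_salary", max("salary").over(winSpec))
}
```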

Output:

+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      6000|
|  develop|    9|  4500|      6000|
|  develop|   10|  5200|      6000|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      null|
|    sales|    4|  4800|      null|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+

So, for depname = personnel, salary = 3500, the window will be (start: 3500 + 300 = 3800, end: unbounded). The maximum value in this range is 3900 (check output above).

Similarly, for depname = sales, salary = 4800, the window will be (start: 4800 + 300 = 5100, end: unbounded). Since there are no values of 5100 or more for the sales department, the result is null.

2. rowsBetween

With rangeBetween, we defined the start and end of the window using the value of the ordering column. However, we can also define the start and end of the window with the relative row position.

For example, we would like to create a window where the start of the window is one row prior to the current row and the end is one row after the current row.

Define custom window specification

Apply custom window specification
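A minimal sketch covering both steps, assuming a DataFrame `df` with `depName` and `salary` columns (the helper name `withMaxOfNeighbours` is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// Hypothetical helper: max salary over the previous, current, and next row
// of each department, with rows sorted by ascending salary.
def withMaxOfNeighbours(df: DataFrame): DataFrame = {
  val winSpec = Window
    .partitionBy("depName")
    .orderBy("salary")
    .rowsBetween(-1, 1) // offsets are row positions relative to the current row
  df.withColumn("max_salary", max("salary").over(winSpec))
}
```

At the edges of a partition the window simply shrinks to the rows that exist.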

Output:

+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      5200|
|  develop|   10|  5200|      5200|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      6000|
|    sales|    3|  4800|      4800|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      5000|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      3900|
+---------+-----+------+----------+

Let’s try to understand the output now.

  • For depname = develop, salary = 4500, a window will be defined with one row prior to and one row after the current row (highlighted in green). So the salaries within the window are (4200, 4500, 5200) and the max is 5200 (check output above).
In the “develop” partition, for the row with salary = 4500, the window with one row prior and one row after is highlighted in green.
  • Similarly, for depname = sales, salary = 5000, a window will be defined with one row prior to and one row after the current row. Since there are no rows after this row, the window will only have 2 rows (highlighted in green), with salaries (4800, 5000), and the max is 5000 (check output above).
In the “sales” partition, for the row with salary = 5000, there is no row after the current row, so the window contains only 2 rows (highlighted in green).

We can also use the special boundaries Window.unboundedPreceding, Window.unboundedFollowing, and Window.currentRow as we did previously with rangeBetween.

Note: Ordering is not necessary with rowsBetween, but I have used it to keep the results consistent on each run.


Summary

I hope you have enjoyed learning about window functions in Apache Spark. In this blog, we discussed using window functions to perform operations on a group of data and have a single value/result for each record. We also discussed various types of window functions like aggregate, ranking and analytical functions including how to define custom window boundaries. You can find a Zeppelin notebook exported as a JSON file and also a Scala file on GitHub. In my next blog, I will cover various Array Functions available in Apache Spark.
