## EXPEDIA GROUP TECHNOLOGY — SOFTWARE

# Deep dive into Apache Spark Window Functions

## Window functions operate on groups of data and return values for each record or group

In this blog post, we’ll take a deep dive into Apache Spark window functions. You may also be interested in my earlier posts on Apache Spark.

- Start Your Journey with Apache Spark — Part 1
- Start Your Journey with Apache Spark — Part 2
- Start Your Journey with Apache Spark — Part 3
- Deep Dive into Apache Spark DateTime Functions
- Working with JSON in Apache Spark
- Deep Dive into Apache Spark Array Functions
- Apache Spark Structured Streaming

First, let’s look at what window functions are and when we should use them. We use various functions in Apache Spark like `month` (returns the month from a date), `round` (rounds off a value), and `floor` (gives the floor value for a given input), which are applied to each record and return a value for each record. Then we have various aggregate functions that are applied to a group of data and return a single value for each group, like `sum`, `avg`, `min`, `max`, and `count`. But what if we would like to perform an operation on a group of data and get a single value/result for each record? We can use window functions in such cases. They can define the ranking for records, cumulative distribution, moving average, or identify the records prior to or after the current record.

Let’s use some Scala API examples to learn about the following window functions:

- **Aggregate**: `min`, `max`, `avg`, `count`, and `sum`.
- **Ranking**: `rank`, `dense_rank`, `percent_rank`, `row_number`, and `ntile`.
- **Analytical**: `cume_dist`, `lag`, and `lead`.
- **Custom boundary**: `rangeBetween` and `rowsBetween`.

For your easy reference, a Zeppelin notebook exported as a JSON file and also a Scala file are available on GitHub.

# Create Spark DataFrame

Now let’s create a sample Spark DataFrame which we will use throughout this blog. First, let’s load the required libraries.

Now we will create the `DataFrame` with some dummy data, which we will use to discuss various window functions.

This is what our `DataFrame` looks like:
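The original notebook code is not in this export, so here is a minimal sketch of the setup. The rows are reconstructed from the outputs shown later in the post; names like `df` are my assumption, not necessarily the original notebook’s:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("WindowFunctions")
  .master("local[*]") // local mode for experimentation
  .getOrCreate()
import spark.implicits._

// (depName, empNo, salary): reconstructed from the outputs shown below
val df = Seq(
  ("sales",     1, 5000),
  ("personnel", 2, 3900),
  ("sales",     3, 4800),
  ("sales",     4, 4800),
  ("personnel", 5, 3500),
  ("develop",   7, 4200),
  ("develop",   8, 6000),
  ("develop",   9, 4500),
  ("develop",  10, 5200),
  ("develop",  11, 5200)
).toDF("depName", "empNo", "salary")

df.show()
```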

# Window aggregate functions

Let’s look at some aggregated window functions to see how they work.

First, we need to define the specification of the window. Let’s say we would like to get the aggregated data based on the department. So we will define our window based on the department name (column: `depname`) in this example.

## Create a window specification for aggregate function
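The original gist is missing from this export; a sketch of what the specification likely looked like (the name `winSpecAgg` is mine):

```scala
import org.apache.spark.sql.expressions.Window

// Partition rows by department; no ordering is needed for plain aggregates
val winSpecAgg = Window.partitionBy("depName")
```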

## Apply aggregate function on window

Now within the department (column: `depname`), we can apply various aggregate functions. So let's try to find the max and min salary in each department. Here we have selected only the desired columns (`depName`, `max_salary`, and `min_salary`) and removed the duplicate records.
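A sketch of this step, assuming the sample `df` from earlier (the spec is repeated here so the snippet stands alone):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, min}

val winSpecAgg = Window.partitionBy("depName")

df.withColumn("max_salary", max("salary").over(winSpecAgg))
  .withColumn("min_salary", min("salary").over(winSpecAgg))
  .select("depName", "max_salary", "min_salary")
  .dropDuplicates()
  .show()
```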

Output:

```
+---------+----------+----------+
|  depname|max_salary|min_salary|
+---------+----------+----------+
|  develop|      6000|      4200|
|    sales|      5000|      4800|
|personnel|      3900|      3500|
+---------+----------+----------+
```

Now let’s see how it works. We have partitioned the data on department name. When we perform the aggregate function, it is applied to each partition and returns the aggregated value (`min` and `max` in our case).

*Note: Available aggregate functions are `max`, `min`, `sum`, `avg`, and `count`.*

# Window ranking functions

In this section, we will discuss several types of ranking functions.

## Create window specification for ranking function

Now let's say we would like to rank the employees based on their salary within the department. The employee with the highest salary will have rank 1, and the one with the least salary will rank last. Here we will partition the data based on department (column: `depname`), and within the department we will sort the data based on salary in descending order.
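The original gist is missing; a sketch of the ranking window specification (the name `winSpecRank` is mine):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// Partition by department, highest salary first within each partition
val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)
```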

For each department, records are sorted based on salary in descending order.

## 1. Rank function: `rank`

This function will return the rank of each record within a partition and skip the subsequent rank following any duplicate rank:
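A sketch of this step, with the window spec repeated so the snippet stands alone:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("rank", rank().over(winSpecRank)).show()
```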

Output:

```
+---------+-----+------+----+
|  depName|empNo|salary|rank|
+---------+-----+------+----+
|  develop|    8|  6000|   1|
|  develop|   11|  5200|   2|
|  develop|   10|  5200|   2|
|  develop|    9|  4500|   4|
|  develop|    7|  4200|   5|
|    sales|    1|  5000|   1|
|    sales|    4|  4800|   2|
|    sales|    3|  4800|   2|
|personnel|    2|  3900|   1|
|personnel|    5|  3500|   2|
+---------+-----+------+----+
```

Here we can see that some ranks are duplicated and some ranks are missing. For example, in the `develop` department we have 2 employees with `rank = 2` and no employee with `rank = 3`, because the `rank` function keeps the same rank for the same value and skips the subsequent ranks accordingly.

## 2. Dense rank function: `dense_rank`

This function will return the rank of each record within a partition but will not skip any rank.
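A sketch of this step, again with the spec repeated so the snippet stands alone:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("dense_rank", dense_rank().over(winSpecRank)).show()
```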

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|dense_rank|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         2|
|  develop|    9|  4500|         3|
|  develop|    7|  4200|         4|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         2|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+
```

Here we can see that some ranks are duplicated, but no ranks are missing, unlike when we used the `rank` function. For example, in the `develop` department we have 2 employees with rank = 2. The `dense_rank` function keeps the same rank for the same value but does not skip the subsequent ranks.

## 3. Row number function: `row_number`

This function assigns a row number within the window. If 2 rows have the same value for the ordering column, it is non-deterministic which row number will be assigned to each of them.
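A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("row_number", row_number().over(winSpecRank)).show()
```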

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|row_number|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         3|
|  develop|    9|  4500|         4|
|  develop|    7|  4200|         5|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         3|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+
```

## 4. Percent rank function: `percent_rank`

This function returns the relative (percentile) rank within the partition, calculated as `(rank - 1) / (number of rows in partition - 1)`.
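A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("percent_rank", percent_rank().over(winSpecRank)).show()
```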

Output:

```
+---------+-----+------+------------+
|  depName|empNo|salary|percent_rank|
+---------+-----+------+------------+
|  develop|    8|  6000|         0.0|
|  develop|   10|  5200|        0.25|
|  develop|   11|  5200|        0.25|
|  develop|    9|  4500|        0.75|
|  develop|    7|  4200|         1.0|
|    sales|    1|  5000|         0.0|
|    sales|    3|  4800|         0.5|
|    sales|    4|  4800|         0.5|
|personnel|    2|  3900|         0.0|
|personnel|    5|  3500|         1.0|
+---------+-----+------+------------+
```

## 5. N-tile function: `ntile`

This function can further sub-divide the window into *n* groups based on a window specification or partition. For example, if we need to divide each department further into, say, three groups, we can specify `ntile` as 3.
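A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, ntile}

val winSpecRank = Window.partitionBy("depName").orderBy(col("salary").desc)

df.withColumn("ntile", ntile(3).over(winSpecRank)).show()
```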

Output:

```
+---------+-----+------+-----+
|  depName|empNo|salary|ntile|
+---------+-----+------+-----+
|  develop|    8|  6000|    1|
|  develop|   10|  5200|    1|
|  develop|   11|  5200|    2|
|  develop|    9|  4500|    2|
|  develop|    7|  4200|    3|
|    sales|    1|  5000|    1|
|    sales|    3|  4800|    2|
|    sales|    4|  4800|    3|
|personnel|    2|  3900|    1|
|personnel|    5|  3500|    2|
+---------+-----+------+-----+
```

# Window analytical functions

Next, we will discuss the analytical functions like cumulative distribution, lag, and lead.

## 1. Cumulative distribution function: `cume_dist`

This function gives the cumulative distribution of values for the window/partition.

Define the window specification and apply the `cume_dist` function to get the cumulative distribution.
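A sketch of this step. Note the ascending ordering, which matches the output shown below:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.cume_dist

val winSpecDist = Window.partitionBy("depName").orderBy("salary")

df.withColumn("cume_dist", cume_dist().over(winSpecDist)).show()
```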

Output:

```
+---------+-----+------+------------------+
|  depName|empNo|salary|         cume_dist|
+---------+-----+------+------------------+
|  develop|    7|  4200|               0.2|
|  develop|    9|  4500|               0.4|
|  develop|   10|  5200|               0.8|
|  develop|   11|  5200|               0.8|
|  develop|    8|  6000|               1.0|
|    sales|    4|  4800|0.6666666666666666|
|    sales|    3|  4800|0.6666666666666666|
|    sales|    1|  5000|               1.0|
|personnel|    5|  3500|               0.5|
|personnel|    2|  3900|               1.0|
+---------+-----+------+------------------+
```

## 2. Lag function: `lag`

This function returns the value from `count` rows prior to the current row in the DataFrame.

The `lag` function takes 3 arguments (`lag(col, count = 1, default = None)`):

- `col`: the column on which the function is to be applied.
- `count`: how many rows to look back.
- `default`: the default value to return when there is no row to look back to.
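A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val winSpecLag = Window.partitionBy("depName").orderBy("salary")

// Salary from 2 rows before the current row; null where no such row exists
df.withColumn("lag", lag("salary", 2).over(winSpecLag)).show()
```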

Output:

```
+---------+-----+------+----+
|  depName|empNo|salary| lag|
+---------+-----+------+----+
|  develop|    7|  4200|null|
|  develop|    9|  4500|null|
|  develop|   10|  5200|4200|
|  develop|   11|  5200|4500|
|  develop|    8|  6000|5200|
|    sales|    4|  4800|null|
|    sales|    3|  4800|null|
|    sales|    1|  5000|4800|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+
```

For example, let’s look for the salary 2 rows prior to the current row.

- For **depname** = `develop`, **salary** = `4500`: there is no row 2 rows prior to this one, so we get null.
- For **depname** = `develop`, **salary** = `6000`: if we go 2 rows prior, we get `5200` as the salary.

## 3. Lead function: `lead`

This function returns the value from `count` rows after the current row in the DataFrame.

The `lead` function takes 3 arguments (`lead(col, count = 1, default = None)`):

- `col`: the column on which the function is to be applied.
- `count`: how many rows to look forward from the current row.
- `default`: the default value to return when there is no row to look forward to.
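A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead

val winSpecLead = Window.partitionBy("depName").orderBy("salary")

// Salary from 2 rows after the current row; null where no such row exists
df.withColumn("lead", lead("salary", 2).over(winSpecLead)).show()
```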

Output:

```
+---------+-----+------+----+
|  depName|empNo|salary|lead|
+---------+-----+------+----+
|  develop|    7|  4200|5200|
|  develop|    9|  4500|5200|
|  develop|   10|  5200|6000|
|  develop|   11|  5200|null|
|  develop|    8|  6000|null|
|    sales|    3|  4800|5000|
|    sales|    4|  4800|null|
|    sales|    1|  5000|null|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+
```

Let’s try to look for the salary 2 rows forward/after the current row.

- For **depname** = `develop`, **salary** = `4500`: if we go 2 rows forward, we get `5200` as the salary.
- For **depname** = `personnel`, **salary** = `3500`: there is no row 2 rows after this one in the partition, so we get null.

# Custom window definition

By default, the boundaries of the window are defined by the partition column, and we can specify the ordering via the window specification. As an example, for the `develop` department, the start of the window is the minimum value of salary, and the end of the window is the maximum value of salary.

But what if we would like to change the boundaries of the window? The following functions can be used to define the window within each partition.

## 1. `rangeBetween`

Using the `rangeBetween` function, we can define the boundaries explicitly.

For example, let’s define the start as 100 and the end as 300 units from the current salary and see what that means. A start of 100 means the window will begin at (current value + 100), and an end of 300 means the window will end at (current value + 300); both the start and end values are inclusive.

## Define window specification
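A sketch of the specification (the original gist is missing; `winSpecRange` is my name):

```scala
import org.apache.spark.sql.expressions.Window

// Window covers salaries from (current + 100) to (current + 300), inclusive
val winSpecRange = Window.partitionBy("depName")
  .orderBy("salary")
  .rangeBetween(100L, 300L)
```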

The `L` after the start and end values denotes that the value is a Scala `Long` type.

## Apply custom window specification
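A sketch of this step, with the spec repeated so the snippet stands alone:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val winSpecRange = Window.partitionBy("depName")
  .orderBy("salary")
  .rangeBetween(100L, 300L)

df.withColumn("max_salary", max("salary").over(winSpecRange)).show()
```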

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      null|
|  develop|   10|  5200|      null|
|  develop|   11|  5200|      null|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      5000|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      null|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+
```

Let’s try to understand the output now.

- For **depname** = `develop`, **salary** = `4200`, the start of the window will be (current value + start), which is 4200 + 100 = 4300. The end of the window will be (current value + end), which is 4200 + 300 = 4500.

Since there is only one salary value in the range 4300 to 4500 inclusive for the `develop` department, which is `4500`, we got `4500` as max_salary for `4200` (check output above).

- Similarly, for **depname** = `develop`, **salary** = `4500`, the window will be (`start: 4500 + 100 = 4600, end: 4500 + 300 = 4800`). But there are no salary values in the range 4600 to 4800 inclusive for the `develop` department, so the max value will be null (check output above).

There are some special boundary values which can be used here:

- `Window.currentRow`: to specify the current value in a row.
- `Window.unboundedPreceding`: to have an unbounded start for the window.
- `Window.unboundedFollowing`: to have an unbounded end for the window.

For example, suppose we need to find the max salary that is more than 300 above each employee's salary. So we'll define the start value as 300L and the end value as `Window.unboundedFollowing`:
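A sketch of this step:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val winSpecUnbounded = Window.partitionBy("depName")
  .orderBy("salary")
  .rangeBetween(300L, Window.unboundedFollowing)

df.withColumn("max_salary", max("salary").over(winSpecUnbounded)).show()
```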

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      6000|
|  develop|    9|  4500|      6000|
|  develop|   10|  5200|      6000|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      null|
|    sales|    4|  4800|      null|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+
```

So, for **depname** = `personnel`, **salary** = `3500`, the window will be (`start: 3500 + 300 = 3800, end: unbounded`). So the maximum value in this range is 3900 (check output above).

Similarly, for **depname** = `sales`, **salary** = `4800`, the window will be (`start: 4800 + 300 = 5100, end: unbounded`). Since there are no salary values of 5100 or more for the `sales` department, the result is `null`.

## 2. `rowsBetween`

With `rangeBetween`, we defined the start and end of the window using the value of the ordering column. However, we can also define the start and end of the window with the relative row position.

For example, we would like to create a window where the start of the window is one row prior to the current row and the end is one row after the current row.

## Define custom window specification
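A sketch of the specification (the original gist is missing; `winSpecRows` is my name):

```scala
import org.apache.spark.sql.expressions.Window

// One row before through one row after the current row
val winSpecRows = Window.partitionBy("depName")
  .orderBy("salary")
  .rowsBetween(-1, 1)
```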

## Apply custom window specification
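A sketch of this step, with the spec repeated so the snippet stands alone:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val winSpecRows = Window.partitionBy("depName")
  .orderBy("salary")
  .rowsBetween(-1, 1)

df.withColumn("max_salary", max("salary").over(winSpecRows)).show()
```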

Output:

```
+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      5200|
|  develop|   10|  5200|      5200|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      6000|
|    sales|    3|  4800|      4800|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      5000|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      3900|
+---------+-----+------+----------+
```

Let’s try to understand the output now.

- For **depname** = `develop`, **salary** = `4500`, a window will be defined with one row prior to and one row after the current row. So the salaries within the window are (`4200, 4500, 5200`), and the max is `5200` (check output above).

- Similarly, for **depname** = `sales`, **salary** = `5000`, a window will be defined with one row prior to and one after the current row. Since there are no rows after this row, the window will only have 2 rows, with salaries (`4800, 5000`), and the max is `5000` (check output above).

We can also use the special boundaries `Window.unboundedPreceding`, `Window.unboundedFollowing`, and `Window.currentRow`, as we did previously with `rangeBetween`.

*Note: Ordering is not necessary with `rowsBetween`, but I have used it to keep the results consistent on each run.*

# Summary

I hope you have enjoyed learning about window functions in Apache Spark. In this blog, we discussed how to use window functions to perform operations on a group of data and get a single value/result for each record. We also discussed the various types of window functions (aggregate, ranking, and analytical), including how to define custom window boundaries. You can find a Zeppelin notebook exported as a JSON file and also a Scala file on GitHub. In my next blog, I will cover the various array functions available in Apache Spark.