An introduction to Window Functions in Apache Spark with Examples
Creating windows on data in Spark using partitioning and ordering clauses, and performing aggregation and ranking functions over them.
Introduction to Window Functions
Window functions in Apache Spark SQL are conceptually similar to the window functions in SQL — they divide the data into “windows” or bounds, and allow performing operations within those windows.
Background — Window Functions in SQL
As mentioned, window functions divide the data into windows, and allow performing operations within these windows.
Windows can be defined to be as large as the entire table, or smaller. The operations performed within them can be aggregations, such as sum, maximum, and average, or order-based, such as rank, dense rank, and running totals.
Defining Window bounds — Partition clause
Window bounds can be specified by using a partition clause, which is similar to a group by clause in SQL. If a partition clause is not specified, the entire data set becomes one partition.
Within each window, the data can be ordered using the order by clause. Ordering the data is compulsory for functions such as rank and dense rank; for other functions, such as sum and average, order by is not required.
Alternatively, for running aggregates, such as running sum, sliding windows can be defined using a window definition relative to each row. The following diagram represents a window of 3 rows — current row and preceding 2 rows.
Running aggregates, including sum, maximum, minimum, etc., can be performed over these windows. Running windows can also be specified without bounds, but an ordering clause is necessary to determine which rows precede and follow each row.
A good reference article I have found about Window functions in SQL is:
Window Functions in Apache Spark
Window functions in Apache Spark are similar to the window functions in SQL. They create bounds on the data, and perform operations within the scope of those bounds.
Defining the Window
In Spark, a window can be defined using the pyspark.sql.Window class in PySpark, or the org.apache.spark.sql.expressions.Window class in the Scala/Java API.
Because the code examples in this article are in Python, going forward, we'll refer to the PySpark API only. Reference Spark documentation on the pyspark.sql.Window class:
Defining the window involves specifying the following:
- partition clause
- order clause
- bounds clause
All three are optional, and which ones are necessary in a particular situation depends on the operations performed on the window. For example, the rank function needs an order, so it requires an order clause to be present in the window definition.
Caution: Not specifying a partition clause results in all data being packed into a single partition in Spark. All the data then goes to a single worker, which can cause out-of-memory exceptions.
pyspark.sql.Window has the following functions to define the window:
- partitionBy(*cols) - to specify the partition columns.
- orderBy(*cols) - to specify the order columns. Descending order can be specified by using the desc() function of the pyspark.sql.Column class.
- rowsBetween(start, end) - to specify the sliding window bounds in terms of preceding rows (start) and following rows (end). 0 refers to the current row, a negative number refers to a preceding row, and a positive number refers to a following row. The order used to determine the preceding and following rows is defined using the orderBy function.
- rangeBetween(start, end) - to specify the sliding window bounds in terms of values of the ordering column (instead of the number of rows preceding or following each row).
After defining the window as a variable, it can then be used in aggregate functions or sliding window functions as needed. For example:
Difference between rowsBetween and rangeBetween
- The rowsBetween function creates the running window based on the number of rows preceding and following each row, ordered in the specified order.
- The rangeBetween function creates the running window based on the value of the ordering column.
For example, if the value of the ordering column is 9, rowsBetween(-3, 0) will simply include the preceding 3 rows relative to the current row, and the current row itself, in the window. But rangeBetween(-3, 0) will apply the -3 and 0 parameters to the value of the ordering column, i.e. 9, and will include rows that have values between 6 and 9 in the window. This concept is illustrated in the following diagram:
Because the parameters provided to the rangeBetween function are applied to the values of the ordering column, the following restrictions apply to the use of the rangeBetween function:
- The ordering column must be numeric.
- There must be only one ordering column.
Please refer to the following documentation on the rangeBetween function for more details:
Examples
Next, let's see window functions in action using some examples. But first, a look at the example data and setting up the example project.
Example Database — Adventure Works
The database used in this article for the example queries is the Adventure Works 2014 sample database by Microsoft.
Link to Adventure Works:
A docker image running PostgreSQL database with Adventure Works 2014 loaded in it can be obtained from:
This docker image provides a simple way to have the example database running.
Additionally, the purpose of using a database (Adventure Works in this case) is to first write the example queries in SQL, and then write them in Spark.
Code repository
Complete project containing code examples is present on github:
Setting up code and loading the sales.salesorderheader table into Spark
The data can be loaded into Spark using the jdbc() function on the DataFrameReader object. Additionally, a JDBC driver for PostgreSQL is required, and it has to be added to the project. The driver can be downloaded from:
Please refer to my article on parallelizing the loading of large amounts of data into Spark from JDBC sources:
The following code creates the Spark session and loads data into Spark from the sales.salesorderheader table in the Adventure Works database:
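A minimal sketch of that setup might look as follows. The driver jar path, host, port, database name, and credentials are all assumptions here; adjust them to your own environment (this sketch only runs against a live PostgreSQL instance, such as the Docker image mentioned above):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("adventureworks-window-functions")
    # Hypothetical path to the PostgreSQL JDBC driver jar.
    .config("spark.jars", "postgresql-42.2.5.jar")
    .getOrCreate()
)

# Hypothetical connection details; replace with your own.
df = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/Adventureworks",
    table="sales.salesorderheader",
    properties={
        "user": "postgres",
        "password": "postgres",
        "driver": "org.postgresql.Driver",
    },
)
```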
Example Query 1 — Total sales of the day and rank of sale by amount due
The first query is to create the following columns over the sales.salesorderheader table:
- Sum of all sales of the day
- Rank of each sale within the day, based on descending order of the totaldue column
The required result can be created using the following SQL query:
And it produces the following result:
In Spark SQL, the same operation can be written as:
The query first defines two windows using pyspark.sql.Window. The first window is for the sum-of-day calculation, and it is partitioned by the orderdate column. The window to determine the rank of each sale is partitioned by orderdate and ordered by totaldue in descending order.
The sum_of_day column is created by calling the Spark SQL sum function over sum_window. Similarly, the sale_rank column is created by calling the rank function over rank_window.
The code, when executed, gives the following result:
Points to note
- Note that the rank function relies on the order of rows in the window; if the window is not ordered, the code throws a pyspark.sql.utils.AnalysisException.
- If the partition clause is omitted, the entire data set is considered one partition. In this case, Spark gives the following warning:
WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation
- Also, if an order is specified in the window used to calculate the sum, it sums the values up like a running total, i.e. it sums all preceding values, including the current value, for each row.
- Finally, window functions introduce shuffles due to partitioning and ordering of data.
Example 2
The second example query does the following:
- Sum sales by date and territory.
- Within each territory, create a running average of total sales per day of previous 3 days.
This can be done using the following SQL:
And it produces the following result:
This running total can be implemented in Spark SQL as:
The range of rows can be specified using the rowsBetween function. The code produces the following result:
Conclusion — Key takeaways
- Apache Spark provides the pyspark.sql.Window class in the pyspark API to create windows over data frames.
- Partitioned, ordered, and running windows can be created over data frames, and they are similar to window functions in SQL.
- Several functions, such as aggregations and rank, can be performed over windows.
Please share your feedback in comments.