An introduction to Window Functions in Apache Spark with Examples

Suffyan Asad
Nov 20, 2022


An introduction to Window functions in Apache Spark. Creating windows on data in Spark using partitioning and ordering clauses, and performing aggregations and ranking functions on them.

Running windows on Spark DataFrame

Introduction to Window Functions

Window functions in Apache Spark SQL are conceptually similar to the window functions in SQL — they divide the data into “windows” or bounds, and allow performing operations within those windows.

Background — Window Functions in SQL

As mentioned, window functions divide the data into windows, and allow performing operations within these windows.

Windows can be as large as the entire table, or smaller. The operations performed within them can be aggregations, such as sum, maximum, and average, or order-based functions, such as rank, dense rank, and running totals.

Defining Window bounds — Partition clause

Window bounds can be specified by using a partition clause, which is similar to a group by clause in SQL. If a partition clause is not specified, the entire data set becomes one partition.

Within each window, the data can be ordered using an order by clause. Ordering is compulsory for functions such as rank and dense rank; for other functions such as sum and average, order by is not required.

Two windows — partitioned by Grouping Column 1

Alternatively, for running aggregates, such as a running sum, sliding windows can be defined relative to each row. The following diagram represents a window of 3 rows: the current row and the 2 preceding rows.

Example of a running window

Running aggregates including sum, maximum, minimum, etc. can be performed over these windows. Running windows can also be specified without bounds, but an ordering clause is necessary to determine which rows precede and follow each row.

A good reference article I have found about Window functions in SQL is:

Window Functions in Apache Spark

Window functions in Apache Spark are similar to the window functions in SQL. They create bounds on the data, and perform operations within the scope of those bounds.

Defining the Window

In Spark, a window can be defined using the pyspark.sql.Window class in PySpark, or the org.apache.spark.sql.expressions.Window class in the Scala/Java API.

Because the code examples in this article are in Python, we’ll refer to the PySpark API only going forward. Refer to the Spark documentation on the pyspark.sql.Window class:

Defining the window involves specifying the following:

  • partition clause
  • order clause
  • bounds clause

All three are optional, and which ones are necessary in a particular situation depends on the operations performed on the window. For example, the rank function needs an order, and therefore requires an order clause in the window definition.

Caution: Not specifying a partition clause results in all data being packed into a single partition in Spark. All data then goes to a single worker, which can cause out-of-memory exceptions.

pyspark.sql.Window has the following functions to define the window:

  • partitionBy(*cols) - to specify the partition columns
  • orderBy(*cols) - to specify the order columns. Descending order can be specified by using the desc() function of the pyspark.sql.Column class.
  • rowsBetween(start, end) - to specify the sliding window bounds in terms of preceding rows (start) and following rows (end). 0 refers to the current row, a negative number refers to a preceding row, and a positive number refers to a following row. The order used to determine the preceding and following rows is defined using the orderBy function.
  • rangeBetween(start, end) - to specify the sliding window bounds in terms of values of the ordering column (instead of number of rows preceding or following each row).

After defining the window as a variable, it can be used with aggregate functions or sliding-window functions as needed. For example:

Example: A window on the `partition_col` column.
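A minimal sketch of such a window definition; the column names order_col and value_col, and the DataFrame df, are placeholders:

```python
from pyspark.sql import Window
import pyspark.sql.functions as F

# A window partitioned by partition_col and ordered by order_col in descending order
example_window = Window.partitionBy("partition_col").orderBy(F.col("order_col").desc())

# The window can then be passed to aggregate or ranking functions via over(), for example:
# df.withColumn("rank_in_partition", F.rank().over(example_window))
# df.withColumn("sum_of_partition", F.sum("value_col").over(Window.partitionBy("partition_col")))
```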

Difference between rowsBetween and rangeBetween

  • The rowsBetween function creates the running window based on the number of rows preceding and following each row, ordered in the specified order.
  • The rangeBetween function creates the running window based on the value of the ordering column.

For example, if the value of the ordering column is 9, rowsBetween(-3, 0) simply includes the 3 preceding rows and the current row in the window. But rangeBetween(-3, 0) applies the -3 and 0 parameters to the value of the ordering column, i.e. 9, and includes the rows whose ordering-column values lie between 6 and 9. This concept is illustrated in the following diagram:

Difference between windows defined using rowsBetween and rangeBetween functions
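A minimal runnable sketch of the difference, assuming a small hypothetical DataFrame with a single numeric order_value column:

```python
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").appName("window-demo").getOrCreate()

# Hypothetical data: one numeric column used for ordering
df = spark.createDataFrame([(1,), (2,), (4,), (5,), (9,)], ["order_value"])

# Current row plus the 3 preceding rows
rows_window = Window.orderBy("order_value").rowsBetween(-3, 0)
# Rows whose order_value lies between (current value - 3) and the current value
range_window = Window.orderBy("order_value").rangeBetween(-3, 0)

# No partition clause here, so Spark warns about a single partition; fine for a tiny example
df.select(
    "order_value",
    F.sum("order_value").over(rows_window).alias("rows_sum"),
    F.sum("order_value").over(range_window).alias("range_sum"),
).show()
```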

Because the parameters provided to the rangeBetween function are applied to the values of the ordering column, the following restrictions apply to its use:

  • The ordering column must be numeric.
  • There must be only one ordering column.

Please refer to the following documentation on the rangeBetween function for more details:

Examples

Next, let’s see window functions in action using some examples. But first, a look at the example data and the setup of the example project.

Example Database — Adventure Works

The database used for the example queries in this article is the Adventure Works 2014 sample database by Microsoft.

Link to Adventure Works:

A Docker image running a PostgreSQL database with Adventure Works 2014 loaded into it can be obtained from:

This Docker image provides a simple way to get the example database running.

Additionally, the purpose of using a database (Adventure Works in this case) is to first write the example queries in SQL, and then write them in Spark.

Code repository

The complete project containing the code examples is available on GitHub:

Setting up code and loading sales.salesorderheader table into Spark

The data can be loaded into Spark using the jdbc() function on the DataFrameReader object. Additionally, a JDBC driver for PostgreSQL is required and has to be added to the project. The driver can be downloaded from:

Please refer to my article on parallelizing the loading of large amounts of data into Spark from JDBC sources:

The following code creates the Spark session and loads data into Spark from the sales.salesorderheader table in the Adventure Works database:

Setting up code and reading from the database over JDBC
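A minimal sketch of this setup, assuming a local PostgreSQL instance; the connection details, credentials, and driver jar path are placeholders:

```python
from pyspark.sql import SparkSession

# Placeholder connection details for the Adventure Works PostgreSQL database
JDBC_URL = "jdbc:postgresql://localhost:5432/Adventureworks"
JDBC_PROPERTIES = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver",
}

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-window-functions")
    # Path to the PostgreSQL JDBC driver jar (placeholder)
    .config("spark.jars", "jars/postgresql.jar")
    .getOrCreate()
)

# Load the sales.salesorderheader table into a DataFrame over JDBC
sales_df = spark.read.jdbc(
    url=JDBC_URL,
    table="sales.salesorderheader",
    properties=JDBC_PROPERTIES,
)
```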

Example Query 1 — Total sales of the day and rank of sale by amount due

The first query is to create the following columns over the sales.salesorderheader table:

  • Sum of all sales of the day
  • Rank of each sale within the day based on descending order of the totaldue column

The required result can be created using the following SQL query:

Example Query 1 implementation in SQL

And it produces the following result:

Result of Example Query 1 in SQL

In PySpark, the same operation can be written as:

Example Query 1 implementation in PySpark
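A sketch of this implementation, reconstructed from the description below and reusing the sales_df DataFrame loaded earlier; the columns selected for display are an assumption:

```python
from pyspark.sql import Window
import pyspark.sql.functions as F

# Window for the total sales of each day
sum_window = Window.partitionBy("orderdate")

# Window for ranking sales within each day by totaldue, highest first
rank_window = Window.partitionBy("orderdate").orderBy(F.col("totaldue").desc())

result_df = (
    sales_df
    .withColumn("sum_of_day", F.sum("totaldue").over(sum_window))
    .withColumn("sale_rank", F.rank().over(rank_window))
    .select("salesorderid", "orderdate", "totaldue", "sum_of_day", "sale_rank")
)

result_df.show()
```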

The query first defines two windows using pyspark.sql.Window. The first window is for the sum-of-day calculation and is partitioned by the orderdate column. The window to determine the rank of each sale is partitioned by orderdate and ordered by totaldue in descending order.

The sum_of_day column is created by calling the Spark SQL sum function over sum_window. Similarly, the sale_rank column is created by calling the rank function over rank_window.

The code, when executed, gives the following result:

Result of Example Query 1 in Spark

Points to note

  • Note that the rank function relies on the order of rows in the window, and if the window is not ordered, the code throws a pyspark.sql.utils.AnalysisException.
  • If the partition clause is omitted, the entire data set is considered one partition. In this case, Spark gives the following warning:
WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation
  • Also, if an order is specified in the window used to calculate the sum, it behaves like a running total, i.e. for each row it sums all preceding values plus the current value, as illustrated in the sketch after this list.
  • Finally, window functions introduce shuffles due to partitioning and ordering of data.
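A minimal sketch of this running-total behaviour, reusing the imports and the sales_df DataFrame from the sketches above; the column name running_sum_of_day is a placeholder:

```python
# Adding an order clause to an aggregation window turns the sum into a running total:
# with an ordered window and no explicit frame, the default frame runs from the start of
# the partition up to the current row (including ties in the ordering column)
ordered_sum_window = Window.partitionBy("orderdate").orderBy(F.col("totaldue").desc())

running_df = sales_df.withColumn(
    "running_sum_of_day", F.sum("totaldue").over(ordered_sum_window)
)
```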

Example 2

The second example query does the following:

  1. Sum sales by date and territory.
  2. Within each territory, create a running average of the total daily sales over the previous 3 days.

This can be done using the following SQL:

Example Query 2 implementation in SQL

And it produces the following result:

Result of Example Query 2 in SQL

This can be implemented in PySpark as:

Implementation of Example Query 2 in PySpark
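A sketch of this implementation, reusing sales_df from earlier; the intermediate daily_sales_df, the output column name, and the exact frame bounds (here the three preceding days, excluding the current day) are assumptions:

```python
from pyspark.sql import Window
import pyspark.sql.functions as F

# Step 1: total sales per territory and day (a regular group-by aggregation, not a window)
daily_sales_df = (
    sales_df
    .groupBy("territoryid", "orderdate")
    .agg(F.sum("totaldue").alias("total_sales"))
)

# Step 2: within each territory, a sliding window over the three preceding days
running_avg_window = (
    Window
    .partitionBy("territoryid")
    .orderBy("orderdate")
    .rowsBetween(-3, -1)
)

result_df = daily_sales_df.withColumn(
    "running_avg_prev_3_days", F.avg("total_sales").over(running_avg_window)
)

result_df.orderBy("territoryid", "orderdate").show()
```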

The rows-between range can be specified using the rowsBetween function. The code produces the following result:

Result of Example Query 2 in Spark

Conclusion — Key takeaways

  • Apache Spark provides the pyspark.sql.Window class in the PySpark API to create windows over DataFrames.
  • Partitioned, ordered, and running windows can be created over DataFrames, and they are similar to window functions in SQL.
  • Several operations, such as aggregations and ranking functions, can be performed over windows.

Please share your feedback in the comments.
