How to get rid of loops and use window functions, in Pandas or Spark SQL

source: https://pixabay.com/en/stained-glass-spiral-circle-pattern-1181864/

Every software developer knows that iterating through the rows of a dataset is one sure killer of performance. Loops are bad. Vectorized operations (operations that work on entire arrays) are good. Pandas, the Python library for data analysis (https://pandas.pydata.org/), has vectorized operations for everything, allowing for great performance. For more on this topic, see an excellent article by Sofia Heisler: https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6.

So, we can add a new calculated column to a Pandas dataframe, in one quick operation:
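For example, a minimal sketch with made-up share-count columns (fb_shares and tw_shares are assumptions, not the article's actual dataset):

```python
import pandas as pd

# Hypothetical share-count data; column names are illustrative.
df = pd.DataFrame({
    "fb_shares": [10, 20, 30],
    "tw_shares": [1, 2, 3],
})

# One vectorized operation instead of a row-by-row loop:
df["total_shares"] = df["fb_shares"] + df["tw_shares"]
```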

Same in Spark:
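In Spark SQL this could be written as follows, assuming the same hypothetical columns and that the data is registered as a temporary view named shares (the view and column names are assumptions):

```sql
SELECT fb_shares,
       tw_shares,
       fb_shares + tw_shares AS total_shares
FROM shares
```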

But what if we need to calculate the row’s value using values from adjacent rows instead? How do we calculate cumulative totals? Are we stuck looping through rows?

Enter window functions

Window functions can do exactly what we need: look at surrounding rows to calculate the value for the current row. They are especially useful together with partitioning (in Spark) or grouping (in Pandas), to limit which records are included in the current window. Let’s look closer at some use cases for window functions.

Calculate totals from deltas, or cumulative sum

Here is an example dataset that contains a set of online article URLs, for which we track whenever they get new social network shares (Twitter, Facebook, etc.):
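A toy stand-in for this dataset, with assumed column names (ts for the observation time, delta for the number of new shares at that moment), might look like:

```python
import pandas as pd

# Toy stand-in for the article's csv; column names are assumptions.
df = pd.DataFrame({
    "ts": pd.to_datetime(["2019-01-01 10:00", "2019-01-01 10:05",
                          "2019-01-01 10:10", "2019-01-01 10:03"]),
    "url": ["/article-1", "/article-1", "/article-1", "/article-2"],
    "service": ["twitter", "twitter", "twitter", "facebook"],
    "delta": [5, 3, 2, 7],  # new shares observed at ts
})
```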

Now we want to calculate a total value at ts for each row, summing up all values from previous rows for the same service and url. Here is how to do it with Pandas:
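A sketch on toy data, assuming columns named ts, url, service, and delta:

```python
import pandas as pd

# Illustrative data; column names are assumptions.
df = pd.DataFrame({
    "ts": pd.to_datetime(["2019-01-01 10:00", "2019-01-01 10:05",
                          "2019-01-01 10:10", "2019-01-01 10:03"]),
    "url": ["/a", "/a", "/a", "/b"],
    "service": ["twitter"] * 4,
    "delta": [5, 3, 2, 7],
})

# Order by ts, then take a running sum within each (url, service) group.
df = df.sort_values("ts")
df["total"] = df.groupby(["url", "service"])["delta"].cumsum()
```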

With pyspark:
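A Spark SQL query of the same shape, assuming a temp view named shares:

```sql
SELECT ts, url, service, delta,
       SUM(delta) OVER (
           PARTITION BY url, service
           ORDER BY ts
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
       ) AS total
FROM shares
```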

The PARTITION BY url, service clause makes sure values are only added up within the same url and service; .groupby ensures the same in Pandas. We order records within each partition by ts, with .sort_values in Pandas and ORDER BY in Spark SQL.

Calculate deltas from totals

Sometimes, we need to do the reverse: calculate deltas in each row from total values. With Pandas, use .diff:
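A sketch, again with assumed column names; the first row of each group has nothing to diff against, so its delta is filled from total:

```python
import pandas as pd

# Illustrative running totals; column names are assumptions.
df = pd.DataFrame({
    "ts": pd.to_datetime(["2019-01-01 10:00", "2019-01-01 10:05",
                          "2019-01-01 10:10"]),
    "url": ["/a", "/a", "/a"],
    "service": ["twitter"] * 3,
    "total": [5, 8, 10],
})

df = df.sort_values("ts")
# Difference between each total and the previous one within the group.
df["delta"] = df.groupby(["url", "service"])["total"].diff()
# The first row of each group has no predecessor, so diff yields NaN;
# fall back to the total itself there.
df["delta"] = df["delta"].fillna(df["total"])
```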

When the URL is shared for the first time, there are no previous records to diff with, so we get a null delta. For those rows, we then set delta equal to total (for example, with .fillna).

With pyspark, use the LAG function:
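A possible Spark SQL version, assuming a temp view named shares; LAG's optional third argument supplies a default (here 0) for the first row of each partition:

```sql
SELECT ts, url, service, total,
       total - LAG(total, 1, 0) OVER (
           PARTITION BY url, service
           ORDER BY ts
       ) AS delta
FROM shares
```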

Pandas lets us subtract row values from each other using a single .diff call. In pyspark, there’s no equivalent, but there is a LAG function that can be used to look up a previous row value, and then use that to calculate the delta. In Pandas, an equivalent to LAG is .shift. Both LAG and .shift take an offset parameter to tell them how many rows to look back (or forward). In pyspark, LAG looks back, and LEAD looks forward. In Pandas, .shift replaces both, as it can accept a positive or negative offset.

Rank things

It is often useful to show things like “Top N products in each category”. In this dataset, we have aggregated share counts per service and url, for a certain period. We want to see the 3 most shared URLs for each service. Pandas code uses .rank:
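A sketch on made-up aggregated counts (column names are assumptions); .rank with ascending=False ranks the highest counts first:

```python
import pandas as pd

# Illustrative aggregated share counts per service and url.
df = pd.DataFrame({
    "service": ["twitter"] * 4 + ["facebook"] * 4,
    "url": ["/a", "/b", "/c", "/d"] * 2,
    "shares": [100, 50, 75, 25, 10, 40, 30, 20],
})

# Rank URLs within each service by share count, highest first.
df["rank"] = df.groupby("service")["shares"].rank(method="dense",
                                                  ascending=False)
top3 = df[df["rank"] <= 3].sort_values(["service", "rank"])
```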

With pyspark, using a SQL RANK function:
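A Spark SQL version of the same idea, assuming a temp view named shares_agg; window functions can't appear in WHERE, so the rank is computed in a subquery:

```sql
SELECT service, url, shares
FROM (
    SELECT service, url, shares,
           RANK() OVER (PARTITION BY service ORDER BY shares DESC) AS rnk
    FROM shares_agg
) ranked
WHERE rnk <= 3
```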

In Spark, there’s quite a few ranking functions:

  • RANK
  • DENSE_RANK
  • ROW_NUMBER
  • PERCENT_RANK

The last one (PERCENT_RANK) calculates the percentile of the current row within its window. It is equal to:

(rank - 1) / (total_num_of_rows - 1)

The first three mainly differ in how they break ties. Here is an example to illustrate the differences:
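A small sketch using Pandas, whose .rank method parameter mirrors the three Spark functions (method="min" behaves like RANK, "dense" like DENSE_RANK, and "first" like ROW_NUMBER):

```python
import pandas as pd

# Three scores with a tie, to show how each function breaks it.
s = pd.Series([100, 90, 90, 80])

rank       = s.rank(method="min",   ascending=False)  # like RANK
dense_rank = s.rank(method="dense", ascending=False)  # like DENSE_RANK
row_number = s.rank(method="first", ascending=False)  # like ROW_NUMBER
```

RANK leaves a gap after a tie (1, 2, 2, 4), DENSE_RANK does not (1, 2, 2, 3), and ROW_NUMBER numbers every row uniquely (1, 2, 3, 4).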

Aggregate without losing information

All the examples so far followed the same pattern: look one row back or one row forward to calculate the value in the current row. But what if we needed to expand our window to include, say, two rows back? Or, what if we want to use a time interval to size our window, for example, to calculate a 5-minute average?

In the case of a 5-minute average, we could round the ts field, group on service, url, and the rounded values of ts, and calculate averages per group. But then we would not see the original records that contributed to each 5-minute average. And trying to write a GROUP BY query for a 3-row window would be a SQL developer's nightmare.

With pyspark, the ROWS BETWEEN clause sizes the window relative to the current row:
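For example, a 3-row average (the current row plus 2 preceding), assuming a temp view named shares:

```sql
SELECT ts, url, service, delta,
       AVG(delta) OVER (
           PARTITION BY url, service
           ORDER BY ts
           ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
       ) AS avg_3_rows
FROM shares
```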

With Pandas, things get a little more complicated:
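A sketch on toy data with assumed column names:

```python
import pandas as pd

# Illustrative data; column names are assumptions.
df = pd.DataFrame({
    "ts": pd.to_datetime(["2019-01-01 10:00", "2019-01-01 10:05",
                          "2019-01-01 10:10", "2019-01-01 10:15"]),
    "url": ["/a"] * 4,
    "service": ["twitter"] * 4,
    "delta": [4.0, 2.0, 6.0, 8.0],
})

# Rolling mean over "current row plus 2 preceding" within each group.
ro = (df.set_index("ts")
        .groupby(["url", "service"])["delta"]
        .rolling(3, min_periods=1)
        .mean()
        .reset_index()
        .rename(columns={"delta": "avg_3_rows"}))

# ro has a different index than df, so join it back on the key columns.
df = df.merge(ro, on=["url", "service", "ts"])
```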

What is going on here?

First, we create a new dataframe, ro, by grouping the original df on url and service and applying a .rolling window of size 3 followed by a .mean; a window of size 3 means "the current row plus the 2 preceding rows". Unfortunately, ro ends up with a different index (url, service, ts) from the original df, so we can't simply take a column from one and assign it to the other. Instead, we .merge the two dataframes on their key columns (the equivalent of a SQL JOIN).

Note that in Pandas, we use min_periods=1 to say "if we don't have 3 records in a window, use however many we have to calculate the mean, even if it's just the current row". Spark has this behavior by default.

What about the 5-minute average?

In pyspark, use RANGE BETWEEN INTERVAL. This tells the window to include records within the last 5 minutes before the current row, measured by the ts field from the ORDER BY clause:
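A sketch, assuming a temp view named shares with a timestamp column ts:

```sql
SELECT ts, url, service, delta,
       AVG(delta) OVER (
           PARTITION BY url, service
           ORDER BY ts
           RANGE BETWEEN INTERVAL 5 MINUTES PRECEDING AND CURRENT ROW
       ) AS avg_5_min
FROM shares
```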

In Pandas, specify an offset (window="5min") instead of an integer:
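A sketch on toy data; the datetime index is what lets .rolling accept a time offset:

```python
import pandas as pd

# Illustrative data; column names are assumptions.
df = pd.DataFrame({
    "ts": pd.to_datetime(["2019-01-01 10:00", "2019-01-01 10:02",
                          "2019-01-01 10:04", "2019-01-01 10:30"]),
    "url": ["/a"] * 4,
    "service": ["twitter"] * 4,
    "delta": [4.0, 2.0, 6.0, 8.0],
})

# With a time offset, the window covers the last 5 minutes of data
# rather than a fixed number of rows.
ro = (df.set_index("ts")
        .groupby(["url", "service"])["delta"]
        .rolling("5min")
        .mean()
        .reset_index()
        .rename(columns={"delta": "avg_5_min"}))

df = df.merge(ro, on=["url", "service", "ts"])
```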

What’s next?

Here are some excellent articles on window functions in pyspark, SQL and Pandas:

For more information on function parameters and usage, refer to the documentation:

The csv files and an ipython notebook with code snippets used in this article are shared on github:

I concentrated on Spark SQL and Pandas here, but the same queries work in many relational databases, such as MS SQL Server, Oracle, and PostgreSQL. The PostgreSQL documentation is a great resource on window functions:

Window functions can seem like pure wizardry at first, but there is no need to be intimidated by them. Now you can impress others with your SQL or Pandas wizardry!