PySpark Window Functions: Part 2

Vivek Chaudhary
Plumbers Of Data Science
4 min read · Jul 10, 2023

In the previous edition of the window functions series, we covered rank(), dense_rank() and row_number(). The article link is below.

https://medium.com/plumbersofdatascience/pyspark-window-functions-1ef8f7cc1016

The objective of this article is to cover more window functions: lead(), lag(), percent_rank(), ntile(), and the frame clauses rowsBetween() and rangeBetween().

Import Libraries and Create Dataset

from pyspark.sql.window import Window
from pyspark.sql.functions import (col, row_number, rank,
                                   dense_rank, lead, lag, percent_rank, ntile)
from pyspark.sql import functions as f

# prod_code values are passed as strings to match the string schema
df1 = spark.createDataFrame([('ABC', 82371179, '5059697396328', 12047.30),
    ('ABC', 90361, '4268839976', 88604.31),
    ('ABC', 11858, '40450708107', 7895.11),
    ('ABC', 54653, '1898557782', 75923.40),
    ('ABC', 38094, '1021673268', 49407.87),
    ('ABC', 55020, '40085011012', 9618.90),
    ('ABC', 36740, '9697265785', 75923.40),
    ('ABC', 200003, '1166060253', 161637.52),
    ('ABC', 93561, '7753211486', 49407.87),
    ('XYZ', 87762, '40254018689', 5759.49),
    ('XYZ', 55020, '40085011012', 9618.90),
    ('XYZ', 36740, '9697265785', 75923.40),
    ('XYZ', 200003, '1166060253', 161637.52),
    ('XYZ', 93561, '7753211486', 49407.87),
    ('XYZ', 87762, '50254018689', 5759.49)],
    schema='channel_code string, prod_id bigint, prod_code string, sum_spent float')

df1.show()

lead()

Returns the value from the next row within the partition. For the last row of each partition there is no next row, so lead() returns null.

win = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc())

revenue_difference = (df1['sum_spent'] - lead(df1['sum_spent']).over(win))

df1.select(df1['channel_code'], df1['prod_code'],
    df1['sum_spent'],
    revenue_difference.alias('spent_diff_lead')).show(truncate=False)
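
lead() also takes an offset and a default value, so we can look more than one row ahead and replace the trailing nulls. A minimal sketch (the column name next_but_one is just for illustration):

# look two rows ahead; return 0.0 instead of null at the end of the partition
df1.withColumn('next_but_one',
    lead(df1['sum_spent'], 2, 0.0).over(win)).show(truncate=False)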

lag()

Returns the value from the previous row within the partition. For the first row of each partition there is no previous row, so lag() returns null.

win = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc())

revenue_difference = (df1['sum_spent'] - lag(df1['sum_spent']).over(win))

df1.select(df1['channel_code'], df1['prod_code'],
    df1['sum_spent'],
    revenue_difference.alias('spent_diff')).show(truncate=False)

percent_rank()

Returns the relative rank of a row within its partition as a value between 0 and 1, computed as (rank - 1) / (number of rows in the partition - 1).

win = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc())

df_round = df1.withColumn('percent_rank', percent_rank().over(win))

df_round.select(df_round['channel_code'], df_round['prod_id'],
    df_round['prod_code'], df_round['sum_spent'],
    f.round(df_round['percent_rank'], 2)
    .alias('percent_rank')).show(20, truncate=False)
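
To see the formula in action, we can recompute percent_rank manually from rank() and the partition row count. A quick sketch (win_all and manual_pr are illustrative names):

# (rank - 1) / (partition row count - 1) reproduces percent_rank
win_all = Window.partitionBy('channel_code')  # whole partition; no ordering needed for a count

df_round.withColumn('manual_pr',
    (rank().over(win) - 1) / (f.count(f.lit(1)).over(win_all) - 1)) \
    .select('channel_code', 'sum_spent',
        f.round('percent_rank', 2).alias('percent_rank'),
        f.round('manual_pr', 2).alias('manual_pr')).show(truncate=False)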

ntile()

Breaks the ordered rows of each partition into the specified number of roughly equal groups and assigns each row its group number.

ntile(2) means each partition will be divided into two groups; when the rows don't divide evenly, the earlier groups receive one extra row.

win = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc())

df1.withColumn('ntile', ntile(2).over(win)).show(truncate=False)
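
A common use is bucketing followed by a filter. A minimal sketch keeping only the top-spending half of each channel (top_half is an illustrative name):

# group 1 of ntile(2) holds the higher spenders because the window is ordered descending
top_half = df1.withColumn('half', ntile(2).over(win)).filter(col('half') == 1)
top_half.show(truncate=False)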

rowsBetween()

It defines the window frame in physical row positions relative to the row on which the calculation is being performed.

currentRow - the row on which the calculation is being performed.
unboundedPreceding - the first row of the partition.
unboundedFollowing - the last row of the partition.

1. unboundedPreceding and currentRow

winrow = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc()) \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

revenue_difference = (f.max(df1['sum_spent']).over(winrow) - df1['sum_spent'])

df1.select(df1['channel_code'], df1['prod_code'],
    df1['sum_spent'],
    revenue_difference.alias('spend_difference')).show(truncate=False)

For the 1st row, both the frame maximum and the currentRow value are 161637.52, hence the difference is 0.

For the 2nd row, the frame maximum is 161637.52 and the currentRow value is 88604.31, hence the difference is 73033.21.

For the 3rd row, the frame maximum is 161637.52 and the currentRow value is 75923.40, hence the difference is 85714.12.

2. unboundedPreceding and unboundedFollowing

winrow1 = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc()) \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

spent_sum = (f.sum(df1['sum_spent']).over(winrow1))

df1.select(df1['channel_code'], df1['prod_code'],
    df1['sum_spent'],
    spent_sum.alias('total_spent')).show(truncate=False)

Since the frame spans the entire partition, every row shows the same value: the total spent for its channel, not a running sum.

3. currentRow and unboundedFollowing

winrow1 = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc()) \
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)

spent_sum = (f.sum(df1['sum_spent']).over(winrow1))

df1.select(df1['channel_code'], df1['prod_code'], df1['sum_spent'],
    spent_sum.alias('remaining_spent')).show(truncate=False)

Here the sum runs from the current row to the end of the partition, so each row shows the amount still remaining below it in the ordering.
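
Besides the unbounded markers, rowsBetween() also accepts literal row offsets: negative values count backwards and positive values count forwards from the current row. A minimal sketch computing a 3-row moving average (win_ma and moving_avg are illustrative names):

# frame = one row before the current row through one row after it
win_ma = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc()) \
    .rowsBetween(-1, 1)

df1.withColumn('moving_avg',
    f.round(f.avg(df1['sum_spent']).over(win_ma), 2)).show(truncate=False)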

rangeBetween()

With the rangeBetween function, the frame boundaries are defined on the values of the ordering column rather than on physical row positions: a row enters the frame if its ordering value lies within the current row's value plus the given start and end offsets. This gives more control over the boundaries than rowsBetween.

winra = Window.partitionBy(df1['channel_code']) \
    .orderBy(df1['sum_spent'].desc()) \
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)

revenue_difference = (f.max(df1['sum_spent']).over(winra) - df1['sum_spent'])

df1.select(df1['channel_code'], df1['prod_code'], df1['sum_spent'],
    revenue_difference.alias('spend_difference')) \
    .show(truncate=False)

The output of rangeBetween(Window.unboundedPreceding, Window.currentRow) is similar to that of rowsBetween(Window.unboundedPreceding, Window.currentRow) on this data, so let's cover another scenario that shows the extra control rangeBetween gives over the boundaries.

salary_df = spark.createDataFrame([
    ('Admin', 1, 50000), ('IT', 2, 39000),
    ('Admin', 3, 48000), ('Admin', 4, 48000),
    ('IT', 5, 35000), ('IT', 12, 36000),
    ('IT', 22, 40000), ('Technology', 7, 42000),
    ('Technology', 8, 60000), ('Technology', 9, 45000),
    ('Technology', 10, 52000), ('Technology', 11, 55000),
    ('Prod_Engineering', 13, 42000), ('Prod_Engineering', 15, 60000),
    ('Prod_Engineering', 21, 45000), ('Prod_Engineering', 23, 40000)],
    schema='dept string, empID int, salary int')

salary_df.show()

win_funcrb1 = Window.partitionBy(salary_df['dept']) \
    .orderBy(salary_df['salary']) \
    .rangeBetween(Window.unboundedPreceding, 15000)

salary_df.withColumn('RangeBetween_max',
    f.max(salary_df['salary']).over(win_funcrb1)).show()

Here we have created a frame from unboundedPreceding up to the rows whose salary is at most the current row's salary + 15000, and we find the max value within that range.

For the group Technology, the salaries in ascending order are 42000, 45000, 52000, 55000 and 60000.

For the 1st row, the range runs up to 42000 + 15000 = 57000, i.e. (42000, 57000); the max value in this range is 55000.

For the 2nd row, the range runs up to 45000 + 15000 = 60000, i.e. (45000, 60000); the max value in this range is 60000.
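
Because the frame is value-based, both boundaries can be numeric offsets. A minimal sketch counting, for each employee, how many colleagues in the same department earn within 5000 of their salary (win_rb2 and peers_within_5k are illustrative names):

# frame = rows whose salary lies in [current salary - 5000, current salary + 5000]
win_rb2 = Window.partitionBy(salary_df['dept']) \
    .orderBy(salary_df['salary']) \
    .rangeBetween(-5000, 5000)

salary_df.withColumn('peers_within_5k',
    f.count(f.lit(1)).over(win_rb2)).show()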

Summary:

· PySpark window functions: lead(), lag(), percent_rank() and ntile().

· Frame-control clauses rowsBetween() and rangeBetween() with the arguments unboundedPreceding, currentRow and unboundedFollowing.
