Benchmarking PySpark Pandas, Pandas UDFs, and Fugue Polars

Kevin Kho
fugue-project
Published in
7 min readApr 10, 2023

A case study on the performance of group-map operations on different backends.

Polar bear supercharged. Image by author.

Using the term PySpark Pandas alongside PySpark and Pandas repeatedly was very confusing. Because of this, I used the old name Koalas sometimes to make it easier to read. Koalas and PySpark Pandas are used equivalently in this article.

Introduction

There has been a lot of attention on the limitations of Pandas, especially concerning performance. As such, data practitioners are seeking other solutions to utilize resources more efficiently or scale beyond a single machine. This article will take a closer look at two such solutions in Polars and PySpark Pandas. For anyone not familiar with these tools, here is a quick introduction.

Polars is a Rust-based DataFrame library that is multithreaded by default. It can also handle out-of-core streaming operations. For a comparison with Pandas, this is a good resource.

PySpark Pandas (formerly known as Koalas) is a Pandas-like library allowing users to bring existing Pandas code to PySpark. The Spark engine can be leveraged with a familiar Pandas interface for people less familiar with Spark syntax.

So Polars handles multiprocessing on a single machine, and Koalas allows users to scale out over a cluster. It didn’t really make sense to compare these two frameworks because Polars was limited to a single machine, until now. Fugue is a project that ports Python and Pandas code to Spark, Dask, and Ray with minimal lines of code (as we’ll see below). Recently, the Fugue project added a Polars backend, which allows Polars users to take already existing Polars code, and apply it on Spark DataFrame over a cluster. Spark already has support for Pandas UDFs, and now we can use similar Polars UDFs on top of Spark, Dask, and Ray through Fugue.

With this integration, we set out to compare the execution time of Fugue + Polars, Pandas UDFs, and PySpark Pandas (Koalas). The benchmarks showed us some other exciting things worth further discussion.

Simulated Data and Setup

For our experiment, we used a DataFrame that looked like the following.

We generated ten float columns, and a timestamp for each record. The uid is a unique id for each group of data. We had 672 data points for each group. From here, we generated three datasets at 10,000 groups, 100,000 groups, and 1,000,000 groups to test how the solutions scaled. The biggest dataset has 672 million rows.

Some of the execution will depend on cluster configuration, so we’ll attach all of our execution details along with a link to a notebook in the appendix section. This was run on Databricks (at our own expense costing around $100). We also made sure to clear the cache before each code execution.

PySpark Pandas versus Pandas UDF

Forgetting Fugue and Polars for a second, we wanted to look at the performance of Koalas versus support for Pandas UDFs in PySpark. When Koalas was added to the main PySpark package (and renamed to PySpark Pandas), a blog was released that compared the performance of Koalas to Pandas.

While there is a significant performance improvement over single-core Pandas (in some cases), we wanted to compare it against bringing Pandas code to Spark in Pandas UDFs. We also compared it to native Spark, but that’s a topic for another article. How does Koalas groupby-apply compare to using the Pandas UDF of Spark?

The code below compares the overhead of Koalas and Pandas UDF. We get the first row of each partition and sum the first column. This is just the pure overhead from doing a dummy operation.

import pyspark pandas as pp
from pyspark.sql.functions import sum

def koalas_overhead(path):
print(pp.read_parquet(path).groupby("uid").apply(lambda x:x.head(1))["_0"].sum())

def pandas_udf_overhead(path):
df = spark.read.parquet(path)
df = df.groupby("uid").applyInPandas(lambda x:x.head(1), schema=df.schema)
print(df.select(sum(df["_0"])).toPandas())

This gives us the following results. At the three data sizes, we found that Pandas-UDF had significantly less overhead (between 75% to 80%). Both already have the ability to leave the business logic written in Pandas, applyInPandas just requires some additional syntax to learn. For execution with large data, though, the additional syntax can be well worth it.

PySpark Pandas versus Pandas UDF overhead

Benchmark Experiment

Moving on to a real use case, we calculated the z-score of the differences for each column of data. This is a common operation to find outlier events in each group and will take a bit more compute than basic operations. To benchmark performance, we need to define this function in both Pandas and Polars. Note that the logic will be for one group, and then we can use groupby-apply type semantics to bring it to Spark. More specifically, this operation is a grouped-map, which is a common use case. In the code snippet below, zscore_pd is the function for Pandas and zscore_pl is for Polars.

import polars as pl
import pandas as pd

# schema:*
def zscore_pd(df:pd.DataFrame, n) -> pd.DataFrame:
subdf = df[COLS]
df = df.sort_values("ts")
x = subdf.shift(1).rolling(n)
z=(subdf-x.mean()).abs()/x.std()
return df.assign(**{k:z[k] for k in COLS}).dropna()

# schema:*
def zscore_pl(df:pl.DataFrame, n:int) -> pl.DataFrame:
params = {}
for col in COLS:
mean = pl.col(col).shift().rolling_mean(n, min_periods=n)
std = pl.col(col).shift().rolling_std(n, min_periods=n)
params[col]=(pl.col(col) - mean).abs()/std
return df.sort("ts").with_columns(**params).drop_nulls()

Fugue Polars versus Koalas

We know that Polars is faster than Pandas on a local machine. Part of that is due to the parallelized nature, but it’s also faster because of the Rust engine. We were curious to see if using Spark to run Polars jobs would be faster than applyInPandas. Note that Polars is multicore by default, so we need to ensure that Polars jobs are limited to one thread. This can be done by setting POLARS_MAX_THREAD to 1.

In the code below, Fugue was used to bring the Pandas z-score to Spark instead of applyInPandas. We previously benchmarked Fugue’s overhead to be negligible, so this shouldn’t affect the results. Fugue’s transform()will use the appropriate Spark method under the hood based on the function (mapInPandas, applyInPandas, rdd.mapPartitions, mapInArrow, etc.) based on the user’s logic. To execute Polars on Spark, we simply replace the Pandas function with the Polars function, and Fugue will make the adjustments. The Python functions were left untouched and decoupled with any Spark dependency, leaving them highly testable.


import fugue.api as fa

# Fugue + Pandas
fa.transform(
path_to_file,
zscore_pd, # Pandas function
partition="uid",
params=dict(n=7*24),
engine=spark
)

# Fugue + Polars
fa.transform(
path_to_file,
zscore_pl, # Polars function
partition="uid",
params=dict(n=7*24),
engine=spark
)

The results below showed the Fugue + Pandas or Fugue + Polars can already be much faster than using PySpark Pandas for this type of operation (more than 50% faster for larger datasets). The results below also showed an improvement from using Polars over Pandas, but we wanted to see if we could improve further.

ZScore performance on different Spark interfaces

Polars with Coarse Partitioning

For Fugue, it inspired us to come up with a new partitioning option called coarse. Coarse allows each partition to have multiple groups of data, and then we can let Polars handle the inner groupby. We only had to modify our code a bit for this to include an over clause in the Polars function.

# schema:*
def zscore_pl_gp(df:pl.DataFrame, n:int) -> pl.DataFrame:
params = {}
for col in COLS:
mean = pl.col(col).shift().rolling_mean(n, min_periods=n).over("uid")
std = pl.col(col).shift().rolling_std(n, min_periods=n).over("uid")
params[col]=(pl.col(col) - mean).abs()/std
return df.sort("ts").with_columns(**params).drop_nulls()

fa.transform(
path,
zscore_pl_gp,
partition=dict(by="uid", algo="coarse"),
params=dict(n=7*24),
engine=spark
)

In the results below, we found significant improvement with the coarse partitioning strategy. In fact, it was even faster than the Pandas UDF and Koalas overhead we benchmarked earlier. This is likely because we have much less overhead partitioning and just rely on Polars to perform the groupby on a single machine.

Using Spark and Polars together through Fugue can be a very fast solution for this group-map use case. It can also be done with minimal lines of code, and incrementally adopted. The transform()function above can take in a Spark DataFrame and return a Spark DataFrame after the Polars code is executed (and will work similarly for Dask and Ray). Fugue is meant to be minimally invasive, so it can also be used to scale the execution of a single step.

ZScore performance on different Spark interfaces

Conclusion

Through our benchmarking, we found:

  1. In some cases, Spark operations like mapInPandas or applyInPandas can be significantly faster than PySpark Pandas (Koalas), and be an alternative way to port Python or Pandas code.
  2. Using Fugue and Polars together can provide significant speed-up, even faster than Spark’s applyInPandas in some cases.

Using Fugue allowed us to use Spark and Polars synergistically (and the same setup will work on Dask and Ray). We were able to use Spark to partition the data and Polars to apply the compute logic with very minimal lines of code. Fugue lets users combine the best features of multiple tools to improve the experience of working on big data.

In a following article, we will show the results of this z-score benchmark against native Spark.

Resources

  1. Fugue Repo
  2. Polars Repo
  3. Fugue Polars

Appendix

These experiments were run on Databricks Runtime 12.2 LTS with a cluster of 8 m5d.4xlarge machines with 16 cpus each. We also used:

  • Fugue 0.8.3
  • Polars 0.16.14
  • Spark 3.3

The full notebook used to execute these can be found here. Code was simplified for the article.

--

--