Using Snowflake’s Snowpark Pandas to process data at scale

By
Doris Lee (Product Manager, Snowflake)
Adrian Lee (Principal Solutions Engineer, Snowflake)

At the Data Cloud Summit, Snowflake announced the public preview of the Snowpark pandas API.

So what is Snowpark pandas, and how is it different from regular pandas?

Pandas is a widely adopted library that is flexible and convenient, especially for manipulating DataFrames. However, processing data in a regular pandas DataFrame becomes inefficient once a dataset no longer fits in main memory, so developers have to scale up their virtual machines to accommodate the pandas DataFrame.

Snowpark pandas allows you to run pandas code directly on your data in Snowflake. Users who are familiar with pandas can bring their existing pandas code into Snowpark with minimal code changes (changing the import statement) and keep the same familiar, native pandas experience.
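In practice, the change is usually just the import line. As a minimal sketch (the exact module paths may vary by release; the spd alias matches the snippets later in this post):

# Before: stock pandas
# import pandas as pd

# After: Snowpark pandas, which runs Modin with the Snowflake plugin
import modin.pandas as spd
import snowflake.snowpark.modin.plugin  # registers the Snowflake backend for Modin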

Under the hood, Snowpark pandas is powered by Modin, a popular open-source library that lets users run pandas at scale on distributed computing back ends. Snowflake acquired Ponder, the company behind Modin, in October 2023. With the release of Snowpark pandas, users can run pandas natively in Snowflake without moving their data out, taking advantage of Snowflake's parallelization and governance while keeping the convenience of pandas.

In this blog, we will explore the performance of Snowpark pandas and how it compares to regular pandas. For this comparison, we will run DataFrame operations such as reads, aggregations, group bys, and pivot tables against a table.

You can use Snowpark pandas in Snowflake Notebooks. To get started, follow this guide.

The table we are using is a demand forecasting training base table. It contains food menu items, with 10 million rows and 952.5 MB of storage. The table has multiple dimensions, such as the average sales of menu items over 7-day, 14-day, and 28-day periods. In total, there are 50 columns in the table. The warehouse used for these tests is size XS.
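The snippets that follow assume a Snowflake notebook with an active Snowpark session, plus the imports below. This is a minimal setup sketch (import paths may differ by version):

from time import perf_counter

import modin.pandas as spd                    # Snowpark pandas, aliased as spd
import snowflake.snowpark.modin.plugin        # registers the Snowflake backend for Modin
from snowflake.snowpark.context import get_active_session

# In a Snowflake notebook, an active Snowpark session is already available
session = get_active_session()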

Reading the data into a Snowpark pandas DataFrame

Let’s import Snowpark pandas and read the data into a Snowpark pandas DataFrame. In this example, with about 10 million rows, it took 4.58 seconds to read the data and produce the DataFrame.

# Read data into a Snowpark pandas df
start = perf_counter()
spd_df = spd.read_snowflake("FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE")
print(spd_df)
end = perf_counter()
data_size = len(spd_df)
print(f"Took {end - start} seconds to read a table with {data_size} rows into Snowpark pandas!")

Pretty fast. But how does it compare to:

  1. Reading the data into a Snowpark DataFrame and converting it to pandas with the to_pandas() method
  2. Running a SQL statement and fetching the results with the fetch_pandas_all() method

Let us first explore the to_pandas() method. With to_pandas(), it took about 65 seconds to read the same 10 million rows and convert them into a pandas DataFrame.

# Read data into a Snowpark DataFrame and convert it to a local pandas df with to_pandas()
start = perf_counter()
df_table = session.table("FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE")
df_table_pandas = df_table.to_pandas()
print(df_table_pandas)
end = perf_counter()
data_size = len(df_table_pandas)
print(f"Took {end - start} seconds to read a table with {data_size} rows into normal pandas using to_pandas()")

With the Snowpark pandas API, we see more than a 90% reduction in time when reading the data into a Snowpark pandas DataFrame compared to performing the to_pandas() conversion.

Now let us try the fetch_pandas_all() method. It took about 63.3 seconds. Again, the Snowpark pandas API is more than 90% faster than fetching the data with fetch_pandas_all().

# Read data into a local native pandas df
start = perf_counter()
# Create a cursor object.
cur = session.connection.cursor()
# Execute a statement that will generate a result set.
sql = "select * from FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE"
cur.execute(sql)
# Fetch the result set from the cursor and deliver it as the pandas DataFrame.
native_pd_df = cur.fetch_pandas_all()
print(native_pd_df)
end = perf_counter()
print(f"Native pandas took {end - start} seconds to read the data!")

Performing aggregations on the Snowpark pandas DataFrame

Let us now perform an aggregation on our Snowpark pandas DataFrame. We are going to take the following 12 columns

  • AVG_QUANTITY_7D_CITY_MENU_ITEM
  • AVG_QUANTITY_14D_CITY_MENU_ITEM
  • AVG_QUANTITY_28D_CITY_MENU_ITEM
  • AVG_SALES_7D_CITY_MENU_TYPE
  • AVG_SALES_14D_CITY_MENU_TYPE
  • AVG_SALES_28D_CITY_MENU_TYPE
  • AVG_SALES_7D_LOCATION
  • AVG_SALES_14D_LOCATION
  • AVG_SALES_28D_LOCATION
  • AVG_QUANTITY_7D_MENU_ITEM
  • AVG_QUANTITY_14D_MENU_ITEM
  • AVG_QUANTITY_28D_MENU_ITEM

and perform a summation of all the columns stated.

# Aggregation: sum of the selected columns
start = perf_counter()
spd_df = spd.read_snowflake("FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE")
result = spd_df[["AVG_QUANTITY_7D_CITY_MENU_ITEM",
"AVG_QUANTITY_14D_CITY_MENU_ITEM",
"AVG_QUANTITY_28D_CITY_MENU_ITEM",
"AVG_SALES_7D_CITY_MENU_TYPE",
"AVG_SALES_14D_CITY_MENU_TYPE",
"AVG_SALES_28D_CITY_MENU_TYPE",
"AVG_SALES_7D_LOCATION",
"AVG_SALES_14D_LOCATION",
"AVG_SALES_28D_LOCATION",
"AVG_QUANTITY_7D_MENU_ITEM",
"AVG_QUANTITY_14D_MENU_ITEM",
"AVG_QUANTITY_28D_MENU_ITEM"]].aggregate('sum')
print(result)
end = perf_counter()
print(f"Took {end - start} seconds to finish doing a aggregation of columns!")+-

Pretty neat that it was able to perform this summation in 4.86 seconds, including reading the data.

If we use the to_pandas() method, the same aggregation takes about 65 seconds. Again, Snowpark pandas is more than 90% faster.

start = perf_counter()
df_table = session.table("FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE")
df_table_pandas = df_table.to_pandas()
df_table_pandas = df_table_pandas[["AVG_QUANTITY_7D_CITY_MENU_ITEM",
"AVG_QUANTITY_14D_CITY_MENU_ITEM",
"AVG_QUANTITY_28D_CITY_MENU_ITEM",
"AVG_SALES_7D_CITY_MENU_TYPE",
"AVG_SALES_14D_CITY_MENU_TYPE",
"AVG_SALES_28D_CITY_MENU_TYPE",
"AVG_SALES_7D_LOCATION",
"AVG_SALES_14D_LOCATION",
"AVG_SALES_28D_LOCATION",
"AVG_QUANTITY_7D_MENU_ITEM",
"AVG_QUANTITY_14D_MENU_ITEM",
"AVG_QUANTITY_28D_MENU_ITEM"]].aggregate('sum')
print(df_table_pandas)
end = perf_counter()
print(f"Took {end - start} seconds to read the table and sum the columns using to_pandas()")

Similarly, if we use the fetch_pandas_all() method, this aggregation takes about 62 seconds. Again, Snowpark pandas is more than 90% faster.

cur = session.connection.cursor()
start = perf_counter()
sql = "select * from FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE"
cur.execute(sql)
# Fetch the result set from the cursor and deliver it as the pandas DataFrame.
native_pd_df = cur.fetch_pandas_all()
native_pd_df = native_pd_df[["AVG_QUANTITY_7D_CITY_MENU_ITEM",
"AVG_QUANTITY_14D_CITY_MENU_ITEM",
"AVG_QUANTITY_28D_CITY_MENU_ITEM",
"AVG_SALES_7D_CITY_MENU_TYPE",
"AVG_SALES_14D_CITY_MENU_TYPE",
"AVG_SALES_28D_CITY_MENU_TYPE",
"AVG_SALES_7D_LOCATION",
"AVG_SALES_14D_LOCATION",
"AVG_SALES_28D_LOCATION",
"AVG_QUANTITY_7D_MENU_ITEM",
"AVG_QUANTITY_14D_MENU_ITEM",
"AVG_QUANTITY_28D_MENU_ITEM"]].aggregate('sum')
print(native_pd_df)
end = perf_counter()
print(f"Took {end - start} seconds to read the table and sum the columns using fetch_pandas_all()")

Performing a group by on the Snowpark pandas DataFrame

Our next operation is a group by followed by an average. We take the columns below, group by PRIMARY_CITY, and compute the mean of the remaining columns

  • PRIMARY_CITY
  • AVG_QUANTITY_7D_CITY_MENU_ITEM
  • AVG_QUANTITY_14D_CITY_MENU_ITEM
  • AVG_QUANTITY_28D_CITY_MENU_ITEM
  • AVG_SALES_7D_CITY_MENU_TYPE
  • AVG_SALES_14D_CITY_MENU_TYPE
  • AVG_SALES_28D_CITY_MENU_TYPE
  • AVG_SALES_7D_LOCATION
  • AVG_SALES_14D_LOCATION
  • AVG_SALES_28D_LOCATION
  • AVG_QUANTITY_7D_MENU_ITEM
  • AVG_QUANTITY_14D_MENU_ITEM
  • AVG_QUANTITY_28D_MENU_ITEM

Again, Snowpark pandas shows impressive speed, completing in about 5 seconds including reading the data, performing the group by, and computing the mean.

# Group by PRIMARY_CITY and compute the mean of the selected columns
start = perf_counter()
spd_df = spd.read_snowflake("FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE")
result = spd_df[["PRIMARY_CITY","AVG_QUANTITY_7D_CITY_MENU_ITEM",
"AVG_QUANTITY_14D_CITY_MENU_ITEM",
"AVG_QUANTITY_28D_CITY_MENU_ITEM",
"AVG_SALES_7D_CITY_MENU_TYPE",
"AVG_SALES_14D_CITY_MENU_TYPE",
"AVG_SALES_28D_CITY_MENU_TYPE",
"AVG_SALES_7D_LOCATION",
"AVG_SALES_14D_LOCATION",
"AVG_SALES_28D_LOCATION",
"AVG_QUANTITY_7D_MENU_ITEM",
"AVG_QUANTITY_14D_MENU_ITEM",
"AVG_QUANTITY_28D_MENU_ITEM"]].groupby(['PRIMARY_CITY']).mean()
print(result)
end = perf_counter()
print(f"Took {end - start} seconds to finish doing a group by of Primary City and performing mean with Snowpark pandas")

If we use the to_pandas() method, the same group by takes about 67 seconds. Again, Snowpark pandas is more than 90% faster.

start = perf_counter()
df_table = session.table("FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE")
df_table_pandas = df_table.to_pandas()
df_table_pandas = df_table_pandas[["PRIMARY_CITY","AVG_QUANTITY_7D_CITY_MENU_ITEM",
"AVG_QUANTITY_14D_CITY_MENU_ITEM",
"AVG_QUANTITY_28D_CITY_MENU_ITEM",
"AVG_SALES_7D_CITY_MENU_TYPE",
"AVG_SALES_14D_CITY_MENU_TYPE",
"AVG_SALES_28D_CITY_MENU_TYPE",
"AVG_SALES_7D_LOCATION",
"AVG_SALES_14D_LOCATION",
"AVG_SALES_28D_LOCATION",
"AVG_QUANTITY_7D_MENU_ITEM",
"AVG_QUANTITY_14D_MENU_ITEM",
"AVG_QUANTITY_28D_MENU_ITEM"]].groupby(['PRIMARY_CITY']).mean()
print(df_table_pandas)
end = perf_counter()
print(f"Took {end - start} seconds to read the table and perform the group by using to_pandas()")

Similarly, if we use the fetch_pandas_all() method, this group by takes about 64 seconds. Again, Snowpark pandas is more than 90% faster.

cur = session.connection.cursor()
start = perf_counter()
sql = "select * from FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE"
cur.execute(sql)
# Fetch the result set from the cursor and deliver it as the pandas DataFrame.
native_pd_df = cur.fetch_pandas_all()
native_pd_df = native_pd_df[["PRIMARY_CITY","AVG_QUANTITY_7D_CITY_MENU_ITEM",
"AVG_QUANTITY_14D_CITY_MENU_ITEM",
"AVG_QUANTITY_28D_CITY_MENU_ITEM",
"AVG_SALES_7D_CITY_MENU_TYPE",
"AVG_SALES_14D_CITY_MENU_TYPE",
"AVG_SALES_28D_CITY_MENU_TYPE",
"AVG_SALES_7D_LOCATION",
"AVG_SALES_14D_LOCATION",
"AVG_SALES_28D_LOCATION",
"AVG_QUANTITY_7D_MENU_ITEM",
"AVG_QUANTITY_14D_MENU_ITEM",
"AVG_QUANTITY_28D_MENU_ITEM"]].groupby(['PRIMARY_CITY']).mean()
print(native_pd_df)
end = perf_counter()
print(f"Took {end - start} seconds to read the table and perform the group by using fetch_pandas_all()")

Performing a pivot table on the Snowpark pandas DataFrame

Lastly, let us use the pivot_table function to get the average menu-item quantity by menu type, organized by city.

Again, Snowpark pandas shows impressive speed, completing in 2.97 seconds including reading the data and building the pivot table.

# Pivot table: average AVG_QUANTITY_14D_CITY_MENU_ITEM by city and menu type
start = perf_counter()
spd_df = spd.read_snowflake("FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE")
spd_df = spd_df.pivot_table(index=["PRIMARY_CITY"], columns="MENU_TYPE", values = "AVG_QUANTITY_14D_CITY_MENU_ITEM")
print(spd_df)
end = perf_counter()
print(f"Took {end - start} seconds to finish doing a pivot table with Snowpark pandas")

If we use the to_pandas() method, the same pivot table takes about 66 seconds. Again, Snowpark pandas is more than 90% faster.

start = perf_counter()
df_table = session.table("FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE")
df_table_pandas = df_table.to_pandas()
df_table_pandas = df_table_pandas.pivot_table(index=["PRIMARY_CITY"], columns="MENU_TYPE", values = "AVG_QUANTITY_14D_CITY_MENU_ITEM")
print(df_table_pandas)
end = perf_counter()
print(f"Took {end - start} seconds to finish doing a pivot table with to_pandas()")

Similarly, if we use the fetch_pandas_all() method, the pivot table takes about 65 seconds. Again, Snowpark pandas is more than 90% faster.

cur = session.connection.cursor()
start = perf_counter()
sql = "select * from FROSTBYTE_TASTY_BYTES_V2.ANALYTICS.DEMAND_FORECAST_TRAINING_BASE"
cur.execute(sql)
# Fetch the result set from the cursor and deliver it as the pandas DataFrame.
native_pd_df = cur.fetch_pandas_all()
native_pd_df = native_pd_df.pivot_table(index=["PRIMARY_CITY"], columns="MENU_TYPE", values = "AVG_QUANTITY_14D_CITY_MENU_ITEM")
print(native_pd_df)
end = perf_counter()
print(f"Took {end - start} seconds to finish doing a pivot table with fetch_pandas_all()")

Summary

In conclusion, we have seen how Snowpark pandas lets you process data at scale within a familiar framework. Across the different operations, Snowpark pandas delivered a performance improvement of more than 90% compared to pulling the data into regular pandas.
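For reference, here are the timings observed above, side by side:

Operation (10M rows)        Snowpark pandas   to_pandas()   fetch_pandas_all()
Read into a DataFrame       4.58 s            ~65 s         ~63.3 s
Sum of 12 columns           4.86 s            ~65 s         ~62 s
Group by + mean             ~5 s              ~67 s         ~64 s
Pivot table                 2.97 s            ~66 s         ~65 s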

The notebook for this blog is available at this GitHub link.

Have a go at Snowpark pandas!
