So You Want to… Manipulate a Table in Snowpark Python


Manipulating a table is easy in Snowflake, but getting started can sometimes be confusing. In this post I’m going to show you a number of different ways to do it, along with the pros and cons of each.

For the time pressed, here’s the summary:

  1. SQL: Best for when you know exactly how to do something in SQL and you just want to get past this bit and onto more interesting python.
  2. SPROCs: Best for when you just want to lift and drop (presumably complex) python code, and don’t have data volumes/performance demands that necessitate parallelisation.
  3. Snowpark DataFrames: Best for when you’re writing from scratch or migrating away from Spark.
  4. UDFs: Best for complex manipulations that will only result in a single column, like running a machine learning prediction.
  5. UDTFs: Best for when you need a stateful interaction between rows, like a rolling average (but presumably more complex) or when you have partitions that you want to treat distinctly.

Prerequisites

  • A Snowflake Account (and a username, password etc.)
  • A Python environment (that has permissions to connect to Snowflake and Snowflake’s Python API installed on it)

0. Housekeeping: Create your session, virtual warehouse, database and stage

0.1 First up, get your session up and running:

from snowflake.snowpark.session import Session

connection_parameters = {
    "account": "your accountname",   # insert your account name here
    "user": "your username",         # insert your username here
    "password": "your password",     # you get it...
    "role": "ACCOUNTADMIN"           # (or whatever role you want)
}
session = Session.builder.configs(connection_parameters).create()

Note: it’s better to load these credentials from a config file (e.g. a JSON) or prompt for them with a package like getpass, but hard-coding them is fine for now.
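For example, a minimal sketch of the getpass approach (assuming you’re running interactively; the prompt text is arbitrary):

import getpass
from snowflake.snowpark.session import Session

connection_parameters = {
    "account": "your accountname",                         # insert your account name here
    "user": "your username",                               # insert your username here
    "password": getpass.getpass("Snowflake password: "),   # prompted at runtime, never stored in the script
    "role": "ACCOUNTADMIN"
}
session = Session.builder.configs(connection_parameters).create()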

0.2 Next, get a virtual warehouse, database and stage set up

# Create a Virtual Warehouse
session.sql("CREATE OR REPLACE WAREHOUSE MY_FIRST_WH WITH WAREHOUSE_SIZE='X-SMALL'").collect()

# Create Database
session.sql("CREATE OR REPLACE DATABASE MY_FIRST_DB").collect()

# Create a Stage
session.sql('CREATE OR REPLACE STAGE PIPELINE').collect()

0.3 Finally, Create Some Data (and write it to Snowflake)

import pandas as pd
pandas_df = pd.DataFrame()
pandas_df['DUMMY_1'] = list(range(0,10000))
pandas_df['DUMMY_2'] = list(range(0,10000))
session.write_pandas(pandas_df, table_name='DUMMY_TABLE',
                     auto_create_table=True, overwrite=True)

You could also load a file from a stage instead of generating data in pandas, as sketched below.
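As a rough sketch (the file name data.csv and its layout are hypothetical; I’m assuming a header row and the same two dummy columns), that might look something like this:

import snowflake.snowpark.types as T

# upload the local file to the stage we created earlier
session.file.put("data.csv", "@PIPELINE", auto_compress=False, overwrite=True)

# read it from the stage into a Snowpark DataFrame and save it as a table
schema = T.StructType([T.StructField("DUMMY_1", T.IntegerType()),
                       T.StructField("DUMMY_2", T.IntegerType())])
session.read.schema(schema).option("skip_header", 1) \
    .csv("@PIPELINE/data.csv") \
    .write.mode("overwrite").save_as_table("DUMMY_TABLE")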

1. Using SQL

Best for when you know exactly how to do something in SQL and you just want to get past this bit and onto more interesting python.

1.1 First, Use SQL to sum Dummy_1 and Dummy_2 and create a table

session.sql("""CREATE TABLE NEW_TABLE_SQL AS
SELECT dt.DUMMY_1, dt.DUMMY_2, dt.DUMMY_1 + dt.DUMMY_2 AS SUM_COL
FROM DUMMY_TABLE dt""").collect()

1.2 Finally, test that it worked

session.sql("""select * from NEW_TABLE_SQL""").collect()

1.3 Conclusion

Pretty painless, and it will also be performant, but stepping from one language to another is generally hard on the head and hard for others reading/reviewing the code.

2. Using a SPROC

Best for when you just want to lift and drop (presumably complex) python code, and don’t have data volumes/performance demands that necessitate parallelisation.

2.1 First, write your SPROC

def sum_sproc(
    session: Session,
    input_table: str,
    output_table: str,
) -> None:
    """
    Read a table into pandas, add a column and write the result back to Snowflake.
    """

    # Pull the table down as a pandas DataFrame
    data = session.table(input_table).to_pandas()

    # Your manipulations (here we just add two columns together)
    data['SUM_COL'] = data['DUMMY_1'] + data['DUMMY_2']
    # anything you like here

    # Write the table back to Snowflake
    session.write_pandas(data, table_name=output_table,
                         auto_create_table=True, overwrite=True)

    # return data.to_dict()  # not used here, but we could return the data to the caller as a dict

2.2 Next, we register the SPROC

sum_sproc = session.sproc.register(
    func=sum_sproc,              # the Python function to register (see cell above)
    name='sum_sproc',            # the name the procedure will have in Snowflake
    is_permanent=True,           # store it permanently?
    replace=True,                # replace anything already stored under this name
    stage_location='@PIPELINE',  # the stage where the code is stored
    packages=['snowflake-snowpark-python', 'pandas'])  # the packages the function uses

2.3 Next, we run the SPROC

sum_sproc('DUMMY_TABLE', 'NEW_TABLE_SPROC', session=session)

2.4 Finally, test that it worked

session.table('NEW_TABLE_SPROC').to_pandas() 

2.5 Conclusion

This feels OK, but the SPROC registration is a bit of a nuisance if you are trying to do something simple. Aside from that it is very low lift, which makes it perfect if you’ve got a high-complexity task in your current pipeline that you just want to migrate without fuss.

Beware: SPROCs run on a single node, so while you might have performant python code, you won’t be parallelising it at all in Snowflake.

3. Using Snowpark DataFrames

Best for when you’re writing from scratch or migrating away from Spark.

3.1 First, manipulate directly in Snowpark

snowpark_df = session.create_dataframe(pandas_df)
snowpark_df = snowpark_df.with_column('SUM_COL',
                                      snowpark_df.col("DUMMY_1") +
                                      snowpark_df.col("DUMMY_2"))

3.2 Next, write the Snowpark DataFrame to a table

snowpark_df.write.mode("overwrite").save_as_table("NEW_TABLE_SPDF")

3.3 Finally, test that it worked

session.sql("""select * from NEW_TABLE_SPDF""").collect()

3.4 Conclusion

It’s compact, it’s simple, and it’s very similar to Spark (and pandas) syntax. Not only that, but it lets you tap into all the distributed compute capabilities that Snowflake has, without any hassle/config on your part.
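To give a flavour of how Spark-like the syntax is, here’s a quick (purely illustrative) follow-on aggregation on the same DataFrame; the filter threshold is arbitrary:

from snowflake.snowpark import functions as F

# filter and aggregate, all pushed down to Snowflake rather than pulled into pandas
agg_df = (snowpark_df
          .filter(F.col("DUMMY_1") > 5000)
          .agg(F.sum("SUM_COL").alias("TOTAL_SUM"),
               F.avg("DUMMY_2").alias("AVG_DUMMY_2")))
agg_df.show()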

4. Using a (vectorised) UDF

Best for complex manipulations that will only result in a single column, like running a machine learning prediction.

4.1 First, write your UDF

import snowflake.snowpark.types as T

def sum_udf(df: T.PandasDataFrame[int, int]) -> T.PandasSeries[int]:
    import pandas as pd
    df.columns = ["DUMMY_1", "DUMMY_2"]
    df['NEW_COL'] = df['DUMMY_1'] + df['DUMMY_2']
    output = pd.Series(df['NEW_COL'])  # what we call this column is irrelevant here, we're returning a Series
    return output

Note the translation of a Snowpark DataFrame to a pandas DataFrame inside the UDF doesn’t include column headers, so if you need them you will need to write them in yourself (see the df.columns assignment above).

4.2 Next, register your UDF

sum_udf_vec = session.udf.register(
    func=sum_udf,
    name="sum_udf",
    stage_location='@PIPELINE',
    input_types=[T.IntegerType()] * 2,
    return_type=T.IntegerType(),
    replace=True,
    is_permanent=True,
    packages=['pandas'],
    max_batch_size=10)  # <-- set this if you're concerned about memory management; for something this simple we could leave it out

4.3 Next, run the UDF via Snowpark

feature_cols = list(pandas_df.columns)
snowpark_df = session.create_dataframe(pandas_df)
snowpark_df = snowpark_df.with_column("SUM",
                                      sum_udf_vec(*feature_cols))

Note the use of snowpark_df.with_column (singular), as we are only creating a single new column.

4.4 Next, write the Snowpark DataFrame to a table

snowpark_df.write.mode("overwrite").save_as_table("NEW_TABLE_UDF")

4.5 Finally, test that it worked

session.sql("""select * from NEW_TABLE_UDF""").collect()

4.6 Conclusion

This is probably the go-to for most data scientists when they are performing relatively complex manipulations like running a machine learning inference job on data. UDFs do have the downside that they only deliver a single column back to the user (you can work around this by embedding several values into one column, but that defeats the presumed desire for simplicity). Better to make these manipulations in discrete steps, generating one new column at a time.

Note this looks a lot like writing it directly in Snowpark; really all that changes is moving the definition of the function into a UDF, but it’s easy to see how this is going to be cleaner for anything of even modest complexity.
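For what it’s worth, the "embed several values in one column" workaround mentioned above might look roughly like the sketch below, using a VARIANT return type (the names sum_and_product and PACKED are made up for this example, and the UDTF in the next section is usually the nicer option):

def sum_and_product(dummy_1, dummy_2):
    # pack both results into a single dict, which Snowflake stores as a VARIANT
    return {"sum": dummy_1 + dummy_2, "product": dummy_1 * dummy_2}

sum_and_product_udf = session.udf.register(
    func=sum_and_product,
    name="sum_and_product_udf",
    input_types=[T.IntegerType()] * 2,
    return_type=T.VariantType(),
    is_permanent=False,
    replace=True)

packed_df = snowpark_df.with_column("PACKED",
                                    sum_and_product_udf("DUMMY_1", "DUMMY_2"))
# the values then have to be unpacked again on the way out
packed_df.select(packed_df["PACKED"]["sum"].alias("SUM_AGAIN")).show()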

5. Using a UDTF

Best for when you need a stateful interaction between rows, like a rolling average (but presumably more complex) or when you have partitions that you want to treat distinctly.

5.1 First, write your UDTF

from typing import Iterable, Tuple

class Sum_UDTF:
    def process(self, DUMMY_1: int, DUMMY_2: int) -> Iterable[Tuple[int, int]]:
        yield (DUMMY_1 + DUMMY_2, DUMMY_1 * DUMMY_2)

Note I included two output columns here to demonstrate the ability of UDTFs to deliver more than one column back to the user.

5.2 Next, register your UDTF

sum_udtf = session.udtf.register(Sum_UDTF,
                                 name="sum_udtf",
                                 output_schema=["SUM", "PRD"],
                                 input_types=[T.IntegerType(),
                                              T.IntegerType()],
                                 is_permanent=False,
                                 replace=True)

5.3 Next, execute your UDTF

snowpark_df = snowpark_df.with_columns(
    ["SUM", "PRD"],  # your new column names
    [sum_udtf(snowpark_df['DUMMY_1'], snowpark_df['DUMMY_2'])])  # note the [] around the sum_udtf call

Note the use of snowpark_df.with_columns (plural), as we are creating multiple new columns. This, and the need to wrap the UDTF call in a list, are easy misses that can lead to frustration.

5.4 Next, write the Snowpark DataFrame to a table

snowpark_df.write.mode("overwrite").save_as_table("NEW_TABLE_UDTF")

5.5 Finally, test that it worked

session.sql("""select * from NEW_TABLE_UDTF""").collect()

Note you may lose the ordering of the data this way; it doesn’t affect the final result, but it’s worth knowing.
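If the order matters for whatever you do next, you can always impose it explicitly when reading the table back, for example:

# re-impose an ordering by sorting on one of the original columns
session.table('NEW_TABLE_UDTF').sort('DUMMY_1').show()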

5.6 Conclusion

This feels a little clunky, but given we’re not really using a UDTF for its intended purpose, that’s to be expected. What is going on under the hood is a row-by-row pass over the dataframe, which is what enables stateful calculations. That is what you’d reach for a UDTF for: a rolling total, or treating partitions of the data distinctly (more to come on this in future posts). For simple column-wise transformations like the one here, though, I’d personally avoid UDTFs.
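To make the "stateful" point concrete, here’s a minimal sketch of the kind of thing UDTFs are actually designed for: a running total, where the class holds state across rows within a partition. The handler itself is straightforward, but the final usage line is hypothetical because our dummy table has no natural grouping column (SOME_TABLE, GROUP_COL and VALUE_COL are stand-ins):

from typing import Iterable, Tuple
import snowflake.snowpark.types as T

class RunningTotalUDTF:
    def __init__(self):
        self.running_total = 0  # state persists across rows within a partition

    def process(self, value: int) -> Iterable[Tuple[int]]:
        self.running_total += value
        yield (self.running_total,)

running_total_udtf = session.udtf.register(RunningTotalUDTF,
                                           name="running_total_udtf",
                                           output_schema=["RUNNING_TOTAL"],
                                           input_types=[T.IntegerType()],
                                           is_permanent=False,
                                           replace=True)

# hypothetical usage: each GROUP_COL partition accumulates its own total, in VALUE_COL order
session.table("SOME_TABLE").select(
    "GROUP_COL",
    running_total_udtf("VALUE_COL").over(partition_by="GROUP_COL", order_by="VALUE_COL")).show()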

Wrapping Up

We’ve looked at a variety of different ways to manipulate tabular data in Snowflake via Python. Depending on the circumstances one of them may be preferable, but they all have their uses.

Stay tuned for a deeper dive into what is going on under the hood of these functions.
