Your Cheatsheet to Snowflake Snowpark Dataframes Using Python

In this article, we will quickly walk through how to use Snowflake’s Snowpark API in Python workflows.

Executive Summary

  • Sandbox Environment Creation
  • Setting Up Session
  • Using Snowpark Dataframes

Sandbox Environment Creation

Before you set up your development environment, you should be aware of the following points:

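  1. The Snowpark API required Python 3.8 at the time of writing; later Snowpark releases support newer Python versions, so check the current documentation.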
  2. You can create a virtual Python Environment using Anaconda, Miniconda, or virtualenv.

You can use conda to set up Python 3.8 in a virtual environment, adding the Snowflake conda channel along with the libraries you need.

conda create --name py38_env --override-channels -c https://repo.anaconda.com/pkgs/snowflake python=3.8 numpy pandas

You must then activate the environment and install the Snowpark Python package in it, along with Jupyter Notebook. Here’s a quick start:

conda activate py38_env
conda install snowflake-snowpark-python
pip install notebook
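Before opening a notebook, it can help to confirm what the new environment actually contains. The sketch below is one way to do that with only the standard library. The function name environment_report is made up for this example, and the distribution name is assumed to match the package installed above.

```python
import sys
from importlib import metadata


def environment_report(package="snowflake-snowpark-python"):
    """Return the running Python version and the installed version of `package`.

    `environment_report` is a hypothetical helper for this article, not part of Snowpark.
    """
    try:
        package_version = metadata.version(package)
    except metadata.PackageNotFoundError:
        package_version = None  # the package is not installed in this environment
    python_version = ".".join(str(part) for part in sys.version_info[:3])
    return {"python": python_version, "package": package, "version": package_version}


if __name__ == "__main__":
    print(environment_report())
```

If the reported version is None, the install step did not land in the environment that is currently active.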

Setting Up Session

The first step in using the library is establishing a session with the Snowflake database.

from snowflake.snowpark import Session

To create the session:

  1. Create a Python dictionary (dict) containing the names and values of the parameters for connecting to Snowflake.
  2. Pass this dictionary to the Session.builder.configs method to return a builder object that has these connection parameters.
  3. Call the builder’s create method to establish the session.

# Build the dict of connection parameters
conn_config = {
    "account": "******.central-india.azure",
    "user": "******",
    "password": "************",
    "role": "accountadmin",
    "warehouse": "compute_wh",
    "database": "snowflake_alert",
    "schema": "edw",
}

# Create the Snowpark session, which establishes the connection
conn = Session.builder.configs(conn_config).create()
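Hardcoding a password in a notebook makes it easy to leak. A minimal sketch of one alternative is to read the same connection parameters from environment variables. The variable names (SNOWFLAKE_ACCOUNT and so on) and the helper names load_conn_config and create_session are assumptions made for this example, not Snowpark conventions.

```python
import os

# Hypothetical variable names chosen for this sketch; use whatever your setup defines.
ENV_VARS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "role": "SNOWFLAKE_ROLE",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
}


def load_conn_config(env=None):
    """Build the connection dict from environment variables, failing early if any is unset."""
    env = os.environ if env is None else env
    missing = sorted(var for var in ENV_VARS.values() if not env.get(var))
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {param: env[var] for param, var in ENV_VARS.items()}


def create_session(env=None):
    """Create a Snowpark session from the environment-based configuration."""
    # Imported lazily so load_conn_config works even where Snowpark is not installed.
    from snowflake.snowpark import Session

    return Session.builder.configs(load_conn_config(env)).create()
```

With the variables exported in your shell, conn = create_session() would take the place of the hardcoded dictionary.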

Once you have completed the steps above, you are connected to Snowflake through Snowpark 🥳

Using Snowpark Dataframe

A Snowpark DataFrame is a lazily evaluated relational dataset: the computation runs only when you call a method that performs an action.

To demonstrate Snowpark DataFrames, this article uses the SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL.CATALOG_SALES dataset. The code reads it through a table named tb_catalog_sales in the edw schema configured above.

Understanding how to create a DataFrame

Using Snowpark Session SQL to query data from Snowflake:

# Transformation only: builds the DataFrame, no query runs on Snowflake yet
query_1 = conn.sql("select * from tb_catalog_sales limit 10")

# collect() is an action: the query runs and a list of Row objects is returned
query_2 = conn.sql("select * from tb_catalog_sales limit 20").collect()

# show() is also an action: uncomment to run query_1 and print the result
#query_1.show()

Creating Snowpark Dataframe by directly reading a table

df_tbl_read = conn.table("edw.tb_catalog_sales")
#df_tbl_read.show()

Creating Snowpark Dataframe by reading data from Stage

from snowflake.snowpark.types import StructType, StructField, StringType

# A stage is created by name; the @ prefix is only used when referencing it
#conn.sql("create stage test_stage").collect()
#conn.sql("put file://C:\\Users\\divya\\Downloads\\test_file.csv @test_stage").collect()
csv_schema = StructType([
    StructField("name", StringType()),
    StructField("url", StringType()),
    StructField("username", StringType()),
    StructField("password", StringType()),
])
df_stg_read = conn.read.schema(csv_schema).csv("@test_stage/test_file.csv")
#df_stg_read.show()

Creating Snowpark Dataframe by specifying range or sequence

df_create = conn.create_dataframe([(1,'one'),(2,"two")],schema = ["col_1","col_2"])
df_create_rng = conn.range(1,100,3).to_df("col_a")
#df_create.show()
#df_create_rng.show()

Creating Snowpark Dataframe by Joining two tables/dataframes

df_tbl2_read = conn.table("edw.tb_catalog_sales_logs")
df_join = df_tbl_read.join(df_tbl2_read, df_tbl_read["CS_ITEM_SK"] == df_tbl2_read["CS_ITEM_SK"])
#df_join.show()

Performing operations on a DataFrame

Broadly, the operations on DataFrame can be divided into two types:

  • Transformations produce a new DataFrame from one or more existing DataFrames. Note that transformations are lazy and don’t cause the DataFrame to be evaluated. If the API does not provide a method to express the SQL that you want to use, you can use functions.sql_expr() as a workaround.
  • Actions cause the DataFrame to be evaluated. When you call a method that performs an action, Snowpark sends the SQL query for the DataFrame to the server for evaluation.
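To make the split between transformations and actions concrete, here is a toy model of lazy evaluation. It is not Snowpark: LazyQuery is a hypothetical class written for this article, and it runs against an in-memory SQLite table so that it works without a Snowflake account. The idea it sketches is the same, though: filter and select only record a plan, and SQL reaches the database when collect is called.

```python
import sqlite3


class LazyQuery:
    """Toy stand-in for a lazily evaluated DataFrame (NOT the Snowpark class)."""

    def __init__(self, connection, table, columns="*", conditions=()):
        self._connection = connection
        self._table = table
        self._columns = columns
        self._conditions = tuple(conditions)

    # Transformations: each returns a new LazyQuery and touches no data.
    def filter(self, condition):
        return LazyQuery(self._connection, self._table, self._columns,
                         self._conditions + (condition,))

    def select(self, *columns):
        return LazyQuery(self._connection, self._table, ", ".join(columns),
                         self._conditions)

    def to_sql(self):
        sql = f"SELECT {self._columns} FROM {self._table}"
        if self._conditions:
            sql += " WHERE " + " AND ".join(f"({c})" for c in self._conditions)
        return sql

    # Action: only here is the generated SQL sent to the database.
    def collect(self):
        return self._connection.execute(self.to_sql()).fetchall()


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE sales (order_number INTEGER, quantity INTEGER)")
connection.executemany("INSERT INTO sales VALUES (?, ?)", [(1, 5), (2, None), (3, 12)])

statements = []  # every statement SQLite runs from this point on
connection.set_trace_callback(statements.append)

plan = LazyQuery(connection, "sales").filter("quantity IS NOT NULL").select("order_number")
ran_before_action = len(statements)  # counts statements run by the transformations
rows = plan.collect()                # the action sends the query
ran_after_action = len(statements)

print(plan.to_sql())
print(ran_before_action, ran_after_action, rows)
```

Chaining more transformations onto plan would change the generated SQL, but still run nothing until the next action.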

Transforming a DataFrame

Using Select Method to create a new dataframe with specific columns from existing DF

#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.select.html#snowflake.snowpark.DataFrame.select
from snowflake.snowpark.functions import col

df_select1 = df_tbl_read.select(col("CS_SOLD_TIME_SK"))
# Snowflake SUBSTR positions are 1-based, so the first three characters start at 1
df_select2 = df_tbl_read.select(col("CS_SOLD_TIME_SK").substr(1, 3).as_("column1"))
df_select3 = df_tbl_read.select(col("CS_SOLD_TIME_SK").as_("column1"),col("CS_BILL_CDEMO_SK"))
#df_select1.show()
#df_select2.show()
#df_select3.show()

Using Filter Method to filter data (Similar to Where Clause)

#.. AND condition - &
#.. OR Condition - |
#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.filter.html#snowflake.snowpark.DataFrame.filter
df_filter = df_tbl_read.filter((col("CS_SOLD_TIME_SK")==46616)&(col("CS_BILL_CDEMO_SK")==1642927)).select(col("CS_EXT_WHOLESALE_COST"))
#df_filter.show()

Using the SORT Method to Order the data

#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.sort.html#snowflake.snowpark.DataFrame.sort
df_sort1 = df_tbl_read.filter((col("CS_QUANTITY").isNotNull()) & (col("CS_WHOLESALE_COST").isNotNull())).sort(col("CS_QUANTITY").asc(),col("CS_WHOLESALE_COST").desc()).select(col("CS_ORDER_NUMBER"),col("CS_QUANTITY"),col("CS_WHOLESALE_COST"),col("CS_LIST_PRICE"))
#df_sort1.show()
# Same ordering as df_sort1, expressed through the ascending argument (1 = ascending, 0 = descending)
df_sort2 = df_tbl_read.filter((col("CS_QUANTITY").isNotNull()) & (col("CS_WHOLESALE_COST").isNotNull())).sort([col("CS_QUANTITY"),col("CS_WHOLESALE_COST")],ascending=[1,0]).select(col("CS_ORDER_NUMBER"),col("CS_QUANTITY"),col("CS_WHOLESALE_COST"),col("CS_LIST_PRICE"))
#df_sort2.show()

Using AGG Method to aggregate the data

#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.agg.html#snowflake.snowpark.DataFrame.agg
import snowflake.snowpark.functions as f

df_sum1 = df_tbl_read.agg(f.sum("CS_QUANTITY"))
df_stddev1 = df_tbl_read.agg(f.stddev("CS_QUANTITY"))
df_count1 = df_tbl_read.agg(f.count("CS_QUANTITY"))
df_max_min1 = df_tbl_read.agg(("CS_QUANTITY","min"),("CS_WHOLESALE_COST","max"))
df_max_min2 = df_tbl_read.agg({"CS_QUANTITY":"min",
"CS_WHOLESALE_COST":"max"})
#df_sum1.show() - using f.<>
#df_stddev1.show() - using f.<>
#df_count1.show() - using f.<>
#df_max_min1.show() - using Tuple()
#df_max_min2.show() - using Dict{}

Using Group_by for grouping aggregate results

#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.group_by.html#snowflake.snowpark.DataFrame.group_by
df_group = df_tbl_read.group_by("CS_SOLD_DATE_SK").agg((f.sum("CS_QUANTITY").alias("QTY_SUM")),f.stddev("CS_WHOLESALE_COST").alias("STD_Dev"))
#df_group.show()

Using Window Method as Window Function

#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.Window.html#snowflake.snowpark.Window
from snowflake.snowpark import Window
from snowflake.snowpark.functions import row_number

df_window = df_tbl_read.with_column("Rank", row_number().over(Window.order_by(col("CS_SOLD_DATE_SK").desc()))).select(col("Rank"),col("CS_ORDER_NUMBER"),col("CS_QUANTITY"),col("CS_WHOLESALE_COST"),col("CS_LIST_PRICE"))
#df_window.show()

Using DataFrame NA Function for Handling Missing Values

#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameNaFunctions.html#snowflake.snowpark.DataFrameNaFunctions
df_null_handling = df_tbl_read.filter((col("CS_QUANTITY").isNull()) & (col("CS_WHOLESALE_COST").isNull())).sort([col("CS_QUANTITY"),col("CS_WHOLESALE_COST").desc()],ascending=[1,1]).select(col("CS_ORDER_NUMBER"),col("CS_QUANTITY"),col("CS_WHOLESALE_COST"),col("CS_LIST_PRICE"))
#df_null_handling.na.fill({"CS_QUANTITY":111}).show()
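The snippet above fills missing values; the same na accessor can also drop incomplete rows. As a small sketch, the helper below combines both through the na.fill and na.drop methods. The function name fill_or_drop_missing and its parameters are hypothetical, introduced only for this example. It needs no imports because it only calls methods on the DataFrame it receives.

```python
def fill_or_drop_missing(df, fill_values=None, required_columns=None):
    """Return a new DataFrame with missing values filled and/or incomplete rows dropped.

    fill_values:      dict of column name -> replacement value, passed to df.na.fill
    required_columns: rows with a null in any of these columns are removed via df.na.drop
    Both steps are transformations, so nothing runs until an action is called.
    """
    if fill_values:
        df = df.na.fill(fill_values)
    if required_columns:
        df = df.na.drop(subset=required_columns)
    return df
```

For example, fill_or_drop_missing(df_tbl_read, {"CS_QUANTITY": 111}, ["CS_WHOLESALE_COST"]).show() would replace null quantities with 111 and drop rows that have no wholesale cost.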

Performing an action on a DataFrame

Using the Collect() Method to generate an array of rows from a query

#df_tbl_read.collect()

Using Collect_NoWait() to Execute Query Asynchronously

df_async = df_tbl_read.collect_nowait()
#df_async.result()
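Calling result() on the returned job blocks until the query finishes. If you would rather check in periodically and give up after a while, a polling loop is one option. The sketch below assumes only that the job exposes is_done() and result(), as the object returned by collect_nowait() does. The helper name wait_for_result and its parameters are made up for this example.

```python
import time


def wait_for_result(job, poll_seconds=1.0, timeout_seconds=60.0):
    """Poll an async job until it finishes, then return its result.

    `job` must expose is_done() and result(). `wait_for_result` is a
    hypothetical helper for this article, not part of Snowpark.
    """
    deadline = time.monotonic() + timeout_seconds
    while not job.is_done():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"query still running after {timeout_seconds} seconds")
        time.sleep(poll_seconds)  # other work could happen here instead of sleeping
    return job.result()
```

Usage would look like rows = wait_for_result(df_tbl_read.collect_nowait(), poll_seconds=2).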

Using Show Method to print the result

#df_tbl_read.show(5)

Find my Source Code for this Article

About Me

I’m Divyansh Saxena, Cloud Data Engineer at IBM and Snowflake Data Superhero for 2023!

Follow me on Medium for regular updates on Snowflake Best Practices and other trending topics:

I am also open to connecting with data enthusiasts across the globe on LinkedIn:

https://www.linkedin.com/in/divyanshsaxena/
