Your Cheatsheet to Snowflake Snowpark Dataframes Using Python
In this article, we will quickly cover how to use Snowflake’s Snowpark API in your Python workflows.
Executive Summary
- Sandbox Environment Creation
- Setting Up Session
- Using Snowpark Dataframes
Sandbox Environment Creation:
Before you set up your development environment, you should be aware of the following points:
- The Snowpark API requires Python 3.8.
- You can create a virtual Python Environment using Anaconda, Miniconda, or virtualenv.
You can use conda to set up Python 3.8 in a virtual environment and add the Snowflake conda channel along with the required libraries.
conda create --name py38_env --override-channels -c https://repo.anaconda.com/pkgs/snowflake python=3.8 numpy pandas
You must install the Snowpark Python package into the environment, along with Jupyter Notebook. Here’s a quick start:
conda install snowflake-snowpark-python
pip install notebook
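Since the version requirement is strict, it can be handy to confirm the active interpreter before going further. A small pure-Python check (the helper name is my own, not part of any Snowpark tooling):

```python
import sys

def is_supported(version_info, required=(3, 8)):
    """Return True if the interpreter's (major, minor) matches the required version."""
    return tuple(version_info[:2]) == required

# Check the interpreter this script is running under
print("Python %d.%d supported: %s" % (
    sys.version_info.major, sys.version_info.minor, is_supported(sys.version_info)))
```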
Setting Up Session
The first step in using the library is establishing a session with the Snowflake database.
from snowflake.snowpark import Session
To create the session:
- Create a Python dictionary (dict) containing the names and values of the parameters for connecting to Snowflake.
- Pass this dictionary to the Session.builder.configs method to return a builder object that holds these connection parameters.
- Call the create method of builder to establish the session.
# Constructing Dict for Connection Params
conn_config = {
"account": "******.central-india.azure",
"user": "******",
"password": "************",
"role" : "accountadmin",
"warehouse" : "compute_wh",
"database" : "snowflake_alert",
"schema" : "edw"
}
#Invoking Snowpark Session for Establishing Connection
conn = Session.builder.configs(conn_config).create()
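Hardcoding credentials in a script is risky; a common alternative is reading them from environment variables. A pure-Python sketch (the SNOWFLAKE_* variable names are my own choice, not a Snowpark convention):

```python
import os

def build_conn_config(env=os.environ):
    """Assemble Snowflake connection parameters from environment variables.
    Required keys raise a KeyError if unset; the rest fall back to defaults."""
    return {
        "account":   env["SNOWFLAKE_ACCOUNT"],
        "user":      env["SNOWFLAKE_USER"],
        "password":  env["SNOWFLAKE_PASSWORD"],
        "role":      env.get("SNOWFLAKE_ROLE", "accountadmin"),
        "warehouse": env.get("SNOWFLAKE_WAREHOUSE", "compute_wh"),
        "database":  env.get("SNOWFLAKE_DATABASE", "snowflake_alert"),
        "schema":    env.get("SNOWFLAKE_SCHEMA", "edw"),
    }

# conn = Session.builder.configs(build_conn_config()).create()
```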
Once you have completed the above steps, you are ready to connect to Snowflake using Snowpark 🥳
Using Snowpark DataFrames
A Snowpark DataFrame is a lazily evaluated relational dataset; computation is performed only when you call a method that triggers an action.
To demonstrate Snowpark DataFrames, we will use the SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL.CATALOG_SALES dataset in this article.
Understanding how to create a DataFrame
Using Snowpark Session SQL to query data from Snowflake:
#.. The line below does not execute anything on Snowflake (lazy)
query_1 = conn.sql("select * from tb_catalog_sales limit 10")
#.. The line below executes on Snowflake because collect() is an action
query_2 = conn.sql("select * from tb_catalog_sales limit 20").collect()
#.. Will execute query_1 because show() is an action
#query_1.show()
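The lazy/eager split above can be pictured with a tiny pure-Python mock (an illustration of the pattern, not the actual Snowpark classes):

```python
class LazyQuery:
    """Toy model of Snowpark's lazy evaluation: constructing the query only
    records it; collect() is the action that actually 'runs' it."""
    def __init__(self, sql_text):
        self.sql_text = sql_text
        self.executed = False

    def collect(self):
        self.executed = True  # a real session would send the SQL to Snowflake here
        return "results of: " + self.sql_text

q = LazyQuery("select * from tb_catalog_sales limit 10")
print(q.executed)   # False - nothing has run yet
q.collect()
print(q.executed)   # True - the action triggered execution
```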
Creating Snowpark Dataframe by directly reading a table
df_tbl_read = conn.table("edw.tb_catalog_sales")
#df_tbl_read.show()
Creating Snowpark Dataframe by reading data from Stage
#conn.sql("create stage test_stage;").collect()
#conn.sql("put file://C:\\Users\\divya\\Downloads\\test_file.csv @test_stage;").collect()
from snowflake.snowpark.types import StructType, StructField, StringType
df_stg_read = conn.read.schema(
    StructType([
        StructField("name", StringType()),
        StructField("url", StringType()),
        StructField("username", StringType()),
        StructField("password", StringType()),
    ])
).csv("@test_stage/test_file.csv")
#df_stg_read.show()
Creating Snowpark Dataframe by specifying range or sequence
df_create = conn.create_dataframe([(1, "one"), (2, "two")], schema=["col_1", "col_2"])
df_create_rng = conn.range(1, 100, 3).to_df("col_a")
#df_create.show()
#df_create_rng.show()
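If conn.range(1, 100, 3) behaves like Python’s built-in range (start inclusive, end exclusive, fixed step), the generated values are easy to predict:

```python
# One row per value of the equivalent built-in range
values = list(range(1, 100, 3))
print(values[:5])   # [1, 4, 7, 10, 13]
print(len(values))  # 33 rows, last value 97
```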
Creating Snowpark Dataframe by Joining two tables/dataframes
df_tbl2_read = conn.table("edw.tb_catalog_sales_logs")
df_join = df_tbl_read.join(df_tbl2_read, df_tbl_read["CS_ITEM_SK"] == df_tbl2_read["CS_ITEM_SK"])
#df_join.show()
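Conceptually, the join above pairs rows whose CS_ITEM_SK values match. A plain-Python sketch of the same idea (the sample rows are made up for illustration):

```python
def inner_join(left, right, key):
    """Hash-join two lists of dicts on a shared key - roughly what
    df_left.join(df_right, left[key] == right[key]) expresses."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]

sales = [{"CS_ITEM_SK": 1, "qty": 5}, {"CS_ITEM_SK": 2, "qty": 3}]
logs = [{"CS_ITEM_SK": 1, "status": "shipped"}]
print(inner_join(sales, logs, "CS_ITEM_SK"))
# [{'CS_ITEM_SK': 1, 'qty': 5, 'status': 'shipped'}]
```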
Performing operations on a DataFrame
Broadly, the operations on DataFrame can be divided into two types:
- Transformations produce a new DataFrame from one or more existing DataFrames. Transformations are lazy and don’t cause the DataFrame to be evaluated. If the API does not provide a method to express the SQL that you want to use, you can use functions.sql_expr() as a workaround.
- Actions cause the DataFrame to be evaluated. When you call a method that performs an action, Snowpark sends the SQL query for the DataFrame to the server for evaluation.
Transforming a DataFrame
Using the Select Method to create a new DataFrame with specific columns from an existing DataFrame
#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.select.html#snowflake.snowpark.DataFrame.select
from snowflake.snowpark.functions import col
df_select1 = df_tbl_read.select(col("CS_SOLD_TIME_SK"))
df_select2 = df_tbl_read.select(col("CS_SOLD_TIME_SK").substr(0, 3).as_("column1"))
df_select3 = df_tbl_read.select(col("CS_SOLD_TIME_SK").as_("column1"),col("CS_BILL_CDEMO_SK"))
#df_select1.show()
#df_select2.show()
#df_select3.show()
Using the Filter Method to filter data (similar to a WHERE clause)
#.. AND condition - &
#.. OR Condition - |
#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.filter.html#snowflake.snowpark.DataFrame.filter
df_filter = (
    df_tbl_read.filter((col("CS_SOLD_TIME_SK") == 46616) & (col("CS_BILL_CDEMO_SK") == 1642927))
    .select(col("CS_EXT_WHOLESALE_COST"))
)
#df_filter.show()
Using the SORT Method to Order the data
#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.sort.html#snowflake.snowpark.DataFrame.sort
df_sort1 = (
    df_tbl_read.filter(col("CS_QUANTITY").is_not_null() & col("CS_WHOLESALE_COST").is_not_null())
    .sort(col("CS_QUANTITY").asc(), col("CS_WHOLESALE_COST").desc())
    .select(col("CS_ORDER_NUMBER"), col("CS_QUANTITY"), col("CS_WHOLESALE_COST"), col("CS_LIST_PRICE"))
)
#df_sort1.show()
df_sort2 = (
    df_tbl_read.filter(col("CS_QUANTITY").is_not_null() & col("CS_WHOLESALE_COST").is_not_null())
    .sort([col("CS_QUANTITY"), col("CS_WHOLESALE_COST")], ascending=[1, 1])
    .select(col("CS_ORDER_NUMBER"), col("CS_QUANTITY"), col("CS_WHOLESALE_COST"), col("CS_LIST_PRICE"))
)
#df_sort2.show()
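The mixed asc/desc ordering has a direct analogue in plain Python, which may help when reasoning about multi-column sort keys (the sample tuples are made up):

```python
# Ordering like .sort(col("CS_QUANTITY").asc(), col("CS_WHOLESALE_COST").desc()):
# tuples of (quantity, wholesale_cost)
rows = [(10, 2.5), (10, 7.0), (4, 1.0)]
# Ascending on the first key, descending on the second (negate the numeric key)
ordered = sorted(rows, key=lambda r: (r[0], -r[1]))
print(ordered)  # [(4, 1.0), (10, 7.0), (10, 2.5)]
```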
Using AGG Method to aggregate the data
#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.agg.html#snowflake.snowpark.DataFrame.agg
from snowflake.snowpark import functions as f
df_sum1 = df_tbl_read.agg(f.sum("CS_QUANTITY"))
df_stddev1 = df_tbl_read.agg(f.stddev("CS_QUANTITY"))
df_count1 = df_tbl_read.agg(f.count("CS_QUANTITY"))
df_max_min1 = df_tbl_read.agg(("CS_QUANTITY","min"),("CS_WHOLESALE_COST","max"))
df_max_min2 = df_tbl_read.agg({"CS_QUANTITY":"min",
"CS_WHOLESALE_COST":"max"})
#df_sum1.show() - using f.<>
#df_stddev1.show() - using f.<>
#df_count1.show() - using f.<>
#df_max_min1.show() - using Tuple()
#df_max_min2.show() - using Dict{}
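To make the dict-style spec concrete, here is a plain-Python picture of what a {"column": "function"} mapping asks the warehouse to compute (the helper and sample rows are illustrative only):

```python
def aggregate(rows, spec):
    """Apply a {column: function_name} spec to a list of dicts -
    a local analogue of df.agg({...})."""
    funcs = {"min": min, "max": max, "sum": sum, "count": len}
    return {fn + "_" + col: funcs[fn]([r[col] for r in rows])
            for col, fn in spec.items()}

rows = [{"CS_QUANTITY": 5, "CS_WHOLESALE_COST": 2.0},
        {"CS_QUANTITY": 2, "CS_WHOLESALE_COST": 9.5}]
print(aggregate(rows, {"CS_QUANTITY": "min", "CS_WHOLESALE_COST": "max"}))
# {'min_CS_QUANTITY': 2, 'max_CS_WHOLESALE_COST': 9.5}
```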
Using Group_by for grouping aggregate results
#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrame.group_by.html#snowflake.snowpark.DataFrame.group_by
df_group = df_tbl_read.group_by("CS_SOLD_DATE_SK").agg((f.sum("CS_QUANTITY").alias("QTY_SUM")),f.stddev("CS_WHOLESALE_COST").alias("STD_Dev"))
#df_group.show()
Using Window Method as Window Function
#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.Window.html#snowflake.snowpark.Window
from snowflake.snowpark import Window
from snowflake.snowpark.functions import row_number
df_window = df_tbl_read.with_column(
    "Rank", row_number().over(Window.order_by(col("CS_SOLD_DATE_SK").desc()))
).select(col("Rank"), col("CS_ORDER_NUMBER"), col("CS_QUANTITY"), col("CS_WHOLESALE_COST"), col("CS_LIST_PRICE"))
#df_window.show()
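A plain-Python sketch of what row_number() over a descending ordering produces (illustrative only, not the Snowpark implementation):

```python
def row_number_desc(rows, key):
    """Rank rows 1..n by key descending - the shape of
    row_number().over(Window.order_by(col(key).desc()))."""
    ordered = sorted(rows, key=lambda r: r[key], reverse=True)
    return [{"Rank": i, **r} for i, r in enumerate(ordered, start=1)]

rows = [{"CS_SOLD_DATE_SK": 2451000}, {"CS_SOLD_DATE_SK": 2452000}]
print(row_number_desc(rows, "CS_SOLD_DATE_SK"))
# [{'Rank': 1, 'CS_SOLD_DATE_SK': 2452000}, {'Rank': 2, 'CS_SOLD_DATE_SK': 2451000}]
```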
Using DataFrame NA Function for Handling Missing Values
#.. https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameNaFunctions.html#snowflake.snowpark.DataFrameNaFunctions
df_null_handling = (
    df_tbl_read.filter(col("CS_QUANTITY").is_null() & col("CS_WHOLESALE_COST").is_null())
    .sort([col("CS_QUANTITY"), col("CS_WHOLESALE_COST")], ascending=[1, 1])
    .select(col("CS_ORDER_NUMBER"), col("CS_QUANTITY"), col("CS_WHOLESALE_COST"), col("CS_LIST_PRICE"))
)
#df_null_handling.na.fill({"CS_QUANTITY":111}).show()
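The na.fill({...}) call replaces NULLs per column. In plain-Python terms (the helper and sample rows are made up for illustration):

```python
def na_fill(rows, replacements):
    """Replace None values per a {column: default} mapping -
    a local analogue of df.na.fill({...})."""
    return [{c: (replacements[c] if v is None and c in replacements else v)
             for c, v in row.items()} for row in rows]

rows = [{"CS_QUANTITY": None, "CS_LIST_PRICE": 10.0}]
print(na_fill(rows, {"CS_QUANTITY": 111}))
# [{'CS_QUANTITY': 111, 'CS_LIST_PRICE': 10.0}]
```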
Performing an action on a DataFrame
Using the collect() Method to return the query result as a list of Row objects
#df_tbl_read.collect()
Using collect_nowait() to Execute the Query Asynchronously
df_async = df_tbl_read.collect_nowait()
#df_async.result()
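collect_nowait() returns an async job handle whose result() blocks until the query finishes. The same submit-then-wait pattern can be sketched with Python futures (a stand-in for the idea, not the Snowpark API itself):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_query():
    time.sleep(0.1)          # pretend Snowflake is still computing
    return ["row1", "row2"]

with ThreadPoolExecutor() as pool:
    job = pool.submit(slow_query)   # returns immediately, like collect_nowait()
    # ... do other work here while the query runs ...
    rows = job.result()             # blocks until done, like AsyncJob.result()
print(rows)
```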
Using the show() Method to print the result
#df_tbl_read.show(5)
Find my Source Code for this Article
About Me
I’m Divyansh Saxena, a Cloud Data Engineer at IBM and a Snowflake Data Superhero for 2023!
Follow me on Medium for regular updates on Snowflake Best Practices and other trending topics:
Also, I am open to connecting all data enthusiasts across the globe on Linkedin:
https://www.linkedin.com/in/divyanshsaxena/