Snowflake Beyond SQL: Leveraging Python in Snowflake

Snowflake's Data Cloud platform was built as, and is best known as, a SQL-based platform, which has traditionally meant that using Snowflake required SQL. For developers, data engineers, and data scientists who do not work in SQL, that learning curve makes Snowflake harder to adopt. Many of them are already fluent in Python, one of the most popular languages for data processing today. Snowflake addresses this with Python APIs for both data processing and Snowflake object management.

Snowpark is a popular open-source developer framework that can be used for data processing, transformation, data engineering, and machine learning. The Snowflake Python API, in turn, can be used for managing Snowflake objects and for DevOps. The two are complementary and can be used from any client IDE or notebook, such as VS Code, Jupyter Notebook, or Hex.

To get started you need a few things:

  1. A Snowflake account and a user (create one if you don’t have one). I highly recommend public key/private key authentication rather than password-based authentication; a given user can have both.
  2. A Python environment (e.g., conda) with the Python API libraries installed. See the notebook linked at the end for detailed instructions.
  3. A notebook of your choice. I used Visual Studio Code, with GitLab for CI/CD. You can also use Snowflake Notebooks.
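
For step 1, here is a minimal sketch of what key-pair connection settings could look like. It assumes the Snowflake Python connector's `private_key_file` option is available in your connector version; the helper names and the account, user, and key-path values are hypothetical placeholders, not values from this walkthrough. The Snowpark import is deferred so the parameter helper can be tried without a live account.

```python
from pathlib import Path


def keypair_connection_params(account, user, private_key_path,
                              role="sysadmin", warehouse=None):
    """Build connection parameters for key-pair authentication.

    Every value here is a placeholder supplied by the caller.
    """
    params = {
        "account": account,
        "user": user,
        # Path to the private key file; the matching public key must
        # already be registered on the Snowflake user.
        "private_key_file": str(Path(private_key_path).expanduser()),
        "role": role,
    }
    if warehouse:
        params["warehouse"] = warehouse
    return params


def open_session(params):
    """Create a Snowpark session (needs snowflake-snowpark-python installed)."""
    from snowflake.snowpark import Session  # deferred import

    return Session.builder.configs(params).create()


# Placeholder values for illustration only.
params = keypair_connection_params("myorg-myaccount", "demo_user", "~/keys/rsa_key.p8")
print(sorted(params))
```

The rest of this post uses a named connection instead, which keeps these settings out of the notebook entirely.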

To get started with Snowflake, you need at minimum a database, a schema, a table, and a warehouse. Optionally, you can create a custom role and grant it access to the database.

Let's see how we can create these objects easily in Python. First create a session object from your credentials, then use that session to build a root object, which is the entry point for creating Snowflake objects.

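# imports for the session and the object classes used in this section
from snowflake.snowpark import Session
from snowflake.core import Root
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.warehouse import Warehouse
from snowflake.core.role import Role
# grant helpers; this import path is an assumption and may differ by library version
from snowflake.core.grant import Grant, Grantees, Privileges, Securables

# "demodc" is the name of a connection defined in your local connection config
session = Session.builder.config("connection_name", "demodc").create()
root = Root(session)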

# create database
tb101_db = root.databases.create(
    Database(name="tb_101"),
    mode="ifNotExists",  # other modes: orReplace, errorIfExists
)

# create schema
raw_pos_sch = tb101_db.schemas.create(Schema(name="raw_pos"), mode="orReplace")

# create warehouse
warehouses = root.warehouses
warehouse_name = "tb_de_wh"
de_wh = Warehouse(
    name=warehouse_name,
    warehouse_size="XSMALL",
    auto_suspend=60,
    initially_suspended="true",
    comment="data engg warehouse for tasty bytes",
    auto_resume="true",
)
dewh = warehouses.create(de_wh, mode='orReplace')

#create role
session.use_role("securityadmin")
tbadmin_role = Role(name="tb_admin", comment='admin for tasty bytes')
tbadmin_role = root.roles.create(tbadmin_role, mode='ifNotExists')

# grant the new role to sysadmin to maintain the role hierarchy
root.grants.grant(
    Grant(
        grantee=Grantees.role('sysadmin'),
        securable=Securables.role('tb_admin'),
    )
)

# grant access to the database for the new role
# (tb_admin is the role created above)
root.grants.grant(
    Grant(
        grantee=Grantees.role('tb_admin'),
        securable=Securables.database('tb_101'),
        privileges=[Privileges.usage],
    )
)
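
The `mode` argument used above takes one of three values: `ifNotExists`, `orReplace`, or `errorIfExists`. The sketch below is a toy model of what each mode means, using a plain dictionary as a stand-in catalog. It is not the library's implementation, `create_object` is a hypothetical name, and treating `errorIfExists` as the default is an assumption.

```python
VALID_MODES = ("errorIfExists", "ifNotExists", "orReplace")


def create_object(catalog, name, definition, mode="errorIfExists"):
    """Toy model of the three create modes; `catalog` is a plain dict."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode: {mode}")
    exists = name in catalog
    if exists and mode == "errorIfExists":
        raise ValueError(f"{name} already exists")
    if exists and mode == "ifNotExists":
        # Leave the existing object untouched.
        return catalog[name]
    # Either the object is new, or mode is orReplace and it is overwritten.
    catalog[name] = definition
    return definition


catalog = {}
create_object(catalog, "tb_101", {"comment": "v1"}, mode="ifNotExists")
create_object(catalog, "tb_101", {"comment": "v2"}, mode="ifNotExists")
print(catalog["tb_101"])  # {'comment': 'v1'}
create_object(catalog, "tb_101", {"comment": "v3"}, mode="orReplace")
print(catalog["tb_101"])  # {'comment': 'v3'}
```

In practice this is why `ifNotExists` is a safe choice for objects that hold data, such as the database, while `orReplace` suits objects you are happy to rebuild on every run.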

Next, let's load data into Snowflake. You need a stage that holds the data files and a table to load the data into. The files in this stage are CSV, so you also need to describe the CSV format when reading them.

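# imports used in this section
from snowflake.core.stage import Stage
from snowflake.core.table import Table, TableColumn
from snowflake.snowpark.types import StructType, StructField, StringType

# create stage
session.use_role("sysadmin")
s3_stage_csv = Stage(
    name="s3load_csv",
    url='s3://sfquickstarts/frostbyte_tastybytes/',
)
stages = root.databases["tb_101"].schemas["public"].stages
stages.create(s3_stage_csv, mode='ifNotExists')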


# define the table
shift_sales = Table(
    name="shift_sales",
    columns=[
        TableColumn(name="location_id", datatype="number(38,0)"),
        TableColumn(name="city", datatype="string"),
        TableColumn(name="date", datatype="date"),
        TableColumn(name="shift_sales", datatype="float"),
        TableColumn(name="shift", datatype="string"),
        TableColumn(name="month", datatype="number(2,0)"),
        TableColumn(name="day_of_week", datatype="number(2,0)"),
        TableColumn(name="city_population", datatype="number(38,0)"),
    ],
)
root.databases["tb_101"].schemas["raw_pos"].tables.create(shift_sales, mode='orReplace')

# load data
# the read schema treats every CSV column as a string
shift_sales_schema = StructType([
    StructField("location_id", StringType()),
    StructField("city", StringType()),
    StructField("date", StringType()),
    StructField("shift_sales", StringType()),
    StructField("shift", StringType()),
    StructField("month", StringType()),
    StructField("day_of_week", StringType()),
    StructField("city_population", StringType()),
])
# parentheses let the method chain span several lines
shift_sales_df = (
    session.read
    .options({"field_delimiter": ",", "skip_header": 0})
    .schema(shift_sales_schema)
    .csv("@tb_101.public.s3load_csv/analytics/shift_sales/")
)
# assumes the session's current database and schema are tb_101.raw_pos
copy_result = shift_sales_df.copy_into_table("shift_sales", force=True)
print(copy_result)
session.table("shift_sales").show()

As the example above shows, you normally need a table definition before you can load data into a table. Snowflake also offers schema detection, which lets you create the table while loading the data. Here is an example using Parquet files.

# create a stage to load data from Parquet files
s3_stage_parquet = Stage(
    name="s3load_parquet",
    url='s3://sfquickstarts/data-engineering-with-snowpark-python/',
)
stages_parquet = root.databases["tb_101"].schemas["public"].stages
stages_parquet.create(s3_stage_parquet, mode='orReplace')

# create table truck using schema detection from the stage
stage_loc = "@tb_101.public.s3load_parquet/pos/truck/"
truckdf = session.read.option("compression", "snappy").parquet(stage_loc)
tref = truckdf.copy_into_table("truck")
Now let’s create a view by joining the table we created above with a table from a Snowflake Marketplace dataset. Get the dataset from the Marketplace, name the resulting database “frostbyte_safegraph” (change the default name if it differs), and grant access to the public or sysadmin role.

tb_safegraph_df = session.table('frostbyte_safegraph.public.frostbyte_tb_safegraph_s')
tb_safegraph_df.show()

# join on LOCATION_ID and keep the sales columns plus latitude and longitude
shift_sales_harmonized_df = shift_sales_df.join(
    tb_safegraph_df,
    shift_sales_df.col("LOCATION_ID") == tb_safegraph_df.col("LOCATION_ID"),
).select(
    shift_sales_df["LOCATION_ID"].as_("LOCATION_ID"),
    shift_sales_df["CITY"].as_("CITY"),
    shift_sales_df["DATE"],
    shift_sales_df["SHIFT_SALES"],
    shift_sales_df["SHIFT"],
    shift_sales_df["MONTH"],
    shift_sales_df["DAY_OF_WEEK"],
    shift_sales_df["CITY_POPULATION"],
    tb_safegraph_df["LATITUDE"],
    tb_safegraph_df["LONGITUDE"],
)

shift_sales_harmonized_df.show()
shift_sales_harmonized_df.create_or_replace_view("TB_101.ANALYTICS.SHIFT_SALES_VW")

Now that we have created the basic objects and loaded the data, we can automate the load with stored procedures and tasks, organized as a DAG that you deploy and run. To keep things simple, this example uses a DAG of three chained tasks: the first increases the size of the warehouse before the load starts, the second loads the data, and the third reduces the size of the warehouse again to control cost. You could do all of this in one stored procedure, but separate tasks show how to build a DAG.


# create stored procedure for loading data
import snowflake.snowpark.types as T
from snowflake.snowpark.functions import sproc

@sproc(name='tb_101.raw_pos.ingest_data_sproc',
       packages=['snowflake-snowpark-python'],
       is_permanent=True,
       replace=True,
       stage_location='@tb_101.public.mycode',
       session=session)
def ingest_data_sproc(session: Session) -> T.Variant:
    # read the CSV files with every column typed as a string
    shift_sales_schema = StructType([
        StructField("location_id", StringType()),
        StructField("city", StringType()),
        StructField("date", StringType()),
        StructField("shift_sales", StringType()),
        StructField("shift", StringType()),
        StructField("month", StringType()),
        StructField("day_of_week", StringType()),
        StructField("city_population", StringType()),
    ])
    shift_sales_df = (
        session.read
        .options({"field_delimiter": ",", "skip_header": 0})
        .schema(shift_sales_schema)
        .csv("@tb_101.public.s3load_csv/analytics/shift_sales/")
    )
    copy_result = shift_sales_df.copy_into_table("shift_sales", force=True)
    # load the Parquet files using schema detection
    stage_loc = "@tb_101.public.s3load_parquet/pos/truck/"
    truckdf = session.read.option("compression", "snappy").parquet(stage_loc)
    tref = truckdf.copy_into_table("truck")
    return "Data loading complete!"

# stored procedure for changing the warehouse size
@sproc(name='tb_101.raw_pos.change_wh_size',
       packages=['snowflake-snowpark-python', 'snowflake.core'],
       is_permanent=True,
       replace=True,
       stage_location='@tb_101.public.mycode',
       session=session)
def change_wh_size(session: Session, whname: str, whsize: str) -> T.Variant:
    root = Root(session)
    # fetch the current definition, change the size, and write it back
    whobj = root.warehouses[whname].fetch()
    whobj.warehouse_size = whsize
    whobj.resource_monitor = None
    root.warehouses[whname].create_or_update(whobj)
    return "Warehouse Size changed"
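
The `whsize` argument above is a free-form string, so a typo would only surface when the procedure runs. One option is to validate it first. The sketch below is a hypothetical helper, not part of the Snowflake API, and its list of sizes reflects the warehouse sizes I am aware of at the time of writing, so treat it as an assumption to check against the current documentation.

```python
# Assumed list of warehouse size names; verify against current Snowflake docs.
VALID_WAREHOUSE_SIZES = (
    "XSMALL", "SMALL", "MEDIUM", "LARGE", "XLARGE",
    "XXLARGE", "XXXLARGE", "X4LARGE", "X5LARGE", "X6LARGE",
)


def normalize_wh_size(size):
    """Normalize input such as 'x-large' to 'XLARGE', or raise ValueError."""
    cleaned = size.strip().upper().replace("-", "").replace("_", "")
    if cleaned not in VALID_WAREHOUSE_SIZES:
        raise ValueError(
            f"unsupported warehouse size {size!r}; "
            f"expected one of {', '.join(VALID_WAREHOUSE_SIZES)}"
        )
    return cleaned


print(normalize_wh_size("x-large"))  # XLARGE
print(normalize_wh_size("xsmall"))   # XSMALL
```

Calling this at the top of `change_wh_size` would turn a bad size into an immediate, readable error.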

We can use the Python stored procedures above in a DAG to automate the data pipeline.

from datetime import timedelta
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGOperation, DAGTask

dag_name = "data_ingestion_dag"
dag = DAG(dag_name, schedule=timedelta(hours=1))
with dag:
    dag_task1 = DAGTask(
        "increase_wh_size",
        definition=StoredProcedureCall(
            change_wh_size, args=["tb_de_wh", "xlarge"],
            stage_location="@tb_101.public.mycode",
            packages=["snowflake-snowpark-python", "snowflake.core"]),
        warehouse="tb_de_wh",
    )
    dag_task2 = DAGTask(
        "loaddata_task",
        StoredProcedureCall(
            ingest_data_sproc,
            stage_location="@tb_101.public.mycode",
            packages=["snowflake-snowpark-python"]),
        warehouse="tb_de_wh",
    )
    dag_task3 = DAGTask(
        "reduce_wh_size",
        definition=StoredProcedureCall(
            change_wh_size, args=["tb_de_wh", "xsmall"],
            stage_location="@tb_101.public.mycode",
            packages=["snowflake-snowpark-python", "snowflake.core"]),
        warehouse="tb_de_wh",
    )
    # task1 is a predecessor of task2, which is a predecessor of task3
    dag_task1 >> dag_task2 >> dag_task3

pubschema = root.databases["tb_101"].schemas["public"]
dag_op = DAGOperation(pubschema)
dag_op.deploy(dag, mode="orReplace")

# you can execute the dag
dag_op.run(dag)
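
As noted before the stored procedures, the same work could also live in a single procedure. Here is a minimal sketch of that alternative, assuming two injected callables: `resize(wh_name, size)` to change the warehouse size and `load()` to run the data load. `run_ingestion`, `resize`, and `load` are hypothetical names used for illustration. One reason you might prefer this shape is that `try`/`finally` scales the warehouse back down even when the load fails, whereas in a chained DAG a downstream task generally runs only after its predecessor succeeds.

```python
def run_ingestion(resize, load, wh_name="tb_de_wh", big="XLARGE", small="XSMALL"):
    """Scale the warehouse up, run the load, and always scale back down.

    `resize` and `load` are supplied by the caller, for example thin
    wrappers around the warehouse and copy logic shown earlier.
    """
    resize(wh_name, big)
    try:
        return load()
    finally:
        # Runs whether the load succeeds or raises, so the warehouse
        # does not stay at the larger size.
        resize(wh_name, small)


# Dry run with stand-in callables that only record what would happen.
calls = []
result = run_ingestion(
    resize=lambda wh, size: calls.append((wh, size)),
    load=lambda: "Data loading complete!",
)
print(result)
print(calls)
```

The trade-off is that a single procedure gives up the per-task visibility and retry behavior that the DAG provides.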

With this process automated, data analysts and data scientists can run all kinds of analysis using Python.

import snowflake.snowpark.functions as F

ss_df = session.table("tb_101.analytics.shift_sales_vw")
# Group by city and average shift sales
analysis_df = ss_df.group_by("city").agg(
    F.mean("shift_sales").alias("avg_shift_sales")
)

# Sort by average shift sales
analysis_df = analysis_df.sort("avg_shift_sales", ascending=True)

# Pull to pandas and plot (plotting requires matplotlib)
analysis_df.to_pandas().plot.barh(x="CITY", y="AVG_SHIFT_SALES")

As you can see in the above example, there is zero SQL code and you can create a full data pipeline using Python.

Here is the full code to set up an end-to-end workflow.

https://github.com/umeshsf/publiccode/blob/main/Snowflake_devops_python.ipynb

Conclusion:

As you have seen, we created various objects using the Snowflake Python API. For the Python community of developers, Snowflake meets you where you are: you can write Python to manage Snowflake objects for DevOps, to do data engineering for analytics, and to build machine learning workflows. Snowflake also has a partnership with Anaconda that lets you use the same libraries on the client side and the server side, which makes Python development in Snowflake secure and seamless.

Disclaimer: The opinions expressed in this post are my own and not necessarily those of my employer (Snowflake).
