Working programmatically in Snowflake using Snowpark API

Abdulkhader Sakivelu
Published in DataPebbles
Mar 3, 2023

As organizations become more and more data-centric, there are many technological solutions to assist them in making the transition. There is no ‘one size fits all’ solution, as the correct offering varies from case to case. However, a few solutions are sufficiently stable and offer consistent performance with built-in functionality that suits most organizations' requirements. One such offering is Snowflake.

SNOWFLAKE:

Snowflake is a fully managed SaaS that provides a single platform for data-related solutions. It separates the storage and compute layers and can be built on top of the cloud provider of our choice. However, one drawback is that everything inside Snowflake is done using SQL.

While SQL is one of the most popular and easy-to-use data tools, implementing complex business logic with it is relatively complicated and requires a lot of effort. Many developers prefer other programming languages for working with data, since the wide variety of available libraries makes it easier and more versatile to build quality pipelines. So, for example, if a data scientist wanted to work with data in Snowflake, they would have to pull the data out of Snowflake onto different infrastructure and run their operations on top of it, which only increases the cost of operation.

SNOWPARK:

Snowpark is the latest addition to the Snowflake ecosystem, and it enables data engineers and data scientists to interact with data programmatically within the Snowflake environment. The Snowpark library is used much like Spark DataFrames, so someone with a Spark background should find it easy to start working with Snowpark. When a Snowpark program is run, a client (driver) process runs on the infrastructure you launch it from (local machine, EC2 instance, etc.) and establishes a connection to your Snowflake account. The library translates your DataFrame operations into SQL queries and builds a query plan, which is also logged in the driver process. Those queries are executed on Snowflake's infrastructure only when we initiate an action (collect, write, show, etc.).

This also enables us to implement CI/CD pipelines and write unit tests for our code to ensure there are no issues when we move the code to production (that extra comma in our SELECT queries which always tends to creep in can be caught in time).

Note: If you are collecting the results, ensure that the machine where the driver process is running has enough memory to hold them. Failure to do so will result in Out Of Memory (OOM) errors.

SNOWPARK API WITH PYTHON:

Let us get some hands-on experience with the Snowpark library in Python and see how it works. The prerequisites for this exercise are listed below:

  • Snowflake trial account (No payment details required)
  • Docker environment (Docker desktop will be sufficient)
  • Kubernetes environment (Minikube is used in this exercise)

Assuming you have signed up for a Snowflake trial account, it comes preloaded with sample data from Snowflake. We will be using that data for this exercise.

In this use case, we will compare how the same operations on data can be performed using SQL and using the Snowpark API (Python). Let's start with the SQL approach. As mentioned previously, the trial account comes preloaded with sample data, which you should be able to locate in the database “SNOWFLAKE_SAMPLE_DATA”. Take a moment to go over the various schemas and the tables in them.

We will be using the schema “TPCH_SF1”, which contains tables describing customers, their orders, and the associated transactions.

We will gather details like the total number of transactions per customer, the total amount spent, and the average amount per transaction. We can achieve this using SQL as shown below. Note that we are using the default warehouse and haven’t created a new one.

Queries in a Snowflake worksheet
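The original worksheet screenshot isn't reproduced here, but the query would look roughly like the sketch below. The table and column names (CUSTOMER, ORDERS, C_CUSTKEY, O_CUSTKEY, O_ORDERKEY, O_TOTALPRICE) come from the standard TPC-H schema in the sample database; the output aliases and the warehouse name are illustrative.

```sql
-- Aggregate order statistics per customer using the TPC-H sample data.
-- COMPUTE_WH is the default warehouse that comes with a trial account.
USE WAREHOUSE COMPUTE_WH;

SELECT
    c.C_CUSTKEY             AS CUSTOMER_KEY,
    COUNT(o.O_ORDERKEY)     AS TOTAL_TRANSACTIONS,
    SUM(o.O_TOTALPRICE)     AS TOTAL_AMOUNT_SPENT,
    AVG(o.O_TOTALPRICE)     AS AVG_AMOUNT_PER_TRANSACTION
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER c
JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS o
  ON c.C_CUSTKEY = o.O_CUSTKEY
GROUP BY c.C_CUSTKEY;
```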

As you can see, we joined two tables, grouped them on the customer key, and ran aggregation functions to get the results. Let's try to achieve the same result using Snowpark API.

Before that, let us set up the required libraries. We can do that by following the official Snowpark installation documentation.
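If you are using conda, a minimal setup along these lines should be enough; the environment name and Python version are just what I used here, so check the official documentation for the currently supported versions.

```bash
# Create an isolated environment with a Python version supported by Snowpark
# (3.8 at the time of writing) and install the Snowpark library from PyPI.
conda create -y -n snowpark python=3.8
conda activate snowpark
pip install snowflake-snowpark-python
```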

The first step is to import the Snowpark library and establish a connection to Snowflake. We do that by creating a session object, passing the configuration values as key-value pairs. The username, password, and account identifier are mandatory, while the rest are optional. Once we have the session object, we can create a dataframe over a whole table using “session.table(<name>)”. Snowpark uses lazy evaluation, so only a reference to the table is created and no computation is performed until an action is called. Once we have the table references, we join the respective dataframes using the join API. The default join type is “inner”; any other type of join can be specified using the optional third argument.

Finally, we will be storing the results as a table using the “write” API. So when the job completes, we should see the results in a table.

The complete code looks like the example below.

Snowpark API in Python
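Since the screenshot isn't reproduced here, below is a sketch of what the job looks like. The connection values are placeholders you fill in from your trial account, and the output table name CUSTOMER_SPEND is just a name I use for illustration.

```python
from snowflake.snowpark import Session
from snowflake.snowpark.functions import avg, count, sum as sum_

# Connection parameters: username, password and account identifier are mandatory,
# the rest are optional. Replace the placeholders with your own values.
connection_parameters = {
    "account": "<account_identifier>",
    "user": "<username>",
    "password": "<password>",
    "warehouse": "COMPUTE_WH",      # default warehouse of the trial account
    "database": "MY_FIRST_DB",
    "schema": "FIRST_SCHEMA",
}

session = Session.builder.configs(connection_parameters).create()

# Lazy references to the sample tables -- no data is pulled yet.
customer = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER")
orders = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS")

# Inner join (the default) on the customer key, then the same aggregations
# we ran in the SQL worksheet.
result = (
    customer.join(orders, customer["C_CUSTKEY"] == orders["O_CUSTKEY"])
    .group_by("C_CUSTKEY")
    .agg(
        count("O_ORDERKEY").alias("TOTAL_TRANSACTIONS"),
        sum_("O_TOTALPRICE").alias("TOTAL_AMOUNT_SPENT"),
        avg("O_TOTALPRICE").alias("AVG_AMOUNT_PER_TRANSACTION"),
    )
)

# Nothing has executed so far; this write action triggers the query in Snowflake.
result.write.mode("overwrite").save_as_table("MY_FIRST_DB.FIRST_SCHEMA.CUSTOMER_SPEND")

session.close()
```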

Note: I have already created a new database named ‘MY_FIRST_DB’ and a schema named ‘FIRST_SCHEMA’ in Snowflake. This will be used as the destination for storing the output table.

Since we have already done all the hard work, let us add the finishing touches by containerizing the code using Docker. The Dockerfile for the setup looks like this.

Dockerfile
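A sketch of the Dockerfile, assuming a Miniconda base image; the script names (snowpark_job.py, execute.sh) and the environment name are the ones I use here for illustration.

```dockerfile
FROM continuumio/miniconda3:latest

WORKDIR /app

# Create a conda environment with a Python version supported by Snowpark
# and install the Snowpark library into it.
RUN conda create -y -n snowpark python=3.8 && \
    conda run -n snowpark pip install snowflake-snowpark-python

# Copy the job and the wrapper script into the image.
COPY snowpark_job.py execute.sh ./
RUN chmod +x execute.sh

ENTRYPOINT ["./execute.sh"]
```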

Note: I have added a small shell script named ‘execute.sh’ to run the job inside the conda environment we created in the Docker image.

Wrapper script for running the job
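The wrapper is only a couple of lines; something like the sketch below, assuming the environment and script names from the Dockerfile above.

```bash
#!/bin/bash
set -e

# 'conda run' executes the command inside the named environment
# without needing an interactive shell to activate it first.
conda run --no-capture-output -n snowpark python /app/snowpark_job.py
```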

Once we have built the image using the docker build tool, we can either run the job by hand or orchestrate it using Kubernetes, which is also straightforward. We can create a CronJob to run on a schedule of our choice. The YAML specification for the job will look something like the one below, where I have configured it to run hourly.

Kubernetes YAML files
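A sketch of the CronJob spec, assuming the image was built locally as snowpark-job:latest (with Minikube you can build against its Docker daemon and set imagePullPolicy: Never). In a real setup the Snowflake credentials would be injected from a Kubernetes Secret rather than baked into the image.

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: snowpark-hourly-job
spec:
  schedule: "0 * * * *"            # top of every hour
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: snowpark-job
              image: snowpark-job:latest
              imagePullPolicy: Never   # use the locally built image
          restartPolicy: OnFailure
```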

The above job is scheduled to run every hour, and each run overwrites the data in the output table. You can verify the results in the Snowflake table you created; they should be similar to the output shown below.

Results

Conclusion:

That ends our small use case. The main goal was to show how we can work with data in Snowflake without writing SQL directly, but we also went over how to containerize the code. A similar approach, integrated with an organization's existing DevOps practices, enables us to build CI/CD pipelines effectively.

For any questions or suggestions please reach out @ abdulkhader.sakivelu@datapebbles.com
