Using Snowpark for Python with Amazon SageMaker

2023–01–17: Since I originally wrote this post, Snowpark for Python has gone GA and also added a lot of new functionality, so I have done some updates based on that.

That Amazon SageMaker is one of the most popular cloud services for Data Science is no surprise to anyone, and in combination with Snowflake it's a winner.

Snowpark for Python is a DataFrame API that pushes down manipulations and transformations of a Snowpark DataFrame object as SQL, and it was made Generally Available at Snowday 2022.

In this post I will show you how it enables you to do more of your data exploration and data processing within Snowflake (using Snowflake's compute) and how to use an Amazon SageMaker model in a Python UDF for in-database scoring.

Getting Started

Today there are no Amazon SageMaker kernels that have Snowpark for Python preinstalled, so you need to install it yourself. My recommendation is to use either a customised notebook instance or a customised SageMaker image, depending on whether you are using Notebook Instances or SageMaker Studio, or to use a Lifecycle Configuration. Make sure that you are using Python 3.8.

To reproduce the code in this post you will need the following Python libraries in addition to Snowpark for Python: pandas, NumPy, cachetools, the Amazon SageMaker Python SDK, the AWS SDK for Python (boto3) and XGBoost version 1.3.3.

The example I am using in this post is based on the Amazon SageMaker example Customer Churn Prediction with XGBoost.

Data Exploration

First things first: I start by importing the libraries that I am using for this example.
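
A minimal sketch of the imports I rely on in the snippets throughout this post (the aliases, such as F for the Snowpark functions module, are my own conventions):

```python
import json
import pickle
import sys
import tarfile

import boto3
import cachetools
import numpy as np
import pandas as pd
import sagemaker
import xgboost as xgb

from snowflake.snowpark import Session, Window
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import (
    StringType, FloatType, PandasDataFrameType, PandasSeriesType,
)
```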

The data I am using is already loaded to a Snowflake table, CUSTOMER_CHURN, and therefore my first step is to connect to Snowflake and create a DataFrame that references the table.

If you want to get some more information around how the Snowpark DataFrame works you can read my previous post, Feature Engineering with Snowflake, Using Snowpark and Scala. Even if it is about the Scala version, much of it is applicable to the Python version.

The code below connects to my Snowflake account and displays the role, database, schema and virtual warehouse I am using for my session. It then creates a DataFrame based on the CUSTOMER_CHURN table and shows 10 rows from it.
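
Roughly like this, assuming the connection parameters are kept in a local creds.json file (the file name and the df_churn variable name are my own choices):

```python
# Connect to Snowflake using parameters stored outside the notebook
connection_parameters = json.load(open("creds.json"))
session = Session.builder.configs(connection_parameters).create()

# Show the context the session is running in
print(session.get_current_role(), session.get_current_database(),
      session.get_current_schema(), session.get_current_warehouse())

# DataFrame that references the CUSTOMER_CHURN table (no data is pulled back yet)
df_churn = session.table("CUSTOMER_CHURN")
df_churn.show(10)
```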

I am now ready to do some exploration of my data, and a good first step is to get the frequency of each categorical feature. This will give me an understanding of the distribution of the values.

To generate frequency tables (or contingency tables) for each column I am using the RATIO_TO_REPORT function. Since the function is not, at the time of writing this post, exposed in the Snowpark API, I use the call_function function to call it.

I want to do this for my categorical features (string columns), and a simple way to do this is to use the schema object of my DataFrame and filter out all columns that have a string data type. Then it is just a matter of looping through the columns to get the frequencies and display them together with the number of distinct values.
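
A sketch of how that loop could look, building on the df_churn DataFrame from above:

```python
# Categorical features = columns with a string data type
string_cols = [f.name for f in df_churn.schema.fields
               if isinstance(f.datatype, StringType)]

for c in string_cols:
    n_distinct = df_churn.select(F.count_distinct(F.col(c))).collect()[0][0]
    print(f"{c}: {n_distinct} distinct values")
    # Frequency table: count per value and its share of the total
    (df_churn.group_by(F.col(c))
             .agg(F.count(F.lit(1)).alias("COUNT"))
             .select(F.col(c), F.col("COUNT"),
                     F.call_function("RATIO_TO_REPORT", F.col("COUNT")).over().alias("RATIO"))
             .show())
```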

To get some basic statistics on all my numeric and string columns I use the describe method. Since there are a lot of columns in my DataFrame and I want more readable output, I leverage pandas pretty printing by returning the result as a pandas DataFrame with the to_pandas method.
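
For example:

```python
# Basic statistics for all numeric and string columns, returned as a pandas DataFrame
df_churn.describe().to_pandas()
```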

Based on the frequency tables above I can see that PHONE has too many unique values to be useful and should be removed. The same goes for CUST_ID, which is a unique key.

AREA_CODE is stored as a numeric data type and does not show up among the frequency tables. Since it is a categorical feature I need to cast it as a string so it gets treated as such.

The drop function allows me to drop one or more columns from my DataFrame, and with_column allows me to create a new column or, if the column name is the same as an existing one, replace it. To cast a numeric value to a string I use to_varchar.
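
Something along these lines (df_prep is my own name for the prepared DataFrame):

```python
# Drop the columns that are not useful as features and cast AREA_CODE to a string
df_prep = (df_churn.drop("PHONE", "CUST_ID")
                   .with_column("AREA_CODE", F.to_varchar(F.col("AREA_CODE"))))
```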

Neither of these operations changes the table behind the DataFrame; they only affect the SQL that is generated. I can see the SQL of a DataFrame by looking at its queries attribute.
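
For example:

```python
# The SQL that will be generated for df_prep; nothing has been executed in Snowflake yet
print(df_prep.queries["queries"])
```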

The next step is to look at the relationship between each of the categorical features and the CHURN column. Once again RATIO_TO_REPORT will be used, but this time I am also using a window object with it so I can group the ratio by feature value and CHURN value. Since I want to see one column for each CHURN value, I need to use the PIVOT function.
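
A sketch of what this could look like; the churn_values list is an assumption about how the label is stored in my table:

```python
churn_values = ["True.", "False."]  # assumption about the values in the CHURN column

# Categorical features, excluding the label itself
cat_cols = [f.name for f in df_prep.schema.fields
            if isinstance(f.datatype, StringType) and f.name != "CHURN"]

for c in cat_cols:
    window = Window.partition_by(F.col(c))
    ratios = (df_prep.group_by(F.col(c), F.col("CHURN"))
                     .agg(F.count(F.lit(1)).alias("COUNT"))
                     .select(F.col(c), F.col("CHURN"),
                             F.call_function("RATIO_TO_REPORT", F.col("COUNT"))
                              .over(window).alias("RATIO")))
    # One column per CHURN value
    ratios.pivot("CHURN", churn_values).agg(F.sum("RATIO")).show()
```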

As expected, the majority of the cases are non-churn. INTL_PLAN stands out, having a significant share of cases with CHURN = True when INTL_PLAN = Yes. Normally this would be something to look into further to see if there is anything that could explain it, maybe by getting more data or talking with a domain expert.

I also want to see if any of the features are highly correlated with another feature, which could indicate that they are redundant.

Snowpark for Python does have a correlation function, corr, but no function/method for returning a correlation matrix. Since I am doing this in Python I can write the code to generate one myself, without having to pull data back to my client.

I am creating a function for it that takes one parameter, the DataFrame I want a correlation matrix for, and then loops through each numeric column and gets its correlation with all the numeric columns. Since I do not want the number of columns to explode in the result, I create a new DataFrame for each column and then use the union function to combine them.
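
A sketch of such a function, using the numeric data types I expect to find in this table:

```python
from snowflake.snowpark.types import DecimalType, DoubleType, IntegerType, LongType

def corr_matrix(df):
    # All numeric columns in the DataFrame
    numeric_cols = [f.name for f in df.schema.fields
                    if isinstance(f.datatype, (DecimalType, DoubleType, FloatType,
                                               IntegerType, LongType))]
    matrix_df = None
    for c in numeric_cols:
        # One row per feature: its correlation with every numeric column
        row = df.select(F.lit(c).alias("FEATURE"),
                        *[F.corr(F.col(c), F.col(c2)).alias(c2) for c2 in numeric_cols])
        matrix_df = row if matrix_df is None else matrix_df.union(row)
    return matrix_df

corr_matrix(df_prep).to_pandas()
```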

I can see that DAY_CHARGE and DAY_MINS have a correlation of 1, and the same goes for EVE_CHARGE and EVE_MINS, NIGHT_CHARGE and NIGHT_MINS, and INTL_CHARGE and INTL_MINS.

Having a correlation coefficient of 1 means that there is a perfect positive relationship between the two features: if one increases then the other does as well, and vice versa. From a Machine Learning perspective that means I only need to keep one of each pair, since they provide the same information to my model.

Data Preprocessing

The last thing I need to do before training a model is to transform my categorical features (string columns) into numeric features. The reason for this is that most algorithms only handle numeric values.

There are many ways to do this. For example, if I have a feature called COLOR that has the values BLUE, RED and GREEN and I want to transform it into numerical values, I can do it in a number of ways.

One way is to replace each value with a number, so I would replace BLUE with 0, RED with 1 and GREEN with 2 for each row. This technique is often referred to as Label Encoding.

Another way is to create a new feature for each value, so I would have COLOR_BLUE, COLOR_RED and COLOR_GREEN, and then set these features to 0 or 1 depending on the value in COLOR for that row. For example, for a row where COLOR is RED, COLOR_BLUE would be 0, COLOR_RED 1 and COLOR_GREEN 0. This is usually referred to as One Hot Encoding.

pandas has a function called get_dummies that can be used to do a One Hot Encoding transformation on all categorical columns in a dataframe. Snowpark for Python does not currently have that type of function/method, so I will create one myself.
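
A sketch of such a function; the naming scheme for the new columns is my own:

```python
def one_hot_encode(df):
    # Categorical features = string columns
    string_cols = [f.name for f in df.schema.fields if isinstance(f.datatype, StringType)]

    # One ARRAY_AGG(DISTINCT <col>) per column, collected back as a single row
    unique_values = df.select(
        [F.array_agg(F.col(c), is_distinct=True).alias(c) for c in string_cols]
    ).collect()[0].as_dict()

    out_df = df
    for c in string_cols:
        # The arrays come back as JSON strings when collected
        for val in json.loads(unique_values[c]):
            new_col = f"{c}_{val}".upper().replace(" ", "_").replace(".", "").replace("-", "_")
            out_df = out_df.with_column(new_col,
                                        F.iff(F.col(c) == F.lit(val), F.lit(1), F.lit(0)))
    # Drop the original string columns
    return out_df.drop(string_cols)
```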

Now, there are a couple of things going on in the function that I think are worth explaining further.

The function first selects the columns that have a string data type and calls Snowflake to get the unique values for each of those columns. By leveraging the semi-structured function array_agg in Snowflake, the values are returned as an array/list, one for each column. By using is_distinct=True I only get the unique values. Since I need the values locally, I use collect to pull them back to my client from Snowflake and the as_dict function to convert the result to a dict object.

I can then loop through all the columns and create new columns for each unique value, using the iff function, which is equivalent to an if-then-else expression, to set 1 or 0. The original columns are dropped and a new DataFrame is returned.

Using my function I can transform my categorical features into numerical ones using One Hot Encoding. I am using pandas to print the result, but since I do not want to pull back all the data, I am also using the limit method to only get 10 rows.
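
Roughly like this, assuming the label is stored as the strings 'True.' and 'False.' and should become 1/0 rather than being one-hot encoded:

```python
# Make the label numeric first so it is not treated as a categorical feature
df_prep = df_prep.with_column("CHURN",
                              F.iff(F.col("CHURN") == F.lit("True."), F.lit(1), F.lit(0)))

df_encoded = one_hot_encode(df_prep)
df_encoded.limit(10).to_pandas()
```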

Model training with a SageMaker built-in algorithm

In this example I am going to use the SageMaker built-in XGBoost algorithm to train a model to predict CHURN.

The SageMaker built-in XGBoost algorithm expects the training and validation data to be in an S3 bucket, and the simplest way to get it there is to pull the data back to my client as a pandas DataFrame and save it as CSV. I also split the data into training, validation and test sets before storing it on my local disk.
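
A sketch of those steps; the bucket, prefix and split ratios are my own choices, and I assume the label column CHURN is numeric at this point:

```python
# Pull the prepared data back as pandas and put the label first,
# which is what the built-in XGBoost algorithm expects for CSV input
pd_df = df_encoded.to_pandas()
pd_df = pd_df[["CHURN"] + [c for c in pd_df.columns if c != "CHURN"]]

# 70/20/10 split into train, validation and test
train, validation, test = np.split(pd_df.sample(frac=1, random_state=42),
                                   [int(0.7 * len(pd_df)), int(0.9 * len(pd_df))])
train.to_csv("train.csv", header=False, index=False)
validation.to_csv("validation.csv", header=False, index=False)
test.to_csv("test.csv", header=False, index=False)

# Upload the training and validation files to S3
sm_session = sagemaker.Session()
bucket = sm_session.default_bucket()
prefix = "snowpark-churn"
s3_train = sm_session.upload_data("train.csv", bucket=bucket, key_prefix=f"{prefix}/train")
s3_validation = sm_session.upload_data("validation.csv", bucket=bucket,
                                       key_prefix=f"{prefix}/validation")
```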

Once the data is uploaded to my S3 bucket, I need to define the training environment, set the hyperparameters and start the training by calling the fit method.
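
Something along these lines, with hyperparameters roughly following the SageMaker churn example; the instance type and container version are my own choices:

```python
from sagemaker.inputs import TrainingInput

# Built-in XGBoost container
container = sagemaker.image_uris.retrieve("xgboost", sm_session.boto_region_name, "1.3-1")

xgb_estimator = sagemaker.estimator.Estimator(
    container,
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{bucket}/{prefix}/output",
    sagemaker_session=sm_session,
)
xgb_estimator.set_hyperparameters(max_depth=5, eta=0.2, gamma=4, min_child_weight=6,
                                  subsample=0.8, objective="binary:logistic", num_round=100)

xgb_estimator.fit({"train": TrainingInput(s3_train, content_type="text/csv"),
                   "validation": TrainingInput(s3_validation, content_type="text/csv")})
```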

Deploy the model to Snowflake as a Python UDF

When using an Amazon SageMaker built-in algorithm for training, the fit method will not create a model object the way it would if I were using something like scikit-learn. Instead the model is stored compressed at the output_path location.

I can get the name of the compressed model file and its location from the estimator's model_data attribute.

I also need to uncompress it in order to get the actual model and to do that I need to download it locally.
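
Roughly:

```python
from urllib.parse import urlparse

# model_data points at the compressed artifact, e.g. s3://<bucket>/<prefix>/output/.../model.tar.gz
model_uri = urlparse(xgb_estimator.model_data)
boto3.client("s3").download_file(model_uri.netloc, model_uri.path.lstrip("/"), "model.tar.gz")

# The archive contains a file named xgboost-model
with tarfile.open("model.tar.gz") as tar:
    tar.extractall()
```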

Before I create anything in Snowflake I want to verify that the model generates a prediction. To do that I will create a function that I can later reuse for my Python UDF.

2023–01–17: Since writing this post Snowflake has added support for Vectorized UDFs, which make it possible to apply the UDF function on a batch of rows instead of one row at a time.

The function accepts a list of values as input, uses them with the model that was trained and downloaded, and then returns the predictions as a list.
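
A minimal sketch, assuming the extracted xgboost-model file is a pickled Booster (which is what I have seen with the built-in algorithm versions used here):

```python
def predict_churn(features: list) -> list:
    # Load the extracted model file and score a single row
    with open("xgboost-model", "rb") as f:
        model = pickle.load(f)
    return model.predict(xgb.DMatrix(np.array([features], dtype=float))).tolist()
```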

I can now test my model and function by creating an array with some values in it and passing it into my function. If everything works I should get back the probability of CHURN.
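
For example, using one row from the hold-out set created earlier:

```python
# Drop the label (first column) and score the remaining features
test_features = test.iloc[0, 1:].tolist()
predict_churn(test_features)
```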

Now I can deploy the scoring function and model to my Snowflake account.

In order to use the function as a vectorized UDF I need to change some parts of the scoring function. Instead of a list as input it needs to take either a pandas DataFrame or a pandas Series, and the return value should be a pandas Series.

Another thing to keep in mind is that, the way my scoring function is written now, it will load the model file for each call. I would like it to load the model once and then reuse it for all subsequent calls.

One way to do that is to write a separate function for loading the model and then use cachetools to cache the function.

With this in mind, my updated scoring function and my model loading function are defined as below.
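
A sketch of the two functions; the way the import directory is looked up is what I have used inside Snowflake Python UDFs:

```python
@cachetools.cached(cache={})
def load_model(file_name: str):
    # Files added with add_import are available in the UDF's import directory;
    # cachetools makes sure the model is only read once per Python process
    import_dir = sys._xoptions.get("snowflake_import_directory")
    with open(import_dir + file_name, "rb") as f:
        return pickle.load(f)

def predict_churn_batch(features_df: pd.DataFrame) -> pd.Series:
    # Vectorized version: scores a whole batch of rows at a time
    model = load_model("xgboost-model")
    return pd.Series(model.predict(xgb.DMatrix(features_df.values)))
```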

In order to create a permanent Python UDF I need to use a Snowflake stage, and I can either use an existing one (it needs to be an internal stage to hold the UDF code) or create a new one. To create a stage with Snowpark I can use a SQL command and execute it using the sql method of my session object. However, if I do not use an action method on the result of the sql method, the SQL will not be executed in Snowflake, so in order to execute my SQL I am using collect.
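
For example (the stage name is my own):

```python
# Create an internal stage to hold the UDF code and the model file
session.sql("CREATE STAGE IF NOT EXISTS UDF_STAGE").collect()
```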

In order to be able to use my model file in my UDF I need to add it as an import using the add_import method. Since I am referring to the local file (named xgboost-model), Snowpark will upload the file to my stage when the UDF is created. I also need to specify which libraries my function uses, and I do that with add_packages.
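
Roughly:

```python
# Upload the local model file together with the UDF and declare the packages it needs
session.add_import("xgboost-model")
session.add_packages("pandas", "xgboost==1.3.3", "cachetools")
```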

For Python UDFs Snowflake provides thousands of preinstalled Python libraries from Anaconda, and there is a specific Snowflake channel that can be used to install the same versions of the libraries on the client, so I do not need to package any libraries with my UDF.

The udf.register method will deploy my function and model to Snowflake.
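
A sketch of the registration; the UDF name, stage and batch size are my own choices, and feature_cols is simply all columns except the label:

```python
feature_cols = [c for c in df_encoded.columns if c != "CHURN"]

predict_churn_udf = session.udf.register(
    func=predict_churn_batch,
    name="PREDICT_CHURN",
    stage_location="@UDF_STAGE",
    input_types=[PandasDataFrameType([FloatType()] * len(feature_cols))],
    return_type=PandasSeriesType(FloatType()),
    is_permanent=True,
    replace=True,
    max_batch_size=1000,
)
```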

I can now test my UDF. This can be done by defining a new DataFrame and calling the function with the call_udf function. The DataFrame I create for this uses the same array that I used for testing my function earlier, which also shows that you can create a DataFrame just by specifying the values.
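
For example:

```python
# Build a one-row DataFrame from the same values used earlier and call the UDF on it
test_df = session.create_dataframe([test_features], schema=feature_cols)
test_df.select(F.call_udf("PREDICT_CHURN",
                          *[F.col(c) for c in feature_cols]).alias("CHURN_PROBABILITY")).show()
```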

Since I created a permanent function, it is now also available to all Snowflake users that have the correct permissions, and it can be used in SQL commands.

Conclusion

As you have seen in this post, Snowpark for Python and Python UDFs enable you to do more of your processing within Snowflake and also to use models to score data within Snowflake.

Even though I have used Amazon SageMaker in this example, most things would be the same with other services and tools.
