Distributed ML with Snowpark Python UDFs

In the last article I talked about Collaborative ML using new Snowflake functionality. This time I want to touch on how to scale ML workloads with Snowflake, and the ease of use and agility provided by Snowpark UDFs (User Defined Functions). This is an interesting topic when we need to run inference at scale, when our data volume grows, or when we need to reduce execution times.

Recently I was challenged to distribute ML workloads using Snowpark, refactoring existing Spark code. The use case was similar to this great end-to-end example on how to predict the number of trips for citibike.

The existing code used Spark and a Pandas UDF to distribute the workload, training and running inference for each station so the number of trips could be forecasted. Each station has its own model, which makes this a great use case for distributing workloads.

Because this was Spark, the first step is to wait a few minutes for the cluster to be up and running, choose the right number of workers and do some tuning like this:
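(The snippet below is only an illustration; the worker counts and settings are made-up values, not the original configuration.)

```python
# Illustrative PySpark setup: workers, cores and shuffle partitions have to be
# sized up front, and the cluster takes a few minutes to start. The values
# below are hypothetical, not the original tuning.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("citibike-trip-forecast")
    .config("spark.executor.instances", "8")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "16g")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)
```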

With Snowflake we can instantiate virtual warehouses immediately (no wait time), as small or as large as needed. We can also resize any warehouse non-disruptively before running a job, or suspend it when it is no longer used. With Snowpark, we can call the use_warehouse API to define the warehouse to be used, or resize it.
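As a minimal sketch, assuming your own connection parameters and a hypothetical warehouse named ML_WH:

```python
from snowflake.snowpark import Session

# Hypothetical connection details; replace with your own account parameters.
connection_parameters = {
    "account": "...", "user": "...", "password": "...",
    "role": "...", "database": "...", "schema": "...",
}
session = Session.builder.configs(connection_parameters).create()

# Point the session at the warehouse to run on (ML_WH is a made-up name).
session.use_warehouse("ML_WH")

# Suspend it when it is no longer needed; resizing is also a single statement.
session.sql("ALTER WAREHOUSE ML_WH SUSPEND").collect()
```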

As simple, agile, fast and cost effective as that! Get the resources you need, when you need them, and only pay for what you use.

We want to predict the number of trips for each station ID. In a traditional Spark environment, some data preparation is needed. A Spark shuffle is an expensive operation, but it is required because Spark uses RDDs to distribute the data and it has to be grouped by station ID. So before calling the Pandas UDF, the data is redistributed, something like this before making the call to the station_train_predict_p_udf Pandas UDF:
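(A rough sketch of that grouped Pandas UDF pattern; trips_df is an assumed name for the Spark DataFrame of historical trips, and only the UDF name comes from the original code.)

```python
# Sketch of the Spark pattern: the groupBy forces an expensive shuffle so that
# each station's rows land on the same executor before the grouped Pandas UDF
# trains and predicts per station. trips_df is an assumed DataFrame name;
# applyInPandas is the newer equivalent of apply() in Spark 3.x.
predictions_sdf = (
    trips_df
    .groupBy("STATION_ID")
    .apply(station_train_predict_p_udf)
)
```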

Snowflake offers Snowpark UDFs and UDTFs in Python. That means all the code used for training and inference can be written and executed within Snowflake, in our preferred language for ML, using a secure sandbox with access to Anaconda packages. We can define our function like this:
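(The function below is a simplified sketch: the signature, column names and the linear model are illustrative choices, not the exact code of the citibike example.)

```python
import pandas as pd
from sklearn.linear_model import LinearRegression

def station_train_predict_func(historical_data: list, forecast_data: list) -> list:
    # Each element of historical_data is one day for this station:
    # [date, trip_count, temp, precip, holiday]; forecast_data omits trip_count.
    history = pd.DataFrame(
        historical_data,
        columns=["DATE", "TRIP_COUNT", "TEMP", "PRECIP", "HOLIDAY"])
    future = pd.DataFrame(
        forecast_data,
        columns=["DATE", "TEMP", "PRECIP", "HOLIDAY"])

    # Features are assumed to be numeric (holiday encoded as 0/1).
    features = ["TEMP", "PRECIP", "HOLIDAY"]

    # One model per station, trained only on that station's own history.
    model = LinearRegression()
    model.fit(history[features].astype(float), history["TRIP_COUNT"].astype(float))

    # Predict the number of trips for the forecasted dates.
    predictions = model.predict(future[features].astype(float))
    return [[str(d), float(p)] for d, p in zip(future["DATE"], predictions)]
```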

And then register it as a UDF within Snowflake:
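(Again a sketch: the UDF name matches the example, but the stage and package list are assumptions.)

```python
from snowflake.snowpark.types import ArrayType

# Register the function as a permanent UDF so it can be reused from SQL and
# Snowpark. The stage name and package list are assumptions for this sketch.
station_train_predict_udf = session.udf.register(
    station_train_predict_func,
    name="station_train_predict_udf",
    input_types=[ArrayType(), ArrayType()],
    return_type=ArrayType(),
    packages=["pandas", "scikit-learn"],
    is_permanent=True,
    stage_location="@ml_udf_stage",  # hypothetical stage
    replace=True,
)
```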

Once the UDF is registered, there is no need to shuffle or distribute the data when using Snowflake. This is done automatically by Snowflake, which will use all the resources available according to the size of the warehouse defined. In the example provided, we first prepare a dataframe with historical data and another with forecasted features such as weather and holidays, which we use to predict new trips since they can be an important decision factor (I do not like to ride in the rain):
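(For illustration, something along these lines; the table and column names are assumptions, and the forecast features are aggregated city-wide so they can be joined to every station.)

```python
import snowflake.snowpark.functions as F

# Historical trips and weather per station, packed into one array per station
# so that a single UDF call receives the station's full history.
# Table and column names are illustrative.
historical_df = (
    session.table("TRIPS_FEATURES")
    .group_by("STATION_ID")
    .agg(F.array_agg(F.array_construct(
            F.col("DATE"), F.col("TRIP_COUNT"),
            F.col("TEMP"), F.col("PRECIP"), F.col("HOLIDAY")))
         .alias("HISTORICAL_DATA"))
)

# Forecasted weather and holiday features for the dates we want to predict,
# aggregated into a single array (one row) shared by all stations.
forecast_df = (
    session.table("WEATHER_HOLIDAY_FORECAST")
    .select(F.array_agg(F.array_construct(
            F.col("DATE"), F.col("TEMP"), F.col("PRECIP"), F.col("HOLIDAY")))
            .alias("FORECAST_DATA"))
)
```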

Here we are grouping by station ID when building the dataframe. Think about a retailer use case where they may want to forecast for each store and section, or for each section or type of article. We just need to rewrite the way the dataframe is created, and Snowflake will call the Snowpark UDF for each of the groups we want to predict. The Snowpark UDF code stays simpler because it does not need to deal with the specific columns used for grouping. This is an area where Snowflake's dataframes and UDFs provide better agility and flexibility.
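As a purely hypothetical illustration of that retail variant, only the source table and grouping keys change:

```python
# Hypothetical retail variant: same UDF pattern, only the grouping keys change.
retail_history_df = (
    session.table("SALES")
    .group_by("STORE_ID", "SECTION")
    .agg(F.array_agg(F.array_construct(
            F.col("DATE"), F.col("UNITS_SOLD"), F.col("PROMO")))
         .alias("HISTORICAL_DATA"))
)
```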

In our example, we can simply join the historical and forecast dataframes and call the UDF with the needed parameters. The query optimizer in the Cloud Services layer will build the query and optimize its execution based on the warehouse being used. This code will create a new table with the predictions, and all of it will be executed within Snowflake:
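(A sketch of that final step, reusing the hypothetical names from the previous snippets.)

```python
# Join history and forecast, call the UDF once per station, and materialize
# the predictions as a table. Nothing runs until save_as_table is called.
predictions_df = (
    historical_df
    .cross_join(forecast_df)  # the forecast row is shared by every station
    .select(
        F.col("STATION_ID"),
        F.call_udf("station_train_predict_udf",
                   F.col("HISTORICAL_DATA"),
                   F.col("FORECAST_DATA")).alias("PREDICTIONS"),
    )
)

predictions_df.write.mode("overwrite").save_as_table("TRIP_PREDICTIONS")
```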

When that is executed, we can see the query profile in Snowflake and also notice all the feature preparation that is done before calling the model. Because execution is lazy, all transformations and feature engineering are only executed (and distributed across nodes) when the new table is saved.

I can resize my warehouse to an XSMALL (just one node) and run the job to get a baseline. The whole process to predict trips for 522 stations took 4 minutes and 23 seconds.

And we can take a look at the execution statistics for the 522 UDF calls (one per station):

Now I want to scale, either because I may have more stations to predict or because I want to reduce the time needed to complete my query. With Snowflake, in contrast to the upfront cluster preparation that was needed with Spark, this is a simple operation: a single line of code or API call resizes the warehouse. The Cloud Services layer takes care of optimizing the query and distributing the workload across all the nodes available for the new size. By increasing the warehouse size to MEDIUM (4 nodes), I can see how the execution time drops to 1 minute and 31 seconds.
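With the hypothetical ML_WH warehouse from earlier, that one line looks like this:

```python
# One non-disruptive statement to scale the warehouse before re-running the job.
session.sql("ALTER WAREHOUSE ML_WH SET WAREHOUSE_SIZE = 'MEDIUM'").collect()
```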

If I take a look at the UDF statistics, they are quite similar; the total time is reduced because Snowflake has distributed the load across the available nodes:

Snowpark Python UDFs and UDTFs, in combination with Snowflake's scalability and agility, are a very powerful way to embed ML operations within data pipelines. There is no need to move data out of Snowflake: all execution happens inside Snowflake, using as many resources as needed and paying only for what is used.

Please be aware that at the time of writing this blog, Snowpark Python is in Public Preview. These are my personal opinions and not those of my current employer (Snowflake).

Enjoy!
