Running an R UDF with Snowpark Container Services

When speaking with Snowflake customers and prospects, I sometimes get questions about R and Snowflake, specifically how they can take models trained in R and run them within Snowflake.

There are a number of blog posts showing how you can use the dbplyr library to write R code that is automatically translated to SQL and pushed down to Snowflake, and my colleague Simon Field has written a great post on how you can, for example, use HEX’s support for R with Snowflake.

However, when it comes to running a model that was trained using an R library in Snowflake, the only option has been to convert it to a format like PMML or ONNX and then use a Python/Java/Scala UDF to run the scoring.

That is, up to now!

With Snowpark Container Services, in Private Preview at the time of writing this post, new possibilities open up.

Before I continue, I recommend that you first read my colleague Caleb Baechtold’s blog post Snowpark Container Services — A Tech Primer, which will give you a good introduction to Snowpark Container Services.

Since you now know all there is to know about Snowpark Container Services, I will jump directly into how you can create a UDF that uses a container running R to score data within Snowflake.

Train and save an R model

The first step is to train a model in R that we can later use in Snowflake, but before doing that I will create the following project folder structure.

r_with_spcs # Project root folder
|- data # source data for training
|- r-models # location for saved model
|- spcs_deployment # files etc for deployment to Snowpark Container Service
|- container # files to be included in the container

For training the model I am using a bank customer churn dataset from Kaggle; you can download it here, and make sure you save the file in the data folder.

I will train a GLM model using the following script, train_r_model.R, which I have saved in the r_with_spcs folder. It saves the fitted model, log_reg_mod.rds, in the r-models folder. With Snowpark Container Services I could create a service job to run the training within Snowflake, using data in Snowflake, and save the model to a Snowflake stage, but for demonstration purposes I will train locally.

library(tidyverse)
library(tidymodels)

churn <- read.csv('./data/Churn_Modelling.csv')

# Remove columns not needed and fix the data types
churn_clean <- churn %>%
  select(-c(RowNumber, CustomerId, Surname)) %>%
  mutate_if(is.character, as.factor) %>%
  mutate(
    HasCrCard = as.factor(HasCrCard),
    IsActiveMember = as.factor(IsActiveMember),
    Exited = as.factor(Exited),
    Exited = factor(Exited, levels = c(0, 1),
                    labels = c("Churn", "Not Churn")))

# Split data into train and test
set.seed(123)
churn_split <- initial_split(churn_clean, strata = Exited)
churn_train <- training(churn_split)
churn_test <- testing(churn_split)

# Create a Logistic Regression model using glm
log_reg_mod <- logistic_reg() %>%
  set_engine("glm") %>%
  set_mode("classification")

# Train the model on our data
log_reg_fit <- log_reg_mod %>%
  fit(Exited ~ ., data = churn_train)

# Get information about the trained model
log_reg_fit
summary(log_reg_fit)

# Estimate the performance of our model using the area under the
# Receiver Operating Characteristic (ROC) curve and overall classification accuracy
log_reg_testing_pred <- predict(log_reg_fit, churn_test) %>%
  bind_cols(predict(log_reg_fit, churn_test, type = "prob")) %>%
  bind_cols(churn_test %>% select(Exited))
head(log_reg_testing_pred)

# Overall accuracy and precision of our model
class_metrics <- metric_set(accuracy, precision)
log_reg_testing_pred %>%
  class_metrics(truth = Exited, estimate = .pred_class)

# Save the fitted R model
saveRDS(log_reg_fit, file = "./r-models/log_reg_mod.rds")

I now have a trained R model that I can use for my UDF, so let’s continue to the next step.
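As an optional sanity check, not part of the deployment flow itself, the sketch below loads the saved RDS file and scores a single made-up customer to verify that the model can be reloaded and used for prediction. The column values are illustrative.

# Optional sanity check: load the saved model and score one illustrative row
library(tidymodels)

model <- readRDS("./r-models/log_reg_mod.rds")

new_customer <- data.frame(
  CreditScore = 608L, Geography = factor("Spain"), Gender = factor("Female"),
  Age = 41L, Tenure = 1L, Balance = 83807.86, NumOfProducts = 1L,
  HasCrCard = factor(0), IsActiveMember = factor(1), EstimatedSalary = 112542.5
)

# Returns the class probabilities for the new customer
predict(model, new_customer, type = "prob")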

Creating the scoring REST API

In order to create a UDF that is based on a container, i.e. a service function, I need to create a service that exposes a REST interface responding to POST requests. The reason is that Snowpark Container Services service functions use the same interface as Snowflake’s External Functions to communicate over REST, with the caveat that in this case the container itself is also running inside of Snowflake. To create an R-based REST interface I will use the plumber library.

When Snowflake calls the REST endpoint through the UDF, the data arrives as JSON where each row’s column values are packed into an array, prefixed by a row number, and the result needs to be sent back to Snowflake in the same format. Below is the expected JSON structure.

{"data": [[row_nbr, value1, value2..], [row_nbr, value1, value2..]] }

My REST API will have two HTTP paths, /connection-status and /predict, where /connection-status is used for checking that the service is running and /predict is the path that will be called by the UDF in order to score the data.

The code for those two paths is in a file called r_score_api.R that is stored in the spcs_deployment/container folder. The code loads the model and, when /predict is called, converts the input data to a data frame, scores it, and converts the model output into a list that is returned.

The full code is displayed below.

library(plumber)
library(yaml)
library(tidyverse)
library(tidymodels)

options(warn = -1)

# Load the model
model <- readr::read_rds("/r-models/log_reg_mod.rds")

#* Test connection
#* @get /connection-status
function() {
  list(status = "Connection to R Churn Prediction API successful",
       time = Sys.time())
}

#* Predict churn
#* @serializer unboxedJSON
#* @post /predict
function(req, res) {
  # Input data comes as:
  # {"data": [[row_nbr, value1, value2..], [row_nbr, value1, value2..]] }
  input <- as.data.frame(req$body$data)
  # Input data has no column names
  colnames(input) <- c('row_nbr', 'CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary')

  # All data types are strings, so we need to fix that
  input_clean <- input %>%
    mutate(across(c(CreditScore, Age, Tenure, NumOfProducts), as.integer)) %>%
    mutate(across(c(Balance, EstimatedSalary), as.numeric)) %>%
    mutate_if(is.character, as.factor)

  pred <- predict(model, input_clean, type = "prob")
  # The return value needs to be:
  # {"data": [[row_nbr, value1, value2..], [row_nbr, value1, value2..]] }
  # Split by row position so the row order is preserved for batches of any size
  l <- split(pred, seq_len(nrow(pred)))
  returnData <- list(data = mapply(function(x, y) list(x - 1, as.list(y)), seq(length(l)), l, SIMPLIFY = FALSE))
  return(returnData)
}

#* @plumber
function(pr) {
  pr %>%
    pr_set_api_spec(yaml::read_yaml("openapi.yaml"))
}

I am also using an OpenAPI specification file, openapi.yaml, stored in spcs_deployment/container, so that the Swagger documentation is generated.

openapi: "3.0.3"
servers:
  - description: Localhost
    url: http://127.0.0.1:8080
info:
  description: API Description
  version: "1.0.0"
  title: R Churn Prediction
paths:
  /connection-status:
    get:
      summary: 'Checks that our API service is running'
      responses:
        default:
          description: Default response.
  /predict:
    post:
      summary: 'Churn input JSON'
      responses:
        default:
          description: Default response.
      parameters: []
      requestBody:
        description: Customer Data
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                data:
                  type: array
                  title: input data
                  items:
                    type: array
                    items: {}

Below is an example of how the Swagger documentation looks.

The last part is to create an R file, plumber_churn.R, stored in spcs_deployment/container, that starts plumber and loads r_score_api.R.

library(plumber)
options(warn = -1)

r <- plumber::plumb("r_score_api.R")
r$run(port = 8080, host = "0.0.0.0", swagger = TRUE)

I now have all the necessary parts to build the Docker container and deploy it to Snowpark Container Services.

Building the Docker Container

In order to build the Docker image I need to create a Dockerfile where I install the necessary R libraries, copy the files in spcs_deployment/container to the image and start the plumber API.

FROM rocker/r-ver:4.3
RUN apt-get update -qq && apt-get install -y \
libssl-dev \
libcurl4-gnutls-dev
RUN R -e "install.packages('plumber', dependencies=TRUE)"
RUN R -e "install.packages('readr', dependencies=TRUE)"
RUN R -e "install.packages('yaml', dependencies=TRUE)"
RUN R -e "install.packages('tidyverse', dependencies=TRUE)"
RUN R -e "install.packages('tidymodels', dependencies=TRUE)"
RUN R -e "install.packages('swagger', dependencies=TRUE)"
RUN R -e "install.packages('rapidoc', dependencies=TRUE)"
COPY / /
EXPOSE 8080
ENTRYPOINT ["Rscript", "plumber_churn.R"]

To build the image I run the following command within the spcs_deployment/container folder.

docker build -t r-churn-api-image .

Note: If you are not on an x86 platform (for example Apple M1/M2) you need to use --platform linux/amd64 in your build command, as shown in the sketch below.
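Before pushing the image it can also be worth a quick local smoke test. The sketch below is not part of the deployment flow itself; it assumes you run the commands from the r_with_spcs project root and mounts the local r-models folder in place of the Snowflake stage, so adjust the paths to your setup.

# Build for linux/amd64 on non-x86 machines (run inside spcs_deployment/container)
docker build --platform linux/amd64 -t r-churn-api-image .

# Run the container locally from the project root, mounting the local model folder
docker run --rm -p 8080:8080 -v "$(pwd)/r-models:/r-models" r-churn-api-image

# In another terminal: check that the API is up
curl http://localhost:8080/connection-status

# Score one illustrative row (the first value is the row number)
curl -X POST http://localhost:8080/predict \
  -H "Content-Type: application/json" \
  -d '{"data": [[0, 608, "Spain", "Female", 41, 1, 83807.86, 1, 0, 1, 112542.5]]}'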

Before I push the image to Snowflake I need to create an Image Repository in my Snowflake account.

Creating an Image Repository

If you have already created an Image Repository you can skip this step, unless you want to use multiple repositories.

One way to create an image repository in Snowflake is to run the following command, which will create it in the current database and schema.

CREATE IMAGE REPOSITORY IF NOT EXISTS images;

In order to push an image to the image repository I need to get the repository URL, and the easiest way to do this is to run the SHOW IMAGE REPOSITORIES command and copy the value in the repository_url column from the response.

SHOW IMAGE REPOSITORIES IN SCHEMA;
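The repository_url value should look roughly like the placeholder below; the exact value depends on your organization, account, database and schema.

<your-org>-<your account>.registry.snowflakecomputing.com/<your db>/<your schema>/images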

Creating Snowflake stages for service specification file and model file

I also need to create a stage for storing the service specification file, created further down, and a stage to store my R model. It could be the same stage or an existing one, but for this example I will keep them separate.

-- Stage for specification file(s)
CREATE STAGE IF NOT EXISTS specs
ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

-- Stage for storing our r-model
CREATE STAGE IF NOT EXISTS r_models
ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

Pushing an image to the Image Repository

I can now push my image, but before that I need to tag it and also authenticate against the Image Repository. The tag should contain the repository URL from the SHOW command earlier and the image name; below is an example where you need to replace the values in < > with the values for your Snowflake account.

docker tag r-churn-api-image <your-org>-<your account>.registry.snowflakecomputing.com/<your db>/<your schema>/<your image registry>/r-churn-api-image

In order to authenticate I run the following command using my Snowflake user and password.

docker login http://<your-org>-<your account>.registry.snowflakecomputing.com

I push the image by running

docker push <your-org>-<your account>.registry.snowflakecomputing.com/<your db>/<your schema>/<your image registry>/r-churn-api-image

Creating the service specification file

In order to create a Snowpark Container Services service I need a service specification file; it specifies which image to use and any additional parts that might be needed for my service.

For this example I need to set the port number for the endpoint to be called by my UDF, the same port as in the Dockerfile, openapi.yaml and plumber_churn.R.

I am also going to mount a Snowflake stage as a volumeMount, r-models, which allows me to update the model without having to rebuild the container and recreate my service.

Below is the service specification file, r-churn-api.yaml that I have saved in the spcs_deployment folder.

spec:
  container:
  - name: r-churn-api
    image: /<your db>/<your schema>/images/r-churn-api-image
    volumeMounts:
    - name: r-models
      mountPath: /r-models
    env:
      SERVER_PORT: 8080
    readinessProbe:
      port: 8080
      path: /connection-status
  volume:
  - name: r-models
    source: "@r_models"
    uid: 0
    gid: 0
  endpoint:
  - name: rchurnendpoint
    port: 8080

Uploading files to stages

To upload the service specification file, r-churn-api.yaml, and the R model, log_reg_mod.rds, I can either use the SnowSQL tool or just use the Snowsight interface. I will not go into details on how to do this, but in the end r-churn-api.yaml is in the specs stage and log_reg_mod.rds is in the r_models stage.
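For reference, a minimal sketch of what this could look like with SnowSQL PUT commands, assuming they are run from the r_with_spcs project root (adjust the local paths to where you keep the files):

PUT file://./spcs_deployment/r-churn-api.yaml @specs AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
PUT file://./r-models/log_reg_mod.rds @r_models AUTO_COMPRESS=FALSE OVERWRITE=TRUE;

Note that AUTO_COMPRESS=FALSE keeps the files uncompressed on the stage, so the service can read the specification and model files directly.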

I am now ready to create my UDF.

Creating the UDF

First I need to create, or use an existing, COMPUTE POOL; for this example I will create a COMPUTE POOL using the STANDARD_2 instance family.

I will only need one node for it, and I will also set the AUTO_SUSPEND_SECS parameter so the pool does not continue to run if no services are using it.

CREATE COMPUTE POOL UDF_STANDARD2_POOL
with
instance_family=STANDARD_2
min_nodes=1
max_nodes=1
AUTO_SUSPEND_SECS=1200;

Now I can create a service, using r-churn-api.yaml as the service specification.

create service r_churn_api
in compute pool UDF_STANDARD2_POOL
from @specs
spec='r-churn-api.yaml';

I can use DESCRIBE COMPUTE POOL to check that the compute pool is started.

DESCRIBE COMPUTE POOL UDF_STANDARD2_POOL;

And if I want to check the status of my service I can use DESCRIBE SERVICE:

DESCRIBE SERVICE r_churn_api;

For monitoring the logs from the container I can use CALL SYSTEM$GET_SERVICE_LOGS

CALL SYSTEM$GET_SERVICE_LOGS('r_churn_api', '0', 'r-churn-api', 10);

Once my COMPUTE POOL and SERVICE are running I can create a UDF. I need to provide the name of the previously created service, r_churn_api, the name of the endpoint, rchurnendpoint, and the HTTP path to be called.

CREATE OR REPLACE FUNCTION r_churn_udf(CreditScore float, Geography varchar
, Gender varchar, Age INTEGER
, Tenure INTEGER, Balance float
, NumOfProducts INTEGER, HasCrCard INTEGER
, IsActiveMember INTEGER
, EstimatedSalary float)
RETURNS OBJECT
SERVICE=r_churn_api
ENDPOINT=rchurnendpoint
AS '/predict';

And that’s it, I now have a UDF that is calling my R model.

I can test calling my UDF using the following SQL:

SELECT r_churn_udf(CreditScore, Geography, Gender, Age, Tenure, Balance
                   , NumOfProducts, HasCrCard, IsActiveMember, EstimatedSalary)
       AS CHURN_PREDICTIONS
FROM (VALUES (608, 'Spain', 'Female', 41, 1, 83807.86, 1, 0, 1, 112542.5),
             (556, 'France', 'Male', 61, 2, 117419.35, 1, 1, 1, 94153.83))
     AS v1 (CreditScore, Geography, Gender, Age, Tenure, Balance
            , NumOfProducts, HasCrCard, IsActiveMember, EstimatedSalary);

Conclusion

With Snowpark Container Services, currently in Private Preview, new possibilities open up when it comes to running R within Snowflake as I have demonstrated in this post.

One of the bigger benefits of this is that, from an end user’s point of view, calling the UDF I created here is the same experience as calling any other function in Snowflake, meaning it can be used from SQL, Java, Scala and Python without the user needing to know that there is a container behind it.

To learn more about Snowpark Container Services, check out this technical overview; and if you are interested in testing Snowpark Container Services, please reach out to your Snowflake account team.

The code for this example can be found here.
