Making Batch API Calls in Snowflake with Vectorized UDFs

Recently, I was working on a project that required me to use a User Defined Function (UDF) within a Snowflake database query. This function needed to take data from the query and use it to interact with an external web service. The catch is the web service can process multiple requests at once, so I wanted to send a bunch of data in a single go instead of one piece at a time, but how could I do that from a UDF?

At first, finding clear instructions on how to set this up was tough, but I eventually figured it out. This blog post explains how I accomplished batch API calls using Snowflake’s support for Vectorized Python UDFs.

The Problem

At Skyflow, we work with many businesses that leverage Skyflow Data Privacy Vault as a shared service for securing and managing customer PII across their entire stack. This includes any customer data they’re storing within Snowflake.

Instead of Snowflake, other data stores, and SaaS applications individually storing and managing sensitive customer data like a social security number or home address, they store a de-identified reference, like a pointer, in the form of a vault-generated token. The token values are generated such that they preserve analytical operations like joins, counts, and group bys but carry no exploitable value. This helps reduce the compliance and data security footprint for your SaaS applications, your own application, and downstream services like Snowflake, while providing holistic data governance across all disparate systems.

However, there are situations with Snowflake where some of the original sensitive customer PII (Personally Identifiable Information) needs to be revealed for reporting purposes. For example, someone could execute a query to create a report of customer density by state, as shown below.

SELECT state AS state, COUNT(*) AS customer_count FROM customers GROUP BY state ORDER BY COUNT(*) DESC;

But if the state is a vault-generated token, like a UUID, the user of the report can’t actually read the state value (see image below) without re-identifying the token.

Example query with de-identified values for state.

Skyflow’s detokenize API is used to re-identify tokens. Access policies are applied during this process to make sure the requester has access to the data and in what format. Skyflow customers integrate Skyflow APIs with Snowflake to perform operations like detokenization as well as encrypted search.

Historically, we’ve had our customers use Snowflake’s External Function support to integrate the two products. While this setup works, it depends on using Amazon API Gateway or Microsoft Azure API Management service as a proxy to the third-party API. This not only complicates the setup but also adds network hops to pass the API call through the gateway first before hitting the Skyflow API endpoint (see image below).

External Function call to Skyflow detokenize API endpoint.

In June of 2023, Snowflake started supporting external API calls natively through their External Network Access feature. This allows you to create secure access to a specific network location external to Snowflake.

We’ve started having customers use this feature in combination with a Snowflake UDF. It’s a significantly better user experience, is completely native to Snowflake, and cuts out the proxy call (see image below).

UDF to detokenize data making an API call directly to Skyflow from Snowflake.

However, a challenge with this approach is that although by default External Functions send values from a query in batch, a UDF receives a call for every individual row that’s part of the query. This could result in hundreds or thousands of API calls depending on the query.

To make the migration successful for customers, we needed to figure out how to make the UDF approach support batch operations. This is where we started exploring Snowflake’s Vectorized UDF support to satisfy this requirement.

Creating Vectorized UDFs

The goal is for a customer to be able to detokenize vault-generated tokens natively within Snowflake without a significant performance hit.

Detokenizing state values natively within Snowflake.

In the steps below, we show how to set up a UDF to call an external API and then modify it to support batch API calls. We use Skyflow as an example, but this setup would work for any API that supports batch calls.

Step 1: Creating an API secret

Snowflake has native secrets management support. This includes secrets for basic authentication, OAuth with code grant flow, and OAuth with client credentials flow. Once you create a secret, only specific Snowflake components such as API integrations and external functions can read the sensitive information.

Skyflow supports programmatic access through service accounts and API keys. Service accounts are the recommended approach. A service account is associated with a role and a role has a group of policies that controls access and functionality to the vault data.

In Skyflow’s web-based management tool, Skyflow Studio, once you create a service account, a service account file is automatically downloaded to your machine.

For this example, I’ve copied the contents of the service account file and created a secret of type GENERIC_STRING in Snowflake as shown below.

-- Store Skyflow service account key with Snowflake Secrets Manager
CREATE OR REPLACE SECRET skyflow_vault_secret
TYPE = GENERIC_STRING
SECRET_STRING = '{"clientID":"c07a6d60d1424c41b1d052cbe3035db7",
"clientName":"Snowflake Service Account",
"tokenURI":"https://manage.skyflowapis.com/v1/auth/sa/oauth/token",
"keyID":"g59972af789d499291fd8106226ea31e",
"privateKey":"-----BEGIN PRIVATE KEY-----\nMIIEvA…}';

Step 2: Create a network rule

To access an external API, you must create a network rule defining the domains you want to allow your account to access.

For example, in the code below, I’m creating an egress rule for the vault data API domain (i.e. ebfc9bee4242.vault.skyflowapis.com) and the Skyflow authentication domain, manage.skyflowapis.com.

-- Grant access to the Skyflow API endpoints for authentication and vault APIs
CREATE OR REPLACE NETWORK RULE skyflow_apis_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('ebfc9bee4242.vault.skyflowapis.com', 'manage.skyflowapis.com');

Step 3: Create an external access integration

The next pre-configuration step is to aggregate the secret and network rule for use with the UDF.

-- Create an integration using the network rule and secret
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION skyflow_external_access_integration
ALLOWED_NETWORK_RULES = (skyflow_apis_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (skyflow_vault_secret)
ENABLED = true;

Step 4: Create a UDF

To use the detokenize API, we need to create a UDF that will actually make the API call. When creating the UDF, we need to set the EXTERNAL_ACCESS_INTEGRATIONS parameter to the name of the external access integration we created previously, skyflow_external_access_integration.

-- Create a UDF to detokenize a de-identified value
CREATE OR REPLACE FUNCTION skyflow_detokenize(value text)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'skyflow_detokenize'
EXTERNAL_ACCESS_INTEGRATIONS = (skyflow_external_access_integration)
PACKAGES = ('pyjwt', 'cryptography', 'requests', 'simplejson')
SECRETS = ('cred' = skyflow_vault_secret)
AS
$$
import _snowflake
import simplejson as json
import jwt
import requests
import time

def generate_auth_token():
    credentials = json.loads(_snowflake.get_generic_secret_string('cred'), strict=False)

    # Create the claims object with the data in the creds object
    claims = {
        "iss": credentials["clientID"],
        "key": credentials["keyID"],
        "aud": credentials["tokenURI"],
        "exp": int(time.time()) + (3600), # JWT expires in Now + 60 minutes
        "sub": credentials["clientID"],
    }
    # Sign the claims object with the private key contained in the creds object
    signedJWT = jwt.encode(claims, credentials["privateKey"], algorithm='RS256')

    body = {
        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'assertion': signedJWT,
    }
    tokenURI = credentials["tokenURI"]

    session = requests.Session()
    r = session.post(tokenURI, json=body)
    auth = json.loads(r.text)

    return auth["accessToken"]

def skyflow_detokenize(value):
    auth_token = generate_auth_token()

    body = {
        "detokenizationParameters": [
            {
                "token": value
            }
        ]
    }

    url = "https://ebfc9bee4242.vault.skyflowapis.com/v1/vaults/i0e83ad6af494b9bac01245a52523e90/detokenize"
    headers = {
        "Authorization": "Bearer " + auth_token
    }

    session = requests.Session()
    response = session.post(url, json=body, headers=headers)

    response_as_json = json.loads(response.text)

    return response_as_json["records"][0]["value"]
$$;
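
One detail in the handler above is the strict=False argument. As far as I can tell, Snowflake interprets the \n escape inside a single-quoted string constant, so the private key stored in step 1 reaches the handler with literal line breaks inside a JSON string value, which a strict JSON parser rejects. The sketch below reproduces that behavior with Python's standard json module (the UDF uses simplejson, which accepts the same flag). The secret contents and the helper names parse_secret and strict_parse_fails are made up for illustration.

```python
import json

# Hypothetical secret value: the private key contains real line breaks,
# as it would after "\n" is unescaped in the SQL string constant.
secret_string = '{"clientID": "example-client", "privateKey": "-----BEGIN PRIVATE KEY-----\nMIIEvA-placeholder\n-----END PRIVATE KEY-----"}'


def parse_secret(raw: str) -> dict:
    """Parse the secret, tolerating control characters inside string values."""
    return json.loads(raw, strict=False)


def strict_parse_fails(raw: str) -> bool:
    """Return True if the default (strict) parser rejects the same input."""
    try:
        json.loads(raw)
    except json.JSONDecodeError:
        return True
    return False


credentials = parse_secret(secret_string)
print(strict_parse_fails(secret_string))
print(credentials["privateKey"].splitlines()[0])
```

If your secret is stored with the line breaks escaped so that they survive as \n inside the JSON text, the strict parser works too and the flag is harmless.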

As mentioned, while the above UDF will work, every row returned in a query will result in an API call to the detokenize endpoint.

For example, assuming the customers table has at least 100 records, the query below makes 100 API calls.

SELECT skyflow_detokenize(name) FROM customers LIMIT 100;
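
It's also worth noticing that the handler requests a fresh bearer token on every invocation, so each row costs an authentication round trip on top of the detokenize call. One possible mitigation, sketched here under the assumption that module-level state is reused between handler invocations in the same Python process, is to cache the token until shortly before it expires. The names get_cached_token and fake_fetch, the 3600-second lifetime, and the 60-second safety margin are all illustrative; in the UDF, generate_auth_token would take the place of fake_fetch.

```python
import time
from typing import Callable, Optional

# Module-level cache. Assumption: this state is reused across handler
# invocations that run in the same Python process.
_cached_token: Optional[str] = None
_cached_expiry: float = 0.0


def get_cached_token(
    fetch_token: Callable[[], str],
    ttl_seconds: float = 3600.0,  # assumed token lifetime; check your API
    safety_margin: float = 60.0,  # refresh a little before expiry
    now: Callable[[], float] = time.time,
) -> str:
    """Return a cached token, calling fetch_token only when it is missing or stale."""
    global _cached_token, _cached_expiry
    if _cached_token is None or now() >= _cached_expiry - safety_margin:
        _cached_token = fetch_token()
        _cached_expiry = now() + ttl_seconds
    return _cached_token


# Stand-in fetcher that records how often it is called.
calls = []


def fake_fetch() -> str:
    calls.append(1)
    return f"token-{len(calls)}"


first = get_cached_token(fake_fetch)
second = get_cached_token(fake_fetch)
print(first == second, len(calls))
```

This reduces authentication traffic but does nothing about the per-row detokenize calls, which is the problem the rest of this post addresses.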

Skyflow’s detokenize API supports batch operations where multiple vault-generated tokens can be passed at once within the body of the payload (see sample below). By default, 25 tokens can be sent at once, but this can be adjusted to support many more based on customer need.

curl -i -X POST "$VAULT_URL/v1/vaults/$VAULT_ID/detokenize" \
  -H "Authorization: Bearer $BEARER_TOKEN" \
  -d '{
    "detokenizationParameters": [
      {
        "token": "'"$TOKEN"'"
      },
      {
        "token": "'"$TOKEN"'"
      },
      {
        "token": "'"$TOKEN"'"
      },
      {
        "token": "'"$TOKEN"'"
      }
    ]
  }'

Using the API batch mode defaults, the query to detokenize names as shown previously should only make four API calls. In the next section, I show how to do this.
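
To make the arithmetic concrete, here is a small sketch of how 100 tokens split into request bodies at the default batch size of 25. The helper name build_detokenize_payloads and the sample token strings are made up for illustration; only the detokenizationParameters shape comes from the request shown above.

```python
from typing import Dict, List


def build_detokenize_payloads(tokens: List[str], batch_size: int = 25) -> List[Dict]:
    """Split tokens into chunks and wrap each chunk in a detokenize request body."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    payloads = []
    for start in range(0, len(tokens), batch_size):
        chunk = tokens[start:start + batch_size]
        payloads.append(
            {"detokenizationParameters": [{"token": t} for t in chunk]}
        )
    return payloads


# 100 placeholder tokens, standing in for the 100 rows in the query above.
sample_tokens = [f"token-{i}" for i in range(100)]
payloads = build_detokenize_payloads(sample_tokens)
print(len(payloads))  # one request body per batch of up to 25 tokens
```

In the UDF itself you don't write this chunking by hand, because Snowflake hands the handler one batch of rows at a time.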

Step 5: Adjusting the UDF to a Vectorized UDF

To support batch processing, Snowflake offers Vectorized Python UDFs. With a Vectorized UDF, the Python handler receives batches of input rows as Pandas DataFrames and returns batches of results as Pandas arrays or Series.

Batch API calls from a Vectorized UDF in Snowflake

To convert a UDF to a Vectorized UDF, you need to add pandas to the PACKAGES list, import pandas, and import the vectorized decorator from the _snowflake module. Above the handler function definition, you add the decorator, @vectorized(input=pandas.DataFrame, max_batch_size=25), to specify that the handler expects a DataFrame.

The max_batch_size attribute can be adjusted depending on the API you’re calling and what’s optimal for the use case.

Additionally, the skyflow_detokenize code is modified to convert the DataFrame input into the format the Skyflow API expects and to convert the response back into a Pandas Series before returning it.

CREATE OR REPLACE FUNCTION skyflow_detokenize(token varchar)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'skyflow_detokenize'
EXTERNAL_ACCESS_INTEGRATIONS = (skyflow_external_access_integration)
PACKAGES = ('pandas', 'pyjwt', 'cryptography', 'requests', 'simplejson')
SECRETS = ('cred' = skyflow_vault_secret)
AS
$$
import pandas
import _snowflake
import simplejson as json
import jwt
import requests
import time
from _snowflake import vectorized

def generate_auth_token():
    credentials = json.loads(_snowflake.get_generic_secret_string('cred'), strict=False)

    # Create the claims object with the data in the creds object
    claims = {
        "iss": credentials["clientID"],
        "key": credentials["keyID"],
        "aud": credentials["tokenURI"],
        "exp": int(time.time()) + (3600), # JWT expires in Now + 60 minutes
        "sub": credentials["clientID"],
    }
    # Sign the claims object with the private key contained in the creds object
    signedJWT = jwt.encode(claims, credentials["privateKey"], algorithm='RS256')

    body = {
        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'assertion': signedJWT,
    }
    tokenURI = credentials["tokenURI"]

    session = requests.Session()
    r = session.post(tokenURI, json=body)
    auth = json.loads(r.text)

    return auth["accessToken"]

@vectorized(input=pandas.DataFrame, max_batch_size=25)
def skyflow_detokenize(token_df):
    auth_token = generate_auth_token()

    # Convert the DataFrame Series into the format needed for the detokenize call.
    token_values = token_df[0].apply(lambda x: {'token': x}).tolist()
    body = {
        'detokenizationParameters': token_values
    }

    url = 'https://ebfc9bee4242.vault.skyflowapis.com/v1/vaults/i0e83ad6af494b9bac01245a52523e90/detokenize'
    headers = { 'Authorization': 'Bearer ' + auth_token }

    session = requests.Session()
    response_as_json = session.post(url, json=body, headers=headers).json()

    # Convert the JSON response into a DataFrame Series.
    data = []
    for record in response_as_json['records']:
        data.append(record['value'])

    return pandas.Series(data)
$$;

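With these changes in place, the query below can make as few as four API calls rather than 100, a substantial reduction in network round trips. Note that max_batch_size is an upper bound, so Snowflake may choose smaller batches and make a few more calls than that.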

SELECT skyflow_detokenize(name) FROM customers LIMIT 100;
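
One assumption baked into the vectorized handler is that the API returns exactly one record per input token, in the same order as the request. If that isn't guaranteed for the API you're calling, a more defensive approach is to match records back to inputs by token. The sketch below assumes each response record echoes the token it resolves in a token field, which you should verify against your API's response format; align_values and the sample data are hypothetical.

```python
from typing import Dict, List, Optional


def align_values(tokens: List[str], records: List[Dict]) -> List[Optional[str]]:
    """Return one value per input token, in input order; None where no record matched."""
    # Assumption: every record carries both "token" and "value" keys.
    value_by_token = {record["token"]: record["value"] for record in records}
    return [value_by_token.get(token) for token in tokens]


# Records deliberately out of order, with one token missing from the response.
tokens = ["tok-a", "tok-b", "tok-c"]
records = [
    {"token": "tok-c", "value": "Texas"},
    {"token": "tok-a", "value": "Ohio"},
]
aligned = align_values(tokens, records)
print(aligned)
```

Inside the handler, the aligned list could be wrapped in pandas.Series in place of the data list, which keeps the output the same length as the input batch even when a token fails to resolve.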

Wrap Up

Snowflake UDFs with external API calls are a simple way to extend Snowflake’s base functionality well beyond what it natively supports. You could integrate Snowflake with translation APIs, Twilio for sending text messages, or consume data directly from Twitter into a Snowflake table.

When integrating UDFs into queries, there are times when it makes sense to send data in batches to reduce latency and the number of API calls. Vectorized Python UDFs are designed to satisfy this requirement and, as shown here, are easy to work with and require only minor modifications to existing UDFs.

Sean Falconer
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Head of Developer Relations and Marketing @ Skyflow | Engineer & Storyteller | 100% Canadian 🇨🇦 | Snowflake Data Superhero ❄️ | AWS Community Builder