Connecting to Snowflake from Snowpark Container Services

Snowflake recently released Snowpark Container Services into Public Preview (in AWS), which greatly expands the types of functions, computations, and services that can be deployed inside of Snowflake. We tend to talk about three different modes for Snowpark Container Services:

  • Service Functions: user-defined functions that leverage a running container for the computation, which can be used in a SQL statement;
  • Jobs: analogous to Stored Procedures, jobs create a Snowflake session, run multiple SQL statements, run to completion, and exit;
  • Services: long-running services that create a Snowflake session and run SQL statements on Snowflake in response to events or user inputs. A few examples of Services are: a web-app hosted in the container, which exposes endpoints that users can visit with their web-browser; a data API that can accept requests from an endpoint; a long-running connector that reaches outside of Snowflake to pull data in and write to Snowflake tables.

One thing that is common to Jobs and Services (but not really for Service Functions) is the need for the container to make a connection to Snowflake in order to issue SQL statements. Snowpark Container Services does some things to make this simple, efficient, and secure, and this blog will walk through some of the details, including some utility code, for making those connections from Python, Node.js, and Java.

Introduction

When you run a container in a SERVICE, the system automatically adds some environment variables and files to each container in order to simplify making connections from the container to Snowflake over the internal network. The environment variables it automatically populates are:

  • SNOWFLAKE_ACCOUNT — the account locator for this Snowflake account
  • SNOWFLAKE_HOST — the hostname for the internal network to use to make a connection
  • SNOWFLAKE_PORT — the port to use to make a connection
  • SNOWFLAKE_DATABASE — the database that this SERVICE was created in
  • SNOWFLAKE_SCHEMA — the schema that this SERVICE was created in
  • SNOWFLAKE_SERVICE_NAME — the name of the SERVICE

In addition, Snowpark Container Services adds a file at /snowflake/session/token in the container. This file contains an OAuth token that can be used to connect to Snowflake as the ROLE that created the SERVICE. That is, if you created the SERVICE with the ROLE of SPCS_ROLE, then the OAuth token would allow you to connect using the SPCS_ROLE.

In fact, the only authentication method allowed for connecting from the container to Snowflake using the internal network is this OAuth token.

With the environment variables and the OAuth token file, we can connect from the container to Snowflake using the standard Snowflake connectors and drivers for whichever language you desire.

Local Development

One thing you may notice about Jobs and Services is that they make connections to Snowflake and issue SQL statements or Snowpark actions, so we can mimic their behavior locally on our development machine while we work through the implementation of what we want our Job or Service to do.

One way that I’ve found useful to develop Jobs and Services is to actually start by building them on my own development machine outside of Snowflake. I use Docker tools locally to build the container image and run it locally, connecting to Snowflake using the connector or driver that is best for the language I’m developing in. Once the Job or Service is implemented to my liking, I can push my image to Snowflake and see it work as a SERVICE.

(Sidebar: I am actually working on an M2 Mac, so the architecture is different from the architecture we need to build for Snowpark Container Services. So, I actually re-build the Docker image specifying the appropriate platform (i.e., linux/amd64) and push that image.)
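As a sketch, the cross-platform build and push on an Apple-silicon machine might look like the following (the image name and repository URL here are placeholders for your own, and this assumes you have already run docker login against your Snowflake image registry):

```shell
# Build the image for the linux/amd64 platform that Snowpark Container Services runs on
docker build --platform linux/amd64 -t my_job:latest .

# Tag and push to a hypothetical image repository in your Snowflake account
docker tag my_job:latest myorg-myacct.registry.snowflakecomputing.com/mydb/myschema/myrepo/my_job:latest
docker push myorg-myacct.registry.snowflakecomputing.com/mydb/myschema/myrepo/my_job:latest
```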

Making Connections

I will now go through the details of how to make a connection for three popular languages — Python, Node.js, and Java — using their driver/connector. My goal is to isolate the connection code into a reusable module that can be added to your application code.

These example modules support both local development and running inside Snowpark Container Services. Each sample checks for the existence of a file at /snowflake/session/token; if that file exists, the code determines that it is running inside Snowpark Container Services and uses the OAuth token from that location, along with the environment variables, to make a connection.

If the file does not exist, we can assume we are running outside of Snowpark Container Services, and the sample code will look for environment variables setting the account, username, password, database, and schema and connect with those values.

A warehouse is not required to make a connection to Snowflake; however, almost every application will need a warehouse to execute SQL. As a best practice, I also set an environment variable, SNOWFLAKE_WAREHOUSE, and use that when connecting as well.

When running locally, I pass in those environment variables when running my Docker container, either via docker run or docker compose.
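For example, a local docker run invocation with those variables might look like this (all values are placeholders for your own account details):

```shell
docker run --rm \
  -e SNOWFLAKE_ACCOUNT=myaccount \
  -e SNOWFLAKE_USER=myuser \
  -e SNOWFLAKE_PASSWORD=mypassword \
  -e SNOWFLAKE_WAREHOUSE=mywh \
  -e SNOWFLAKE_DATABASE=mydb \
  -e SNOWFLAKE_SCHEMA=myschema \
  my_job:latest
```

With docker compose, the same variables would go in the environment section of the service definition.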

Python

For connecting with Python I am using the Snowflake Python Connector to make a SnowflakeConnection. I also provide a mechanism to create a Snowpark Python Session from that SnowflakeConnection. I put the helper code in a package/directory named spcs_helpers that contains two files.

The main file is connection.py:

import os
import snowflake.connector
from snowflake.snowpark import Session

def connection() -> snowflake.connector.SnowflakeConnection:
    if os.path.isfile("/snowflake/session/token"):
        creds = {
            'host': os.getenv('SNOWFLAKE_HOST'),
            'port': os.getenv('SNOWFLAKE_PORT'),
            'protocol': "https",
            'account': os.getenv('SNOWFLAKE_ACCOUNT'),
            'authenticator': "oauth",
            'token': open('/snowflake/session/token', 'r').read(),
            'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'),
            'database': os.getenv('SNOWFLAKE_DATABASE'),
            'schema': os.getenv('SNOWFLAKE_SCHEMA'),
            'client_session_keep_alive': True
        }
    else:
        creds = {
            'account': os.getenv('SNOWFLAKE_ACCOUNT'),
            'user': os.getenv('SNOWFLAKE_USER'),
            'password': os.getenv('SNOWFLAKE_PASSWORD'),
            'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'),
            'database': os.getenv('SNOWFLAKE_DATABASE'),
            'schema': os.getenv('SNOWFLAKE_SCHEMA'),
            'client_session_keep_alive': True
        }

    connection = snowflake.connector.connect(**creds)
    return connection

def session() -> Session:
    return Session.builder.configs({"connection": connection()}).create()

This defines two functions: connection(), which creates a SnowflakeConnection, and session(), which creates a Snowpark Session.

The __init__.py file defines the package and just exposes the connection() and session() functions:

from .connection import connection, session

In this way, we can make a connection or a session in our Python code as follows:

import spcs_helpers

sql = "<YOUR SQL HERE>"

# Connector connection
conn = spcs_helpers.connection()
data = conn.cursor().execute(sql).fetch_pandas_all()
# process results

# Snowpark Session
session = spcs_helpers.session()
data = session.sql(sql).to_pandas()
# process results

Node.js

For connecting with Node.js I am using the Snowflake Node.js Driver to create a connection pool. I put the helper code in a directory named spcs_helpers that contains one file.

The main file is connection.js:

const snowflake = require('snowflake-sdk')
const fs = require("fs");

function get_options() {
    if (fs.existsSync("/snowflake/session/token")) {
        return {
            accessUrl: "https://" + process.env.SNOWFLAKE_HOST,
            account: process.env.SNOWFLAKE_ACCOUNT,
            authenticator: 'OAUTH',
            token: fs.readFileSync('/snowflake/session/token', 'ascii'),
        }
    }
    else {
        return {
            account: process.env.SNOWFLAKE_ACCOUNT,
            username: process.env.SNOWFLAKE_USER,
            password: process.env.SNOWFLAKE_PASSWORD,
            warehouse: process.env.SNOWFLAKE_WAREHOUSE,
            database: process.env.SNOWFLAKE_DATABASE,
            schema: process.env.SNOWFLAKE_SCHEMA,
            clientSessionKeepAlive: true,
        }
    }
}

var connectionPool = null
function getPool(poolOpts = {}) {
    if (!poolOpts.min)
        poolOpts.min = connectionPool ? connectionPool.min : 1
    if (!poolOpts.max)
        poolOpts.max = connectionPool ? connectionPool.max : 10
    if (!poolOpts.testOnBorrow)
        poolOpts.testOnBorrow = connectionPool ? connectionPool.testOnBorrow : false

    if (connectionPool) {
        if ((connectionPool.min == poolOpts.min) && (connectionPool.max == poolOpts.max)) {
            return connectionPool
        }
    }

    try {
        connectionPool = snowflake.createPool(get_options(), poolOpts)
    }
    catch (error) {
        console.error("Error making connection pool: " + error.message)
    }

    return connectionPool
}

connectionPool = getPool()
module.exports = { connectionPool, getPool };

This exposes a function getPool() that creates a connection pool. It defaults to a maximum pool size of 10, but this and the other defaults can be overridden by passing in a map of pool options. The pool is cached: calling getPool() with the same options returns the already-created pool, while calling it with different options creates a new pool with the specified options and caches that one instead. The cached connectionPool is also exported, so you can access it directly. Lastly, the pool is created with default options when the module loads, so it is usable straight away; alternatively, you can call getPool() with your own pool options to get the pool you desire.

To use this helper, we can use code in our app such as:

const snowflake = require('./spcs_helpers/connection.js')

connectionPool = snowflake.getPool()

const sql = "<YOUR SQL HERE>"
const bindVars = <YOUR BIND VARS>

connectionPool.use(async (connection) => {
    const statement = await connection.execute({
        sqlText: sql,
        binds: bindVars,
        complete: function(err, stmt, rows) {
            if (err) {
                console.error("Failed to execute query: " + err.message)
                // handle error
            }
            else {
                console.log("Successfully queried, nrows=" + rows.length)
                // process results
            }
        }
    })
})

Java

For connecting with Java I am using the Snowflake JDBC Driver to create a java.sql.Connection object. I put the helper code in a directory/submodule named spcs_helpers that contains one file/class.

For example, I put this code in com.example.demo.spcs_helpers.SnowflakeConnection.java:

package com.example.demo.spcs_helpers;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Map;
import java.util.Properties;

public class SnowflakeConnection {
    public static Connection snowflakeConnection() throws Exception {
        Properties props = new Properties();
        return snowflakeConnection(props);
    }

    public static Connection snowflakeConnection(Properties props) throws Exception {
        Map<String,String> env = System.getenv();

        String url = "jdbc:snowflake://" + env.get("SNOWFLAKE_ACCOUNT") + ".snowflakecomputing.com/";
        if (Files.exists(Paths.get("/snowflake/session/token"))) {
            props.put("CLIENT_SESSION_KEEP_ALIVE", true);
            props.put("account", env.get("SNOWFLAKE_ACCOUNT"));
            props.put("authenticator", "OAUTH");
            props.put("token", new String(Files.readAllBytes(Paths.get("/snowflake/session/token"))));
            putIfSet(props, "warehouse", env.get("SNOWFLAKE_WAREHOUSE"));
            props.put("db", env.get("SNOWFLAKE_DATABASE"));
            props.put("schema", env.get("SNOWFLAKE_SCHEMA"));
            url = "jdbc:snowflake://" + env.get("SNOWFLAKE_HOST") + ":" + env.get("SNOWFLAKE_PORT");
        }
        else {
            props.put("CLIENT_SESSION_KEEP_ALIVE", true);
            putIfSet(props, "account", env.get("SNOWFLAKE_ACCOUNT"));
            putIfSet(props, "user", env.get("SNOWFLAKE_USER"));
            putIfSet(props, "password", env.get("SNOWFLAKE_PASSWORD"));
            putIfSet(props, "warehouse", env.get("SNOWFLAKE_WAREHOUSE"));
            putIfSet(props, "db", env.get("SNOWFLAKE_DATABASE"));
            putIfSet(props, "schema", env.get("SNOWFLAKE_SCHEMA"));
        }
        return DriverManager.getConnection(url, props);
    }

    // Properties (a Hashtable) rejects null values, so only set keys that are present.
    private static void putIfSet(Properties props, String key, String value) {
        if (value != null) {
            props.put(key, value);
        }
    }
}

This class has one static method, snowflakeConnection(), which returns a java.sql.Connection. You can optionally pass in a java.util.Properties argument that will be used when making the connection (e.g., to set queryTimeout).

To use this helper, we can use code in our app such as:

package com.example.demo;

import java.sql.SQLException;
import java.sql.ResultSet;
import java.sql.Statement;

import com.example.demo.spcs_helpers.SnowflakeConnection;

public class Main {
    public static void main(String[] args) throws Exception {
        String sql = "<YOUR SQL HERE>";

        var connection = SnowflakeConnection.snowflakeConnection();
        Statement statement = connection.createStatement();
        try {
            ResultSet resultSet = statement.executeQuery(sql);
            if (resultSet.next()) {
                // process results
            }
        }
        catch (SQLException sqle) {
            // process errors
        }
        connection.close();
    }
}

Summary

This blog post presents some example code for connecting to Snowflake from containers in a SERVICE in Snowpark Container Services. The code isolates the connection logic to make it easy to incorporate into your application and simple to share with other developers. The samples also have some best practices baked in that allow for simple local development as well as deployed containers.

Note, however, that there are many ways to structure your code, and this blog post simply presents one way to do this. But, hopefully, this will give you a jump-start to developing new and exciting applications in Snowpark Container Services!

Brian Hess
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

I’ve been in the data and analytics space for over 25 years in a variety of roles. Essentially, I like doing stuff with data and making data work.