So You Want to… Speed Up Your Python in Snowflake

Photo by Julian Hochgesang on Unsplash

After all, who doesn’t want their code to execute faster? Snowflake and Snowpark give you plenty of tools to make this happen, but they aren’t a silver bullet: you need to know how to get the most out of the toolset. In this article I explore four different ways to speed up your code in Snowflake (well, three really).

  1. Be Pythonic: deal with the low hanging fruit by optimising your code in general
  2. Consider JIT Compiling: speed up bottlenecks in code by using a Just In Time (JIT) compiler like Numba
  3. Use The Right Snowpark Features: make sure you’re using Snowpark as efficiently as possible by using Snowpark Dataframes and UDFs
  4. Parallelise: use asynchronous calls to distribute compute over your available Snowflake resources

1. Be Pythonic

This should always be top of mind when using Python. I’m not going to dive into all the details, as this is more than well covered by the Pythonistas, but in summary:

  1. Avoid for loops whenever you can
  2. Use the built-in functions of NumPy, Pandas or whatever package you’re using; they tend to be efficiently written and often push execution down to C
  3. Use list/dictionary comprehensions to manipulate data, for the same reason as point 2

Single-node code that is slow outside of Snowflake isn’t going to be much quicker inside Snowflake, so make sure your code is appropriately efficient.
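
As a quick illustration of the three points above, here’s a minimal sketch with made-up data:

import numpy as np

data = np.random.rand(1_000_000)

# 1 & 2: an explicit Python loop vs pushing the work down to NumPy's compiled internals
total = 0.0
for x in data:          # slow: interpreted, one element at a time
    total += x
total = data.sum()      # fast: a single call into C

# 3: a list comprehension beats append-in-a-loop for building Python lists
squares = [x * x for x in range(10)]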

2. Consider (JIT) Compiling

Python is great, and you won’t hear me say otherwise. I first learned to code in BASIC, then Fortran (77, for what it’s worth), and while the latter was and is an incredible close-to-the-metal number cruncher, its usability leaves something to be desired. Conversely, Python’s strength is how easy it is to use, but it’s not always as performant. One “weakness” is the slow execution of for loops and similar constructs; avoid them and you should be fine. Unfortunately, it’s not always that simple.

The Problem

I recently had a financial services client who was porting some Python code to Snowflake and wasn’t getting the performance they desired. On inspection, my first thought was to be “More Pythonic”, having spotted a number of for loops working over tables of data. A good instinct, but misplaced, as these weren’t any old for loops. No, they were recursive calculations, using the last calculated value to inform the next one. That’s not Python’s strong suit as a rule, and it’s hard to avoid when the recursion itself is bespoke.

So how did we fix it?

While Python struggles with recursive problems, all is not lost. A number of packages (pardon the future pun) are available to speed up Python execution, including Numba, Cython, PyPy and many more. Numba is supported in Snowflake’s Anaconda environment, so it was the natural choice for generating some performance gains.
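
Before wiring it into Snowflake, it helps to see how little Numba asks of you. The sketch below is not the client’s calculation, just an illustrative recursive loop: decorate the hot function with @numba.njit and it gets compiled to machine code the first time it’s called.

import numba
import numpy as np

@numba.njit
def recursive_smooth(values):
    # Each output depends on the previous output, so this can't simply be vectorised away
    out = np.empty_like(values)
    out[0] = values[0]
    for i in range(1, len(values)):
        out[i] = 0.5 * out[i - 1] + 0.5 * values[i]
    return out

values = np.random.rand(1_000_000)
result = recursive_smooth(values)  # first call compiles, subsequent calls run at compiled speed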

Let’s take a look:

First, Import Your Libraries

import json
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc, col
import numpy as np
import pandas as pd

Next, Connect to Snowflake

# Reading Snowflake Connection Details
snowflake_connection_cfg = json.loads(open("creds.json").read())
# Creating Snowpark Session
session = Session.builder.configs(snowflake_connection_cfg).create()
# Create a fresh database, warehouse and stage
session.sql("CREATE OR REPLACE DATABASE DB").collect()
session.sql("CREATE OR REPLACE WAREHOUSE WH WITH WAREHOUSE_SIZE='X-SMALL'").collect()
session.sql("CREATE OR REPLACE STAGE MODELS").collect()

Next, Create and Load Some Dummy Data

# Create a two-column dummy curve and load it into Snowflake
curve = np.random.rand(1000000, 2)
curve_df = pd.DataFrame(curve, columns=['1', '2'])
session.write_pandas(curve_df, table_name='DATA', auto_create_table=True, overwrite=True)

Next, Define Your Comparison Case

@sproc(session=session, name='run_calc_standard', stage_location='@MODELS',
       packages=['snowflake-snowpark-python', 'numpy', 'pandas'],
       is_permanent=True,
       replace=True)
def run_calc_standard(session: Session, input_table: str, output_table: str) -> str:

    def forward_to_zero(z1, f2, t1, t2):
        zero = ((1+z1)**t1*(1+f2)**(t2-t1))**(1/t2)-1
        return zero

    def forward_to_zero_curve(curve):
        zero = curve.copy()
        for i in range(0, len(zero)):
            if i == 0:
                zero.iat[0, 1] = curve.iat[0, 1]
            else:
                zero.iat[i, 1] = forward_to_zero(zero.iat[i-1, 1]/100,
                                                 curve.iat[i, 1]/100,
                                                 curve.iat[i-1, 0],
                                                 curve.iat[i, 0])*100
        return zero

    curve = session.table(input_table).to_pandas()
    result = forward_to_zero_curve(curve)
    session.write_pandas(result, table_name=output_table,
                         auto_create_table=True, overwrite=True)
    return 'Curve Calculated, Without Numba'

And Execute Against Our Dummy Data

%%time
run_calc_standard("DATA", 'OUTPUT_STANDARD')

Next, Rewrite Your Function(s) With Numba

@sproc(session=session, name='run_calc_numba', stage_location='@MODELS',
       packages=['snowflake-snowpark-python', 'numba', 'numpy', 'pandas'],
       is_permanent=True,
       replace=True)
def run_calc_numba(session: Session, input_table: str, output_table: str) -> str:
    import numba

    @numba.njit
    def forward_to_zero_curve_numba(curve):
        zero = curve.copy()
        for i in range(0, len(zero)):
            if i == 0:
                zero[0, 1] = curve[0, 1]
            else:
                zero[i, 1] = (((1+zero[i-1, 1]/100)**curve[i-1, 0]
                               *(1+curve[i, 1]/100)**(curve[i, 0]-curve[i-1, 0]))
                              **(1/curve[i, 0])-1)*100
        return zero

    curve = session.table(input_table).to_pandas()
    curve = np.array(curve)
    result = pd.DataFrame(forward_to_zero_curve_numba(curve), columns=['1', '2'])
    session.write_pandas(result, table_name=output_table,
                         auto_create_table=True, overwrite=True)
    return 'Curve Calculated, With Numba'

Finally, Execute Your Comparison

%%time
run_calc_numba("DATA", 'OUTPUT_NUMBA')

The Result

To be as pessimistic as possible I’m calculating the time from making the call to receiving the results, the end user wall time.

Without Numba = 102s

With Numba = 8s

That’s more than a 10x speed improvement, and in practice the gain is even bigger: the “penalty” of handing a single job off to Snowflake is a one-time cost, so handing over lots of work in one shot (or even parallelising the work, see below) is a much more efficient approach.

For context, on local hardware this decreased execution time from c. 40s to c. 100ms, a quite staggering improvement.

3. Use The Right Snowpark Features

I often think of Snowflake as the database equivalent of Python: easy to use and learn, configuration is a cinch, and optimisation happens without you asking for it. But all of this only happens when you use it right. A great example is developing a model pipeline in a way that makes it easy to tap into Snowflake’s vectorised UDFs.

The Problem

It can be tempting to create a machine learning pipeline that captures each step of data transformation in a new table. Sometimes this is the right call, especially if you intend to reuse partially transformed data (think feature stores). Often, though, it’s an unnecessary overhead that can prevent or complicate tapping into some of Snowpark’s capabilities.

The example below shows how I avoided slower UDTFs and more-complex-to-write Snowpark manipulations by packaging up the entire pipeline with scikit-learn.

First, Import Your Libraries

import json
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc, col
import snowflake.snowpark.types as T
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from cachetools import cached

Next, Connect to Snowflake

# Reading Snowflake Connection Details
snowflake_connection_cfg = json.loads(open("creds.json").read())
# Creating Snowpark Session
session = Session.builder.configs(snowflake_connection_cfg).create()
# Create a fresh database, warehouse and stage
session.sql("CREATE OR REPLACE DATABASE DB").collect()
session.sql("CREATE OR REPLACE WAREHOUSE WH WITH WAREHOUSE_SIZE='X-SMALL'").collect()
session.sql("CREATE OR REPLACE STAGE MODELS").collect()

Next, Create Some Dummy Data (with some holes in it to impute)

# Generate our dataset
n_features = 10
n_samples = 10000
columns = [str(i) for i in range(0, n_features)]
X, y = make_classification(n_samples=n_samples,
                           n_features=n_features,
                           n_classes=2)
df = pd.DataFrame(X, columns=columns)

# Poke holes in our dataset for later imputation
A = np.array(df)
c = 100  # number of holes
mask = np.zeros(n_samples*n_features, dtype=bool)
mask[:c] = True
np.random.shuffle(mask)
mask = mask.reshape(n_samples, n_features)
A[mask] = np.nan
df = pd.DataFrame(A, columns=columns)
df['Y'] = y

session.write_pandas(df, table_name='ML_DATASET_RAW',
                     auto_create_table=True, overwrite=True)

Next, Build a Pipeline Model

Note that we are doing imputation, scaling, PCA and machine learning all in one function; this will help us later on.

# Let's Create A SProc, note could also use Deploy button top right
@sproc(session=session, name='pipe_model_train_sproc', stage_location='@MODELS',
packages=['snowflake-snowpark-python', 'scikit-learn', 'numpy'],
is_permanent=True,
replace=True)
def train_pipe_model(session: Session, train_table: str, model_name: str) -> str:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from joblib import dump

# Get the Data
data = session.table(train_table).to_pandas()
y_train = data['Y']
X_train = data.drop('Y', axis=1)

# Build a pipeline
pipe = Pipeline([('imputer', SimpleImputer(missing_values=np.nan, strategy='mean')),
('scaler', StandardScaler()),
('PCA', PCA(n_components=8, svd_solver='arpack')),
('LR', LogisticRegression())])
pipe.fit(X_train, y_train)
dump(pipe, '/tmp/'+model_name)
session.file.put('/tmp/'+model_name, '@MODELS', auto_compress=False, overwrite=True)

# Let's Run Our Sproc (and save the model result)
train_pipe_model('ML_DATASET_RAW', 'PIPE_MODEL_SPROC.sav')
print('ML Model trained and saved')

Finally, Register the Pipeline as a UDF and Execute

# Register the pipeline as a vectorised UDF
@cached(cache={})
def load_model(model_path: str) -> object:
    from joblib import load
    model = load(model_path)
    return model

def udf_pipe_model(df: pd.DataFrame) -> pd.Series:
    import sys
    # The imported model file lands in the UDF's import directory on the server
    model = load_model(sys._xoptions["snowflake_import_directory"] + 'PIPE_MODEL.sav')
    scored_data = pd.Series(model.predict(df))
    return scored_data

feature_cols = [str(i) for i in range(0, 10)]
udf_pipe_model = session.udf.register(func=udf_pipe_model,
                                      name="udf_score_pipe_model",
                                      stage_location='@MODELS',
                                      input_types=[T.FloatType()]*len(feature_cols),
                                      return_type=T.FloatType(),
                                      replace=True, is_permanent=True,
                                      max_batch_size=1000,
                                      imports=['@MODELS/PIPE_MODEL.sav'],
                                      packages=['scikit-learn==1.1.1', 'pandas', 'joblib', 'cachetools'])

# Execute the pipeline against the raw table
dummy_test_sdf = session.table('ML_DATASET_RAW')
test_res = dummy_test_sdf.with_column('Y_PRED', udf_pipe_model(*feature_cols))
print(test_res.limit(15).to_pandas())
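
Because the UDF is registered as a permanent object, it’s also callable straight from SQL, so any SQL user or BI tool can score with the full pipeline. A minimal sketch (hypothetical, not part of the original walkthrough; the feature columns are double-quoted because write_pandas created them as case-sensitive identifiers):

# Score the raw table directly in SQL using the registered UDF
session.sql("""
    SELECT *, udf_score_pipe_model("0","1","2","3","4","5","6","7","8","9") AS Y_PRED
    FROM ML_DATASET_RAW
    LIMIT 15
""").show()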

The Result

No UDTFs, so no (relatively) tricky syntax and no (potentially) slow looping through rows to generate new tabular data. In its place we get a clean UDF which captures our complete ML pipeline in one place. And because it’s a UDF, you can tap into massive parallelisation, batch processing millions if not billions of records in seconds simply by increasing the size of your virtual warehouse. Remember, each step up in virtual warehouse size doubles the available compute (X-Small is 1x, Small is 2x, Medium is 4x and so on), and you only pay for what you consume, so it really is as simple as altering the following SQL

CREATE OR REPLACE WAREHOUSE WH WITH
WAREHOUSE_SIZE = 'X-SMALL' -- all the way up to 6X-Large which is 512x bigger

or in Python

session.sql("CREATE OR REPLACE WAREHOUSE WH WITH WAREHOUSE_SIZE = 'X-SMALL'").collect()

4. Finally, Parallelise

This is a bit of a trite observation in these days of cloud computing, but parallelising your workload is a surefire way to “improve” the performance of your code. Of course, the code isn’t actually running any faster; you’re just using more compute resources in the same span of time. Loosely, we can break parallelisation into two buckets: easy (embarrassingly parallel) problems and hard parallelisation. The hard stuff I won’t tackle here (I’m just not that clever), but let’s consider the simple scenario.

The Problem

I recently worked with a client who was seeking to speed up some traditional statistical model search/build code by porting it to Python and Snowflake. Their goal was essentially a tree search over feature permutations, with the model progressively growing from compact to augmented. At each level the starting point is the best model from the prior level. For example,

5 Features = A,B,C,D,E

  1. Level 1: 5 models analysed, starting features = None; best feature = A
  2. Level 2: 4 models analysed, starting features = A; best feature = D
  3. Level 3: 3 models analysed, starting features = A, D; best feature = C
  4. Level 4: 2 models analysed, starting features = A, D, C; best feature = B
  5. Level 5: 1 model analysed, starting features = A, D, C, B; best feature = E

Assuming we do the full tree search in this fashion, we have to train (𝑛²+𝑛)/2 models. If we can’t parallelise, that’s a punishing workload.

So how did we fix it?

Snowflake UDFs are a doddle to parallelise, as they are designed from the ground up to do so. But what about Sprocs? They are bound to a single node, after all…

Enter asynchronous calls. By taking advantage of how easy it is to spin up compute resources within Snowflake, we can parallelise the tree search with only a few extra lines of code. Then, as we traverse the tree and have fewer branches at each level, we can progressively let our virtual warehouses wind down.
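
The core of the trick is the connector’s asynchronous API: submit a query, keep its query ID, and come back for the results later. A minimal sketch (assuming the creds dict from earlier and the ASYNC_WH warehouse created below; the full example further down reads the query ID from execute_async’s return value rather than cur.sfqid):

import snowflake.connector

conn = snowflake.connector.connect(user=snowflake_connection_cfg['user'],
                                   password=snowflake_connection_cfg['password'],
                                   account=snowflake_connection_cfg['account'],
                                   warehouse="ASYNC_WH")
cur = conn.cursor()

# Submit a few queries without waiting for any of them to finish
query_ids = []
for _ in range(3):
    cur.execute_async("SELECT COUNT(*) FROM TABLE(GENERATOR(TIMELIMIT => 5))")
    query_ids.append(cur.sfqid)

# The queries run concurrently on the warehouse; collect the results afterwards
for qid in query_ids:
    cur.get_results_from_sfqid(qid)
    print(qid, cur.fetchall())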

First Up, Import Your Libraries

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.datasets import make_classification
import pandas as pd
import numpy as np
import json
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.functions import sproc

Next, Create Some Dummy Data

num_cols = 10
columns = [str(i) for i in range(0,num_cols)]
X,y = make_classification(n_samples=100000, n_features=num_cols, n_classes=2)
df = pd.DataFrame(X, columns=columns)
df['y']=y
df.head(5)

Next, Connect to Snowflake and Load the Data

# Reading Snowflake Connection Details
snowflake_connection_cfg = json.loads(open("creds.json").read())

# Creating Snowpark Session
session = Session.builder.configs(snowflake_connection_cfg).create()

# Create a fresh database, warehouse and stage
session.sql("CREATE OR REPLACE DATABASE ASYNC_DB").collect()
session.sql("CREATE OR REPLACE WAREHOUSE ASYNC_WH WITH WAREHOUSE_SIZE='LARGE'").collect()
session.sql("CREATE OR REPLACE STAGE FUNCTIONS").collect()

session.write_pandas(df, table_name='ML_DATASET',
                     auto_create_table=True, overwrite=True)

Next, Create a Stored Procedure for Training

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
import numpy as np

@sproc(name='get_scores_sf',
       packages=['snowflake-snowpark-python', 'scikit-learn', 'numpy'],
       is_permanent=True,
       replace=True,
       stage_location='@FUNCTIONS',
       session=session)
def get_scores_sf(session: Session,
                  columns_tmp: str) -> float:
    columns_tmp = columns_tmp.split(',')
    data = session.table('ML_DATASET').to_pandas()
    y = data['y']
    X = data[columns_tmp]
    clf = LogisticRegression()
    scores = cross_val_score(clf, X, y, cv=4, scoring='f1_macro')  # insert early stopping/timeout criteria here
    return np.mean(scores)
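
Before launching the full search, it’s worth a quick smoke test: a single synchronous call (a hypothetical check, not part of the original flow) that returns the cross-validated F1 score for a couple of columns.

# Train and score one candidate model synchronously to confirm the Sproc works
print(get_scores_sf("0,1"))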

Finally, Run Your Tree Search

import snowflake.connector

conn = snowflake.connector.connect(user=snowflake_connection_cfg['user'],
                                   password=snowflake_connection_cfg['password'],
                                   account=snowflake_connection_cfg['account'],
                                   role=snowflake_connection_cfg['role'],
                                   database="ASYNC_DB",
                                   warehouse="ASYNC_WH")
cur = conn.cursor()

best_cols = []
col_count = len(columns)
remaining_columns = columns.copy()
for i in range(0, col_count):
    MLE_list = []
    MLE_col = []
    MLE_dict = {}
    for current_column in remaining_columns:
        current_warehouse = "ASYNC_" + str(current_column)
        if i == 0:
            # Note: I created one warehouse per feature, but you could combine multiple
            # features in one warehouse provided sufficient memory is available
            session.sql("CREATE OR REPLACE WAREHOUSE " + current_warehouse +
                        " WITH WAREHOUSE_SIZE='X-SMALL'").collect()
        # Switch the connector session to this feature's warehouse so the async CALL below
        # runs there (these warehouses gradually wind down as the tree narrows; you could
        # also suspend them explicitly)
        cur.execute("USE WAREHOUSE " + current_warehouse)
        print("warehouse:", current_warehouse,
              "; current column:", current_column,
              "; best columns (so far):", best_cols)
        columns_tmp = best_cols + [current_column]
        columns_tmp = "'" + ','.join(columns_tmp) + "'"
        queryId = cur.execute_async("CALL get_scores_sf(" + columns_tmp + ")")['queryId']  # this bit makes the async call
        MLE_list += [queryId]
        MLE_col += [current_column]
    for queryID, current_column in zip(MLE_list, MLE_col):  # a little suboptimal, but deemed tolerable
        cur.get_results_from_sfqid(queryID)  # retrieves the results of an asynchronous query
        result = cur.fetchall()  # returns a list of row tuples
        MLE_dict[current_column] = result[0][0]  # keep just the score
    best_col = max(MLE_dict, key=MLE_dict.get)
    best_cols.append(best_col)
    remaining_columns.remove(best_col)
    # introduce some stopping condition here
print("The optimal set of columns, in rank order, are:", best_cols)

The Result

For my client, 100+ features was not uncommon, meaning the entire search would be 5050 models long. With this parallelisation the search length drops to roughly 100, since each level’s models are trained concurrently. We haven’t saved on compute, but we’ve saved roughly 50x on time.
