Start Leveraging Snowflake ML Features to Speed Up Your ML Projects!

I really like the graph that shows how the Snowflake Data Cloud unites Data Scientists, Data Engineers and Business Intelligence users in a single platform.

These teams have usually been working in silos. Data Scientists in particular are used to working in their own sandboxes and notebooks, sometimes lack data engineering experience, and are not always aware of all the benefits of data platforms like Snowflake. Working with many of them, I have found they are used to managing projects in a certain way, and the goal of this blog is to provide a couple of examples of how things can be improved. My goal is to trigger some ideas to explore the benefits of pushing down as much work as possible to the Snowflake Data Platform.

Recently I have been working on updating a traditional project to take advantage of the new Snowpark ML features and the elasticity and agility of Snowflake, so I will share a couple of observations here.

1st: Forget about CSV files and manage training and testing data within Snowflake

First, this project had a data collection and preparation phase, where it had to create a table with information about the images. As Data Scientists feel very comfortable with CSV files, one was created for training and another one for testing. This is a very common pattern, and here are some lines of code:

# Create dataframe of all preprocessed images and labels
import os

import numpy as np
import pandas as pd

image_files = os.listdir('../../data_preprocessed')
image_labels = [x.replace('t_shirt', 'tshirt').split('_')[0] for x in image_files]

data = pd.DataFrame({
    'img_name': image_files,
    'label': image_labels
})

# randomly split into train and test with a 90:10 ratio
msk = np.random.rand(len(data)) < 0.9
train = data[msk]
test = data[~msk]

# save as CSVs
train.to_csv('sim_train.csv', index=False)
test.to_csv('sim_test.csv', index=False)

Later, that CSV was loaded again in the code, moving the data back into memory:

classes = pd.read_csv(os.path.join(anno_path, 'sim_train.csv')).label.unique()
classes.sort()

It was also used to create the image datasets that were going to be used for training and testing:

image_datasets = {
    'train': CustomImageDataset(
        annotations_file = os.path.join(anno_path, 'sim_train.csv'),
        img_dir = img_path,
        transform = data_transforms['train']
    ),
    'val': CustomImageDataset(
        annotations_file = os.path.join(anno_path, 'sim_test.csv'),
        img_dir = img_path,
        transform = data_transforms['val']
    ),
}

Who controls that CSV file? How many copies do you have? Can I share that CSV file with other users in a secure way? Can I take a copy of that file in an easy and efficient way before running a training or inference job? All these things are much easier when all data is within Snowflake, either in staging areas or, like in this case, just using Snowflake tables and Snowpark DataFrames. There is a single copy managed by Role-Based Access Control (RBAC), and I can create an immediate Zero-Copy Clone before any training run to keep a copy of my data without using any additional storage.
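
For example, both the clone and the sharing grant are single SQL statements that can be issued from the Snowpark session. A minimal sketch, assuming the IMAGES_ENCODED table created below and an illustrative DATA_SCIENTIST_ROLE:

# Keep a zero-copy snapshot of the training data before this run;
# the clone shares storage with the original table until either side changes
session.sql("CREATE OR REPLACE TABLE IMAGES_ENCODED_PRE_TRAIN CLONE IMAGES_ENCODED").collect()

# Share the single governed copy with another team through RBAC
# instead of passing CSV files around
session.sql("GRANT SELECT ON TABLE IMAGES_ENCODED TO ROLE DATA_SCIENTIST_ROLE").collect()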

With the Snowflake approach, I can create my dataframe:

import base64

import pandas as pd
import tqdm

labels = []
names = []
data = []
# images is the list of image file paths gathered during the data collection step
for img in tqdm.tqdm(images):
    names.append(img.split('/')[-1])
    label_cleaned = img.replace('t_shirt', 'tshirt')  # We perform this cleaning here, but it should have been done before as part of data engineering
    labels.append(label_cleaned.split('/')[-1].split('_')[0])

    with open(img, "rb") as image_file:
        data.append(base64.b64encode(image_file.read()).decode("utf-8"))

df_base64 = pd.DataFrame(
    {
        'LABEL': labels,
        'NAME': names,
        'DATA': data
    }
)

And store it directly into a Snowflake table that can be used later:

sdf = session.create_dataframe(df_base64)
sdf.write.save_as_table(table_name="IMAGES_ENCODED", mode='overwrite')

Now I can ask Snowflake to provide a train and a test DataFrame, and only call to_pandas() at the precise moment the data is needed in that format.

train_df, val_df = session.table("IMAGES_ENCODED").random_split(weights=[0.8, 0.2], seed=0)

image_datasets = {
    'train': CustomImageDataset(
        df_annotations = train_df.to_pandas(),
        img_dir = img_path,
        transform = data_transforms['train']
    ),
    'val': CustomImageDataset(
        df_annotations = val_df.to_pandas(),
        img_dir = img_path,
        transform = data_transforms['val']
    ),
}

Simple steps, more secure and faster.

2nd: Bring pre-processing into Snowflake

My second example is about pre-processing that data for feature engineering and feature creation. The use case is to provide highly personalized shopping recommendations. The features include things like gender, age, profession, work experience, family size and the clicks performed on the website, but also information about the clothes the user clicks on. Here we are using a transformer to produce embeddings for each image. Those embeddings are used to find similar clothes, and that information is fed into the ML algorithm to provide recommendations. There is one pre-processing step because each embedding is a series of 512 numbers: we can create one feature for each value, so we have to extract the vector and split it, creating new columns. Once that is done, we have to pre-process the rest of the values with a one-hot encoder, a min-max scaler and an ordinal encoder.

This is a way to run all that using traditional numpy and pandas:

import numpy as np
import pandas as pd
import psutil


def data_prep(df, feature_cols, target_col, training_frac, logger):
    """
    Prep data for training: extract embeddings, transform categorical features
    """
    # Select feature columns
    X = df[feature_cols]
    # Extract embeddings and convert into dataframe
    logger.info(
        f"\tExtracting Embeddings. RAM memory % used: {psutil.virtual_memory()[2]}"
    )
    embedding_array = np.stack(X.EMBEDDINGS.apply(lambda x: np.array(eval(x))))
    logger.info(
        f"\tConverting Embeddings to pd.DataFrame. RAM memory % used: {psutil.virtual_memory()[2]}"
    )
    embeddings_only = pd.DataFrame(embedding_array, columns=['EMBEDDING_' + str(i) for i in range(512)])

    logger.info(
        f"\tConcating Embeddings. RAM memory % used: {psutil.virtual_memory()[2]}"
    )
    # Combine embeddings and non-embedding features into a single dataframe
    X = pd.concat([X.drop('EMBEDDINGS', axis=1), embeddings_only], axis=1)
    logger.info(
        f"\tRAM memory % used: {psutil.virtual_memory()[2]}"
    )
    # Convert gender from string to binary feature
    X.GENDER = X.GENDER.apply(lambda x: x == 'Female').astype(int)
    # 1-hot encode profession
    profession_columns = [
        'PROFESSION_ARTIST', 'PROFESSION_DOCTOR', 'PROFESSION_ENGINEER',
        'PROFESSION_ENTERTAINMENT', 'PROFESSION_EXECUTIVE', 'PROFESSION_HEALTHCARE',
        'PROFESSION_HOMEMAKER', 'PROFESSION_LAWYER', 'PROFESSION_MARKETING'
    ]
    for c in profession_columns:
        X[c] = (X.PROFESSION == c[11:]).astype(int)
    X = X.drop('PROFESSION', axis=1)
    y = df[target_col]
    # Create training subsample
    train_sample = X.sample(frac=training_frac).index
    return X, y, train_sample
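
For reference, a call to this function would look roughly like the following; the feature and target column names are illustrative, not the exact ones used in the project:

import logging

logger = logging.getLogger("job-tutorial")

# Illustrative column lists; the real project defines its own.
# df is assumed to be the pandas DataFrame holding the raw features, including the EMBEDDINGS column
feature_cols = ['GENDER', 'AGE', 'PROFESSION', 'WORK_EXPERIENCE', 'FAMILY_SIZE', 'EMBEDDINGS']
target_col = 'SCORE'

X, y, train_sample = data_prep(df, feature_cols, target_col, training_frac=0.8, logger=logger)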

These are the steps taken, memory consumption and total time spent in the preparation process:

job-tutorial - INFO -  Extracting Embeddings. RAM memory % used: 23.2
job-tutorial - INFO - Converting Embeddings to pd.DataFrame. RAM memory % used: 42.2
job-tutorial - INFO - Concating Embeddings. RAM memory % used: 42.2
job-tutorial - INFO - RAM memory % used: 51.5

Wall time: 4min 14s

Now let’s use all the power of Snowflake to run the same operations. First, we can define a Stored Procedure to do this work. This is a JavaScript one, based on this procedure, that splits the multi-value column:

create or replace procedure split_string (TAB_NAME varchar, COL_NAME varchar, VIEW_NAME varchar)
returns varchar(1000)
language javascript
as
$$
var cnt_query = "select max(regexp_count(" + COL_NAME + ",','))+1 as MAX_COUNT from " + TAB_NAME + ";"
var cnt_stmt = snowflake.createStatement(
    {
        sqlText: `${cnt_query}`
    }
)

res = cnt_stmt.execute()

if (!res.next()) {
    return 'Not able to execute'
}

var count_value = res.getColumnValue('MAX_COUNT')
var query_string = "create or replace view " + VIEW_NAME + " as SELECT *, "

for (var i = 1; i < count_value; i++) {
    query_string = query_string + " replace(replace(split_part(" + COL_NAME + ",','," + i + "), '[', ''), ']', '')::FLOAT as " + COL_NAME + "_" + i + ","
}

query_string = query_string + " replace(replace(split_part(" + COL_NAME + ",','," + count_value + "), '[', ''), ']', '')::FLOAT as " + COL_NAME + "_" + count_value + " from " + TAB_NAME + ";"

var cvs_stmt = snowflake.createStatement(
    {
        sqlText: `${query_string}`
    }
)

res = cvs_stmt.execute()

return "The view " + VIEW_NAME + " is created."
$$
;

Now calling that procedure will create a new view with a column for each vector value in just 24 seconds (using the smallest warehouse size):

session.call('SPLIT_STRING', 'CLICKSTREAM_TRAIN_FULL', 'EMBEDDINGS', 'EMBEDDINGS_V')

Wall time: 24 s

Using Snowpark DataFrames, we can drop the columns that are not needed and split between training and testing:

table_name = "EMBEDDINGS_V"

df_embeddings = session.table(table_name)

df_embeddings = df_embeddings.drop('EMBEDDINGS', '"Clickstream"', 'NAME', 'SCORE', 'CUSTOMERID')

train_df, test_df = df_embeddings.random_split(weights=[0.8, 0.2], seed = 0)

Wall time: 19 s

Using Snowflake ML, the pre-processing can be done within Snowflake without having to bring any data out:

from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import MinMaxScaler, OneHotEncoder, OrdinalEncoder

num_cols = ['AGE', 'WORK_EXPERIENCE', 'ANNUAL_INCOME', 'SPENDING_SCORE', 'FAMILY_SIZE']

pipe = Pipeline(steps=[
    ("mme", MinMaxScaler(input_cols=num_cols, output_cols=num_cols)),
    ("oe", OrdinalEncoder(input_cols='GENDER', output_cols='GENDER')),
    ("ohe", OneHotEncoder(input_cols=["PROFESSION"], output_cols=["PROFESSION"], drop_input_cols=True))
])

train_df2 = pipe.fit(train_df).transform(train_df)

Wall time: 26.3 s

So we moved from 4 minutes and 14 seconds to just 1 minute and 9 seconds (24 s + 19 s + 26.3 s ≈ 69 s).

These are just two examples, but my advice is to use Snowflake as much as possible for all the ML steps you need in your projects: transformations and feature creation that can be done using Snowpark DataFrames, pre-processing using Snowflake ML modeling, training models within Snowflake, and registering and serving models with the Snowpark ML Model Registry.
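
To close the loop, here is a minimal sketch of those last two steps under a few assumptions: it trains an XGBoost classifier with snowflake.ml.modeling on the pre-processed DataFrame and registers it with snowflake.ml.registry. The TARGET and PREDICTION column names and the SHOPPING_RECOMMENDER model name are placeholders, not the actual ones from this project:

from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.registry import Registry

# Train inside Snowflake on the pre-processed Snowpark DataFrame.
# 'TARGET' and 'PREDICTION' are placeholder column names for this sketch
feature_cols = [c for c in train_df2.columns if c != 'TARGET']
clf = XGBClassifier(
    input_cols=feature_cols,
    label_cols=['TARGET'],
    output_cols=['PREDICTION']
)
clf.fit(train_df2)

# Register the fitted model so it can be versioned and served from Snowflake
reg = Registry(session=session)
mv = reg.log_model(clf, model_name="SHOPPING_RECOMMENDER", version_name="V1")

# Batch inference on the (equally pre-processed) test data
test_df2 = pipe.transform(test_df)
predictions = mv.run(test_df2, function_name="predict")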

Enjoy!

Carlos Carrero.-
