An example of migrating Pandas-based code to Snowpark-based code


Some time ago I wrote a post on how to do feature engineering with Snowpark for Scala using generated credit card transactions, based on the Machine Learning for Credit Card Fraud Detection — Practical Handbook. The data generation at that time was done using Python and Pandas DataFrames, but there were some challenges in generating large volumes (hundreds of millions of records).

Since then, Snowflake has released Snowpark for Python and I recently needed to generate around 400 million transactions for some testing. So it was a good time to migrate this code to use Snowpark for Python instead.

The purpose of this post is to show an example of how you can migrate Pandas-based code to Snowpark-based code.

Some differences between a Snowpark DataFrame and a Pandas DataFrame

The current DataFrame API of Snowpark for Python follows the PySpark DataFrame API in how it works and how to use it. This is usually not a challenge if you are used to PySpark, but if you only have experience with Pandas it might not be so easy.

A Pandas DataFrame is a relational object where the data is represented as rows and columns, think of it as an Excel spreadsheet. All data in a Pandas DataFrame is stored in client memory, and all operations and transformations on the DataFrame object are executed immediately (eagerly).

A Snowpark DataFrame is also a relational object, but the data is stored in Snowflake. The Snowpark DataFrame object contains only the logic, SQL, needed to retrieve data according to the operations and transformations applied to the DataFrame object. It is also evaluated lazily, meaning that it only executes the logic, the SQL, when a specific action is triggered.
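To make the difference concrete, here is a minimal sketch (assuming a connection_parameters dict with your account details and an existing table called CUSTOMER_TRANSACTIONS) showing that a Snowpark DataFrame only runs SQL when an action is called:

from snowflake.snowpark import Session, functions as F

# connection_parameters is assumed to be a dict with account, user, password, etc.
snf_session = Session.builder.configs(connection_parameters).create()

# No SQL is executed yet; the DataFrame only holds the logic
df = snf_session.table("CUSTOMER_TRANSACTIONS").filter(F.col("TX_AMOUNT") > 100)

# An action, such as show() or collect(), triggers the SQL execution in Snowflake
df.show()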

Migrating the Data Generation code

The code I am migrating is part of the Machine Learning for Credit Card Fraud Detection — Practical Handbook. It generates four datasets: customer profiles, terminal profiles, transactions, and transactions with fraud scenarios.

Customer profile dataset

The customer profile dataset is generated, in the original Python code, using a loop that creates a list of lists of values, which is then converted into a Pandas DataFrame.

The loop generates five different types of values: x_customer_id and y_customer_id, which are random values between 0 and 100 and represent the X and Y coordinates of a customer within a 100x100 grid; mean_amount, which is a value between 5 and 100 and is the average amount of a customer’s transactions; std_amount, which is mean_amount divided by two and represents the standard deviation of the amount; and mean_nb_tx_per_day, which represents the average number of transactions per day for a customer. All random values are generated using the NumPy random.uniform function.

import numpy as np
import pandas as pd

n_customers = 10000
np.random.seed(0)

customer_id_properties = []

# Generate customer properties from random distributions
for customer_id in range(n_customers):

    x_customer_id = np.random.uniform(0, 100)
    y_customer_id = np.random.uniform(0, 100)

    mean_amount = np.random.uniform(5, 100)
    std_amount = mean_amount / 2

    mean_nb_tx_per_day = np.random.uniform(0, 4)

    customer_id_properties.append([customer_id,
                                   x_customer_id, y_customer_id,
                                   mean_amount, std_amount,
                                   mean_nb_tx_per_day])

customer_profiles_table = pd.DataFrame(customer_id_properties,
                                       columns=['CUSTOMER_ID', 'x_customer_id',
                                                'y_customer_id', 'mean_amount',
                                                'std_amount', 'mean_nb_tx_per_day'])

To implement this with the Snowpark DataFrame API I have several options. I could keep the loop and create a Python list that is stored in the memory of my Python client and then create a Snowpark DataFrame from it. However, this will have some volume limitations and would require the data to be transferred from my client to Snowflake.
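For reference, a sketch of that client-side alternative could look like this, reusing the customer_id_properties list from the Pandas code above and pushing it to Snowflake with create_dataframe:

# Client-side alternative (not used here): all rows pass through client memory
# and over the network before they exist in Snowflake
df_customer_profiles_local = snf_session.create_dataframe(
    customer_id_properties,
    schema=["CUSTOMER_ID", "X_CUSTOMER_ID", "Y_CUSTOMER_ID",
            "MEAN_AMOUNT", "STD_AMOUNT", "MEAN_NB_TX_PER_DAY"])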

A better way is to generate the data on the Snowflake side, which can be done using the generator method, which generates n rows, combined with the built-in functions uniform and random to generate the random values.

The calculation of std_amount needs to be done after the mean_amount column has been generated, hence the use of with_column, which adds a new column to a Snowpark DataFrame. The select is only used to get the columns in the same order as the Pandas DataFrame example; there is no real need for it and it could be skipped.

from snowflake.snowpark import functions as F

n_customers = 10000

df_customer_profiles = snf_session.generator(
        F.seq8(1).as_("customer_id")
        , F.uniform(0, 100, F.random()).as_("x_customer_id")
        , F.uniform(0, 100, F.random()).as_("y_customer_id")
        , F.uniform(5, 100, F.random()).as_("mean_amount")
        , F.uniform(0, 4, F.random()).as_("mean_nb_tx_per_day")
        , rowcount=n_customers)\
    .with_column("std_amount", (F.col("mean_amount") / F.lit(2)))\
    .select(['CUSTOMER_ID', 'x_customer_id', 'y_customer_id',
             'mean_amount', 'std_amount', 'mean_nb_tx_per_day'])

The above code will not use any client memory; everything will be executed in Snowflake once an action method is triggered, and no data will be transferred between the client and Snowflake. If I use show, the execution is triggered, but it only runs until 10 rows have been collected, so I can quickly look at the first rows.
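For example:

# Triggers execution in Snowflake; only the first 10 rows are returned to the client
df_customer_profiles.show()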

Terminal profile dataset

The logic to create the terminal profile dataset is the same as for the customer profile dataset, but it only generates the X and Y coordinates for a terminal, so I will just show the Snowpark DataFrame code for it.

n_terminals = 20000

df_terminal_profiles = snf_session.generator(
        F.seq8(1).as_("TERMINAL_ID")
        , F.uniform(0, 100, F.random()).as_("x_terminal_id")
        , F.uniform(0, 100, F.random()).as_("y_terminal_id")
        , rowcount=n_terminals)

Association of customer profiles to terminals

The next part is to associate customers with terminals. The association is done by calculating the distance between the X and Y coordinates of the customers and the terminals, and then keeping all terminals that are less than a radius (r) away from a customer.

The Python code converts the Pandas DataFrames to NumPy arrays to speed up the calculation. It also wraps the logic in a function that is applied to each row of the customer_profiles_table Pandas DataFrame using the apply method with a lambda.

def get_list_terminals_within_radius(customer_profile, x_y_terminals, r):
    # Location (x, y) of the customer as a numpy array
    x_y_customer = customer_profile[['x_customer_id','y_customer_id']].values.astype(float)

    # Squared difference in coordinates between customer and terminal locations
    squared_diff_x_y = np.square(x_y_customer - x_y_terminals)

    # Sum along rows and compute square root to get distance
    dist_x_y = np.sqrt(np.sum(squared_diff_x_y, axis=1))

    # Get the indices of terminals which are at a distance less than r
    available_terminals = list(np.where(dist_x_y < r)[0])

    # Return the list of terminal indices
    return available_terminals

x_y_terminals = terminal_profiles_table[['x_terminal_id','y_terminal_id']].values.astype(float)
customer_profiles_table['available_terminals'] = customer_profiles_table.apply(
    lambda x: get_list_terminals_within_radius(x, x_y_terminals=x_y_terminals, r=50), axis=1)

Once again my focus is to keep the data in Snowflake and also try to use the Snowpark DataFrame API functions as much as possible.

To do this, I will join customers with terminals using the distance calculation and filtering as the join condition. This creates a new Snowpark DataFrame with one row for each combination of a customer and a terminal that is less than r away from that customer.

As the Snowpark DataFrame API does not expose the square function in Snowflake at the time of writing, I can use the function helper to create a callable object for it. If I just wanted to call the function directly, I could use call_function instead.

# Radius (r) within which a terminal is associated with a customer, same as in the original code
r = 50

snf_square = F.function("SQUARE")

df_customer_terminals = df_customer_profiles.join(df_terminal_profiles,
        F.sqrt(snf_square(F.col("X_CUSTOMER_ID") - F.col("X_TERMINAL_ID"))
               + snf_square(F.col("Y_CUSTOMER_ID") - F.col("Y_TERMINAL_ID"))) < F.lit(r))\
    .select("CUSTOMER_ID", "TERMINAL_ID")

The join generates the following Snowpark DataFrame:

I now have three Snowpark DataFrames: df_customer_profiles with my customers, df_terminal_profiles with my terminals and df_customer_terminals with the terminals within radius r of each customer.

The next step is to generate transactions for each customer for a specified time period.

Transaction dataset

The original Python function is called for each customer, using apply and lambda.

It loops through the number of days provided and for each day generates a random number of transactions using mean_nb_tx_per_day and a Poisson distribution. For each transaction it adds a random time (in seconds), a random amount, based on mean_amount and std_amount, and a random terminal, chosen from the customer’s nearby terminals. It also adds a value, tx_time_seconds, that shows the number of seconds that have passed from start_date to the current day and time. Everything is stored in a list of lists.

After the loop it creates a Pandas DataFrame from the list and also adds a column, tx_datetime, with a timestamp based on tx_time_seconds.

def generate_transactions_table(customer_profile, start_date="2018-04-01", nb_days=10):

    customer_transactions = []

    random.seed(int(customer_profile.CUSTOMER_ID))
    np.random.seed(int(customer_profile.CUSTOMER_ID))

    # For all days
    for day in range(nb_days):

        # Random number of transactions for that day
        nb_tx = np.random.poisson(customer_profile.mean_nb_tx_per_day)

        # If nb_tx positive, let us generate transactions
        if nb_tx > 0:

            for tx in range(nb_tx):

                # Time of transaction: Around noon, std 20000 seconds. This choice aims at
                # simulating the fact that most transactions occur during the day.
                time_tx = int(np.random.normal(86400/2, 20000))

                # If transaction time between 0 and 86400, let us keep it, otherwise, let us discard it
                if (time_tx > 0) and (time_tx < 86400):

                    # Amount is drawn from a normal distribution
                    amount = np.random.normal(customer_profile.mean_amount, customer_profile.std_amount)

                    # If amount negative, draw from a uniform distribution
                    if amount < 0:
                        amount = np.random.uniform(0, customer_profile.mean_amount*2)

                    amount = np.round(amount, decimals=2)

                    if len(customer_profile.available_terminals) > 0:

                        terminal_id = random.choice(customer_profile.available_terminals)

                        customer_transactions.append([time_tx+day*86400, day,
                                                      customer_profile.CUSTOMER_ID,
                                                      terminal_id, amount])

    customer_transactions = pd.DataFrame(customer_transactions, columns=['TX_TIME_SECONDS', 'TX_TIME_DAYS', 'CUSTOMER_ID', 'TERMINAL_ID', 'TX_AMOUNT'])

    if len(customer_transactions) > 0:
        customer_transactions['TX_DATETIME'] = pd.to_datetime(customer_transactions["TX_TIME_SECONDS"], unit='s', origin=start_date)
        customer_transactions = customer_transactions[['TX_DATETIME', 'CUSTOMER_ID', 'TERMINAL_ID', 'TX_AMOUNT', 'TX_TIME_SECONDS', 'TX_TIME_DAYS']]

    return customer_transactions

transactions_df = customer_profiles_table.groupby('CUSTOMER_ID').apply(
    lambda x: generate_transactions_table(x.iloc[0], nb_days=nb_days)).reset_index(drop=True)

Now we have an example that is not so easy to implement with the Snowpark DataFrame API; the challenge is to generate a random number of rows for each customer based on a value in a column.

The options I have are to either pull the data back to my client and run the original code there, or take the original code to Snowflake, which is the purpose of Python User Defined Functions, UDF, and Python User Defined Table Functions, UDTF. Since I want to return multiple rows for each input row, I need to use a UDTF.

When creating a UDTF in Python I need to use a Python class with at least one process method that is called for each input row; there are additional methods that can be used, but I do not need them for this example.

For the process method I will reuse the original Python code with some minor changes: instead of a Pandas DataFrame as input, I need to provide each value that is needed as an input parameter to the method.

I also need to provide a return schema that tells Snowflake the number of columns and their data types to return.

from snowflake.snowpark import types as T

row_schema = T.StructType([T.StructField("TERMINAL_ID", T.IntegerType())
                           , T.StructField("TX_AMOUNT", T.DecimalType(38, 6))
                           , T.StructField("TX_TIME_SECONDS", T.IntegerType())
                           , T.StructField("TX_TIME_DAYS", T.IntegerType())
                           ])

# Create a UDTF that generates transactions for each customer
class generate_trx_udtf:
    # The process function is called for each input row
    def process(self, customer_id: int, mean_nb_tx_per_day: int, mean_amount: int, std_amount: float, available_terminals: list, nb_days: int):
        import random
        import numpy as np

        customer_transactions = []

        random.seed(customer_id)
        np.random.seed(customer_id)
        # For each day in the range of nb_days generate a random number
        # of transactions based on the customer mean_nb_tx_per_day
        for day in range(nb_days):
            # Random number of transactions for that day
            nb_tx = np.random.poisson(mean_nb_tx_per_day)
            if nb_tx > 0:
                # For each transaction during a day
                for tx in range(nb_tx):
                    # Generate a time for the transaction
                    # Around noon, std 20000 seconds. This choice aims at simulating the fact that
                    # most transactions occur during the day.
                    time_tx = int(np.random.normal(86400/2, 20000))

                    if (time_tx > 0) and (time_tx < 86400):
                        # Amount is drawn from a normal distribution
                        amount = np.random.normal(mean_amount, std_amount)
                        # If amount negative, draw from a uniform distribution
                        if amount < 0:
                            amount = np.random.uniform(0, mean_amount*2)
                        amount = np.round(amount, decimals=2)
                        # Add a terminal to the transaction, based on the ones closest to the customer
                        if len(available_terminals) > 0:
                            terminal_id = random.choice(available_terminals)
                            customer_transactions.append((terminal_id, amount,
                                                          time_tx+day*86400, day))
        return customer_transactions

To create the UDTF in Snowflake, I use the udtf.register method. I need to provide my class, generate_trx_udtf, the name of the UDTF, generate_trx_udtf, which third-party Python libraries it depends on (numpy), the data types of the inputs, the output schema, whether it is a permanent function (False), and whether to replace it if it already exists (True).

By setting is_permanent=False, the UDTF will only be visible to the Snowflake user we are using, and it will only exist as long as we have an active session to Snowflake.

generate_trx = snf_session.udtf.register(generate_trx_udtf,
                                         name="generate_trx_udtf",
                                         is_permanent=False,
                                         packages=["numpy"],
                                         output_schema=row_schema,
                                         input_types=[T.LongType(), T.LongType(), T.LongType(), T.DecimalType(38, 6), T.ArrayType(T.StringType()), T.IntegerType()],
                                         replace=True)

Before calling the UDTF I will create a new Snowpark DataFrame where I join the df_customer_terminals with df_customer_profiles and add a new column, available_terminals, which will contain a list of terminals associated with a customer.

To create the list of terminals I use the group_by method, so I get one row per customer_id (mean_nb_tx_per_day, mean_amount and std_amount are also included to make sure I keep those columns in the resulting DataFrame), and then use array_agg to get a list of all terminal IDs associated with the customer.

# Generate a list of terminals per customer
df_input = df_customer_profiles.join(df_customer_terminals,
        df_customer_profiles.col("CUSTOMER_ID") == df_customer_terminals.col("CUSTOMER_ID"),
        lsuffix="_CUST")\
    .group_by(F.col("CUSTOMER_ID_CUST"), F.col("MEAN_NB_TX_PER_DAY"), F.col("MEAN_AMOUNT"), F.col("STD_AMOUNT"))\
    .agg(F.array_agg("TERMINAL_ID").as_("AVAILABLE_TERMINALS"))

The Snowpark DataFrame df_input now has a row for each customer and a column with all associated terminals.

I can now generate the transactions using my UDTF, generate_trx_udtf, and also create two additional columns: tx_datetime, which creates a timestamp based on start_date and the tx_time_seconds returned from the UDTF, and transaction_id, which is based on the row number using the window function row_number.

from snowflake.snowpark import Window

nb_days = 180
start_date = '2023-01-01'

# Generate the transactions; CUSTOMER_ID_CUST is renamed back to CUSTOMER_ID in the select
df_customer_trx = df_input.join_table_function(generate_trx(F.col("CUSTOMER_ID_CUST"), F.col("MEAN_NB_TX_PER_DAY"), F.col("MEAN_AMOUNT"), F.col("STD_AMOUNT"), F.col("AVAILABLE_TERMINALS"), F.lit(nb_days)))\
    .with_column("TX_DATETIME", F.dateadd("SECONDS", F.col("TX_TIME_SECONDS"), F.lit(start_date)))\
    .with_column("TRANSACTION_ID", F.row_number().over(Window.order_by(F.col("TX_DATETIME"))))\
    .select("TRANSACTION_ID", "TX_DATETIME", F.col("CUSTOMER_ID_CUST").as_("CUSTOMER_ID"), "TERMINAL_ID", "TX_AMOUNT", "TX_TIME_SECONDS", "TX_TIME_DAYS")\
    .sort("TRANSACTION_ID")

I now have a Snowpark DataFrame with transactions for each customer.

The last thing to do before adding the fraud flag is to save the data into Snowflake tables; so far we have only created the logic to generate the data. write.save_as_table can be used to apply the logic and save the result into tables in Snowflake, and with mode="overwrite" the tables will be replaced if they already exist.

df_customer_trx.write.save_as_table("CUSTOMER_TRANSACTIONS", mode="overwrite")
df_customer_profiles.write.save_as_table("CUSTOMER_PROFILES", mode="overwrite")
df_terminal_profiles.write.save_as_table("TERMINAL_PROFILES", mode="overwrite")

Add fraud scenarios to transactions

The final step is to generate fraudulent transactions; the dataset will have three different fraud scenarios.

  • Scenario 1 — all transactions with an amount over 220
  • Scenario 2 — every day two terminals are randomly drawn as fraudulent and all transactions on those terminals for the next 28 days are marked as fraud.
  • Scenario 3 — each day three customers are randomly drawn as fraudulent and for the next 14 days 1/3 of their transactions are flagged as fraud.

The original logic uses a filter for scenario one, and then loops through all the days for scenarios two and three, applying the logic for each day.

# By default, all transactions are genuine
transactions_df['TX_FRAUD'] = 0
transactions_df['TX_FRAUD_SCENARIO'] = 0

# Scenario 1
transactions_df.loc[transactions_df.TX_AMOUNT>220, 'TX_FRAUD'] = 1
transactions_df.loc[transactions_df.TX_AMOUNT>220, 'TX_FRAUD_SCENARIO'] = 1
nb_frauds_scenario_1 = transactions_df.TX_FRAUD.sum()
print("Number of frauds from scenario 1: "+str(nb_frauds_scenario_1))

# Scenario 2
for day in range(transactions_df.TX_TIME_DAYS.max()):

    compromised_terminals = terminal_profiles_table.TERMINAL_ID.sample(n=2, random_state=day)

    compromised_transactions = transactions_df[(transactions_df.TX_TIME_DAYS>=day) &
                                               (transactions_df.TX_TIME_DAYS<day+28) &
                                               (transactions_df.TERMINAL_ID.isin(compromised_terminals))]

    transactions_df.loc[compromised_transactions.index, 'TX_FRAUD'] = 1
    transactions_df.loc[compromised_transactions.index, 'TX_FRAUD_SCENARIO'] = 2

nb_frauds_scenario_2 = transactions_df.TX_FRAUD.sum()-nb_frauds_scenario_1
print("Number of frauds from scenario 2: "+str(nb_frauds_scenario_2))

# Scenario 3
for day in range(transactions_df.TX_TIME_DAYS.max()):

    compromised_customers = customer_profiles_table.CUSTOMER_ID.sample(n=3, random_state=day).values

    compromised_transactions = transactions_df[(transactions_df.TX_TIME_DAYS>=day) &
                                               (transactions_df.TX_TIME_DAYS<day+14) &
                                               (transactions_df.CUSTOMER_ID.isin(compromised_customers))]

    nb_compromised_transactions = len(compromised_transactions)

    random.seed(day)
    index_frauds = random.sample(list(compromised_transactions.index.values), k=int(nb_compromised_transactions/3))

    transactions_df.loc[index_frauds, 'TX_AMOUNT'] = transactions_df.loc[index_frauds, 'TX_AMOUNT']*5
    transactions_df.loc[index_frauds, 'TX_FRAUD'] = 1
    transactions_df.loc[index_frauds, 'TX_FRAUD_SCENARIO'] = 3

As mentioned earlier in this post, I want to avoid pulling back data to loop through, so I need a different approach.

First, create a Snowpark DataFrame that contains three random customers for each day.

Using a window function, row_number, I can create a new column that randomly assigns a row number for each day, using the tx_time_days column as partition_by and the random function as order_by.

I can then filter on the new column to select only the row numbers one, two and three for each day.
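The code below assumes that transactions_df is a Snowpark DataFrame on the transactions saved earlier, for example:

# Read the transactions back from the table saved in the previous step
transactions_df = snf_session.table("CUSTOMER_TRANSACTIONS")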

# Get 3 random customers for each day that will be fraudulent
df_rand_cust = transactions_df.select(F.col("TX_TIME_DAYS")
        , F.col("CUSTOMER_ID")
        , F.row_number().over(Window.partition_by(F.col("TX_TIME_DAYS")).order_by(F.random())).as_("R_NR"))\
    .filter(F.col("R_NR").in_(1, 2, 3))\
    .select("TX_TIME_DAYS", "CUSTOMER_ID")\
    .sort("TX_TIME_DAYS")

I now have a Snowpark DataFrame with three randomly selected customers for each day.

The next step is to join df_rand_cust with transactions_df to get 14 days of transactions for each fraudulent customer. I also use lsuffix and rsuffix to get a suffix on the column names for columns that have the same name in both DataFrames.

Then I create two new columns as part of the select: r_nr, which assigns a random row number, and rows, which shows the number of transactions each customer has during the 14 days.

The filter is used to select only 1/3 of all transactions using the rows column in combination with r_nr.

# Get the transactions that will be fraudulent for the customer,
# i.e. one third of all their transactions during the 14 days
df_fraud_cust_trx = transactions_df.join(df_rand_cust,
        ((transactions_df["customer_id"] == df_rand_cust["CUSTOMER_ID"])
         & ((transactions_df["TX_TIME_DAYS"] >= df_rand_cust["TX_TIME_DAYS"])
            & (transactions_df["TX_TIME_DAYS"] < (df_rand_cust["TX_TIME_DAYS"] + F.lit(14)))))
        , lsuffix="_T2", rsuffix="_CF")\
    .select(F.col("TX_DATETIME")
            , F.col("customer_id_t2").as_("CUSTOMER_ID")
            , F.row_number().over(Window.partition_by(F.col("customer_id_t2")).order_by(F.random())).as_("R_NR")
            , F.count("*").over(Window.partition_by(F.col("CUSTOMER_ID"))).as_("ROWS"))\
    .filter(F.col("R_NR") <= F.round(F.col("ROWS") * F.lit(0.33)))

The resulting DataFrame will look like this

Next I create a DataFrame with two random terminals for each day; the logic is the same as for getting the random customers.

# Get 2 random terminals for each day that will be fraudulent
df_rand_term = transactions_df.select(F.col("TX_TIME_DAYS"), F.col("TERMINAL_ID")
        , F.row_number().over(Window.partition_by(F.col("TX_TIME_DAYS")).order_by(F.random())).as_("R_NR"))\
    .filter(F.col("R_NR").in_(1, 2))\
    .select("TX_TIME_DAYS", "TERMINAL_ID")\
    .sort("TX_TIME_DAYS")

The final step is to join df_rand_term and df_fraud_cust_trx with transactions_df in order to set the different fraud scenarios. By using left outer joins I will get a null value in the customer_id column from df_fraud_cust_trx when a transaction is not fraudulent according to scenario 3, and a null value in the terminal_id column from df_rand_term when it is not fraudulent according to scenario 2.

Using the when function I can add the logic to set the fraud scenario: if tx_amount is greater than 220 then 1, if terminal_id_tf is not null then 2, if customer_id_cf is not null then 3, and otherwise 0 (no fraud).

I also multiply the tx_amount value by 5 if it is a customer fraud, i.e. customer_id_cf is not null.

Finally, I add the tx_fraud column, which is set to 1 if the tx_fraud_scenario is greater than zero, otherwise 0.

df_customer_trx_fraud = transactions_df.join(df_rand_term,
        ((transactions_df["terminal_id"] == df_rand_term["TERMINAL_ID"])
         & ((transactions_df["TX_TIME_DAYS"] >= df_rand_term["TX_TIME_DAYS"])
            & (transactions_df["TX_TIME_DAYS"] < (df_rand_term["TX_TIME_DAYS"] + F.lit(28)))))
        , how="leftouter", lsuffix="_T1", rsuffix="_TF")\
    .join(df_fraud_cust_trx,
          ((transactions_df["customer_id"] == df_fraud_cust_trx["CUSTOMER_ID"])
           & (transactions_df["TX_DATETIME"] == df_fraud_cust_trx["TX_DATETIME"]))
          , how="leftouter", lsuffix="_T2", rsuffix="_CF")\
    .select(F.col("TX_DATETIME_T2").as_("TX_DATETIME")
            , F.col("CUSTOMER_ID_T2").as_("CUSTOMER_ID")
            , F.col("TERMINAL_ID_T1").as_("TERMINAL_ID")
            , F.iff(F.col("CUSTOMER_ID_CF").is_not_null(), F.col("TX_AMOUNT") * F.lit(5), F.col("TX_AMOUNT")).as_("TX_AMOUNT")
            , "TX_TIME_SECONDS", F.col("TX_TIME_DAYS_T1").as_("TX_TIME_DAYS")
            , F.when(F.col("TX_AMOUNT") > F.lit(220), F.lit(1))
               .when(F.col("TERMINAL_ID_TF").is_not_null(), F.lit(2))
               .when(F.col("CUSTOMER_ID_CF").is_not_null(), F.lit(3))
               .otherwise(F.lit(0)).as_("TX_FRAUD_SCENARIO"))\
    .with_column("TX_FRAUD", F.iff(F.col("TX_FRAUD_SCENARIO") > F.lit(0), F.lit(1), F.lit(0)))

I can look at the new DataFrame and also count the number of rows per scenario.
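For example, a quick way to check the distribution per scenario (this triggers execution in Snowflake):

# Number of transactions per fraud scenario
df_customer_trx_fraud.group_by("TX_FRAUD_SCENARIO").count().sort("TX_FRAUD_SCENARIO").show()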

The only thing left to do is to save the fraudulent transactions to a new table.

df_customer_trx_fraud.write.save_as_table("CUSTOMER_TRANSACTIONS_FRAUD", mode="overwrite")

Conclusion

As shown in this post, it is possible to migrate Pandas-based code to Snowpark-based code; it is sometimes not entirely straightforward, but still possible.

The advantage of doing this is of course that you can use the power of Snowflake but still have Python as your primary language.

All the code for this post can be found in my GitHub repository.
