Doing Monte Carlo Simulations at scale with Snowpark for Python

Photo by Free Walking Tour Salzburg on Unsplash

I recently worked with a customer that was migrating a program they had for doing Monte Carlo simulations to estimate expected loss for defaulting loans to Snowpark for Python. It did show that it is possible to do Monte Carlo simulation at scale (millions) using Snowpark for Python.

What is a Monte Carlo simulation?

Monte Carlo simulations, or probability simulations, are a method used to predict possible outcomes of an uncertain event. It was named after a well-known casino town, called Monaco, since the element of chance is core to the modeling approach, similar to a game of roulette.

A Monte Carlo simulation consists of input variables, output variables, and a mathematical model.

In this post I will use monte carlo simulations to predict future stock prices.

Mathematical model

The mathematical model for predicting the stock prices is:

Stock Price Today = Stock Price Yesterday * e^r

Where r is the periodic daily return.

To calculate the periodic daily returns I will use the Geometric Brownian motion (GBM) model, which is often used for financial processes (such as the price of a stock over time).

Geometric Brownian motion is a stochastic process, which means that it involves random variables that change over time. The basic formula for geometric Brownian motion is:

dS = μS dt + σS dz

Where:

  • dS is the change in the price of the stock over a small time interval
  • S is the current price of the stock
  • μ is the average rate of return on the stock
  • dt is the small time interval (e.g. 1 day)
  • σ is the volatility of the stock (how much the price fluctuates)
  • dz is a random variable that follows a normal distribution with a mean of 0 and a standard deviation of 1

What this formula means is that the change in the stock price over a small time interval is equal to the average rate of return times the current stock price, plus a random variable that represents the volatility of the stock.

The random variable dz is what makes the model “stochastic” or random. It represents the unpredictable fluctuations in the stock price that are caused by external factors like news events or changes in the market.

[Thank you ChatGPT for the explanation of GBM]

Even if there are a lot of parts in the GMB formula, it is about calculating the overall driving force, called drift, and then add a random component to it.

Implementation

In this example I use a table that I have in my Snowflake account, it has the historical prices for the P&G stock. For the simulations I am going to use closing price.

By plotting the closing price over time I can see that the stock price has increased for the time period I have data for. Since Snowpark for Python today do not have plotting functions, I need to use a Python library that can.

The easiest way is to use the Pandas library built in plot function by converting my Snowpark DataFrame to a Pandas DataFrame using the to_pandas method. I do need to be aware by using to_pandas I do pull back all the data into my clients memory.

import pandas as pd

df_closing.to_pandas().plot(x="DATE", figsize=(10,6))

Before doing the calculations I am creating two variables, n_sim_runs and n_days, that controls the number of simulations by day and the the number of days to run the simulation for.

n_sim_runs = 200
n_days = 100

Above settings will generate 20 000 simulations.

To calculate the Drift, I need the average daily return and the variance of it and the formula for the calculation is:

Drift = Average Daily Return − (Variance/2)

The Average Daily Return is defined by calculating the percentage change of the closing price between the current day and the previous day and then apply the natural logarithm on that change.

Since Snowpark for Python, at the time of writing this blog, do not expose the LN function, that return the natural logarithm, the call_function can be used to call it.

I am also getting the last close value, that will be used later. The use of a function for the percentage change logic is there just to make the code a little easier to read.

from snowflake.snowpark import Session, Column
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
from snowflake.snowpark import Window

def pct_change(indx_col: Column, val_col: Column):
return ((val_col - F.lag(val_col, 1).over(Window.orderBy(indx_col))) / F.lag(val_col, 1).over(Window.orderBy(indx_col)))

df_log_returns = df_closing.select(F.col("DATE"), F.col("CLOSE")
,F.last_value(F.col("CLOSE")).over(Window.orderBy("DATE")).as_("LAST_CLOSE")
,F.call_function("LN", (F.lit(1) + pct_change(F.col("DATE"), F.col("CLOSE")))).as_("log_return"))
df_log_returns.show()

Now I can calculate the drift, by using the mean of the log return value, called u, minus the variance of the log return value divided by 2 (or 0.5 * the variance).

I am also getting the last closing price and the standard deviation for the log return. These values will be used later.

df_params = df_log_returns.select(F.mean("LOG_RETURN").as_("u")
, F.variance("LOG_RETURN").as_("var")
, F.stddev("LOG_RETURN").as_("std_dev")
,F.max(F.col("last_close")).as_("LAST_CLOSE"))\
.with_column("drift", (F.col("u")-(F.lit(0.5)*F.col("var"))))\
.select("std_dev", "drift", "last_close")

df_params.show()

To calculate the simulated/predicted stock price I need two DataFrames, df_days and df_sim_runs, where df_days has one row for each day and df_sim_runs has one row for each simulation.

This is easy to do using the generator function that allows me to specify the number of rows I want and also the columns for the DataFrame.

df_days = session.generator(F.row_number().over(Window.order_by(F.seq4())).as_("day_id") ,rowcount=n_days)
df_sim_runs = session.generator(F.row_number().over(Window.order_by(F.seq4())).as_("sim_run") ,rowcount=n_sim_runs)

For the random component I will use a Percent-Point Function, which returns a discrete value that is less than or equal to the given probability, and since Snowpark for Python or Snowflake, as of the time of writing this pot, does not have a function for this I need to solve if some other way.

Scipy has a Percent-Point Function, norm.ppf, that could be used for this. Unfortunately it does not support a Snowpark DataFrame as input, so I can either bring my data back to my client, convert it to a numpy array and then use the function or I can bring the function to my data.

The way to bring the function to my data in Snowflake, is to use a Python User Defined Function, UDF. This will allow me to use the norm.ppf function on my data in Snowflake, without having to pull it back to my client.

I use Snowpark for Python to define and deploy my function to Snowflake, using the @udf decorator. By using a PandasSeries data type for the input parameter and return value, I instruct Snowflake to apply the function, UDF, on a batch of rows at the time.

from scipy.stats import norm

@F.udf(name="norm_ppf", is_permanent=True, replace=True, packages=["scipy"], stage_location='UDF_STAGE', session=session)
def norm_ppf(pd_series: T.PandasSeries[float]) -> T.PandasSeries[float]:
return norm.ppf(pd_series)

I can now apply the function, called norm_ppf, on my data in Snowflake and with that calculate the daily return. In order to call my UDF I use the call_function function.

By joining in my days and simulations DataFrames, I now have calculated the daily return for each day and simulations I specified in the beginning.

I am also adding a column for the simulated/predicted stock price with a empty value.

df_daily_returns = df_days.join(df_sim_runs).join(df_params)\
.select("day_id", "sim_run"
, F.exp(F.col("drift") + F.col("std_dev") * F.call_function("norm_ppf", F.uniform(0.0,1.0,F.random()))).as_("daily_return")
, F.lit(None).as_("SIM_CLOSE"))\
.sort(F.col("DAY_ID"), F.col("sim_run"))
df_daily_returns.show()

I need to have a staring point for the simulation/prediction calculations that has the last closing price.

In order to do that, I can create a DataFrame, df_day_0, with one row for each simulation, that has the initial values to be used for calculating the future stock prices.

last_close = df_params.select("LAST_CLOSE").collect()[0][0]
df_day_0 = session.generator(F.lit(0).as_("DAY_ID"),
F.row_number().over(Window.order_by(F.seq4())).as_("SIM_RUN")
, F.lit(1.0).as_("DAILY_RETURN") ,F.lit(last_close).as_("SIM_CLOSE"), rowcount=n_sim_runs)
df_day_0.show()

I then add the df_day_0 DataFrame to the daily returns DataFrame, using the union_all function.

df_simulations = df_day_0.union_all(df_daily_returns)

The way to calculate the simulated/predicted stock price is to use the previous day simulated/predicted stock price (or the last closing price) times the daily return.

Since I am running this logic in Snowflake I can not access previous calculated values the same way as I would if looping through a Numpy array, that would be stored in my client memory.

One way to solve this is to take the initial stock price (last close price) and then apply all daily returns for the previous and current row on it. In order to do that I need to calculate the Product of the daily returns.

First I need to collect all previous and current row daily returns into a array. The way to do that is to create a Python User Defined Table Function, UDTF, that aggregates all previous rows daily return into a array.

from typing import Tuple,Iterable

@F.udtf(name="collect_list", is_permanent=True, replace=True
,packages=["typing"], output_schema=T.StructType([T.StructField("list", T.ArrayType())])
,stage_location="UDF_STAGE", session=session)
class collect_list_handler:
def __init__(self) -> None:
self.list = []
def process(self, element: float) -> Iterable[Tuple[list]]:
self.list.append(element)
yield (self.list,)

I can then create a new column in my DataFrame using the UDTF to get the values.

df_simulations_calc_input = df_simulations.with_column("SIM_CLOSE_0", F.first_value(F.col("SIM_CLOSE")).over(Window.partition_by("SIM_RUN").order_by("DAY_ID") ) )\
.with_column("L_DAILY_RETURN", F.call_table_function("collect_list", F.col("DAILY_RETURN")).over(partition_by="SIM_RUN", order_by="DAY_ID"))

df_simulations_calc_input.filter(F.col("SIM_RUN") == 1).sort("DAY_ID").show(3)

Now I can calculate the simulated/predicted stock price, this is done by taking the last closing price * the product of the previous daily returns including the current.

Since I have all the previous daily returns in an array I can then use the Numpy product function on them, and since I want to bring the code to Snowflake I will use a Python UDF to do it. I will also do the simulation/prediction calculation of the stock price in my UDF function.

import numpy as np

@F.udf(name=f"calc_close" ,is_permanent=True, replace=True
, packages=["numpy"], stage_location='UDF_STAGE', session=session)
def calc_return(last_close: float, daily_return: list) -> float:
pred_close = last_close * np.prod(daily_return)
return float(pred_close)

By using the function on my DataFrame I now have calculated the simulated/predicted stock price for the requested number of days and simulations by day.

df_sim_close = df_simulations_calc_input.with_column("SIM_CLOSE", 
F.call_function(f"calc_close"
, F.col("SIM_CLOSE_0"),F.col("L_DAILY_RETURN")))\
.select("DAY_ID", "SIM_RUN", "SIM_CLOSE_0","L_DAILY_RETURN", "SIM_CLOSE").sort("DAY_ID", "SIM_RUN")

df_sim_close.filter(F.col("SIM_RUN") == 1).sort("DAY_ID", "SIM_RUN").show(3)

I can then plot the days, simulations and simulated/predicted stock price to see the spread of it, that goes from somewhere below 110 to above 200.

import seaborn as sns

pd_local = df_sim_close.to_pandas()
sns.lineplot(data=pd_local, x='DAY_ID', y='SIM_CLOSE', hue='SIM_RUN')

By getting the Mean, Quantile (5%) and Quantile (95%) of the simulated last prices, I can estimate that the expected price at the last day of the simulation would be 152.75 and there is a 5% probability that the price is below 131.59 and a 5% probability that it is above 175.91.

metrics = df_sim_close.select(F.round(F.mean(F.col("SIM_CLOSE")), 2)
, F.round(F.percentile_cont(0.05).within_group("SIM_CLOSE"), 2)
, F.round(F.percentile_cont(0.95).within_group("SIM_CLOSE"), 2)).collect()
print(f"Expected price: {metrics[0][0]}")
print(f"Quantile (5%): {metrics[0][1]}")
print(f"Quantile (95%): {metrics[0][2]}")

With that I hope I showed you how you can do Monte Carlo simulations all within Snowflake using Snowpark for Python.

Next step would be to build a app to make it more interactive, so stay tuned!

--

--