Unlocking Insights: Forecasting Day-Ahead Power Prices in North America with Snowpark for Python

In this article, we put Snowpark, Snowflake’s new Python library for data analytics and pipelining, to the test to demonstrate its benefits and architectural simplicity when applied to a common power market modeling analysis.

Snowpark for Python

Snowpark for Python is a developer API that uses familiar DataFrame constructs in order to support data pipelining, application development, and machine learning workflows. With Snowpark, users can write familiar Python in Snowflake natively using Snowflake’s robust cloud compute with no data movement across platforms. For information on Snowpark architecture and additional Snowpark benefits, check out Snowflake’s Snowpark documentation.
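
As a rough illustration of that DataFrame style (the table and column names below are placeholders, not actual Yes Energy objects), a Snowpark query is composed lazily in Python and only runs as SQL on your Snowflake warehouse when an action such as show() or collect() is called:

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, avg

session = Session.builder.configs(connection_parameters).create()  # connection_parameters is your credentials dict

# The filter and aggregation below are translated to SQL and pushed down to Snowflake;
# no data is pulled locally until an action like show() runs.
df = (session.table('SOME_PRICE_TABLE')
      .filter(col('PRICE') > 0)
      .group_by(col('NODE'))
      .agg(avg(col('PRICE')).alias('AVG_PRICE')))
df.show()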

Power Market Forecast Modeling with Snowpark Demo

We’ll walk you through an example built in a Jupyter Notebook, the format we typically use to create analyses and workflow examples for our code repository. This demo provides the following sample code for using Snowpark for Python with Yes Energy® data:

  • Instantiates a Snowpark session that securely connects to Snowflake
  • Creates a linear regression model user-defined function (UDF) for predicting DALMP (the day-ahead locational marginal price) using natural gas prices with Yes Energy data
  • Registers the model training function as a Snowpark Python stored procedure that is called and scheduled in Snowflake using Snowflake tasks
  • Creates and calls a Snowpark Python UDF that returns new predictions given new inputs using the model

1. Setting up the Snowpark environment.

Here, we instantiate a Snowpark session that securely connects to your Snowflake account.

  • Import the session-level modules.
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, udf, call_udf
from snowflake.snowpark.types import Variant, PandasDataFrame, PandasSeries

from config_file import cred #Dictionary object with credentials
  • Set Snowpark session environmental parameters.
snowflake_conn = {
    'account': cred['snowflake1']['c'],
    'user': cred['snowflake1']['a'],
    'password': cred['snowflake1']['b'],
    'role': 'ACCOUNTADMIN',
    'warehouse': 'USER_WH',
    'database': 'MARKETDATA',
    'schema': 'YESDATA'
}
  • Instantiate Snowpark session.
snowpark_session = Session.builder.configs(snowflake_conn).create()
  • Create a named stage on which the model file, stored procedure, and UDF will be saved.
snowpark_session.sql('create or replace stage scratch.test_environment.sample_models;').collect()

2. Creating a stored procedure for model training and scheduling the training with Snowflake tasks.

Here we present a sample linear regression model as a Python function, using Snowpark DataFrames to get the data from Yes Energy’s DataSignals Cloud product. It will be registered as a Snowflake stored procedure that can be scheduled and run in Snowflake.

This is a fairly simple sample model meant to showcase Snowpark functionality with Yes Energy data. In practice, your model may train on millions or billions of data points, a task well suited to Snowflake compute clusters rather than spinning up separate compute clusters outside of Snowflake or relying on local resources.
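
If a training run does need more horsepower, one option is to resize the warehouse around the heavy step and shrink it back afterwards. A minimal sketch, assuming the USER_WH warehouse from this demo and a role allowed to resize it:

# Scale the warehouse up for a large training run, then back down (sizes are illustrative)
snowpark_session.sql("alter warehouse USER_WH set warehouse_size = 'LARGE';").collect()
# ... run the training stored procedure here ...
snowpark_session.sql("alter warehouse USER_WH set warehouse_size = 'XSMALL';").collect()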

  • Define the stored procedure with which to build and train the model.
def train_fuel_dalmp_model(session: Session, save_model: bool) -> Variant:

    from sklearn.model_selection import train_test_split
    from sklearn.linear_model import LinearRegression
    import os
    import tempfile  # Used below to write the model file to a server-side temp directory
    from joblib import dump

    model_saved = False
  • Use Snowpark to grab Yes Energy data and preprocess the input natural gas price feature and the target DALMP variable. This code is part of the stored procedure defined above.
    # Easily import Yes Energy data into a Snowpark DataFrame
    df_lmp = session.table('dart_price_avg').filter((col('pointid') == 51288) & (col('timesegment') == 'ALL'))
    df_fuel = session.table('fuel_prices').filter(col('objectid') == 10000002677)

    # Perform data preprocessing
    df = df_lmp.join(df_fuel, df_lmp.col('TRADEDATE') == df_fuel.col('DELIVERYDATE'))
  • Get the data ready for modeling, fit and save the model on the training data, and return model statistics.
    # Convert the data to a pandas dataframe and get it model-ready. When embedded as a Snowpark
    # stored procedure, all of this runs against Snowflake's cloud compute engine
    df_pd = df.to_pandas()

    X = df_pd[['WEIGHTED_AVERAGE_PRICE']]
    y = df_pd[['DA_PRICE']]

    # Split the data into train and test sets, then fit and test the model
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.10, random_state=4444, shuffle=True)

    lm = LinearRegression()
    lm.fit(X_train, y_train)

    # Get model statistics to be returned by the procedure
    r2_score_train = round(lm.score(X_train, y_train), 2)
    r2_score_test = round(lm.score(X_test, y_test), 2)
    coef = lm.coef_[0, 0]
    intercept = lm.intercept_[0]

    # Save the model to a temp directory, then upload it to the Snowflake stage created above.
    # Note: the stored procedure executes inside Snowflake, so the model is written to a
    # writable server-side temp directory rather than a local path.
    if save_model:
        model_dir = tempfile.gettempdir()
        model_file = os.path.join(model_dir, 'model.joblib')  # put() compresses it to model.joblib.gz on the stage
        dump(lm, model_file)
        session.file.put(model_file, '@scratch.test_environment.sample_models', overwrite=True)
        model_saved = True

    return {'R2 score train:': r2_score_train,
            'R2 score test:': r2_score_test,
            'Model equation:': f'{coef}x + {intercept}',
            'Model saved:': model_saved}

3. Registering the model as a stored procedure in Snowflake.

  • Register the training function as a permanent stored procedure in Snowflake, stored on the stage created earlier. Snowflake resolves all package dependencies using Anaconda on the server side; all you have to do is list the required packages with the packages parameter.
snowpark_session.sproc.register(
    func=train_fuel_dalmp_model,
    name='train_fuel_dalmp_model',
    packages=['snowflake-snowpark-python', 'scikit-learn', 'joblib'],  # Specify packages needed during execution
    is_permanent=True,
    stage_location='@scratch.test_environment.sample_models',
    replace=True)
  • Test calling the new stored procedure in Snowflake using SQL through Snowpark. Note that the session parameter does not need to be passed in; Snowflake automatically creates a Session object and passes it into the stored procedure. The model isn’t saved here since this is just for testing purposes (an equivalent call through the Snowpark Python API is sketched after the output below).
snowpark_session.sql('call train_fuel_dalmp_model(False);').collect()

Out[9]:
[Row(TRAIN_FUEL_DALMP_MODEL='{\n "Model equation:": "4.806124852882219x + 19.755608598141638",\n "Model saved:": false,\n "R2 score test:": 0.64,\n "R2 score train:": 0.65\n}')]
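
For the equivalent call through the Snowpark Python API, Session.call invokes the registered procedure directly and returns its result (a minimal sketch):

# Equivalent to the SQL call above, using the Snowpark Python API
result = snowpark_session.call('train_fuel_dalmp_model', False)
print(result)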
  • Schedule the stored procedure for training using Snowflake tasks. For now, SQL is used to create and schedule the task; this one runs the training daily at 18:00 UTC (a sketch for checking the task’s run history follows the task commands below).
training_model_task_sql = '''
create or replace task scratch.test_environment.training_model_task
    warehouse = 'USER_WH'
    schedule = 'USING CRON 0 18 * * * UTC'
as
    call train_fuel_dalmp_model(True);
'''

snowpark_session.sql(training_model_task_sql).collect()

Out[10]:
[Row(status='Task TRAINING_MODEL_TASK successfully created.')]
  • Turn on the task.
snowpark_session.sql('alter task scratch.test_environment.training_model_task resume;').collect()

Out[11]:
[Row(status='Statement executed successfully.')]
  • Turn off the task.
snowpark_session.sql('alter task scratch.test_environment.training_model_task suspend;').collect()

Out[12]:
[Row(status='Statement executed successfully.')]
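
To confirm that the scheduled training actually runs, the task’s execution history can be queried from the scratch database’s information schema. A sketch, assuming the default retention window is sufficient:

# Check recent executions of the training task (illustrative query)
task_history_sql = '''
select name, state, scheduled_time, completed_time
from table(scratch.information_schema.task_history())
where name = 'TRAINING_MODEL_TASK'
order by scheduled_time desc
limit 10;
'''

snowpark_session.sql(task_history_sql).collect()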

4. Running predictions with the model.

  • Add the trained model file and the required Python packages from the server-side Snowflake Anaconda channel so they are available as UDF dependencies.
snowpark_session.add_import('@scratch.test_environment.sample_models/model.joblib.gz')

snowpark_session.add_packages('joblib', 'scikit-learn', 'numpy', 'pandas')
  • Create a Snowpark Python UDF that takes in a pandas dataframe as input and returns a pandas series of predicted values from the above model, using Snowpark’s Python UDF Batch API, which processes batches of rows per call and typically outperforms row-by-row scalar UDFs.
@udf(name='predict_dalmp_udf',
     session=snowpark_session,
     replace=True,
     is_permanent=True,
     stage_location='@scratch.test_environment.sample_models')
def predict_dalmp_udf(df: PandasDataFrame[float]) -> PandasSeries[float]:  # Takes in a pandas dataframe (or similar object) and returns a pandas series
    import sys
    from joblib import load
    import sklearn
    import numpy as np

    import_directory_name = 'snowflake_import_directory'  # Locates the UDF's import directory, where the model from the stage lives
    import_dir = sys._xoptions[import_directory_name]

    model_file = import_dir + 'model.joblib.gz'  # Get the model from the stage
    model = load(model_file)

    dalmp_pred = model.predict(df)  # Run model predictions on the dataframe

    return dalmp_pred
  • Call the model to predict new output values. The prediction UDF receives batches of rows as a pandas dataframe (or dataframe-like object) and is invoked on a Snowpark DataFrame column with the call_udf function (a sketch for persisting the output follows the results below).
# Create a Snowpark DataFrame that will be used as input for calling the model prediction UDF

df = snowpark_session.create_dataframe(list(range(1, 21)), schema=['INPUT_FUEL_PRICE'])

# Call the function on the rows of the above DataFrame

df_output = df.select('INPUT_FUEL_PRICE', call_udf('predict_dalmp_udf', col('INPUT_FUEL_PRICE')).as_('PREDICTED_DALMP'))

df_output.sort(col('PREDICTED_DALMP'), ascending=True).show(20)

-------------------------------------------
|"INPUT_FUEL_PRICE"  |"PREDICTED_DALMP"   |
-------------------------------------------
|1                   |24.561733451023873  |
|2                   |29.36785830390609   |
|3                   |34.1739831567883    |
|4                   |38.980108009670516  |
|5                   |43.78623286255273   |
|6                   |48.59235771543494   |
|7                   |53.398482568317164  |
|8                   |58.20460742119937   |
|9                   |63.01073227408159   |
|10                  |67.8168571269638    |
|11                  |72.62298197984602   |
|12                  |77.42910683272824   |
|13                  |82.23523168561044   |
|14                  |87.04135653849266   |
|15                  |91.84748139137487   |
|16                  |96.65360624425709   |
|17                  |101.45973109713931  |
|18                  |106.26585595002152  |
|19                  |111.07198080290374  |
|20                  |115.87810565578594  |
-------------------------------------------
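
Because df_output is still a Snowpark DataFrame, the predictions can also be written straight back to a Snowflake table instead of (or in addition to) displaying them. A minimal sketch; the target table name is illustrative:

# Persist the predictions back to Snowflake (table name is illustrative)
df_output.write.mode('overwrite').save_as_table('scratch.test_environment.dalmp_predictions')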

Next Steps

To dig into more model content and exploratory data analysis of power market data, click here to see and access Yes Energy’s new Snowflake Marketplace Listings. We offer a wide range of data, from independent system operator (ISO) data, locational marginal price (LMP) data, and financial transmission rights (FTR) auction results to transmission and generation outages, real-time generation and flow data, historical power prices, and load and weather forecasts.

To learn more about how the Snowpark and Yes Energy partnership can help you build and execute code faster within a single platform, saving you time and resources, click here to request a demo.

Snowflake + Yes Energy

Snowflake’s partnership with Yes Energy allows organizations to mobilize Yes Energy’s power market data through Snowflake Marketplace so that data analysts, data scientists, and machine learning engineers can quickly and easily analyze complex, rapidly changing electric power market data to make better market decisions.

Yes Energy’s entire data catalog is available on Snowflake through a secure data share available on all cloud regions, allowing near-limitless, scalable cloud compute capabilities without any additional storage costs.

About Yes Energy

Yes Energy collects, cleans and standardizes 750,000+ data files and 110,000,000+ rows of new data from public, partner, and proprietary sources per day in order to provide you with the most comprehensive nodal power market data available. Learn how we can help you Win the Day Ahead™ with Better Data, Better Delivery, and Better Direction.
