Snowflake Python Package Alerting

You’ve deployed a predictive model in Snowflake. It’s performing well but you suspect its performance could improve with a newer Python package. Let’s scale this out and assume your organization has dozens of models running in Snowflake. How do you track when key packages are updated in Snowflake? You automate an alert in Snowflake to notify team members when new package versions arrive.

Photo by Brett Jordan on Unsplash

Quick Preface

Before jumping into a bespoke alerting system, let’s acknowledge a few considerations.

  • Snowflake publishes release notes offering an overview of new features (e.g. supported packages), enhancements, fixes, etc.
  • Snowflake’s natively supported Python packages can be seen in the INFORMATION_SCHEMA.PACKAGES built-in view.
  • Snowflake has a Public Preview Alerting feature that fits most use cases. It’s not used in this situation as a very specific trigger is necessary.
  • Python packages that aren’t natively supported can still be used in Snowflake. See Vivek Nayak’s article for an example of loading third party packages into Snowflake.

Key Steps

Below is a quick preview of the major components of the alerting mechanism in Snowflake. The logic follows a cyclical pattern of checking for table changes and executing procedures.

High-level view of implementing Python package alerting logic

Feel free to skip directly to the full implementation. Before executing the script, set the initial variables for email(s), package(s), and warehouse. A detailed step-by-step walkthrough follows below.

Step 1

Create a TABLE of email addresses to receive Python package alerts

This is pretty straightforward. For convenience, the script begins by setting a session variable of emails, which are then loaded into a table.

SET EMAILS = '["troy.barnes@greendale.edu", "abed.nadir@greendale.edu"]';
CREATE OR REPLACE TABLE PYTHON_EMAILS AS
(select DISTINCT REPLACE(VALUE,'"') AS EMAILS
from table(flatten(input => parse_json($EMAILS))));

Step 2

Create a NOTIFICATION INTEGRATION to enable emailing those in Step 1

A NOTIFICATION INTEGRATION is a Snowflake object that provides an interface between Snowflake and third-party messaging services (third-party cloud message queuing services, email services, etc.). It also establishes the permitted email recipients. Email addresses must be verified before they are added to a NOTIFICATION INTEGRATION.

The ALLOWED_RECIPIENTS parameter of the NOTIFICATION INTEGRATION expects a very specific format. To avoid requiring duplicate lists of emails, EMAILS (from Step 1) is transformed and passed to the NOTIFICATION INTEGRATION via Snowflake SQL scripting.

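-- Swap the JSON brackets for parentheses and the double quotes for single quotes
SET EMAILS_PARANS = (SELECT REPLACE(REPLACE(REPLACE($EMAILS,'[','('), ']',')'), '"', ''''));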

BEGIN
let emails varchar := $EMAILS_PARANS;
let ddl varchar := 'CREATE OR REPLACE NOTIFICATION INTEGRATION PYTHON_PACKAGE_ALERTS
TYPE = EMAIL
ENABLED = TRUE
ALLOWED_RECIPIENTS = ' || :emails ;
let rs resultset := (EXECUTE IMMEDIATE :ddl);
RETURN 'Notification integration created';
END;
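To make the transformation concrete, here is a minimal Python sketch of the same idea outside Snowflake. It assumes ALLOWED_RECIPIENTS expects a parenthesized list of single-quoted addresses; the helper name to_allowed_recipients is hypothetical and not part of the original script.

```python
import json


def to_allowed_recipients(emails_json: str) -> str:
    """Turn a JSON array of addresses into a parenthesized, single-quoted list."""
    emails = json.loads(emails_json)
    # Double any embedded single quote, as SQL string literals require.
    quoted = ["'" + email.replace("'", "''") + "'" for email in emails]
    return "(" + ", ".join(quoted) + ")"


emails_json = '["troy.barnes@greendale.edu", "abed.nadir@greendale.edu"]'
print(to_allowed_recipients(emails_json))
```

Parsing the JSON rather than replacing characters keeps the sketch safe if an address ever contains a bracket or a quote.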

Step 3

Create a TABLE of currently supported Python packages’ latest versions, marking those of interest with TRACKING = 1

A single TABLE indicates which packages should be monitored for newer versions. A session variable, PACKAGES, captures an initial list of packages to be marked with TRACKING = 1 in the tracking TABLE. All packages in SNOWFLAKE.INFORMATION_SCHEMA.PACKAGES are copied to the tracking TABLE to make it easier to mark them for tracking in the future.

SET PACKAGES = '["pandas","snowflake-snowpark-python"]';

-- CREATE CURRENT BASE TABLE
CREATE OR REPLACE TABLE PYTHON_PACKAGE_TRACKER
AS
SELECT
PACKAGE_NAME
, MAX(VERSION) AS VERSION
, MAX(RUNTIME_VERSION) AS RUNTIME_VERSION
, 0 AS TRACKING
FROM SNOWFLAKE.INFORMATION_SCHEMA.PACKAGES
GROUP BY PACKAGE_NAME, TRACKING;

-- SET INITIAL PACKAGES TO BE TRACKED
UPDATE PYTHON_PACKAGE_TRACKER
SET TRACKING = 1
WHERE PACKAGE_NAME IN (select value from table(flatten(input => parse_json($packages))));

Step 4

Create a STREAM on the TABLE from Step 3 to track any changes to it

Step 5 will automate a process to check for new package versions and update the tracking TABLE in Step 3. Before defining Step 5, a STREAM is added to the tracking TABLE in Step 3 to record changes for downstream alert triggering.

ALTER TABLE PYTHON_PACKAGE_TRACKER SET CHANGE_TRACKING = TRUE; -- Required for tracking in STREAMS
CREATE STREAM PYTHON_PACKAGE_TRACKER_STREAM ON TABLE PYTHON_PACKAGE_TRACKER;

Step 5

Create and schedule a TASK to evaluate if tracked packages become available in newer versions and update the Step 3 TABLE accordingly

A TASK will update the tracking TABLE in Step 3 anytime “greater” package versions arrive in INFORMATION_SCHEMA.PACKAGES.

CREATE OR REPLACE TASK PYTHON_PACKAGE_TRACKER_UPDATER
SCHEDULE = 'USING CRON 0 2 * * * UTC' -- Every day at 2AM UTC
AS
MERGE INTO PYTHON_PACKAGE_TRACKER my_pkges
USING (
    SELECT
        PACKAGE_NAME
        , MAX(VERSION) AS VERSION
        , MAX(RUNTIME_VERSION) AS RUNTIME_VERSION
    FROM INFORMATION_SCHEMA.PACKAGES
    GROUP BY PACKAGE_NAME
) pkges
    ON my_pkges.PACKAGE_NAME = pkges.PACKAGE_NAME
WHEN MATCHED
    AND pkges.VERSION > my_pkges.VERSION
    AND my_pkges.TRACKING = 1
    AND pkges.VERSION IS NOT NULL
    THEN UPDATE SET my_pkges.VERSION = pkges.VERSION
WHEN MATCHED
    AND pkges.RUNTIME_VERSION > my_pkges.RUNTIME_VERSION
    AND my_pkges.TRACKING = 1
    AND pkges.RUNTIME_VERSION IS NOT NULL
    THEN UPDATE SET my_pkges.RUNTIME_VERSION = pkges.RUNTIME_VERSION;
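One caveat on what "greater" means here: if VERSION is compared as text (an assumption about the column type), MAX and > order values character by character, so a version such as 1.10.0 can sort below 1.9.0. The Python sketch below illustrates the difference; version_key is a hypothetical helper, not something the script above uses.

```python
def version_key(version: str) -> tuple:
    """Split a dotted version into a tuple whose numeric parts compare as integers."""
    parts = []
    for part in version.split("."):
        if part.isdecimal():
            # Numeric segments sort before non-numeric ones.
            parts.append((0, int(part), ""))
        else:
            # Tagging keeps ints and strs from being compared directly.
            parts.append((1, 0, part))
    return tuple(parts)


# Plain string comparison goes character by character: '1' < '9'.
print("1.10.0" > "1.9.0")                            # False
# Comparing numeric segments gives the expected ordering.
print(version_key("1.10.0") > version_key("1.9.0"))  # True
```

If this matters for the packages you track, it may be worth spot-checking the tracker table against INFORMATION_SCHEMA.PACKAGES after a package crosses a two-digit version segment.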

The STREAM (from Step 4) captures the changes made by the above TASK. A standard STREAM records each update as a DELETE and INSERT pair, and the INSERT record carries the new version. At this juncture, you may be wondering why a STREAM was not created directly on INFORMATION_SCHEMA.PACKAGES. Unfortunately, STREAMs cannot track built-in views, hence the need for an intermediary TABLE in Step 3 and its corresponding STREAM.
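As a rough illustration of what the STREAM holds after one update, the toy Python model below builds the two change records and keeps only the INSERT side, which is the filter the STORED PROCEDURE in Step 6 applies. The function name and the version numbers are made up for the example; this is not how Snowflake implements streams.

```python
def stream_records_for_update(old_row: dict, new_row: dict) -> list:
    """Toy model: the two change records a standard stream shows for one UPDATE."""
    return [
        {**old_row, "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": True},
        {**new_row, "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": True},
    ]


# Illustrative values only.
old = {"PACKAGE_NAME": "pandas", "VERSION": "1.5.3"}
new = {"PACKAGE_NAME": "pandas", "VERSION": "2.0.3"}

records = stream_records_for_update(old, new)
# Keeping only INSERT records leaves the row with the new version.
new_versions = [r for r in records if r["METADATA$ACTION"] == "INSERT"]
print(new_versions)
```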

Step 6

Create a STORED PROCEDURE to craft email bodies and transmit them to recipients

At this point, the STREAM captures updates to the tracking TABLE and email addresses are stored in a TABLE. A STORED PROCEDURE extracts these elements, constructs an email body, and transmits emails using the system function SYSTEM$SEND_EMAIL. The STORED PROCEDURE is defined in Python to accommodate exception handling and HTML string creation.

-- CREATE STORED PROCEDURE TO CHECK STREAM AND SEND EMAIL
CREATE OR REPLACE PROCEDURE EMAIL_PYTHON_PACKAGE_UPDATES()
returns string
language python
runtime_version=3.8
packages = ('snowflake-snowpark-python', 'tabulate')
handler = 'create_email_body'
execute as owner
as
$$
import snowflake

def create_email_body(session):
    header = '<h2>New Python Packages</h2>'
    paragraph = '<p>The below Python package versions are now available directly in Snowflake. See INFORMATION_SCHEMA.PACKAGES for the full list.</p>'
    try:
        body = session.sql(
            "SELECT PACKAGE_NAME, VERSION, RUNTIME_VERSION FROM PYTHON_PACKAGE_TRACKER_STREAM WHERE METADATA$ACTION = 'INSERT' LIMIT 100"
        ).to_pandas()
        if body.size > 0:
            body = header + '\n' + paragraph + '\n' + body.to_html(index=False, justify='left')
            emails = session.sql(
                "SELECT EMAILS FROM PYTHON_EMAILS LIMIT 50"
            ).to_pandas()['EMAILS'].values.tolist()
            for s in emails:
                try:
                    session.call('SYSTEM$SEND_EMAIL',
                        'PYTHON_PACKAGE_ALERTS',
                        s,
                        'New Python Packages in Snowflake',
                        body,
                        'text/html'
                    )
                except snowflake.snowpark.exceptions.SnowparkSQLException:
                    # Email not valid, continue
                    continue
            return 'email(s) sent'
        else:
            return 'no package updates to transmit'
    except snowflake.snowpark.exceptions.SnowparkSQLException as e:
        # Table select not valid
        body = '%s\n%s' % (type(e), e)
        return 'Table PYTHON_PACKAGE_TRACKER_STREAM invalid'
$$;
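To see the shape of the email body without a Snowflake session, here is a small standalone sketch that renders rows into the same header, paragraph, and table layout. It swaps pandas' to_html for a hand-rolled table using only the standard library; build_email_body is a hypothetical helper and the sample row is illustrative.

```python
from html import escape


def build_email_body(rows: list) -> str:
    """Render a header, a paragraph, and an HTML table from a list of row dicts."""
    if not rows:
        return ""  # Nothing to send when there are no rows.
    columns = list(rows[0].keys())
    head = "".join(f"<th>{escape(col)}</th>" for col in columns)
    body = "".join(
        "<tr>" + "".join(f"<td>{escape(str(row[col]))}</td>" for col in columns) + "</tr>"
        for row in rows
    )
    return (
        "<h2>New Python Packages</h2>\n"
        "<p>The below Python package versions are now available directly in Snowflake.</p>\n"
        f"<table><tr>{head}</tr>{body}</table>"
    )


sample = [{"PACKAGE_NAME": "pandas", "VERSION": "2.0.3", "RUNTIME_VERSION": "3.8"}]
print(build_email_body(sample))
```

Returning an empty string for an empty input mirrors the procedure's check that the STREAM actually has rows before any email goes out.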

Step 7

Create a TASK to execute the STORED PROCEDURE in Step 6 if new package versions are available

The STORED PROCEDURE from Step 6 is called from a TASK that runs immediately after the TASK in Step 5. The TASK's WHEN clause uses SYSTEM$STREAM_HAS_DATA to check whether the STREAM has data. Because SYSTEM$STREAM_HAS_DATA is not guaranteed to avoid false positives, the STORED PROCEDURE in Step 6 also verifies that the STREAM is not empty before transmitting emails.

SET warehouse = '[your_warehouse]'; --Set preferred warehouse for TASKS

CREATE OR REPLACE TASK PROCESS_PYTHON_PACKAGE_ALERT
warehouse = $WAREHOUSE
AFTER PYTHON_PACKAGE_TRACKER_UPDATER
WHEN SYSTEM$STREAM_HAS_DATA('PYTHON_PACKAGE_TRACKER_STREAM')
AS
CALL EMAIL_PYTHON_PACKAGE_UPDATES();

Step 8

Reset the STREAM from Step 4

A STREAM keeps returning the same change records until they are consumed in a DML statement, so it must be consumed before it can surface only new changes. The final step consumes the data in the STREAM by creating a temporary TABLE from the STREAM content. This step is captured in a TASK and scheduled as the last TASK in the process.

CREATE OR REPLACE TASK RESET_PYTHON_PACKAGE_TRACKER_STREAM
warehouse = $WAREHOUSE
AFTER PROCESS_PYTHON_PACKAGE_ALERT
WHEN SYSTEM$STREAM_HAS_DATA('PYTHON_PACKAGE_TRACKER_STREAM')
AS
CREATE OR REPLACE TEMP TABLE RESET_TBL AS
SELECT * FROM PYTHON_PACKAGE_TRACKER_STREAM limit 1;

Automate

Executing SYSTEM$TASK_DEPENDENTS_ENABLE with the root TASK will automate the entire process.

SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('PYTHON_PACKAGE_TRACKER_UPDATER');

Result

An email is transmitted to recipients once newer versions of tracked packages become natively available in Snowflake. Otherwise, no email is sent. Packages are checked every day.

Example email of newly supported Python packages natively in Snowflake

Conclusion

With Python package alerting implemented, Snowflake users can stay up to date on changes in Snowflake packages. With a couple of SQL commands, you can also monitor for packages that have yet to be released in Snowflake.

INSERT INTO PYTHON_PACKAGE_TRACKER VALUES ('lazypredict', '0.0', '3.9', 1);
EXECUTE TASK RESET_PYTHON_PACKAGE_TRACKER_STREAM; -- To reset DML tracking

The full implementation can be found here.


Jason Summer
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Solution Innovation Architect - (Gen)AI/ML @ Snowflake. Developing and scaling data engineering, ML modeling, AI, and LLMs in the data cloud