Snowflake Trail For Observability (+Logs, +Events, +Traces)

Fru · DemoHub Tutorials · Jul 1, 2024

Snowflake’s observability framework, which includes logging, tracing, and events, is a powerful toolkit for understanding and optimizing your data pipelines. It provides detailed insights into how your code interacts with data and how data flows through your applications, greatly improving time to detect (TTD) and time to resolve (TTR).

This comprehensive system leverages the OpenTelemetry standard, a vendor-neutral approach to instrumenting, generating, collecting, and exporting telemetry data (logs, metrics, and traces) for analysis and visualization.

Telemetry Instrumentation Hierarchy


├── Logs
│   └── Discrete events (text-based)
│       ├── Info logs
│       ├── Warning logs
│       ├── Error logs
│       └── Debug logs
│
├── Traces
│   └── Spans (basic unit of work)
│       ├── Attributes (key-value pairs), e.g.
│       │   ├── process_step
│       │   └── order_id
│       └── Trace Events (discrete events within a span), e.g.
│           ├── order_processed
│           ├── fetch_customer_details
│           └── calculate_price
│
└── Attributes (can apply to both logs and traces)
    ├── Used for filtering
    └── Used for analysis

Logs: Capture discrete events within your Snowflake environment, providing a detailed, textual record of what happened at specific points in time. For example:

logger.info(f"Processed {customer_count} customer records.")
logger.warning("High customer count detected.")

Traces: Record the flow of execution within your code, including timing and dependencies between operations. A trace is a sequence of events that belong to a single transaction.

Spans: A span is the basic unit of work in a trace. It represents a single operation within a trace and can have a parent-child relationship with other spans. For example:

telemetry.set_span_attribute("process_step", "fetch_order_details")

Trace Events: Discrete events that occur within a span, providing additional context about what happened during the span’s lifetime. For example:

telemetry.add_event("order_processed", {"order_id": order_id, "status": "SUCCESS"})

Attributes: Key-value pairs that attach additional context to spans or logs and help in filtering and analyzing telemetry data. For example:

telemetry.set_span_attribute("order_id", order_id)

Events: Structured data points emitted by your code, providing context and additional details about logs and traces.
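Putting these pieces together, here is a minimal sketch of a Python stored procedure that emits all three signal types from one handler. The procedure name, logger name, and message text are illustrative, not part of Snowflake’s API; the telemetry calls come from the snowflake-telemetry-python package used later in this tutorial:

CREATE OR REPLACE PROCEDURE telemetry_demo()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python', 'snowflake-telemetry-python')
HANDLER = 'run'
AS $$
import logging
from snowflake.snowpark import Session
from snowflake import telemetry

logger = logging.getLogger("demo.telemetry_walkthrough")

def run(session: Session):
    # Log: a discrete, text-based record of what happened
    logger.info("Starting demo run.")
    # Span attribute: key-value context attached to the current span
    telemetry.set_span_attribute("process_step", "demo")
    # Trace event: a structured, point-in-time record within the span
    telemetry.add_event("demo_completed", {"status": "SUCCESS"})
    return "SUCCESS"
$$;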

Snowflake Trail

Snowflake Trail is Snowflake’s observability framework that provides detailed logging and tracing capabilities. It is an umbrella of features designed to help users understand and optimize query performance, troubleshoot issues, and gain insights into their Snowflake usage.

Practical Example

Let’s illustrate the power of Snowflake Trail and observability using a real-world example from the OrdersDB data model (set up the model in your practice environment if you haven’t done so already).

Before we dive into the code, let’s set up the necessary components:

CREATE OR REPLACE DATABASE observability_db;

-- Create an event table to store logs and traces
CREATE OR REPLACE EVENT TABLE observability_db.public.observability_event_table;

-- Associate the event table with your account (requires ACCOUNTADMIN role)
ALTER ACCOUNT SET EVENT_TABLE = observability_db.public.observability_event_table;

-- Set log level to INFO
ALTER SESSION SET LOG_LEVEL = INFO;

-- Enable Tracing
ALTER SESSION SET TRACE_LEVEL = ALWAYS;

Important Note: The Event Table created above is a special type of table designed to store logging and tracing data generated within your Snowflake environment. It has a predefined set of columns to capture various information, such as timestamps, event types, log messages, and trace details. By associating an event table with your Snowflake account, you can collect and centrally store all logs and traces, making it easier to analyze, troubleshoot, and monitor the health and performance of your data pipelines and applications.
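Before moving on, it can help to confirm that the association took effect. A quick check using standard Snowflake commands (note that LOG_LEVEL and TRACE_LEVEL can also be scoped to a database, schema, or individual object rather than the session):

-- Confirm the account-level event table association
SHOW PARAMETERS LIKE 'EVENT_TABLE' IN ACCOUNT;

-- Inspect the event table's predefined columns (TIMESTAMP, RECORD_TYPE, VALUE, etc.)
DESCRIBE TABLE observability_db.public.observability_event_table;

-- Example of scoping telemetry levels below the account/session level
ALTER DATABASE observability_db SET LOG_LEVEL = INFO;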

1. LOGGING

This Python UDF has been instrumented to log messages indicating the number of customer records processed:

CREATE OR REPLACE FUNCTION log_customer_counts()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'run'
AS $$
import logging
import random

# Create a logger instance with a specific name
logger = logging.getLogger("salesdb.custs.customer_processing_logger")

def run():
    # Simulate processing customer records
    customer_count = random.randint(10, 100)
    logger.info(f"Processed {customer_count} customer records.")
    # Simulate different logging scenarios
    if customer_count > 50:
        logger.warning("High customer count detected.")
    elif customer_count < 20:
        logger.debug("Low customer count.")
    else:
        logger.info("Normal customer count.")
    if customer_count == 0:  # Unreachable with randint(10, 100); shown to illustrate error logging
        logger.error("No customers found!")
    return "SUCCESS"
$$;

Execute it:

select log_customer_counts();

Querying Logs

Query the event table to retrieve the logged messages:

-- Query for all types of events in the event table
SELECT * FROM observability_db.public.observability_event_table;

-- Query specifically for logs and extract relevant information
SELECT
    TIMESTAMP AS time,
    RESOURCE_ATTRIBUTES['snow.executable.name'] AS executable,
    RECORD['severity_text'] AS severity,
    VALUE AS message
FROM
    observability_db.public.observability_event_table
WHERE
    RECORD_TYPE = 'LOG';
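In practice you will often want only the problems. Here is a variation on the query above that filters by severity (Snowflake records OpenTelemetry-style severity text such as INFO, WARN, and ERROR; confirm the exact values against your own event table):

-- Show only warnings and errors from the last hour
SELECT
    TIMESTAMP AS time,
    RECORD['severity_text']::VARCHAR AS severity,
    VALUE AS message
FROM observability_db.public.observability_event_table
WHERE RECORD_TYPE = 'LOG'
  AND RECORD['severity_text']::VARCHAR IN ('WARN', 'ERROR')
  AND TIMESTAMP > DATEADD('hour', -1, CURRENT_TIMESTAMP());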

Sample results:

2. TRACING

Let’s create and use a process_order_with_trace stored procedure to demonstrate instrumentation with tracing in Snowflake.

CREATE OR REPLACE PROCEDURE observability_db.public.process_order_with_trace(order_id INT)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python', 'snowflake-telemetry-python')
HANDLER = 'process_order_with_trace'
AS $$
import logging
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum
from snowflake import telemetry

logger = logging.getLogger("ordersdb.order_processing_logger")

def process_order_with_trace(session: Session, order_id: int):
    # Fully qualify the table names within the SP
    sales_order_table = "OrdersDB.public.sales_order"
    customer_table = "OrdersDB.public.customer"

    # Add order_id as a span attribute
    telemetry.set_span_attribute("order_id", order_id)

    # Fetch order details
    telemetry.set_span_attribute("process_step", "fetch_order_details")
    order_df = session.table(sales_order_table).where(f"order_id = {order_id}")
    order_list = order_df.collect()  # Collect results from order_df

    # If the order doesn't exist, record a trace event and return
    if len(order_list) == 0:
        telemetry.add_event("order_not_found", {"order_id": order_id})
        return "Order not found."
    order = order_list[0]  # Extract the first row

    # Fetch customer details
    telemetry.set_span_attribute("process_step", "fetch_customer_details")
    customer_id = order["CUSTOMER_ID"]
    customer_df = session.table(customer_table).where(f"customer_id = {customer_id}")
    customer = customer_df.collect()[0] if customer_df.count() > 0 else None
    telemetry.add_event("fetch_customer_details", {"order_id": order_id, "status": "SUCCESS"})

    # Calculate the total using Snowpark functions; do not iterate over rows
    telemetry.set_span_attribute("process_step", "calculate_total")
    order_price = order_df.select(
        sum(col("order_price").cast("float")).alias("order_price")
    ).collect()[0]["ORDER_PRICE"]
    telemetry.set_span_attribute("order_price", order_price)
    telemetry.add_event("calculate_price", {"order_id": order_id, "status": "SUCCESS"})

    # Update the order status directly on the sales_order table
    telemetry.set_span_attribute("process_step", "update_order_status")
    session.table(sales_order_table).update(
        {"ORDER_STATUS": "Shipped"}, col("ORDER_ID") == order_id
    )
    telemetry.add_event("updated_order_status", {"order_id": order_id, "status": "SUCCESS"})

    # Order processed successfully
    telemetry.add_event("order_processed", {"order_id": order_id, "status": "SUCCESS"})

    # Log the high-value order information (convert numeric values to strings)
    log_message = (
        f"High-value order placed: Order ID={str(order['ORDER_ID'])}, "
        f"Customer ID={str(order['CUSTOMER_ID'])}, Total Amount={str(order_price)}"
    )
    logger.info(log_message)

    return "Order processed successfully."
$$;

Execute it:

CALL observability_db.public.process_order_with_trace(5400003);

The CALL statement triggers the stored procedure for a specific sales order (ID 5400003). The procedure not only processes the order, fetching order and customer details, calculating the total, and updating the status, but also instruments each step with span attributes and trace events using Snowflake’s telemetry framework.
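As a quick sanity check (assuming the OrdersDB sample data is in place), you can verify the procedure’s side effect directly:

-- Confirm the procedure updated the order status
SELECT order_id, order_status
FROM OrdersDB.public.sales_order
WHERE order_id = 5400003;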

Query for Traces and Span Events:

-- Query for all types of events in the event table
SELECT * FROM observability_db.public.observability_event_table;

-- Query for span events
SELECT
    RECORD_ATTRIBUTES['order_id']::INT AS order_id,
    RECORD['name']::VARCHAR AS span_name,
    TIMESTAMP AS start_time,
    LEAD(TIMESTAMP) OVER (ORDER BY TIMESTAMP) AS end_time,
    DATEDIFF('MILLISECOND', TIMESTAMP, LEAD(TIMESTAMP) OVER (ORDER BY TIMESTAMP)) AS duration_ms
FROM observability_db.public.observability_event_table
WHERE RECORD_TYPE = 'SPAN_EVENT'
  AND RESOURCE_ATTRIBUTES['snow.executable.name']::VARCHAR = 'PROCESS_ORDER_WITH_TRACE(ORDER_ID NUMBER):VARCHAR(16777216)'
ORDER BY TIMESTAMP;
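The query above approximates step durations from consecutive span events. The spans themselves also carry explicit start and end timestamps, so a companion query over whole spans is possible; a sketch (for RECORD_TYPE = 'SPAN' rows, START_TIMESTAMP holds the span start and TIMESTAMP the span end; verify against your own event table):

-- Query for spans and their measured durations
SELECT
    RECORD['name']::VARCHAR AS span_name,
    START_TIMESTAMP AS start_time,
    TIMESTAMP AS end_time,
    DATEDIFF('MILLISECOND', START_TIMESTAMP, TIMESTAMP) AS duration_ms
FROM observability_db.public.observability_event_table
WHERE RECORD_TYPE = 'SPAN'
ORDER BY START_TIMESTAMP;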

Results:

Using a simple Streamlit app (code not included in this tutorial), you can visualize the order flow through these stages.

Conclusion & Resources

As we’ve seen, Snowflake’s observability framework — comprising logging, tracing, and events — is a powerful toolkit for understanding and optimizing your data pipelines.

By leveraging these capabilities in instrumenting your scripts, data engineering pipelines and most importantly Native Applications, you can gain deep insights into how your code interacts with data, identify performance bottlenecks, and proactively detect and resolve issues. This level of observability is essential for building reliable, efficient, and scalable data applications.

To delve further into Snowflake observability and explore advanced techniques, be sure to check out Snowflake’s documentation on logging, tracing, and event tables.

Additionally, consider exploring third-party observability platforms like Datadog, Grafana, and Metaplane, which integrate with Snowflake and provide additional features for monitoring, alerting, and visualization.

Originally published at https://tutorials.demohub.dev.
