Normalize and Enrich Streaming Security Data in Snowflake with Dynamic Tables

Snowflake is commonly used for searching and analyzing security data, but what about all the upstream work needed to prepare that data for effective threat detection and response? Most of that work tends to be done in stream processing solutions like Spark, Flink, or Lambda, but those introduce complexity and cost, which is why Snowflake’s new streaming capabilities are so exciting.

I wanted to better understand what this could look like for security use cases but didn’t find an “Aha!” example describing a consolidated pipeline+database. Fortunately we live in the future and ChatGPT was happy to review the new Dynamic Tables documentation. The following was a collaborative effort.

Key Concepts of Dynamic Tables

  • Declarative Data Transformation: Rather than manually defining each transformation step, security engineers can define the desired end state using dynamic tables, letting Snowflake handle the intricacies.
  • Automated Refresh: Dynamic tables automatically update their data based on a specified query, without complex orchestration infrastructure.
  • Target Freshness: Users can set a "target freshness" for their data, helping to ensure that ingest SLAs are met. Built-in monitoring dashboards are available to identify pipeline health issues, and refresh history can also be queried directly, as shown below.
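
For instance, here is a minimal sketch of checking recent refreshes with the INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY table function (the exact output columns and optional arguments may vary by Snowflake version):

-- Recent refreshes for dynamic tables in the current database
SELECT *
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
LIMIT 10;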

So far so good. Now let’s consider how this Snowflake component can be used to consolidate event normalization and enrichment into the security data lake. We’ll work with firewall logs, where certain fields need to be extracted for easy access and geolocation details added to give analysts more context.

Implementation Steps for a Basic Pipeline

1. Real-time Data Ingestion with Snowpipe Streaming

Snowflake’s new Snowpipe Streaming capability allows data to be streamed directly into Snowflake without the need for intermediary staging. As the company’s firewall generates logs, the events can be streamed straight into the database using Snowpipe Streaming.

This direct streaming is a significant improvement, reducing both latency and cost compared to the previous method of staging files in cloud storage. In this example, it populates the raw_events table in real time.
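
The ingestion itself happens client-side through the Snowpipe Streaming SDK (for example, the Java client or the Kafka connector) rather than through SQL. Once rows are flowing, a quick sanity check, assuming the raw_events table defined in the next step, might look like this:

-- Confirm that streamed firewall events are landing in near real time
SELECT COUNT(*) AS events_last_hour
FROM raw_events
WHERE event_timestamp > DATEADD('hour', -1, CURRENT_TIMESTAMP());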

2. Landing the Data in Snowflake

The raw_events table captures the streaming network logs with columns like event_id, event_timestamp, and a VARIANT column event_message. That last column contains the complete JSON object and can be queried with schema-on-read, so the pipeline isn’t sensitive to changes in event structure.

CREATE TABLE raw_events (
    event_id STRING,
    event_timestamp TIMESTAMP,
    event_message VARIANT
);
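
Because event_message is a VARIANT, additional fields can be pulled out ad hoc at query time without altering the table. For example (action and bytes_out are hypothetical field names standing in for whatever your firewall emits):

-- Schema-on-read: extract fields from the raw JSON at query time
SELECT
    event_id,
    event_message:action::STRING    AS action,
    event_message:bytes_out::NUMBER AS bytes_out
FROM raw_events
LIMIT 10;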

Additionally, we’ll use a shared table geolocation containing IP addresses mapped to their respective geolocation details. This shared table can be accessed via the Snowflake Marketplace listing from IPinfo. Both free and premium options are available.
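
For the joins that follow, this walkthrough assumes a lookup table with roughly the shape below. This is a hypothetical stand-in: the actual IPinfo listing ships its own schema (typically keyed on IP ranges rather than exact addresses), so adjust the join logic to match the share you subscribe to.

-- Hypothetical shape of the geolocation lookup assumed in this example
CREATE TABLE geolocation (
    ip_address STRING,
    city STRING,
    state STRING,
    country STRING
);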

3. Normalization

The first step post-ingestion is to extract key fields from event_message, such as user_id, user_ip, and event_type. This happens continuously using the stream processing capabilities of Dynamic Tables. Note how simple this is to set up.

-- Continuously extract key fields from the raw JSON as new events land
CREATE OR REPLACE DYNAMIC TABLE normalized_events
    TARGET_LAG = '5 MINUTES'
    WAREHOUSE = your_warehouse
AS
SELECT
    event_id,
    event_timestamp,
    event_message AS full_message,
    event_message:user_id::STRING AS user_id,
    event_message:user_ip::STRING AS user_ip,
    event_message:event_type::STRING AS event_type
FROM raw_events;
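
One design choice worth noting: since normalized_events exists mainly to feed the enrichment step below, its lag could instead be tied to the downstream table with TARGET_LAG = DOWNSTREAM, letting Snowflake refresh it only as often as needed to keep enriched_events fresh. A sketch of that variant:

-- Alternative: let the downstream table's freshness target drive refreshes
CREATE OR REPLACE DYNAMIC TABLE normalized_events
    TARGET_LAG = DOWNSTREAM
    WAREHOUSE = your_warehouse
AS
SELECT
    event_id,
    event_timestamp,
    event_message AS full_message,
    event_message:user_id::STRING AS user_id,
    event_message:user_ip::STRING AS user_ip,
    event_message:event_type::STRING AS event_type
FROM raw_events;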

4. Enrichment

The normalized data is then cross-referenced with the geolocation table. This process enriches each event with geolocation details based on the source IP (user_ip in this example), providing additional context about the event's origin.

-- Join normalized events to geolocation context as they arrive
CREATE OR REPLACE DYNAMIC TABLE enriched_events
    TARGET_LAG = '5 MINUTES'
    WAREHOUSE = your_warehouse
AS
SELECT
    n.event_id,
    n.event_timestamp,
    n.full_message,
    n.user_id,
    n.user_ip,
    n.event_type,
    g.city,
    g.state,
    g.country
FROM normalized_events n
-- Inner join keeps only events with a geolocation match;
-- use a LEFT JOIN to retain events from unmapped IPs
JOIN geolocation g
    ON n.user_ip = g.ip_address;

5. Usage

That’s all there is to it. Now we can query the enriched_events dynamic table to get the latest event data that's both normalized and enriched with geolocation info:

SELECT * 
FROM enriched_events
ORDER BY event_timestamp DESC
LIMIT 100;

Example Scenario: Incident Response

Imagine that a company’s firewall events are streaming through the pipeline described above. A suspected breach is flagged and incident response kicks off. Attention turns to the security data lake where firewall events, with details like the device’s IP address and the type of traffic, are kept.

The analyst from the incident response team queries the enriched_events table and can quickly spot connections from locations they don’t expect on their network, which helps focus the investigation. Because there is no retention limit or archive tiering, the analyst can look all the way back to when the initial entry took place. Armed with this enriched data, the analyst can make more informed decisions to mitigate the incident.
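
A hypothetical first pass at that hunt, using the enriched_events table defined above, might group recent traffic by geography to surface the rare, unexpected locations:

-- Surface rarely-seen source locations over the past 7 days
SELECT
    country,
    city,
    COUNT(*) AS event_count
FROM enriched_events
WHERE event_timestamp > DATEADD('day', -7, CURRENT_TIMESTAMP())
GROUP BY country, city
ORDER BY event_count ASC;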

Conclusion

Snowflake Dynamic Tables, combined with the power of Snowpipe Streaming, provide an opportunity to consolidate the security data lifecycle. With stream processing happening on the same platform as search and analytics, security engineers and application builders can eliminate overhead and complexity. This approach could also be extended to threat detection, where streaming data can be combined with stored data for better fidelity. New kinds of enrichment and detection are bound to follow as the line between streaming data and data at rest continues to blur.


Omer Singer

I believe that better data is the key to better security. These are personal posts that don’t represent Snowflake.