Snowflake Dynamic Tables with Coalesce.io

Enabling bleeding-edge Snowflake features with User-Defined Nodes

Declarative SQL Pipelines for Snowflake?

I’ve worked on several Confluent and Apache Kafka projects in the past and first used ksqlDB on one of them in 2018, not long after that feature became generally available. I was hooked. I wrote about the experience on the Confluent blog, discussed it on the Streaming Audio podcast, and even wrote some open-source software to make it easier to use. What clicked for me was how a data pipeline could be declared with SQL: define it once and let the platform handle the orchestration and execution.

Most projects I worked on back then used Snowflake, were batch-oriented, and required complex plumbing for handling pipeline refreshes. There is a Kafka connector for Snowflake, and I’ve used it on a few projects, but those organizations had larger investments in Confluent and weren’t using it just to load Snowflake. What I hoped for even back then was a declarative approach to data pipelines managed 100% by Snowflake, using Snowflake SQL.

Ask, and ye shall receive: Dynamic Tables

Dynamic Tables is the new name for the feature announced at Snowflake Summit as “Materialized Tables,” and is now in private preview. I love the new name, and I assume this decision was made to remove confusion with materialized views. Saras Nowak and Jeremiah Hansen wrote an in-depth article about the value proposition of Dynamic Tables, and I won’t repeat their work here. Let’s instead discuss some of the nuances of declarative pipelines and how they differ from more traditional data engineering approaches.

Idempotency

In legacy on-prem data warehouses, we were usually constrained by storage and compute. Modeling techniques arose to ease these constraints, specifically Kimball’s Bus Architecture, and this approach usually had us updating our final facts and dimensions with processes that were not idempotent. Maxime Beauchemin wrote about idempotency in Functional Data Engineering, one of many fantastic articles from him. He describes it as “vital” in modern data pipelines, and I agree, as demonstrated by the immutable Bronze zone in this diagram from my Does Modeling Still Matter in the Data Cloud? talk, which I’ve given a few times now:

The immutable Bronze layer provides idempotency downstream.

It’s important to understand that idempotency is not merely a best practice with Dynamic Tables… it’s a requirement. We define them with a CREATE DYNAMIC TABLE AS SELECT (CDTAS) statement, which means all the logic for constructing the various steps in our data pipeline must be defined with idempotent SELECT statements: we cannot issue DML statements in these pipelines. I see this as a feature rather than a limitation, because storage is the cheapest element of the Data Cloud; there’s no reason the final Gold or Diamond zones should be the only places where we store history.
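
For illustration, here’s a minimal sketch of the shape of a CDTAS statement. The table and column names are hypothetical; LAG and WAREHOUSE, which appear in the real examples below, control the refresh cadence and the compute used for it:

-- A hypothetical pipeline step: the entire definition is one idempotent SELECT.
-- No INSERT, UPDATE, MERGE, or DELETE statements are involved.
CREATE OR REPLACE DYNAMIC TABLE BRONZE.DT_ORDERS
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
    ORDER_ID,
    CUSTOMER_ID,
    TO_TIMESTAMP(ORDER_TS) as ORDER_TS
FROM RAW.ORDERS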

Coalesce.io user-defined nodes (UDNs)

The basic building block for defining data pipelines in Coalesce.io is called a node. Borrowed from directed acyclic graph (DAG) terminology, a node is a bundled definition of materialization, processing, and deployment logic for a step in a data pipeline. Coalesce ships with many out-of-the-box node types, but the real value comes from allowing our customers to create their own user-defined nodes (UDNs) or use one developed by the community and published in a package. Our package functionality is currently in beta, but soon you’ll be able to browse and include packages from the Coalesce community hub. If you are interested in the Dynamic Tables UDN I describe below, reach out to support and request the coalesce-utils package, or browse the code yourself. For a preview of how it will look when generally available, packages will be installed with the coa CLI by referencing the Git repository containing the package:

❯ coa package add https://github.com/coalesceio/coalesce-utils
|2022-11-16T20:14:28.331Z|[CLI] | info:Add package initialized. running...
|2022-11-16T20:14:28.333Z|[CLI] | info:Initializing package installation...
|2022-11-16T20:14:28.334Z|[CLI] | info:Cloning Coalesce package from repo URL: https://github.com/coalesceio/coalesce-utils
|2022-11-16T20:14:29.716Z|[CLI] | info:Gathering your project workspace data for environment: 1
|2022-11-16T20:14:30.920Z|[CLI] | info:Installing package using namespace "COALESCE-UTILS"
|2022-11-16T20:14:31.140Z|[CLI] | info:Cleaning up any stale package data in your project...
|2022-11-16T20:14:31.140Z|[CLI] | info:Pruning workspace data: macros
|2022-11-16T20:14:31.140Z|[CLI] | info:Pruning workspace data: stepTypes
|2022-11-16T20:14:31.140Z|[CLI] | info:Preparing workspace data for import: macros
|2022-11-16T20:14:31.141Z|[CLI] | info:Preparing workspace data for import: stepTypes
|2022-11-16T20:14:31.143Z|[CLI] | info:Adding stepType: 6 as COALESCE-UTILS::6
|2022-11-16T20:14:31.503Z|[CLI] | info:added COALESCE-UTILS...update successful!
|2022-11-16T20:14:31.505Z|[CLI] | info:CLI Package Installation complete

The Dynamic Tables UDN

I want to start with a big thanks to John Gontarz for doing the initial prototype of this UDN: I really just smoothed out the edges. I’ll first add a Source called CALL_CENTER to my DAG. As you can see, this table is a record of all changes, with a change timestamp and a DML type:

Our CALL_CENTER Source is a record of all changes.

From the Source node, we now create a new Dynamic Table node type and prune down to just the interesting columns to improve readability. We set the Lag, which determines how often Snowflake refreshes the table, and the Refresh Warehouse, which specifies the warehouse used for those refreshes. Then we click CREATE and view the DDL that was executed:

Our first Dynamic Table is created.
CREATE OR REPLACE DYNAMIC TABLE "STEWART_DB"."BRONZE"."DT_CALL_CENTER"
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
    "CC_CALL_CENTER_ID" as "CC_CALL_CENTER_ID",
    "CC_NAME" as "CC_NAME",
    "CC_CLASS" as "CC_CLASS",
    TO_TIMESTAMP("CALL_CENTER"."CC_UPDATE_TS") as "CC_UPDATE_TS",
    "CC_DML_TYPE" as "CC_DML_TYPE"
FROM
    "PACKAGES"."RETAIL"."CALL_CENTER" "CALL_CENTER"

This statement will enable Snowflake change tracking on any of the source tables referenced in the SELECT statement. When I issue the SHOW DYNAMIC TABLES command in Snowsight, I see that my table was created with a refresh_mode of INCREMENTAL, which is exactly what I wanted.
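
For example, something like the following works (the LIKE pattern here is just to scope the output to our new table):

SHOW DYNAMIC TABLES LIKE 'DT_CALL_CENTER' IN SCHEMA STEWART_DB.BRONZE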

Enabling Change Tracking Options in the UDN

Not to be confused with the Snowflake change tracking mentioned above, the UDN also supports adding custom change tracking columns to any Dynamic Tables it creates. These are the common START_TIME, END_TIME, and IS_CURRENT columns that we see in history tables, slowly-changing dimensions, and similar structures that store a history of changes to an entity over time. I also added some lightweight transformations using INITCAP just to demonstrate how that works in Coalesce:

Defining table key(s) and a change timestamp adds additional columns.
CREATE OR REPLACE DYNAMIC TABLE "STEWART_DB"."SILVER"."DT_CALL_CENTER"
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
    "CC_CALL_CENTER_ID" as "CC_CALL_CENTER_ID",
    INITCAP("DT_CALL_CENTER"."CC_NAME") as "CC_NAME",
    INITCAP("DT_CALL_CENTER"."CC_CLASS") as "CC_CLASS",
    "CC_DML_TYPE" as "CC_DML_TYPE",
    "CC_UPDATE_TS" as "START_TIME",
    -- END_TIME is one millisecond before the next version's START_TIME;
    -- the latest version gets the high-date default.
    ifnull(
        timestampadd(
            'millisecond',
            -1,
            lag("CC_UPDATE_TS") over (
                partition by "CC_CALL_CENTER_ID"
                order by "CC_UPDATE_TS" desc
            )
        ),
        to_timestamp('9999-12-31 23:59:59.999')
    ) as "END_TIME",
    -- The most recent version of each key is flagged as current.
    CASE
        WHEN (
            row_number() over (
                partition by "CC_CALL_CENTER_ID"
                order by "CC_UPDATE_TS" desc
            ) = 1
        ) THEN TRUE
        ELSE FALSE
    END as "IS_CURRENT"
FROM
    "STEWART_DB"."BRONZE"."DT_CALL_CENTER" "DT_CALL_CENTER"

Now, when we issue the SHOW DYNAMIC TABLES command in Snowsight, we see the SILVER.DT_CALL_CENTER table has a refresh_mode value of FULL and a refresh_mode_reason of “Change tracking is not supported on queries with window functions.” This is a current limitation with Dynamic Tables, and not unlike refresh limitations I’ve seen with other databases over the years. Snowflake will still manage the refresh of the table and its dependencies on other Dynamic Tables that might be incremental, but each time the table is refreshed, it’s a flush and fill.
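
To see just the relevant columns, one approach is to pair SHOW with RESULT_SCAN; the column names in SHOW output are lowercase, so they need double quotes:

SHOW DYNAMIC TABLES LIKE 'DT_CALL_CENTER' IN SCHEMA STEWART_DB.SILVER;

SELECT "name", "refresh_mode", "refresh_mode_reason"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))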

Creating a Dynamic Table with Current Values

It’s a common requirement to populate downstream tables with only the current values for the business key(s) that define a unique entity. This isn’t an either/or situation: with cheap storage in the Data Cloud, we can easily build another version of our CALL_CENTER table in our Silver zone that has only current values. We’ve also added an option called Propagate Last Non-Delete: if the solution we’re using for detecting changes in the source delivers DELETE transactions with NULL values in many of the fields, we can grab the last non-delete value using a LAG function:

We can propagate non-null values to a DELETE record and QUALIFY current values.
CREATE OR REPLACE DYNAMIC TABLE "STEWART_DB"."SILVER"."DT_CALL_CENTER_CURRENT"
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
    -- For DELETE records, LAG pulls each column's value from the
    -- previous (non-delete) version of the row.
    CASE
        WHEN "CC_DML_TYPE" = 'D' THEN lag("CC_CALL_CENTER_ID") over (
            partition by "CC_CALL_CENTER_ID"
            order by "CC_UPDATE_TS" asc
        )
        ELSE "CC_CALL_CENTER_ID"
    END as "CC_CALL_CENTER_ID",
    CASE
        WHEN "CC_DML_TYPE" = 'D' THEN lag(INITCAP("DT_CALL_CENTER"."CC_NAME")) over (
            partition by "CC_CALL_CENTER_ID"
            order by "CC_UPDATE_TS" asc
        )
        ELSE INITCAP("DT_CALL_CENTER"."CC_NAME")
    END as "CC_NAME",
    CASE
        WHEN "CC_DML_TYPE" = 'D' THEN lag(INITCAP("DT_CALL_CENTER"."CC_CLASS")) over (
            partition by "CC_CALL_CENTER_ID"
            order by "CC_UPDATE_TS" asc
        )
        ELSE INITCAP("DT_CALL_CENTER"."CC_CLASS")
    END as "CC_CLASS",
    "CC_DML_TYPE" as "CC_DML_TYPE",
    "CC_UPDATE_TS" as "CC_UPDATE_TS",
    CASE
        WHEN (
            row_number() over (
                partition by "CC_CALL_CENTER_ID"
                order by "CC_UPDATE_TS" desc
            ) = 1
        ) THEN TRUE
        ELSE FALSE
    END as "IS_CURRENT"
FROM
    "STEWART_DB"."BRONZE"."DT_CALL_CENTER" "DT_CALL_CENTER"
-- QUALIFY keeps only the current version of each key.
QUALIFY "IS_CURRENT"

Putting it All Together: a DAG of Dependencies

We’ll create one more table in our Gold zone called DT_SALES_BY_CALL_CENTER that joins our DT_CALL_CENTER_CURRENT Dynamic Table with another one I created called DT_CATALOG_SALES. Unlike materialized views, Dynamic Tables can be built with a SELECT that joins as many tables as necessary for our declarative logic. In this example, I’ve also included a SQL Test that ensures we only have one record per order:

SQL Tests pass when zero records are returned.
CREATE OR REPLACE DYNAMIC TABLE "STEWART_DB"."GOLD"."DT_SALES_BY_CALL_CENTER"
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
    "CC_CALL_CENTER_ID" as "CC_CALL_CENTER_ID",
    "CS_ITEM_ID" as "CS_ITEM_ID",
    "CS_ORDER_NUMBER" as "CS_ORDER_NUMBER",
    "CC_NAME" as "CC_NAME",
    "CS_SOLD_TS" as "CS_SOLD_TS",
    "CC_CLASS" as "CC_CLASS",
    "CS_QUANTITY" as "CS_QUANTITY",
    "CS_NET_PROFIT" as "CS_NET_PROFIT"
FROM
    "STEWART_DB"."SILVER"."DT_CATALOG_SALES" "DT_CATALOG_SALES"
    JOIN "STEWART_DB"."SILVER"."DT_CALL_CENTER_CURRENT" "DT_CALL_CENTER_CURRENT"
        ON "DT_CATALOG_SALES"."CS_CALL_CENTER_ID" = "DT_CALL_CENTER_CURRENT"."CC_CALL_CENTER_ID"

The declarative nature of Dynamic Tables is indeed powerful, but as the number of CDTAS statements grows, it can be difficult to define a mechanism for deployment: they need to be created (or recreated) in the correct order to ensure all their dependencies exist. This is where Coalesce shines the brightest. Not only does our column-aware architecture make defining Dynamic Tables frictionless, but our built-in DAG also understands dependency ordering. When building data pipelines without Dynamic Tables, Coalesce uses the dependency graph for both deployment and refreshing of tables and views. While Snowflake now handles the refresh part, Coalesce guarantees Dynamic Tables are created only after all upstream dependencies are satisfied.

The Coalesce DAG of dependencies is frictionless.

Read more

  1. Dynamic Tables: Delivering Declarative Streaming Data Pipelines with Snowflake
  2. Why Column Aware Matters
  3. Building Data Transformations at Scale with Data Patterns in Coalesce
