Snowflake Dynamic Tables with Coalesce.io
Enabling bleeding-edge Snowflake features with User-Defined Nodes
Declarative SQL Pipelines for Snowflake?
I’ve worked on several Confluent and Apache Kafka projects in the past and first used ksqlDB on one of them in 2018, not long after that feature became generally available. I was hooked. I wrote about the experience on the Confluent blog, discussed it on the Streaming Audio podcast, and even wrote some open-source software to make it easier to use. What clicked for me was how a data pipeline could be declared with SQL: define it once and let the platform handle the orchestration and execution.
Most projects I worked on back then used Snowflake, were batch-oriented, and required complex plumbing for handling pipeline refreshes. There is a Kafka connector for Snowflake, and I’ve used it on a few projects, but those organizations had larger investments in Confluent and weren’t using it just to load Snowflake. What I hoped for even back then was a declarative approach to data pipelines managed 100% by Snowflake, using Snowflake SQL.
Ask, and ye shall receive: Dynamic Tables
Dynamic Tables is the new name for the feature announced at Snowflake Summit as “Materialized Tables,” and is now in private preview. I love the new name, and I assume this decision was made to remove confusion with materialized views. Saras Nowak and Jeremiah Hansen wrote an in-depth article about the value proposition of Dynamic Tables, and I won’t repeat their work here. Let’s instead discuss some of the nuances of declarative pipelines and how they differ from more traditional data engineering approaches.
Idempotency
In legacy on-prem data warehouses, we were usually constrained by storage and compute. Modeling techniques arose to ease these constraints, most notably Kimball’s Bus Architecture, and that approach usually had us updating our final facts and dimensions with processes that were not idempotent. Maxime Beauchemin wrote about idempotency in Functional Data Engineering, one of many excellent articles from him. He describes it as “vital” in modern data pipelines, and I agree, as demonstrated by the immutable Bronze zone in this diagram from my Does Modeling Still Matter in the Data Cloud? talk, which I’ve given a few times now:
It’s important to understand that idempotency is not merely a best practice with Dynamic Tables… it’s a requirement. We define them with a CREATE DYNAMIC TABLE AS SELECT… (CDTAS) statement, which means all the logic for constructing the various steps in our data pipeline must be defined with idempotent SELECT statements: we cannot issue DML statements in these pipelines. I see this as a feature instead of a limitation because storage is the cheapest element of the Data Cloud; there’s no reason why the final Gold or Diamond zones should be the only places where we store history.
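To make the CDTAS shape concrete, here is a minimal sketch of a Dynamic Table definition. The table and column names are hypothetical, and the LAG and WAREHOUSE values are just placeholders:

```sql
-- A minimal Dynamic Table: everything is declared in a single
-- CREATE DYNAMIC TABLE AS SELECT (CDTAS) statement. Snowflake owns
-- the refresh; we never issue INSERT/UPDATE/DELETE against it.
CREATE OR REPLACE DYNAMIC TABLE bronze.orders_enriched
  LAG = '60 Minutes'       -- target freshness relative to the sources
  WAREHOUSE = compute_wh   -- warehouse used for refreshes
AS
SELECT
  o.order_id,
  o.customer_id,
  o.order_ts,
  c.region
FROM raw.orders o
JOIN raw.customers c
  ON o.customer_id = c.customer_id;
```

Because the entire pipeline step is a SELECT, rerunning the statement always yields the same result for the same inputs, which is exactly the idempotency property described above.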
Coalesce.io user-defined nodes (UDNs)
The basic building block for defining data pipelines in Coalesce.io is called a node. Derived from the term in directed acyclic graph (DAG) theory, a node is a bundled definition of materialization, processing, and deployment logic for a step in a data pipeline. Coalesce ships with many out-of-the-box node types, but the real value comes from allowing our customers to create their own user-defined nodes (UDNs) or use one developed by the community and published in a package. Our package functionality is currently in beta, but soon you’ll be able to browse and include packages from the Coalesce community hub. If you are interested in the Dynamic Tables UDN I describe below, reach out to support and request the coalesce-utils package or browse the code yourself. For a preview of how it will look when generally available, packages will be installed with the coa CLI referencing the Git repository containing the package:
❯ coa package add https://github.com/coalesceio/coalesce-utils
|2022-11-16T20:14:28.331Z|[CLI] | info:Add package initialized. running...
|2022-11-16T20:14:28.333Z|[CLI] | info:Initializing package installation...
|2022-11-16T20:14:28.334Z|[CLI] | info:Cloning Coalesce package from repo URL: https://github.com/coalesceio/coalesce-utils
|2022-11-16T20:14:29.716Z|[CLI] | info:Gathering your project workspace data for environment: 1
|2022-11-16T20:14:30.920Z|[CLI] | info:Installing package using namespace "COALESCE-UTILS"
|2022-11-16T20:14:31.140Z|[CLI] | info:Cleaning up any stale package data in your project...
|2022-11-16T20:14:31.140Z|[CLI] | info:Pruning workspace data: macros
|2022-11-16T20:14:31.140Z|[CLI] | info:Pruning workspace data: stepTypes
|2022-11-16T20:14:31.140Z|[CLI] | info:Preparing workspace data for import: macros
|2022-11-16T20:14:31.141Z|[CLI] | info:Preparing workspace data for import: stepTypes
|2022-11-16T20:14:31.143Z|[CLI] | info:Adding stepType: 6 as COALESCE-UTILS::6
|2022-11-16T20:14:31.503Z|[CLI] | info:added COALESCE-UTILS...update successful!
|2022-11-16T20:14:31.505Z|[CLI] | info:CLI Package Installation complete
The Dynamic Tables UDN
I want to start with a big thanks to John Gontarz for doing the initial prototype of this UDN: I really just smoothed out the edges. I’ll first add a Source called CALL_CENTER to my DAG. As you can see, this table is a record of all changes, with a change timestamp and a DML type:
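For readers following along, here is a sketch of what such a change-log source table might look like, using the column names that appear in the generated DDL below. The column types and DML-type codes are assumptions for illustration:

```sql
-- Hypothetical shape of the CALL_CENTER change-log source: one row per
-- change, carrying a change timestamp and a DML type.
CREATE TABLE packages.retail.call_center (
  cc_call_center_id  VARCHAR,
  cc_name            VARCHAR,
  cc_class           VARCHAR,
  cc_update_ts       VARCHAR,    -- cast with TO_TIMESTAMP downstream
  cc_dml_type        VARCHAR(1)  -- e.g. 'I' = insert, 'U' = update, 'D' = delete
);
```

Note that cc_update_ts is sketched as a string here only because the DDL below applies TO_TIMESTAMP to it; a native TIMESTAMP column would work just as well.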
From the Source node, we now create a new Dynamic Table node type and prune down to just the interesting columns to improve readability. We set the Lag, which determines how often Snowflake will refresh the table, and the Refresh Warehouse, which determines the warehouse to use. Then we click CREATE and view the DDL that was executed:
CREATE
OR REPLACE DYNAMIC TABLE "STEWART_DB"."BRONZE"."DT_CALL_CENTER"
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
"CC_CALL_CENTER_ID" as "CC_CALL_CENTER_ID",
"CC_NAME" as "CC_NAME",
"CC_CLASS" as "CC_CLASS",
TO_TIMESTAMP("CALL_CENTER"."CC_UPDATE_TS") as "CC_UPDATE_TS",
"CC_DML_TYPE" as "CC_DML_TYPE"
FROM
"PACKAGES"."RETAIL"."CALL_CENTER" "CALL_CENTER"
This statement will enable Snowflake change tracking on any of the source tables referenced in the SELECT statement. When we issue the show dynamic tables command in Snowsight, we see that the table was created with a refresh_mode of INCREMENTAL, which is exactly what I wanted.
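As a sketch, inspecting the refresh mode from Snowsight looks something like the following. The RESULT_SCAN pattern is one common way to filter SHOW output; the quoted column names match what SHOW DYNAMIC TABLES returns:

```sql
-- List the Dynamic Table and inspect how Snowflake chose to refresh it.
SHOW DYNAMIC TABLES LIKE 'DT_CALL_CENTER' IN SCHEMA stewart_db.bronze;

-- Optionally narrow the SHOW output to just the columns we care about.
SELECT "name", "refresh_mode", "refresh_mode_reason"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
```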
Enabling Change Tracking Options in the UDN
Not to be confused with the Snowflake change tracking mentioned above, we’ve enabled adding custom change tracking columns to any Dynamic Tables created using our UDN. These are the common START_TIME, END_TIME, and IS_CURRENT columns that we see in history tables, slowly changing dimensions, and similar structures that store a history of changes to an entity over time. I also added some lightweight transformations using INITCAP just to demonstrate how that works in Coalesce:
CREATE
OR REPLACE DYNAMIC TABLE "STEWART_DB"."SILVER"."DT_CALL_CENTER"
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
"CC_CALL_CENTER_ID" as "CC_CALL_CENTER_ID",
INITCAP("DT_CALL_CENTER"."CC_NAME") as "CC_NAME",
INITCAP("DT_CALL_CENTER"."CC_CLASS") as "CC_CLASS",
"CC_DML_TYPE" as "CC_DML_TYPE",
"CC_UPDATE_TS" as "START_TIME",
ifnull(
timestampadd(
'millisecond',
-1,
lag("CC_UPDATE_TS") over (
partition by "CC_CALL_CENTER_ID"
order by
"CC_UPDATE_TS" desc
)
),
to_timestamp('9999-12-31 23:59:59.999')
) as "END_TIME",
CASE
WHEN (
row_number() over (
partition by "CC_CALL_CENTER_ID"
order by
"CC_UPDATE_TS" desc
) = 1
) THEN TRUE
ELSE FALSE
END as "IS_CURRENT"
FROM
"STEWART_DB"."BRONZE"."DT_CALL_CENTER" "DT_CALL_CENTER"
Now, when we issue the show dynamic tables command in Snowsight, we see the SILVER.DT_CALL_CENTER table has a refresh_mode value of FULL and a refresh_mode_reason value of “Change tracking is not supported on queries with window functions.”
This is a current limitation with Dynamic Tables and not unlike refresh limitations I’ve seen with other databases over the years. So Snowflake will still manage the refresh of the table and the dependencies on other Dynamic Tables that might be incremental, but each time the table is refreshed, it’s a flush and fill.
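To verify the flush-and-fill behavior, we can look at the refresh history. The sketch below assumes Snowflake’s DYNAMIC_TABLE_REFRESH_HISTORY information-schema table function; the exact parameter and column names may vary by release, so treat this as illustrative:

```sql
-- Inspect recent refreshes for the full-refresh Silver table.
SELECT *
FROM TABLE(
  stewart_db.information_schema.dynamic_table_refresh_history(
    NAME => 'STEWART_DB.SILVER.DT_CALL_CENTER'
  )
)
ORDER BY refresh_start_time DESC
LIMIT 10;
```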
Creating a Dynamic Table with Current Values
It’s a common requirement to want downstream tables populated with only the current values for the business key(s) used to define a unique entity. This isn’t an either/or situation: with cheap storage in the Data Cloud, we can easily build another version of our CALL_CENTER table in our Silver zone that only has current values. Additionally, we’ve added an option called Propagate Last Non-Delete. If the solution we’re using for detecting changes in the source delivers DELETE transactions with NULL values in many of the fields, we can grab the last non-delete value using a LAG function:
CREATE
OR REPLACE DYNAMIC TABLE "STEWART_DB"."SILVER"."DT_CALL_CENTER_CURRENT"
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
CASE
WHEN "CC_DML_TYPE" = 'D' THEN lag("CC_CALL_CENTER_ID") over (
partition by "CC_CALL_CENTER_ID"
order by
"CC_UPDATE_TS" asc
)
ELSE "CC_CALL_CENTER_ID"
END as "CC_CALL_CENTER_ID",
CASE
WHEN "CC_DML_TYPE" = 'D' THEN lag(INITCAP("DT_CALL_CENTER"."CC_NAME")) over (
partition by "CC_CALL_CENTER_ID"
order by
"CC_UPDATE_TS" asc
)
ELSE INITCAP("DT_CALL_CENTER"."CC_NAME")
END as "CC_NAME",
CASE
WHEN "CC_DML_TYPE" = 'D' THEN lag(INITCAP("DT_CALL_CENTER"."CC_CLASS")) over (
partition by "CC_CALL_CENTER_ID"
order by
"CC_UPDATE_TS" asc
)
ELSE INITCAP("DT_CALL_CENTER"."CC_CLASS")
END as "CC_CLASS",
"CC_DML_TYPE" as "CC_DML_TYPE",
"CC_UPDATE_TS" as CC_UPDATE_TS,
CASE
WHEN (
row_number() over (
partition by "CC_CALL_CENTER_ID"
order by
"CC_UPDATE_TS" desc
) = 1
) THEN TRUE
ELSE FALSE
END as "IS_CURRENT"
FROM
"STEWART_DB"."BRONZE"."DT_CALL_CENTER" "DT_CALL_CENTER"
QUALIFY "IS_CURRENT"
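When the change feed delivers complete rows on DELETE, so Propagate Last Non-Delete isn’t needed, the current-values pattern collapses to a simple QUALIFY on a row number. This is a hand-written sketch of that simpler variant, not DDL generated by the UDN:

```sql
-- Simplified current-values variant: keep only the latest row per
-- business key. Assumes DELETE rows carry complete column values,
-- so no LAG-based back-filling is needed.
CREATE OR REPLACE DYNAMIC TABLE stewart_db.silver.dt_call_center_current_simple
  LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
  cc_call_center_id,
  INITCAP(cc_name)  AS cc_name,
  INITCAP(cc_class) AS cc_class,
  cc_dml_type,
  cc_update_ts
FROM stewart_db.bronze.dt_call_center
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY cc_call_center_id
  ORDER BY cc_update_ts DESC
) = 1;
```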
Putting it All Together: a DAG of Dependencies
We’ll create one more table in our Gold zone called DT_SALES_BY_CALL_CENTER that joins our DT_CALL_CENTER_CURRENT Dynamic Table with another one I created called DT_CATALOG_SALES. Unlike materialized views, Dynamic Tables can be built with a SELECT that joins as many tables as necessary for our declarative logic. In this example, I’ve also included a SQL Test that ensures we only have one record per order:
CREATE
OR REPLACE DYNAMIC TABLE "STEWART_DB"."GOLD"."DT_SALES_BY_CALL_CENTER"
LAG = '60 Minutes' WAREHOUSE = compute_wh AS
SELECT
"CC_CALL_CENTER_ID" as "CC_CALL_CENTER_ID",
"CS_ITEM_ID" as "CS_ITEM_ID",
"CS_ORDER_NUMBER" as "CS_ORDER_NUMBER",
"CC_NAME" as "CC_NAME",
"CS_SOLD_TS" as "CS_SOLD_TS",
"CC_CLASS" as "CC_CLASS",
"CS_QUANTITY" as "CS_QUANTITY",
"CS_NET_PROFIT" as "CS_NET_PROFIT"
FROM
"STEWART_DB"."SILVER"."DT_CATALOG_SALES" "DT_CATALOG_SALES"
JOIN "STEWART_DB"."SILVER"."DT_CALL_CENTER_CURRENT" "DT_CALL_CENTER_CURRENT"
ON "DT_CATALOG_SALES"."CS_CALL_CENTER_ID" = "DT_CALL_CENTER_CURRENT"."CC_CALL_CENTER_ID"
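The SQL Test itself is generated by Coalesce, but a hand-written equivalent of the uniqueness check might look like the query below: it should return zero rows if the grain holds. I’ve assumed the grain is order number plus item, since both appear in the table:

```sql
-- Returns any order/item pairs that appear more than once; an empty
-- result means the one-record-per-order-line test passes.
SELECT cs_order_number, cs_item_id, COUNT(*) AS row_count
FROM stewart_db.gold.dt_sales_by_call_center
GROUP BY cs_order_number, cs_item_id
HAVING COUNT(*) > 1;
```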
The declarative nature of Dynamic Tables is indeed powerful, but as the number of CDTAS statements grows, it can be difficult to define a mechanism for deployment: they need to be created (or recreated) in the correct order to ensure all the dependencies exist. This is where Coalesce shines the brightest. Not only does our column-aware architecture make defining Dynamic Tables frictionless, but our built-in DAG also understands dependency ordering. When building data pipelines without Dynamic Tables, Coalesce uses the dependency graph for both deployment and refreshing of tables and views. While Snowflake now handles the refresh part, Coalesce guarantees Dynamic Tables are created only after all upstream dependencies are satisfied.