Incremental, Parameterized, and Dynamic SQL all on Databricks SQL

Databricks SQL SME
DBSQL SME Engineering
17 min read · Mar 19, 2024
Databricks Workflows on DBSQL Run

Author: Cody Austin Davis

Note: this is an intermediate/advanced article covering many data warehousing concepts. It serves as an end-to-end example; subsequent articles will break down each concept and dive deeper into the best practices for each one.

The last few months have been huge for data warehousing on DBSQL. Databricks has released features that have unlocked the ability to do advanced ETL on DBSQL natively in Databricks. In this blog, we will walk through a simple but real-world ETL use case to highlight all the added capabilities for users that are looking to do end-to-end ETL on Databricks SQL. The features that this blog will cover include:

  1. Running SQL notebooks on DBSQL with Workflows: Now, users can leverage all the advanced functionality of notebooks (visuals, widget parameters, cell-based execution, version history, etc.) right on DBSQL and even run those notebooks on Workflows. The UX for orchestrating SQL workloads is just like the rest of the platform, which makes for a really nice user experience that we will cover below.
  2. SQL Variables: Users can now create and leverage variables in DBSQL. Variables can be statically set and also dynamically set as a result of a query. This unlocks a new universe of functionality for DBSQL warehousing that previously required python-enabled clusters.
  3. EXECUTE IMMEDIATE: Not only can users leverage variables, but now users can also execute constructed SQL strings dynamically (as a result of a VARIABLE query).
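As a taste of how these pieces fit together, here is a minimal sketch combining a variable, a dynamically built SQL string, and EXECUTE IMMEDIATE, which are the same building blocks the pipeline below uses (the table and variable names here are hypothetical):

-- Minimal sketch: declare a variable, build a SQL string, and run it dynamically
DECLARE OR REPLACE VARIABLE target_table STRING DEFAULT 'my_demo_table'; -- hypothetical table name

DECLARE OR REPLACE VARIABLE stmt STRING;
SET VARIABLE stmt = CONCAT('SELECT COUNT(*) AS row_count FROM ', target_table);

EXECUTE IMMEDIATE stmt;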

These features are just the start of what Databricks is aiming to provide for SQL users. These features alone open up massive use cases and design patterns that were not possible on DBSQL without external tools or other custom workarounds.

All these features are great, but the best way to understand their impact is to see them used in the real world, so let's dive into an example!

NOTE: These notebooks offer a simple but rich set of examples for using Databricks SQL. There are a lot of details hidden in the notebooks, so we recommend importing the repo and trying this pipeline out yourself.

Incremental Ingestion Pipeline Demo

We are going to walk through creating and running a fully incremental SQL-only pipeline on DBSQL that ingests some IoT data, builds an incremental Bronze → Silver → Gold medallion architecture via change data feed, and does the following:

  1. DDL: Define DDL dynamically, enabling Change Data Feed for our tables.
  2. COPY INTO: Load raw data into the Bronze layer with COPY INTO, loading the source data multiple times to simulate a “bad data” load so that we can more intelligently handle duplicates and late data with MERGE downstream.
  3. Dynamic Bronze to Silver: Incrementally load Bronze to Silver Delta tables with VARIABLES + Change Data Feed version tracking.
  4. Silver to Gold Aggregates: Incrementally update Gold aggregates with custom lean MERGE behavior, VARIABLES, and Change Data Feed.
  5. Dynamically Refresh the BI View Layer: Build BI-layer VIEWs dynamically based on version history to implement blue/green deployments.

This end-to-end demo uses DBSQL natively as the sole engine! No need for python clusters or other external orchestrators.

At the end of this demo, we will have a single Databricks Workflow that orchestrates an incremental and complete medallion architecture all in DBSQL. Our workflow output will look like this:

Author screenshot: Workflow output

In this example, we are using the change data feed of our Delta tables to do custom batching across stages with SQL-based version checkpoints. This is now relatively easy to do with SQL VARIABLES, since we can dynamically get the most recently processed version of our Change Data Feed. While this design pattern is critical for certain use cases, in many cases it is simpler to just use Streaming Tables + Materialized Views for your ETL pipelines. To learn how to use ST + MV design patterns on DBSQL, check out this blog. Before we dive into the details, let's cover some heuristics on when you would use CDF-based batching (or other types of batching) vs Streaming Tables + Materialized Views for your SQL pipelines.

  1. Streaming Tables + Materialized Views — Most data pipeline use cases can be built with this design pattern. If the path of the data is not overly complex and is relatively linear (A → B → C), this is the simplest and best approach (a minimal sketch follows this list).
  2. CDF / Incremental Manual Batching — Many SQL ETL pipelines have more complex logic within a given batch. Some use cases don’t need to “dial up and down” the data freshness interval, and some logically cannot change the refresh interval at all. Often a given batch of data must go through so many operations that trying to fit the logic into a streaming pipeline is infeasible. If fitting your existing complex batch logic into an ST/MV pattern is proving incredibly difficult and you prefer more control over simplicity, this design pattern is the better approach. This is most often found in large enterprise data warehouses.
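To make the first option concrete, here is a rough sketch of what the ST + MV pattern could look like for a simple linear pipeline. This is a hedged example rather than part of the demo: the table names are hypothetical, and the exact STREAMING TABLE and MATERIALIZED VIEW options you need are covered in the linked blog.

-- Hypothetical sketch of the Streaming Table + Materialized View pattern
CREATE OR REFRESH STREAMING TABLE bronze_sensors_st
AS SELECT * FROM STREAM read_files('/databricks-datasets/iot-stream/data-device/', format => 'json');

CREATE MATERIALIZED VIEW gold_user_steps_mv
AS SELECT user_id, SUM(num_steps::decimal(10,2)) AS total_steps
FROM bronze_sensors_st
GROUP BY user_id;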

So, assuming we want to utilize these new SQL features to get more control over our batching logic, let's reconstruct this job stage by stage and see how we can use these new features in our pipeline.

DDL → Bronze Layer

First, in a notebook, we can define the DDL for our Bronze and Silver tables. In this notebook, we enable IDENTITY keys, Change Data Feed, and Liquid Clustering for our tables. To make this more interesting, we start the notebook with VARIABLES and dynamic SQL that let us decide whether to “truncate and reload” our database. Our job will have 3 job parameters: Catalog, Schema, and ReloadFromScratch. This allows us to dynamically point at a catalog and schema.

DECLARE OR REPLACE VARIABLE start_over_sql STRING;

SET VARIABLE start_over_sql = (SELECT
MAX(CASE WHEN "${ReloadFromScratch}" = 'no' THEN "SELECT 'Running incrementally from last state' AS status" ELSE "DROP SCHEMA IF EXISTS IDENTIFIER('${Schema}') CASCADE" END) AS stmt
);

EXECUTE IMMEDIATE start_over_sql;


-- Dynamically Set Scope for Notebook
USE CATALOG '${Catalog}';
CREATE SCHEMA IF NOT EXISTS IDENTIFIER('${Schema}');
USE IDENTIFIER(CONCAT('${Catalog}','.', '${Schema}'));

The notebook view looks like this:

Notebook View

Now we can define our table DDLs like so:

--===== Step 1 - Create Bronze Streaming Table
CREATE TABLE IF NOT EXISTS cdf_demo_bronze_sensors
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
event_timestamp TIMESTAMP,
value STRING,
bronze_update_timestamp TIMESTAMP
)
USING DELTA
TBLPROPERTIES ('delta.targetFileSize' = '1mb', 'delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableChangeDataFeed'= 'true')
CLUSTER BY (Id, user_id, device_id)
;

---===== Step 2 - Create Silver Target Table
CREATE TABLE IF NOT EXISTS cdf_demo_silver_sensors
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
event_timestamp TIMESTAMP,
value STRING,
silver_update_timestamp TIMESTAMP,
last_version INT -- Can either be tracked in the table or separately
)
TBLPROPERTIES ('delta.targetFileSize' = '1mb', 'delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableChangeDataFeed'= 'true')
CLUSTER BY (Id, user_id, device_id)
;


---===== OPTIONAL - Create SCD Type 2 Silver Target Table
CREATE TABLE IF NOT EXISTS cdf_demo_silver_sensors_scd_2
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
event_timestamp TIMESTAMP,
value STRING,
is_active BOOLEAN,
start_timestamp INT,
end_timestamp INT,
silver_update_timestamp TIMESTAMP,
last_version INT -- Can either be tracked in the table or separately
)
TBLPROPERTIES ('delta.targetFileSize' = '1mb', 'delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableChangeDataFeed'= 'true')
CLUSTER BY (Id, user_id, device_id);


---===== Step 3 - Create CDF Version checkpoint tables for our incremental CDF pipelines

--TIP: Not always necessary depending on your update patterns, can track this directly in the silver table as well, just might be faster and more reliable if track in a separate table
CREATE TABLE IF NOT EXISTS cdf_checkpoint_silver_sensor (latest_version INT DEFAULT 0, update_timestamp TIMESTAMP)
TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported')
CLUSTER BY (latest_version)
;

CREATE TABLE IF NOT EXISTS cdf_checkpoint_gold_users (latest_version INT DEFAULT 0, update_timestamp TIMESTAMP)
TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported')
CLUSTER BY (latest_version)
;

CREATE TABLE IF NOT EXISTS cdf_checkpoint_gold_devices (latest_version INT DEFAULT 0, update_timestamp TIMESTAMP)
TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported')
CLUSTER BY (latest_version)
;

Here we added CLUSTER BY keys that represent our primary and foreign keys downstream. We also enabled delta.enableChangeDataFeed on all our pipeline tables in the TBLPROPERTIES section. Note that in our downstream Silver table, we add an update timestamp as well as a column that tracks the source table version each update came from.

We also create small checkpoint tables that track the versions of our source CDF we have processed for each pipeline. We can then use these tables downstream to automatically get our next table version to process!

Now we can implement a simple Raw → Bronze pipeline with COPY INTO:

-- Can do graceful data type conversion directly in the COPY statement
-- COPY OPTIONS for robust handling of changes to source data / bronze tables

COPY INTO cdf_demo_bronze_sensors
FROM (SELECT
id::bigint AS Id,
device_id::integer AS device_id,
user_id::integer AS user_id,
calories_burnt::decimal(10,2) AS calories_burnt,
miles_walked::decimal(10,2) AS miles_walked,
num_steps::decimal(10,2) AS num_steps,
timestamp::timestamp AS event_timestamp,
value AS value, -- This is a JSON object,
now() AS bronze_update_timestamp
FROM "/databricks-datasets/iot-stream/data-device/")
FILEFORMAT = json -- csv, xml, txt, parquet, binary, etc.
COPY_OPTIONS('force'='true', 'ignoreChanges' = 'true', 'ignoreDeletes' = 'true', 'mergeSchema' = 'true') --'true' always loads all data it sees. option to be incremental or always load all files


--Other Helpful copy options:
/*
PATTERN('[A-Za-z0-9].json') --Only load files that match a regex pattern
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true') -- skips bad files for more robust incremental loads
COPY_OPTIONS ('mergeSchema' = 'true') --auto evolve the schema at the bronze layer
'ignoreChanges' = 'true' - ENSURE DOWNSTREAM PIPELINE CAN HANDLE DUPLICATE ALREADY PROCESSED RECORDS WITH MERGE/INSERT WHERE NOT EXISTS/Etc.
'ignoreDeletes' = 'true'
*/;

We can even run this COPY command multiple times to double-check the robustness of our downstream incremental MERGE logic. To check behavior for other operations like updates and deletes, we can manually inject some examples into our Bronze layer:

-- We can inject strange behavior and then check downstream if properly propagated

UPDATE cdf_demo_bronze_sensors
SET num_steps = 99999
WHERE Id = 1;


DELETE FROM cdf_demo_bronze_sensors
WHERE Id = 2;
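Before moving on, it can help to peek at what the change data feed now contains for these commits. A quick sanity-check sketch (the column selection is illustrative):

-- Inspect recent commits and the change feed for the Bronze table
DESCRIBE HISTORY cdf_demo_bronze_sensors;

SELECT _change_type, _commit_version, _commit_timestamp, Id, num_steps
FROM table_changes('cdf_demo_bronze_sensors', 0)
ORDER BY _commit_version DESC
LIMIT 10;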

Now we have a working Bronze layer! Next, let's see how to dynamically read from our Bronze change data feed to MERGE into our Silver layer.

Bronze → Silver Layer

Now we can do some incremental ETL from our change data feed to MERGE data into our Silver layer. In our Bronze → Silver notebook, we first dynamically get the most recently processed Bronze version from our checkpoint and compare it to the latest Bronze version, like so:


---===== Step 5 - Create Dynamic Incremental Read Pattern
DECLARE OR REPLACE VARIABLE checkpoint_version_bronze_to_silver INT;
DECLARE OR REPLACE VARIABLE next_temp_checkpoint INT;

-- This can either be stored in the target table or in a separate state table that is smaller
SET VARIABLE checkpoint_version_bronze_to_silver = (SELECT COALESCE(MAX(latest_version), 0) FROM cdf_checkpoint_silver_sensor);


SET VARIABLE next_temp_checkpoint = (SELECT COALESCE(MAX(_commit_version), checkpoint_version_bronze_to_silver) AS tmp_v
FROM table_changes('cdf_demo_bronze_sensors', checkpoint_version_bronze_to_silver)
WHERE _commit_version > checkpoint_version_bronze_to_silver
);


SELECT CONCAT('Latest Version Processed: ', checkpoint_version_bronze_to_silver::string) AS latest_check,
CONCAT('Next Checkpoint Max Version For Active Batch: ', next_temp_checkpoint::string) AS next_check

This gives us the output:

Using SQL Variables to manage version state

There is a lot happening here, so let's break down what this is doing:

  1. Read the most recent version from the “cdf_checkpoint_silver_sensor” checkpoint table (defined in the previous step). At the end of this step, we will insert a new, updated max version into this checkpoint. This tracks the most recent version we have already processed into Silver.
  2. Read the most recent version of the Bronze table (Source) to see what the latest version is. Once we process this batch, this will be our new max processed version in the cdf_checkpoint_silver_sensor table.

Ultimately, we are automating the tracking of our incremental batches. This is similar to what streaming tables are doing under the hood, but now with this extra control, we can enable more complex use cases that require more complex/nuanced batch logic that is common in existing data warehouses today.

Next, we can use this in our incremental MERGE to Silver:

---===== Step 6 - MERGE incremental reads into batch silver table
/*
Common scenarios to handle

1. Late arriving data (make sure silver latest_update_timestamp < source_record_update_timestamp)

2. Duplicate + late updates (decide on when the latest source update timestamp for a given id can be)

3. Updates After Deletes (within a batch and across batches) --> Ignore Updates for Non matches to target table

4. Inserts After Deletes --> Insert if business logic dictates; if you do NOT want to insert a record that has already been deleted, then use SCD2 and check history to add a merge condition on WHEN NOT MATCHED AND <found_deleted_record>


-- TIP - If you have deletes to deal with, you want to track your state in a separate table, because there are scenarios where the deleted records are the only ones that carry the latest commit version in the target table
*/
MERGE INTO cdf_demo_silver_sensors AS target
USING (
SELECT *
FROM table_changes('cdf_demo_bronze_sensors', checkpoint_version_bronze_to_silver) -- add one to increment the last version that has impacted the target layer
WHERE 1=1
AND _change_type != 'update_preimage'
AND _commit_version > checkpoint_version_bronze_to_silver -- Only bring in new data from new CDF version, if no new data, gracefully merge no data
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY _commit_timestamp DESC) = 1 -- Get the most recent CDF behavior per unique record identifier just in case data source throws dup updates/deletes/inserts
) AS source
ON source.Id = target.Id
WHEN MATCHED
AND source._change_type IN ('insert' , 'update_postimage')
AND ((source.bronze_update_timestamp > target.silver_update_timestamp) OR target.silver_update_timestamp IS NULL) -- Check to make sure older / late records do not sneak in
THEN UPDATE
SET target.calories_burnt = source.calories_burnt,
target.miles_walked = source.miles_walked,
target.silver_update_timestamp = source._commit_timestamp,
target.last_version = source._commit_version

WHEN MATCHED
AND source._change_type = 'delete' -- NON SCD Type 2 Deletion - has some considerations and potential to re-run old data if not a single record retains the new CDF commit version
AND ((source._commit_timestamp > target.silver_update_timestamp) OR source.bronze_update_timestamp > target.silver_update_timestamp)

THEN DELETE

WHEN NOT MATCHED AND source._change_type IN ('insert', 'update_postimage') -- Inserts + update for same record can be in same initial batch, where update wins as the most recent record
-- Ignore 'delete' not matches, because you can't delete something that isn't there
--- Then get most recent record if there are duplicate/multiple record updates in the source CDF batch
THEN INSERT (Id, device_id, user_id, calories_burnt, miles_walked, num_steps, event_timestamp, value, silver_update_timestamp, last_version)
VALUES(source.Id, source.device_id, source.user_id, source.calories_burnt, source.miles_walked, source.num_steps, source.event_timestamp, source.value, source._commit_timestamp, source._commit_version)
;

This is a real-world example of an incremental and comprehensive MERGE statement. It resembles many common MERGE scenarios that must handle deletes, erroneous/duplicate upstream updates, and late-arriving data. Let's break down what is happening:

  1. We read the Bronze table CDF with the table_changes function, but this time, using the SQL VARIABLE to dynamically read from the version we last processed.
  2. We then use a QUALIFY statement to de-dup potential erroneous intra-batch updates/inserts from our source feed. This makes for a more robust pipeline if your data source is a bit noisy/messy. This logic will pick the most recent update for a given record within the batch.
  3. Merge ON our Id or other necessary merge keys (remember, we have CLUSTERED on our merge keys.)
  4. We then write the rest of the MERGE command to update records, adding some extra checks to make sure late arriving records do not update records in the Silver table needlessly. This logic is more specific to your data behavior and requirements and serves as a starting point example.

Now that we have the MERGE written, let's make sure it produced the correct behavior by checking the erroneous updates/deletes we injected above:

Checking for update/delete propagation downstream
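The check in the screenshot can be reproduced with a simple query against the records we tampered with earlier (a sketch using the Id values from the injected update and delete above):

-- Id = 1 should now show num_steps = 99999; Id = 2 should be gone
SELECT Id, num_steps, silver_update_timestamp, last_version
FROM cdf_demo_silver_sensors
WHERE Id IN (1, 2);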

The merge works! Now that our pipeline ran successfully, our last step is to update our tracking table to update the last processed version by running a simple SQL insert:

INSERT INTO cdf_checkpoint_silver_sensor BY NAME 
SELECT next_temp_checkpoint AS latest_version, now() AS update_timestamp;
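If this notebook can be retried without new Bronze commits, you may want to guard the checkpoint insert the same way the Gold checkpoint is guarded later in this pipeline, so a re-run does not record a stale version. A sketch:

-- Only advance the checkpoint if the batch actually moved the version forward
INSERT INTO cdf_checkpoint_silver_sensor BY NAME
SELECT next_temp_checkpoint AS latest_version, now() AS update_timestamp
WHERE next_temp_checkpoint > (SELECT COALESCE(MAX(latest_version), 0) FROM cdf_checkpoint_silver_sensor);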

Now we have incrementally read from our Bronze to Silver layer, all using DBSQL! Let's now do the same thing, but for incrementally refreshing Gold aggregate tables.

Silver → Gold Incremental Aggregate Layer

Many downstream tables do summaries and aggregations on the main Silver data model. Whether these are cumulative tables, dimensional snapshots for reporting, or smoothed views of underlying noisy data, we can use this dynamic change data feed ETL design pattern to get more control over how these downstream Gold tables are updated in DBSQL. Let's take a look at an example:

CREATE TABLE IF NOT EXISTS cdf_demo_gold_user_num_step_aggregate
(user_id INT,
total_steps DECIMAL,
gold_steps_update_timestamp TIMESTAMP,
latest_silver_version INT)
CLUSTER BY (user_id, gold_steps_update_timestamp); --Using liquid clustering

-- For small, simple fact/dim tables, this can and should just be a materialized view.
-- But if the logic for incrementally updating this table is more complex, you may want more control.
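As the comment notes, an aggregate this simple could just be a materialized view. A rough, hedged sketch of that alternative might look like the following, but for the rest of this section we keep manual control:

-- Hypothetical materialized-view version of the same aggregate
CREATE MATERIALIZED VIEW cdf_demo_gold_user_num_step_aggregate_mv
AS SELECT user_id,
SUM(num_steps) AS total_steps,
MAX(silver_update_timestamp) AS gold_steps_update_timestamp
FROM cdf_demo_silver_sensors
GROUP BY user_id;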

We created our table and clustered on our primary key and update timestamp. Note that most simple Gold aggregate tables can, and probably should, be materialized views (as sketched above), but many data warehousing use cases require a bit more control. To show how we might do this manually in Databricks SQL, we write a new Silver → Gold MERGE statement for our incremental aggregate table. To do this, we do the same thing we did above to track our Bronze → Silver ETL:


---===== Step 5 - Create Dynamic Incremental Read Pattern
DECLARE OR REPLACE VARIABLE checkpoint_version_silver_to_gold_steps_agg INT;
DECLARE OR REPLACE VARIABLE next_temp_checkpoint INT;

-- This can either be stored in the target table or in a separate state table that is smaller
SET VARIABLE checkpoint_version_silver_to_gold_steps_agg = (SELECT COALESCE(MAX(latest_version), 0) FROM cdf_checkpoint_gold_users);


SET VARIABLE next_temp_checkpoint = (SELECT COALESCE(MAX(_commit_version), checkpoint_version_silver_to_gold_steps_agg) AS tmp_v
FROM table_changes('cdf_demo_silver_sensors', checkpoint_version_silver_to_gold_steps_agg)
WHERE _commit_version > checkpoint_version_silver_to_gold_steps_agg
);


SELECT CONCAT('Latest Silver --> Gold Steps Agg Version Processed: ', checkpoint_version_silver_to_gold_steps_agg::string) AS latest_check,
CONCAT('Next Checkpoint Max Version For Active Batch: ', next_temp_checkpoint::string) AS next_check

Now lets do our Silver → Gold MERGE:

---===== Step 6 - MERGE incremental reads into the gold aggregate table
/*
Common scenarios to handle

1. Late arriving data (make sure silver latest_update_timestamp < source_record_update_timestamp)

2. Duplicate + late updates (decide on when the latest source update timestamp for a given id can be)

3. Updates After Deletes (within a batch and across batches) --> Ignore Updates for Non matches to target table

4. Inserts After Deletes --> Insert if business logic dictates; if you do NOT want to insert a record that has already been deleted, then use SCD2 and check history to add a merge condition on WHEN NOT MATCHED AND <found_deleted_record>

*/
MERGE INTO cdf_demo_gold_user_num_step_aggregate AS target
USING (

WITH users_to_recalculate AS (
SELECT user_id, MAX(_commit_version) AS latest_version
FROM table_changes('cdf_demo_silver_sensors', checkpoint_version_silver_to_gold_steps_agg) -- add one to increment the last version that has impacted the target layer
WHERE 1=1
AND _commit_version > checkpoint_version_silver_to_gold_steps_agg -- Only bring in new data from new CDF version, if no new data, gracefully merge no data
GROUP BY user_id
)
-- Prep already incrementalized Aggregate from CDF
SELECT
s.user_id,
SUM(s.num_steps) AS total_steps,
MAX(latest_version) AS latest_version
FROM cdf_demo_silver_sensors s
INNER JOIN users_to_recalculate AS update_users ON s.user_id = update_users.user_id
GROUP BY s.user_id

) as source
ON source.user_id = target.user_id

WHEN MATCHED
THEN UPDATE
SET target.total_steps = source.total_steps,
target.gold_steps_update_timestamp = current_timestamp(),
target.latest_silver_version = source.latest_version

WHEN NOT MATCHED
THEN INSERT (user_id, total_steps, latest_silver_version, gold_steps_update_timestamp)
VALUES(source.user_id, source.total_steps, source.latest_version, current_timestamp())
;

/* Update the checkpoint on success */
INSERT INTO cdf_checkpoint_gold_users BY NAME
SELECT next_temp_checkpoint AS latest_version, now() AS update_timestamp
WHERE next_temp_checkpoint > (SELECT COALESCE(MAX(latest_version), 0) FROM cdf_checkpoint_gold_users)
;

Here we use table_changes to incrementally get the distinct user_ids that have any updates. Then we join only those updated users to the Silver table to recalculate user-level aggregations to Gold, all while utilizing file pruning on our cluster keys.

Now, as a bonus, let's check out how we can use dynamic SQL + VIEWs to selectively swap a view definition for end users.

Dynamic BI View Layer

At the end of the pipeline, we can also use dynamic SQL to control refreshes on VIEWs and other data assets. This functionality enables a common design pattern called blue/green deployments — where you run 2 parallel data models/pipelines and switch between which model is the “active” version. You can also utilize this pattern to perform other sanity checks on the pipeline results and underlying data model before you publish or dynamically publish active views based on some other business requirement.

Let's create a small VIEW that gives BI users an easy table comparing each user's running (cumulative) step count against their total steps:

The underlying view definition would look something like this with our above tables:

CREATE OR REPLACE VIEW bi_view_cumulative_steps
AS
SELECT s.*,
u.total_steps AS TotalStepsByUser,
SUM(s.num_steps) OVER (PARTITION BY s.user_id
ORDER BY s.event_timestamp) AS cumulative_steps
FROM cdf_demo_silver_sensors s
INNER JOIN cdf_demo_gold_user_num_step_aggregate u ON u.user_id = s.user_id
WHERE s.user_id = 1
ORDER BY s.user_id, s.event_timestamp

Now we can write a dynamic query that performs some basic quality/rule checks before we allow the pipeline updates to affect the VIEW, like this:

DECLARE OR REPLACE VARIABLE active_silver INT;
DECLARE OR REPLACE VARIABLE prev_silver INT;
DECLARE OR REPLACE VARIABLE active_gold INT;

-- This can either be stored in the target table or in a separate state table that is smaller
SET VARIABLE (active_silver, prev_silver) = (WITH h AS (DESCRIBE HISTORY cdf_demo_silver_sensors)
SELECT MAX(version) AS v, COALESCE(MAX(version) - 1, 0) AS v_minus_1 FROM h
);


SELECT CONCAT('Latest Silver Table Version: ', active_silver::string) AS ActiveSilverTableVersion,
CONCAT('Previous Silver Table Version: ', prev_silver::string) AS PrevSilverTableVersion
;



--- Implement some data quality rules to determine which version of the table to use (or blue/green)
DECLARE OR REPLACE VARIABLE version_to_publish INT;


SET VARIABLE version_to_publish = (WITH deployment_gate_rules AS (
SELECT
count_if(event_timestamp >= (now() - INTERVAL 10 YEARS)) AS NewishRecords,
COUNT(0) AS Records
FROM cdf_demo_silver_sensors
)
SELECT

IF(
(NewishRecords::float / Records::float ) > 0.95 -- RULE 1
AND Records = 999999 -- RULE 2
--AND -- Add more rules!
, active_silver
, prev_silver) AS v
FROM deployment_gate_rules
);

SELECT CONCAT('Version of source table to publish in active view: ', version_to_publish) AS DeploymentVersion;

In this example, we check the distribution of record ages and also validate the row count of the table within a SQL IF statement. This allows us to dynamically choose whether to publish the newest version of the table or use the version from before the pipeline made the most recent updates. This small example can be extended to all kinds of business rule systems, especially for blue/green database deployments.

Now that we have a variable that defines the version of the underlying tables we should publish, we can use Dynamic SQL to define a VIEW:

DECLARE OR REPLACE VARIABLE dynamic_view STRING;

SET VARIABLE dynamic_view = CONCAT("CREATE OR REPLACE VIEW bi_view_cumulative_steps
AS
SELECT s.*,
u.total_steps,
SUM(s.num_steps) OVER (PARTITION BY s.user_id ORDER BY s.event_timestamp) AS cumulative_steps
FROM cdf_demo_silver_sensors VERSION AS OF ", version_to_publish::int, " s
INNER JOIN cdf_demo_gold_user_num_step_aggregate u ON u.user_id = s.user_id
WHERE s.user_id = 1
ORDER BY s.user_id, s.event_timestamp");


EXECUTE IMMEDIATE dynamic_view

Notice that the VIEW is created dynamically against the specific Silver table version chosen by our deployment rules.

Now we have a complete Raw → BI data pipeline using all the newest SQL capabilities. All we have to do is create a workflow with these notebooks using our normal notebook task type:

Orchestrating a SQL notebook on DBSQL with Workflows

The only difference here is that you can now use DBSQL as the compute engine for your SQL notebooks in Workflows (note the banner above)! Job- and task-level parameters can be referenced in SQL notebooks just like any other notebook widgets, letting you seamlessly parameterize your SQL workflows on DBSQL. This brings the seamless notebook experience to many more data warehousing use cases and ultimately unifies the development experience across the platform as a whole.

Now we can observe our DBSQL workflows just like the rest of our jobs on Databricks:

ETL Observability on DBSQL + Workflows

We hope this article sparks ideas and gives readers simple yet real-world examples of how to leverage SQL in Databricks for data warehousing. This is just the start of where Databricks is going with the SQL development experience. Look out for more articles in this publication to stay up to date on all the latest and greatest capabilities.
