Real-time ETL on Databricks SQL with Streaming Tables and Materialized Views

Databricks SQL SME
DBSQL SME Engineering
9 min readNov 30, 2023
Author Screenshot: Materialized View / Streaming Table Pipeline

Authors: Megan Fogal and Yatish Anand

Introduction:

Historically, data teams would create different architectures for batch and streaming needs, often with disparate tools and engines (i.e. 1 engine for streaming, 1 for batch processing). This led to the commonly known ETL design patterns such as the Lambda and Kappa Architectures. These architectures usually require multiple technologies to move, govern, and access data, which drastically increases complexity and cost. Now with Streaming Tables and Materialized Views available in DBSQL, users can implement a single-unified data architecture that supports both batch and streaming capabilities with the same engine. In this blog, we are going to explore creating a Medallion Architecture pipeline using two new features of Databricks SQL (DBSQL): Streaming Tables(STs) and Materialized Views(MVs). This article will highlight a new, real-time, and declarative way of orchestrating tables directly in the DBSQL Warehouse. This allows users to unify all data processing strategies and move DBSQL to the center of their ETL architecture.

What is the Medallion Architecture?

The Lakehouse Medallion Architecture is a series of 3 layers that correlate to the quality of data: “Bronze”, “Silver”, and “Gold”.

  • The bronze layer is the raw data appended straight from the source with no cleaning, serving as a historical record.
  • The silver layer houses cleaned and enriched data, often including data quality expectations which align to downstream requirements. Common consumers of the silver layer include Data Science and internal product teams.
  • The gold layer is highly aggregated and refined data for specific business use cases. Common consumers of this layer include BI/Reporting applications and business analysts.

To learn more about the medallion architecture, check out this blog.

Introduction to Streaming Tables And Materialized Views

With the introduction of two new DBSQL features, streaming tables and materialized views, SQL analysts are now empowered to perform data engineering tasks and introduce real-time capabilities within their existing workflow. This allows analysts to conduct ETL tasks directly on live data streams within the Databricks environment using familiar SQL, eliminating reliance on third-party tools and data engineering teams.

Let’s take a look at streaming tables and materialized views:

Streaming Tables:

Streaming tables are Unity Catalog managed tables that support append-only incremental and streaming data processing from various data sources. In the medallion architecture they are commonly used for the bronze and the silver layer tables. Under the hood, a Delta Live Tables (DLT) pipeline is created for each streaming table. These tables are updated with each refresh, we’ll revisit this later.

Materialized Views:

In DBSQL, materialized views are Unity Catalog managed tables that store precomputed results based on the latest version of data in the source table. They differ from materialized views in traditional SQL databases in that instead of always updating results when the materialized view is queried, the results of a materialized view in Databricks reflect the state of data when the materialized view was last refreshed. By precomputing results, materialized views can reduce cost and improve latency. They are particularly beneficial for SQL analysts as they offer quick query retrieval due to being pre-computed, thus improving the overall analyst experience including BI dashboarding.

Now we’re going to build an end-to-end medallion architecture pipeline using these new DBSQL features.

Our goal is to take data from an example dataset consisting of customer financial tables and build a pipeline to deliver a high quality gold layer table for the analyst team. This is what we’ll be building:

Author Screenshot: Unity Catalog Lineage of Medallion Architecture

Bronze Layer

Step 1 — Incrementally ingest data to create the bronze layer: For the first stage, we will create a Streaming Table for incremental data processing of a raw table that contains new customer loan data. As mentioned above, a DLT pipeline will be automatically created behind the scenes.

CREATE OR REFRESH STREAMING TABLE raw_txs
COMMENT "New raw loan data incrementally ingested from cloud object storage landing zone"
AS SELECT * FROM STREAM read_files('/demos/dlt/loans/raw_transactions')

Two key things are occurring in the code above. First, the CREATE operation uses a DBSQL warehouse for the initial creation and data loading for this table. Second, the REFRESH operation leverages DLT. When a streaming table undergoes a refresh, it triggers an update to the DLT pipeline for the execution.

Upon executing the REFRESH command, you receive a DLT pipeline link. You can use this to monitor the status of the refresh.

Author Screenshot: Creating a Streaming Table

If you open the link, you are able to see the DLT pipeline that refreshes and manages the table state under the hood

Underlying Streaming Table Pipeline

Step 2 — Make a materialized view of a dimension table: Next, we will create a table that contains reference data that can be used to enrich our streaming tables. Dimension tables can be built in any part of the ETL pipeline, and in this example we will create a materialized view containing the metadata. Since dimension tables are rarely updated, a materialized view is appropriate here.

CREATE MATERIALIZED VIEW ref_accounting_treatment
AS SELECT * FROM delta.`/demos/dlt/loans/ref_accounting_treatment`;

The question often arises of when to use a streaming table and when to use a materialized view. In general, streaming tables are beneficial when there’s a need to process these streams of data as they arrive and rapidly transform them. Materialized views are useful when you are looking to transform and enrich data to make a table that is frequently accessed or commonly queried.

Silver Layer

Step 3 — Enrich transactions with metadata to create a silver layer: We will perform a stream-static join to enrich the raw_txs table with the metadata from our dimension table. With this, new records from the stream will be joined with data from the static table.

CREATE OR REFRESH STREAMING TABLE new_txs
COMMENT "Livestream of new transactions"
AS SELECT txs.*, ref.accounting_treatment as accounting_treatment FROM stream(raw_txs) txs
INNER JOIN ref_accounting_treatment ref ON txs.accounting_treatment_id = ref.id

Step 4 — Use DLT expectations to separate good and bad transactions: As DLT is being used under the hood, we can use DLT expectations to define data quality constraints. For more information on DLT expectations, check out the doc here. We are going to set 3 expectations to create our table with ‘good’ transactions.

  1. Payment date must be after 2020–12–31
  2. Balance and arrears must be greater than 0, if it fails this, the record is dropped.
  3. Cost center code cannot be null. If it is, the execution will be stopped immediately.
CREATE OR REFRESH STREAMING TABLE cleaned_new_txs (
CONSTRAINT `Payments should be this year` EXPECT (next_payment_date > date('2020-12-31')),
CONSTRAINT `Balance should be positive` EXPECT (balance > 0 AND arrears_balance > 0) ON VIOLATION DROP ROW,
CONSTRAINT `Cost center must be specified` EXPECT (cost_center_code IS NOT NULL) ON VIOLATION FAIL UPDATE
)
COMMENT "Livestream of new transactions, cleaned and compliant"
AS SELECT * from STREAM(new_txs)

Similarly, we will create a quarantine table to hold all of the ‘bad’ transactions. We’re holding onto these transactions so they can be used to analyze and monitor reasons for failure, additionally for compliance and audit needs.

CREATE OR REFRESH STREAMING TABLE quarantine_bad_txs (
CONSTRAINT `Payments should be this year` EXPECT (next_payment_date <= date('2020-12-31')),
CONSTRAINT `Balance should be positive` EXPECT (balance <= 0 OR arrears_balance <= 0) ON VIOLATION DROP ROW
)
COMMENT "Incorrect transactions requiring human analysis"
AS SELECT * from STREAM(new_txs)

Gold Layer:

Step 5 — Make an aggregated materialized view: Finally, we will use our silver tables to aggregate and refine our data. Here, we are going to use a materialized view so that the results are precomputed based on the latest version of the data, improving latency.

We’re going to make our gold table by aggregating loan balances per cost center.

CREATE MATERIALIZED VIEW new_loan_balances_by_cost_center
COMMENT "Live view of new loan balances for consumption by different cost centers"
TBLPROPERTIES ("pipelines.autoOptimize.zOrderCols" = "cost_center_code")
AS SELECT sum(balance) as sum_balance, cost_center_code FROM cleaned_new_txs
GROUP BY cost_center_code

You’ll notice we added in a DLT table property to Z-Order this materialized view. Since this table may be commonly queried, we added in a Z-order to accelerate Delta Lake data-skipping to provide faster queries on this materialized view. Z-ordering colocates related information based on the column specified. For this table, we are Z-ordering by ‘cost_center_code’ because it has high cardinality and is expected to be used frequently in query predicates.

Lineage and Governance

Step 6: Check out lineage: Now that we have all our tables and materialized views made, let’s check them out. Remember, all of these are in Unity Catalog, meaning we get lineage diagrams and information to visualize.

We can check out the Catalog Explorer to discover and govern our tables and materialized views.

Author Screenshot: Final database of tables

If you click on any of these, for example cleaned_new_txs, you have a lineage tab. Here, we can see a list of upstream and downstream tables, as well as tabs to view any pipelines, models, notebooks, queries, etc., that use this table.

Author Screenshot: Table Lineage

If you click on “See lineage graph” and expand everything, you will get a visual of the entire lineage of the pipeline.

Author Screenshot: End to End Table Lineage

Step 7: Create refresh queries for materialized views.

Now we’re going to set up REFRESH queries for our materialized views.

refresh_ref:

REFRESH MATERIALIZED VIEW ref_accounting_treatment

refresh_new_loan_balances:

REFRESH MATERIALIZED VIEW new_loan_balances_by_cost_center

These refreshes will initiate a DLT pipeline in the background. Just like with any other DLT pipeline, we can query the event log to view information about the pipeline such as lineage or data quality. Let’s check out the event log to see what type of refreshes have been done to our materialized view (full compute or incremental).

SELECT
timestamp,
message
FROM
event_log(TABLE(new_loan_balances_by_cost_center))
WHERE
event_type = 'planning_information'
ORDER BY
timestamp desc
Author Screenshot: Materialized View/Streaming Table Event Log

From these results, we can see that the first refresh was a full one as expected, then there are two resulting in ‘NO_OP’, meaning that no changes to the base table were made. The most recent refresh is marked as ‘ROW_BASED’, because additional rows were inserted to the base table, indicating that the materialized view was incrementally refreshed.

Step 8: Schedule the tables for automatic refresh

Both streaming tables and materialized views can be scheduled to refresh automatically based on a defined schedule. These schedules use CRON syntax. These can be scheduled for automatic refresh both in the initial CREATE query or with a separate ALTER TABLE query. For example, we can schedule a streaming table to run every hour.

CREATE query with schedule:

CREATE STREAMING TABLE new_txs
SCHEDULE CRON '0 0 * * * ? *'
COMMENT "Livestream of new transactions"
AS SELECT txs.*, ref.accounting_treatment as accounting_treatment FROM stream(raw_txs) txs
INNER JOIN ref_accounting_treatment ref ON txs.accounting_treatment_id = ref.id

ALTER TABLE query:

ALTER STREAMING TABLE new_txs_2
ADD SCHEDULE CRON '0 0 * * * ? *'

Conclusion

Thank you for reading, we hope you found this article helpful. Be on the lookout for more how-to articles and reach out if there is something specific you would like to see from the DBSQL SME team.

--

--

Databricks SQL SME
DBSQL SME Engineering

One stop shop for all technical how-tos, demos, and best practices for building on Databricks SQL