Revolutionize Your Data Workflow: Harnessing the Power of Delta Live Tables for Streamlined ETL

Andrés Zoppelletto
Blue Orange Digital
7 min read · Apr 25, 2024

Delta Live Tables (DLT) is Databricks' declarative ETL framework for its Data Intelligence Platform. It helps data teams simplify streaming and batch ETL cost-effectively. Once you define the transformations to perform on your data, DLT pipelines automatically manage task orchestration, cluster management, monitoring, data quality, and error handling.

Databricks Data Intelligence Platform

Delta Live Tables streamlines ETL by managing the dependencies among datasets and by deploying and scaling production infrastructure automatically, to ensure timely and accurate delivery of data according to your requirements.
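One way to picture what "managing the dependencies among datasets" means is a topological sort over the tables. The sketch below is only an illustration in plain Python, not how DLT is implemented: the table names mirror the pipeline defined later in this article, and the dependency map is written by hand here, whereas DLT works it out from the table definitions.

```python
from graphlib import TopologicalSorter

# Each table maps to the set of tables it reads from.
# Hand-written for illustration; names mirror the pipeline in this article.
DEPENDENCIES = {
    "lending_club_schema": set(),
    "lending_club_raw": {"lending_club_schema"},
    "lending_club_cleaned": {"lending_club_raw"},
    "lending_club_grades": {"lending_club_cleaned"},
    "lending_club_payment_dates": {"lending_club_cleaned"},
    "GradeA": {"lending_club_cleaned"},
    "GradeB": {"lending_club_cleaned"},
    "Expected_GradeA": {"GradeA"},
    "Expected_GradeB": {"GradeB"},
    "TopLoanees_GradeA": {"Expected_GradeA"},
    "TopLoanees_GradeB": {"Expected_GradeB"},
    "all_Top_lonees": {"TopLoanees_GradeA", "TopLoanees_GradeB"},
}


def refresh_order(dependencies):
    """Return one valid order in which every table comes after its inputs."""
    return list(TopologicalSorter(dependencies).static_order())


order = refresh_order(DEPENDENCIES)
print(order)
```

Any order this returns places a table after everything it reads from, which is the property a pipeline needs before it can refresh the tables safely.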

Let’s get to work

This demo walks through consuming Parquet files, performing transformations, and creating Delta tables step by step. Key steps include creating compute clusters and SQL warehouses, importing Python code, configuring input streams, generating Delta Live Tables, and creating DLT pipelines.

Creating resources

For this demo, we will need a compute cluster (to run the Python notebook that prepares the data) and a SQL warehouse (to query the resulting Delta Live Tables).

We will start by creating the compute cluster with minimal requirements. Don’t forget to set “Terminate after xx minutes of inactivity” to avoid unnecessary charges.
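For readers who script their workspace instead of using the UI, the same auto-termination setting can be written into a cluster specification. The sketch below assumes the field names of the Databricks Clusters API; the cluster name, runtime version, node type, worker count, and the 30-minute value are all placeholders, not values from this demo.

```python
import json

# Hypothetical cluster specification; replace every placeholder before use.
cluster_spec = {
    "cluster_name": "dlt-demo-cluster",      # hypothetical name
    "spark_version": "<runtime-version>",    # placeholder
    "node_type_id": "<node-type>",           # placeholder
    "num_workers": 1,                        # assumed minimal size
    "autotermination_minutes": 30,           # stop the cluster after 30 idle minutes
}

payload = json.dumps(cluster_spec, indent=2)
print(payload)
```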

Then, we will create the SQL warehouse (serverless in this case):

Once both resources are created, we need to check the cluster’s status. The cluster must be running before we can attach our initial setup notebook to it.

Importing code

From our user workspace, we can import the following Python notebook to start the demo. This code configures the input streams.

# Databricks notebook source
# MAGIC %md
# MAGIC
# MAGIC # Create Demo database

# COMMAND ----------

# MAGIC %md
# MAGIC Create a demo database

# COMMAND ----------


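# Work in the Hive metastore and start from a clean demo database.
# Note: DROP ... CASCADE deletes any existing demo_db together with its tables.
spark.sql("USE CATALOG hive_metastore")
spark.sql("DROP DATABASE IF EXISTS demo_db CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS demo_db")
spark.sql("USE DATABASE demo_db")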

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC # Setup Stream

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC ## Getting the schema sources

# COMMAND ----------

# MAGIC %md
# MAGIC Reading a small sample of the source files in order to identify the schema

# COMMAND ----------


file_schema = (spark.read
    .format("parquet")
    .option("inferSchema", True)
    .load("dbfs:/databricks-datasets/samples/lending_club/parquet/*.parquet")
    .limit(10)
    .schema)

# COMMAND ----------

# MAGIC %md
# MAGIC Reading the parquet stream

# COMMAND ----------


from pyspark.sql.functions import to_timestamp, col

dfLendingClub = (spark.readStream
    .format("parquet")
    .schema(file_schema)
    .load("dbfs:/databricks-datasets/samples/lending_club/parquet/*.parquet")
    # Parse the date columns with the "MMM-yyyy" pattern into timestamps.
    .withColumn("earliest_cr_line", to_timestamp(col("earliest_cr_line"), "MMM-yyyy"))
    .withColumn("last_pymnt_d", to_timestamp(col("last_pymnt_d"), "MMM-yyyy"))
    .withColumn("next_pymnt_d", to_timestamp(col("next_pymnt_d"), "MMM-yyyy"))
    .withColumn("issue_d", to_timestamp(col("issue_d"), "MMM-yyyy")))

# See the dataframe
display(dfLendingClub)

# COMMAND ----------

# MAGIC %md
# MAGIC Turn on auto compaction and optimized writes for new tables

# COMMAND ----------

# MAGIC %sql
# MAGIC
# MAGIC set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
# MAGIC set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

# COMMAND ----------

# MAGIC %md
# MAGIC Writing the initial *delta* stream with raw unaltered data

# COMMAND ----------


(dfLendingClub.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/demo_db/bronze_compact")
    .option("path", "/tmp/demo_db/lending_club_stream_compact")
    .table("lending_club_stream_compact"))

# COMMAND ----------


display(dbutils.fs.ls(f"dbfs:/tmp/demo_db/lending_club_stream_compact"))

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC # Medallion Architecture

# COMMAND ----------

# MAGIC %md
# MAGIC ## Setup

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC ## Bronze layer
# MAGIC ### Creating the dataframe

# COMMAND ----------


from pyspark.sql import DataFrame

# Read the raw Delta table back as a stream: this is the bronze layer.
df_bronze: DataFrame = (spark.readStream
    .format("delta")
    .table("lending_club_stream_compact"))

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC ## Silver layer
# MAGIC ### Deduplicating Bronze data

# COMMAND ----------


df_silver : DataFrame = df_bronze.distinct()

# COMMAND ----------

# MAGIC %md
# MAGIC ### Writing deduplicated silver streams

# COMMAND ----------


(df_silver.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/demo_db/silver")
    .option("path", "/tmp/demo_db/lending_club_stream_silver")
    .table("lending_club_stream_silver"))

# COMMAND ----------


display(dbutils.fs.ls(f"dbfs:/tmp/demo_db/lending_club_stream_silver"))

# COMMAND ----------

# MAGIC %md
# MAGIC ### Write deduplicated silver streams updates

# COMMAND ----------


(df_silver.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/demo_db/silver_updates")
    .option("path", "/tmp/demo_db/lending_club_stream_silver_updates")
    .table("lending_club_stream_silver_updates"))

# COMMAND ----------


display(dbutils.fs.ls(f"dbfs:/tmp/demo_db/lending_club_stream_silver_updates"))

# COMMAND ----------

# MAGIC %md
# MAGIC ## Gold layer
# MAGIC ### Windowed Aggregation

# COMMAND ----------


display(df_silver.select("next_pymnt_d").na.drop().distinct())

# COMMAND ----------


from pyspark.sql.functions import window

"""
Do some real time aggregations with watermarking
"""

# Sum every numeric column over 10-minute windows that slide every 5 minutes.
df_gold: DataFrame = (df_silver
    .withWatermark("next_pymnt_d", "1 month")
    .groupBy(window("next_pymnt_d", "10 minutes", "5 minutes"))
    .sum())

display(df_gold)

# COMMAND ----------


"""
Fix column names for aggregation
"""

# For example, sum(loan_amnt) becomes sum_loan_amnt.
new_columns = [column.replace("(", "_").replace(")", "") for column in df_gold.columns]

(df_gold.toDF(*new_columns)
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/demo_db/gold")
    .option("path", "/tmp/demo_db/lending_club_stream_gold")
    .outputMode("complete")
    .table("lending_club_stream_gold"))

# COMMAND ----------


display(dbutils.fs.ls(f"dbfs:/tmp/demo_db/lending_club_stream_gold"))

The process will look like this:

The notebook reads from the public Databricks sample datasets, so anyone can run the code. Once it is imported, we need to run every cell in order.
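One of those steps, the gold-layer aggregation, groups rows with `window("next_pymnt_d", "10 minutes", "5 minutes")`. Because the slide is shorter than the window, windows overlap and each timestamp falls into more than one of them. The plain-Python sketch below illustrates that assignment under the assumption that window starts are aligned to the Unix epoch in steps of the slide; the sample payment date is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(ts, window=timedelta(minutes=10), slide=timedelta(minutes=5)):
    """Return the [start, end) windows that contain ts, earliest first."""
    # Latest window start that is not after ts (starts are multiples of slide).
    start = ts - (ts - EPOCH) % slide
    windows = []
    # Walk backwards while the window still covers ts (the end is exclusive).
    while start + window > ts:
        windows.append((start, start + window))
        start -= slide
    return list(reversed(windows))


payment_date = datetime(2016, 2, 1, tzinfo=timezone.utc)  # hypothetical next_pymnt_d value
for start, end in sliding_windows(payment_date):
    print(start.isoformat(), "->", end.isoformat())
```

With a 10-minute window and a 5-minute slide, every timestamp lands in exactly two windows, so each source row contributes to two aggregate rows in the gold table.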

After this first notebook finishes running, the streams will have been created. We can confirm this by exploring the hive_metastore catalog under the demo_db database.
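The same check can be scripted from a notebook cell. The helper below is a minimal sketch: `missing_tables` is a hypothetical name, it expects the notebook's predefined `spark` session to be passed in, and it assumes `SHOW TABLES` returns a `tableName` column and that the four table names written by the first notebook are the ones to look for.

```python
# Tables written by the first notebook.
EXPECTED_TABLES = {
    "lending_club_stream_compact",
    "lending_club_stream_silver",
    "lending_club_stream_silver_updates",
    "lending_club_stream_gold",
}


def missing_tables(spark, schema="hive_metastore.demo_db"):
    """Return the expected tables that SHOW TABLES does not list, sorted."""
    rows = spark.sql(f"SHOW TABLES IN {schema}").collect()
    found = {row["tableName"] for row in rows}
    return sorted(EXPECTED_TABLES - found)


# In a Databricks notebook, where `spark` is predefined:
# print(missing_tables(spark))
```

An empty list means all four streaming tables are registered in demo_db.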

Now we need to import the code that will generate the Delta Live Tables to be consumed using the SQL warehouse.

# Databricks notebook source
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# COMMAND ----------

"""
Set up configuration
"""

inputData1 = 'GradeA' #spark.conf.get("advancingdlt.pipeline.entityName1")
inputData2 = 'GradeB' #spark.conf.get("advancingdlt.pipeline.entityName2")

# COMMAND ----------

"""
Setup a data set to read schema
"""

@dlt.table(name="lending_club_schema",
           comment="Small table to get lending club schema")
def data_raw_lending_club_schema():
    # Get the schema from the parquet files
    return (spark.read.format("parquet")
        .option("inferSchema", True)
        .load("dbfs:/databricks-datasets/samples/lending_club/parquet/*.parquet")
        .limit(10))

# COMMAND ----------

"""
Ingestion Data
"""

from pyspark.sql.functions import to_timestamp, col

@dlt.table(name="lending_club_raw",
           comment="Raw Lending Club Data")
def data_raw_lending_club_raw():
    return (spark.readStream.format("parquet")
        .schema(dlt.read("lending_club_schema").schema)
        .load("dbfs:/databricks-datasets/samples/lending_club/parquet/*.parquet")
        .withColumn("earliest_cr_line", to_timestamp(col("earliest_cr_line"), "MMM-yyyy"))
        .withColumn("last_pymnt_d", to_timestamp(col("last_pymnt_d"), "MMM-yyyy"))
        .withColumn("next_pymnt_d", to_timestamp(col("next_pymnt_d"), "MMM-yyyy"))
        .withColumn("issue_d", to_timestamp(col("issue_d"), "MMM-yyyy")))

# COMMAND ----------

"""
State table for all valid records
"""

@dlt.table(name="lending_club_cleaned",
           comment="Deduplicated Lending Club data")
def data_raw_lending_club_cleaned():
    return dlt.read_stream("lending_club_raw").distinct()

# COMMAND ----------

"""
State table for valid grades
"""

@dlt.table(name="lending_club_grades",
           comment="Lending Club Grade state")
def data_raw_lending_club_grades():
    return (dlt.read("lending_club_cleaned")
        .select("grade")
        .distinct())

# COMMAND ----------

"""
State table for valid payment dates
"""

@dlt.table(name="lending_club_payment_dates",
           comment="Lending Club Payment Dates")
def data_raw_lending_club_paymnt_dates():
    return (dlt.read("lending_club_cleaned")
        .select("next_pymnt_d")
        .distinct())

# COMMAND ----------

# DBTITLE 1,Categorize data - Silver Layer
"""
Creating raw delta live tables
"""


@dlt.table(name=inputData1,
           comment="Silver batch 1 dataset ingested from /databricks-datasets - Grade A.")
def data_raw_GradeA():
    return dlt.read_stream("lending_club_cleaned").where("grade == 'A'")

@dlt.table(name=inputData2,
           comment="Silver batch 2 dataset ingested from /databricks-datasets - Grade B.")
def data_raw_GradeB():
    return dlt.read_stream("lending_club_cleaned").where("grade == 'B'")

# COMMAND ----------

# DBTITLE 0,Set up Quality Check
"""
Setting up expectations for quality check
"""

@dlt.table(name=f"Expected_{inputData1}",
           comment="Grade A data cleaned and prepared for analysis.")
@dlt.expect_or_drop("valid loan_amnt", "loan_amnt > 1500")
def data_prepared_grade_a():
    # Rows that fail the expectation are dropped from the target table.
    return dlt.read_stream(inputData1)

@dlt.table(name=f"Expected_{inputData2}",
           comment="Grade B data cleaned and prepared for analysis.")
@dlt.expect_or_drop("valid funded_amnt", "funded_amnt > 2000")
def data_prepared_grade_b():
    # Renamed so the two functions no longer share the name data_prepared.
    return dlt.read_stream(inputData2)

# COMMAND ----------

# DBTITLE 1,Extract Top Loanees
"""
Do some data transformation and preprocessing
"""

@dlt.table(name=f"TopLoanees_{inputData1}",
           comment="A table containing the top loanees with grade A.")
def top_loanees_MI():
    # Ten largest grade A loans in Michigan
    return (dlt.read(f"Expected_{inputData1}")
        .filter(expr("addr_state == 'MI'"))
        .sort(desc("loan_amnt"))
        .limit(10))

@dlt.table(name=f"TopLoanees_{inputData2}",
           comment="A table containing the top loanees with grade B.")
def top_loanees_CA():
    # Ten largest grade B loans in California
    return (dlt.read(f"Expected_{inputData2}")
        .filter(expr("addr_state == 'CA'"))
        .sort(desc("loan_amnt"))
        .limit(10))

# COMMAND ----------

# DBTITLE 1,Unify Top loanees of Grade A and Grade B
"""
Unify cleaned and preprocessed delta live tables
"""

@dlt.table(name="all_Top_lonees",
           comment="A table containing all the top loanees with both grade A and B.")
def all_top_lonees():
    return (dlt.read(f"TopLoanees_{inputData1}")
        .union(dlt.read(f"TopLoanees_{inputData2}")))
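The quality checks in this notebook rely on `expect_or_drop`, which keeps only the rows that satisfy a condition. The plain-Python sketch below imitates that behavior on a few hypothetical rows so the effect is easy to see outside a pipeline. It is an illustration of the idea, not the DLT implementation, and the local `expect_or_drop` function is a stand-in that merely shares the decorator's name.

```python
def expect_or_drop(rows, predicate):
    """Keep the rows that satisfy the predicate and count the ones that do not."""
    kept = [row for row in rows if predicate(row)]
    return kept, len(rows) - len(kept)


# Hypothetical sample rows; only the column used by the expectation is shown.
grade_a = [
    {"loan_amnt": 1000},
    {"loan_amnt": 5000},
    {"loan_amnt": 1500},
]

# Same condition as the "valid loan_amnt" expectation: loan_amnt > 1500.
kept, dropped = expect_or_drop(grade_a, lambda row: row["loan_amnt"] > 1500)
print(kept, dropped)
```

Note that the boundary value 1500 is dropped as well, because the condition is a strict comparison.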

DLT pipeline

After importing it, we need to create the Delta Live Tables pipeline that will use these definitions. This is done from the Delta Live Tables section of the workspace.

When creating the DLT pipeline, select the Python notebook imported in the previous step as its source code.

We can set the pipeline mode to Continuous in order to ingest new data as it arrives, but that keeps a cluster running at all times. Choose the pipeline mode based on the latency and cost requirements of your pipeline.

Set a Target schema value in order to publish tables and views to the metastore.
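The choices made in the creation form (source notebook, pipeline mode, target schema) correspond to fields in the pipeline's JSON settings. The sketch below builds such a settings document in Python. The field names are assumed to follow the DLT pipeline settings format, while the pipeline name, the notebook path, and the `demo_db` target are placeholders chosen for illustration, not values confirmed by this demo.

```python
import json

# Hypothetical pipeline settings; replace the placeholders before use.
pipeline_settings = {
    "name": "lending-club-dlt-demo",  # hypothetical pipeline name
    "continuous": False,              # False = triggered mode, True = continuous mode
    "target": "demo_db",              # assumed schema where tables are published
    "libraries": [
        {"notebook": {"path": "/Users/<you>/<dlt-notebook>"}}  # placeholder path
    ],
    "development": True,              # assumed: run in development mode while testing
}

settings_json = json.dumps(pipeline_settings, indent=2)
print(settings_json)
```

Switching `continuous` to `True` is the scripted equivalent of picking Continuous in the form, with the same cost trade-off described above.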

Once created, just hit Start if it didn’t start automatically.

While the pipeline runs, the tables are created.

Once it finishes, every step has been executed.

All done!

New tables are available for consumption!

Catalog Explorer view
Sample Data view
Querying a Delta Live Table

Discover the effectiveness of Delta Live Tables for yourself. Take a guided tour with us and see how you can speed up your data processes on the Databricks platform. Let’s work together to reshape the way we handle data. Give it a go now and experience the capabilities of Delta Live Tables firsthand.

Andrés Zoppelletto
Blue Orange Digital

Senior Systems Engineer with 20+ years of experience in building Business Intelligence, Data Warehouse, and Big Data solutions