Databricks Delta Live Tables

Mohit Agarwal
3 min read · Dec 26, 2022


Delta Live Tables (DLT) is a feature from Databricks for building production-ready ETL pipelines in a much more efficient manner.

Advantages of DLT

  1. We can create, manage, and monitor pipelines and code directly in Databricks.
  2. No need to learn multiple ETL platforms, e.g. Azure Data Factory, Glue, Turbine, etc.
  3. Clean data lineage to monitor the flow of data from source to target.
  4. Complex merge statements can be abstracted behind the built-in apply_changes function.
  5. Easy to audit and monitor changes.
  6. No custom code is needed for data quality and governance; this can be achieved with the expectations property (see the sketch after this list).
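
As a minimal sketch of point 6, expectations can be attached to a table with decorators such as @dlt.expect and @dlt.expect_or_drop (the names orders_clean, amount, and order_id below are hypothetical):

import dlt

# Hypothetical quality rules: @dlt.expect only records violations in the pipeline metrics,
# while @dlt.expect_or_drop removes the offending rows from the table.
@dlt.table(name="orders_clean", comment="orders with basic quality checks")
@dlt.expect("valid_amount", "amount >= 0")
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
def orders_clean():
    return spark.read.load("abfs://<container_name>@<storage_account>.dfs.core.windows.net/orders")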

DLT Tables / Views

Syntax for Creating DLT tables

import dlt

@dlt.view(name='sample_v', comment="reading from source")
def sample_v():
    df = spark.read.load("abfs://<container_name>@<storage_account>.dfs.core.windows.net/tablepath")
    return df

@dlt.table(name='sample_t', comment="creating full load table from source",
           path='<mount path or abfs path where you want to store the table>')
def sample_t():
    return spark.sql("select * from live.sample_v")

Note:

1) @dlt.view creates a temporary view, similar to a normal view in any database; it only exists for the duration of the pipeline run. By default, the name of the decorated function becomes the view name unless one is passed explicitly in the name parameter.

2) @dlt.table creates a Delta table. By default, the name of the decorated function becomes the table name unless one is passed explicitly in the name parameter. Also, if path is not defined explicitly, the table is stored under the storage location configured in the DLT pipeline settings.

3) live is a reserved keyword and can be used throughout the code to reference the tables/views defined in the pipeline (see the sketch after these notes).

4) Whenever we create a table with the @dlt.table syntax, it is fully loaded on each run: no history is maintained for any record, and a full refresh takes place from the source/view.
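
For note 3, the live keyword is the SQL way of referencing another object in the same pipeline; in Python the same thing can also be done with dlt.read(). A small sketch reusing the sample_v view from above (the table names sample_t_sql and sample_t_py are hypothetical):

import dlt

# Both tables below are built on top of the sample_v view and produce the same result.
@dlt.table(name="sample_t_sql", comment="reads the view via the live keyword")
def sample_t_sql():
    return spark.sql("select * from live.sample_v")

@dlt.table(name="sample_t_py", comment="reads the view via dlt.read")
def sample_t_py():
    return dlt.read("sample_v")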

Tale of Incremental Load

Earlier, developers used to write big chunks of code for merge statements, which was not only complex but also hard to debug. With this new feature, the same result can be achieved in a cleaner and more efficient way.

@dlt.view(name='sample_v1', comment="reading from source")
def sample_v1():
    df = spark.readStream.option("ignoreChanges", True).load("abfs://<container_name>@<storage_account>.dfs.core.windows.net/tablepath")
    return df

# similar to CREATE TABLE IF NOT EXISTS
dlt.create_streaming_live_table(name="target", path="<abfs target path>")

dlt.apply_changes(
    source = "sample_v1",
    target = "target",
    keys = [<list of keys>],
    sequence_by = "<order by column>",
    stored_as_scd_type = 1  # or 2
)

source: the DLT view that was created for reading the data from the source.

target: the table that was created using dlt.create_streaming_live_table(table_name).

keys: the primary key columns to compare on (similar to a partition by clause).

stored_as_scd_type: 1 (upsert) or 2 (maintain history).

sequence_by: the column used to order records, similar to an order by clause.
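
Putting the parameters together, a filled-in call could look like the sketch below (the target name customers_scd2 and the columns customer_id and updated_at are hypothetical):

import dlt

# similar to CREATE TABLE IF NOT EXISTS for the target
dlt.create_streaming_live_table(name="customers_scd2", path="<abfs target path>")

dlt.apply_changes(
    source = "sample_v1",          # streaming view defined earlier
    target = "customers_scd2",     # streaming live table created above
    keys = ["customer_id"],        # hypothetical primary key column
    sequence_by = "updated_at",    # hypothetical ordering column
    stored_as_scd_type = 2         # keep the history of every change
)

With SCD type 2, the target table also gets __START_AT and __END_AT columns that mark the validity window of each version of a record.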

Note:

  1. Using this function we can instruct DLT to perform incremental loads on non-streaming sources. It compares new records against the target based on the keys and sequence_by columns and applies them according to the chosen SCD type.
  2. Column names should be the same in both source and target within the apply_changes function (this can be achieved by creating a temporary view).
  3. We cannot create non-streaming tables to perform incremental loads; apply_changes only supports the create_streaming_live_table function, which creates streaming tables.
  4. The ignoreChanges = True property must be enabled to handle deletes/updates from the source.
  5. After running the pipeline, three folders are created: tables (stores the actual parquet files), checkpoints (since this creates streaming tables), and system (audit logs for the pipelines).
  6. The apply_changes function creates a backing table named __apply_changes_storage_<table_name> and a view with the target table name. In order to delete the target completely, we have to drop both the view and the table (see the sketch after this list).
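
For point 6, a clean-up could look like the sketch below, assuming the target from the earlier example lives in a hypothetical database my_db and the backing table follows the __apply_changes_storage_<table_name> pattern:

# apply_changes leaves behind a view with the target name and a backing storage table;
# both have to be dropped to remove the target completely.
spark.sql("DROP VIEW IF EXISTS my_db.target")
spark.sql("DROP TABLE IF EXISTS my_db.__apply_changes_storage_target")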

General points about pipeline settings-

  1. We can add any number of notebooks to a single pipeline (all of them run in parallel, depending on available cluster resources).
  2. Workflows can be used to add dependencies between pipelines.
  3. Global parameters can be passed from the pipeline to the notebooks (see the sketch after this list).
  4. The storage location in the pipeline settings can be updated after the pipeline is created.
  5. We can pass storage account keys from Key Vault in the pipeline settings JSON.
  6. Pipelines run in two modes, Development and Production. The main difference is the cluster shutdown behavior: in development mode the cluster remains active for some time, which reduces iteration time during development, whereas in production mode the cluster is recreated on each run, so cost is reduced but start-up time increases.
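
For point 3, values added under the configuration block of the pipeline settings are exposed to the notebook through the Spark configuration. A minimal sketch, assuming a hypothetical key mypipeline.source_path was added in the settings JSON:

import dlt

# Read a value that was defined in the DLT pipeline settings ("configuration" section).
source_path = spark.conf.get("mypipeline.source_path")

@dlt.view(name="configured_source", comment="source path supplied from pipeline settings")
def configured_source():
    return spark.read.load(source_path)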

Hope this article helps you on your journey with Delta Live Tables.

Contributors to this blog post-

Utkarsh Srivastava- Data Engineer | Utkarsh Srivastava | LinkedIn

Mohit Agarwal- Data Engineer | Mohit Agarwal | LinkedIn

