A Case for Delta Live Table as the Workflow for Lakehouse — Integrating Source Data

Lackshu Balasubramaniam
Apr 19, 2022


Ducks not trying too hard to get in a row

A Lakehouse requires a reasonably good workflow mechanism to manage the movement of data and to help data engineers understand the dependencies between processes.

The main concerns around workflows are:

  • Dependency Management
  • Incremental Loads — Partitions over time periods
  • Checkpointing and retries i.e. error handling
  • Data Quality Checks — Formulas, Rules, Constraint Checks including Value Ranges
  • Governance — PII, Confidential Data, Controlling Access to Data, Masking/Encryption
  • Backdating
  • Version Control
  • Deployment
  • Operational Concerns
  • Data Recovery or Data Reload

The list is not exhaustive but covers areas that require due consideration. In this article and further articles in this series I will attempt to demonstrate how we could leverage technologies from Databricks to address the concerns listed above.

In this installment I’m covering the integration of ingested data into Bronze and Silver tables. It’s a fairly high-level discussion, and I might follow up with an article that has code examples.

Integrating with Existing Processes

For some time now I’ve been mulling over how I would go about integrating the DLT approach into existing processes. Another question is how I would implement the orchestration for the different source systems. The following captures my thought process so far, and it will continue to expand as I make mistakes and learn from them.

The diagram below depicts the different flows we could implement using DLT. I’ll walk through each case in turn.

Processing Source Systems into the Lakehouse via DLT

Relational Database Sources

For relational sources, the data could arrive in the following forms:

  • Full Load
  • Subset of Table Load, e.g. pulling a sliding snapshot of the last two years from the current date on a daily basis.
  • Incremental Load (with an initial Full Load)
  • Change Data Capture (CDC), which would be based on a Sequence Number or Timestamp plus an Operation.

Assuming there are slowly changing dimension type 2 (SCD2) delta tables in place post ingestion, the SCD2 tables would reveal transaction history over time, and we could leverage them to load data into bronze. However, I’m discounting this for the time being. All things being equal, it’s useful to keep history, i.e. the ability to track changes to the entities over time. The counterargument is that SCD2 could be implemented in the silver layer and isn’t a hard and fast requirement early on.

We could also go the route of a Change Data Feed (CDF) table, which would reflect the current state of the data along with row history up to a point. We’ll use CDF as the basis for the data flow in this article. What this means is that the Bronze table is a window into the CDF table, exposing the most recent row changes to the orchestration workflow.

Apply Changes Into

Here we need to set the context around the apply changes into command, which is integral to processing relational sources. This command is available via Python and SQL for Delta Live Tables. We’re expanding on the SQL syntax below.

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]

The KEYS option above is a shorthand which allows for upserts:

  • if a row with the key combination exists in the target live table, the row gets updated with the incoming row.
  • otherwise the row gets inserted.

The APPLY AS DELETE WHEN option handles the delete operation by checking a condition on the incoming data. This condition could be derived from one of the below or some other check:

  • an operation field implemented via CDC i.e. insert, update, delete
  • a delete flag that could have been implemented in a SCD2 source table
  • a derived column from joining tables

The above covers the create, update and delete considerations.

Please note that the same target row could be operated on multiple times, given that the source is a stream. This is where the SEQUENCE BY option comes in handy. You would order the rows by some kind of sequencing column, which could be a transaction id or something equivalent to a Log Sequence Number (LSN). Worst case, it would be a timestamp field.

The EXCEPT column list option is also useful for excluding the fields that signal deletes and/or the sequencing fields from the target table. In a recent presentation I also saw an option for specifying the target as an SCD2 table, which would result in From and To fields, but I haven’t tested whether it’s available yet.
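To make the syntax concrete, here is a minimal sketch of how the command might look in a DLT SQL pipeline. The table names (bronze_customers, silver_customers) and the _change_type/_commit_version columns are illustrative assumptions for a bronze table that carries CDF-style metadata, not something prescribed by the diagram above.

-- Declare the target streaming table; APPLY CHANGES INTO populates it.
CREATE OR REFRESH STREAMING LIVE TABLE silver_customers;

-- Merge changes from the hypothetical bronze table into silver, keyed on customer_id.
APPLY CHANGES INTO LIVE.silver_customers
FROM STREAM(LIVE.bronze_customers)
KEYS (customer_id)
APPLY AS DELETE WHEN _change_type = "delete"   -- delete signal carried on the bronze rows
SEQUENCE BY _commit_version                    -- ordering column, e.g. an LSN equivalent
COLUMNS * EXCEPT (_change_type, _commit_version);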

We use the command to perform a merge into the silver table as of the last load. Please note that the DLT process has a checkpoint mechanism which tracks the last row processed from the source bronze table. The next time the orchestration runs it continues from where it last stopped (assuming new rows arrived in the bronze table) in order to apply incremental changes to the silver table. If we do a full refresh in the orchestration, the whole bronze table gets reprocessed and the silver table gets reloaded.

I have to say I’m quite impressed by the level of thinking that led to the Apply Changes Into command.

Source Table with Change Data Capture (CDC) Feed

Pulling rows from a table with CDC enabled would give us a list of rows and the operations performed from a point in time, i.e. the last watermark. The operation for each row would be indicated as insert, update or delete.

We now have two options to load the data into the Bronze table. The first route is to perform a merge of the CDC feed into a delta table with CDF enabled. As a result we have a history of changes to rows and can query the table to get the current state of the source table. This CDF table then becomes the source for materializing the bronze table via the table_changes function. However, this approach would be overkill because we’re implementing CDF, which is a form of CDC, on top of CDC from the source.
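As a rough sketch of that first route, assuming a hypothetical CDF-enabled replica table named cdc_staging.customers (the ALTER statement would be run outside the pipeline, and the starting version is illustrative):

-- Enable the Change Data Feed on the merge target so row-level changes are tracked.
ALTER TABLE cdc_staging.customers
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Expose the row changes recorded by CDF as the bronze table.
-- Starting version 1 is illustrative; a watermark would normally drive it.
CREATE OR REFRESH LIVE TABLE bronze_customers
AS SELECT * FROM table_changes('cdc_staging.customers', 1);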

The second approach is to use Auto Loader on the arriving files to materialize the bronze table itself. The bronze table then becomes a feed of the changes applied to the source table over time.
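A minimal Auto Loader based sketch of that second route, assuming the CDC extracts land as JSON files under a hypothetical path:

-- Incrementally ingest arriving CDC extract files into the bronze table.
-- The path and format are illustrative assumptions.
CREATE OR REFRESH STREAMING LIVE TABLE bronze_customers
COMMENT "Feed of CDC changes applied to the source customers table"
AS SELECT *, current_timestamp() AS ingest_time
FROM cloud_files("/mnt/landing/customers_cdc/", "json");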

Source Table with Incremental Load

Pulling incremental changes from a table with a sequence number or timestamp column would give us the rows that were inserted/updated since the last high watermark. Since we can’t track deletes in this scenario, we assume we only care about inserts and updates.

The incremental changes are merged into the CDF table to track inserts and updates (sketched after the list below). The CDF table is used for two purposes:

  • a table which is a reflection of the source table and is queryable
  • as the means to get the insert/update operations performed on the rows into a bronze table.
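The merge mentioned above could be as simple as the sketch below; the replica.customers and staging.customers_increment names are hypothetical:

-- Merge the latest incremental extract into the CDF-enabled replica of the source table.
-- Inserts and updates are recorded by CDF and later surfaced via table_changes.
MERGE INTO replica.customers AS t
USING staging.customers_increment AS s
  ON t.customer_id = s.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;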

Arguably, the incremental changes could flow in via Auto Loader and the apply changes into command would deal with inserts vs. updates. However, it’s useful to have a queryable daily snapshot table, and it’s also best to standardize operations for database sources.

We then use the apply changes into command to perform a merge into the silver table, leveraging the key columns from the bronze table to process rows as of the last load. The orchestration is the same as described in the earlier section.

Source Table with Full/Subset Load

For this scenario I’m pulling all rows, or a subset (based on a rolling window period), from a table on a daily basis. This could be the case for:

  • small tables
  • tables that require delete tracking
  • tables that don’t have a watermark column to identify rows that changed on a periodic basis.
  • tables that don’t have CDC or Change Tracking functionality

The advantage of this approach is that we can track deletes to a reasonable extent by comparing the latest pull to the existing CDF table. We can also identify inserts vs. updates by checking for the existence of the row in the CDF table. All of this occurs via the merge command.
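A hedged sketch of that merge, reusing the hypothetical replica table from earlier and assuming a Databricks Runtime recent enough to support the WHEN NOT MATCHED BY SOURCE clause:

-- Merge the full daily extract into the CDF-enabled replica.
-- Rows missing from the extract are treated as deletes; for subset loads the
-- delete clause should be constrained to the extract's rolling window.
MERGE INTO replica.customers AS t
USING staging.customers_daily AS s
  ON t.customer_id = s.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE;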

Similar to the other scenarios, the CDF table is used as the means to get the insert/update/delete operations performed on the rows into a bronze table, which flows into the silver table via the apply changes into command.

Semi-Structured Files

DLT is an excellent fit for semi-structured data like CSV, JSON or XML, as these feeds tend to be append-only and are processed as they arrive. In this case the data could be processed continuously or triggered on demand via storage events.

CSV Files

CSV files tend to be logs or extracts, i.e. streams of data to be processed into bronze and silver tables.

The source data could be loaded into a landing table that is Auto Loader based. Auto Loader deals with checkpointing and processes files as they land.

The landing table is abstracted into the bronze table as a DLT view or DLT table. Schema checks, data validation/data quality checks and base business logic could then be applied to flow the data into the silver table.
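A minimal sketch of that flow in DLT SQL, with hypothetical paths, column names and data quality rules:

-- Auto Loader based landing table; checkpointing and incremental file discovery
-- are handled by cloud_files.
CREATE OR REFRESH STREAMING LIVE TABLE landing_events
AS SELECT * FROM cloud_files("/mnt/landing/events/", "csv", map("header", "true"));

-- Bronze table abstracted over the landing table.
CREATE OR REFRESH STREAMING LIVE TABLE bronze_events
AS SELECT * FROM STREAM(LIVE.landing_events);

-- Silver table with schema casts and data quality expectations applied.
CREATE OR REFRESH STREAMING LIVE TABLE silver_events (
  CONSTRAINT valid_event_id EXPECT (event_id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_event_time EXPECT (event_time IS NOT NULL)
)
AS SELECT CAST(event_id AS BIGINT) AS event_id,
       to_timestamp(event_time) AS event_time,
       event_type
FROM STREAM(LIVE.bronze_events);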

JSON/XML Files

XML or JSON payloads tend to look more like datasets, as it’s logical to expose them that way via REST API interfaces. It’s best to pull them apart into separate landing tables per entity to feed into separate bronze tables. They could then be merged into a single silver table or loaded into separate silver tables.
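As a sketch of splitting a JSON payload into entities, assuming a hypothetical orders feed with a nested lines array:

-- Land the raw JSON payload once via Auto Loader.
CREATE OR REFRESH STREAMING LIVE TABLE landing_orders_json
AS SELECT * FROM cloud_files("/mnt/landing/orders_api/", "json");

-- Order headers as one bronze entity.
CREATE OR REFRESH STREAMING LIVE TABLE bronze_orders
AS SELECT order_id, customer_id, order_date
FROM STREAM(LIVE.landing_orders_json);

-- Order lines exploded out of the nested array as a separate bronze entity.
CREATE OR REFRESH STREAMING LIVE TABLE bronze_order_lines
AS SELECT order_id, line.product_id, line.quantity, line.amount
FROM STREAM(LIVE.landing_orders_json)
LATERAL VIEW explode(lines) l AS line;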

Caveats

Delta Live Tables does either an incremental or a full table refresh. A number of batch scenarios would not fit into these modes, for example:

  • if we need to reprocess a particular time window, e.g. a 12 month snapshot.
  • if we need to recalculate summary numbers for a large table or a series of large tables, where a full refresh would introduce unnecessary overhead.
  • it can also be cumbersome to fit apply changes into at every step of the way if we want to process data incrementally.

Conclusion

I believe we are at the point where we could make data engineering less complex by leveraging the orchestration functionality available from Databricks. It then becomes a question of how framework-y we make things in order to apply streamlined processes to a larger number of source entities.

I believe we have to err on the side of caution, because overapplying frameworks on DLT would overcomplicate matters. Data quality rules and schema checks are some of the low-hanging fruit that would fit into framework configurations.

It’s probably best to leverage the provided workflow mechanisms and build custom notebooks for a set of related entities on the basis of subject areas. There might be some commonalities here which would allow for some amount of abstraction.

When we get to the Gold layer, the tables tend to be fairly custom because they are reporting-specific. It would be hard to abstract them beyond building views that encapsulate the logic for the gold tables.

All in all I like the direction Databricks is taking.


Lackshu Balasubramaniam

I’m a data engineering bloke who’s into books. I primarily work on Azure and Databricks. My reading interest is mostly around psychology and economics.