Delta Lake — game-changing update to production Change Data Feed

Stefan Graf · Published in CodeX · Jun 4, 2023

Everybody who has worked with Delta probably already knows the great benefits of Change Data Feed (CDF). This feature enables your consumers to consume only the changes in your data product very efficiently. It is also a great way to read from a Delta table with a stream if the table is not append-only.

But there was one big caveat to using Change Data Feed in a productionized environment: it was not possible to rename or drop a column while CDF was enabled. You had to choose between column mapping (which enables renaming or dropping columns) and CDF. This was a big show-stopper for any project where schema changes can occur, until now. With Delta 2.3, this problem belongs to the past.

In the following examples, I want to show you how these schema changes work, what you need to do to enable them, and how to work with them. For this, I will create a table, insert data, and show you potential problems, all with our beloved CDF enabled. I will use Databricks as the compute engine (Delta 2.3 is supported from DBR 13.0+), but you can interact with your Delta files any way you want. For Spark users, version 3.3+ is required.

All code can also be found in my GitHub repo: Graflinger/mediumblogrepo (a collection of all code samples used for my Medium blogs).

CREATE Table

CREATE OR REPLACE TABLE cdf_test(
  id INT
  ,test STRING
  ,test2 STRING)
TBLPROPERTIES (
  delta.enableChangeDataFeed = true,  -- record row-level changes for table_changes / CDF reads
  delta.columnMapping.mode = 'name',  -- required to rename or drop columns
  delta.minReaderVersion = 2,         -- column mapping requires reader version 2
  delta.minWriterVersion = 5)         -- and writer version 5

Here is a simple example table that shows what you need to use CDF with non-additive schema changes (rename and drop column) enabled.
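If your table already exists, you can also switch these properties on afterwards. Here is a minimal sketch, assuming the table was created without column mapping (note that upgrading the reader/writer protocol versions is a one-way operation):

ALTER TABLE cdf_test SET TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.columnMapping.mode' = 'name',
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5'
)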

INSERT data and read CDF

INSERT INTO cdf_test(id, test, test2) VALUES(1, 'test', 'test2')

SELECT
*
FROM
table_changes('cdf_test', 0)

To showcase the normal CDF output, I just inserted one row into the table and selected the CDF from version 0. Here is the data displayed:

Output of change data feed for simple insert
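Besides the table columns, CDF adds the metadata columns _change_type, _commit_version and _commit_timestamp. For our single insert, the result looks roughly like this (timestamp illustrative):

id | test | test2 | _change_type | _commit_version | _commit_timestamp
1  | test | test2 | insert       | 1               | 2023-06-04 ...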

ALTER Table, INSERT with new schema and read CDF again

ALTER TABLE cdf_test DROP COLUMN test2

INSERT INTO cdf_test(id, test) VALUES(2, 'test')

SELECT
*
FROM
table_changes('cdf_test', 3)

Here you can see that dropping a column works. New data was inserted with the new schema, and the CDF was selected, as you can see below:

Output of change data feed after schema change
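The result now only contains the remaining columns, roughly like this (timestamp again illustrative):

id | test | _change_type | _commit_version | _commit_timestamp
2  | test | insert       | 3               | 2023-06-04 ...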

Commit version problem: non-additive changes

But hold on: if you took a close look, you might have discovered that we jumped from commit version 1 to commit version 3 with only two INSERTs. Let's have a look at the table history with the following command:

DESCRIBE HISTORY cdf_test

This command shows all changes to the table.

DESCRIBE HISTORY output
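For our example, the history should contain roughly these four commits (timestamps and operation details omitted; exact operation names may vary slightly by runtime):

version | operation
3       | WRITE
2       | DROP COLUMNS
1       | WRITE
0       | CREATE OR REPLACE TABLE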

As you can see, the ALTER TABLE was applied in version 2. You may wonder why we should even care about this behaviour. Let me cite the documentation:

In Delta Lake 2.3 and above, you can perform batch reads on change data feed for tables with column mapping enabled that have experienced non-additive schema changes. Instead of using the schema of the latest version of the table, read operations use the schema of the end version of the table specified in the query. Queries still fail if the version range specified spans a non-additive schema change.

So what does this mean for us? If a non-additive schema change has been applied, you can't read the CDF across versions from before and after the schema change in a single query. Let's take an example to make this easier to understand. If we try to read our table's changes from version 0 to 3, like here:

SELECT 
*
FROM
table_changes('cdf_test', 0, 3)

We’ll receive a long error message:

DeltaUnsupportedOperationException: Retrieving table changes between version 0 and 3 failed because of an incompatible data schema. Your read schema is {"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{"delta.columnMapping.id":1,"delta.columnMapping.physicalName":"col-d016820a-f700-4c0b-9507-43e0a9d554bc"}},{"name":"test","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":2,"delta.columnMapping.physicalName":"col-0f0d1c47-d11f-45fb-867c-500fa529e72f"}}]} at version 3, but we found an incompatible data schema at version 0.

To mitigate this problem, you need to read everything before the change and everything after it in separate queries. In our example, you can read up to commit version 2, and then from commit version 3 onwards, as sketched below. This behaviour is very important to keep in mind, because it is a potential source of errors for your data product consumers. In the end, a non-additive schema change always needs to be communicated to every downstream user/system, because it is always a potential breaking change.
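A minimal sketch of the split reads described above (if the first range still fails on your runtime, end it at version 1 instead; the DROP COLUMN commit itself contains no row changes):

-- everything up to the schema change
SELECT * FROM table_changes('cdf_test', 0, 2)

-- everything after the schema change
SELECT * FROM table_changes('cdf_test', 3)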

Conclusion

In conclusion, Delta Lake 2.3 revolutionizes the usage of CDF by overcoming one of its limitations in a productionized environment. With the ability to rename or drop columns while using CDF, schema changes become seamless and efficient. This update empowers data teams to evolve their data products without compromising performance. However, it’s important to be aware of the commit version behavior when dealing with non-additive schema changes to avoid potential errors. Delta Lake 2.3 solidifies its position as a robust platform, offering enhanced flexibility and reliability for big data workflows.

Stefan Graf · Writer for CodeX

Data Engineer Consultant @Microsoft — Data and Cloud Enthusiast