How to read Delta Lake tables in plain SQL with DuckDB

Jimmy Jensen
7 min read · Mar 5, 2024


Edit: As of DuckDB 0.10.3 (2024-05-22), the duckdb_delta extension is available as an experimental feature, so DuckDB can now read the Delta protocol natively through the delta_scan function. The extension is loaded automatically the first time it is used.

delta_scan accepts local paths as well as cloud storage URLs, as shown below. Note that reading from Azure or S3 requires additional setup.

-- Read from local file system
SELECT *
FROM delta_scan('file:///path/to/deltatable');

-- Read from S3
SELECT *
FROM delta_scan('s3://bucket-name/path/to/delta-table');

-- Read from Azure Blob Storage
SELECT *
FROM delta_scan('az://container-name/path/to/delta-table');

-- Read from Azure Data Lake Storage (ADLS)
SELECT *
FROM delta_scan('abfss://container-name/path/to/delta-table');

I created a small DuckDB Web GUI which you can take inspiration from: mrjsj/delta-lake-explorer (github.com)

Introduction

I’ve been wanting to build an easy and efficient data platform for a hobby project for quite a while.

I want an open source, single node data platform with robust orchestration, fast and efficient data processing, the benefits of an open storage format with ACID transactions, an easy and powerful SQL interface for end users, and a flexible, responsive BI-tool supporting BI-as-code.

The data stack I’ve come up with is

However, one thing has bugged me. I want Delta Lake, and I also want DuckDB. Unfortunately, at the time of writing DuckDB doesn’t support reading from Delta Lake tables.

DuckDB and Delta Lake

In case you haven’t heard about DuckDB yet, here’s a short description.

“DuckDB is a fast in-process analytical database” (DuckDB.org)

You can think of it as the SQLite of OLAP databases.

Delta Lake is a storage framework which enables building a Lakehouse architecture on parquet files. Tables are stored as parquet files together with a delta log containing information on which table operations have been applied to the table.

DuckDB can read parquet files, but to read a Delta Lake table correctly it must take the delta log into account so that it only reads the parquet files belonging to the current version of the table. That is a feature it does not yet support.

Current workarounds

There are various workarounds.

These workarounds only work in Python or through the DuckDB CLI. But many BI tools only support SQL. This made me think about how I would tackle this issue with plain SQL.

One way I came up with was to simply use Polars to convert the latest version of the delta table to plain parquet files as a final step in the ETL. Not an elegant solution.

Another thought I had was for the ETL process to generate a view consisting of the current parquet files making up the delta table. But this would require updating the view whenever a change was made to the Delta Lake table, and it wouldn’t support time travel.

I wanted something which didn’t copy any data, and wouldn’t need to be refreshed. It should just work.

What I came up with

To my knowledge, there’s no workaround/hack using plain SQL in DuckDB — until now.

I created my own Delta Lake table reader in SQL by querying the delta log JSON files and using the result to filter the files read by the DuckDB function read_parquet.

In a perfect world, I could just pass a list of files to the read_parquet function. However, that was not possible, as read_parquet only takes a constant parameter.

The delta log

The delta log contains all the history of added and removed parquet files, but reading it into SQL doesn’t look great.

WITH delta_log AS (
    SELECT
        add,
        remove,
        commitInfo,
        -- keep only the last path segment, e.g. the name of the commit file
        str_split(filename, '/')[-1] AS file_name
    FROM read_json_auto(
        './delta_table/_delta_log/*.json',
        filename=true
    )
)
SELECT * FROM delta_log;

The _delta_log read into DuckDB with read_json_auto

However, these are the fields needed for identifying the current files of the Delta Lake table. I do some string manipulation because DuckDB returns the full path of each log file rather than just the file name.
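To see why the raw result looks so sparse, it helps to remember that each commit file in _delta_log is newline-delimited JSON with one action per line, and each line carries a single top-level key such as commitInfo, add or remove. The following is a minimal plain-Python sketch of that shape, not DuckDB code. The file name, paths and values are invented for illustration, and read_actions is a hypothetical helper that only mimics the columns selected above.

```python
import json

# Hypothetical contents of one commit file, e.g. _delta_log/00000000000000000001.json.
# Each line is one action; all values below are invented for illustration.
commit_file = "\n".join([
    json.dumps({"commitInfo": {"timestamp": 1709600000000, "operation": "WRITE"}}),
    json.dumps({"add": {"path": "part-0001.parquet", "size": 1024, "dataChange": True}}),
    json.dumps({"remove": {"path": "part-0000.parquet", "dataChange": True}}),
])


def read_actions(text, file_name):
    """Return one row per log line with add/remove/commitInfo columns (None when absent)."""
    rows = []
    for line in text.splitlines():
        action = json.loads(line)
        rows.append({
            "add": action.get("add"),
            "remove": action.get("remove"),
            "commitInfo": action.get("commitInfo"),
            "file_name": file_name,
        })
    return rows


rows = read_actions(commit_file, "00000000000000000001.json")
for row in rows:
    print(row)
```

Every row has exactly one populated action column, which is why the later queries filter on IS NOT NULL before unnesting.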

The next step is to create three separate queries for the commitInfo, the add actions and the remove actions, respectively. These queries are enriched with the corresponding commitInfo of the transaction by joining on the filename. The commitInfo holds the timestamp of when the transaction was committed.

Commit info

The commit info in the transaction log contains information on (you guessed it) the commits to the Delta Lake table. It records which operation was committed at which time, along with a bunch of other metadata. For this specific case, however, we’re only interested in the timestamp, which is needed to correctly filter which parquet files to include when reading the Delta Lake table.

-- delta_log is the CTE defined in the previous step
WITH commit_info AS (
    SELECT
        unnest(commitInfo),
        file_name
    FROM delta_log
    WHERE commitInfo IS NOT NULL
)
SELECT * FROM commit_info;

Example of the commitInfo of a Delta Lake table transaction log

Add actions

These contain information on which files were added at which version to the Delta Lake table. I have joined the commit_info to get the timestamp column.

The add actions also contain information on the size of the specific parquet file, the number of records and other statistics used by Delta Lake for file skipping when reading the table. This extra information is not required for this specific case either.

-- delta_log and commit_info are the CTEs defined in the previous steps
WITH add_actions AS (
    SELECT
        unnest(add),
        delta_log.file_name,
        commit_info.timestamp
    FROM delta_log
    JOIN commit_info ON delta_log.file_name = commit_info.file_name
    WHERE add IS NOT NULL
)
SELECT * FROM add_actions;

Example of add-actions from the Delta Lake table transaction log

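Similarly, the remove actions contain information on which files were removed from the Delta Lake table. The remove_actions query has the same shape as add_actions, using unnest(remove) and WHERE remove IS NOT NULL instead.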

Filtering current files

Now we have the building blocks required for identifying which parquet files make up the current Delta Lake table.

We take all paths (files) from the add actions which have not later been removed by a remove action. As files can be re-added to the table (for example by restoring a previous table version), the timestamp condition is crucial.

-- add_actions and remove_actions are the CTEs defined in the previous steps
WITH current_files AS (
    SELECT
        path
    FROM add_actions
    WHERE NOT EXISTS (
        SELECT
            1
        FROM
            remove_actions
        WHERE add_actions.path = remove_actions.path
        AND add_actions.timestamp < remove_actions.timestamp
    )
)
SELECT * FROM current_files;

The two parquet files making up the current Delta Lake table version

We’re left with the two parquet files which the Delta Lake table currently consists of.
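To illustrate why the timestamp condition matters, here is a small sketch that runs the same anti-join on a made-up history. It uses Python's built-in sqlite3 module as a stand-in for DuckDB, on the assumption that the NOT EXISTS subquery behaves the same way since it is standard SQL. The file names and timestamps are invented.

```python
import sqlite3

# SQLite stands in for DuckDB here; the anti-join below is standard SQL.
con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE add_actions (path TEXT, timestamp INTEGER);
    CREATE TABLE remove_actions (path TEXT, timestamp INTEGER);

    -- Invented history: a.parquet is added, removed, then re-added (a restore);
    -- b.parquet is added and kept; c.parquet is added and then removed.
    INSERT INTO add_actions VALUES
        ('a.parquet', 100), ('b.parquet', 100), ('c.parquet', 100), ('a.parquet', 300);
    INSERT INTO remove_actions VALUES
        ('a.parquet', 200), ('c.parquet', 200);
""")

# The filter from the article: keep an add only if no later remove exists.
current_files = [row[0] for row in con.execute("""
    SELECT path
    FROM add_actions
    WHERE NOT EXISTS (
        SELECT 1
        FROM remove_actions
        WHERE add_actions.path = remove_actions.path
          AND add_actions.timestamp < remove_actions.timestamp
    )
    ORDER BY path
""")]

# The same query without the timestamp condition, for comparison.
without_timestamp = [row[0] for row in con.execute("""
    SELECT path
    FROM add_actions
    WHERE NOT EXISTS (
        SELECT 1
        FROM remove_actions
        WHERE add_actions.path = remove_actions.path
    )
    ORDER BY path
""")]

print(current_files)      # ['a.parquet', 'b.parquet']
print(without_timestamp)  # ['b.parquet']
```

Without the timestamp comparison, the re-added a.parquet would be dropped because an earlier remove for the same path exists.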

Final step

Finally, we simply read in all the parquet files using the regular read_parquet and filter on the current files.

SELECT
    *
FROM
    read_parquet(
        './delta_table/**/*.parquet',
        filename=true,
        hive_partitioning=true
    ) AS deltatable
WHERE EXISTS (
    SELECT
        1
    FROM
        current_files
    WHERE ends_with(deltatable.filename, current_files.path)
);
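The ends_with condition is a suffix match: the filename column holds the path as the parquet scan reports it, while the path in the add actions is relative to the table root. A rough plain-Python equivalent of that filter is sketched below. All paths are invented, and the exact prefix reported by the scan is an assumption that depends on the glob used.

```python
# Paths as the parquet scan might report them (invented; the prefix depends on the glob).
scanned = [
    "./delta_table/part-0000.parquet",
    "./delta_table/part-0001.parquet",
    "./delta_table/part-0002.parquet",
]

# Relative paths as recorded in the add actions of the delta log (invented).
current_files = ["part-0001.parquet", "part-0002.parquet"]

# Keep a scanned file only if its path ends with one of the current relative paths,
# mirroring WHERE EXISTS (... ends_with(deltatable.filename, current_files.path)).
kept = [f for f in scanned if any(f.endswith(p) for p in current_files)]
print(kept)
```

Here part-0000.parquet still exists on disk but is no longer part of the table, so it is filtered out.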

Quack it into a macro

A macro in DuckDB can return either a scalar or a table. The latter is much like a view, but with the benefit of accepting parameters. You call it like you would call read_json, read_csv or read_parquet, for example:

SELECT * FROM my_macro(a, b);

This also lets you add filters, which get propagated into the macro, for example:

SELECT * FROM my_macro(a, b) WHERE my_filter = 'abc';

All the above steps can be put into a parameterised macro for reusability. You will notice that I sneaked in an optional version parameter to support time travel.

CREATE OR REPLACE MACRO read_delta(delta_table_path, version := -1) AS TABLE
-- Read the transaction log; version = -1 (the default) keeps every commit,
-- otherwise only commits up to and including the requested version are kept.
WITH delta_log AS (
    SELECT
        add,
        remove,
        commitInfo,
        str_split(filename,'/')[-1] AS file_name
    FROM read_json_auto(
        concat(delta_table_path, '/_delta_log/*.json'),
        filename=true
    )
    WHERE str_split(file_name,'.')[1]::int <= version::int
    OR version::int = -1
), commit_info AS (
    SELECT
        unnest(commitInfo),
        file_name
    FROM delta_log
    WHERE commitInfo IS NOT NULL
), add_actions AS (
    SELECT
        unnest(add),
        delta_log.file_name,
        commit_info.timestamp
    FROM delta_log
    JOIN commit_info ON delta_log.file_name = commit_info.file_name
    WHERE add IS NOT NULL
), remove_actions AS (
    SELECT
        unnest(remove),
        delta_log.file_name,
        commit_info.timestamp
    FROM delta_log
    JOIN commit_info ON delta_log.file_name = commit_info.file_name
    WHERE remove IS NOT NULL
), current_files AS (
    -- Files that were added and not removed by a later commit
    SELECT
        path
    FROM add_actions
    WHERE NOT EXISTS (
        SELECT
            1
        FROM
            remove_actions
        WHERE add_actions.path = remove_actions.path
        AND add_actions.timestamp < remove_actions.timestamp
    )
)
-- Read all parquet files under the table path and keep only the current ones
SELECT
    *
FROM
    read_parquet(
        concat(delta_table_path, '/**/*.parquet'),
        filename=true,
        hive_partitioning=true
    ) AS deltatable
WHERE EXISTS (
    SELECT
        1
    FROM current_files
    WHERE ends_with(deltatable.filename, current_files.path)
);

Then the macro can be called from a regular query:

SELECT
    *
FROM
    read_delta('./delta_table');

Example output of read_delta as of current version

Or with time-travel:

SELECT
    *
FROM
    read_delta('./delta_table', version := 5);

Example output of read_delta as of version 5
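The time travel works because each commit file in the delta log is named after its version number, zero-padded to 20 digits, so the macro can parse the number out of the file name and drop later commits. A plain-Python sketch of that version filter is shown below. The helper commits_up_to and the log listing are hypothetical. Note that DuckDB list indexing is 1-based, so [1] in the macro corresponds to [0] here.

```python
def commits_up_to(file_names, version=-1):
    """Keep commit files whose version number is <= version; -1 means no limit."""
    selected = []
    for name in file_names:
        # '00000000000000000005.json' -> 5
        commit_version = int(name.split(".")[0])
        if version == -1 or commit_version <= version:
            selected.append(name)
    return selected


# Hypothetical log directory listing with commits 0 through 7.
log_files = [f"{v:020d}.json" for v in range(8)]

print(len(commits_up_to(log_files)))             # 8
print(commits_up_to(log_files, version=5)[-1])   # 00000000000000000005.json
```

Since only the commits up to the requested version are replayed, the add and remove actions of later commits never reach the current_files step.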

Final thoughts

Initially, I didn’t like this solution, as it reads all the parquet files and then filters on the filename. I have not yet experimented with its performance, however.

I am now quite positive, as DuckDB leverages both projection and filter pushdown, which should limit the amount of data read when querying Delta Lake tables. You can read more about it here.

I’m eager to try it out myself with larger tables and larger transaction logs.

If you decide to try this out, please let me know how it went!
