Upserting Data using Spark and Iceberg

Use Spark and Iceberg’s MERGE INTO syntax to efficiently store daily, incremental snapshots of a mutable source table.

Jonathan Merlevede
datamindedbe
6 min readMay 25, 2023

--

Iceberg allows tracking a table’s history by storing incremental diffs. Unfortunately, there are some caveats, and getting this to work as you likely want it to requires non-obvious querying. In this post, we look at the why and the how.

We use Spark as our analytical engine, but this post should also apply to other engines working with Iceberg.

Merge iceberg into table. Thanks, DALL-E!

Problem setting

A typical pattern in analytical data processing is to ingest data from an operational system on a daily basis. Often, we store previously ingested table versions in addition to the current state of things, for example, to support reproducing machine learning results. When working with “standard” Spark and Parquet, we do this by storing daily snapshots and by partitioning on the ingestion date.

We aim to use Apache Iceberg to achieve the same result — storing a queryable history of table snapshots — more efficiently. Iceberg is a project offering a metadata format and a set of execution engine plugins. This extends popular analytical processing engines like Spark, Flink, and Trino with features such as incremental updates, time travel, and ACID transactions.

Upserting

Using Spark with Iceberg unlocks the SQL MERGE INTO statement, which implements a table “upsert”, a portmanteau formed by combining a “table insert” and “table update”:

MERGE INTO prod.db.target t -- a target table
USING (SELECT ...) s -- the source updates
ON t.id = s.id -- condition to find updates for target rows
WHEN MATCHED AND <predicate_x> THEN DELETE
WHEN MATCHED AND <predicate_y> THEN UPDATE *
WHEN NOT MATCHED THEN INSERT *

The code above uses the result of the SELECTstatement to delete, update and insert rows from and into the table prod.db.target, depending on whether an id exists in the source table or not and whether or not <predicate_x> or <predicate_y> are true. For an overview of the MERGE INTO statement, check out the Iceberg documentation here.

On the storage side, executing an upsert statement like the one above triggers Iceberg to create new data files corresponding to any modified partitions (~ copy-on-write) or to create small files expressing e.g. deletes (~ merge-on-read, available since Iceberg v2). Iceberg also creates new metadata files pointing to these new data files. Unchanged data files (partitions) are re-used. This allows efficient storing of snapshot series by keeping only the snapshot “deltas”. Earlier versions of the prod.db.target table can be “recalled” by time traveling, using the TIMESTAMP AS OF or VERSION AS OF clauses.

Iceberg table format spec. (source)

TL;DR Upserting allows us to keep our target copy up-to-date while maintaining a complete history of previous states of a source table, without storing full snapshots of the data.

MERGE INTO limitations

How can we use upserts to reconcile differences between an existing Iceberg table and a newly extracted snapshot?

Assume that we extract or create a mutable source table snapshot on a daily basis, and want to use it to upsert an Iceberg table called iceberg. Ideally, we would be able to write the following:

MERGE INTO iceberg
USING snapshot
ON iceberg.id = snapshot.id
WHEN MATCHED THEN UPDATE *
WHEN NOT MATCHED BY TARGET THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE

Unfortunately, this straightforward query does not work for two reasons.

Deletes. Unlike Delta, Iceberg does not support the syntax MATCHED BY SOURCE. Iceberg’s NOT MATCHED statement corresponds to NOT MATCHED BY TARGET, that is, it fires when a row exists in snapshot but not in iceberg. This is problematic if rows can be deleted from the source system.

Superfluous copies. For rows that are the same in iceberg and snapshot, the WHEN MATCHED THEN UPDATE * results in an identical duplicate of the data being stored on your filesystem. This means that Iceberg will not bring storage benefits over storing multiple snapshots of the source table.

Under the hood, Iceberg decides which partitions it will re-write based on the ON conditional (see this issue for a discussion on the impact on performance). Rows matched by the ON statement but not by any guard on the match conditions will, therefore, still be copied every time you run the upsert statement. Practically, this means that re-writing the MERGE INTO statement above to read +- as follows still results in duplicate partitions being stored:

MERGE INTO iceberg
USING snapshot
ON iceberg.id = snapshot.id
-- condition expressing change is true if one or more columns is different
-- in iceberg and target, i.e.
-- (iceberg.col1 != snapshot.col1) OR (iceberg.col2 != snapshot.col2) OR ...
WHEN MATCHED AND <condition_expressing_change> THEN UPDATE *
WHEN NOT MATCHED BY TARGET THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE

(More detailed analysis with references to source code below)

In fact, your MATCHED conditions are always re-written to include a catchall condition that emits the target row.

Iceberg will construct modified partitions differently depending on whether your MERGE INTO statement contains only MATCHED conditions, NOT MATCHED conditions or both. If only MATCHED conditions exist, a right outer join between target and source suffices (with source being on the right). If only NOT MATCHED conditions exist, Iceberg uses a left anti join and performs a simple append operation instead of re-writing partitions. If both MATCHED and NOT MATCHED conditions exist, a full outer join between target and source is required.

To determine which rows/partitions of the target to re-write, Iceberg performs a quick inner join between source and target tables. To support NOT MATCHED BY SOURCE, a right outer join would be required, as is implemented by Delta here.

Overcoming MERGE INTO limitations

Luckily, we can easily overcome these properties of Iceberg’s upserting functionality. We do this by first preparing a table containing only the changes to your iceberg table. When dealing with sources where rows can be updated and deleted, this requires a full outer join or a sequence of anti-joins. Then, we can use this changes table as the source of updates for our MERGE INTO statement.

One way to do this is by using a CTE as follows:

WITH changes AS
SELECT
COALESCE(b.id, a.id) AS id,
b.col1 as col1,
b.col2 as col2,
...
CASE WHEN b.id IS NULL THEN 'D' WHEN a.id IS NULL THEN 'I' ELSE 'U' END as cdc
FROM iceberg a
FULL OUTER JOIN snapshot b ON a.id = b.id
WHERE NOT (a.col1 = b.col1 AND a.col2 = b.col2 AND ...)

MERGE INTO iceberg
USING changes
ON iceberg.id = changes.id
WHEN MATCHED AND changes.cdc = 'D' THEN DELETE
WHEN MATCHED AND changes.cdc = 'U' THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

This results in only changes being stored in the target table iceberg, and supports insertions, updates, and deletes in the source table snapshot.

The table changes includes a row for every new insert, update or delete in the source table. To construct changes, we perform a full outer join between the recent snapshot of the source table (snapshot) and the existing Iceberg table (iceberg):

  • If a row existed in iceberg but no longer exists in snapshot (b.id is null), this corresponds to a delete operation in the source table.
  • If an id exists in snapshot but does not yet exist in iceberg (a.id is null), this corresponds to an insert operation in the source table.
  • If a row with a specific id exists in both iceberg and snapshot (both a.id and b.id are non-null), the row with this id was unchanged or updated. We filter out unchanged rows by specifying WHERE NOT (a.col1 = b.col1 AND a.col2 = b.col2 AND ...).

The changes tables only has entries for rows actually requiring changes in the iceberg table, working around the problem of superfluous updates. Merging changes into iceberg using MERGE INTO is straightforward and works the way you would expect it to.

This post looked at how we can leverage Iceberg to maintain a history of full table snapshots efficiently.

Iceberg requires some tinkering for it to work the way we want it to, but enables patterns that were previously impossible or inefficient to use with Spark. Iceberg extends Spark's capabilities with functionality that was previously only available within data warehouses like Snowflake. We hope that with time, Iceberg will become even easier to use. It's definitely a technology worth exploring!

Edit 21/11: Added some sentences to the introduction and re-wrote some sentences for clarity.

--

--