Apache Paimon: Introducing Deletion Vectors

Near real-time updates and extremely fast queries

Giannis Polyzos
8 min read · May 13, 2024

Apache Paimon is now a Top-Level Project under the Apache Software Foundation.

It started as a project under the Apache Flink umbrella (originally called Flink Table Store) and quickly moved from an early-stage umbrella project to an Apache incubator project. In less than 12 months, it has been deployed in many production environments and has grown a strong community.

Apache Paimon uses LSM trees (log-structured merge trees), the go-to data structure for high-speed data ingestion, used by many popular data systems like RocksDB, Cassandra, ScyllaDB, ClickHouse, etc. It innovatively combines the LSM structure with the lake format, bringing real-time streaming updates into the lake architecture.

Apache Flink is a unified compute engine and Apache Paimon provides unified lakehouse storage. When integrated with Flink CDC, which provides unified data ingestion, we get an end-to-end unified batch and streaming stack for real-time data analytics.

More information on Apache Flink’s roadmap and the vision for the project (along with Flink CDC and Apache Paimon) can be found here.

Unified Storage aims to provide a single table abstraction for batch, stream, and OLAP queries.

Background

Paimon 0.8 was released last week and marks the first official release after the project’s graduation.

This new release brings many important features and improvements, along with significant documentation improvements.

In this blog post, we will explore a new feature called deletion vectors.

We will explore why they are needed, and how they can help provide an even better balance between streaming writes and fast queries.

Disclaimer: This article contains contributions from Jingsong Li, PMC chair of Apache Paimon.

Business Use Case

Let’s take a business example: we have an orders table in our database and want to ingest it into the data lake.

CREATE TABLE orders (
    order_id BIGINT,
    order_name STRING,
    order_user_id BIGINT,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_fee DECIMAL(20, 2),
    order_create_time TIMESTAMP(3),
    order_update_time TIMESTAMP(3),
    order_state INT,
    PRIMARY KEY (order_id) NOT ENFORCED
);

Once the data is in the lake, you can run scheduled batch ETL jobs and analytical queries on it. The general structure is as follows:

Generally speaking, Batch ETL doesn’t have high requirements for read performance and can be completed in minutes. Analytical queries on the other hand need to return within seconds; we need to provide analysts with a good user experience 😄

Next, let’s take a look at how Paimon’s underlying design satisfies the above architecture.

Primary Key Table

This table type requires a primary key to be set. It handles updates automatically in real time and can also be queried in real time.

The basic file structure of Paimon is as follows:

The table or partition contains multiple buckets, and each bucket is a separate LSM tree structure containing multiple files.

The LSM write path works as follows: with every checkpoint, batches of data are stored on disk. L0 files are flushed to disk, and compaction is triggered automatically to merge the data and handle small files.

Thanks to the LSM, Paimon doesn’t require you to make MoR or CoW tradeoffs by default, but it can mimic both behaviors:

  • MoR (Merge On Read): Data merging is semi-asynchronous by default. Compaction runs in the background, but when too many L0 files accumulate (i.e., there is a lot of incremental data), it may backpressure the writer. To avoid this, you can make compaction fully asynchronous (no backpressure during writing).
  • CoW (Copy On Write): Merging can also be made synchronous; that is, every write triggers a full compaction that merges all the files.
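
A minimal sketch of the fully asynchronous variant mentioned in the MoR bullet. The two option names below are not taken from this post; they are how we recall Paimon's write-performance documentation, and the values are illustrative, so check both against the version you run:

```sql
-- Assumption: these options relax the write stall so that compaction
-- does not block the writer. Values are illustrative, not tuned.
ALTER TABLE orders SET (
    -- effectively never stop writes to wait for compaction
    'num-sorted-run.stop-trigger' = '2147483647',
    -- allow spilling during merges to bound memory when many sorted runs pile up
    'sort-spill-threshold' = '10'
);
```

The price is that more unmerged files can accumulate, so readers have more merging to do.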

Merge-On-Read

MoR mode requires merging all files at read time. Because each file is sorted, a multi-way merge is performed, which requires comparing primary keys.

There is a problem here.

A single LSM tree can only be read by a single thread, so the parallelism of reads is limited. If the amount of data within a bucket is too large, it can result in poor reading performance.

The recommended bucket size is between 200 MB and 1 GB. This keeps query times under 10 seconds.

On the other hand, if buckets are too small, more small files are read and written, which puts pressure on the file system.

In addition, because of the merging process, filters on non-primary-key columns can’t be used to skip data. Such a filter could drop the newest version of a row and let an older version through, so the query would return stale, incorrect data.
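
To make the correctness problem concrete, here is a small sketch against the orders table. The values and the file layout described in the comments are invented for illustration only:

```sql
-- Hypothetical contents of one bucket under MoR, before any merge:
--   older file : (order_id = 1, order_state = 1)
--   newer file : (order_id = 1, order_state = 2)   -- an update to the same key
SELECT order_id, order_state
FROM orders
WHERE order_state = 1;

-- Correct behavior: merge by primary key first, then filter.
--   The latest version of order 1 has order_state = 2, so it does not match.
-- Incorrect behavior if the filter were pushed into each file before merging:
--   the newer file is skipped (state 2 does not match), and the stale row
--   (1, 1) from the older file is returned.
```

This is why, in plain MoR mode, only filters on primary-key columns are safe to apply before the merge.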

This mode provides the best write performance, as data does not need to be forcibly merged. However, reading the LSM has performance issues caused by the multi-way merge:

  1. A single LSM tree is read by a single thread, so concurrency is limited.
  2. Filters on non-primary-key columns cannot be pushed down.
  3. The multi-way merge itself has a performance cost.

Copy-On-Write

An intuitive idea is to merge the data directly during writing. We can set full-compaction.delta-commits to 1 to force a full compaction with every new snapshot. (This is the most “forceful” mode; you can experiment with larger intervals.)

ALTER TABLE orders SET ('full-compaction.delta-commits' = '1');

When reading the data now, there is no need to merge multiple files, so we get really fast queries. However, since we trigger a full compaction with every write, we incur serious write amplification.

So basically MoR and CoW can provide two extreme alternatives; either really fast writes or really fast queries.

Although Paimon’s MoR (thanks to the LSM) is sufficient in most cases, such as batch ETL, it falls short in scenarios that require high-performance analytical queries.

Is there a mode in which we can achieve a better trade-off between reads and writes?

Deletion Vectors

Paimon 0.8 introduces Deletion Vectors.

The Deletion Vectors mode is designed to take into account both data reading and writing efficiency.

Deletion vectors are generated during writing and record which rows in each file have been deleted. When reading, those rows can be filtered out directly. The result is equivalent to reading data that has already been merged by a full compaction, so read performance is not affected.

A simple example is as follows:

Deleted rows are marked directly in the deletion file. An upsert updates data by first deleting the old row and then adding the new one.
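
To make the delete-then-add behavior concrete, here is a small sketch against the orders table defined earlier. The values are invented, and the file names and row positions in the comments are hypothetical; they illustrate the bookkeeping only, not Paimon's actual file layout:

```sql
-- First write of order 1. Suppose (hypothetically) it lands in
-- data-file-A at row position 0.
INSERT INTO orders VALUES
    (1, 'book', 100, 200, 300, 19.99,
     TIMESTAMP '2024-05-01 10:00:00', TIMESTAMP '2024-05-01 10:00:00', 0);

-- Upsert of the same primary key with a new state.
INSERT INTO orders VALUES
    (1, 'book', 100, 200, 300, 19.99,
     TIMESTAMP '2024-05-01 10:00:00', TIMESTAMP '2024-05-01 10:05:00', 1);

-- Conceptually, after the second write:
--   deletion vector for data-file-A : {0}   -- old version marked as deleted
--   data-file-B                     : new version of order 1 (order_state = 1)
-- A reader skips position 0 of data-file-A and reads data-file-B as is,
-- without merging the two files by key.
SELECT order_id, order_state FROM orders WHERE order_id = 1;
```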

Let’s look at this mode of reading and writing:

  1. Read performance is good: data can be read directly by applying the deletion vectors to the data files, which avoids the cost of merging different files. Furthermore, read concurrency is no longer limited, and non-primary-key columns can also be used for filter push-down.
  2. Write performance: writing incurs additional overhead (looking up the LSM tree and generating the corresponding deletion file). The writer has to look up and mark existing data with the same primary key, and modify the deletion vectors of the historical files.

Generally speaking, in this mode, we can get a huge improvement in read performance without losing too much write performance.

Dealing with deletion files is also easy with Paimon, because it uses an LSM, and the most common application of an LSM is point lookups.

This means we can use the point lookup capability of Paimon’s LSM to quickly find the files that contain rows to be deleted, along with the positions of those rows.

When data is written, a lookup is performed against the LSM tree and the corresponding deletion file is produced. The deleted data can then be filtered out directly when reading.

For those familiar with Paimon, this uses the same underlying mechanism as the lookup changelog-producer. Each bucket produces a corresponding deletion file.

The structure of the file is as follows:

Each data file stores its deletion vector as a bitmap. Keeping a single deletion file per bucket minimizes the small-file problem that deletion files could otherwise cause.

Each Bitmap uses a RoaringBitmap structure. Apache Iceberg and Delta Lake already use this approach for query acceleration during batch deletion. The RoaringBitmap is a compressed bitmap that can significantly reduce the storage space amplification.

Testing the Effect of Deletion Vectors

Test Environment

We write with Apache Flink 1.17 and Paimon 0.8 to Amazon S3. DV is disabled by default, so deletion-vectors.enabled needs to be set to true.

We then query the data using Spark 3.3.1 and Trino 422. The latest Paimon-Trino version has been optimized for ORC reading.
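
As a hedged sketch of the write-side setting, the option can be supplied when the table is created. The table name orders_dv is hypothetical and is used only to avoid clashing with the orders table defined earlier; the option name is the one mentioned above:

```sql
-- Same schema as the orders table, with deletion vectors switched on.
-- 'orders_dv' is a hypothetical name used for illustration.
CREATE TABLE orders_dv (
    order_id BIGINT,
    order_name STRING,
    order_user_id BIGINT,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_fee DECIMAL(20, 2),
    order_create_time TIMESTAMP(3),
    order_update_time TIMESTAMP(3),
    order_state INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'deletion-vectors.enabled' = 'true'   -- disabled by default
);
```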

Data size

The table schema is the orders table defined at the beginning of this blog post. We generate 500 million records using the Flink Datagen connector, with the primary key ranging from 1 to 1 billion and bucket = 8.

After the write completes, a single bucket contains 40+ files with a total size of ~5 GB (yes, more than the recommended size; this is deliberate, to make the effect visible).
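
The exact job used for the test is not shown here, so the following is only a sketch of how such a data set could be generated. The table name orders_datagen and the rows-per-second value are assumptions; the connector options themselves are standard Flink Datagen options:

```sql
-- Hypothetical bounded source that mirrors the orders schema.
CREATE TEMPORARY TABLE orders_datagen (
    order_id BIGINT,
    order_name STRING,
    order_user_id BIGINT,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_fee DECIMAL(20, 2),
    order_create_time TIMESTAMP(3),
    order_update_time TIMESTAMP(3),
    order_state INT
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '500000000',         -- 500 million records
    'rows-per-second' = '100000000',        -- assumption: high enough not to throttle the source
    'fields.order_id.kind' = 'random',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '1000000000'    -- keys drawn from 1 to 1 billion
);

-- The target table is assumed to have been created with 'bucket' = '8'.
INSERT INTO orders SELECT * FROM orders_datagen;
```

Because the keys are drawn at random from a range only twice the number of records, many records hit an existing key, so the write exercises the upsert path rather than pure appends.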

Write performance

  • Without DV enabled: 455 seconds, about 135,000 records per second for a single writer
  • With DV enabled: 937 seconds, about 66,000 records per second for a single writer

Writing is about twice as slow with DV enabled, but we will continue to optimize it in subsequent versions.

Query performance

As you can see, without DV there is no difference between Trino and Spark, because both share the same reader implementation when files need to be merged.

When DV is turned on, Spark’s query performance is greatly improved, while Trino’s improvement is even greater.

Why? Because with DV no merge is needed, so Trino can read the data with its own ORC reader and work on its columnar structures directly. This gives native read performance.

Conclusion

Apache Paimon’s Deletion Vectors mode provides a good balance between reads and writes. We sacrifice some write performance, but in return we get much faster read queries.

In the future, we can expect optimized Deletion Vectors support for StarRocks. We can probably expect strong performance there, as StarRocks can provide the best OLAP experience for Paimon.

Apache Paimon keeps enhancing the Streaming Lakehouse experience. As unified lakehouse storage, it supports all scenarios (batch, OLAP, and streaming) at minute-level latencies.

Make sure to keep an eye on the project PIPs and if you like it, don't forget to give it some ❤️ via ⭐ on GitHub.

Giannis Polyzos

Staff Streaming Product Architect @ Ververica ~ Stateful Stream Processing and Streaming Lakehouse https://www.linkedin.com/in/polyzos/