Using Flink Exactly-Once to Mirror Kafka Topics: Is It Overkill?

Hilmi Al Fatih
5 min read · May 31, 2024


Disclaimer: this article was written at the end of 2021, so parts of it may no longer be relevant.

In our data platform department, for some reason, we need a system to mirror Kafka topics from one cluster to another. There are hundreds of topics with huge throughput to be mirrored, and the list is expected to keep growing, since this pipeline is the gateway for internal users who want to use our platform.

This system has to run on our rather unstable Kubernetes cluster, where pods are sometimes killed at random, the network becomes unreachable, and so on. The consumers of the mirrored topics are internal systems that ship those records into our Hadoop and Elasticsearch clusters. In the past, we did this with MirrorMaker (version 1) provided by Kafka, plus our own custom logic to enrich the records with metadata.

From past experience, we learned that both data loss and duplicated data can occur with MirrorMaker. Data loss hurts our users significantly: lost events skew metrics calculations, lost logs hinder debugging, and so on. Duplicated logs, on the other hand, are mostly fine for users, but we as the platform provider have to bear the cost: duplicated records mean 1) more storage is required, and 2) costly deduplication in users’ queries.

From the above lessons, we want to minimize those impacts by mirroring the records exactly once. At the time of writing, we researched which system would serve that purpose well.

We considered the following options, given our team’s long familiarity with each of them: 1) Kafka MirrorMaker 2, 2) Kafka Streams, 3) Apache Flink. Our main requirement is Kafka-to-Kafka exactly-once delivery.

Before choosing which streaming technology supports our needs, let’s first look at how Kafka itself enables transactions. In most cases, the main recipe for exactly-once delivery is either transaction support or idempotent updates.

Kafka does not support idempotent updates, since internally it acts as an append-only log; this append-only design is what enables Kafka’s high-throughput delivery. Instead, Kafka supports transactional producing.

Kafka exposes the following API to perform transactions. Roughly, we need to do the following steps (a minimal sketch follows the list):

  1. Initialize the transaction with initTransactions(). When calling this, transactionalId must be set on the producer, otherwise an exception is thrown. The broker receives this transactionalId and returns a combination of producerId and epoch.
  2. Start the transaction by calling beginTransaction().
  3. Write records to the topic via the send() method.
  4. Finish with commitTransaction() or abortTransaction().
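
For illustration, here is a minimal sketch of that flow using the plain Java producer. The bootstrap servers, topic, transactional id, and record contents are placeholders, and real code needs more careful error handling:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProduceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");      // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "mirror-producer-1"); // required for transactions
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();      // 1. registers transactionalId, obtains producerId + epoch
            try {
                producer.beginTransaction();  // 2.
                producer.send(new ProducerRecord<>("mirrored-topic", "key", "value")); // 3.
                producer.commitTransaction(); // 4. (success path)
            } catch (ProducerFencedException fatal) {
                // another producer with the same transactionalId fenced this one; cannot continue
                throw fatal;
            } catch (KafkaException e) {
                producer.abortTransaction();  // 4. (failure path): abort so the batch can be retried
            }
        }
    }
}
```

Note that only consumers configured with isolation.level=read_committed will skip records from aborted transactions; the default read_uncommitted level sees everything.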

So now let’s go to the details on how each of the options above supports exactly once delivery.

MirrorMaker2

Let’s start with MirrorMaker 2. Historically, our pipeline uses its older sibling, MirrorMaker 1, which does not support exactly-once delivery. At the moment of writing, MirrorMaker 2 has a draft proposal (KIP-656) for enabling this feature, but it is not yet ready. So we dropped this option from the very beginning.

Update: it looks like that draft has made no progress even now (more than two years after this writing).

Kafka Streams

One important API provided by Kafka transactions that I have not mentioned yet is sendOffsetsToTransaction(offsets, consumerGroupId). Recall that consumer offsets are stored by the broker in the __consumer_offsets topic. Furthermore, a Kafka transaction can span multiple topics, i.e. we can write to several topics and commit them atomically as a single transaction (do you start to see where this leads? 👀). These concepts are what Kafka Streams uses to enable end-to-end exactly-once delivery. The idea is to keep sending records within an open transaction, call sendOffsetsToTransaction at the end, and finally commit the transaction. In case of failure, nothing changes in the committed view of the offsets, so the application can safely revert to the previous state and redo the processing. There is no risk of duplicates or data loss here.
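
To make that pattern concrete, here is a rough sketch of the underlying consume-transform-produce loop with plain Kafka clients (not Kafka Streams itself). The bootstrap servers, topics, group id, and transactional id are placeholders, and error handling is omitted. Notice that the same producer commits both the output records and the consumer offsets, which already hints at the limitation discussed next:

```java
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConsumeTransformProduceSketch {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mirror-group");         // placeholder
        cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");      // offsets go through the transaction
        cProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // same cluster as the consumer
        pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "mirror-producer-1");
        pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                }
                // Atomically commit the consumed offsets together with the produced records.
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (TopicPartition tp : records.partitions()) {
                    List<ConsumerRecord<String, String>> partRecords = records.records(tp);
                    offsets.put(tp, new OffsetAndMetadata(partRecords.get(partRecords.size() - 1).offset() + 1));
                }
                producer.sendOffsetsToTransaction(offsets, "mirror-group");
                producer.commitTransaction();
            }
        }
    }
}
```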

However, if we pay closer attention to this strategy, we notice that the end-to-end exactly-once guarantee cannot span different Kafka clusters: the output records and the consumer offsets must be committed by the same producer, in the same transaction, on the same cluster. So unfortunately our system cannot use it, since we need to send records across different clusters.

For more details about exactly-once semantics in Kafka Streams, please refer to KIP-129.

Apache Flink

Flink enables end-to-end exactly-once delivery within its pipeline via a two-phase commit (2PC). Internally it uses a Chandy-Lamport-based algorithm for its distributed snapshots. The routine is summarized as follows:

  1. The Job Manager injects a checkpoint barrier into all of its source operators.
  2. Each downstream operator snapshots its state upon receiving the barrier and forwards the barrier to the subsequent operator. This stage is called pre-commit from the 2PC point of view.
  3. The pre-commit stage is complete once all operators (from source all the way to sink) have received the checkpoint barrier and successfully snapshotted their state.
  4. The Job Manager knows when the checkpoint has completed, so it notifies all operators about the checkpoint-complete event. This stage corresponds to the commit phase of 2PC.
Flink two-phase commit (source: Flink docs)

The Kafka transaction API integrates well with this snapshot routine. Upon receiving the checkpoint barrier, the KafkaProducer instance finishes its producing and stores its current state to the state backend. That state includes the transactionalId, producerId, and epoch. Later, during the notifyCheckpointComplete stage, Flink restores the transaction state (using Java reflection) and commits the transaction.
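
For reference, here is a rough sketch of what such a mirroring job can look like with Flink’s KafkaSource/KafkaSink API (the 2021-era FlinkKafkaConsumer/FlinkKafkaProducer equivalents follow the same idea). The cluster addresses, topic, group id, and transactional id prefix are placeholders, and the metadata-enrichment step mentioned earlier is omitted:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MirrorJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Transactions are committed on checkpoint completion, so checkpointing must be enabled.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("source-cluster:9092")           // placeholder
                .setTopics("mirrored-topic")                          // placeholder
                .setGroupId("mirror-job")                             // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("target-cluster:9092")           // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("mirrored-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // transactional, two-phase-commit sink
                .setTransactionalIdPrefix("mirror-job")               // must be unique per job
                // the transaction has to be committed within this window (see the catches below)
                .setProperty("transaction.timeout.ms", "900000")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .sinkTo(sink);
        env.execute("kafka-mirror");
    }
}
```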

When we talk about 2PC, the commit is supposed to be guaranteed to eventually succeed. This generally holds for the mechanism described above, but several catches require attention. Some of them are:

  1. The commit must complete within the transaction timeout. Once the timeout is exceeded, the commit will not succeed (see the sketch after this list).
  2. No other producer may use the same transactionalId during the transaction’s lifetime, as that bumps the epoch and fences the original producer, leading to ProducerFencedException.
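
As an illustration of the first catch, here is roughly how the timeouts relate; the values are examples, not our production settings:

```java
import java.util.Properties;

public class TransactionTimeoutSketch {
    public static void main(String[] args) {
        // Broker side (server.properties): the ceiling any producer may request.
        //   transaction.max.timeout.ms=900000   // 15 minutes, Kafka's default
        //
        // Job side: the sink producer's transaction timeout must stay at or below that
        // ceiling, yet be long enough to cover the checkpoint interval plus the worst
        // expected recovery downtime; otherwise an in-flight transaction can be aborted
        // by the broker before Flink gets the chance to commit it.
        Properties sinkProducerProps = new Properties();
        sinkProducerProps.put("transaction.timeout.ms", "900000");
    }
}
```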

Both of these situations lead to data loss. We can, in fact, reprocess the data, but then we end up with duplicates. Either way, the exactly-once delivery guarantee is broken.

Wrap up

Given the above, we had no option other than Apache Flink, so we decided to proceed with it. As you may have noticed, there are several catches to keeping the end-to-end exactly-once delivery guarantee with Flink. We in fact developed some tooling around our deployment to keep that guarantee. More on that, probably, in another story.

Now, to answer the original question in the title, is it overkill to use Flink exactly-once to mirror Kafka topics?

From a business perspective, it depends. But from a technology perspective, no, it is not: at the time of writing, to the author’s understanding, only Apache Flink supports it.


Hilmi Al Fatih

Software Engineer working on Data Platform Infrastructure