Building an Idempotent Event Consumer with Quarkus and Kafka

How to implement a simple Kafka consumer that detects and discards duplicate messages.

Dunith Danushka
Tributary Data
5 min read · Jul 25, 2021

Photo by Pablo Heimplatz on Unsplash

Message consumers very often manipulate state, and message brokers can send duplicate messages for various reasons. So consumers should master the art of ‘idempotent message handling’ to avoid severe flaws in the application state.

In this post, I discuss a simple mechanism to detect and discard duplicate messages in message processing.

Why do we receive the same message more than once?

The execution cycle of a typical message consumer would look like this.

Typical message consumer

The consumer repeatedly executes the following activities.

  • Receive the message from the broker.
  • Process the message by executing some business logic.
  • Acknowledge to the broker that the message has been processed successfully and should not be redelivered.

However, we don’t live in a perfect world. Failures should be expected. In our case, the acknowledgment of processing might not reach the broker for several reasons.

Due to consumer failure: The message consumer may crash in the middle of processing and therefore never send the acknowledgment.

Due to message broker failure: By the time the consumer’s acknowledgment reaches the broker, the broker might have crashed or gone offline, so it misses the acknowledgment.

Due to a network failure: The consumer and broker are both available, but the acknowledgment gets lost in transit due to some network-related error.

Typically, a message broker pledges to deliver a message to its destination at least once. Thus, in any of the above cases, the broker attempts to redeliver the unacknowledged messages. So the consumer can receive the same message more than once.
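To make the redelivery behavior concrete, here is a minimal in-memory sketch. The AtLeastOnceBroker and RedeliveryDemo classes are hypothetical stand-ins written for this illustration; they are not Kafka APIs. The broker keeps a message pending until an acknowledgment arrives, so a single lost acknowledgment makes the consumer see the same message twice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory stand-in for an at-least-once broker (not a Kafka API).
class AtLeastOnceBroker {
    private final Deque<String> pending = new ArrayDeque<>();

    void publish(String message) {
        pending.addLast(message);
    }

    // Returns the oldest unacknowledged message without removing it, or null if none is left.
    String poll() {
        return pending.peekFirst();
    }

    // Only an acknowledgment that actually arrives removes the message.
    void acknowledge(String message) {
        pending.remove(message);
    }
}

class RedeliveryDemo {
    // Consumes until the broker is empty. When loseFirstAck is true,
    // the acknowledgment for the first delivery never reaches the broker.
    static List<String> consumeAll(AtLeastOnceBroker broker, boolean loseFirstAck) {
        List<String> received = new ArrayList<>();
        String message;
        while ((message = broker.poll()) != null) {
            received.add(message); // receive and process
            if (loseFirstAck && received.size() == 1) {
                continue; // simulated failure: the acknowledgment is lost
            }
            broker.acknowledge(message);
        }
        return received;
    }

    public static void main(String[] args) {
        AtLeastOnceBroker broker = new AtLeastOnceBroker();
        broker.publish("order-1");
        broker.publish("order-2");
        // "order-1" shows up twice because its first acknowledgment was lost.
        System.out.println(consumeAll(broker, true));
    }
}
```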

Why idempotent message consumption is important

A consumer is considered idempotent if processing the same message multiple times produces the same result as processing it once.

Whether a consumer is idempotent depends on the nature of its processing logic. For example, stateless processing, like calculating a value based on inputs, is naturally idempotent.

Stateless vs stateful message consumption. Stateful consumers MUST deal with duplicate messages to avoid severe flaws in the state.

But some operations need to be made idempotent explicitly, especially the operations that deal with state. Duplicate processing of messages can do severe damage to the consumer state. For instance, a consumer that updates an account balance must discard duplicate withdrawal events. Otherwise, the account will be debited incorrectly.
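The difference can be seen in a small sketch. The Account class, the withTax() calculation, and the amounts are made up for this illustration. Replaying the stateless calculation is harmless, while replaying the withdrawal changes the balance a second time.

```java
import java.util.List;

// Hypothetical account state used only for this illustration.
class Account {
    private long balanceCents;

    Account(long openingBalanceCents) {
        this.balanceCents = openingBalanceCents;
    }

    // Stateful operation: every call changes the balance.
    void withdraw(long amountCents) {
        balanceCents -= amountCents;
    }

    long balanceCents() {
        return balanceCents;
    }
}

class DuplicateDamageDemo {
    // Stateless operation: the result depends only on the input,
    // so handling the same message twice gives the same answer.
    static long withTax(long amountCents) {
        return amountCents + amountCents / 10;
    }

    public static void main(String[] args) {
        System.out.println(withTax(1_000) == withTax(1_000)); // replay is harmless

        Account account = new Account(10_000);
        // The same withdrawal event is delivered twice.
        for (long amountCents : List.of(2_500L, 2_500L)) {
            account.withdraw(amountCents);
        }
        // The account was debited twice for a single withdrawal.
        System.out.println(account.balanceCents());
    }
}
```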

Making consumers idempotent

The fundamental principle of an idempotent consumer is to keep track of the messages that it has already processed.

Based on that, let’s build a simple framework to accomplish this. The framework requires the following.

1. Unique message ID: Producers should add a unique ID to each message when publishing it to the broker. It could be something like a UUID or a random string. The point is to identify a message uniquely. As far as I know, some messaging specifications like JMS support this natively.

2. A place to store message IDs: The consumer must take responsibility for reliably recording the IDs of the messages that it has successfully processed. The simplest way is to create a message_log table in a database. I will explain that shortly.

3. The duplicate detection logic: Finally, the consumer should check the message_log table before processing any received message. If the table already contains the message ID, the processing should not happen. Otherwise, the consumer processes the message and records its ID in the message_log table.

The following illustrates the simple framework described above.

Message de-duplication logic
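The three requirements above can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions: the IdempotentConsumer class and its onMessage() method are hypothetical names, and an in-memory HashSet stands in for the message_log table, so nothing survives a restart.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the de-duplication logic; not the reference implementation.
class IdempotentConsumer {
    // In-memory stand-in for the message_log table.
    private final Set<String> messageLog = new HashSet<>();
    private final Consumer<String> businessLogic;

    IdempotentConsumer(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    // Returns true if the message was processed, false if it was discarded as a duplicate.
    boolean onMessage(String messageId, String payload) {
        if (messageLog.contains(messageId)) {
            return false; // already processed: skip the business logic
        }
        businessLogic.accept(payload);
        messageLog.add(messageId); // record the ID only after successful processing
        return true;
    }
}

class DeduplicationDemo {
    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        IdempotentConsumer consumer = new IdempotentConsumer(processed::add);

        consumer.onMessage("m-1", "create order");
        consumer.onMessage("m-1", "create order"); // redelivery, discarded
        consumer.onMessage("m-2", "cancel order");

        System.out.println(processed); // each payload appears once
    }
}
```

In a real consumer, the check, the business logic, and the ID insert would need to share one database transaction. Otherwise a crash between the last two steps leaves the state updated but the ID unrecorded.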

Implementation with Quarkus, MySQL, and Kafka

I built a reference implementation for this based on a message consumer written in Quarkus. It was heavily influenced by this Debezium sample.

It reads messages off a Kafka topic and simply logs the message content, followed by writing the message ID to the consumed_messages table in a MySQL database. You can find the source code for the completed project here.

The consumed_messages table

The table definition is straightforward, and it has the following structure. The columns are self-explanatory.

Hibernate-based ORM classes

I’ve created the ConsumedMessage and MessageLog classes to map the table with Hibernate so that I can avoid writing hand-rolled SQL statements to deal with it.

ORM classes that model the consumed_messages table in the application

Also, Quarkus supports the @Transactional annotation (from JTA) to mark methods that need to be transactional. In the MessageLog class below, both methods are performed within the scope of a transaction. If something goes wrong, such as a unique constraint violation at the table level, we can have a clean rollback.

The Event Handler

The OrderEventHandler class below receives an OrderEvent object and checks whether the event ID has already been processed. If it has, the handler returns without processing the event again. Otherwise, the relevant business logic is invoked, and the event is marked as processed.

Notice that the onOrderEvent() method is also made transactional for reliability and consistency. If the business logic throws an error or saving the message ID to consumed_messages fails for some reason, we will have a clean rollback.
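The rollback behavior can be emulated without a database. The sketch below is not the OrderEventHandler from the project: TransactionalHandlerSketch, its fields, and the failBeforeCommit flag are hypothetical, and a manual snapshot and restore stands in for the transaction that the container would manage. It shows why the state change and the message ID must succeed or fail together.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in that imitates a transactional handler in memory.
class TransactionalHandlerSketch {
    private final Set<String> consumedMessages = new HashSet<>(); // stands in for the table
    private long orderTotalCents = 0;                              // stands in for application state

    // failBeforeCommit simulates an error thrown after the business logic has run.
    boolean onOrderEvent(String eventId, long amountCents, boolean failBeforeCommit) {
        if (consumedMessages.contains(eventId)) {
            return false; // duplicate, nothing to do
        }
        long totalBefore = orderTotalCents; // snapshot used for the rollback
        try {
            orderTotalCents += amountCents; // business logic
            if (failBeforeCommit) {
                throw new IllegalStateException("simulated failure");
            }
            consumedMessages.add(eventId); // mark the event as processed
            return true;
        } catch (RuntimeException e) {
            // Roll back both changes so that a redelivery starts from a clean state.
            orderTotalCents = totalBefore;
            consumedMessages.remove(eventId);
            throw e;
        }
    }

    long orderTotalCents() {
        return orderTotalCents;
    }

    public static void main(String[] args) {
        TransactionalHandlerSketch handler = new TransactionalHandlerSketch();
        try {
            handler.onOrderEvent("e-1", 500, true);
        } catch (IllegalStateException e) {
            System.out.println("rolled back, total = " + handler.orderTotalCents());
        }
        // The broker redelivers e-1; this time it is processed exactly once.
        System.out.println(handler.onOrderEvent("e-1", 500, false));
        System.out.println(handler.onOrderEvent("e-1", 500, false));
        System.out.println("total = " + handler.orderTotalCents());
    }
}
```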

The Kafka event consumer

The KafkaEventConsumer is an infrastructure-level class. It reads messages off the ‘orders’ topic, deserializes the JSON encoded content as OrderEvents, and hands them over to the OrderEventHandler. You can find the source code here.

The order event

A typical order event coming from Kafka would look like the following. It has a unique eventId.

Alternative approaches

In his blog post, Chris Richardson explains two alternative implementation approaches.

One approach utilizes the consumed_messages table, which is keyed by the message ID. The consumer attempts to insert the ID of the received message into this table, followed by an application table update. If the ID already exists, the insert fails on the key constraint, so the consumer can roll back the transaction and simply acknowledge the message.
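A rough sketch of this insert-first variant follows. It is my own illustration, not code from that blog post: InsertFirstConsumer is a hypothetical class, and Set.add() returning false plays the role of the primary key violation. With a real database, the insert and the application table update would run in the same transaction.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: the "insert" is attempted first and its failure signals a duplicate.
class InsertFirstConsumer {
    private final Set<String> consumedMessages = new HashSet<>(); // keyed by message ID
    private long balanceCents;

    InsertFirstConsumer(long openingBalanceCents) {
        this.balanceCents = openingBalanceCents;
    }

    // Returns true if the withdrawal was applied, false if the message was a duplicate.
    boolean onWithdrawal(String messageId, long amountCents) {
        // Set.add returns false when the ID is already present,
        // which stands in for a primary key violation on insert.
        if (!consumedMessages.add(messageId)) {
            return false; // nothing was changed, so just acknowledge the message
        }
        balanceCents -= amountCents; // application table update
        return true;
    }

    long balanceCents() {
        return balanceCents;
    }

    public static void main(String[] args) {
        InsertFirstConsumer consumer = new InsertFirstConsumer(10_000);
        consumer.onWithdrawal("w-1", 2_500);
        consumer.onWithdrawal("w-1", 2_500); // duplicate, rejected by the "key constraint"
        System.out.println(consumer.balanceCents());
    }
}
```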

The second approach focuses more on NoSQL databases where ACID transactions are limited. There, the message ID is stored as an attribute in the same table that is being updated.
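The second idea can be sketched as follows, again as my own illustration rather than code from that post. AccountDocument and DocumentStoreConsumer are hypothetical names, and a ConcurrentHashMap stands in for a document store that can update one record atomically. The processed message IDs live inside the record they affect, so a single atomic update both applies the change and remembers the message.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical record layout: the processed message IDs are stored with the data itself.
class AccountDocument {
    final long balanceCents;
    final Set<String> processedMessageIds;

    AccountDocument(long balanceCents, Set<String> processedMessageIds) {
        this.balanceCents = balanceCents;
        this.processedMessageIds = processedMessageIds;
    }
}

class DocumentStoreConsumer {
    // In-memory stand-in for a store that supports atomic single-record updates.
    private final Map<String, AccountDocument> accounts = new ConcurrentHashMap<>();

    void open(String accountId, long openingBalanceCents) {
        accounts.put(accountId, new AccountDocument(openingBalanceCents, new HashSet<>()));
    }

    void onWithdrawal(String accountId, String messageId, long amountCents) {
        // computeIfPresent runs atomically per key, so the duplicate check
        // and the update cannot be separated by another writer.
        accounts.computeIfPresent(accountId, (id, doc) -> {
            if (doc.processedMessageIds.contains(messageId)) {
                return doc; // duplicate: leave the record unchanged
            }
            Set<String> ids = new HashSet<>(doc.processedMessageIds);
            ids.add(messageId);
            return new AccountDocument(doc.balanceCents - amountCents, ids);
        });
    }

    long balanceCents(String accountId) {
        return accounts.get(accountId).balanceCents;
    }

    public static void main(String[] args) {
        DocumentStoreConsumer consumer = new DocumentStoreConsumer();
        consumer.open("acc-1", 10_000);
        consumer.onWithdrawal("acc-1", "w-1", 2_500);
        consumer.onWithdrawal("acc-1", "w-1", 2_500); // duplicate, ignored
        System.out.println(consumer.balanceCents("acc-1"));
    }
}
```

One trade-off of this sketch is that the set of IDs grows with every message, so a real implementation would need some way to bound it.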

Takeaways

A message broker can deliver the same message repeatedly. Duplicate message processing can cause severe bugs in stateful consumers.

To make the message consumption idempotent, the consumers must record successfully processed messages and discard duplicates.

Chris Richardson identified this as the Idempotent Consumer pattern.

Dunith Danushka
Tributary Data

Editor of Tributary Data. Technologist, Writer, Senior Developer Advocate at Redpanda. Opinions are my own.