Spring Boot + Kafka Transactions

Milo Felipe
2 min read · Jun 1, 2020

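Kafka applications that primarily follow the “consume-process-produce” pattern need transactions to make those steps atomic. The example consumer logic discussed here processes the message on line 2, publishes notification1 on line 3, saves the message to the database on line 4, and publishes notification2 on line 5.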
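As a minimal sketch of such consumer logic, assuming plain Java with in-memory lists standing in for the Kafka topic and the database (the class and method names are hypothetical, not the author's original code), the trailing comments mark the line numbers the discussion refers to. The helper `crashAfterSaveThenRedeliver` simulates a failure right after line 4, followed by a redelivery of the same message:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a consume-process-produce handler.
public class ConsumerLogicSketch {

    // Stand-ins (assumptions) for a Kafka topic and a database table.
    final List<String> published = new ArrayList<>();
    final List<String> saved = new ArrayList<>();

    void consume(String message) {               // line 1
        String result = process(message);        // line 2
        publish("notification1 for " + result);  // line 3
        save(result);                            // line 4
        publish("notification2 for " + result);  // line 5
    }

    String process(String message) { return message.trim(); }
    void publish(String notification) { published.add(notification); }
    void save(String record) { saved.add(record); }

    // Simulates a crash right after line 4, then a redelivery of the same message.
    static ConsumerLogicSketch crashAfterSaveThenRedeliver(String message) {
        final boolean[] crashed = {false};
        ConsumerLogicSketch consumer = new ConsumerLogicSketch() {
            @Override
            void save(String record) {
                super.save(record);              // the write itself succeeds
                if (!crashed[0]) {
                    crashed[0] = true;           // fail only on the first delivery
                    throw new IllegalStateException("simulated failure after save");
                }
            }
        };
        try {
            consumer.consume(message);           // first delivery fails after line 4
        } catch (IllegalStateException e) {
            // The offset was never committed, so the message is delivered again.
        }
        consumer.consume(message);               // redelivery repeats lines 2 to 5
        return consumer;
    }

    public static void main(String[] args) {
        ConsumerLogicSketch c = crashAfterSaveThenRedeliver("order-42");
        System.out.println("published: " + c.published);
        System.out.println("saved: " + c.saved);
    }
}
```

Running `main` shows notification1 and the saved record each appearing twice, which is the kind of duplication transactions are meant to prevent.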

Without transactions:

  • If an exception is thrown at line 2, the message will be consumed again when the consumer restarts. All good if process(message) is idempotent.
  • If an exception is thrown right after line 3, notification1 will be published again when the consumer restarts (notification1 is published twice).
  • If an exception is thrown right after line 4, the message will be saved in the database again when the consumer restarts (the message is saved twice, notification1 is published twice).
  • If an exception is thrown right after line 5, notification2 will be published again when the consumer restarts (notification2 is published twice, the message is saved twice, notification1 is published twice).

With transactions:

  • When an exception occurs, all published messages and database operations that happened within the transaction are rolled back (aborted messages stay invisible to consumers that read with isolation.level=read_committed). When the consumer restarts (assuming the errors are fixed), the operations within the transaction are committed only if all of them succeed. No notification is published more than once, and no database insert is repeated.

How to enable transactions?

In Spring Boot, enabling Kafka transactions can be done in the consumer and producer configuration parameters (at the very least).
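As a rough illustration, an `application.properties` fragment along these lines would turn transactions on for the auto-configured producer and consumer. The `tx-` prefix is an arbitrary, assumed value, and the exact set of properties your application needs may differ:

```properties
# Producer: a transaction-id prefix makes the auto-configured producer factory transactional
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.producer.acks=all
spring.kafka.producer.properties.enable.idempotence=true

# Consumer: read only committed messages and leave offset commits to the transaction
spring.kafka.consumer.isolation-level=read_committed
spring.kafka.consumer.enable-auto-commit=false
```

With `read_committed`, downstream consumers skip messages that belong to aborted transactions.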

The KafkaListener could be something like this:
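A minimal sketch of such a listener, assuming String payloads and a transaction manager configured on the listener container so that the container starts the transaction before calling the method. The topic names, the group id, and the `MessageRepository` interface are hypothetical placeholders, not the author's original code:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {

    // Hypothetical persistence abstraction; an implementation must be registered as a bean.
    public interface MessageRepository {
        void save(String message);
    }

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final MessageRepository repository;

    public MessageListener(KafkaTemplate<String, String> kafkaTemplate,
                           MessageRepository repository) {
        this.kafkaTemplate = kafkaTemplate;
        this.repository = repository;
    }

    // Topic and group names are assumptions for illustration.
    @KafkaListener(topics = "messages", groupId = "demo-group")
    public void consume(String message) {
        String result = process(message);
        kafkaTemplate.send("notifications-1", result); // joins the container's Kafka transaction
        repository.save(result);
        kafkaTemplate.send("notifications-2", result);
        // An exception thrown anywhere above aborts the Kafka transaction.
    }

    private String process(String message) {
        return message.trim();
    }
}
```

Whether `repository.save` rolls back together with the Kafka sends depends on the database transaction also being managed, for example through a chained transaction manager.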

You can use a ChainedKafkaTransactionManager if you want to synchronize the database transaction with the Kafka transaction. Define the following beans:
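The bean definitions could look roughly like the following sketch. It assumes a Spring for Apache Kafka 2.x version in which `ChainedKafkaTransactionManager` is available, a JDBC `DataSource` (swap in `JpaTransactionManager` if you use JPA), and the auto-configured producer and consumer factories; the bean method names are arbitrary:

```java
import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class TransactionConfig {

    // Database transaction manager (assumes plain JDBC).
    @Bean
    @Primary
    public DataSourceTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    // Kafka transaction manager backed by the transactional producer factory.
    @Bean
    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(
            ProducerFactory<Object, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    // Transactions start in the listed order and commit in reverse order.
    @Bean
    public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
            KafkaTransactionManager<Object, Object> kafkaTransactionManager,
            DataSourceTransactionManager transactionManager) {
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, transactionManager);
    }

    // Listener containers start the chained transaction before invoking the listener.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory,
            ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
        return factory;
    }
}
```

With this ordering the database transaction commits first and the Kafka transaction second, so a failure between the two commits can still lead to a redelivery; keeping the database write idempotent covers that window.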

Start your Kafka Cluster. Then start your Spring Boot application.

Here are two exceptions you might encounter when you start your application:

Transactional Id authorization failed. This means authorization is enabled on your Kafka cluster, so your user needs ACLs for the transactional ID. I’m using Confluent Kafka Community and this is how I do it:

./kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --transactional-id * --allow-principal User:* --operation write

In older versions, transactional-id * should be '*'. Quoting the wildcard also keeps the shell from expanding it.

Timeout expired while initializing transactional state in 60000ms. This means your cluster doesn’t have enough brokers to support transactions: by default, the internal transaction state topic expects a replication factor of 3 and at least 2 in-sync replicas. You probably have just one broker because you’re developing locally, so update the broker configuration parameters:

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
