Apache Kafka Transactions

Chakresh Tiwari
ShoutLoudz
May 11, 2024 · 4 min read


Introduction

In this blog post, we are going to discuss how Kafka transactions work. Whenever a producer publishes data into a Kafka topic, that exchange can be wrapped in a transaction, so a group of messages is either committed as a whole or not at all. These transactions can also fail, so we will look at how to enable Kafka transactions in the producer, how to handle success and failure cases, and how to configure consumers to read only committed messages.

We have three Spring Boot microservices:

  1. Transfer Service — When a transfer request comes in, it withdraws the amount from the sender's account and deposits it into the recipient's account. Since we are developing event-driven applications, the withdrawal and deposit events are published into two different Kafka topics (withdraw and deposit), so this service works as the producer service. A sketch of these event payloads follows the list.
  2. Withdrawal Service - This service consumes the messages from the withdraw topic.
  3. Deposit Service - This service consumes the messages from the deposit topic.
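
For reference, the event payloads these services exchange could look roughly like the sketch below; the field names are taken from the code later in this post, and the types are an assumption.

import java.math.BigDecimal;

// A hypothetical payload for the withdraw topic; DepositRequestedEvent has the same shape.
public class WithdrawalRequestedEvent {

    private String senderId;
    private String recepientId;
    private BigDecimal amount;

    public WithdrawalRequestedEvent() {
    }

    public WithdrawalRequestedEvent(String senderId, String recepientId, BigDecimal amount) {
        this.senderId = senderId;
        this.recepientId = recepientId;
        this.amount = amount;
    }

    // Getters and setters omitted for brevity
}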

Enable Kafka Transaction in Producer Microservice

To enable Transactions we need to add a property in our spring boot producer microservice.

spring.kafka.producer.transaction-id-prefix=transfer-service-${random.value}-

This property works like a bookmark that helps Kafka remember where it left a particular transaction, so an interrupted transaction can be recovered or aborted cleanly. The resulting transactional ID should be unique on each producer node.

Each Kafka message sent this way is part of a transaction. If a transaction fails, Kafka uses the transactional ID to find the status of its messages and determine whether they were committed or not.

In Spring Boot we can also enable transactions from code by adding the property to the producer configuration:

@Value("${spring.kafka.producer.transaction-id-prefix}")
private String transactionalIdPrefix;

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalIdPrefix);
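
For context, a minimal sketch of how this could be wired into the producer configuration is shown below; the serializers, bootstrap address, and bean names are assumptions for illustration, not code from the original project.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.transaction-id-prefix}")
    private String transactionalIdPrefix;

    @Bean
    ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Setting a transactional id turns on idempotence and transactions for this producer
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalIdPrefix);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}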

To enable transactions in the Spring Boot producer microservice, we add the @Transactional annotation on top of the method. By default, Spring rolls back a transaction if any runtime exception occurs; checked exceptions do not trigger a rollback unless you list them explicitly with rollbackFor. If there is only one transaction manager in play, no attributes are needed and the plain @Transactional annotation is enough.


// @Transactional(value = "kafkaTransactionManager", rollbackFor = TransferServiceException.class,
//         noRollbackFor = SQLClientInfoException.class)
@Transactional
@Override
public boolean transfer(TransferRestModel transferRestModel) {
    WithdrawalRequestedEvent withdrawalEvent = new WithdrawalRequestedEvent();
    DepositRequestedEvent depositEvent = new DepositRequestedEvent();

    try {
        kafkaTemplate.send(environment.getProperty("withdraw-money-topic", "withdraw-money-topic"),
                withdrawalEvent);
        LOGGER.info("Sent event to withdrawal topic.");

        // Business logic that causes an error
        callRemoteService();

        kafkaTemplate.send(environment.getProperty("deposit-money-topic", "deposit-money-topic"), depositEvent);
        LOGGER.info("Sent event to deposit topic.");

    } catch (Exception ex) {
        LOGGER.error(ex.getMessage(), ex);
        // Rethrowing a runtime exception makes Spring roll back (abort) the Kafka transaction
        throw new TransferServiceException(ex);
    }

    return true;
}

Enable Consumers to read only committed messages

Consumer microservices should read only committed messages. If a transaction does not complete successfully, its Kafka messages are never marked as committed, and they should not be visible to consumers.

spring.kafka.consumer.isolation-level=READ_COMMITTED
By default its value is read_uncommitted, which is not what we want in the case of failed transactions.

So if the transaction fails to complete, the consumer microservices will not receive that event, and the transaction will be rolled back. A transaction can fail, for example, when Kafka publishes the data into the topic successfully but subsequent business logic throws an exception; in that case the wrong data should not be processed by consumers.
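
As a rough sketch (the class, group, and container factory names here are assumptions), the same isolation level can also be set in code on the consumer side, and a listener in the Withdrawal Service will then only ever receive committed events:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.stereotype.Component;

@Configuration
class KafkaConsumerConfig {

    @Bean
    ConsumerFactory<String, WithdrawalRequestedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "withdrawal-service");
        // Only read messages that belong to committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(WithdrawalRequestedEvent.class));
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, WithdrawalRequestedEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, WithdrawalRequestedEvent> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, WithdrawalRequestedEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

@Component
class WithdrawalRequestedEventHandler {

    @KafkaListener(topics = "withdraw-money-topic", groupId = "withdrawal-service")
    void handle(WithdrawalRequestedEvent event) {
        // Events from aborted transactions never reach this point
    }
}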

Local Transactions

We can send multiple Kafka messages in one transaction: a single transaction is used to publish messages into different topics, which means either all messages are successfully written or none of them are. However, if an exception occurs outside of the local transaction, the local transaction will not be rolled back.
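
For example, Spring Kafka's KafkaTemplate.executeInTransaction runs such a local transaction; the sketch below assumes the same template and topic names used earlier.

// Both sends are committed together; if the callback throws, the whole transaction is aborted.
kafkaTemplate.executeInTransaction(operations -> {
    operations.send("withdraw-money-topic", withdrawalEvent);
    operations.send("deposit-money-topic", depositEvent);
    return true;
});

Note that only failures inside the callback abort this transaction; an exception thrown later in the surrounding method has no effect on it, which is exactly the limitation described above.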

Kafka and Database Transactions

Now in transfer microservice, we will add logic to insert transaction details into the database as well.

  1. When this API gets called, it publishes events to the two Kafka topics, and those are read by the deposit and withdrawal consumer microservices. In the same method we also need to persist the transfer details into the database. So there are now two transactions: one that publishes data into Kafka and one that writes to the DB.
  2. Kafka transactions are managed by the KafkaTransactionManager, and DB transactions are managed by the JpaTransactionManager.
  3. Both transactions can run under one main transaction, or they can run separately. If they run separately and the Kafka publish fails while the DB transaction succeeds, we end up with inconsistent data. So the transactions should be linked: if one fails, the other should also roll back.
  4. Consumer services should read only committed data.
  5. The @Transactional annotation takes a bean name as an argument to specify which transaction manager to use. In this case we need both the Kafka and the JPA transaction managers to participate, because if anything fails, both transactions should roll back.

Code Block

@Bean("transactionManager")
JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory);
}

@Transactional(value = "transactionManager")
@Override
public boolean transfer(TransferRestModel transferRestModel) {
WithdrawalRequestedEvent withdrawalEvent = new WithdrawalRequestedEvent(transferRestModel.getSenderId(),
transferRestModel.getRecepientId(), transferRestModel.getAmount());
DepositRequestedEvent depositEvent = new DepositRequestedEvent(transferRestModel.getSenderId(),
transferRestModel.getRecepientId(), transferRestModel.getAmount());

//save data to DB
TransferEntity transfer = new TransferEntity();
BeanUtils.copyProperties(transferRestModel, transfer);
transfer.setTransferId(UUID.randomUUID().toString());
try {
transferRepository.save(transfer);

//same as above
}
catch(Exceptione){

}

}

This creates a bean with the name transactionManager, and that bean name is used on the transactional method. The KafkaTemplate then synchronizes its Kafka transaction with the JPA transaction, so both are handled together.

So now, if any exception occurs in the transfer method, the database insert is rolled back and the Kafka messages are aborted, so consumers reading committed data never see them.

This is all about Kafka Transactions.

Thanks for reading!!
