Why did we choose Kafka

Pavel Likin
IT’s Tinkoff
Published in
6 min readApr 30, 2020

I work with the Tinkoff team, which is developing its own notification center. Mostly I am developing on Java using Spring boot and solving various technical problems that arise in the project.

Most of our microservices communicate asynchronously with each other through a message broker. Earlier as a broker, we used IBM MQ, which stopped handling the load, but at the same time had high delivery guarantees.

As a replacement, we choose Apache Kafka, which has high scalability but unfortunately requires a nearly individual approach to configuration for different scenarios. In addition, the Kafka default delivery mechanism did not allow us to maintain the required level of consistency from the box. Next, I’ll share our experience with Kafka configuration, in particular, how to configure and live with once delivery.

Guaranteed delivery (and not only)

The settings discussed below will help prevent a number of problems with the default connection settings. But first, we want to pay attention to one parameter, which will facilitate a possible debug.

This will help client.id for Producer and Consumer. At first glance, you can use the application name as a value, and in most cases it will work. Although the situation when an application uses several Consumer’s and you set them to the same client.id, leads to the following warning:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

If you want to use JMX in an application with Kafka, it can be a problem. In this case, it is best to use a combination of the application name and, for example, the name of a topic as the client.id value. The result of our configuration can be seen in the output of the kafka-consumer-groups command from the Confluent utilities:

Now let’s look at the guaranteed message delivery script. Kafka Producer has an acks parameter that allows you to configure how many times the cluster leader needs to consider the message as successfully recorded. This parameter can take the following values:

  • 0 — acknowledge will not be considered.
  • 1 — the default parameter, you only need to acknowledge from 1 replica.
  • -1 — acknowledgment from all synchronized replicas is required (setting up the cluster min.insync.replicas).

From these values, you can see that acks equal to -1 gives the strongest guarantees that the message will not be lost.

As we all know, distributed systems are unreliable. To protect against temporary malfunctions, Kafka Producer provides a retries parameter which allows you to specify the number of attempts to resend during delivery.timeout.ms. Since the retries parameter is set to Integer.MAX_VALUE (2147483647) by default, the number of re-send messages can be adjusted by changing only the delivery.timeout.ms.

Moving to exactly-once delivery

These settings allow our Producer to deliver messages with a high warranty. Now let’s talk about how to guarantee that only one copy of the message is recorded in the Kafka Topic? In the simplest case, you need to set enable.idempotence to true on Producer. Idempotence ensures that only one message is written to a specific batch of one top. The preconditions for enabling the idempotence are acks = all, retry > 0, max.in.flight.requests.per.connection ≤ 5. If these parameters are not set by the developer, the above values will be automatically set.

When the idempotence is set, it is necessary to ensure that the same messages get into the same partitions every time. This can be done by configuring the key and parameter partitioner.class on the Producer. Let’s start with the key. It must be the same for each submission. This can easily be done by using some business identifier from the original message. The partitioner.class parameter is set to DefaultPartitioner by default. In this partitioning strategy, the default is as follows:

  • If the partition is explicitly specified when sending the message, then we use it.
  • If the partition is not specified, but the key is specified, we select the partition by hash from the key.
  • If the partition and the key are not specified — we choose the partition by round-robin.

Besides, use of a key and idempotent sending with parameter max.in.flight.requests.per.connection = 1 gives you ordered processing of messages on Consumer. Keep in mind that if access control is configured on your cluster, you will need permissions to idempotent to the topic.

If you lack the ability to send messages by key or if the logic on the Producer side requires you to maintain data consistency between different partitions, transactions will come to the aid. Besides, with the help of a chain transaction you can conditionally synchronize a record in Kafka, for example, with a record in a database. In order to enable the transaction sending to Producer, it is necessary that the Producer possesses the idempotency and additionally set the transactional.id. If access control is configured on your Kafka cluster, then for a transaction record, as well as for an idempotent record, you will need to write permissions that can be granted by mask using the value stored in transactional.id.

Formally, any string, such as the application name, can be used as the transaction identifier. But if you run multiple instances of the same application with the same transactional.id, the first instance running will be stopped with an error because Kafka will consider it a zombie process.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.

To solve this problem, we add a suffix to the application name in the form of a hostname that we get from the environment variables.

The producer is configured, but transactions on Kafka control only the scope of the message. Regardless of the status of the transaction, the message immediately gets to the top but has additional system attributes.

To prevent such messages from being read by Consumer earlier, it must set the isolation.level parameter to read_committed. Such Consumer will be able to read nontransaction messages as before, and transaction messages — only after the commit.

If you have set all of the settings, you have configured it exactly-once delivery. Congratulations!

But there is one more nuance. Transactional.id, which we have set up above, is actually a transaction prefix. On the transaction manager, an ordinal number is added to it. The resulting identifier is issued on transactional.id.expiration.ms, which is configured on the Kafka cluster and has a default value of “7 days”. If the application has not received any messages during this time, you will receive InvalidPidMappingException when trying to send the next transaction.

Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

After that, the transaction coordinator will issue a new sequence number for the next transaction. In this case the message may be lost if InvalidPidMappingException is not processed correctly.

Conclusion

As you can see, simply sending messages to Kafka is not enough. You need to select a combination of parameters and be ready to make quick changes. In this article I have tried to show the details of the exact once delivery configuration and described some of the problems we encountered with client.id and transactional.id configurations. Below you can find Producer and Consumer settings in a brief form.

Producer:

  1. acks = all
  2. retries > 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 — for orderly sending)
  5. transactional.id = ${application-name}-${hostname}

Consumer:

  1. isolation.level = read_committed

To minimize errors in future applications, we have made our own wrapper over the spring-specific configuration, where values for some of the listed parameters are already set.

And here is a couple of materials for self-study:

--

--