Flexibility With Traps in Kafka

Piotr Babel
Synerise
Oct 11, 2019

Kafka is used for building real-time data pipelines and streaming apps. It runs as a cluster on one or more servers. The Kafka cluster stores data in topics, and each topic can consist of one or more partitions.

It has Producer, Consumer, Streams and Connector APIs. Moreover, Kafka has plenty of configuration parameters which provide a lot of flexibility. Many companies use Kafka as a “circulatory system” for their applications.

Bearing these aspects in mind, using and configuring Kafka correctly requires remembering many things. The Kafka documentation is comprehensive, but it lacks simple explanations, especially for newcomers.

Here I will explain what is going on in a few configurations, how they can impact the entire system, and what traps (problems) may be waiting for us:

  • Consumer offset retention — can cause data reprocessing issues in lower-throughput topics;
  • Segment retention — low-throughput topics can retain messages longer than expected if the segment-level retention period is not taken into account;
  • Unclean leader elections — can lead to data loss;
  • Producer settings — configurations which differentiate throughput and latency;
  • Number of partitions — how more partitions can affect performance;
  • Partition assignment strategy — uneven partition assignment to consumers.

Read this article for a little refresh about Kafka before going deeper into the subject.

Consumer offset retention

The default offset retention period can cause reprocessing or skipping of data in low-throughput topics. Starting in version 0.9.0, each consumer commits the offset of its most recently consumed message to a __consumer_offsets topic.

Retaining consumer offsets for a certain period of time enables consumers to know where to resume reading from the partition log if partitions rebalance across brokers, or when a consumer or broker temporarily becomes unavailable. If a consumer has just started, or if it requests an offset that has expired, it resets to either the earliest (oldest) or the latest (most recent) offset available in the partition, depending on its auto.offset.reset setting.

Source: Increase offsets retention default to 7 days

In Kafka versions before 2.0, the default offset retention period is only one day. A low-throughput topic can easily go a full day without processing any new messages, at which point the broker deletes the consumer group's offsets for that topic from __consumer_offsets.

Kafka consumers are then unable to retrieve their offsets, so they reset to the earliest offset (based on the auto.offset.reset configuration) and start processing the earliest message still available on the broker. This means they reprocess data they have already consumed in the past. It is better to keep the topic's retention.ms lower than offsets.retention.minutes (compared in the same units), so that messages expire before the committed offsets do.

Increasing the offset retention period can cause certain side effects. For example, it can increase memory usage on the broker, since it must retain those offsets for a longer period of time in memory.
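
On the consumer side, the relevant knob is auto.offset.reset; the retention of committed offsets itself is a broker setting (offsets.retention.minutes). A minimal sketch, assuming a hypothetical topic, group id, and a local broker:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetRetentionAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       // assumed broker address
        props.put("group.id", "low-throughput-group");          // hypothetical group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // Decides where to start when the committed offset has expired:
        // "earliest" risks reprocessing, "latest" risks skipping data.
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("low-throughput-topic")); // hypothetical topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
        }
        // Broker side (server.properties), not set here:
        // offsets.retention.minutes=10080   (7 days, the post-2.0 default)
    }
}
```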

Segment retention

Low-throughput topics can retain messages longer than expected if you don’t account for the segment-level retention period.

Kafka stores the data for each partition in a log, which is further divided into log segments. Kafka determines how long to store data based on topic-level and segment-level retention settings, which makes it difficult to track how long messages are actually stored on the broker.

Table 1. The configuration settings that determine how long Kafka will retain data.

Kafka closes a segment and opens a new one when one of two things happens (whichever comes first):

  • the segment reaches maximum size (as determined by segment.bytes)
  • the segment-level retention period has passed (based on segment.ms)

Adjusting the size of the log segments can be important if topics have a low throughput. For example, if a topic receives 200 megabytes of messages per day (log.segment.bytes = 1 GB, log.segment.ms = 7 days), it will take 5 days to fill one segment.

As messages can’t be expired until the log segment is closed, if log.retention.ms is set to 7 days, there will actually be up to 12 days of messages retained until the closed log segment expires. This is because once the log segment is closed with the current 5 days of messages, that log segment must be retained for 7 days before it expires based on the time policy (as the segment can’t be removed until the last message in the segment can be expired).
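
One way to narrow that window on a low-throughput topic is to lower the topic-level segment.ms so that segments are closed (and can start expiring) sooner. A sketch using the AdminClient (Kafka 2.3+; the topic name and broker address are placeholders):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class LowerSegmentMs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "low-throughput-topic");
            // Close segments after one day even if they are not full,
            // so time-based retention can expire messages sooner.
            AlterConfigOp setSegmentMs = new AlterConfigOp(
                    new ConfigEntry("segment.ms", "86400000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setSegmentMs))).all().get();
        }
    }
}
```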

Understand the data rate of your partitions to ensure you have the correct retention space. The data rate dictates how much retention space, in bytes, is needed to guarantee retention for a given amount of time. If you don’t know the data rate, you can’t correctly calculate the retention space needed to meet a time-based retention goal.
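
As a back-of-the-envelope sketch with made-up numbers, the required retention space follows directly from the data rate:

```java
public class RetentionSizing {
    public static void main(String[] args) {
        // Hypothetical numbers; plug in your measured per-partition data rate.
        double mbPerDayPerPartition = 200.0;   // measured data rate
        int retentionDays = 7;                 // time-based retention goal
        int partitions = 12;                   // partitions in the topic

        double mbPerPartition = mbPerDayPerPartition * retentionDays;  // 1400 MB
        double mbPerTopic = mbPerPartition * partitions;               // 16800 MB

        System.out.printf("retention.bytes per partition >= %.0f MB%n", mbPerPartition);
        System.out.printf("disk for the topic (excluding replication) >= %.1f GB%n",
                          mbPerTopic / 1024);                          // ~16.4 GB
    }
}
```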

Unclean leader elections

In Kafka, an unclean leader election occurs when an unclean broker (“unclean” meaning one that has not finished replicating the latest data from the previous leader) becomes the new leader. This feature prioritizes availability over durability, and it can lead to data loss.

Kafka stores data across partitions in each topic and each partition has a leader and zero or more followers that fetch and replicate new data from the leader. Each broker serves as the leader for certain partitions and the follower for others, which reduces the likelihood of a single point of failure for any given partition. If followers stay up to date with the leader, and remain available, they will be considered in-sync replicas (ISR). Followers behind the leader are removed from the ISR until they catch up with the partition leader.

If unclean.leader.election.enable is disabled (false), Kafka cannot elect a new leader when no in-sync replica is available. In that case it stops all reads and writes to that partition. On the other hand, if unclean leader elections are enabled and the leader becomes unavailable, Kafka can keep the partition online by electing a new leader even in the absence of in-sync replicas (ISR=0).

Let’s say that we have 3 replicas with replication factor set to 3 and unclean leader election enabled:

  • The broker that is the leader for Partition X has processed messages up to offset=100 and then goes offline. The follower brokers are slightly behind at offset=90 (ISR=0).
  • If unclean leader elections are enabled, one of the follower brokers gets elected as the new leader for the partition (even though it is “unclean”). This allows consumers and producers to keep sending requests to Partition X, so the new leader can accumulate messages up to, say, offset=120. Offsets 90 to 100 on the new leader now contain completely new messages, different from those the old leader had.
  • When the previous leader comes back online, it truncates its log to match the new leader, resulting in data loss. Note that some consumers may have read the old messages at offsets 90–100 from the old leader, some read the new ones, and some read a mix of both.
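
To favor durability over availability, unclean leader election can be left disabled (the default since Kafka 0.11) and combined with min.insync.replicas when the topic is created. A minimal sketch, assuming a hypothetical topic name and a three-broker cluster:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateDurableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("orders", 3, (short) 3) // hypothetical topic, 3 partitions, RF=3
                    .configs(Map.of(
                            // Never elect an out-of-sync replica as leader (durability over availability).
                            "unclean.leader.election.enable", "false",
                            // With acks=all on the producer, at least 2 replicas must confirm each write.
                            "min.insync.replicas", "2"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```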

Producer settings

There are plenty of producer configurations that can impact performance and durability; these are some of the most commonly used:

batch.size

The Kafka producer batches records headed for the same partition, optimizing the network and I/O requests issued to the partition leader. Increasing the batch size can therefore result in higher throughput. Under a light load, however, a larger batch may increase send latency, since the producer waits for the batch to be ready.

Under a heavy load, it is recommended to increase the batch size to improve throughput and latency.

linger.ms

The linger.ms setting also controls batching: it puts a ceiling on how long the producer waits before sending a batch, even if the batch is not full. In low-load scenarios, increasing it improves throughput at the cost of latency.
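
A producer sketch tuned toward throughput, pulling batch.size and linger.ms together; the values, topic name, and broker address are illustrative placeholders, not recommendations:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "65536");   // 64 KB batches (default is 16 KB)
        props.put("linger.ms", "20");       // wait up to 20 ms for a batch to fill (default is 0)
        props.put("compression.type", "lz4");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "key", "value")); // hypothetical topic
        }
    }
}
```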

acks

Kafka provides fault tolerance via replication, so the failure of a single node or a change in partition leadership does not affect availability. If you configure your producers not to require acknowledgments (acks = 0), messages can be silently lost.

The producer's acks configuration determines how many acknowledgments the partition leader must collect before a write request is considered complete. This setting affects data reliability, and it takes the values 0, 1, or -1 (i.e. “all”).

To achieve the highest reliability, acks = all guarantees that the leader waits for all in-sync replicas (ISR) to acknowledge the message. In this case, if the number of in-sync replicas is lower than the configured min.insync.replicas, the request will fail. Note that even with acks = all, if min.insync.replicas = 1, message durability cannot be guaranteed.

While acks = -1 (all) provides stronger guarantees against data loss, it results in higher latency and lower throughput.
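
And the durability end of the trade-off, again as a sketch with placeholder names and addresses; it pairs with min.insync.replicas >= 2 on the topic, as in the earlier topic-creation example:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DurableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each write (same as acks=-1).
        props.put("acks", "all");
        // Retries become safe (no duplicates) with idempotence, available since Kafka 0.11.
        props.put("enable.idempotence", "true");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns a Future; get() surfaces any error instead of losing the record silently.
            producer.send(new ProducerRecord<>("orders", "key", "value")).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```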

Number of partitions

To pick the right number of partitions, measure the throughput you can achieve on a single partition for production (P) and consumption (C). Let's say your target throughput is T. Then you need at least max(T/P, T/C) partitions. The per-partition throughput achievable on the producer depends on configurations such as the batch size, compression codec, type of acknowledgment, replication factor, etc.
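
A quick worked example with made-up throughput numbers:

```java
public class PartitionCountEstimate {
    public static void main(String[] args) {
        // Hypothetical, measured per-partition throughputs (MB/s).
        double producerPerPartition = 10.0;  // P
        double consumerPerPartition = 25.0;  // C
        double targetThroughput = 200.0;     // T, MB/s for the whole topic

        int partitions = (int) Math.ceil(Math.max(
                targetThroughput / producerPerPartition,    // 200 / 10 = 20
                targetThroughput / consumerPerPartition));  // 200 / 25 = 8

        System.out.println("at least " + partitions + " partitions"); // 20
    }
}
```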

More partitions can introduce some problems:

  • More open file handles — each partition maps to a directory in the file system in the broker. Within that log directory, there will be two files (one for the index and another for the actual data) per log segment.
  • May increase unavailability — consumer and producer requests go to the leader replica. When a broker fails, partitions whose leader is on that broker become temporarily unavailable. Kafka automatically moves the leaders of those partitions to other replicas to continue serving client requests; this is done by the Kafka broker designated as the controller and involves reading and writing some metadata for each affected partition in ZooKeeper. If the failed broker is the controller, this process cannot start until a new controller is elected, and the new controller must read some metadata for every partition from ZooKeeper during initialization.
  • May increase end-to-end latency — the end-to-end latency in Kafka is defined as the time from when a message is published by the producer to when the message is read by the consumer. Kafka only exposes a message to consumers after it has been committed, i.e., after the message has been replicated to all in-sync replicas. So the time needed to replicate a message, which depends on the number of replicas and the number of threads handling replication, can be a significant portion of the end-to-end latency.
  • May require more memory in the client — internally, the producer buffers messages per partition. After enough data has accumulated (the batch.size parameter) or enough time has passed (the linger.ms parameter), the accumulated messages are removed from the buffer and sent to the broker. If you increase the number of partitions, messages accumulate in more per-partition buffers in the producer. When the amount of memory used exceeds the configured limit, the producer has to either block or drop new messages, neither of which is ideal. A similar issue exists on the consumer side: the consumer fetches a batch of messages per partition, so the more partitions it reads from, the more memory it needs. Check out this proposal to improve the memory usage in consumers.

When publishing a keyed message, Kafka deterministically maps the message to a partition based on a hash of the key. This guarantees that messages with the same key are always routed to the same partition. Changing the number of partitions in a topic changes this mapping, so new messages with a given key can end up on a different partition than before.
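
A rough sketch of that mapping (this mirrors what the default partitioner does for keyed records, simplified; the key and partition counts are made up):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class KeyToPartition {
    // Simplified sketch of how a keyed record is mapped to a partition.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "user-42";                    // hypothetical key
        System.out.println(partitionFor(key, 8));  // partition when the topic has 8 partitions
        System.out.println(partitionFor(key, 12)); // likely a different partition with 12
    }
}
```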

Image 1. Overview of three brokers with topics and partitions (blue — partitions leader, grey — followers).

Partition assignment strategy

When a consumer wants to join a consumer group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and are therefore considered alive) and it is responsible for assigning a subset of partitions to each consumer. It uses an implementation of the PartitionAssignor interface to decide which partitions should be handled by which consumer.

After deciding on the partition assignment, the group leader sends the list of assignments to the GroupCoordinator, which forwards this information to all the consumers. Each consumer sees only its own assignment — the leader is the only client process that has the full list of consumers in the group and their assignments.

This process repeats every time a rebalance happens.

You can implement your own strategy by extending the PartitionAssignor interface, or use one of the built-in ones:

  • org.apache.kafka.clients.consumer.RangeAssignor — for each topic, divides its partitions by the number of consumers subscribed to it and assigns that many partitions to each consumer (in lexicographic order). If the partitions do not divide evenly, the first consumers receive one extra partition each.
  • org.apache.kafka.clients.consumer.RoundRobinAssignor — lists all partitions from all subscribed topics and assigns them to consumers one by one, in round-robin fashion.
  • org.apache.kafka.clients.consumer.StickyAssignor — assigns partitions so that they are distributed as evenly as possible; during rebalances, partitions stay with their previously assigned consumers as much as possible.

The strategy is configured via the partition.assignment.strategy parameter on the consumer.

RangeAssignor, which is the default partition assignment strategy, can assign noticeably more partitions to some consumers in the group when the topics consumed by the group (for example, subscribed via a regular expression) have partition counts that do not divide evenly among the consumers.

Image 2. Consumers partition assignments.

Let’s assume that we have one topic with 8 partitions (Topic 1) and 20 topics with 2 partitions each (similar to Topics 2 and 3 in the image). With the default RangeAssignor, Consumers 1 and 2 each end up with 23 partitions (3 partitions from Topic 1 plus 1 partition from each of the other 20 topics), while Consumer 3 gets only 2 partitions (both from Topic 1).

It is better to use a different partition assignment strategy, such as RoundRobinAssignor or StickyAssignor, to distribute partitions evenly among the consumers in the group.
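
Switching the strategy is a single consumer setting. A minimal sketch that moves to RoundRobinAssignor (the group id, topic pattern, and broker address are made up):

```java
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RoundRobinConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "analytics-group");           // hypothetical group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // Spread partitions from all subscribed topics evenly across the group
        // instead of the per-topic ranges used by the default RangeAssignor.
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Pattern.compile("events-.*"));   // hypothetical topic pattern
    }
}
```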

Summing Up

There are hundreds of Kafka configurations that can be adjusted to configure producers, brokers and consumers. In this post, we’ve pinpointed some of the valuable lessons we’ve learned from running Apache Kafka in production.

Originally published at Synerise: Flexibility With Traps in Kafka on 11.09.2019.

