Everything You Need To Know About Kafka (Part 2)

Advanced features and behaviors of Kafka explained with references to useful articles.

Ofek Hod
May 9, 2020

Topics and resources about Kafka are spread across the internet in different articles and long books, and you have to look very hard to find the good ones.

This article covers most of the important advanced topics of Kafka and provides useful references available on the internet for digging even further into the topics you find interesting.

Kafka is a product with various capabilities; you can tune its behavior with configuration to make it fit your use case in terms of storage, availability, message durability and much more.

This article explains Kafka’s capabilities, what Kafka guarantees and what it does not, and how to tune its configuration to enhance performance.

If you don’t know Kafka’s basic concepts and terms, please read Part 1 first.

Message

A Kafka message is formally called a Record, but it is usually referred to as a message.

Structure

A message has a more complex structure than just the byte array value; it also includes a header with a timestamp, offset, key, message size, checksum (to detect corruption), magic byte (indicating the version of the message format) and compression codec.

The message timestamp is given either by the producer when the message was sent or by the broker when the message was received.

Message Key

The producer can set an optional key for any message it creates; if no key is given, a null value is assigned.

Messages with the same key are guaranteed to always go to the same partition. This guarantee has ordering implications: messages sent with the same key from the same producer are consumed in the exact same order they were sent.

Note that there is no order guarantee for messages residing on different partitions; this can happen for messages sent from different producers and for messages sent from the same producer but with different keys (or no keys).
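
As a minimal sketch of keyed sends with the official Java producer client (the broker address and the topic name customer-events are hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key ("customer-42") -> same partition -> ordering preserved between these two messages.
            producer.send(new ProducerRecord<>("customer-events", "customer-42", "created"));
            producer.send(new ProducerRecord<>("customer-events", "customer-42", "updated"));
            // No key -> partition chosen by the partitioner, no ordering guarantee relative to the above.
            producer.send(new ProducerRecord<>("customer-events", "some unkeyed event"));
        }
    }
}
```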

Partitions

You can increase the number of partitions of an existing topic (never decrease), though it may cause a rebalance operation (described later in the Consumer section).

When creating new topics and partitions, Kafka takes into account the number of brokers, different disks, number of partitions and racks, but not disk size or partition size; so if some brokers have more disk space than others (e.g. when new servers join the cluster), or some partitions are abnormally large because of key partitioning, you need to be careful.
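
For illustration, here is a sketch of increasing a topic’s partition count with the Java admin client (broker address and topic name are hypothetical); remember the count can only grow:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Grow "customer-events" to a total of 6 partitions (decreasing is not possible).
            admin.createPartitions(
                Collections.singletonMap("customer-events", NewPartitions.increaseTo(6))
            ).all().get();
        }
    }
}
```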

Replication

Definition

You can set partition replication, which means each partition will be saved on more than one broker. This mechanism provides fault tolerance but not extra parallelism, because there is only one partition leader which handles all read/write requests; the rest of the replicas are followers which replicate the leader’s data and are ready to take over if the leader dies.

Replication is configured with default.replication.factor at the broker level, and with a per-topic replication factor when creating a specific topic.
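
A sketch of choosing a replication factor at topic-creation time with the Java admin client (broker address, topic name and the factor of 3 are all illustrative):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, each replicated on 3 different brokers.
            admin.createTopics(Collections.singleton(new NewTopic("customer-events", 3, (short) 3)))
                 .all().get();
        }
    }
}
```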

Considerations

You can think of replication as a trade-off between hardware and availability: N replicas mean N times the storage consumption, but also mean you can lose up to N-1 brokers and still be fully available (no message loss).

Replicas are stored on different brokers, i.e. the replication factor must be less than or equal to the number of brokers.

The default replication factor is 1, so if one broker shuts down you lose availability, since there is only one “instance” of its partitions and no “extra copies”.

Kafka makes sure every replica resides on a different broker, but if brokers are on the same rack you can lose multiple brokers in a rack-level shutdown. Configure broker.rack for every broker, and Kafka will make sure replicas of a partition are spread across different racks as well.

Leader Election

A partition follower is considered out-of-sync when it is inactive or behind the leader’s newest offset for more than a certain amount of time (replica.lag.time.max.ms); otherwise it is considered in-sync, which means it reads messages from the partition leader at a fair pace without accumulating “too much” lag (according to the configuration).

If the partition leader needs to be switched (it shuts down or lags), only in-sync replicas can take over; you can allow unclean leader election by setting unclean.leader.election.enable=true (default false) so that out-of-sync replicas can take over in case there are no available in-sync replicas.

Allowing unclean leader elections risks consistency for consumers, because out-of-sync followers may not contain the most recent messages; produced messages are committed without making sure they were written to out-of-sync replicas. Read more about it later in Producer- Message Committing and Acknowledgement.

Producer

Record Batching

The producer sends messages in batches; a batch is sent when one of two thresholds is reached: size (batch.size) or time (linger.ms).

Larger batches help when messages are sent at high load, preventing frequent CPU-intensive sends within a short time. Small batches help when messages are sent at a slow pace, so they are sent soon enough before they get outdated.
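
A minimal configuration sketch for tuning these two thresholds (the concrete values are only illustrative, not recommendations):

```java
import java.util.Properties;

public class BatchingProducerConfig {
    // Builds producer properties that tune the two batching thresholds.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 64 * 1024); // send a batch once it reaches 64KB...
        props.put("linger.ms", 20);         // ...or after waiting up to 20ms, whichever comes first
        return props;
    }
}
```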

Message Partitioning

The producer uses a partitioning strategy to determine the partition for each message it sends.

The DefaultPartitioner uses the following strategy:

  1. If a partition is specified in the message, use it.
  2. If no partition is specified but a key is present, choose a partition based on the hash of the key.
  3. If no partition or key is present, use a sticky partitioner strategy: “stick” to the same partition until the batch is full (by time or size), then choose another partition.

The DefaultPartitioner in Kafka versions prior to 2.4 used a round-robin strategy instead of the sticky one.
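
If none of these strategies fits, you can plug in your own Partitioner and point the producer at it through the partitioner.class configuration; below is a minimal sketch (the class name and the hashing/fallback logic are made up for illustration, they are not the Kafka defaults):

```java
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example: hash keyed messages, send unkeyed messages to partition 0.
public class ExamplePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // all unkeyed messages go to partition 0 (just for illustration)
        }
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```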

Message Committing and Acknowledgement

A produced message is considered committed when it has been written to its partition leader and all of its in-sync followers.

You can change the producer’s perspective on this behavior and define when the producer is acknowledged for the messages it sent with the acks parameter (producer configuration):

  • acks=0: The producer doesn’t wait for any response from the broker.
  • acks=1: The producer waits for a success response from the broker after the message was written to the partition leader.
  • acks=all: The producer waits for a success response from the broker after the message was written to the partition leader and all its in-sync replica followers.

You can set the broker’s minimum number of in-sync replicas (min.insync.replicas, default 1, broker configuration): if the number of in-sync replicas falls below this number (which includes the partition leader) and the producer uses acks=all, the broker will not accept its messages and will send an exception back to the producer. If you don’t accept a situation where the partition leader is the only available replica, raise this value to at least 2, so at least one extra in-sync replica receives the messages.

Note that min.insync.replicas is a safety measure: it prevents situations where an acks=all producer is “acknowledged by all in-sync replicas” while there are actually no in-sync replicas to acknowledge, or not enough of them (“enough” is the minimum value you set). As long as this condition is met, the broker waits for all the in-sync replicas before acknowledging, not only min.insync.replicas of them.
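
A sketch of a durability-oriented producer (assuming min.insync.replicas=2 is configured on the broker or topic; broker address, topic name and values are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksAllProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // wait until the leader and all in-sync replicas have the message

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("payments", "order-1", "10$"), (metadata, exception) -> {
                // If fewer than min.insync.replicas replicas are in-sync, the broker rejects the write
                // and the error surfaces here (e.g. as a NotEnoughReplicasException).
                if (exception != null) {
                    exception.printStackTrace();
                }
            });
        }
    }
}
```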

Data Order Guarantee

The data order guarantee concerns the assumption that partitions receive messages from a producer in the same order they were sent.

Data order is not fully promised by default; it is actually a trade-off you make between performance and the level of safety in terms of ordering, by changing configuration and producer implementation.

When a message can’t be delivered to the broker (e.g. due to availability problems), the default behavior is to keep retrying for up to two minutes, and to allow up to 5 messages to be retried in the background before blocking the sending of new messages; this behavior doesn’t promise complete order, but bounds the amount of reordering to a configurable standard. These are the default values (producer configuration):

  • retries=2147483647 (MAX_INT): When failing to send a message to the broker, retry up to this number of times.
  • delivery.timeout.ms=120000 (2 minutes): The maximum delay allowed for sending a message; when this timeout is reached, “give up” and stop retrying even if the retries bound has not been reached yet.
  • max.in.flight.requests.per.connection=5: The maximum number of unacknowledged requests (messages) to handle before blocking the producer from sending new messages.

The only way to fully guarantee order is to set max.in.flight.requests.per.connection=1; note that this hurts performance, because one failed message will block the whole producer until it is acknowledged.
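
A configuration sketch for the order-safe end of this trade-off, keeping the default retry and timeout values explicit and lowering only the in-flight limit (values mirror the defaults quoted above):

```java
import java.util.Properties;

public class OrderedProducerConfig {
    // Builds producer properties for strict ordering: a single in-flight request per connection.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");     // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", Integer.MAX_VALUE);               // default: retry until the timeout below
        props.put("delivery.timeout.ms", 120_000);             // default: give up after 2 minutes
        props.put("max.in.flight.requests.per.connection", 1); // strict order, at the cost of throughput
        return props;
    }
}
```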

When it comes to producer implementation, when the configured number of retries is exceeded, or when there are errors on the developer side (like serialization), the developer has to decide whether to “give up” on the message or send it again. If you choose to save messages aside and send them again without blocking the producer, note that the same ordering problems are now determined by your code; as a rule of thumb, prefer to let the configuration reflect the failure-handling behavior by setting the desired number of retries, timeouts and in-flight requests, instead of implementing those in your producer code.

Exactly Once vs At Least Once Delivery Semantics

Notice that the retry approach may lead to duplicate messages; this is why Kafka guarantees At Least Once delivery, and not Exactly Once delivery, by default.

You should be aware of these semantics since they may have crucial implications on your system. For example, in a bank, messages like “Add 10$ to account value” may lead to serious incorrectness under the At Least Once approach, which allows duplicates; a way around this problem is to use messages like “Account value is 100$”, so duplicates won’t change the account balance state and the At Least Once approach becomes acceptable.

When allowing messages to be resent with retries>0 (regardless of max.in.flight.requests.per.connection), the same message may be sent twice to the broker, which results in At Least Once delivery.

In order to guarantee Exactly Once delivery in versions prior to 0.11, you have to set retries=0.

Kafka version 0.11 introduced Idempotent Delivery, which guarantees that resent messages will not result in duplicate entries on the broker; to achieve this, batches of messages sent to Kafka are assigned a sequence number which the broker uses to remove duplicates. Enable it with enable.idempotence (default false).
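
A minimal sketch of enabling the idempotent producer (broker address is hypothetical; note that idempotence goes together with acks=all):

```java
import java.util.Properties;

public class IdempotentProducerConfig {
    // Builds producer properties with idempotent delivery enabled.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", true); // broker de-duplicates retried batches via sequence numbers
        props.put("acks", "all");              // required by idempotence
        return props;
    }
}
```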

Consumer

Data Order Guarantee

Given any consumer reading from several partitions of one or more topics, Kafka guarantees order by offsets within every partition, so you know for sure the messages consumed from a partition are received in the exact same order they were published to that partition; but you have no order assumptions for messages consumed from different partitions (even for the same topic and the same consumer).

Notice that the consumer’s message order guarantees apply to messages that are already in their partitions; refer to the Producer Data Order Guarantee for the message sending order.

Parallelism Level

The number of consumers in the same consumer group defines the level of parallelism; this holds as long as the number of consumers in the consumer group is smaller than or equal to the number of partitions in the topic.

The following example should clarify this: we have a 3-broker cluster and a topic with 3 partitions: p1, p2, p3.

We subscribe a single consumer from consumer group cg1: this consumer is now responsible for consuming from partitions p1, p2 and p3 (all of them) with a parallelism of 1 (since there is only one consumer).

Now we subscribe a second consumer from the same consumer group: one of the consumers will be responsible for partition p1 and the second will be responsible for partitions p2 and p3; parallelism is now 2 (since there are two consumers).

For 3 consumers in the same consumer group, every consumer will be responsible for exactly one of the partitions, providing a parallelism of 3 (since there are three consumers).

For 4 consumers in the same consumer group, one of the consumers will be idle and the parallelism stays at 3, no matter how many more consumers we add. For the sake of the example, the first consumer consumes from p1, the second from p2, the third from p3 and the fourth is idle.

A fifth consumer belonging to a new consumer group cg2 will be independently responsible for all the partitions of the topic; it will not change the behavior of the previous consumers and has a parallelism of 1.
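
A minimal sketch of a consumer joining consumer group cg1 (group and topic names follow the example above; the broker address is hypothetical). Running more copies of this process with the same group.id raises parallelism, up to the number of partitions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Cg1ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "cg1");                     // consumers sharing this group split the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("customer-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```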

Rebalance

Partition rebalance is the process of re-assigning partitions to consumers; it happens when new partitions are added, a new consumer is added or a consumer leaves (due to an exception, network problems or an initiated exit). In order to preserve reading consistency, during a rebalance the entire consumer group stops receiving messages until the new partition assignment takes place.

Kafka reassigns partitions using the RangeAssignor strategy by default (set by partition.assignment.strategy), though you can use other strategies (like RoundRobinAssignor and StickyAssignor) or implement one by yourself.

In order to prevent unnecessary rebalances when consumer instances restart (for example when they are managed by systems like Kubernetes), consumers can utilize static membership.
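
A configuration sketch combining a non-default assignor with static membership (the instance id would typically be derived from the pod or host name; all values here are illustrative):

```java
import java.util.Properties;

public class StaticMembershipConsumerConfig {
    // Builds consumer properties using a non-default assignor and static membership.
    public static Properties build(String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "cg1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use the sticky assignor instead of the default RangeAssignor.
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
        // Static membership: a restarted consumer with the same instance id (within the session
        // timeout) rejoins without triggering a rebalance.
        props.put("group.instance.id", instanceId);
        return props;
    }
}
```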

Offset Committing

A consumer commits the offsets of the messages it reads from every partition in each poll iteration, so it always “remembers” the last offset it read for every partition it is responsible for; if a consumer shuts down, the consumer that takes over responsibility for its partitions will continue from the last committed offset. If no offset was committed before (mostly on the first run), the starting offset is chosen by consumer configuration: the first offset of every partition (auto.offset.reset=earliest) or the last offset (auto.offset.reset=latest, the default).

Different consumer groups reading the same topic are completely independent, which means consumers from different consumer groups that happen to be responsible for the same partition commit offsets independently for that partition.
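
A sketch of explicit offset handling: start from the earliest offset when the group has no committed offset yet, and commit manually only after processing (topic/group names and the process step are hypothetical; by default offsets are committed automatically):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "cg1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // no committed offset yet -> start from the beginning
        props.put("enable.auto.commit", false);     // commit explicitly instead of automatically

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("customer-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // hypothetical processing step
                }
                consumer.commitSync(); // "remember" the offsets only after successful processing
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}
```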

Optimizations

Compression

The producer can enable compression; by default messages are sent uncompressed. Supported compression types are snappy, gzip and lz4.

Compression occurs on the whole message batch, so sending larger batches means more efficient compression, especially when some field values repeat themselves.

The producer is responsible for the compression (compression.type) and the consumer for the decompression (no configuration needed), so the Kafka brokers and the network bandwidth enjoy the benefits of the small compressed data, while the producers/consumers pay the toll of time and CPU for the compress/decompress processing.

You can also set compression.type on the broker side so messages will be compressed on the server instead of the producer/consumer.

Use compression when network bandwidth or broker memory/storage is your bottleneck. Moreover, every compression algorithm has different trade-offs of time/CPU consumption versus compression aggressiveness (more aggressive means less storage size), so you should choose it wisely.
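
A producer-side compression sketch (the lz4 choice and the values are illustrative; pick a codec according to your own CPU vs. bandwidth/storage trade-off):

```java
import java.util.Properties;

public class CompressedProducerConfig {
    // Builds producer properties with batch compression enabled.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "lz4"); // whole batches are compressed before being sent
        props.put("batch.size", 64 * 1024);   // larger batches usually compress better
        props.put("linger.ms", 20);
        return props;
    }
}
```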

Zero-Copy

Kafka uses a zero-copy method when sending messages from brokers; this method improves performance by sending messages from files directly to the network, without any intermediate buffers, thus reducing the overhead of copying data and managing buffers in memory.

On Linux, Kafka brokers write the messages to the filesystem cache and do not guarantee writing them to disk; this is because Kafka relies on replication for message durability.

Log Compaction

Definition

Kafka log compaction deletes messages with duplicate keys inside a partition, retaining at least the last value for every key; this can potentially decrease partition size dramatically.

This feature is helpful for systems which need only the last value for every key, similar to HashMap behavior, for example:

  1. Shipping addresses for customers: you don’t care about old addresses which have already changed and are interested only in the latest value for new shipments.
  2. Saving the last state of an application for crash recovery.

This feature runs a process named cleaner; before explaining it, you first have to understand what a segment is.

Segment

Partitions are split into segments; each segment contains either 1GB or a week of messages, whichever comes first. Every segment is a file, and the newest segment, the one we are currently writing to, is called the active segment.

Segments are immutable (besides the active segment), so they are promised to never change once written to disk; they can only be replaced by other segments in some circumstances (by the cleaner, described later).

Kafka time retention deletes segments which are too old, and size retention deletes the oldest segments when the topic grows too big.

Cleaner

When enabled for a topic (log.cleanup.policy=compact), the brokers make sure every partition of that topic contains only the last value for every message key, by deleting older messages with a repeated key.
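
A sketch of creating a compacted topic with the Java admin client (broker address and topic name are hypothetical; cleanup.policy is the per-topic counterpart of the broker-level log.cleanup.policy):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("customer-addresses", 3, (short) 3)
                    .configs(Map.of("cleanup.policy", "compact")); // keep at least the last value per key
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```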

The process responsible for maintaining log compaction is called the cleaner, and you can define how many threads Kafka allocates to it (log.cleaner.threads, default 1).

The cleaner process creates a map (key to offset) for a partition and starts iterating through its segments, from the newest one to the oldest (skipping the current active segment); for every segment, it creates a replacement segment and then swaps it with the original one. The replacement segment is created as follows: for every message in the segment, if the key does not exist in the map, it means this is the newest message for this key, so it is written to the map and copied to the replacement segment; if the key already exists in the map, it means there is a newer message with the same key, so the message is not copied to the replacement segment and is considered deleted.

The cleaner also deletes null-key messages (a producer that doesn’t set a message key writes null-key messages).

Since the clean process is I/O intensive and should not run too frequently, it runs only for partitions with 50% or more dirty (not yet cleaned) messages (true for Kafka version 0.10 and older).

What’s Next?

Additional recommendations:
