Exploring Kafka Producer’s Internals
Co-Author: Razvan Dobre
This is the first part of a series where we explore the Kafka clients' internals. This post focuses on the Kafka Producer.
Adobe Experience Platform Pipeline is a low-latency, Kafka-based streaming system. Pipeline connects hundreds of Adobe components and systems. Our Kafka clusters handle 310B msg/day, with 300 TB/day IN and 920 TB/day OUT traffic. Therefore, understanding the Kafka client's internals is crucial to scaling to such volumes of traffic.
Kafka Producer is a client that publishes messages to Kafka. It is made up of the following components:
- Producer Metadata — manages metadata needed by the producer: topics and partitions in the cluster, broker node serving as the leader of the partition, etc.
- Partitioner — computes the partition for the given record.
- Serializers — record key and value serializers. A serializer converts an object to a byte array.
- Producer Interceptors — interceptors that may mutate records before they are sent.
- Record Accumulator — accumulates records and groups them by topic-partition into batches.
- Transaction Manager — manages transactions and maintains the necessary state to ensure idempotent production.
- Sender — background thread that sends data to the Kafka cluster.
Configuring Kafka Producer
Kafka Producer has three required properties:
- bootstrap.servers — A list of host/port pairs to establish the initial connection to the Kafka cluster. Format: “host1:port1,host2:port2,…”
- key.serializer — Fully qualified class name representing key serializer that implements org.apache.kafka.common.serialization.Serializer interface.
- value.serializer — Fully qualified class name representing value serializer that implements org.apache.kafka.common.serialization.Serializer interface.
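A minimal configuration with just these three required properties can be written as a plain `Properties` fragment. The sketch below is illustrative; the class name and broker addresses are placeholders:

```java
import java.util.Properties;

public class ProducerConfigExample {
    // Builds the minimal producer configuration: only the three required properties.
    static Properties minimalConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092,host2:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

All other producer settings have defaults, so this is enough to construct a working producer.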
Sending messages to Kafka
Kafka Producer sends messages asynchronously and returns a Future<RecordMetadata>, representing send results. In addition, users have an option to supply Callback to be invoked when Kafka broker acknowledges the record. While it looks simple, a few things happen behind the scenes.
- The producer passes the message to a configured list of interceptors. For example, an interceptor might mutate the message and return an updated version.
- Serializers convert the record key and value to byte arrays.
- The default or configured partitioner calculates the topic partition if none is specified.
- The record accumulator appends the message to producer batches using a configured compression algorithm.
At this point, the message is still in memory and not sent to the Kafka broker. Record Accumulator groups messages in memory by topic and partition.
The Sender thread groups multiple batches headed for the same leader broker into a single request and sends them. Only at this point is the message actually sent to Kafka.
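Putting the flow together, an asynchronous send with an optional Callback might look like the sketch below (assumes the kafka-clients dependency on the classpath; the topic name and broker address are placeholders):

```java
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("example-topic", "key", "value");

            // send() returns immediately; the record sits in the Record Accumulator
            // until the Sender thread ships the batch to the partition leader.
            Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();  // delivery failed after retries
                } else {
                    System.out.printf("partition=%d offset=%d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        } // close() flushes any batches still in the accumulator
    }
}
```

The returned `Future` and the `Callback` carry the same result; the callback simply avoids blocking a thread on `future.get()`.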
Kafka Producer offers configuration parameters to control time spent on various stages:
- max.block.ms — time waiting for metadata fetch and buffer allocation
- linger.ms — time to wait to allow other records to be sent
- retry.backoff.ms — time to wait before retrying the failed request
- request.timeout.ms — time to wait for the response from Kafka broker
- delivery.timeout.ms — introduced later, as part of KIP-91, to give users a guaranteed upper bound on total delivery time without tuning the producer's internal components
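For orientation, the defaults of these knobs (as of recent client versions; worth verifying against your client's documentation) can be written down as a `Properties` fragment. Note that the producer requires delivery.timeout.ms to be at least linger.ms + request.timeout.ms:

```java
import java.util.Properties;

public class TimeoutConfigExample {
    // The producer's default timing knobs (values in milliseconds).
    static Properties timeoutDefaults() {
        Properties props = new Properties();
        props.put("max.block.ms", "60000");         // metadata fetch + buffer allocation
        props.put("linger.ms", "0");                // extra wait to fill batches
        props.put("retry.backoff.ms", "100");       // pause before retrying a failed request
        props.put("request.timeout.ms", "30000");   // wait for a broker response
        props.put("delivery.timeout.ms", "120000"); // upper bound on the whole send (KIP-91)
        return props;
    }
}
```

Raising linger.ms trades a little latency for larger, better-compressed batches.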
Users can control the durability of messages written to Kafka via the acks configuration parameter. Allowed values are:
- 0, a producer will not wait for acknowledgment from brokers
- 1, a producer will wait only for the partition leader to write a message, without waiting for all followers
- all, a producer will wait for all in-sync replicas to acknowledge the message. This comes at latency costs and represents the strongest available guarantee.
With acks=all there are a couple of nuances to clarify around in-sync replicas. On the Kafka side, two settings plus the current replication state affect the behaviour:
- topic replication factor
- min.insync.replicas setting
- the current number of in-sync replicas, including the leader itself
min.insync.replicas specifies the minimum number of in-sync replicas required for acks=all requests. If this requirement can't be met, the broker rejects the producer's request without even attempting the write. The table below illustrates possible scenarios.
During transient failures, the number of in-sync replicas might drop below the total number of replicas, but as long as it is greater than or equal to min.insync.replicas, requests with acks=all will succeed.
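The broker-side acceptance rule for acks=all can be sketched as a simplified predicate (an illustrative model, not actual broker code): a write is accepted only when the current in-sync replica count, leader included, meets min.insync.replicas.

```java
public class AcksAllCheck {
    // Simplified model of the broker-side check for acks=all requests:
    // the write is accepted only if enough replicas are currently in sync.
    static boolean acceptsAcksAll(int currentInSyncReplicas, int minInsyncReplicas) {
        return currentInSyncReplicas >= minInsyncReplicas;
    }
}
```

For example, with replication factor 3 and min.insync.replicas=2, the topic tolerates one replica falling out of sync; with only the leader in sync, acks=all requests are rejected.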
Users can mitigate transient failures and increase durability by resending failed requests. This is controlled via the retries (default: Integer.MAX_VALUE) and delivery.timeout.ms (default: 120000 ms) settings. Retries can result in duplicated messages and changes in message ordering. These side effects can be mitigated by setting enable.idempotence=true, but it comes at the cost of lower throughput.
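A durability-oriented configuration combining these settings might look like this illustrative fragment (the class and method names are placeholders):

```java
import java.util.Properties;

public class DurabilityConfigExample {
    // Retries plus idempotence: resend on transient failures without
    // introducing duplicates or reordering (at some throughput cost).
    static Properties durableConfig() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", String.valueOf(Integer.MAX_VALUE)); // default
        props.put("delivery.timeout.ms", "120000");              // default
        props.put("enable.idempotence", "true");
        return props;
    }
}
```

With idempotence enabled, the broker deduplicates retried batches using a producer ID and per-partition sequence numbers.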
Messages in topics are organized into partitions. Users can control partition assignment via the message key or a pluggable partitioner implementation. The partitioner is set using the partitioner.class configuration, which should be a fully qualified class name implementing the org.apache.kafka.clients.producer.Partitioner interface.
Kafka offers three implementations out of the box: DefaultPartitioner, RoundRobinPartitioner and UniformStickyPartitioner.
DefaultPartitioner — if the message key is null, it uses the current sticky partition and changes partition on the next batch. For non-null keys, it calculates the partition using the formula: murmur2hash(key) % total number of topic partitions.
RoundRobinPartitioner — ignores message key, distributes messages equally, in a round-robin fashion, across all active partitions. Partition is considered active if it has an assigned broker as a leader.
UniformStickyPartitioner — ignores message key, uses the current partition, and changes partition on the next batch.
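The keyed case of DefaultPartitioner can be sketched as the modulo step below. The real implementation hashes the serialized key bytes with murmur2; here the hash function is passed in as a parameter (an assumption for illustration), so only the mapping from hash to partition index is shown:

```java
import java.util.function.ToIntFunction;

public class KeyedPartitionExample {
    // Sketch of keyed partition assignment: hash the key bytes, mask off
    // the sign bit, and take the result modulo the partition count.
    // The real DefaultPartitioner uses murmur2 as the hash function.
    static int partitionFor(byte[] keyBytes, int numPartitions,
                            ToIntFunction<byte[]> hash) {
        return (hash.applyAsInt(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because the mapping depends on the total partition count, adding partitions to a topic changes which partition a given key lands on, which is why key-based ordering guarantees only hold while the partition count is stable.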
In the next blog posts, we will cover the Kafka Consumer, monitoring the Producer and Consumer, performance tuning, and a couple of additional technical aspects.