Photo by Tom Pottiger on Unsplash

Kafka — Internals of Producer and Consumers

Amit Singh Rathore
Geek Culture
Published in
6 min readOct 22, 2022

--

Moving data in and out of Kafka

In my last blog, I discussed the basic architecture of Kafka. In this blog, we will see more details about Producers and Consumers.

Producer

Applications that publish data to Kafka topics are known as producers. Applications integrate a Kafka client library to write to Kafka. The writing process starts with creating a ProducerRecord.

Components/Process in Kafka Producers

  • Interceptors — interceptors that can mutate the records before sending e.g. Claim-check-interceptor.
  • Producer Metadata — Manages metadata needed by the producer: Topics and partitions in the cluster, broker node serving as the leader of the partition, etc.
  • Serializers — Key/Value serializers that convert the object to the byte array.
  • Partitioner — computes the partition for the given record. If the partition is specified in the ProducerRecord, then the partitioner will return the same, otherwise, it will choose a partition for the message key based on the partitioning strategy (Round Robin, Hash Key, or Custom Partitioning). org.apache.kafka.clients.producer.internals.DefaultPartitioner, org.apache.kafka.clients.producer.RoundRobinPartitioner, org.apache.kafka.clients.producer.UniformStickyPartitioner, org.apache.kafka.clients.producer.Partitioner (Inteface)
  • Record Accumulator — Accumulates records and groups them by topic- partition into batches. A batch of unsent records is maintained in the buffer memory. A separate I/O thread (sender thread) is responsible for sending those batches of records as a request to the Kafka broker.
  • Transaction Manager — manages transactions and maintains the necessary state to ensure idempotent production.
  • Channel selector— create a network client to establish communication with the broker.

Producer Acks Setting

Kafka producers only write data to the current leader broker for a partition. If we wish that the message must be written to a minimum number of replicas before being considered a successful write we need to set acks which is the number of brokers who need to acknowledge receiving the message before it is considered a successful write.

Acks = all and min.insync.replicas = 3

Note: When acks=all with a replication.factor=N and min.insync.replicas=M we can tolerate N-M brokers going down for topic availability purposes.

acks = 0: Fire-and-Forget
acks = 1: Leader Acknowledgment
acks = all: Maximum Data Durability

Kafka Producer Retries

retries
delivery.timeout.ms
retry.backoff.ms
max.in.flight.requests.per.connection

Idempotent Kafka Producer

Retrying to send a failed message has a small risk of duplicates. This can happen if the data was replicated to ISRs but acknowledgment did not reach to producer and hence it retried. To avoid this Kafka uses PID sequences which are always increasing. Kafka always takes the largest PID-Sequence Number combination that is successfully written. When a lower sequence number is received, it is discarded.

enable.idempotence=true
acks=all

Kafka Message Compression

compression.type
none, gzip, lz4, snappy, and zstd

If we are using producer-level compression then we should set the broker-level setting to compression.type=producer . If the producer level compression and broker level compression do not match broker will decompress and compress again.

Kafka Producer Batching

linger.ms — time to wait before sending a batch out
batch.size — maximum bytes to be included in a batch

Deletion of message

It’s possible to remove a message from a Kafka topic by publishing a new message with the key of the message we want to delete as NULL. Kafka will find those keys having null values and deletes that message. We need to set delete.retention.ms for controlling when to delete a message.

Broker

  • Producer records land on the socket receive buffer. One of the Network threads picks up the message and passes it to the shared request queue.
  • The record is picked up by the I/O thread. It validates the CRC of the data. Then the record is written to the commit log.
  • I/O thread hands over the response logic to the Purgatory map(broker that manages delayed operations.). This map waits for other brokers to acknowledge the write (ISR). This map is implemented with ConcurrentHashMap and ConcurrentLinkedQueue.
  • After the message is replicated a response will be put into the response queue.
  • The network thread pulls the response from the queue and put it into the socket send buffer.

Broker needs memory 30 Gb+ (for pagecache)
Broker needs multiple cores (mutithread app)
Broker needs 1Gbps + network (for communication)
Broker needs reliable disk (st1 EBS)

Taking the above into consideration recommended EC2 on AWS will be r5.xlarge with EBS.

When we start a Kafka Cluster, the brokers will first create a session with the zookeeper and they will try to create an ephemeral node /controller inside the zookeeper. The broker that will be able to successfully create the “/controller” node will become the controller. The rest of the brokers will create a watch on this “/controller” node.

In case the controller goes down or its session with the zookeeper is lost then this znode will be deleted and the rest of the brokers will be notified, and a new controller will be elected again.

zookeeper-shell.sh localhost:2181
ls /
ls /brokers/ids
get /controller

Responsibility of Controller

Broker Liveliness
Leader Election
Updating the ISR

Consumer

Applications that read data from Kafka topics are known as consumers. Applications integrate a Kafka client library to read from Apache Kafka. Consumers read from one or more partitions with ordering being maintained within each partition. Kafka consumers implement a “pull model”. This means that consumers send fetch requests to brokers in order to get data.

From the above picture, we see the following components in the Kafka consumers.

  • Coordinator — Manages group membership, offsets
  • Metadata — manages metadata needed by the consumer: topics and partitions in the cluster, broker node serving as the leader of the partition, etc.
  • Network Client — Handles connection/request to brokers
  • Fetcher — Fetches batches of records from brokers.
  • Deserializers — Key/Value deserializers that convert the byte array to the object.
  • Interceptors — interceptors that possibly mutate the records

Delivery Semantics for Consumers

enable.auto.commit=true
auto.commit.interval.ms

  • At most once: offsets are committed as soon as the message is received. In case of errors, the message might be lost.
  • At least once: offsets are committed after the message is processed. Might lead to multiple reads. Ensure message processing is idempotent.
  • Exactly once: Only in Kafka → Kafka flows with the transaction.

Other configurations:

fetch.min.bytes
fetch.max.wait.ms

Increasing fetch.min.bytes and time will result in increased throughput and decreasing it will result in better latency.

SSL Listeners with PEM:

# content of client.properties
bootstrap.servers=kafka.server:9092
security.protocol=SSL
ssl.truststore.type=PEM
ssl.truststore.location=truststore.pemssl.keystore.type=PEM
ssl.keystore.location=keystore.pem
ssl.key.password=hello

# encrypt your private key in PKCS8
openssl pkcs8 -topk8 -in privateKey.key -out encryptedPrivateKey.p9

# Copy the content of your encrypted private key and put into the keystore.pem
# follow the exact order

-----BEGIN ENCRYPTED PRIVATE KEY-----
xxxx
-----END ENCRYPTED PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
xxxx
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
xxxx
-----END CERTIFICATE-----

Happy learning!!

--

--

Amit Singh Rathore
Geek Culture

Staff Data Engineer @ Visa — Writes about Cloud | Big Data | ML