Kafka Basics and Producer Configs (Part 1)

Rishabh Jain
8 min read · Apr 8, 2023


Data drives most of the new things we develop. Almost everything we do becomes a data source: navigating through applications, clicking buttons, application logs, and user metrics (such as health metrics). All this data needs to be handled well before it can become a product (or feature). The more efficiently we handle data, the better the products we can build and the faster we can move forward. Handling large amounts of data requires building pipelines, and message brokers give us a way to handle these large data sets efficiently and effectively.

This is where one of the best-known message brokers, Apache Kafka, comes in. Kafka works on the Pub-Sub pattern, in which a publisher sends data/events to a queue (or topic) and a subscriber subscribes to those events and performs the desired operations.

What is Kafka?

Kafka is a Pub-Sub messaging system.

According to Kafka documentation:

“Kafka is a distributed commit log or, more recently, a distributing streaming platform. A filesystem or database commit log is designed to provide a durable record of all transactions so that they can be replayed to consistently build the state of a system. Similarly, data within Kafka is stored durably, in order, and can be read deterministically. In addition, the data can be distributed within the system to provide additional protections against failures, as well as significant opportunities for scaling performance.”

Messages in Kafka:

The data/event we publish to Kafka is called a message. A message can be anything: a log line, a metric, or JSON data on which a consumer (subscriber) needs to act.

Batches in Kafka:

Sending messages one by one can become an overhead when a Kafka topic (we will discuss this in detail in the next section) receives a lot of messages, because every send requires a handshake and a network round trip. This can be handled by publishing a batch of messages to the topic (and the same partition, which we will also discuss in the next section). Batches increase throughput by sending a large (configurable) number of messages at once, but we sacrifice some latency: a batch waits until the configured batch size or batch time is reached, whereas a single message is published to the topic as soon as it arrives.

Topics:

Messages are sent to topics in Kafka. A topic can be compared to a database table or a folder in a filesystem. Topics can then be divided into partitions. Messages are written to topics in an append-only fashion and are read in the order in which they were appended within a partition.

As discussed above, a message can carry an optional bit of metadata called a key. The key lets us send a particular type of message to the same partition. A simple scheme is to generate a hash of the key and select the partition by taking that hash modulo the number of partitions. With a key, the same kind of message is routed to the same partition every time, which can also help in handling idempotency problems.
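
As a rough illustration, a hash-modulo scheme might look like the sketch below. This is not Kafka's exact partitioner (the Java client uses a murmur2 hash); it only shows the idea that the same key always lands on the same partition.

```typescript
// Illustrative key-based partition selection; the hash function here is a simple
// placeholder, not the murmur2 hash Kafka's default partitioner actually uses.
function pickPartition(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) | 0; // 32-bit rolling hash
  }
  // The same key always maps to the same partition.
  return Math.abs(hash) % numPartitions;
}

console.log(pickPartition("user-42", 6)); // deterministic partition for this key
```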

Kafka Producers:

In this section, we will see how we can send messages to Kafka Topics and control producer behavior with different configuration tweaks.

How is data sent to Kafka?

Steps:

  1. Create a ProducerRecord, which contains the Topic and Value (the data to be sent). The Key (which can help route to a specific partition) and the Partition are optional.
  2. The producer serializes the key and value to byte arrays so they can be sent over the network.
  3. The data is sent to the partitioner, which decides which partition it will be routed to. If the ProducerRecord contains a partition, the message is routed to that partition; otherwise, in most cases, the decision is based on the record's key. If neither is present, the partitioner decides in a round-robin manner.
  4. The partitioner adds the message to a batch of records that will be sent to the same topic (and partition). Note: we will discuss the batch size and how long the producer waits before sending a batch in the producer config options below.
  5. When the broker (Kafka server) receives the message, it returns an acknowledgment by sending RecordMetaData, which contains the topic, partition, and offset of the record in the corresponding partition.
  6. On failure, the producer will retry sending the message automatically (we can set this in config) and return an error if all retries fail.

There are a few mandatory parameters that the Kafka producer expects:

  1. bootstrap.servers : List of Kafka brokers used for the initial connection to the Kafka cluster.
  2. key.serializer and value.serializer : Kafka brokers expect keys and values as byte arrays. The producer interface allows any object type to be sent as a key/value, so these serializers convert the key and value to byte arrays before they reach the broker. A minimal producer sketch follows below.
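
Putting the steps and mandatory parameters together, a minimal sketch with the kafkajs Node client might look like this. The broker address, client id, and topic are placeholders, and kafkajs takes strings or Buffers for keys/values rather than exposing key.serializer/value.serializer classes directly.

```typescript
import { Kafka } from "kafkajs";

// The broker list plays the role of bootstrap.servers; clientId identifies this producer.
const kafka = new Kafka({
  clientId: "demo-producer",     // hypothetical client id
  brokers: ["localhost:9092"],   // placeholder broker address
});

const producer = kafka.producer();

async function run() {
  await producer.connect();
  // Equivalent of a ProducerRecord: topic + optional key + value.
  // The JSON value is stringified here, since kafkajs expects strings or Buffers.
  const metadata = await producer.send({
    topic: "orders",             // placeholder topic
    messages: [
      { key: "order-42", value: JSON.stringify({ id: 42, amount: 100 }) },
    ],
  });
  console.log(metadata);         // record metadata: topic name, partition, base offset, ...
  await producer.disconnect();
}

run().catch(console.error);
```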

Producer Configs:

  1. Acks: This config defines how many replicas must acknowledge that the message has been appended to their partition before we consider the message successfully produced to the Kafka brokers.

In some clients (such as kafkajs), the acks parameter can also be passed per send call rather than only as a producer-level config.

acks = 0

The producer does not wait for an acknowledgment from any broker before assuming the message was sent successfully. With this option, we will not know if sending the message failed. This can be considered fire-and-forget, and it gives the highest throughput since we never wait for an acknowledgment.

acks = 1

The producer waits for an acknowledgment from the leader replica only. If an error is received, we can retry until it succeeds. Retries can be handled automatically by the producer while waiting for the message to be sent successfully, or we can handle them from the code side with explicit retry calls.

acks = all

The producer waits until all in-sync replicas acknowledge receipt of the message. This is the safest option but a bit slower, as we have to wait for acknowledgments from all the in-sync replicas.
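
With kafkajs, for example, acks can be passed on a send call; -1 means all in-sync replicas (also the default). The topic and message below are placeholders, and the producer is assumed to be the one from the earlier sketch.

```typescript
// acks: -1 = all in-sync replicas, 1 = leader only, 0 = fire-and-forget.
await producer.send({
  topic: "orders",
  acks: -1,
  messages: [{ key: "order-42", value: "retry-safe payload" }],
});
```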

2. Compression Type:

By default, messages are sent uncompressed. There are different compression types available, such as gzip, lz4, zstd, and snappy, in which data is compressed before being sent to the broker.

[Figure: comparison of various compression algorithms]
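
In kafkajs, compression is chosen per send call; gzip is supported out of the box, while snappy, lz4, and zstd require extra codec packages. The topic and message below are placeholders.

```typescript
import { CompressionTypes } from "kafkajs";

// Messages in this batch are gzip-compressed before being sent to the broker.
await producer.send({
  topic: "orders",
  compression: CompressionTypes.GZIP,
  messages: [{ value: "compressed payload" }],
});
```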

3. Retries:

This parameter tells the producer to retry sending a message to the broker if it receives an error. It controls how many times the producer will try to resend a message in case of transient failures (server errors, replica problems, and so on).

initialRetryTime: Initial value to calculate the retry time in milliseconds.

retries: Max number of retries per call.

Multiplier/Factor : Used for calculating the next retry time.

1st Retry = initialRetryTime

Nth Retry = Random(previousRetryTime * (1 - factor), previousRetryTime * (1 + factor)) * multiplier

Max Retry Time (maxRetryTime): Maximum wait time for a single retry, in milliseconds.
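
These parameter names match the retry options of the kafkajs client, which are set when creating the client. The values below are only examples, not recommendations.

```typescript
import { Kafka } from "kafkajs";

// Retry behaviour for the whole client; values are illustrative.
const kafka = new Kafka({
  clientId: "demo-producer",
  brokers: ["localhost:9092"],   // placeholder broker address
  retry: {
    initialRetryTime: 300,       // first retry after ~300 ms
    retries: 5,                  // max number of retries per call
    factor: 0.2,                 // randomization factor for the backoff window
    multiplier: 2,               // exponential growth of the backoff
    maxRetryTime: 30000,         // cap on the wait time for a single retry (ms)
  },
});
```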

According to the Kafka documentation, there are mainly 3 parameters:

  1. retries : Number of retries.
  2. retry.backoff.ms : Amount of time to wait before the next retry is attempted.
  3. delivery.timeout.ms : Maximum total time the producer will keep trying to send a message.

There are also non-transient failures (such as a message being too large) for which the producer will not retry automatically and will return a failure immediately.

Note: Retry can alter the ordering of messages.

4. Batch Size and Batch Time

As we discussed earlier, we can send messages in batches when multiple messages are going to the same topic and partition. There are 2 configuration options we can control:

batch.size: The maximum size (in bytes) of a batch that the producer will accumulate before sending it to the broker. The producer does not necessarily wait until the batch is full; there can be cases in which a batch contains only a single message.

linger.ms: The amount of time the producer will wait before sending out a batch to the broker. By default, the producer sends messages as soon as a sender thread is available, so setting this parameter above 0 ms makes the producer wait and batch messages before sending them to the broker.

Both linger.ms and batch.size can increase latency but can also increase throughput. A batch is sent to the broker as soon as either the linger.ms threshold or the batch.size threshold is reached.

With the Node client (kafkajs), these two parameters are not available, but there is a sendBatch function with which we can control batching from the code side, as sketched below.
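
A minimal sendBatch sketch, assuming the producer from the earlier example; the topics and payloads are placeholders.

```typescript
// Application-level batching: multiple topics and messages in a single request.
await producer.sendBatch({
  topicMessages: [
    {
      topic: "orders",
      messages: [
        { key: "order-1", value: JSON.stringify({ id: 1 }) },
        { key: "order-2", value: JSON.stringify({ id: 2 }) },
      ],
    },
    {
      topic: "audit-log",
      messages: [{ value: "order batch published" }],
    },
  ],
});
```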

5. Client Id:

This can be any string and will be used by the brokers to identify messages sent from the client. It is used in logging and metrics, and for quotas.

6. Max In Flight Requests Per Connection (max.in.flight.requests.per.connection):

This defines the maximum number of unacknowledged requests the client can have in flight before blocking. Setting it high can improve throughput, since many messages can be sent without waiting for acknowledgments, but it can be inefficient in some situations. Setting it to 1 ensures proper ordering: even with retries, the producer waits until a message is delivered or fails before sending the next one.
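
In kafkajs this maps to the producer's maxInFlightRequests option. A short sketch, assuming the kafka client from the earlier examples:

```typescript
// maxInFlightRequests: 1 keeps at most one unacknowledged request per connection,
// preserving ordering even when retries happen.
const orderedProducer = kafka.producer({ maxInFlightRequests: 1 });
await orderedProducer.connect();
```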

7. Timeouts:

request.timeout.ms: Maximum time the producer will wait for a reply from the server while sending messages.

timeout.ms: Maximum time the broker will wait for the in-sync replicas to acknowledge the message so that the acks configuration is met.
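
In kafkajs, the closest equivalent is the client-level requestTimeout option; the value below is illustrative.

```typescript
import { Kafka } from "kafkajs";

// Client-level request timeout (ms); requests that take longer are treated as failed.
const kafkaWithTimeout = new Kafka({
  clientId: "demo-producer",
  brokers: ["localhost:9092"],   // placeholder broker address
  requestTimeout: 30000,
});
```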

8. Maximum Request Size (max.request.size):

The maximum size of a request the producer is allowed to send. This can cap the size of a single message or the total size of a batch of messages. Brokers, on the other hand, also have a limit on the maximum message size they can receive (message.max.bytes), so it is good to keep both of these configs in sync to avoid rejections on the receiving end.

Serialization:

As we discussed, there is a serialization step when we produce data for brokers. If the data is a simple string or integer, we can send it directly, but we cannot send complex object-type messages without serializing them first.

It is always advised to use a generic serialization library. Let's use Avro as the data serializer for our code.

Avro is one of the leading serialization formats used in a lot of data pipelines. With Node, we can make use of https://www.npmjs.com/package/avsc
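
A minimal sketch with avsc; the schema, field names, and topic here are made up for illustration, and the producer is the one from the earlier sketch.

```typescript
import * as avro from "avsc";

// A hypothetical Avro schema for the message value.
const orderType = avro.Type.forSchema({
  type: "record",
  name: "Order",
  fields: [
    { name: "id", type: "int" },
    { name: "amount", type: "double" },
  ],
});

// Serialize the object to a Buffer before producing it...
const encoded = orderType.toBuffer({ id: 42, amount: 100.5 });

// ...and send the Buffer as the message value.
await producer.send({
  topic: "orders",   // placeholder topic
  messages: [{ key: "order-42", value: encoded }],
});

// A consumer can decode it back with the same schema.
const decoded = orderType.fromBuffer(encoded);
console.log(decoded); // { id: 42, amount: 100.5 }
```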

In Part 2 we will discuss Consumer and its different configs options in detail.

Reference: https://docs.confluent.io/home/overview.html
