Implementing Kafka Producers in .NET

Luca Marinucci
Julius Baer Engineering
8 min readApr 22, 2024

The ongoing process of digitalization is placing increasing demands on software applications: they need to be efficient, scalable, and fault-tolerant, and they should exchange their data in real time. This presents new challenges in software development that are not so easy to overcome. Apache Kafka® (or Kafka for short) is an excellent tool for implementing exactly these requirements. Kafka is a distributed event streaming application that offers scalability and high availability, as well as real-time communication between applications, and is extremely efficient.

This article explains how data is written to Kafka and what needs to be considered beforehand. Firstly, the fundamental concepts essential for proficiently utilizing this tool are presented. We will then look at an important factor of Kafka’s success: its efficiency. The article is rounded off with some .NET code samples that explain how to implement various producer strategies.

History

Kafka was designed by Jay Krebs in 2010 while working at LinkedIn to meet the challenges originating from the increasing amount of data and users[0] the platform was confronted with. One year later, the source code was placed under an open source license, became a graduate from the Apache Incubator program in 2012, and has since been further developed by the Apache Software Foundation. In 2014, the company Confluent was founded by Jay Krebs and former LinkedIn employees to offer business services around Kafka. In 2015, various large tech companies, such as Uber, Spotify, and Netflix, used the technology to optimize their data flows. In 2021, the release v3 introduced the Raft[1] consensus algorithm (KRaft[2]) to remove the ZooKeeper dependence.

Basic Concepts

Kafka is basically a platform for data exchange. In addition, it is also a distributed system consisting of nodes that include storage and that replicate their data to other nodes, to be able to be seamlessly replaced in the event of a failure. These nodes are called brokers, accordingly the whole platform is called a “broker cluster”. The exchanged data are actually messages comprised of a key and a value. The messages are created by producers and received from consumers. They are divided into topics and persisted sequentially in a commit log. This means that they are attached and stored like an application log or a database transaction log. Therefore, they can be read several times — at least until the retention time is reached; after that, they are deleted.

The topics are divided into partitions, whereby the order of the messages is only guaranteed within one partition. Inside a partition a message is accessed by its offset, i.e. the position in the partition.

The log compaction feature can be activated on topic level and ensures that only the most recent message per message key is retained. This makes it faster for consumers to read through a topic from the beginning, but also reduces the storage demands. Be aware, though, that it does not guarantee only one message per key exists at any moment in time, because compaction timing is non-deterministic.

In fact, the brokers physically only know partitions, whereas topics are a logical concept. For replication, the partitions are distributed to the desired number of replicas (replication factor) based on a leader/follower design . This means that messages of a particular partition can only be created by the corresponding leader. The followers with the same data as the leader are called in-sync-replicas, or ISRs for short.

As already mentioned, the recipients of the messages are referred to as “consumers”, and they subscribe to selected topics. They identify themselves via a self-defined “client id” and use the message’s offset to fetch the desired messages. This current position of a consumer is usually stored in Kafka itself in a special “metadata topic”. The consumer reports its last read position as committed offset. The consumers get the news via an infinite query loop, each followed by a predefined sleep interval.

Several consumers can be combined into a consumer group to make consumption fault-tolerant, whereby one consumer per partition is responsible for reading its messages: if the consumer crashes, the consumption is delegated to the next group member.

For this purpose, a broker is chosen as group coordinator: it is responsible for determining the group leader, and is also responsible in the event of its failure to choose a successor. With the help of regular heartbeats, the consumers report their availability. The leader, in turn, is responsible for assigning any partition to a specific consumer. A partition can be consumed by the leader or by a follower.

Efficiency through Batching

An important factor for Kafka’s success is its efficiency: due to various optimizations, even large amounts of data can be exchanged in a very short time.

On the one hand, data is already converted by the producer into an optimized binary format, which is first sent to the brokers and later to the consumers. This means that the chunks can be passed on without further processing.

On the other hand, the messages are grouped by the producer into batches before they are sent to the brokers. This results in larger packages that are transferred over the network which reduces the network overhead.

For special requirements, such as synchronizing two geographically-distant data centers, Kafka also offers the possibility to compress message blocks. Here, too, the entire compressed block is stored to prevent expensive I/O operations within the cluster.

Producer Implementation

The producer is responsible for creating the messages and for sending them to the cluster. By default, the messages are first grouped locally before forwarding them as a batch to the cluster. This local batching can be adapted to the own needs by either defining the batch size or specifying the wait time. As a developer, you can also opt-out of this strategy by using the ProduceAsync() method and awaiting it’s result, which allows sending one message after the other:

await producer.ProduceAsync(topic, new Message{ Key = key, Value = value });

Another option is to use the Produce()method. This uses the local batching that reduces the number of I/O operations within the cluster and therefore increases the throughput. Simply make sure to call Flush()before terminating the application to allow any message that was not yet sent to the cluster to be delivered:

foreach ((var key, var value) in keyPairs)
{
// enqueue messages locally so that Kafka can create message batches to optimize throughput.
// responses are handled in the callback
var message = new Message
{
Key = key,
Value = value
};
producer.Produce(topic, message, Handler);
}
// wait for in-flight messages to be delivered before closing he application
producer.Flush();

void Handler(DeliveryReport<string, string> report) { }

When building an application on top of a messaging system, it’s always important to check your delivery guarantee requirements and to make sure they are ful-filled by the chosen messaging system. Luckily Kafka covers all requirements and even supports exactly-once semantics.

For implementing a fire-and-forget scenario with the lowest latency possible a producer will not await any acknowledgement (Acks.None). In this scenario messages could of course be lost in case of a system failure. Therefore, this approach only provides at-most-once delivery guarantee. You could also await the acknowledgement from at least the leader broker (Acks.Leader), and therefore not await the responses from the followers, and in turn accept a slightly higher latency in favor of better durability. But in either case, the message is stored once at the most, as the leader could fail while the message is being replicated to the followers.

Kafka also supports at-least-once semantics: by awaiting the acknowledgement of all ISRs (Acks.All) and, in event of a failure, the producer would re-send the message (or message batch) again. For the event that the producer transfers batches instead of single messages, an edge-case needs to be considered: while the leader is replicating the messages to the follower, it could crash. Let’s assume a message batch of three messages (m1, m2, m3) would fail while replicating the last message of the batch (m3). In this case, the follower would have stored all messages in the batch except the last one. Because the leader is now gone, the previous follower would be elected as new leader and the producer would re-send the whole batch because it didn’t receive the expected acknowledgment from the previous leader. The new leader would then append all the messages from the batch, which would result in some duplicated messages (here m1 and m2):

To prevent duplicates in the above situation, the idempotent writes feature has been introduced. This guarantees that the same message is only stored once and in the right order by de-duplicating messages in the broker based on the sequence number provided by the producer; the producer adds a sequence number to every message, and when storing the messages, the leader ensures that the message with the same sequence number is only saved exactly once. In the example below, the new leader recognizes that the messages m1 and m2 have already been stored, and therefore adds only m3 to its log after the producer has re-sent the same message batch because of the failure during the first run (previous leader crashed while replicating m3):

var config = new ProducerConfig
{
BootstrapServers = "host1:9092,host2:9092",
ClientId = Dns.GetHostName(),
Acks = Acks.All,
EnableIdempotence = true, // remove duplicates and assert message order
// …
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
// …
}

Kafka even allows the implementation of exactly-once delivery guarantees by providing transaction support. For example, if a producer wants to store data in different partitions and its consumers should only be able to access any of the data after all write operations were successful, then it could spawn a Kafka transaction.

By default, Kafka uses at-least-once semantics. By disabling automatic retries in the producer, and by committing the offset before processing the message in the consumer, you can switch to at-most-once. Ultimately, it depends on your requirements when choosing a strategy.

References

--

--