Kafka Workshop: How to Efficiently Use Kafka in a High-Volume Production Environment

Dave Musicant
DraftKings Engineering
28 min read · Jun 23, 2023


Co-written by Martin Ivanov, Dimitar Nanov, Vadym Kutsenko and David Musicant

Part 1: Getting Started

In this workshop, you’ll do hands-on coding to learn about:

  1. When to use Kafka vs a message queue such as RabbitMQ
  2. The key components of Kafka and how to properly tune them
  3. Creating highly efficient consumers to handle the data streams
  4. Creating well configured producers that are optimized for high throughput
  5. How to use advanced configurations for cleanup policies, compression and more to fine tune your stream handling

How to use this workshop

Whenever you need to be running commands or writing code, we’ll mark that section with “Hands On”. We’ll also have “Extra Credit” callouts within the exercises where you can try to change things up and guess what outcome you’ll get.

All code for the workshop is on GitHub: https://github.com/dmusicant-dk/kafka-workshop

0. What is Kafka?

Apache Kafka is a distributed event streaming platform that offers high availability and fault tolerance by replicating data across multiple brokers. Data is organized into topics which, in turn, are subdivided into partitions. It’s optimized for write-heavy workloads and it efficiently manages large volumes of data by scaling through the addition of brokers.

0.a. Kafka Topic vs. RabbitMQ Queue

A Kafka topic is essentially a distributed log (as in a log file). Producers append messages to the tail of the topic, while consumers read them and, optionally, remember their position. Consumers can also seek to a specific position in the log of messages.

Multiple consumers can read from the same topic, and each consumer will receive all the messages in the topic. This is in contrast to a queue, where each message is delivered to a single consumer and then removed from the queue. In order for multiple consumers to receive the same message in RabbitMQ, the message must be published to a fanout exchange, which will broadcast the message to all queues that are bound to it.

Kafka topics, in contrast to RabbitMQ queues, are not suited for per-item work distribution since partitions are their smallest division. Consumers can only split work within the same consumer group on a per partition basis, with a maximum of N consumers in the same group sharing work from N partitions. This is because of how consumers track progress and message organization in Kafka. Furthermore, one of Kafka's core strengths is its message ordering guarantee, ensuring consumers read messages in the order they are written.

1. Starting a Kafka cluster

This is the first exercise in the series. We will go through the basics of starting a Kafka cluster using Docker Compose. We will also learn how to execute commands on the Kafka nodes and run simple clients.

  1. Make sure you have either Docker or Rancher Desktop installed
  2. Clone this repository locally:
git clone https://github.com/dmusicant-dk/kafka-workshop.git

The repository includes a few helper scripts which will make it easier to work with the cluster. Bash and PowerShell scripts are included. If you are using MacOS you can use the Bash scripts or optionally install PowerShell.

1.a. Getting familiar with and starting the Kafka cluster

Hands On

  1. Open the docker-compose.yml file in the root of the repository with your preferred editor. This file contains the configuration for the Kafka cluster with two services:
    - zookeeper - used to store the metadata of the Kafka cluster.
    - broker - the Kafka node which will store the messages.
    - Note: Newer versions of Kafka have Zookeeper functionality built into the node, but for simplicity we’ll use a separate Zookeeper node.
  2. Open a terminal and navigate to the root of the repository
  3. Run the following command to start the cluster: docker compose up -d
  4. You can verify this is running with the command docker compose ps

1.b. Executing commands on the Kafka nodes

We’ll be using helper scripts to make calls into the Docker containers and interact directly with Kafka.

Hands On

1. Run the following command to see the list of topics:

# Note: This will show an empty list for now

# Windows
.\kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --list"

# Linux / MacOS
./kafka-exec.sh "kafka-topics --bootstrap-server=localhost:9092 --list"

2. Create a topic by running the following command:

# Command:
#   kafka-exec.ps1       - a helper script which will execute the
#                          command on the Kafka node
# Arguments:
#   kafka-topics         - the Kafka command to create a topic
#   --bootstrap-server   - the address of the Kafka node
#   --create             - create a topic
#   --topic=test-topic   - the name of the topic
#   --partitions         - the number of partitions for the topic.
#                          Messages will be distributed across the
#                          partitions; the number of partitions can be
#                          increased to improve the throughput of the topic
#   --replication-factor - the number of Kafka nodes which will store
#                          the messages

# Windows
.\kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --topic=test-topic --partitions=1 --replication-factor=1"

# Linux / MacOS
./kafka-exec.sh "kafka-topics --bootstrap-server=localhost:9092 --create --topic=test-topic --partitions=1 --replication-factor=1"

This will create a topic called test-topic with 1 partition and a replication factor of 1.

Note: The topic will inherit the default cluster topic configuration. This includes the topic’s cleanup policy. The default cleanup policy is delete, which means that messages will expire after some time. We will cover this in more detail in Part 4: Advanced Configuration below.

3. Run the following command to see details on our new topic:

# Windows
.\kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --describe --topic=test-topic"

# Linux / MacOS
./kafka-exec.sh "kafka-topics --bootstrap-server=localhost:9092 --describe --topic=test-topic"

2. Run Some Clients

2.a. Run a producer and consumer

Note: In Parts 2 and 3, you’ll get to write some real code and design your consumers and producers. This current exercise is just to introduce these concepts.

Hands On

1. Change the current directory to the exercise folder: cd exercises/

2. Open a terminal and run the Kafka BasicProducer project:

# The producer will start producing messages to the test-topic topic. 
# There may be a few errors logged in the console while the producer
# initiates the connection to the Kafka cluster. This is expected.
# You should see the messages being produced in the console.
dotnet run --project KafkaWorkshop.Exercises.BasicProducer

3. Open another terminal and run the Kafka BasicConsumer project:

# The consumer will start consuming messages from the test-topic topic. 
# You should see the messages being produced and consumed in the console.
#
# You will also see that previously produced messages are being consumed
# as well. This is because the consumer is using the "earliest" offset.
dotnet run --project KafkaWorkshop.Exercises.BasicConsumer

4. Stop only the consumer (by pressing ctrl-c)

You will see the messages are still being produced. This is because messages are stored in the Kafka cluster.

5. Start the consumer again

Notice the consumer will continue processing messages from the point where it was stopped.

2.b. Run multiple consumers in the same consumer group

Hands On

1. Stop all running consumers (by pressing ctrl-c)

2. Open two terminals and run the following command in each of them to start two consumers:

# You will see that the messages are being consumed by only 
# one of the consumers.
#
# This is because both of the consumers have the consumer group
# "test-consumer-group" and the topic we created earlier has a
# single partition
dotnet run --project KafkaWorkshop.Exercises.BasicConsumer

3. Stop only the consumer that is consuming messages.

Notice the other consumer starts consuming messages. This is because the consumer group has rebalanced, reassigning the partition to the remaining consumer.

2.c. Run multiple consumers with a different consumer group

In order for both consumers to receive all messages from the same partition, we need to run them with different consumer groups.

Hands On

1. Stop all running consumers (by pressing ctrl-c)

2. Run the following command to start a consumer with the consumer group test-consumer-group-1:

dotnet run --project KafkaWorkshop.Exercises.BasicConsumer test-consumer-group-1

3. In a second terminal, run the following command to start a consumer with the consumer group test-consumer-group-2:

# Note the "2" at the end of test-consumer-group-2
dotnet run --project KafkaWorkshop.Exercises.BasicConsumer test-consumer-group-2

You will see that both consumers are now consuming the same messages. Essentially, the messages in the topic are broadcast to every consumer group.

3. Making the cluster more resilient

Up to now we were running a single Kafka node. This is not a very resilient setup. If the node goes down, we lose all the messages stored in the node. To make the cluster more resilient we can run multiple Kafka nodes. If one of the nodes goes down, the messages will still be available on the other nodes.

3.a. Run a cluster of 3 Kafka nodes

We’re going to use a new docker compose file: docker-compose-multi-node.yml. It contains the configuration for a cluster of 3 Kafka nodes.

Notice that instead of a single broker service we have broker-1, broker-2 and broker-3. Each of these is a Kafka node with its own unique id so the nodes can communicate with each other.

Hands On

1. Stop all running producers and consumers

2. Open a terminal and change the current directory to the root of the repository

3. Stop the currently running “cluster” by running the following command:

# This must be run in the kafka-workshop's root directory
docker compose down

4. Start the cluster again by running the following command:

# This must be run in the kafka-workshop's root directory
docker compose -f docker-compose-multi-node.yml up -d

5. Wait for the cluster to start. You can check the status by running the following command:

# This must be run in the kafka-workshop's root directory
docker compose -f docker-compose-multi-node.yml ps

6. Now create a topic with 3 partitions and 3 replicas:

# This must be run in the kafka-workshop's root directory
#
# Notice the settings:
# --partitions=3
# --replication-factor=3

# Windows
.\kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --topic=test-topic --partitions=3 --replication-factor=3"

# Linux / Mac
./kafka-exec.sh "kafka-topics --bootstrap-server=localhost:9092 --create --topic=test-topic --partitions=3 --replication-factor=3"

7. Inspect the topic by running the following command:

# Windows
.\kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --describe --topic=test-topic"

# Linux / Mac
./kafka-exec.sh "kafka-topics --bootstrap-server=localhost:9092 --describe --topic=test-topic"

You will see that the topic now has 3 partitions, spread over all 3 nodes in the cluster. In addition, each partition is replicated to all 3 nodes. This means that if one of the nodes goes down, the messages will still be available on the other nodes, and consumers and producers will still be able to consume and produce messages.

8. Now run a single producer and a single consumer in the terminals. You can use the commands with or without the consumer group specified.

Notice the messages are being produced and consumed as expected.

9. Stop one of the nodes:

docker compose -f docker-compose-multi-node.yml stop broker-1

Notice they continue producing and consuming messages (possibly after a bit of a delay).

10. Stop all running producers and consumers

11. Bring the cluster down by running the following command:

docker compose -f docker-compose-multi-node.yml down

In Part 1 we learned how to start a Kafka cluster using Docker Compose. We also learned how to execute commands on the Kafka nodes, how to run some clients and how to make the cluster more resilient by running multiple Kafka nodes.

Part 2: Consumers

If you skipped part 1, just make sure:

  1. You have either Docker or Rancher Desktop installed
  2. Clone this repository locally: git clone https://github.com/dmusicant-dk/kafka-workshop.git

In this part of the Kafka workshop, you’ll learn how to set up your consumers in a safe and scalable manner to handle messages streamed through Kafka. We’ll cover parallel consumption, deserialization and caching.

1. Concepts

1.a. Consumers internals

The consumer provided by the Confluent Kafka .NET client library is fully asynchronous (despite having blocking APIs). It utilizes a background thread which consumes batches of messages and stores them in internal queues (by topic/partition). These are then served to your code, in order, when Consume is called.
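A minimal consume loop over those internal queues looks like the following sketch. This is illustrative, not workshop code: the topic and group names are placeholders, and it assumes the Confluent.Kafka NuGet package and a broker on localhost:9092.

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "workshop-consumer",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("test-topic");

while (true)
{
    // Consume() pops the next in-order message from the client's internal
    // per-partition queues; the actual network fetching happens on the
    // library's background thread.
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result is null) continue; // timed out; nothing buffered yet

    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
}
```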

1.b. Configuration options

All configuration properties are exposed through the ConsumerConfig class as fields, or can be set as string key-value pairs like so:

var consumerConfig = new ConsumerConfig
{
    // Prefer these strongly typed properties over the Set(name, value) approach.
    BootstrapServers = "localhost:9092",
    GroupId = "some-consumer-group"
};

// Not recommended
consumerConfig.Set("fetch.wait.max.ms", "100");

var consumer = new ConsumerBuilder<int, string>(consumerConfig).Build();

Below are some of the key configurable options for consumers. In most cases, stick to the default.


The full list of options can be found here.

Property names are directly mapped between C# and C implementations and have the same behavior and meaning.

1.b.1. Understanding Partition Assignment

What’s a better approach?

There are multiple approaches to making this a better design, e.g. using different consumer groups, or different topics each with their own consumer group. But a very simple way to prevent a single failed consumer from causing all others to pause their processing is to choose the CooperativeStickyAssignor. This ensures that the other consumers aren't affected.
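Opting in to cooperative rebalancing is a one-line configuration change, as this sketch shows (the group and topic names are illustrative):

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "jurisdiction-consumers",
    // Incremental (cooperative) rebalancing: only partitions that actually
    // move are revoked, so unaffected consumers keep processing throughout
    // the rebalance.
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("test-topic");
```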

Note: If you instead use the consumer's Assign method to choose the partitions for your jurisdiction consumer, there will be no rebalancing at all. But in general, the regular Subscribe method is the preferred approach.

1.c. Parallel Consumption

In 95% of use cases, parallel consumption is over-engineering.

This is a technique where your code spawns multiple consumers in a single application to try to get higher throughput. It can have unintended consequences, as you’ve essentially added parallelism and asynchronous handling on top of an already asynchronous process.

The better approach:

  1. Do as little as possible in your consumer loop
  2. Defer work to another thread where needed
  3. Use a queue, if needed, with a guarantee of order
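One way to sketch this pattern is a bounded System.Threading.Channels queue handing work to a single ordered worker. The names, topic, and capacity below are illustrative assumptions, not from the workshop code:

```csharp
using System.Threading.Channels;
using Confluent.Kafka;

// A bounded channel preserves order and applies back-pressure to the
// consume loop when the worker falls behind.
var channel = Channel.CreateBounded<ConsumeResult<string, string>>(1000);

// A single worker drains the channel, keeping the consumed order intact.
var worker = Task.Run(async () =>
{
    await foreach (var result in channel.Reader.ReadAllAsync())
    {
        // ... do the actual (slow) work here ...
        Console.WriteLine($"processed {result.TopicPartitionOffset}");
    }
});

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "deferred-work-group"
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("test-topic");

while (true)
{
    // The consume loop itself does as little as possible.
    var result = consumer.Consume(CancellationToken.None);
    await channel.Writer.WriteAsync(result);
}
```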

2. Exercises

What do I do if I’m stuck? It is highly recommended to try and solve these problems yourself and use online documentation to make it through these exercises. But if you’re absolutely stuck, we have some example solutions for each exercise in “proposed solutions”.

2.a. Deserialization

Kafka stores and transmits all data, including messages, headers and keys, as byte arrays. This means that every message you produce must be serialized into a byte array, and every message you consume must be deserialized from one.

Kafka only provides built-in serializers and deserializers for the following data types:

  • String
  • Long
  • Double
  • Integer
  • Bytes

For all other types, you will need to code a custom serializer/deserializer to produce/consume it. You do this by implementing the IDeserializer interface.

public interface IDeserializer<T>
{
    /**
     * T                            = Your object type
     * ReadOnlySpan<byte> data      = The data to deserialize into the type "T"
     * bool isNull                  = Whether the data to deserialize is null.
     *                                This is powerful and allows you to
     *                                implement safe deserialization on any
     *                                input/output (including structs/value types)
     * SerializationContext context = Can be ignored in most scenarios
     */
    T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context);
}
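As an illustration (not the exercise's official solution), a JSON value deserializer for a hypothetical User record might look like this, with the isNull flag handled explicitly:

```csharp
using System.Text.Json;
using Confluent.Kafka;

public record User(long Id, string FirstName, string LastName, DateTimeOffset DateOfBirth);

public class UserDeserializer : IDeserializer<User?>
{
    public User? Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // A null payload (e.g. a tombstone on a compacted topic) must be
        // handled explicitly before touching the data span.
        if (isNull) return null;
        return JsonSerializer.Deserialize<User>(data);
    }
}

// Wired up when building the consumer:
// var consumer = new ConsumerBuilder<long, User?>(config)
//     .SetValueDeserializer(new UserDeserializer())
//     .Build();
```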

Hands On

  1. Do these prerequisites:
# 1. Start up the Kafka broker
docker compose -f docker-compose-multi-node.yml up -d

# 2. Run the following command (from the repo's
# root directory) to create the topic:
./kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --partitions=1 --replication-factor=3 --topic=kafka-exercise-consumer-1"

# 3. Run the test data generator to populate the topic:
./generate-test-data.ps1 deserialization

2. Open the exercises/KafkaWorkshop.Exercises.Consumer.Deserialization/ project for the exercise.
- It contains boilerplate code. Write all the exercise’s code there.

3. Implement a simple consumer, that consumes from topic kafka-exercise-consumer-1.
- Previous examples can be used to guide you

4. Deserialize the incoming data in the following way:
- Key = long ( 64bit integer ) using built-in Kafka deserializer
- Value = JSON string matching this contract:

//WARNING: Make sure to handle nulls!
public record User(long Id, string FirstName, string LastName, DateTimeOffset DateOfBirth);

5. Make it print all consumed messages to the console (including any null ones — note the keys)

6. Compile and run!

2.b. Real World Basic Structure — Using a Background Task

In the majority of use cases, your consumers will run in a background task. This task would live for the entire life-span of the application. You’ll now write code to run your consumer in a more real-world setup as a background task.
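For reference, the general shape of such a background consumer might look like the following sketch. The class and topic names match the exercise, but the body is only one possible structure, with the config hard-coded rather than injected:

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

public class UsersConsumerService : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        // Consume() blocks, so run the loop on a dedicated thread rather
        // than blocking the host's startup path.
        Task.Run(() =>
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "users-consumer",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var consumer = new ConsumerBuilder<long, string>(config).Build();
            consumer.Subscribe("kafka-exercise-consumer-1");

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var result = consumer.Consume(stoppingToken);
                    Console.WriteLine($"{result.Message.Key}: {result.Message.Value}");
                }
            }
            catch (OperationCanceledException)
            {
                // Normal path on shutdown.
            }
            finally
            {
                consumer.Close(); // leave the consumer group cleanly
            }
        }, stoppingToken);
}
```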

Hands On

  1. Do these new prerequisites for exercise 2:
# 1. (ONLY if you shut it down after the last exercise) 
# Start up the Kafka broker
docker compose -f docker-compose-multi-node.yml up -d

# 2. We are going to reuse the topic from the previous exercise.
# There is no need to create a new one unless you killed the docker
# container
# ./kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --partitions=1 --replication-factor=3 --topic=kafka-exercise-consumer-1"

# 3. (ONLY if it's not still running):
# Run the test data generator to populate the topic:
./generate-test-data.ps1 deserialization

2. Open the exercises/KafkaWorkshop.Exercises.Consumer.BasicStructure/ project for the exercise.
- It contains boilerplate code. Write all the exercise’s code there.

3. Reuse and refactor your consumer code from the last exercise into a .NET BackgroundService (called UsersConsumerService).
- Note: This background service is already registered in the application’s DI container. You will have to put your code in the UsersConsumerService class.
- Important: Don’t forget about the deserializer for the
user type

4. Optional Extra Credit: Easy configuration
- Make your consumer properties, the topic name and consumer group configurable via appsettings.json with an IOptions<T> interface

5. Compile and run!

2.c. Building In-Memory Distributed/Persistent Cache

Creating a well managed in-memory cache can be messy and difficult. With Kafka — and the right configurations — we can get this functionality with little effort.

We’ll use a compacted topic with in-memory state. The data can be loaded on startup and ordered in a key-value manner. This is a powerful technique which allows us to have fast in-memory state that’s distributed and persisted.

Why this works so well

A topic can be thought of as analogous to a log of data. Using the above settings, we’re able to load all of that data on startup. The compaction ensures our consumer always has the latest version of each message by “key”. This internal storage of the data is very fast to interact with and consume since it is persisted in memory.
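A rough sketch of the load-and-cache loop follows. The topic name comes from the exercise; the string value type, the dictionary, and the variable names are illustrative assumptions:

```csharp
using System.Collections.Concurrent;
using Confluent.Kafka;

// In-memory key-value state rebuilt from the compacted topic.
var cache = new ConcurrentDictionary<long, string>();

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    // Unique group id per instance, so every instance loads the full snapshot.
    GroupId = $"cache-loader-{Guid.NewGuid()}",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    // Lets Consume() report when we've caught up with the partition.
    EnablePartitionEof = true
};

using var consumer = new ConsumerBuilder<long, string>(config).Build();
consumer.Subscribe("kafka-exercise-user-data-topic");

var loaded = false;
while (true)
{
    var result = consumer.Consume(CancellationToken.None);

    if (result.IsPartitionEOF)
    {
        if (!loaded) Console.WriteLine("Initial snapshot loaded");
        loaded = true;
        continue; // keep consuming live updates
    }

    if (result.Message.Value is null)
        cache.TryRemove(result.Message.Key, out _); // tombstone => delete
    else
        cache[result.Message.Key] = result.Message.Value;
}
```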

This Exercise

  • We’ll implement a full fledged HTTP API to serve user data
  • The user data is stored in a Kafka topic with changes flowing in real time
  • Our application will read the whole topic from the beginning and construct an in-memory cache

Hands On

  1. Do these new prerequisites for exercise 3:
# 1. (ONLY if you shut it down after the last exercise) 
# Start up the Kafka broker
docker compose -f docker-compose-multi-node.yml up -d

# 2. Run the following command to create a compacted topic:
# Notice we used "cleanup.policy = compact" (This will be
# explained more in the last session in this workshop:
# Advanced Configuration)
./kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --partitions=1 --replication-factor=3 --topic=kafka-exercise-user-data-topic --config cleanup.policy=compact --config segment.ms=30000 --config min.cleanable.dirty.ratio=0.01"

# 3. Run the test data generator to populate the topic with NEW data:
./generate-test-data.ps1 users-api-data

2. Open the exercises/KafkaWorkshop.Exercises.UsersApi/ project for the exercise.
- It contains boilerplate code. Write all the exercise’s code there.

3. Implement the in-memory database in the file UsersDb.cs as follows:
- This class is registered in the application’s DI container as a singleton

4. Implement the logic to indicate our UsersDb is "ready" using the UsersDbLoaderMonitor class in UsersDb.cs as follows:
- Add a boolean property IsLoaded signifying the User DB is filled
- Expose a method that sets it to true

5. Using your code from the previous exercise, implement a consumer inside the background service defined in UsersConsumerService.cs as follows:
- Change the topic to “kafka-exercise-user-data-topic”
- Give it a unique/different consumer group id
- Don’t forget to add your config object to the DI
- Inject the UsersDb into your consumer UsersConsumerService
- Inject the UsersDbLoaderMonitor into your UsersConsumerService
- Store the data in the UsersDb when a message is consumed
- If the value consumed is null, delete the record from UsersDb
- Change ExecuteAsync to set the UsersDbLoaderMonitor to true when EOF is reached, otherwise, store/remove the User
You can get this info off the result returned from Consume()

6. Assign each consumer its own unique consumer group using a GUID
- This ensures each consumer receives the full “snapshot”

7. Set your ConsumerConfig to have:

8. Implement three API endpoints in Program.cs:

9. Build and run the application while running the test data generator.
- You will see new user records appear in the API in real time.

Part 3: Producers

Make sure you completed Part 2: Consumers first!

In this part of the workshop, you will learn how to set up your producers, produce messages and optimize for throughput and reliability.

1. Concepts

1.a. Producer Internals

The Confluent client library is fully asynchronous and handles a lot of the “hard work” for you with the following characteristics:

  • An internal queue of your produced messages (client side)
  • It grabs a “batch” of messages and compresses them as a group (Compression on a batch is more efficient than on single messages)
  • The compressed batch is sent to the Kafka broker (This enables optimizing network requests)
  • Order of messages per topic and partition is guaranteed to be the same as the order in which produced
  • An ACK callback can be configured to be received from the broker

1.b. Configuration options

All configuration properties are exposed on the ProducerConfig class as fields and can be provided as string key-value pairs like so:

//Confluent.Kafka.ProducerConfig
var producerConfig = new ProducerConfig
{
    // Prefer these strongly typed properties over the Set(name, value) approach.
    BootstrapServers = "localhost:9092",
    QueueBufferingMaxMessages = 10000
};

// Not preferred; use the strongly typed properties above instead
producerConfig.Set("acks", "all");

var producer = new ProducerBuilder<int, string>(producerConfig).Build();

Below are some of the key configurable options for producers. In most cases, stick to the default:

The full list of options can be found here.

Property names are directly mapped between C# and C implementations and have the same behavior and meaning.

2. Exercises

What do I do if I’m stuck? It is highly recommended to try and solve these problems yourself and use online documentation to make it through these exercises. But if you’re absolutely stuck, we have some example solutions for each exercise in “proposed solutions”.

2.a. Blocking vs Non-Blocking Producer

The .NET producer has two methods for producing messages:

  1. A Produce method that takes a DeliveryHandler delegate
    - This is a callback that will be fired once the configured acks are received
  2. A ProduceAsync method that, if awaited, will return once acks are received

To gain the maximum throughput from the client and Kafka’s capabilities, the Produce method is preferred, as it is non-blocking and allows the client to manage traffic.

Important: invocation order of the callback is guaranteed to be the same as the producing order
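Side by side, the two methods look like this sketch (the topic name comes from the exercise below; a broker on localhost:9092 is assumed):

```csharp
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();

// Non-blocking: hand the message to the client's internal queue and move
// on; the delivery report callback fires later, in produce order.
producer.Produce("kafka-exercise-producer-1",
    new Message<Null, string> { Value = "fire-and-forget" },
    report => Console.WriteLine($"delivered to {report.TopicPartitionOffset}"));

// Blocking (when awaited): returns only once the configured acks arrive,
// so each message is effectively sent in its own batch.
var result = await producer.ProduceAsync("kafka-exercise-producer-1",
    new Message<Null, string> { Value = "awaited" });
Console.WriteLine($"delivered to {result.TopicPartitionOffset}");

// Flush any queued messages (and their reports) before exiting.
producer.Flush(TimeSpan.FromSeconds(10));
```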

Hands On

  1. Do these new prerequisites for exercise 1:
# 1. Start up the Kafka broker (if not already running from before)
docker compose -f docker-compose-multi-node.yml up -d

# 2. Create the topic by running from the root of the repository:
./kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --partitions=1 --replication-factor=3 --topic=kafka-exercise-producer-1"

2. Open the exercises/KafkaWorkshop.Exercises.Producer.BlockingVsNonBlocking project
- It contains boilerplate code. Write all the exercise’s code there.

3. Implement a simple producer, that produces to topic kafka-exercise-producer-1 as follows:
- Set its serializers for the key and value to both handle UTF8 strings
- You can look at the producer from the Getting Started session for help

4. Configure your producer with the following settings:

5. Create a loop that goes for a total of 5 iterations

6. Create the messages inside this loop with each message’s data:

7. Set the DeliveryReport action to write out to the console the topic, partition and offset on which it produced a message
- Hint: Get this from the “report” parameter in the callback, not from your global variables

8. Add something like the following (to prevent the program from exiting before the delivery reports can run):
- Inside the report callback, increment some “global” counter/int
- At the end of ProduceNonBlocking, while the counter < 5, wait on a task delay (exit when the counter is greater than or equal to 5)

9. Run the code!

Try the other “Produce” Method

  1. Change the Produce invocation to ProduceAsync and await it
  2. Change from writing out the message in the report, to be written right after calling the method
    - Hint: Get the report returned from the method
  3. Run this code!

Notice that you no longer need to wait on a counter, since awaiting each produce keeps the program running until delivery completes. The messages will be sent one by one (in batches of one message). This is not desired behavior in tight loops, but it is useful when you are in a single-message context (e.g. a WebAPI action logging individual requests). The Kafka client will still batch messages coming from different threads.

2.b. Serialization

All data in Kafka (keys, messages, headers) is stored in a byte-array form. Serialization and deserialization is handled by the app based on an agreed upon format and contracts between the consumer and producer. The client library comes with a few built in de/serializers for primitive types — string/long/int — and provides an easy interface to implement custom ones.

To make a custom Serializer, you’ll define a class that implements the ISerializer<TOutput> interface, and you’ll implement the method byte[] Serialize(T data, SerializationContext context).
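As one possible shape (not the exercise's official solution), a camelCase JSON serializer for a hypothetical User record could look like this:

```csharp
using System.Text.Json;
using Confluent.Kafka;

public record User(long Id, string FirstName, string LastName, DateTimeOffset DateOfBirth);

public class UserSerializer : ISerializer<User?>
{
    private static readonly JsonSerializerOptions Options =
        new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };

    public byte[] Serialize(User? data, SerializationContext context)
    {
        // A null value must serialize to a null payload (a tombstone on
        // compacted topics), not to the UTF8 bytes of the string "null".
        if (data is null) return null!;
        return JsonSerializer.SerializeToUtf8Bytes(data, Options);
    }
}
```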

Hands On

  1. Do these new prerequisites for exercise 2:
# 1. (If you shut it down after the last exercise) Start up the Kafka broker
docker compose -f docker-compose-multi-node.yml up -d

# 2. Create the topic by running from the root of the repository:
./kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --partitions=1 --replication-factor=3 --topic=kafka-exercise-producer-2"

2. Open the exercises/KafkaWorkshop.Exercises.Producer.Serialization project
- It contains boilerplate code. Write all the exercise’s code there.

3. Create a serializer for the value type User called UserSerializer
- Implement ISerializer<User?>
- We’ll serialize it from Json with the property JsonNamingPolicy.CamelCase
- Make sure to handle the case where the user is null
- You'll be serializing it to UTF8 bytes

4. Implement a simple producer that produces to topic kafka-exercise-producer-2, configured with:
- (You can use your code from the previous example to start)

5. Serialize the incoming data in the following way:

6. Put your message creation and producing inside the for loop

7. Run the code!

Note: You won’t need to wait on a counter here as we’re waiting on a key press at the console

2.c. Building an In-Memory distributed/persistent cache — writing data

In this final exercise we will extend the UsersApi from Part 2: Consumers with endpoints to add users to our distributed cache.

Hands On

  1. Do these new prerequisites for exercise 3:
# 1. Start up the Kafka broker (if not already running from before)
docker compose -f docker-compose-multi-node.yml up -d

# 2. (If you tore down your Kafka cluster after the last exercise)
# Re-create the topic by running from the root of the repository:
./kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --partitions=1 --replication-factor=3 --topic=kafka-exercise-user-data-topic --config cleanup.policy=compact --config segment.ms=30000 --config min.cleanable.dirty.ratio=0.01"

2. Open the exercises/KafkaWorkshop.Exercises.UsersApi/ project
- It contains boilerplate code. Write all the exercise’s code there.

3. Copy over the UserSerializer from the last exercise (you might need to change the namespace)

4. Modify UsersProducer.cs as follows:
- Make a constructor that creates a producer using your DI’d Kafka config
- Use the “kafka-exercise-user-data-topic” topic like in the Consumer exercise.
- Implement the below 2 methods:

5. Implement 3 new endpoints (based on Http Method) in Program.cs.

6. Run the code

7. Test the endpoints with either Postman or the curl commands below:

# Note: You might need to change the port number depending on
# how your service started up

# Check users
curl http://localhost:5001/users

# Create a user
curl -X POST http://localhost:5001/users -H "Content-Type: application/json" -d "{ \"Id\": 1, \"FirstName\": \"Joan\", \"LastName\": \"Sampson\", \"DateOfBirth\": \"1998-06-19T15:00:00+00:00\"}"

# Update a user
curl -X PUT http://localhost:5001/users/1 -H "Content-Type: application/json" -d "{ \"Id\": 1, \"FirstName\": \"Joannie\", \"LastName\": \"Sampsonite\", \"DateOfBirth\": \"1997-06-19T15:00:00+00:00\"}"

# Delete a user
curl -X DELETE http://localhost:5001/users/1

Note: The produced User messages will eventually be received by all consumers (in this case only our UsersApi). This essentially turns our UsersApi into an eventually consistent distributed cache. If we run multiple instances of the UsersApi, they will all receive the same updates and construct the same state.

Part 4: Advanced Configuration

Make sure you completed Part 1: Getting Started, Part 2: Consumers and Part 3: Producers first!

1. Cleanup policies: compact and delete

Data retention and message retention in Kafka are configured via retention cleanup policies. There are two types:

Changing The Cleanup Policy

  • Set it at the Topic level
  • Can be changed dynamically
  • If not specified on topic creation, the topic will use the Kafka cluster default (in most cases the delete policy)
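For example, an existing topic's cleanup policy can be switched at runtime with the kafka-configs tool, shown here via the workshop's helper script (use the .ps1 variant on Windows; the topic name is illustrative):

```shell
# Switch an existing topic to compaction at runtime (no recreate needed)
./kafka-exec.sh "kafka-configs --bootstrap-server=localhost:9092 --alter --entity-type topics --entity-name test-topic --add-config cleanup.policy=compact"

# Verify the override
./kafka-exec.sh "kafka-configs --bootstrap-server=localhost:9092 --describe --entity-type topics --entity-name test-topic"
```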

1.a. How the compact cleanup policy works

Note that the Key field is required. Also, compaction does not happen continually; instead, it is kicked off based on a combination of topic configurations:

Hands On

1. Open 3 separate terminal windows and make sure you’re in the root of the kafka-workshop repository in each

2. Start a Kafka cluster in one of the terminals:
- docker compose -f docker-compose-multi-node.yml up -d

3. Create a compacted topic by running the following command from the same terminal:

# Command: kafka-exec.ps1                - a helper script which will execute 
# the command on the Kafka node
# Arguments:
# kafka-topics - the Kafka command to create a topic
# --bootstrap-server - the address of the Kafka node
# --create - create a topic
# --partitions - the number of partitions for the
# topic
# --replication-factor - the number of Kafka nodes which
# will store the messages
# --topic - the name of the topic
# --config cleanup.policy - "compact" tells kafka to only keep
# the latest version per message key
# --config segment.ms - tells kafka to roll the log into
# a new segment file every "X"
# milliseconds even if not full
# --config min.cleanable.dirty.ratio - the minimum fraction of the log
# that must be "dirty" (not yet
# compacted) before the cleaner
# runs again. Here, recompact once
# more than 1% of the log is dirty
./kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --partitions=1 --replication-factor=3 --topic=compacted-topic --config cleanup.policy=compact --config segment.ms=30000 --config min.cleanable.dirty.ratio=0.01"

4. Run a test data generator:
- ./generate-test-data.ps1 retention-policy-compact-publisher
- Note: The generator will produce messages to a predefined topic until it's stopped. Each message has a random key between 1 and 10 and a value equal to the timestamp at the moment of producing; both key and value are strings. It first produces 50 messages in bulk, then one message every 500 ms.

5. In one of the other open terminals run the following command in order to tail one of the Kafka nodes’ logs:
- docker compose -f docker-compose-multi-node.yml logs --follow broker-1

6. In the third terminal run the following command in order to consume the messages from the compacted topic:

# Command: kafka-exec.ps1                - a helper script which will execute 
# the command on the Kafka node
# Arguments:
# kafka-console-consumer - reads data from kafka, i.e. a consumer
# --bootstrap-server - the address of the Kafka node
# --topic - the name of the topic
# --from-beginning - tells it to start reading from
# offset "0" instead of the current
# --property print.[key|value|offset] - tells it to print this field to the
# console for each message read
./kafka-exec.ps1 "kafka-console-consumer --bootstrap-server=localhost:9092 --topic=compacted-topic --from-beginning --property print.key=true --property print.value=true --property print.offset=true"

7. Observe the Kafka server logs from the second terminal and wait 1–2 minutes. At some point you will see a log like the following:

broker-1  | [2023-04-12 16:00:07,211] INFO Cleaner 0: Beginning cleaning of log compacted-topic-0 (kafka.log.LogCleaner)
broker-1 | [2023-04-12 16:00:07,212] INFO Cleaner 0: Building offset map for compacted-topic-0... (kafka.log.LogCleaner)
broker-1 | [2023-04-12 16:00:07,223] INFO Cleaner 0: Building offset map for log compacted-topic-0 for 1 segments in offset range [455, 514). (kafka.log.LogCleaner)
broker-1 | [2023-04-12 16:00:07,224] INFO Cleaner 0: Offset map for log compacted-topic-0 complete. (kafka.log.LogCleaner)
broker-1 | [2023-04-12 16:00:07,224] INFO Cleaner 0: Cleaning log compacted-topic-0 (cleaning prior to Wed Apr 12 15:59:51 UTC 2023, discarding tombstones prior to upper bound deletion horizon Tue Apr 11 15:59:21 UTC 2023)... (kafka.log.LogCleaner)
broker-1 | [2023-04-12 16:00:07,225] INFO Cleaner 0: Cleaning LogSegment(baseOffset=0, size=801, lastModifiedTime=1681315161690, largestRecordTimestamp=Some(1681315162680)) in log compacted-topic-0 into 0 with an upper bound deletion horizon 1681228761690 computed from the segment last modified time of 1681315161690,retaining deletes. (kafka.log.LogCleaner)
broker-1 | [2023-04-12 16:00:07,225] INFO Cleaner 0: Cleaning LogSegment(baseOffset=455, size=5251, lastModifiedTime=1681315191950, largestRecordTimestamp=Some(1681315192943)) in log compacted-topic-0 into 0 with an upper bound deletion horizon 1681228761690 computed from the segment last modified time of 1681315191950,retaining deletes. (kafka.log.LogCleaner)
broker-1 | [2023-04-12 16:00:07,232] INFO Cleaner 0: Swapping in cleaned segment LogSegment(baseOffset=0, size=801, lastModifiedTime=1681315191950, largestRecordTimestamp=Some(1681315192943)) for segment(s) List(LogSegment(baseOffset=0, size=801, lastModifiedTime=1681315161690, largestRecordTimestamp=Some(1681315162680)), LogSegment(baseOffset=455, size=5251, lastModifiedTime=1681315191950, largestRecordTimestamp=Some(1681315192943))) in log Log(dir=/var/lib/kafka/data/compacted-topic-0, topicId=WNZOUdX2QsOb2LJl5h3TxA, topic=compacted-topic, partition=0, highWatermark=542, lastStableOffset=542, logStartOffset=0, logEndOffset=543) (kafka.log.LogCleaner)
broker-1 | [2023-04-12 16:00:07,237] INFO [kafka-log-cleaner-thread-0]:
broker-1 | Log cleaner thread 0 cleaned log compacted-topic-0 (dirty section = [455, 514])
broker-1 | 0.0 MB of log processed in 0.0 seconds (0.2 MB/sec).
broker-1 | Indexed 0.0 MB in 0.0 seconds (0.4 Mb/sec, 48.0% of total time)
broker-1 | Buffer utilization: 0.0%
broker-1 | Cleaned 0.0 MB in 0.0 seconds (0.4 Mb/sec, 52.0% of total time)
broker-1 | Start size: 0.0 MB (68 messages)
broker-1 | End size: 0.0 MB (9 messages)
broker-1 | 86.8% size reduction (86.8% fewer messages)
broker-1 | (kafka.log.LogCleaner)

The logs indicate that the topic partition we produced the messages into was compacted. Notice that lines like the following show it went from 68 messages to 9:
- broker-1 | Start size: 0.0 MB (68 messages)
- broker-1 | End size: 0.0 MB (9 messages)
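As a quick sanity check, the reported reduction follows directly from those message counts:

```python
# The cleaner's "86.8% size reduction (86.8% fewer messages)" line is just
# the drop from 68 messages to 9, expressed as a percentage.
start, end = 68, 9
reduction = (start - end) / start * 100
print(round(reduction, 1))  # -> 86.8
```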

8. Stop the test data generator from the first terminal and the kafka-console-consumer from the third terminal.

9. Wait another 1–2 minutes for Kafka to compact the partition again.

10. Repeat step 6 to run the consumer only.
- Notice that only a small number of messages are consumed, as older messages with repeated keys were compacted.
- Notice that you might still see duplicates: compaction does not guarantee deduplication.

11. Stop the kafka-console-consumer and tear down the cluster from one of the open terminals:
- docker compose -f docker-compose-multi-node.yml down

1.b. How the delete cleanup policy works
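Unlike compaction, the delete policy ignores keys entirely: a whole non-active segment is dropped once its newest record is older than retention.ms. A rough sketch of that rule (simplified; on the broker this is driven by each segment's largest record timestamp):

```python
# Simplified model of the "delete" cleanup policy: a non-active segment is
# deleted once even its newest record has exceeded the retention window.

def expired_segments(segments, now_ms, retention_ms):
    """segments: list of (base_offset, largest_record_timestamp_ms) tuples
    for the non-active segments of one partition. Returns the base offsets
    of segments eligible for deletion."""
    return [base for base, largest_ts in segments
            if now_ms - largest_ts > retention_ms]

segments = [(0, 1_000), (402, 50_000), (566, 95_000)]
# With retention.ms=60000 at t=105000, only the oldest segment has breached.
print(expired_segments(segments, now_ms=105_000, retention_ms=60_000))
# -> [0]
```

This is why segment.ms matters in the exercise below: messages can only expire when the segment holding them has been rolled and is no longer active.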

Hands On

1. Open 3 separate terminal windows and make sure you’re in the root of the kafka-workshop repository in each

2. Start a Kafka cluster in one of the terminals:
docker compose -f docker-compose-multi-node.yml up -d

3. Create a topic with a deletion policy by running the following command from the same terminal:

# Command: kafka-exec.ps1                - a helper script which will execute 
# the command on the Kafka node
# Arguments:
# kafka-topics - the Kafka command to create a topic
# --bootstrap-server - the address of the Kafka node
# --create - create a topic
# --partitions - the number of partitions for the
# topic
# --replication-factor - the number of Kafka nodes which
# will store the messages
# --topic - the name of the topic
# --config cleanup.policy - "delete" tells kafka to only keep
# the messages that fit in the
# retention configuration chosen (a
# time based one in this example)
# --config retention.ms - tells kafka to keep messages for
# at most "X" milliseconds
# (60000 ms = 60 s here)
# --config segment.ms - tells kafka to roll the log into
# a new segment file every "X"
# milliseconds even if not full
./kafka-exec.ps1 "kafka-topics --bootstrap-server=localhost:9092 --create --partitions=1 --replication-factor=3 --topic=expiring-topic --config cleanup.policy=delete --config retention.ms=60000 --config segment.ms=30000"

4. Run a test data generator:
- ./generate-test-data.ps1 retention-policy-delete-publisher

5. In one of the other open terminals run the following command in order to tail one of the Kafka nodes’ logs:
- docker compose -f docker-compose-multi-node.yml logs --follow broker-1

6. In the third terminal run the following command in order to consume the messages from the expiring topic:

# Command: kafka-exec.ps1                - a helper script which will execute 
# the command on the Kafka node
# Arguments:
# kafka-console-consumer - reads data from kafka, i.e. a consumer
# --bootstrap-server - the address of the Kafka node
# --topic - the name of the topic
# --from-beginning - tells it to start reading from
# offset "0" instead of the current
# --property print.[key|value|offset] - tells it to print this field to the
# console for each message read
./kafka-exec.ps1 "kafka-console-consumer --bootstrap-server=localhost:9092 --topic=expiring-topic --from-beginning --property print.key=true --property print.value=true --property print.offset=true"

7. Observe the Kafka server logs from the second terminal and wait 1–2 minutes. At some point you will see logs like the following, indicating that a partition segment expired, i.e. it was deleted:

broker-1  | [2023-04-12 16:11:25,615] INFO [LocalLog partition=expiring-topic-0, dir=/var/lib/kafka/data] Rolled new log segment at offset 566 in 1 ms. (kafka.log.LocalLog)
...broker-1 | [2023-04-12 16:11:56,100] INFO [UnifiedLog partition=expiring-topic-0, dir=/var/lib/kafka/data] Deleting segment LogSegment(baseOffset=402, size=9345, lastModifiedTime=1681315854830, largestRecordTimestamp=Some(1681315855820)) due to retention time 60000ms breach based on the largest record timestamp in the segment (kafka.log.UnifiedLog)

8. Stop the test data generator from the first terminal and the kafka-console-consumer from the third terminal.

9. Wait another 1–2 minutes for Kafka to delete a few segments

10. Repeat step 6 to run the consumer only.
- Nothing should be printed on the screen as all messages have expired.

11. Stop the kafka-console-consumer and tear down the cluster from one of the open terminals:
- docker compose -f docker-compose-multi-node.yml down

1.c. How the combined compact,delete cleanup policy works

Compact + Delete
  • Note that the Key field is required.
  • The “Cleaner point”, i.e. when the policy kicks in to start cleanup, can be controlled by the properties retention.ms and/or retention.bytes
  • Kafka stores messages that have the same key in the same partition (the partition is chosen by hashing the key). As a result, log compaction can happen within a single Kafka broker (per partition replica)
  • Also, compaction does not happen continuously; instead it is kicked off based on a combination of topic configurations
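The key-to-partition mapping described above can be sketched as follows. Kafka's default partitioner actually hashes keys with murmur2; this sketch substitutes a stdlib CRC32 purely to show the property that matters, namely that equal keys always land on the same partition:

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Deterministic key -> partition mapping. Kafka's default partitioner
    uses murmur2; crc32 stands in here just for illustration."""
    return zlib.crc32(key) % num_partitions

# Equal keys always map to the same partition, so every version of a key
# lives in one partition replica and can be compacted by a single broker.
p = partition_for(b"user-1", 6)
print(all(partition_for(b"user-1", 6) == p for _ in range(100)))  # -> True
```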

Hands On

This one’s extra credit!

2. Deep Dive: Understanding Segments, Partitions and Compression

2.a. Segments and Partitions

  • Compaction and retention work only for non-active segments
  • A Producer can choose to publish to a specific partition using ProducerRecord's partition number parameter
    - This cannot be done via kafka-console-producer
    - Kafka by default will choose the partition for the producer
  • WARNING: You cannot scale simply by adding more consumers
    - Only one consumer from a consumer group (same group.id) will be able to read from the same partition at a time
    - To increase parallelism, you need more consumers and partitions
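The warning above can be made concrete: within one consumer group, each partition is assigned to at most one consumer, so effective parallelism is capped at min(consumers, partitions). A toy round-robin assignment illustrates this (simplified; in real Kafka the group coordinator assigns partitions via a configurable strategy):

```python
def assign(partitions, consumers):
    """Round-robin partitions over the consumers of one group. Consumers
    beyond the partition count receive nothing and sit idle."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 3 partitions, 4 consumers in the same group.id: consumer "c3" is idle.
print(assign([0, 1, 2], ["c0", "c1", "c2", "c3"]))
# -> {'c0': [0], 'c1': [1], 'c2': [2], 'c3': []}
```

Adding a fourth consumer here changes nothing; adding a fourth partition would.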

You can increase your understanding of how Kafka works by reading more detail on Kafka storage internals: segments, rolling and retention

2.b. Compression

Kafka supports compressing messages before they are sent from the producer and before they are stored on disk. Kafka supports configuring this compression to be initiated from either the producer or the broker.

It supports the compression types gzip, snappy, lz4, and zstd (plus none), each trading compression ratio against CPU cost:

  • It is better to use compression configured on the producer to remove CPU overhead from the brokers
    - Set compression.type to be the same in the producer and topic configs
    - This distributes the compression across all producers instead of all brokers (e.g. 100 producers to 3 brokers)
  • Compression reduces the amount of disk space needed to store messages
  • While compressing the message adds a little latency in message dispatch from the producer, the much lower network bandwidth means it should improve the overall performance of producers and consumers
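The space savings are easy to demonstrate with the standard library (illustrative only; with Kafka, setting compression.type on the producer compresses whole record batches, and snappy, lz4, or zstd are common alternatives to gzip):

```python
import gzip
import json

# A batch of repetitive JSON messages, like a producer batch of user events.
batch = json.dumps([
    {"Id": i, "FirstName": "Joan", "LastName": "Sampson"} for i in range(200)
]).encode()

compressed = gzip.compress(batch)

# Repetitive payloads compress very well, cutting network and disk usage at
# the cost of some CPU on the producer (and on the consumer, to decompress).
print(len(batch), len(compressed), len(compressed) < len(batch))
assert gzip.decompress(compressed) == batch  # lossless round trip
```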

You can learn much more about the different configurations and changes to your flow by reading more on Message compression in Apache Kafka.

You’re Ready to Explore Kafka Further Now!

Congratulations, you finished the Kafka workshop. Check out the DraftKings blog, where we have some upcoming articles on how we use Kafka at DraftKings.

We’d also love to hear from you in the comments on how this workshop went for you and how you plan to use Kafka at your company.

Want to learn more about DraftKings’ global Engineering team and culture? Check out our Engineer Spotlights and current openings!
