Queues for Kafka

Andrew Schofield
9 min read · May 26


Apache Kafka is a wildly successful, open-source, event-streaming platform. Its combination of horizontal scaling and strong ordering is really powerful for creating modern, responsive, event-driven architectures. Over the years I’ve spent working with users of Kafka, the challenge they mention most often concerns consumer groups and scaling up the number of consumers.

Consumer groups are designed for ordered delivery of streams of events to a group of consumers. With a consumer group, Kafka looks at the topics the consumers have subscribed to and assigns the topics’ partitions to the consumers in the group. Each consumer in a consumer group is granted exclusive access to consume from the partitions it has been assigned. This means that you need to have at least as many partitions as consumers. As a result, users sometimes “over-partition” which means creating additional partitions purely to enable scaling up the number of consumers to cope with peak loads.

Consumer groups are great for throughput and ordering, but many applications do not actually need the guarantees that partition assignment brings. They use consumer groups because that’s what Kafka provides.

Thinking about this, I wondered whether Kafka could be improved by an alternative to consumer groups for situations in which the consumers do not need partition-based ordering. And that’s what led to KIP-932: Queues for Kafka.

What’s a KIP?

A Kafka Improvement Proposal is a proposal to make a major change to the open-source Apache Kafka project. It all starts with publishing a proposal document on a wiki, which is reviewed by the community, commented on and revised until everyone is happy that it’s in good shape. Then, the KIP is voted on by the members of the community. If it gets 3 or more positive votes from the project’s leaders (called committers), it is adopted and the code gets written and merged into Kafka.

Creating a significant new KIP and implementing it is an expensive undertaking. It takes a lot of work to get agreement that the proposal is ready, and the bar is very high to get the code accepted into Kafka. This is to be expected for such a mission-critical piece of software.

Queues for Kafka

KIP-932 introduces a new kind of group called share groups. Share groups do not replace consumer groups. You choose which kind of group you want to use based on the consumption behaviour you desire.

Share groups and consumer groups compared

The consumers in a share group work together cooperatively in a way that will be familiar to users of traditional message queues. You will be able to write Kafka consumer applications that use share groups to consume records from Kafka topics, and you can simply scale up the number of consumers without having to worry about partitions. This is just like using a traditional message queue with multiple receiving applications.

In a share group, it is as if every consumer is assigned all partitions to share with all of the other consumers, and then they are given distinct sets of records to consume. This is a lot like a durable shared subscription in JMS.

The consumers can acknowledge record delivery individually, but more commonly they’ll do it a batch at a time, which is good for efficiency. Here’s a snippet of Java code, taken from the KIP, for an application that uses a share group and acknowledges the records as a batch:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Configure a consumer that joins the share group "myshare"
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("enable.auto.commit", "false");
props.setProperty("group.type", "share");
props.setProperty("group.id", "myshare");

KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(props,
                        new StringDeserializer(),
                        new StringDeserializer());

consumer.subscribe(Arrays.asList("foo"));
while (true) {
    // Fetch a batch of records acquired for this consumer
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        doProcessing(record);  // application-specific processing
    }

    // Commit the acknowledgement of all the records in the batch
    consumer.commitSync();
}

Apart from the line that sets the group.type property to “share”, the code would be identical if it were using a consumer group. Each call to poll returns a batch of records which are then processed, and the call to commitSync acknowledges their delivery. The KIP also describes other ways to acknowledge delivery. It’s important to bear in mind that this KIP is not yet approved, so the details might change quite a bit.
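
For instance, here’s a sketch of what acknowledging records individually might look like. Treat it as illustrative only: the acknowledge method and the AcknowledgeType values are loosely based on the draft API in the KIP and could well change before it’s finalised.

// Illustrative only -- acknowledge(record, type) and the AcknowledgeType
// values follow the draft KIP and may change before approval
while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        try {
            doProcessing(record);
            // Mark this record as successfully processed
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            // Give the record back for another delivery attempt
            consumer.acknowledge(record, AcknowledgeType.RELEASE);
        }
    }

    // Commit the acknowledgements recorded above
    consumer.commitSync();
}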

The point to take away is that this is a Kafka application using an extension to the Kafka client API, and you can run as many instances of it as you like without worrying about partitions.

How is this a queue?

From the point of view of an application using a share group, it behaves just like a traditional message queue.

The messages on a message queue have multiple states, although you may not really see them directly. Let’s consider an existing MQ system that supports the JMS API. Initially, a message on a queue starts off available for consumption. As it’s consumed using the JMS API, the message is logically but not yet physically removed from the queue, and it’s not visible to other applications. Once the application acknowledges delivery, the message is actually removed from the queue. There have been multiple state changes for the message.
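
To make those state changes concrete, here’s a minimal JMS sketch using CLIENT_ACKNOWLEDGE mode, in which a received message stays logically on the queue until the application acknowledges it. The connection factory and queue name are assumptions, and older providers use the javax.jms package rather than jakarta.jms.

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Queue;
import jakarta.jms.Session;

// connectionFactory is assumed to come from the JMS provider, e.g. via JNDI
Connection connection = connectionFactory.createConnection();
connection.start();

// CLIENT_ACKNOWLEDGE: the message is only removed from the queue once the
// application explicitly acknowledges it
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("ORDERS");
MessageConsumer consumer = session.createConsumer(queue);

Message message = consumer.receive(); // now invisible to other applications
doProcessing(message);                // application-specific processing
message.acknowledge();                // now actually removed from the queue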

Queues for Kafka introduces a similar idea of states for the records which are in the process of being delivered. It calls them in-flight records. The share group is essentially managing the delivery of the in-flight records.

The set of in-flight records for a share-partition has a start offset and an end offset. As in-flight records complete delivery, the start offset moves forwards along the partition and the end offset also moves forwards ahead of it. The distance between these offsets is limited to a maximum number of in-flight records, just for the purposes of resource management. The segment of the partition between the start offset and the end offset is a sliding window that moves as records are consumed. Ideally, the sliding window of in-flight records would be very near the end of the partition most of the time, which means that the consumers are keeping up with the arrival of new records.

When a record is fetched for a consumer in a share group, it is acquired for that consumer, which is a change of state. It stays in this state for a limited period of time (30 seconds by default) which should be sufficient to process the record and acknowledge it in most cases. When the consumer acknowledges the record, that’s another change of state. But if the record is not acknowledged sufficiently quickly, perhaps because the consumer crashed or couldn’t process it, it becomes available for another delivery attempt.

The delivery attempts are counted so that unprocessable records can be handled automatically. By default, each record is permitted 5 delivery attempts, after which it is logically archived and will not be delivered again. What this means in practice is that an occasional poison message does not break things; it just causes a temporary inconvenience.
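
Putting the life cycle together, here’s a toy model of the per-record states. This is not the broker’s implementation, just an illustration: the state names follow the KIP’s description, and the 30-second acquisition lock and 5 delivery attempts are the defaults mentioned above.

import java.time.Duration;
import java.time.Instant;

// Toy model of an in-flight record's life cycle in a share group
class InFlightRecord {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    static final Duration ACQUISITION_LOCK = Duration.ofSeconds(30); // default lock duration
    static final int MAX_DELIVERY_ATTEMPTS = 5;                      // default attempt limit

    State state = State.AVAILABLE;
    int deliveryAttempts = 0;
    Instant acquiredAt;

    // A consumer fetches the record: it is acquired for that consumer alone
    void acquire(Instant now) {
        state = State.ACQUIRED;
        deliveryAttempts++;
        acquiredAt = now;
    }

    // The consumer acknowledges delivery: the record completes
    void acknowledge() {
        state = State.ACKNOWLEDGED;
    }

    // The lock expired or the consumer gave the record back
    void release() {
        if (deliveryAttempts >= MAX_DELIVERY_ATTEMPTS) {
            state = State.ARCHIVED;   // treated as unprocessable, never redelivered
        } else {
            state = State.AVAILABLE;  // eligible for another delivery attempt
        }
    }

    // Has the acquisition lock timed out?
    boolean lockExpired(Instant now) {
        return state == State.ACQUIRED
            && now.isAfter(acquiredAt.plus(ACQUISITION_LOCK));
    }
}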

In contrast, when you’re using a consumer group, the records do not have states. The consumer group has a committed offset for each topic-partition and records have offsets either before or after that committed offset. There’s no per-record state management at all.

Interesting, but I have questions

So where are the queues in Kafka? Aha. You noticed. Share groups let you consume records from topics as if they were queues. You can take an existing application that uses a queue and convert it to use a topic and a share group in a way that would be really difficult with a consumer group. But it would be inconvenient to introduce queues as a separate resource alongside topics. That’s why the KIP introduces share groups as a new way of consuming records from topics. Nothing changes for the producers. So, it’s “Queues for Kafka” but it’s still Kafka.

What if I have 2 share groups subscribed to the same topic? It’s as if there are two queues backed by the same topic, independently managing the in-flight records and delivering the records to their groups of consumers.

Can I use consumer groups and share groups together on the same topic? Yes, absolutely.

Is this point-to-point or publish/subscribe messaging? To use JMS terminology, it’s publish/subscribe. Of course, if there’s just a single share group subscribed to a topic and no consumer groups, it just boils down to the same thing.

Do share groups support ordering? No, delivery is unordered. I can see that key-based ordering would be a valuable improvement, but KIP-932 doesn’t include that. Sounds like a future KIP to me.

Do share groups support exactly-once semantics? Not yet. Delivery is at-least-once. I know how to do exactly-once, but didn’t want to make KIP-932 too big.

Does this mean users of traditional MQ brokers such as ActiveMQ can throw them away and use Kafka? Not so fast. The way I think of it is that Kafka will now be able to support a variety of queuing use-cases that previously used to demand a separate MQ broker. But, this is still Kafka and you need to write a Kafka client application to use it. If you had deployed an MQ broker alongside Kafka just to handle queuing, maybe it’s no longer required.

Does this mean that all MQ brokers can be replaced with Kafka? Not at all. With KIP-932, Kafka doesn’t suddenly gain two-phase commit and API compatibility with legacy systems (well, not yet anyway). There are some COBOL programs that will live forever.

But aren’t message queues and event streams fundamentally different? Well, that’s what some people will tell you, but my view is different. That’s possibly slightly heretical but I’m good with that.

As with most things, it’s not black or white. Let’s think about cloud messaging services. The extremely high-scale cloud queue services have APIs designed primarily for scale. They’re at one end of the spectrum. They silently split the queues into shards for scalability, and replicate them across servers and zones for availability. They’re good at sharing the messages with a lot of consumers, but if you want to distribute messages to multiple targets, you need to use another service too. At the other end, there are event-streaming platforms with partitioned or sharded topics, and consumers are intimately aware of the partitioning and their position in the stream of events. They’re at the event-streaming end of the spectrum. If you want fine-grained sharing of events to a lot of consumers, you’re out of luck.

Traditional server-based MQ systems sit somewhere in the middle, in terms of supporting both message queues and publish/subscribe.

Just as some traditional MQ systems also support publish/subscribe even though they’re queue-based to their core, Queues for Kafka gives queuing behaviour to Kafka consumers in a share group, even though it’s partitioned topics all the way down. So, Queues for Kafka extends the reach of Kafka by providing an alternative model for consuming applications.

Why use Kafka for this?

Queues for Kafka are not like other queues. I see three fundamental benefits to using Kafka for queuing use-cases.

First, Kafka topics can have infinite storage. A Kafka topic doesn’t have a “maximum queue depth”. It is of course true that you need to ensure that, over time, the consumers of records can keep up with the producers, but a temporary backlog of a large number of records during a daily peak every morning or an application upgrade should not impact the performance of the system. The phrase “an empty queue is a fast queue” should set off alarm bells. It’s an excuse.

Second, because a share-partition is just a sliding window on a Kafka topic-partition, you can reset the position of the window to any point in time. This idea is known by terms such as “message replay” or “point-in-time recovery”. It’s possible because Kafka doesn’t immediately throw away consumed records. You can retain records based on capacity, time or just keep them all on cheap cloud storage. In Kafka, retention is an administrative decision.

Finally, wouldn’t you like to use other nice things in the Kafka ecosystem such as schemas or Apache Flink with your queues? Of course you would. Because a share group is just a way of consuming records from Kafka topics, it plays nicely with everything that already exists.

What next?

Obviously, the first thing is to get KIP-932 approved and implemented in Apache Kafka. Because it’s quite a large proposal, I imagine that it will initially start in a kind of “preview” state before eventually reaching production-readiness.

The KIP also mentions some possible future work including dead-letter queues, key-based ordering and exactly-once semantics.

There’s a lot of work ahead to make this a reality. Sounds like a lot of fun.

Andrew Schofield

Software engineer and architect at Confluent. Messaging expert. Apache Kafka contributor. My words and opinions are my own.