Managing Consumer Commits and Back-pressure With Node.js and Kafka in Production

Yoni Goyhman · Published in WalkMe Engineering · Apr 8, 2019

Kafka has established itself as the go-to messaging system for big data and large-scale applications.

Even though it is JVM based, you can find client implementations in just about every programming language.

While the basic setup is quite easy, and multiple tutorials are available for getting started, moving from development to production is a different story, especially when working with a non-native client like Node.js.

In this post I will go over the main challenges in creating a Kafka-based Node.js consumer that is production ready.

During my work I have tried several Node.js Kafka clients, including node-rdkafka, kafka-node, kafkajs and even a native Java implementation using GraalVM.

I will not go over my impressions of each library, but one thing all of these solutions have in common is that they share the same two challenges:

  1. Handling back-pressure
  2. Handling commits in a safe manner

One might expect these issues to be handled internally by Kafka or by the client library, but for some reason they are considered “advanced” use cases and are left for developers to handle.

The pain in the Back (Pressure)

Kafka is fast. Very fast. The first thing you notice when working with Kafka is that the bottleneck moves from fetching messages to actually handling them. Add to that the fact that the Node.js runtime is single-threaded with a single call stack, and you can very easily run out of resources if you are not cautious.

Let’s look at an example:

A Kafka client fetches messages as fast as possible and passes them to a handle callback. If this callback is synchronous, the client might wait for it to finish before passing on the next fetched message. In the meantime, newly fetched messages are stacked in the client’s internal queue. We might have a way of limiting that queue size by setting queued.max.messages.kbytes, causing the client to stop fetching new messages after the queue reaches a certain size, but this configuration is not always available.
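Where the client exposes librdkafka configuration (node-rdkafka does), this limit can be set directly in the consumer config. A minimal sketch, with placeholder broker address, group id, and limit:

```js
// Sketch: capping the client's internal fetch queue via librdkafka configuration.
// The broker address, group id, and the ~64 MB limit are placeholder values.
const consumerConfig = {
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
  'queued.max.messages.kbytes': 64 * 1024, // stop pre-fetching once this much data is queued locally
};
```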

Handling messages in a synchronous manner is rarely what we want to do. Even though the Node.js runtime is single-threaded, we would like to utilize the full potential of the host machine/container, and might be using 3rd-party libraries to handle messages in parallel. This means our callback will be an asynchronous function, so the client will never stop fetching messages, eventually causing Node.js to run out of memory due to a stack or heap overflow.

An absurd code example that shows a possible delay in message handling
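A minimal sketch of such a consumer, assuming node-rdkafka in flowing mode; the topic name, broker address, and group id are placeholders:

```js
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
}, {});

// simulate slow processing: every message takes 1 second to handle
const handle = async (message) => {
  await new Promise((resolve) => setTimeout(resolve, 1000));
  console.log(`handled offset ${message.offset} of partition ${message.partition}`);
};

consumer.on('ready', () => {
  consumer.subscribe(['example-topic']);
  consumer.consume(); // flowing mode: fetch as fast as possible
});

// the client keeps emitting 'data' events; nothing here slows the fetching down
consumer.on('data', (message) => {
  handle(message);
});

consumer.connect();
```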

Let’s assume we have a topic that is already full. The code above has a handle function that consumes data with a 1-second delay. Once it starts consuming, our call stack will be filled with as many messages as the client can fetch in 1 second, which could easily be 10K messages or more.

Another problem might be an “out of memory” error due to multiple parallel handles. Even if the call stack size remains reasonable, the handling itself might be memory-consuming:

Big data, right?
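As a hypothetical illustration, imagine a handler that expands every message into roughly 320 MB of in-memory data while it works:

```js
// Hypothetical “big data” handler: each message is expanded into ~320 MB of memory
// that stays alive for the duration of the (slow) processing.
const handle = async (message) => {
  const expanded = Buffer.alloc(320 * 1024 * 1024, message.value);
  await new Promise((resolve) => setTimeout(resolve, 1000)); // simulated downstream work
  return expanded.length;
};
```

Nothing in the naive flow above limits how many of these handlers are alive at the same time.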

In the code above, each message being handled consumes approximately 320 MB of memory. Running just 10 handles in parallel will consume 3.2 GB of memory and cause an “out of memory” error, especially if our availability strategy is to scale out with multiple relatively small Docker containers.

Handling Back-Pressure

The solution to this problem consists of two parts:

  1. Pausing the Kafka consumer when we reach a certain limit of fetched and unhandled messages, and resuming once we are done.
  2. Limiting the number of parallel handles.

Since we might not always have access to the client’s internal queue, the best approach in this situation is to create and manage a queue as part of the application.

Here is an example of how to do it using node-rdkafka as the node client library:

Limiting the number of parallel handlers using the async library
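A minimal sketch of this approach, assuming node-rdkafka together with the async library’s queue; the queue limits, topic name, broker address, and group id are placeholder values, and handleMessage stands in for the real processing:

```js
const async = require('async');
const Kafka = require('node-rdkafka');

const MAX_QUEUE_SIZE = 100;      // pause fetching above this many unhandled messages
const MAX_PARALLEL_HANDLES = 10; // handle at most this many messages at once

let paused = false;

// stand-in for the real message processing
const handleMessage = async (message) => {
  await new Promise((resolve) => setTimeout(resolve, 1000));
};

// the async library's queue caps how many messages are handled in parallel
const msgQueue = async.queue(async (message) => {
  await handleMessage(message);
}, MAX_PARALLEL_HANDLES);

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
  'rebalance_cb': function (err, assignment) {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      this.assign(assignment);
      // partitions paused before the rebalance stay paused, so resume them explicitly
      this.resume(assignment);
      paused = false;
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      // queued messages will be redelivered to the new assignee, so drop them
      msgQueue.remove(() => true);
      this.unassign();
    }
  },
}, {});

// once the in-memory queue drains, resume fetching from the broker
msgQueue.drain(() => {
  if (paused) {
    consumer.resume(consumer.assignments());
    paused = false;
  }
});

consumer.on('data', (message) => {
  msgQueue.push(message);
  // back-pressure: stop fetching once too many messages are waiting to be handled
  if (msgQueue.length() > MAX_QUEUE_SIZE && !paused) {
    consumer.pause(consumer.assignments());
    paused = true;
  }
});

consumer.on('ready', () => {
  consumer.subscribe(['example-topic']);
  consumer.consume();
});

consumer.connect();
```

Note that providing a rebalance_cb overrides node-rdkafka’s default assignment handling, which is why the sketch calls assign and unassign itself.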

In the code above, instead of directly handling the fetched messages, we define a queue with a maximum size and a maximum number of parallel handlers, and push newly fetched messages into that queue.

We then pause the Kafka consumer once the queue reaches its size limit.

Whenever the queue is drained of messages, we resume consumption. This guards us from potential memory issues in our application.

Notice the rebalance handling. Once a rebalance occurs, we no longer need to handle the remaining queued messages, as they will be redelivered to the new assignee in our consumer group. Consequently, we drain our queue.

node-rdkafka manages consumer state per partition, so we need to resume the paused partitions on rebalance. If we don’t, they will remain paused and won’t consume even after reassignment.

Managing commits without losing data

Committing is the way of telling Kafka we are done processing a message. Commits are done per offset, per partition, per consumer group of a specific topic. Committing offset 5 of partition 1 for some consumer group means that all messages up to offset 5 have been handled successfully and should not be delivered again.

Most Kafka consumption examples and tutorials use an auto-commit configuration, where the client automatically commits at a fixed time interval. This is dangerous because there is no guarantee that a message has been successfully handled within that interval.

If, for example, our handling takes longer than the commit interval and then fails, we would lose the failing message, as it would already have been committed to the broker as successful.
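For reference, a typical auto-commit setup in librdkafka-style configuration looks roughly like this; the interval is an illustrative placeholder:

```js
// Auto-commit: offsets are committed in the background at a fixed interval,
// regardless of whether the fetched messages were actually handled successfully.
const autoCommitConfig = {
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
  'enable.auto.commit': true,
  'auto.commit.interval.ms': 5000, // commit every 5 seconds
};
```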

Manual commit examples, for some reason, are always presented in a one-by-one message consumption flow: we consume a message, handle it, and then commit the offset. This flow is very limiting, since we cannot handle messages in parallel.

For us to be able to safely commit messages, without limiting parallel handling, we need to manage the commits ourselves. Building on the example above, this is how it would look:
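A minimal sketch of this idea, again assuming node-rdkafka with auto-commit disabled; the commitManager object, handleMessage, and all configuration values are placeholders, and the back-pressure queue from the previous example is left out for brevity:

```js
const Kafka = require('node-rdkafka');

const COMMIT_INTERVAL_MS = 5000; // how often we try to commit processed offsets

// Tracks received messages per partition and periodically commits
// the longest prefix of successfully handled offsets.
const commitManager = {
  partitions: {}, // partition -> [{ topic, partition, offset, done }]
  consumer: null,

  start(consumer) {
    this.consumer = consumer;
    setInterval(() => this.commitProcessedOffsets(), COMMIT_INTERVAL_MS);
  },

  notifyStartProcessing(message) {
    const records = this.partitions[message.partition] || [];
    records.push({ topic: message.topic, partition: message.partition, offset: message.offset, done: false });
    this.partitions[message.partition] = records;
  },

  notifyFinishedProcessing(message) {
    const record = (this.partitions[message.partition] || []).find((r) => r.offset === message.offset);
    if (record) record.done = true;
  },

  commitProcessedOffsets() {
    for (const partition of Object.keys(this.partitions)) {
      const records = this.partitions[partition];
      // find the longest prefix of handled messages in this partition
      let lastDone = -1;
      while (lastDone + 1 < records.length && records[lastDone + 1].done) lastDone += 1;
      if (lastDone < 0) continue;
      const { topic, offset } = records[lastDone];
      // commit the next offset to be consumed, per Kafka's commit semantics
      this.consumer.commit({ topic, partition: Number(partition), offset: offset + 1 });
      this.partitions[partition] = records.slice(lastDone + 1);
    }
  },
};

// stand-in for the real message processing
const handleMessage = async (message) => {
  await new Promise((resolve) => setTimeout(resolve, 1000));
};

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
  'enable.auto.commit': false, // commits go through the commit manager instead
}, {});

consumer.on('data', async (message) => {
  commitManager.notifyStartProcessing(message);
  try {
    await handleMessage(message);
  } catch (err) {
    // a real application would retry or route the message to a dead-letter topic here
  } finally {
    // mark the message as finished either way, so it doesn't block commits for its partition
    commitManager.notifyFinishedProcessing(message);
  }
});

consumer.on('ready', () => {
  commitManager.start(consumer);
  consumer.subscribe(['example-topic']);
  consumer.consume();
});

consumer.connect();
```

Committing on a timer rather than per message keeps the commit rate constant regardless of throughput.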

In the example above, we track all received messages, divided into partitions, and mark each one as we successfully finish processing it.

We set a time interval on which we iterate over all the received messages. On each such iteration, we find the first offset we can commit in each partition (this is the first unhandled offset in that partition), and commit it to the broker.

This way we can guarantee we only commit messages that were successfully processed, and we don’t lose any messages.

Notice that we also mark messages which have not been processed successfully as finished. In a real application, we would use a retry mechanism of some sort to handle such a message. In any case, we should not leave an unhandled message in the array, as it would block commits for that partition.

Additional considerations

In this article, we went over some important concepts for using Kafka for high-volume consumption. We learned how to safely handle our messages in parallel with the queue technique. We also learned how to prevent data loss by manually committing only handled messages.

Implementation may vary depending on the selected client library. Some libraries will require manual handling of heartbeats. Others will require more careful consideration, like retry mechanisms for pausing and resuming during rebalancing.

The concepts suggested in this article are relevant all the same. I’ve prepared another implementation using Kafkajs that can be found here.
