Load test a Kafka application with Gatling

A brief description of how I’ve modified a Gatling plugin to suit our needs and create a meaningful load test scenario for our application

Davide Pedone
NEW IT Engineering
5 min read · Jul 22, 2024


Recently I was asked to set up a load test on a really interesting project I’m working on, one I would like to tell you about but… it’s classified. I could tell you, but then I’d have to kill you. So, for your (and my) safety, allow me to be a bit generic in describing the architecture and what we want to achieve: we have an application that consumes messages from a Kafka topic “A”, does some processing by calling different APIs, and publishes intermediate and then final status updates to a Kafka topic “B”. It goes without saying that we want to know how long it takes for a message to reach the final status and how many messages per second our application can process.

Quote from the movie Top Gun (image from https://giphy.com/)

Back in the days when I started working on it, the most recently updated third-party plugin listed in the Gatling docs was, you don’t say, the Gatling Kafka Plugin from Tinkoff.

Matching request and response

Out of the box the plugin allows matching messages based on the payload, or by providing a function to extract the desired information (key, field from the payload, etc.), but neither option worked for us because we need two different functions: one for the request and one for the response. So I made a small addition to the protocol builders.

Changes on KafkaProtocolBuilder classes
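The original post shows the diff as a screenshot; here is a minimal sketch of the idea, with a simplified stand-in for the plugin’s message wrapper (the real types live in the plugin, the shapes below are assumed for illustration):

```scala
// Simplified stand-in for the plugin's message wrapper, for illustration only
final case class KafkaProtocolMessage(key: Array[Byte], value: Array[Byte])

// The generic matcher: two separate extraction functions, one applied to the
// produced request and one to the consumed response. Messages whose extracted
// bytes are equal are considered a request/response pair.
trait KafkaMatcher {
  def requestMatch(msg: KafkaProtocolMessage): Array[Byte]
  def responseMatch(msg: KafkaProtocolMessage): Array[Byte]
}
```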

This addition allows us to easily customise the matching logic:

Sample usage of the generic KafkaMatcher
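For example, a matcher that correlates by record key on the request side and by a field of the JSON payload on the response side could look like this (the correlationId field and the regex helper are made up for illustration):

```scala
import java.nio.charset.StandardCharsets.UTF_8

object StatusMatcher extends KafkaMatcher {
  // request: the correlation id is simply the record key
  override def requestMatch(msg: KafkaProtocolMessage): Array[Byte] = msg.key

  // response: the correlation id is embedded in the JSON payload
  override def responseMatch(msg: KafkaProtocolMessage): Array[Byte] =
    correlationIdOf(msg.value)

  // hypothetical helper: pull the "correlationId" field out of the payload
  private val IdPattern = """"correlationId"\s*:\s*"([^"]+)"""".r

  def correlationIdOf(payload: Array[Byte]): Array[Byte] =
    IdPattern
      .findFirstMatchIn(new String(payload, UTF_8))
      .map(_.group(1).getBytes(UTF_8))
      .orNull // no id means no match
}
```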

I then started a local Kafka cluster and all the required applications, set up a scenario (sketched below), and within a few minutes I was able to run the first test. Everything went smoothly and the results met expectations, so I was almost ready to close my task. But, as a good boy, I first tried to run some scenarios against the DEV environment, and unfortunately the Gatling report showed only failures. The “it works on my machine” curse struck again.
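For context, a bare-bones version of such a scenario looks more or less like this (bootstrap servers, topic names and payload are placeholders; the request-reply DSL is the plugin’s API as I remember it, so double-check it against the docs):

```scala
import scala.concurrent.duration._
import io.gatling.core.Predef._
import ru.tinkoff.gatling.kafka.Predef._

class StatusSimulation extends Simulation {

  // request-reply protocol: produce to topic "A", await the reply on topic "B"
  val kafkaProtocol = kafka.requestReply
    .producerSettings(Map("bootstrap.servers" -> "localhost:9092"))
    .consumeSettings(Map("bootstrap.servers" -> "localhost:9092"))
    .timeout(10.seconds)

  val scn = scenario("process message")
    .exec(
      kafka("produce and await final status").requestReply
        .requestTopic("topic-A")
        .replyTopic("topic-B")
        .send[String, String]("key-1", """{"correlationId":"key-1","status":"NEW"}""")
    )

  setUp(scn.inject(constantUsersPerSec(50).during(2.minutes))).protocols(kafkaProtocol)
}
```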

Investigating and fixing the failures

The first possible cause off the top of my head was related to offsets: locally I had an empty topic, while in the DEV environment the consumer topic contained thousands of old messages and, since according to the Kafka Streams docs the default value for auto.offset.reset is earliest, I set the property to latest and gave the test another go. The outcome was slightly better, but some messages were still reported as failures, and the response time section also looked very concerning.
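In practice it is a one-line change to the protocol’s consumer settings (assuming, as above, that consumeSettings passes plain Kafka consumer properties straight through to the underlying stream):

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig

// Kafka Streams defaults auto.offset.reset to "earliest": the tracking stream
// would replay every old message on the reply topic. Start from the end instead.
val kafkaProtocol = kafka.requestReply
  .producerSettings(Map("bootstrap.servers" -> "localhost:9092"))
  .consumeSettings(Map(
    "bootstrap.servers"                     -> "localhost:9092",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
  ))
  .timeout(10.seconds)
```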

A failure means that Gatling was not able to find a matching message in the consumer topic for a given produced message, and since I don’t trust my code that much, I thought that somehow, under heavy load, my application was not able to process some of the consumed messages. I checked the logs: no errors or issues. I was even able to see the messages on the Kafka topic, so what was going on? Let’s have a look at the plugin source code, in particular at the KafkaRequestReplyAction Scala class:
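I will not paste the whole class here, but the part that matters reduces to this sketch (names and parameters are simplified; only the callback structure reflects the real class):

```scala
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// On a successful send, hand the message over to the tracker so the reply
// can be matched later; on error, fail the request immediately.
def sendAndTrack(
    producer: KafkaProducer[Array[Byte], Array[Byte]],
    record: ProducerRecord[Array[Byte], Array[Byte]],
    trackReply: () => Unit
): Unit =
  producer.send(
    record,
    new Callback {
      override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
        if (e == null) trackReply() // tracker.track(...) in the real class
        else println(s"Failed to send Kafka message: $e")
    }
  )
```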

As you can see, in the send callback we simply call the tracker’s track method in order to listen for a reply. So far so good; let’s then check the TrackersPool class:
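Again a simplified reconstruction rather than the actual source, but it captures the structure that matters here:

```scala
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder}

// Simplified reconstruction of the TrackersPool idea: one KafkaStreams
// instance per reply topic, created lazily on the first track() call.
class TrackersPool(streamsProps: Properties) {

  private val trackers = new ConcurrentHashMap[String, KafkaStreams]()

  def track(outputTopic: String, onReply: Array[Byte] => Unit): Unit = {
    trackers.computeIfAbsent(
      outputTopic,
      topic => {
        val builder = new StreamsBuilder()
        builder
          .stream[Array[Byte], Array[Byte]](topic)
          .foreach((_, value) => onReply(value)) // match replies, notify Gatling
        val streams = new KafkaStreams(builder.build(), streamsProps)
        streams.start() // asynchronous: the stream is NOT consuming yet
        streams
      }
    )
    // the in-flight message is then registered and awaits its reply
  }
}
```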

Ok, now it’s clear. When the track method is called for the first time, the ConcurrentHashMap is empty, so it first has to create a KafkaStream, and that can take some time. No big deal locally with a single empty partition, but it can be an issue in DEV/higher environments with multiple partitions and thousands of messages, mTLS, network latency, etc.

I then edited the TrackersPool class, adding a method to get the tracker for a given topic and a KafkaStreamStateListener. The idea is maybe not elegant but simple: instantiate the stream and wait for it to be in the RUNNING state before starting to send messages. A few runs against the DEV environment confirmed that I was now able to match all the sent messages. It was then time to tackle what I hoped was the last problem: response time.
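The shape of the fix, reconstructed (the actual MR plugs the listener into the trackers pool, but the waiting logic is the point):

```scala
import java.util.concurrent.CountDownLatch
import org.apache.kafka.streams.KafkaStreams

// Wait for the tracking stream to actually be consuming before the scenario
// starts producing: block until the state listener sees RUNNING.
def startAndAwaitRunning(streams: KafkaStreams): Unit = {
  val running = new CountDownLatch(1)
  streams.setStateListener((newState: KafkaStreams.State, _: KafkaStreams.State) =>
    if (newState == KafkaStreams.State.RUNNING) running.countDown()
  )
  streams.start()
  running.await() // no messages are sent until the stream can see replies
}
```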

Again, inspecting the code in the TrackersPool class clearly highlights why the response time in the report was much higher than expected. As you can see on line 35 of the TrackersPool.scala class, when the plugin creates the MessageConsumed object it uses the current timestamp, meaning that, given x the timestamp of the sent message, y the timestamp of the response produced by our application under test, and z the timestamp of when Gatling consumed the message, the report will show the value of z-x instead of y-x, which would make much more sense for our testing purposes. The CustomProcessor class I created and injected into the stream contains the “old” logic for notifying Gatling that a message has been consumed, but gives us access to the Kafka record object and therefore to its timestamp.
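Sketched below, with the Gatling notification plumbing reduced to a callback; the key point is reading record.timestamp() instead of the wall clock:

```scala
import org.apache.kafka.streams.processor.api.{Processor, Record}

// The processor sees the raw record, so we can report y, the timestamp our
// application set when producing the reply, instead of z, the wall-clock
// time at which Gatling happened to consume it.
class CustomProcessor(notifyGatling: (Array[Byte], Array[Byte], Long) => Unit)
    extends Processor[Array[Byte], Array[Byte], Void, Void] {

  override def process(record: Record[Array[Byte], Array[Byte]]): Unit =
    notifyGatling(record.key(), record.value(), record.timestamp())
}
```

It can be attached to the tracking stream with KStream.process, replacing the plain consume-and-notify logic shown earlier.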

Make life easier for the QA team

As mentioned earlier, the tested application produces multiple messages to publish processing updates. Some of them are just informative, some are warnings due to temporary errors/issues, and some represent the actual final state. For our load testing purposes, the QAs not only wanted the scenario to match the “final” state exclusively, but also to be able to differentiate between success and failure. These QAs…

Developers will never admit it but they ❤ QA team

Seriously, those requests were legit and allowed us to have meaningful test scenarios. To ignore the intermediate messages, the generic KafkaMatcher came to the rescue: in the responseMatch function we can easily implement our own logic to return the matching byte array only for final success messages. But this leads to a problem: the failure messages are not matched and are reported as failed in the Gatling report, making them indistinguishable from genuinely unmatched messages.
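Concretely, something like this (building on the hypothetical StatusMatcher from before):

```scala
import java.nio.charset.StandardCharsets.UTF_8

object FinalSuccessMatcher extends KafkaMatcher {
  override def requestMatch(msg: KafkaProtocolMessage): Array[Byte] = msg.key

  // first attempt, flawed: match only final SUCCESS messages, so final FAILED
  // messages simply time out, indistinguishable from genuinely lost ones
  override def responseMatch(msg: KafkaProtocolMessage): Array[Byte] = {
    val json = new String(msg.value, UTF_8)
    if (json.contains(""""status":"SUCCESS"""")) StatusMatcher.correlationIdOf(msg.value)
    else null // intermediate or FAILED: no match
  }
}
```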

I then enhanced/fixed the existing KafkaCheck.Simple to allow defining a condition that determines whether a matched message should be considered a success or a failure, and to pass a custom error message that will show up in the Gatling report.

Usage of KafkaCheck.Simple to determine if a message should be considered as success or failure
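Something along these lines; the exact simpleCheck signature after my change lives in the MR, so treat this as an approximation:

```scala
import java.nio.charset.StandardCharsets.UTF_8

val scn = scenario("final status")
  .exec(
    kafka("produce and await final status").requestReply
      .requestTopic("topic-A")
      .replyTopic("topic-B")
      .send[String, String]("key-1", """{"correlationId":"key-1","status":"NEW"}""")
      .check(
        // condition: a matched final message is OK only if it carries a
        // success status; otherwise it is a KO with a custom report message
        simpleCheck(
          msg => new String(msg.value, UTF_8).contains(""""status":"SUCCESS""""),
          "message reached a final FAILED status"
        )
      )
  )
```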

Let me know what you think about it, or propose a better solution in the comments. You can find all the described changes in this MR on GitHub.
