Chunks Producer/Consumer

Handling Large Messages Within Kafka

Amit Pe'er
Wix Engineering
Jul 28, 2020


Hello, Kafka fans. Have you ever encountered the following exception?

org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

Or this?

The message is 37455369 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

If the answer is yes, this post is for you. If the answer is no, it may still be helpful for your future self.

At Wix, we use Kafka widely, for various use cases across multiple microservices. One day, I started noticing one of these exceptions from time to time. Indeed, in one particular use case, my application would sometimes try to produce a message larger than the maximum size allowed (which was 1 MB).

— So what do I do now?

Disclaimer: this article assumes some basic knowledge of Kafka. That said, the solutions below sit at a higher level of abstraction and do not dive into Kafka’s low-level APIs.

Easy — remove the limitation.

Being the typical developer I am, the first thing I tried was to figure out how to remove the limitation, or at least raise it. I poked around, googled it, and asked a few experts. After some investigation, I learned that raising the limit is not recommended: it might slow down the whole Kafka cluster and cause other issues.

I had no choice but to scratch my head and try a different approach.

First thought — refactor

Of course, I could refactor my application so that it would never attempt to produce large messages at all. Naturally, it was my least favorite choice, as it would consume too much time and effort. I didn’t want to change the architecture just to solve what was essentially an edge case.

Coming up next — external storage

The next idea was as follows: when a large message is encountered, upload it to external storage (such as S3) and produce a link to download it. The consumer would then have to download the message from that storage. It sounded like a decent solution, but I was annoyed by the fact that it would introduce another point of failure to my application. I would have to deal with a 3rd party storage solution and study its API, and it might open the door to a whole new set of bugs. I preferred to remain within the Kafka context and enjoy its resilience.

Last but not least — Chunks Producer

I initially thought about it as a pieces producer, but then someone suggested the word chunks and it sounded much better. The initial idea was the following:

Producer:

  1. Whenever a large message is encountered, split it into chunks such that no chunk is larger than X bytes (where X is the value of the message.max.bytes property).
  2. Give each chunk a unique ID.
  3. Store each chunk in persistent storage. Luckily enough, I could use an in-memory key-value persistent store backed by Kafka, which was implemented here at Wix. This way, I could stay within Kafka-context as I desired, such that each chunk is actually produced to a compacted topic. Read more about it in bullet #3 here.
  4. Produce an array of all the chunk IDs.
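
The producer steps above can be sketched roughly as follows. This is a minimal illustration, not the Wix implementation: a plain dict stands in for the Kafka-backed key-value store, `produce` is a hypothetical callback standing in for the real Kafka producer, and `MAX_CHUNK_BYTES` is an assumed value (in practice you would likely derive it from message.max.bytes and leave some headroom for record overhead).

```python
import uuid
from typing import Callable, List, MutableMapping

# Assumed chunk size limit for this sketch; not taken from any real cluster config.
MAX_CHUNK_BYTES = 1_000_000


def split_into_chunks(payload: bytes, max_bytes: int = MAX_CHUNK_BYTES) -> List[bytes]:
    """Split the payload into consecutive slices of at most max_bytes each."""
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    return [payload[i:i + max_bytes] for i in range(0, len(payload), max_bytes)]


def produce_large_message(
    payload: bytes,
    store: MutableMapping[str, bytes],      # stand-in for the Kafka-backed K/V store
    produce: Callable[[List[str]], None],   # hypothetical stand-in for the Kafka producer
    max_bytes: int = MAX_CHUNK_BYTES,
) -> List[str]:
    """Store every chunk under a unique ID, then produce the ordered list of IDs."""
    chunk_ids: List[str] = []
    for chunk in split_into_chunks(payload, max_bytes):
        chunk_id = str(uuid.uuid4())        # step 2: unique ID per chunk
        store[chunk_id] = chunk             # step 3: persist the chunk
        chunk_ids.append(chunk_id)
    produce(chunk_ids)                      # step 4: produce the array of chunk IDs
    return chunk_ids
```

The order of the IDs in the list is what lets the consumer reassemble the message, so the list itself must be produced as a single (small) message.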

Consumer:

  1. Consume the chunk IDs.
  2. For each ID, fetch the chunk from the datastore (eventual-consistency spoiler: what happens if the chunk is not there yet?).
  3. Assemble the chunks into the complete (original) message.
  4. Invoke the original handler with the original message.
  5. Delete the chunks from the store.
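
A rough sketch of the consumer steps, under the same assumptions as before: a plain dict stands in for the Kafka-backed store, and `handler` is a hypothetical name for the original message handler. It deliberately ignores the pitfalls discussed later (missing chunks, retries).

```python
from typing import Callable, List, MutableMapping


def consume_chunk_ids(
    chunk_ids: List[str],
    store: MutableMapping[str, bytes],   # stand-in for the Kafka-backed K/V store
    handler: Callable[[bytes], None],    # hypothetical name for the original handler
) -> None:
    """Fetch the chunks, rebuild the original message, handle it, then clean up."""
    # Step 2: fetch each chunk by ID (raises KeyError if a chunk is missing).
    chunks = [store[chunk_id] for chunk_id in chunk_ids]
    # Step 3: concatenate the chunks in the order the IDs were produced.
    message = b"".join(chunks)
    # Step 4: invoke the original handler with the original message.
    handler(message)
    # Step 5: delete the chunks only after the handler has returned.
    for chunk_id in chunk_ids:
        del store[chunk_id]
```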

Fine-tuning & Pitfalls

That was merely the draft. While I was implementing the feature (which was also available to other developers, as opt-in only), a few ideas for improvement came up.

Consider eventual consistency
Bear in mind that a store backed by Kafka is only eventually consistent. Consider the following scenario:

  1. Producer: Chunks are delivered to the store in an async, eventually consistent fashion (via Kafka or any other async framework).
  2. Producer: Chunk IDs are produced.
  3. Consumer: Chunk IDs are consumed.
  4. Consumer: Fails to get a chunk from the store because it’s not there yet. Step 2 didn’t wait for step 1 to finish.

This problem can be solved in different ways. I chose to retry on the consumer’s side until all the chunks have indeed arrived: for each chunk ID, try to fetch the chunk. If not found — throw a “blocking” exception which will cause a retry for the consumer and block the consumption of further messages.
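
Here is a minimal sketch of that consumer-side retry. The exception name `ChunkNotReadyError`, the delay, and the attempt cap are all assumptions made for illustration; in a real consumer the retry would more likely be driven by the consumer framework than by a sleep loop.

```python
import time
from typing import Callable, List, Mapping


class ChunkNotReadyError(Exception):
    """Hypothetical 'blocking' exception: a chunk has not reached the store yet."""


def fetch_all_chunks(chunk_ids: List[str], store: Mapping[str, bytes]) -> List[bytes]:
    """Return all chunks in order, or raise if any of them is still missing."""
    chunks = []
    for chunk_id in chunk_ids:
        chunk = store.get(chunk_id)
        if chunk is None:
            raise ChunkNotReadyError(chunk_id)
        chunks.append(chunk)
    return chunks


def consume_with_blocking_retry(
    chunk_ids: List[str],
    store: Mapping[str, bytes],
    handler: Callable[[bytes], None],
    retry_delay_s: float = 0.1,   # assumed delay between attempts
    max_attempts: int = 50,       # assumed cap so the sketch cannot loop forever
) -> int:
    """Retry the same message until every chunk is present; return attempts used."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            chunks = fetch_all_chunks(chunk_ids, store)
        except ChunkNotReadyError:
            if attempt == max_attempts:
                raise
            # Block here: no further messages are consumed while we wait.
            time.sleep(retry_delay_s)
            continue
        handler(b"".join(chunks))
        return attempt
```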

Compression
Of course, if compressing the data gets you below the threshold without having to split it, go for it and throw this post in the trash. But if compression alone doesn’t get you there, I see no reason to both compress and split into chunks, except perhaps if your throughput of large messages is extremely high and you really need to save space.
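
That decision could look something like the sketch below: try the payload as-is, then compressed, and fall back to chunking the raw payload only if neither fits. The function name, the string labels, and the choice of zlib are assumptions for illustration; the consumer would also need to be told which mode was used, which this sketch leaves out.

```python
import zlib
from typing import List, Tuple


def prepare_payload(payload: bytes, max_bytes: int) -> Tuple[str, List[bytes]]:
    """Pick the cheapest way to fit under max_bytes: raw, compressed, or chunked."""
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    if len(payload) <= max_bytes:
        return "raw", [payload]
    compressed = zlib.compress(payload)
    if len(compressed) <= max_bytes:
        # Compression alone is enough, so no splitting is needed.
        return "compressed", [compressed]
    # Compression did not help enough: split the uncompressed payload instead.
    chunks = [payload[i:i + max_bytes] for i in range(0, len(payload), max_bytes)]
    return "chunked", chunks
```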

Disk > Memory
I initially used an in-memory store to persist the chunks. The problem was that when large messages were consumed with high latency, chunks piled up and my application’s memory could blow up. So I switched to using disk space: the same K/V store, backed by Kafka, but instead of reading/writing from an in-memory data structure, it reads/writes from the disk (powered by the H2 DB engine).
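
To illustrate the idea of swapping the in-memory structure for disk without changing the store's interface, here is a small sketch. It uses SQLite from the Python standard library as a stand-in for H2, and it omits the Kafka backing entirely; the class and method names are hypothetical.

```python
import sqlite3
from typing import Optional


class DiskChunkStore:
    """Key-value chunk store that keeps chunk bytes on disk instead of in memory."""

    def __init__(self, path: str) -> None:
        self._conn = sqlite3.connect(path)
        with self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS chunks (id TEXT PRIMARY KEY, data BLOB NOT NULL)"
            )

    def put(self, chunk_id: str, data: bytes) -> None:
        # The connection context manager commits on success and rolls back on error.
        with self._conn:
            self._conn.execute(
                "INSERT OR REPLACE INTO chunks (id, data) VALUES (?, ?)",
                (chunk_id, data),
            )

    def get(self, chunk_id: str) -> Optional[bytes]:
        row = self._conn.execute(
            "SELECT data FROM chunks WHERE id = ?", (chunk_id,)
        ).fetchone()
        return None if row is None else bytes(row[0])

    def delete(self, chunk_id: str) -> None:
        with self._conn:
            self._conn.execute("DELETE FROM chunks WHERE id = ?", (chunk_id,))

    def close(self) -> None:
        self._conn.close()
```

Because only the chunk currently being read or written is held in memory, slow consumption makes the file grow rather than the application heap.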

At least once
If you guarantee at-least-once delivery, beware of the following pitfall:

  • Chunks consumer composes the original message.
  • The original handler is triggered.
  • Chunks are deleted from the store.
  • **RETRY.** For some reason, the chunks handler retries (under the “at least once” guarantee, this can happen for various reasons).
  • Chunks consumer starts over again and tries to fetch the chunks from the store, but they are already deleted.

You could solve that in a few different ways. I chose not to discuss them in this post, since it’s a rather unusual edge case. I do hope it sparks your imagination, though.
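
For the curious, the pitfall itself is easy to reproduce with a toy in-memory store. The sketch below only demonstrates the failure and does not attempt a fix; all names in it are hypothetical.

```python
from typing import Callable, List, MutableMapping, Tuple


def handle_chunk_ids(
    chunk_ids: List[str],
    store: MutableMapping[str, bytes],
    handler: Callable[[bytes], None],
) -> None:
    """Naive consumer: compose the message, trigger the handler, delete the chunks."""
    message = b"".join(store[chunk_id] for chunk_id in chunk_ids)
    handler(message)
    for chunk_id in chunk_ids:
        del store[chunk_id]


def replay_demo() -> Tuple[List[bytes], str]:
    """Deliver the same chunk IDs twice, as an at-least-once consumer might."""
    store = {"c1": b"hello ", "c2": b"world"}
    delivered: List[bytes] = []
    chunk_ids = ["c1", "c2"]

    handle_chunk_ids(chunk_ids, store, delivered.append)      # first delivery works
    try:
        handle_chunk_ids(chunk_ids, store, delivered.append)  # redelivery of the same IDs
    except KeyError as missing:
        return delivered, f"redelivery failed: chunk {missing.args[0]} already deleted"
    return delivered, "redelivery succeeded"
```

Note that if a missing chunk triggers a blocking retry, as described earlier for the eventual-consistency case, a redelivery like this one could keep the consumer waiting for chunks that will never arrive.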

Summary

In this short post, I introduced a few ideas for handling large messages in the Kafka world, and a detailed implementation of one of them. The implementation details can vary, but the idea is pretty much the same: split into chunks; persist the chunks somewhere (preferably by using Kafka); produce a pointer/reference to the chunks; aggregate them on the other side; trigger the original handler.

As always, you’re welcome to read more of my posts here. Thanks for reading, and see you next time.

Many thanks to Natan Silnitsky and Noam Berman for the support.
