Processing Large Messages with Kafka Streams

Philipp Schirmer
bakdata
Feb 20, 2020


Kafka Streams is a client library whose DSL allows easy processing of stream data stored in Apache Kafka. It abstracts away the low-level producer and consumer APIs as well as serialization and deserialization. Apache Kafka is designed to handle many small messages. However, with the increasing popularity of Kafka Streams and Apache Kafka as a streaming data processing platform, use cases that deal with very large messages arise.

On the broker side, Apache Kafka limits the maximum size of a single batch of messages sent to a topic. This limit is configurable via the topic-level max.message.bytes configuration (message.max.bytes on the broker) and defaults to roughly 1 MB. Nevertheless, individual messages can always exceed whatever limit is configured. In some environments, e.g., Confluent Cloud, we might not even be able to raise this limit arbitrarily. Thus, we need a solution that processes messages regardless of their size.

A common solution to this problem is to split documents into chunks so that each message is smaller than the maximum size and process the chunks separately. However, this requires changes to the underlying processing logic. Furthermore, some use cases, such as complex NLP tasks, require knowledge of the complete document.

Therefore, we came up with a different solution. We implemented a SerDe that transparently stores large messages on Amazon S3: https://github.com/bakdata/kafka-s3-backed-serde. This SerDe wraps any other SerDe. When serializing a message, it delegates to the wrapped SerDe and checks whether the serialized message size exceeds a configurable limit. If it does, the serialized message is stored on Amazon S3 and the unique URI of the S3 object is sent to Kafka instead. Otherwise, the serialized message is sent to Kafka directly. When deserializing a message, the SerDe downloads the actual message from S3, if needed, and then deserializes it using the wrapped SerDe.
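The flow above can be sketched in a few lines of plain Java. This is only an illustration, not the library's implementation: a HashMap stands in for Amazon S3, the mem:// URI scheme is made up, and the class name LargeMessageSketch is hypothetical. Each payload is prefixed with a one-byte marker so that the reader can tell the two cases apart; the marker values 0 and 1 are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Sketch of the store-or-inline decision; a HashMap stands in for Amazon S3. */
final class LargeMessageSketch {
    // Hypothetical marker values; they are assumptions made for this sketch.
    static final byte INLINE = 0;
    static final byte STORED = 1;

    private final Map<String, byte[]> store = new HashMap<>(); // stand-in for S3
    private final int maxByteSize;

    LargeMessageSketch(int maxByteSize) {
        this.maxByteSize = maxByteSize;
    }

    /** Takes bytes already produced by the wrapped SerDe and returns the Kafka payload. */
    byte[] serialize(byte[] inner) {
        if (inner.length <= maxByteSize) {
            return withFlag(INLINE, inner); // small enough: send as is
        }
        String uri = "mem://bucket/" + UUID.randomUUID(); // made-up URI scheme
        store.put(uri, inner); // "upload" the large message
        return withFlag(STORED, uri.getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the bytes that the wrapped SerDe should deserialize. */
    byte[] deserialize(byte[] payload) {
        byte[] body = Arrays.copyOfRange(payload, 1, payload.length);
        if (payload[0] == INLINE) {
            return body;
        }
        // The body is a URI: "download" the original bytes.
        return store.get(new String(body, StandardCharsets.UTF_8));
    }

    private static byte[] withFlag(byte flag, byte[] body) {
        byte[] out = new byte[body.length + 1];
        out[0] = flag;
        System.arraycopy(body, 0, out, 1, body.length);
        return out;
    }

    public static void main(String[] args) {
        LargeMessageSketch sketch = new LargeMessageSketch(4);
        byte[] large = "a rather large message".getBytes(StandardCharsets.UTF_8);
        byte[] payload = sketch.serialize(large);
        System.out.println(new String(sketch.deserialize(payload), StandardCharsets.UTF_8));
    }
}
```

The wrapped SerDe never sees the marker or the URI, which is why the processing logic does not need to change.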

In order to deserialize messages properly, we use the first byte as a flag that denotes whether the message is stored on S3 or not. Thus, the overhead of using Kafka S3-backed SerDe is only one byte for messages below the limit. If a message is larger than the limit, the message sent to Kafka is much smaller than the original. However, serialization takes a bit longer because the message needs to be uploaded to S3.

The key of a stored S3 object is partly randomly generated. It always starts with a configurable prefix, followed by the topic name and either keys/ or values/. Because we cannot determine the partition and offset a message will be written to, we finally append a random UUID to the S3 object key. Each message thereby gets a unique key on S3, because the probability of a collision is negligible for random UUIDs.
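As a rough illustration of the key layout, the following sketch builds an object key from a prefix, a topic name, and a random UUID. The class and method names are hypothetical, and the exact separators are an assumption; only the order of the parts is taken from the description above.

```java
import java.util.UUID;

/** Sketch of the S3 object key layout: prefix, topic, keys/ or values/, random UUID. */
final class S3KeySketch {

    /**
     * Builds an object key. The prefix is expected to end with "/".
     * The separator between the parts is an assumption made for this sketch.
     */
    static String objectKey(String prefix, String topic, boolean isKey) {
        String kind = isKey ? "keys" : "values";
        // The random UUID replaces partition and offset, which are unknown at serialization time.
        return prefix + topic + "/" + kind + "/" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(objectKey("my/prefix/", "my-topic", false));
    }
}
```

Two calls with the same arguments return different keys, so re-serializing the same record stores a second object rather than overwriting the first.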

When serializing keys with our SerDe, it is important to note that the partitioning of the downstream Kafka topic changes, because the unique S3 URI rather than the original key is used to assign a message to a partition. In general, we think that Kafka message keys should be much smaller than message values. Thus, although it is technically possible, there is no need to use our SerDe for keys.

Usage

Using the SerDe is as simple as using any other SerDe:

A simple Kafka Streams application that reads from an input topic and writes to an output topic. Data is text-based and serialized using Kafka S3-backed SerDe.

Alternatively, the SerDe can be registered as the default SerDe:

Configuring Kafka S3-backed SerDe as default SerDe.
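A default-SerDe setup could look roughly like the following sketch, which only assembles the configuration as plain java.util.Properties so that it needs no Kafka dependency. The s3backed.* keys are the ones documented in this post, and default.value.serde is the standard Kafka Streams setting. The class name com.bakdata.kafka.S3BackedSerde is an assumption inferred from the package of the converter mentioned later, and the application id, bootstrap server, and bucket are placeholders.

```java
import java.util.Properties;

/** Sketch: Kafka Streams properties that register the S3-backed SerDe as default value SerDe. */
final class DefaultSerdeConfigSketch {

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.setProperty("application.id", "large-message-app"); // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // Assumed class name of the SerDe; check the project README for the actual one.
        props.setProperty("default.value.serde", "com.bakdata.kafka.S3BackedSerde");
        // The wrapped SerDe that performs the actual (de)serialization.
        props.setProperty("s3backed.value.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.setProperty("s3backed.base.path", "s3://my-bucket/my/prefix/");
        props.setProperty("s3backed.max.byte.size", "1000000");
        return props;
    }

    public static void main(String[] args) {
        streamsProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In an application, these properties would be passed to the KafkaStreams constructor together with the topology.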

You can add it via Gradle:

implementation group: 'com.bakdata.kafka', name: 's3-backed-serde', version: '1.1.1'

Or via Maven:

<dependency>
<groupId>com.bakdata.kafka</groupId>
<artifactId>s3-backed-serde</artifactId>
<version>1.1.1</version>
</dependency>

Large messages stored on S3 are not automatically deleted by Kafka S3-backed SerDe. However, with Kafka, data is usually stored for a limited amount of time by defining topic retentions. S3 offers a similar concept, Object Expiration, which allows us to have the same retentions for messages stored on S3 as for messages stored in the corresponding Kafka topic.
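Kafka topic retention (retention.ms) is given in milliseconds, whereas S3 lifecycle expiration is configured in whole days. The following sketch shows one way to derive the number of days for an expiration rule, rounding up so that an object is not expired before the Kafka message that references it. The class and method names are hypothetical, and the rule itself still has to be created on the bucket separately.

```java
import java.time.Duration;

/** Sketch: derive S3 expiration days from a Kafka topic's retention.ms. */
final class ExpirationSketch {

    /** Rounds up to whole days so that S3 objects outlive the messages pointing to them. */
    static long expirationDays(long retentionMs) {
        if (retentionMs <= 0) {
            // retention.ms = -1 means unlimited retention; no expiration rule applies then.
            throw new IllegalArgumentException("retention must be positive: " + retentionMs);
        }
        long msPerDay = Duration.ofDays(1).toMillis();
        return (retentionMs + msPerDay - 1) / msPerDay;
    }

    public static void main(String[] args) {
        long defaultRetentionMs = Duration.ofDays(7).toMillis(); // Kafka's default retention.ms
        System.out.println(expirationDays(defaultRetentionMs));
    }
}
```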

Configuration

The SerDe offers the following configurations:

s3backed.key.serde: Key SerDe class to use. All SerDe configurations are also delegated to this SerDe.

  • Type: class
  • Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
  • Importance: high

s3backed.value.serde: Value SerDe class to use. All SerDe configurations are also delegated to this SerDe.

  • Type: class
  • Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
  • Importance: high

s3backed.base.path: Base path to store data. Must include bucket and any prefix that should be used, e.g., s3://my-bucket/my/prefix/.

  • Type: string
  • Default: “”
  • Importance: high

s3backed.max.byte.size: Maximum serialized message size in bytes before messages are stored on S3.

  • Type: int
  • Default: 1,000,000
  • Importance: medium

s3backed.access.key: AWS access key to use for connecting to S3. Leave empty if the AWS credential provider chain should be used.

  • Type: password
  • Default: “”
  • Importance: low

s3backed.secret.key: AWS secret key to use for connecting to S3. Leave empty if the AWS credential provider chain should be used.

  • Type: password
  • Default: “”
  • Importance: low

s3backed.region: S3 region to use. Must be configured in conjunction with s3backed.endpoint. Leave empty if the default S3 region should be used.

  • Type: string
  • Default: “”
  • Importance: low

s3backed.endpoint: Endpoint to use for connecting to Amazon S3. Must be configured in conjunction with s3backed.region. Leave empty if the default S3 endpoint should be used.

  • Type: string
  • Default: “”
  • Importance: low

s3backed.path.style.access: Enable path-style access for the S3 client.

  • Type: boolean
  • Default: false
  • Importance: low

Kafka Connect

Kafka Connect is a framework to connect external systems to your Kafka cluster. In order to read and write data serialized using the S3-backed SerDe with Kafka Connect, we also provide a Converter. It is available on Maven Central as com.bakdata.kafka:s3-backed-connect:1.1.1. Once the JAR is deployed on your Connect cluster, you can configure the converter of your Connect job as com.bakdata.kafka.S3BackedConverter. Just like the SerDe, you can use any existing Converter to delegate serialization. In addition to the configurations available for the SerDe (except s3backed.key.serde and s3backed.value.serde), you can configure the following:

s3backed.converter: Converter to use. All converter configurations are also delegated to this converter.

  • Type: class
  • Default: org.apache.kafka.connect.converters.ByteArrayConverter
  • Importance: high
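As a sketch of how this might look for a connector, the following snippet assembles the relevant part of a connector configuration as a plain map. Kafka Connect passes every property prefixed with value.converter. on to the converter, so the s3backed.* settings carry that prefix here. The delegate StringConverter and the bucket are example choices, and the class and method names of the sketch are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: connector properties that use the S3-backed converter for record values. */
final class ConnectConverterConfigSketch {

    static Map<String, String> valueConverterConfig(String basePath) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("value.converter", "com.bakdata.kafka.S3BackedConverter");
        // Converter that the S3-backed converter delegates to (example choice).
        config.put("value.converter.s3backed.converter",
                "org.apache.kafka.connect.storage.StringConverter");
        config.put("value.converter.s3backed.base.path", basePath);
        config.put("value.converter.s3backed.max.byte.size", "1000000");
        return config;
    }

    public static void main(String[] args) {
        valueConverterConfig("s3://my-bucket/my/prefix/")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These entries would be merged into the rest of the connector configuration, such as the connector class and topics.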

Limitations

Because SerDes cannot determine whether a serialized message has been actually sent and committed to Kafka, there is the risk of having messages stored on S3 that are not associated with any committed Kafka message. This potentially wastes some S3 storage but does not have any impact on the correctness of the SerDe.

Messages serialized with the SerDe also cannot be processed using Confluent KSQL because custom formats cannot yet be implemented there.

Conclusion

Apache Kafka is a state-of-the-art stream processing platform, and with its increasing popularity, use cases arise that require processing large messages. Kafka itself is not able to process messages of arbitrary size, and the environment where Kafka is running may impose further restrictions.

We implemented an open-source SerDe that transparently handles such messages by storing them on Amazon S3. This SerDe can be used with any existing Kafka Streams application and requires only a few configuration changes. No changes to the processing logic or the sent messages are required. Furthermore, the SerDe can also be used with the Kafka Connect framework.

As storing messages on S3 comes with a small overhead, the SerDe is best used with data that contains many records small enough to be sent to Kafka directly and only a few records that exceed the Kafka limits. Nevertheless, the SerDe can also process data in which every record exceeds the limits. In such cases, it is worth evaluating whether the size of the messages can be reduced or whether Kafka is an appropriate processing platform at all.
