5 things I wish I’d known before starting to work with Kafka

Kirill Velikanov
12 min read · Jan 1, 2024


Over the course of several years using Kafka for production workloads in different environments, I have built up a list of trade-offs to consider in order to get it right. My use cases were mostly about supporting digital transformations and unlocking real-time data streaming within large organisations. While adopting Kafka for enterprise use across many different workloads, I’ve faced plenty of challenges and non-obvious trade-offs, which I’d like to share with you today.

A producer and a consumer application using the Kafka broker and the Schema Registry.

For those sitting in the back, completely unfamiliar with Kafka, I’d recommend checking Apache’s website to gain a basic understanding of the system at kafka.apache.org/intro. It is assumed that the reader is broadly familiar with its basic concepts (topics, partitions, producers, consumers, offsets).

Kafka backups

First, let’s recall that records are kept in one topic, while the offsets committed by consumers are stored in another (called __consumer_offsets), and by default writes to these two topics are not atomic. This default behaviour means the two topics are backed up separately from one another, without any consistency guarantees, so an inconsistent state after recovery is quite typical:

  1. The offset topic is behind the data topic, leading to reprocessing and duplicates after recovery, as consumers re-read the same records.
  2. The offset topic is ahead of the data topic, leading to records being skipped from processing.
  3. The above may happen only for some partitions and only for some of the topics, typically depending on the throughput of the data topic.
The producer application produces records. Consumer applications not only consume records from the data topic, but also commit offsets to the __consumer_offsets topic to track processed records.
Produced records and consumer offset relations.

In other words, you face either data duplication or data loss, on top of falling behind your RPO guarantees. While duplication is broadly addressed by applying idempotency techniques, data loss is generally not an option. Given the broker’s log is append-only and not editable, the only way to restore consistency is to fix the consumer offset values. There are many ways to restore the offsets, from handpicking consumer offsets after restoration using application logs, up to enabling transactions across the whole cluster.
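
As an illustration of the “handpicked offsets” route, here is a minimal sketch in Java, assuming you have re-derived known-good offsets out-of-band (for example from application logs); the group name, topic and offset values are placeholders.

```java
// A minimal sketch (not production tooling) of repairing consumer offsets after a
// restore, using offsets recovered out-of-band. All names and values are placeholders.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class OffsetRepairSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Offsets recovered out-of-band (e.g. parsed from application logs).
            Map<TopicPartition, OffsetAndMetadata> repaired = Map.of(
                    new TopicPartition("payments", 0), new OffsetAndMetadata(123_456L),
                    new TopicPartition("payments", 1), new OffsetAndMetadata(98_765L));

            // The consumer group must have no active members for this call to succeed.
            admin.alterConsumerGroupOffsets("payments-processor", repaired).all().get();
        }
    }
}
```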

While Kafka transactions seem promising at first glance, I find it rather hard to enable them for all types of workloads, as some applications might not support them (e.g., interaction with external systems within a single Kafka transaction) or the performance degradation becomes too significant.
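
For workloads that can afford it, the consume-process-produce loop with Kafka transactions commits output records and consumer offsets atomically. A minimal sketch, with illustrative topic, group and transactional.id values:

```java
// A minimal sketch of consume-process-produce with Kafka transactions, so that
// output records and consumer offsets commit atomically. Names are illustrative.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.*;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TransactionalPipelineSketch {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cp.put(ConsumerConfig.GROUP_ID_CONFIG, "pipeline");
        cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Properties pp = new Properties();
        pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "pipeline-tx-1");
        pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(List.of("orders"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> r : records) {
                    producer.send(new ProducerRecord<>("orders-enriched", r.key(), r.value()));
                    offsets.put(new TopicPartition(r.topic(), r.partition()),
                            new OffsetAndMetadata(r.offset() + 1));
                }
                // Offsets are committed as part of the same transaction as the output.
                // Production code must also abort the transaction on failures.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            }
        }
    }
}
```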

Compared to a typical relational database (RDBMS), where a consistent backup is a built-in capability (e.g., “just copy the transaction log file”), one might wonder whether a consistent recovery of Kafka state after a failure is even achievable.

One of the most obvious options is to try a filesystem snapshot of the cluster, the same as one would do for an RDBMS. Unfortunately, this doesn’t address the consistency problem between the data and offset topics, as they’re not written atomically. Additionally, snapshotting file systems under heavy use is quite a challenging process. One might consider doing an off-peak backup, e.g. overnight, but the consistency problem still persists. It’s worth mentioning that Kafka is a distributed system itself, and most likely the applications using it are integrated with other systems, so the general issue of backing up a distributed system in a consistent way arises.

A well-described approach on the Internet is to run an “incremental backup” as a replicated (read: cloned) Kafka cluster, launched during off-peak hours and running a replication process (for instance, using MirrorMaker or Cluster Linking). It doesn’t guarantee full consistency, but at least it would start after a failure without data corruption. A good trade-off analysis of these approaches is available here.

Another option would be to avoid mastering any data in Kafka at all, using it only as a “persistent cache” type of system, so that restoration could be managed as a simple “flush” of the data from external systems to rebuild the initial state.

So is there an answer to backing up the cluster? I’d say the answer is redundancy and replication: achieve high availability for the primary cluster and have one or more clusters replicating data on the fly. The downside is that such a setup might be quite expensive to build (check out Uber’s multi-region setup).

Key learning

Ensure a clear backup/restore and fault-tolerance plan is identified for each dataset. Analyse the trade-offs of having the Kafka cluster master the data. Consider supporting different consistency guarantees for different workloads.

Schema registry backup and recovery

Schema registries (Confluent’s or others) work on top of Apache Kafka and are not a built-in capability of the Kafka broker, but rather an add-on. Technically the broker doesn’t “know” what kind of data it stores and doesn’t analyse payloads, just copying bytes to and from broker nodes (which is actually a significant bonus). All Kafka records are just bytes, and their serialisation/deserialisation is left to the producer and consumer applications.

At its core, the schema registry is a CRUD-like API for schema management (e.g., for Confluent it is REST-based), plus a “façade-like” SDK over the Kafka producer and consumer SDKs which simplifies serialisation/deserialisation routines in the application codebase. A Kafka record holds only a reference ID of the schema that consumers should use to deserialise it, typically just a handful of bytes stored in both the record key and value along with the payload.

The producer serialises data into a record containing both the binary payload and a schema reference in the form of a Schema Registry ID. The consumer, upon retrieving the record, fetches the schema definition from the Schema Registry and uses it to deserialise the binary payload for further processing.
Kafka records and Schema Registry relations.
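
To make this concrete, here is a hedged sketch of the producer side using Confluent’s Avro serialiser, which registers (or looks up) the schema and embeds only its ID into each record; the URLs, topic name and schema itself are placeholders:

```java
// A hedged sketch: the serialiser talks to the Schema Registry, the broker never does.
// The wire format is a magic byte, a 4-byte schema ID and the Avro payload.
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class AvroProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder URL

        // Placeholder schema for illustration only.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Product\",\"fields\":"
                        + "[{\"name\":\"id\",\"type\":\"string\"}]}");
        GenericRecord product = new GenericData.Record(schema);
        product.put("id", "sku-42");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("products", "sku-42", product));
        }
    }
}
```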

This setup operates well by default when references between records and schemas are stable and consistent, even when cluster mirroring is enabled. The real challenge occurs when schema IDs change in the Schema Registry, and this might happen for multiple reasons:

  1. Some mirroring tools don’t allow setting the ID of a schema upon creation, so losing the sequence generator’s position may produce inconsistent schema references.
  2. In setups where schemas are owned by the applications, newly registered schemas might receive different IDs when an application starts up again after a fault.
  3. Some SaaS providers don’t provide the ability to assign an ID to a schema and don’t allow manipulating the ID sequence generator.

I wouldn’t say it’s impossible to consistently replicate the Schema Registry state and then recover it, but the tools and constraints vary depending on the specific setup and environment.
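
As a starting point, a simple post-restore sanity check can confirm that the schema IDs referenced by already-stored records still resolve. A minimal sketch against a Confluent-compatible REST API, with placeholder IDs (assumed to be gathered from topic dumps):

```java
// A minimal sketch: verify that schema IDs referenced by stored records still
// resolve after a Schema Registry restore. URL and IDs are placeholders.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

public class RegistryConsistencyCheck {
    public static void main(String[] args) throws Exception {
        String registry = "http://localhost:8081";
        List<Integer> idsReferencedByRecords = List.of(1, 7, 42); // gathered from topic dumps

        HttpClient http = HttpClient.newHttpClient();
        for (int id : idsReferencedByRecords) {
            HttpRequest req = HttpRequest.newBuilder(
                    URI.create(registry + "/schemas/ids/" + id)).GET().build();
            HttpResponse<String> resp = http.send(req, HttpResponse.BodyHandlers.ofString());
            if (resp.statusCode() != 200) {
                System.out.println("Schema ID " + id + " is missing after restore!");
            } else {
                System.out.println("Schema ID " + id + " -> " + resp.body());
            }
        }
    }
}
```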

Key learning

Ensure a full set of tools is identified to recover the Schema Registry to exactly the state expected by the records already stored on the Kafka brokers.

Schema evolution and compatibility guarantees

Using Avro schemas for Kafka topics, it is possible to enable compatibility guarantees, usually provided by the Schema Registry service. In short, this allows defining a set of constraints on future schema changes, ensuring an appropriate level of compatibility. The core guarantee options are listed below, with a configuration sketch after the list (more on this here and here):

  1. None — compatibility validation is disabled; I generally wouldn’t recommend using it for large-scale setups.
  2. Backward — ensures consumers using the new schema can still process records produced with the old schema.
  3. Forward — ensures records produced with the new schema can still be processed by consumers using the old schema.
  4. Full — both forward and backward guarantees are enabled.
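
Compatibility is configured per subject in the Schema Registry; a minimal sketch of pinning it through the REST API (subject name and URL are assumptions):

```java
// A hedged sketch of setting the compatibility mode for one subject via the
// Schema Registry REST API. Registry URL and subject name are placeholders.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SetCompatibilitySketch {
    public static void main(String[] args) throws Exception {
        String registry = "http://localhost:8081";
        String subject = "products-value";

        HttpRequest req = HttpRequest.newBuilder(URI.create(registry + "/config/" + subject))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"compatibility\": \"BACKWARD\"}"))
                .build();

        HttpResponse<String> resp = HttpClient.newHttpClient()
                .send(req, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.statusCode() + " " + resp.body());
    }
}
```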

I like to think of every topic as an API exposing a specific dataset or a specific application function. For every API there are upstream systems providing it and downstream systems consuming it. Depending on the use case and the pattern chosen, the API might be owned either by the downstream systems or by the upstream ones. Mapping this to producer and consumer applications using Kafka topics, it becomes clear that producers can be thought of as upstream systems, and consumers as downstream ones.

Why does it matter so much? By default, a consumer which can’t deserialise a record will produce a runtime error (for Java, it could be anything from a NullPointerException to a ClassCastException), which stops topic processing unless specific error handling is put in place. Stopped message processing will eventually lead to consumer group rebalancing (as the broker identifies stopped consumers as unhealthy), unless specific configuration is provided. A single non-deserialisable record can lead to a consumer rebalance loop, increasing the application’s resource consumption and generating lots of error logs. So unless specific error handling has been added, I wouldn’t recommend the “None” compatibility mode.
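
One simple defensive pattern, sketched below, is to consume raw bytes and attempt deserialisation inside the poll loop, skipping (or dead-lettering) records that fail instead of letting the exception kill the consumer thread; topic, group and deserialiser are placeholders:

```java
// A minimal sketch (one of several possible patterns) for tolerating "poison pill"
// records: consume byte[] and deserialise manually, skipping records that fail.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class PoisonPillTolerantConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "resilient-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        StringDeserializer valueDeserializer = new StringDeserializer(); // stand-in for Avro etc.

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            while (true) {
                for (ConsumerRecord<byte[], byte[]> r : consumer.poll(Duration.ofSeconds(1))) {
                    try {
                        String value = valueDeserializer.deserialize(r.topic(), r.value());
                        // process(value); // hypothetical business logic
                    } catch (RuntimeException e) {
                        // Log and move on (or forward to a dead-letter topic) rather than
                        // crashing and triggering an endless rebalance loop.
                        System.err.printf("Skipping bad record %s-%d@%d: %s%n",
                                r.topic(), r.partition(), r.offset(), e);
                    }
                }
            }
        }
    }
}
```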

The list of changes allowed for each of the compatibility modes is well defined here. Additionally, there can be implementation constraints: for instance, in Java adding new enum values to a schema can break consumers, because older versions of the consumer apps won’t have a Java enum value to deserialise into.

Now let’s look at some typical setups.

Single producer with multiple Consumers

Single producer with multiple consumers

This case can be seen as a Producer application exposing its dataset to downstream systems via its “Outbound API”, implemented as a Kafka topic for scalability and persistence reasons. For such a case, I’d generally recommend enabling “Backward” compatibility for the topic schema, for two reasons:

  1. The schema, the “Outbound API”, is owned and maintained by the Producer application, and it’s up to the Producer application to evolve it.
  2. Backward compatibility ensures consumers that have moved to the latest schema can still process both “old” and “new” records, so a gradual evolution of the system is possible.

Multiple producers with a single Consumer

Multiple producers with a single Consumer.

I like to think of such a case as a Consumer application exposing its “Inbound API” and processing “requests” published by other applications into the Kafka topic. I’d recommend looking first at the Forward compatibility mode, because in such a case the Consumer application owns and maintains the schema, defining its evolution. This mode guarantees that records produced with newer schema versions will not break the Consumer application.

Single producer with a single Consumer

Single producer and single consumer setup

This particular case has always been challenging for me, as it’s not clear what would drive changes in the API. During the design phase it might be identified that in the future there will be more than one producer, or more than one consumer, in which case it collapses into one of the above-mentioned setups. But if the data is not reusable by any other applications in the environment, I’d reconsider whether the capability being built really needs to be split into a Producer and a Consumer application. If it is still required, the Full compatibility mode would be a wise choice.

Multiple producers with multiple Consumers

Multiple producers with multiple Consumers

Another challenging case is when the API topic doesn’t have a single owner and it’s unclear how it will evolve over time. I personally try to avoid such setups, but there are valid situations for this pattern. If so, I’d recommend enabling the Full compatibility mode to ensure the API remains consistent between different versions of the Producers and Consumers.

Key learning

Ensure schema ownership is well defined and an appropriate compatibility guarantee is chosen for each dataset. Ensure the initial schema design has enough flexibility for future evolution and maintenance.

Record retention policy and compaction

During the very first encounters with Kafka one might build an understanding (like I did) that by default the whole record history for every workload should be accumulated forever, and not a single record should be lost. While this can be justified for a particular use case, I’d generally recommend making the historical-vs-operational decision for each dataset during the design phase.

One of the obvious concerns with accumulating the full data history is volume (e.g. disk space), but it’s not the only one, and nowadays I wouldn’t even consider it the main one. Given there is no simple configuration to start a new application’s consumer offset at “30 days ago” (auto.offset.reset only offers “earliest” and “latest”), the rollout of new features and applications might be complicated by the need to process the full dataset of a particular historical topic.

A typical case is when an existing dataset is extended and, for a couple of months, accumulates some new attribute for a brand-new feature. When the feature is released, it might require processing those last few months to build the initial state, and that’s where the default consumer offset values won’t help. The application can be coded to “skip” records it isn’t interested in (e.g. some attribute should be non-null, or records should be of a particular version), but in the end all the records still have to be transferred from the Kafka broker to the consumer application just to do the filtering.
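
A client-side workaround I find worth knowing is to resolve a timestamp to concrete offsets with offsetsForTimes and seek there explicitly before the normal poll loop; a minimal sketch with placeholder topic and group names:

```java
// A hedged sketch of starting a new consumer "30 days ago": resolve timestamps to
// offsets per partition and seek there. Topic and group names are placeholders.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class SeekToTimestampSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "new-feature-backfill");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        long thirtyDaysAgo = Instant.now().minus(Duration.ofDays(30)).toEpochMilli();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor("products").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);

            Map<TopicPartition, Long> query = new HashMap<>();
            partitions.forEach(tp -> query.put(tp, thirtyDaysAgo));

            // Resolve "30 days ago" to a concrete offset per partition, then seek.
            consumer.offsetsForTimes(query).forEach((tp, offsetAndTs) -> {
                if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
            });
            // ... normal poll loop from here on.
        }
    }
}
```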

I’d recommend clearly distinguishing operational from analytical workloads, ensuring the operational ones are highly reusable and don’t require complex “filtering” just to start using the dataset. Kafka provides two mechanisms: retention and compaction. The former is implemented as a set of timeouts at the topic level, after which records may be erased. Note that this setting is highly dependent on segment size, so having segments that are too large leads to records not being erased for much longer than one might expect.
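
A minimal sketch of setting retention together with a sensible segment roll time via the AdminClient (topic name and values are assumptions, not recommendations):

```java
// A minimal sketch: retention is only enforced on closed segments, so a segment
// roll time is set alongside it. Topic name and values are placeholders.
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class RetentionConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "clickstream");
            List<AlterConfigOp> ops = List.of(
                    // Keep 7 days of data...
                    new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"),
                            AlterConfigOp.OpType.SET),
                    // ...and roll segments daily so expired data is actually deleted on time.
                    new AlterConfigOp(new ConfigEntry("segment.ms", "86400000"),
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```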

The compaction mechanism is more sophisticated. For every record stored outside the latest segment, the Kafka broker looks for records with the same key and erases older copies in favour of the newest one. This is especially useful for referential datasets, like product information or stock inventory figures, as it ensures only the latest state of an entity is present.

The compaction mechanism.

Compaction can also be used to erase data over time (for instance, to serve data privacy regulations), by producing a newer record with the same key and a null value (a so-called tombstone).
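
A hedged sketch of such an erasure: on a topic with cleanup.policy=compact, publish the same key with a null value; topic and key are placeholders.

```java
// A minimal sketch of publishing a tombstone (null value) so that, after compaction
// (and delete.retention.ms), no value remains for this key. Names are placeholders.
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TombstoneSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("customer-profiles", "customer-42", null));
        }
    }
}
```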

Key learning

Ensure an appropriate retention strategy for each dataset is identified at the design stage. Identify cases when full datasets should be re-processed.

Long-running transactions

Imagine a requirement to call an external system when processing Kafka records, for instance sinking data into a third-party system. The external system might be slow or simply unreliable, so some retry capability is introduced along with an exponential backoff strategy. How could this be a problem?

A broken third-party system blocking record processing.

Each consumer within a consumer group has an inactivity timeout (max.poll.interval.ms); if it doesn’t poll within this window, it is considered failed and a rebalance is issued, reassigning its partitions to other consumers within the group. In the worst case this process may loop indefinitely, as none of the consumers is able to process the problematic records, and the whole consumer group just constantly rebalances.
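
A minimal sketch of the consumer settings that interact with slow, retried external calls (values are illustrative): the work done per poll() has to fit inside max.poll.interval.ms, and max.poll.records caps how much work that is.

```java
// A minimal sketch of the relevant knobs; the values below are assumptions.
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class SlowProcessingConsumerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "third-party-sink");
        // Allow up to 10 minutes between poll() calls before the consumer is
        // considered failed and its partitions get reassigned (default: 5 minutes).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
        // Fewer records per poll means less work has to finish within that window.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return props;
    }
}
```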

A generic, but quite complex, approach to addressing this is to process records asynchronously from topic consumption. This can be done by using the Kafka Streams SDK to copy records from incoming topics into a state store (a so-called KTable) and then having a background process run over it. While this looks quite simple on paper, the corner cases and race conditions of asynchronous processing might become overwhelming.
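
For illustration only, here is a hedged sketch of that idea using Kafka Streams interactive queries: the incoming topic is materialised into a local store, and a background thread works through it at its own pace; topic, store and application names are placeholders.

```java
// A hedged sketch: poll() is never blocked by the slow external call, because the
// call happens on a separate thread over a materialised store. Names are placeholders.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.*;

import java.util.Properties;

public class AsyncProcessingSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "async-sink-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // Materialise the latest value per key into a local, fault-tolerant store.
        builder.table("incoming-topic",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("incoming-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Background thread: waits for the instance to be RUNNING, then repeatedly
        // scans the store and calls the slow external system at its own pace.
        new Thread(() -> {
            try {
                while (streams.state() != KafkaStreams.State.RUNNING) Thread.sleep(200);
                ReadOnlyKeyValueStore<String, String> store = streams.store(
                        StoreQueryParameters.fromNameAndType("incoming-store",
                                QueryableStoreTypes.<String, String>keyValueStore()));
                while (true) {
                    try (KeyValueIterator<String, String> it = store.all()) {
                        it.forEachRemaining(kv -> {
                            // callThirdParty(kv.key, kv.value); // hypothetical slow, retried call
                        });
                    }
                    Thread.sleep(5_000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}
```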

I wouldn’t recommend a single solution here, as it depends on the specific environment, but ultimately a conscious decision about the balance between rebalance timeouts and the maximum allowed record-processing time should be taken for each workload.

Key learning

Ensure unavailability of external systems is accounted for when configuring consumers.

Summary

With Kafka it’s possible to unlock data and make it available, building a single-source-of-truth data distribution hub, enabling real-time decision making and a set of real-time data processing capabilities on top of it. It is a great tool for stepping towards Data Mesh principles. The downside is that streaming systems are not so common, and it takes time to learn to use them the right way.

Compared to relational databases (e.g., PostgreSQL), Kafka is more tuneable and by default is more Available than Consistent (in terms of the CAP theorem), while it is possible to tune it towards CP (rather than AP) compared to the default settings. The only way I have found to use it effectively is to deeply understand its basic building blocks (partitions, segments, rebalancing, transactions, etc.), and for every new workload or dataset to have a separate design session, explicitly highlighting requirements and debating the implementation trade-offs.

The particular set of properties I’d recommend consciously choosing for each new dataset is the following (a sketch of applying some of them follows the list):

  1. Replication factor, partition count and segment size.
  2. Consumer group layout and rebalance timeouts.
  3. Retention timeout and compaction.
  4. Schema and compatibility settings.
  5. Producer batch size and linger.ms.
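
To tie these together, a hedged sketch of creating a new dataset’s topic with such properties decided up-front; every name and value below is an assumption to adapt per dataset:

```java
// A hedged sketch: replication factor, partition count and topic-level configs
// chosen explicitly at creation time. All names and values are placeholders.
import org.apache.kafka.clients.admin.*;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class DatasetTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("inventory-levels", 12, (short) 3) // partitions, RF
                    .configs(Map.of(
                            "cleanup.policy", "compact",      // referential data: keep latest per key
                            "min.insync.replicas", "2",       // durability vs availability trade-off
                            "segment.ms", "86400000"));       // daily segment roll
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```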

Before setting up a cluster, I’d also recommend clearly defining the desired workloads, identifying consistency-vs-availability trade-offs, performance requirements and a backup strategy. Separately, I’d recommend a standalone trade-off analysis when considering Kafka as a master data store, clearly identifying the backup/recovery strategy and the tools to implement it.
