Apache Kafka as a centralized state management platform — Part 2

Nicola Atorino
DraftKings Engineering
13 min readJul 7, 2023

Kafka for centralized storage and state sharding

Apache Kafka, an open-source distributed streaming platform, is used by DraftKings (DK) for data management and streaming applications due to its high scalability, low latency, and fault-tolerant storage. Kafka allows for various use cases such as data streaming, and state management. DraftKings employs Kafka as a push-based mechanism to distribute data from a database to multiple services, reducing strain on the database and internal traffic, following several architectural patterns that can cover different use cases. In the previous article, we explored the capabilities of Apache Kafka for state management and described a pattern to leverage its use to offload database traffic. In this article, we’re going to describe two additionally useful patterns that will solve some of the problems outlined in the previous cases and will extend our possibilities in terms of distributed processing.

The first part of this article can be found here : https://medium.com/draftkings-engineering/apache-kafka-as-a-centralized-state-management-platform-part-1-aa6d3fe4024b

Use Kafka as Centralized Store for Distributed Applications

This pattern is useful to distribute a stateful application. In these cases, a microservice would need to consume a stream of messages that sends updates for particular events, then maintain an aggregate of information for each event, in order to handle further processing. It’s not unusual for this kind of stream to produce thousands of messages per second, so the partitioning capabilities of Kafka are extremely valuable to orchestrate the consumption. By implementing a set of consumers in the same consumer group, processing will be distributed correctly between the many instances of the application, and rebalancing in case of crashes, deployments, or change in the number of instances will be correctly handled by the broker and the consumer group coordinator out of the box.

Use cases example:

  • updates of probabilities for sport events that need to be compared to the previous values
  • stock exchange price fluctuation — to generate an analysis of changes overtime or produce an average per day
  • wallet management — maintain a log of transactions together with a real-time snapshot of the balance in order to accept/refuse new transactions processing
  • fraud detection — keep track of behavioral patterns and detect anomalies over an established flow
  • shared sessions in multiplayer games — keep a centralized leaderboard up to date by events coming from different producers

To handle state management, it is possible to implement a dedicated ledger topic where each instance of the service can read from and write to. The ledger topic will then act as a centralized, persistent store for the system, surviving service crashes and redeployments. To allow this, a ledger consumer can be implemented in the service, this time with a different consumer group per instance, to ensure that each instance has the entire state at its disposal, including everything that was published by any other replica. The service can then maintain the ledger in its own memory, populate it in full form at the startup, and receive updates when they are produced, this way achieving almost real-time synchronization between the various components.

In case of rebalancing, the stream topic will be evenly distributed following the new topology, and every consumer can then resume from the last committed offset for its own consumer group, with the guarantee that the ledger already contains all necessary information processed up until this point.

There is a short-circuit mechanism in place to avoid concurrency issues due to the fact that a published message has to be consumed by the instance that published it. This allows every instance, upon publishing the state update to the ledger, to populate its own internal memory. This means the published message does not have to wait the round-trip to the Kafka broker between publishing and re-consuming the same message that it produced in order to populate its own state. This guarantees protection against race conditions.

Design Pattern for a centralized ledger

Considerations

One thing to take into consideration when planning a solution like this is the actual coordination between stream processing and ledger processing. To achieve 100% data consistency, the offsets of the processing topic must be committed only after there is a guarantee from the broker that the ledger has been correctly updated. Otherwise, in case of crashes or ungraceful shutdown there is the risk of committing an offset — thus marking a stream message as consumed — before the ledger is updated with the result of the processing. This kind of synchronization, however, can be costly in terms of performance, so the decision to handle these edge cases explicitly instead of relying on a simpler solution has to be taken based on the criticality of the business case, the impact of potential data loss, and the possibility of this happening in a given infrastructure.

One other consideration to take is data retention. The TTL of the items in memory do not necessarily have to match the retention of the ledger topic, however, is usually advised to do so in to have consistency between the two layers. An example of this is to set the same retention period between the items in memory and the Kafka ledger. Alternatively, the topic can be set to have no retention at all. This is useful in cases when the decision to remove an item from the ledger is taken explicitly by the application, that can send a deletion request when it deems a specific item not necessary anymore. In this case, the ledger will receive what it’s called a ‘tombstone’ message, that will trigger removal of all messages with the same key upon compaction. Tombstone messages are consumed like any other message, and the service can react to this particular message type by removing the item from the memory state, thus keeping synchronization with the state of the ledger. It is important to make sure that memory purging is properly handled in the application, time-based or explicitly, otherwise memory leaks are guaranteed in the long run.

Use Cases

A pattern like this is very useful when the partitioning logic of the stream topic does not match the partitioning logic of the ledger. If this is the case, there is no guarantee that an instance processing a message from a specific partition may never require data that was retrieved and published by another instance. Another use case is when the complete state is needed for overall processing, like notifying the system with an atomic update that any of the events is in a particular state.

An example for the first case described above may be an application that maps external data to internal entities. In this case, the input topic may be partitioned based on the 3rd party event Id, while the output topic will need to be partitioned by the internal event Id. Of course, the Ids won’t match in this case, so there is no way of knowing in which partition the published internal event will end up. When, instead, the state of an instance can be properly isolated from the state of the others, we could spice this pattern up a notch and introduce state sharding.

Use Kafka to Handle State Sharding

Let’s now consider a slightly different use case when each instance of a distributed system does NOT need to handle state management outside of the partitions that have been assigned to it. In this situation, it is not necessary for each replica to consume the entire ledger topic, but instead, it will be enough to consume only the part of it that is usually tied to the partition that has been assigned to it for the stream.

In this case, the configuration of the ledger topic must match the configuration of the stream topic, in terms of number of partitions and partitioning key. This way, every event that is streamed from partition 1 will store its current state in partition 1 of the ledger, and so forth. With this approach, each instance does not have to maintain the entire ledger in memory, only the shard related to the partitions it has been assigned to. This has obvious advantages in terms of scalability and resource management when it comes to high volume of items to track.

However, there is a complication. When rebalancing, partition reassignment is automatic and the default mechanism of the broker is to assign them based on load. Therefore, predicting what partitions would be assigned to a specific consumer can be complicated. The consumption of the ledger must follow the assignment of the stream topic partitions, with the instance explicitly consuming a specific set of ledger partitions, based on the stream partitions that were assigned to it by the consumer group coordinator for the stream topic.

When a group rebalancing for the stream topic is triggered:

  • The consumer will stop consumption from the currently assigned partitions. This ensures no concurrency issues can happen during the rebalancing process
  • The consumer will receive a notification that the currently assigned partitions have been removed. This can be used as a trigger to stop consumption of the ledger partitions that have been removed and remove every item in the state coming from the removed partitions
  • Once all consumers have confirmed that they have handled the PartitionRemoved event correctly, the consumer will receive a notification that a new set of partitions have been assigned. When this happens, the consumer needs to react by requesting consumption of the same partitions that were assigned to it
  • Once the ledger partitions have been consumed and the state is populated in memory, consumption of the stream topic can resume.
  • There is no possibility of race conditions here as the instance that was responsible to populate the state in those particular ledger partitions had their stream partitions removed already as part of the second step, so consumption and processing have been stopped already

This diagram shows a potential initial state of an application exploiting this pattern:

Design Diagram for state sharding in Kafka — initial state

While this one shows the state of the system after a rebalancing caused by the removal of one of the instances :

Design Diagram for state sharding in Kafka — state after removal of a consumer

A deeper look on various concerns

Performance

  • In Kafka there is usually a tradeoff between throughput and latency. For moderate traffic there is usually no need to delve deep into this, but when the amount of data to process becomes very high it is important to properly configure the system to behave exactly as expected
  • Producer Configuration — Every producer can have a batch size defined that can be configured, and that would allow the client to ‘batch’ multiple messages into a single request to the server. The higher the batch size, the higher the potential throughput of the application, since there will be fewer trips to the server. However, a batch size too large could impact the memory consumption of the application and could even increase latency, since the client will linger for a certain period of time waiting for messages to accumulate. properties : batch.sizelinger.msmax.request.size
  • Consumer Configuration — Consumers can define different configurations based on the use case. For example, it is possible to configure a minimum amount of data to be returned from the server to allow accumulation of more messages before returning to the client. This could increase throughput, at the cost of additional latency. properties : fetch.min.bytesfetch.max.bytesmax.partition.fetch.bytesfetch.max.wait.ms
  • Timeouts and heartbeats — How long the broker should wait for confirmation before starting the rebalancing process, in the case of a consumer losing connection (or crashing). We want to be reactive in these cases to quickly recover proper state of processing, however we do not want to cause unnecessary rebalancing in case of a network blip that would be solved quickly. properties : session.timeout.msheartbeat.interval.ms
  • Compression — Kafka allows messages to be compressed in several codecs, to reduce both the storage needed and the network bandwidth, at the cost of more CPU cycles and a slight delay in message dispatch. There are several codecs to choose from that all define different balances between gains and tradeoffs. Compression effectiveness also depends on producer batching configuration (the higher the batch size, the more effective compression will result, especially if the data is fairly repetitive, like XML or JSON data). properties : compression.type

there are deep dives available online, one example is this : Message compression in Apache Kafka — IBM Developer .

There are several other considerations to make in regard to performance tuning that would deal with:

  • proper number of partitions based on the use case — usually a higher number of partitions would ensure a higher potential throughput, but too many partitions may have significant tradeoffs in term or rebalancing speed or cluster performances.
  • partition replication and producer acknowledgment — partition replica would ensure better availability in case of broker failures, but acknowledgment of confirmation for all replica can be costly in terms of producers’ throughput.
  • producer and consumer buffer size — a higher buffer size for clients would allow for a potential higher throughput and diminished latency, or even better resiliency in case of unexpected spikes of messages to be produced/consumed, but at cost of more resources necessary for the clients.
  • data retention policies, segment size, and cleanable dirty ratio — smaller segment sizes and highest dirty ratio would ensure that topics are better maintained and unneeded data is removed quickly, but this could be costly for the broker to maintain since it increases the amount of work in the background.

A good starting point for some of these configurations can be this article : Deep dive into Apache Kafka storage internals: segments, rolling and retention (strimzi.io)

All the configuration details for the broker, the topics, the producers and the consumers can be found here: Apache Kafka Configuration

Fault tolerance

Given the rebalancing system that the broker has in place, with multiple services deployed, the solution is able to withstand failures of one or more consumers automatically. The result of rebalancing is a short interruption of processing of the partitions that are being rebalanced. There are different ways to manage these cases, and they depend on how we have configured the consumer to ‘confirm’ to the broker that a message has been processed. Kafka can be configured to confirm the consumption of a message to the broker explicitly for each message, or instead have a asynchronous automatic commit system that sends the confirmation in bulk at an interval. If the consumer does not have a chance of gracefully shut down, there is the possibility that some of the processed messages could be consumed again after rebalancing has completed. The consumers need to take this into account and define an AtLeastOnce consuming approach to be able to deal with potential duplicates correctly. properties : enable.auto.commitauto.commit.interval.ms

As in every microservice architecture, the overall health of the system depends on the health of the individual services and being capable of handling failures is an integral part of every solution design. When dealing with Kafka applications, there are other things to take into consideration apart from the consumer health that we just discussed. A consumer service may need to confirm that the producer of the messages it expects is healthy. This is often done by implementing a ‘heartbeat’ kind of message in the stream, that the consumer expects at regular intervals even if there are no other messages generated as part of the normal flow. Failure to receive this heartbeat message may mean that the producer is unavailable and thus the system may not be healthy as a whole. Usually, a heartbeat message is expected every few seconds, and the consumer defines a grace period after which it needs to react based on the use cases, by sending an alert to a monitoring system, or by stopping processing any internal work that relies on data being correctly up to date. For example, a processing stream that relies on real-time information for a sport event may react to this by stopping accepting bets in case the heartbeat message is not received, since there is a reasonable suspicion that we are failing to get the latest information about the event.

There is a dedicated DK article that explains in detail how components manage resiliency in case of failures of external dependencies: Resiliency in Feeds Integration | DraftKings Engineering | Medium

Even tough Kafka ensures data retention via partition replica, at DraftKings there is an additional layer of resiliency. Compacted topics that are expected to be the single source of truth are also backed up at regular intervals (usually once per day). This is done in order to ensure disaster recovery capabilities if for any reason the production cluster gets corrupted or is completely unavailable and data cannot be recovered easily. Not every topic has a backup. Most of the data does not need to be recovered this way as it can always be retrieved from other sources. This backup is more of an exception than a norm.

Monitoring and alerting

Kafka exposes a good number of metrics to be monitored to ensure the system is healthy. It exposes commit latency, used buffer size, rebalancing latency, throughput of producers, and much more. A complete list of metrics is available at Apache Kafka / Monitoring . The client libraries expose a way to retrieve these metrics to be sent to any monitoring solution. At DraftKings, App Metrics backed up by Grafana or Datadog is the most commonly used framework.

However, one of the main metrics used is simply the latency of processing. Each Kafka message has a timestamp that defines the moment in which the producer has produced the message. By comparing this timestamp against the one recorded when the consumer has received the message, and then again once the message has been processed, we can have a clear number explaining the consumer delay and the processing time for each message. With this minimal setup, is pretty simple to understand if the consumer is ‘falling behind’ and is not able to catch up with the rate of messages being produced upstream.

Another valid way of measuring consumption delay is comparing the latest committed offset of a consumer group on each partition against the latest produced offset: also in this case, if the gap between the two values increases too much, an automatic alert can be implemented in order to notify the possibility of data being delayed over an accepted threshold.

These alerts could be then used to evaluate how to improve the system — for example, scaling up or scaling down the consumer, change the configuration at cluster, topic, producer or consumer level, increase or decrease the resources dedicated to each instance of the service, and so on.

Want to learn more about DraftKings’ global Engineering team and culture? Check out our Engineer Spotlights and current openings!

--

--