Event Streaming and The Problem with Up-Scaling Partition Counts

Adam Bellemare
11 min readAug 15, 2020

--

A common goal of distributed and immutable log-based event brokers (such as Apache Kafka, Apache Pulsar, AWS Kinesis, to name a few) is to provide horizontal scaling capabilities for varying workloads. Partitions play an important role in this design, as they enable an application to process just a subset of the event stream. Partitions form the fundamental mechanism for horizontal scaling, as new consumers instances (same application, separate instance) can be created to consume from one or more partitions. Maximum parallelism (the number of concurrent application instances for a single application) is dictated by the number of partitions in an event stream.

The partition count is typically chosen at the creation time of the event stream. This count is based on a number of factors, including:

  • Event Rate — How many events per second?
  • Event Rate Growth — How much will this rate grow over time?
  • Desired Parallelism — How many parallel consumer instances will be needed?
  • Estimated Consumer Processing Rate — Any special considerations about how long these events may take to process?
  • Partition Overhead — Event broker and consumer metadata factors impose a practical overhead limit on partition count. Defaulting to very high partition counts may not be possible or realistic.

Partition counts can be easily scaled up by the event-broker admin tools. It is common for an event-stream to be created with an insufficient number of partitions, of with a partition count unsuitable for the growth of the event-stream event rate. Consider the following figure where a single new partition has been added.

Note that scaling the partition count down is rarely done (and isn’t supported by most event-brokers), as removing a partition will also remove all of its events.

Partitions and The Importance of Data Locality

Data locality is an important convention for the composition of an event-stream. Simply put, all events of the same key must go to the same partition, and is solely the responsibility of the producer.

Data locality enables full horizontal scaling of consumer systems by providing a very simple data-locality guarantee. This enables a consumer instance to read just a single partition to get all updates for that keyed event, such that it does not need to cross-reference with data from other partitions. This is a fundamental requirement for horizontal scaling of high-throughput systems and enables stateful horizontal scaling and high-throughput capabilities.

Factors Prohibiting Scale Up

There are a number of system and data factors that prohibit the seamless scaling up of event-stream partitions. These three factors will be the subject of the remainder of this article, and include:

  1. Consumer and Producer Detection — Do the consumer and producer applications automatically detect new partition counts? Do the consumers rebalance to allocate processing capacity?
  2. Keys and Partition Routing — Are the events keyed? Events of a given key must go the same partition as their predecessors, lest partition data locality be broken.
  3. Impacts on Consumer State — Can we scale up without affecting existing state? Unfortunately, for most consumer applications this answer is a definitive “no”.

Consumer Detection

Increasing the partition count for an event-stream is typically handled by notifying the consumers and producers that a change to the stream metadata has occurred. It is then up to both the consumers and the producer applications to accommodate the new partitions.

Many consumer applications only generate a list of partitions at application startup time, and maintain a static list throughout the execution of the program. This is commonly due to the convention that event-streams won’t dynamically increase in size during execution, and this assumption is built into many leading frameworks like Apache Spark, Apache Flink, and Apache Beam. For these frameworks, you must halt your application and restart it before the new partitions will be picked up, though of course this may change sometime in the future.

Keys and Partition Routing

One of the main barriers to seamless partition scaling is that events in the original partitions are not automatically repartitioned (at least not by any distributed event-broker implementations known to me). Reasons for this include:

  • Event streams are often too large to repartition without significant impact to the event broker
  • The producer is responsible for selecting the partition, and the broker will likely have no knowledge of the partitioning strategy.
  • The consumer may have computed state per event key, as is common in stateful event-driven applications. Changing the partition count breaks data locality by routing keyed events to different partitions.

Once a new partition is added, events of the same key as those already within the event stream may end up distributed to a new partition. This violates the requirement of data locality at the partition level, as is shown in the following figure.

You may be quick to note that it is the producer’s partitioner algorithm that is responsible for routing events to the partitions, and that perhaps there is a clever way of partitioning that can avoid this problem. I encourage you to keep thinking about this problem, but as it stands, I am unaware of an elegant solution that affords both the increased event-stream parallelism, without necessarily repartitioning existing events (If you happen to come up with an elegant solution to this, please let me know!).

Impacts on Consumer State

Consumer state, be it derived from a series of keyed events or materialized directly from a keyed entity stream, relies upon guaranteed partition data locality. Consider the addition of a consumer application with internal state stores, scaled to three consumer instances. The following figure illustrates the impact to downstream consumer state when a new partition is added to an existing stream.

Violating partition data locality results in two major issues. For one, stale state remains in the original state store (Consumer 0), creating a memory leak and depending on the business application, may result in erroneous calculations. The second issue is that the new state store (Consumer 2) consuming from the new Partition 2 does not have access to any of the previous state for some keys. This can (and usually does) result in invalid state.

Consider the impact of a series of events tallying a bank account balance. In the case that you, the customer, have your debit and credit events routed to the new partition, you may be surprised to see that your entire credit history has been wiped out. Farewell to your debts, but also, farewell to your previous paycheques.

While the issues with violating data locality are easy to see with internal state stores, what about if we use an external state store? Would this work any better?

Can we use an External State Store to Mitigate Data Locality violations?

An external state store is a state store that is materialized outside of the consumer instances. It is durable, its existence is independent of the consumer instances, and it provides each consumer with access to all of the data across all of the key domain (barring issues surrounding eventual consistency, which we won’t cover here). Examples of this include pretty much any and every highly scalable cloud datastore, such as AWS DynamoDB and Google BigTable to name a few.

Unfortunately, even an external state store does not rescue us from our problems, but simply exposes another issue. Consider the same topology with the internal data stores replaced with a single external data store, as shown below (Also note that Consumer 0 is consuming from Partition 1 & 2 in the sake of conserving space in my diagram).

There are several significant issues with trying to repair violated partition data locality with an external state store:

  • Consumers operate independently of one another, such that events for the same key may be processed out-of-order. Events are only guaranteed processing ordering within a single partition, not between partitions.
  • Resetting the state store greatly exacerbates the issue of partition data locality. Consider that a newly expanded event stream will have a far lower volume of events than an old partition. If the state must be rebuilt (due to changing business requirements, or perhaps a bug), events from the new partition may be independently processed significantly earlier than the old partition.
  • Temporarily unavailable partitions (due to broker outages) can exacerbate the issue, even if windowing is used. In a case where the partition outage exceeds the windowing duration, out-of-order processing can occur.

The usage of an external state store shifts the issue of incomplete local data history to one of cross-partition synchronization. You may find the usage of Windowing solutions to be a tempting approach to mitigating this issue, but unfortunately it runs into the scaling barriers of big data.

  • No matter how big of a window size you specify for reordering out out-of-order events between the old and new partitions, the difference in event time can always be just slightly larger. This reduces your algorithm’s effectiveness to that of best-effort.
  • Very large event streams may have a substantial amount of data over a very long period of time. It may be prohibitively expensive in both raw computing resources to attempt to mitigate this issue.
  • While you can identify out-of-order data by means of timestamp comparisons, it can be difficult to maintain sufficient intermediate state to properly accommodate its resolution.

These windowing / scale issues are best illustrated in the following example, where a new partition is added while the application is running.

In the previous figure you can see that Partition 1 is newly created, and immediately begins to get populated with new red triangle events. Even though a red triangle is produced to Partition 0 at t=50, it is not processed by Consumer 0 before Consumer 1 processes a significant number of red triangles up to t=250. If we suppose that the red triangle event at t=50 is a critical event that must be incorporated into the aggregate state, then we must ask ourselves: “is our application able to maintain sufficient intermediate state to handle out-of-order events?”. In this case, we would need to maintain at least 201 red triangle updates (t=50 to t=250), instead of aggregating them in-order as they arrive.

Still not convinced? Let’s take a look at your bank balances again, and see what replaying your bank balances would look like from time = 0, as seen below.

Since both Consumer 0 and Consumer 1 function fully independently, Consumer 1 immediately registers your -$45k transaction first, throwing your account into deficit and locking your out of your banking. Note that you didn’t actually join the bank until t=6,200,000, but that event is in another partition and has not yet been processed. Of course, if we were to force our application to maintain a very large window of events to ensure out-of-order processing results we could possibly catch this error. However, consider if this event-time spans multiple years, and then consider it again if it spans billions of accounts. The sheer volume of intermediate data that would be required to maintain this becomes untenable, as often happens in the real of big-data problems.

Solution: Create a New Stream

Partition data locality is essential for enabling high-volume and horizontal-scaling event-driven applications. In cases where the rate and volume of events is beginning to become too much for the existing event-stream, and where downstream consumer state is important, creating a new event stream with an increased partition count is the simplest way forward.

1. Create a new event stream with the desired partition count

Use historic growth trends to predict future growth, and select a partition count that is suitable for both current needs and projected needs. Provision your partition counts such that you won’t need to scale up again in the near future.

2. Determine a backfill strategy for historical event data

The event data in the old stream will need to be ported over to the new stream. This can be done in two main ways:

  • If your upstream producer is itself a stream-processor, you may be able to rewind its event-stream inputs to recreate the necessary events. In this case, create a new producer application to write to the new stream, while the old producer application maintains the old stream. (Note: This strategy assumes that the producer is deterministic and that it has access to all of its old data)
  • A second option is to consume the events from the old stream and produce them into the new stream using a custom stream processor. Once caught up, the old stream and the new stream will have the same data while preserving partition data locality.

3. Migrate the Consumer Applications to the New Stream

The consumers now need to move over to the new event stream. For stateful consumer applications this will necessarily mean resetting the state stores and reprocessing the events from the beginning of the input streams. This can be a lengthy process and is best done while the old application remains running. Once the new application is caught up and the state is generated, the old application can be halted and the new one brought into production usage.

While an external state store do have the advantage of global data locality, the main issue is that all of the offsets of the old event stream partitions have no definitive mappings to the new event stream partitions. In other words, our consumer has no idea where to start consuming the new stream such that it doesn’t introduce duplicate events, or worse, skip over some.

Now, it may be possible for you to maintain the consumer’s external state by using a system of windowing and timestamps. I cannot advocate this as a default tactic though, as it is certainly case-by-case basis, and it does beg the question as to how much time and risk it would take to do it correctly versus simply rebuilding the state store. Your individual use-cases will vary, and so I leave this up to you.

4. Delete or Archive the Old Event Stream

Once all consumers are moved off of the old event stream, it is safe to be deleted or archived. Do not keep duplicate copies of data around, as new consumers may inadvertently begin consumption from the old copy.

Conclusion

Dynamically increasing partition count on an event stream is best preserved for stateless event processing. While scaling up the partition is easy, ensuring downstream consumers are not unduly affected is still a very real and unsolved problem in the space of event-driven architectures. The incomplete mitigation efforts described in this article are restrictive, expensive, best-effort, and must be replicated for each consumer consumer. They impose a significant cost on consumer application developers, and prohibit them from independently selecting the appropriate technology for their application problem space.

Scaling up an event-stream partition count violates the convention of partition data locality. Given that most consumer designs rely heavily on this to achieve scalability, it is best to fully avoid it altogether. The best way to scale up a stream with increased partition counts is to simply create a new one and migrate the data and the consumers.

If you’re interested in Event-Driven Microservices and Architectures, check out my new book:

Originally published at http://adambellemare.com on August 15, 2020.

--

--

Adam Bellemare

Staff Technologist, former Data Engineer. Events, Microservices, Kafka, Streaming, and Data