What I have learned from Kafka partition assignment strategy

Anyi Li
5 min read · Dec 1, 2017


Apache Kafka has gained a lot of attention these days; it is almost a required piece if you want to build a modern ETL pipeline in the Big Data era. Not only is it cool because it comes with Kafka Streams and Kafka Connect, which can glue the pieces of an ETL pipeline together, but it is extremely useful for streaming data in and out, triggering events, decoupling services, etc. I got to know and play with Kafka in its early 0.7.x days almost 4 years ago (it is almost the end of 2017 😢); at that time I was developing an internal project for consuming Kafka messages inside Apache Storm. I still remember how impressed I was the first time by how simple the Kafka API was, and how eager I was to trash all the JMS APIs. Apache Kafka finally had its official 1.0 release this October. Compared to the earlier versions, it has changed dramatically: the APIs are much simpler yet more powerful, and so many nice features we wanted have been added…

Although I have been using Kafka for streaming and pub/sub for quite some time, there were still a couple of settings that confused me or were unclear during the first deployment. I have been using the Kafka Source from Apache Flume as the Kafka consumer. Besides the usual settings provided by Flume (topics, bootstrap servers), you can pass any valid Kafka consumer setting by using the prefix kafka.consumer . To increase throughput while consuming messages, you can simply use more Kafka Sources; Kafka should be able to dynamically assign the partitions to each consumer. When some consumers fail to send heartbeats to the Kafka server, a rebalance is triggered and Kafka reassigns the partitions to the live consumers. Everything seems perfect at this moment, but… there is a mysterious setting, partition.assignment.strategy , that needs to be set properly to have Kafka behave the way you would like.
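As a concrete illustration, a Flume agent passing Kafka consumer settings through the kafka.consumer prefix might look like the sketch below. The agent name, source name, broker addresses, topic pattern, and group id are all hypothetical placeholders:

```properties
# Hypothetical Flume agent "a1" with one Kafka Source "r1"
a1.sources = r1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = broker1:9092,broker2:9092
a1.sources.r1.kafka.topics.regex = ^metrics-.*$

# Any valid Kafka consumer property can be passed with the kafka.consumer prefix,
# including the assignment strategy this post is about
a1.sources.r1.kafka.consumer.group.id = flume-etl
a1.sources.r1.kafka.consumer.partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
```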

According to the Kafka documentation (https://kafka.apache.org/documentation/), partition.assignment.strategy is:

The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used

The default setting is the class org.apache.kafka.clients.consumer.RangeAssignor , or you can choose another option, org.apache.kafka.clients.consumer.RoundRobinAssignor . In general, one of them should work for your case; otherwise you can also swap in your own customized assignor by implementing the interface org.apache.kafka.clients.consumer.internals.PartitionAssignor . These are called partition assignment strategies. Now, what are they?

RangeAssignor for 1 topic

Let us start by understanding the first and default option, RangeAssignor. Suppose we have 1 topic (t0) with 8 partitions (p0, p1, p2, p3, p4, p5, p6, p7) and 3 consumers (c0, c1, c2). When all consumers are alive, the partitions will be assigned as c0 -> [p0, p1, p2], c1 -> [p3, p4, p5], c2 -> [p6, p7]. That all looks great if your consumers are subscribed to a single topic. However, since Kafka 0.9, consumers have been allowed to subscribe to multiple topics using a Java regular expression pattern, and then the partition assignment becomes complicated. Suppose we have another 2 topics (t1, t2), each with 2 partitions. The assignment will be

RangeAssignor for 3 topics

It turns out c2 did not get any new partition assignment! Think about an extreme case: 100 topics match the consumer pattern, but only one topic has 8 partitions and all the others have only 1 partition each, for a total of 107 partitions. Assume there are 3 consumers (c0, c1, c2). Using RangeAssignor, Kafka will assign 102 partitions to c0, 3 partitions to c1, and 2 partitions to c2. c0 does all the heavy lifting, working hard to consume all the messages, while c1 and c2, once they finish consuming their messages, sit idle without doing anything. Since Apache Flume likes to back off the c1 and c2 threads once there are no messages to consume, if the backoff timeout is not set properly the Kafka server will lose heartbeats from consumers c1 and c2, and a rebalance will be triggered. Not only are we loading consumer c0 heavily, we are also degrading the message consumption rate. Why isn't RangeAssignor doing the right thing, as its name suggests? Because it is stated very clearly in the Javadoc:
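The per-topic arithmetic behind these numbers can be sketched in a few lines of Python. This is a simplified model of what RangeAssignor computes (floor division plus remainder, topic by topic), not Kafka's actual implementation:

```python
from collections import defaultdict

def range_assign(topics, consumers):
    """Simplified RangeAssignor: each topic's partitions are split independently;
    the first (num_partitions % num_consumers) consumers get one extra partition."""
    consumers = sorted(consumers)
    assignment = defaultdict(list)
    for topic in sorted(topics):
        per, extra = divmod(topics[topic], len(consumers))
        start = 0
        for i, c in enumerate(consumers):
            n = per + (1 if i < extra else 0)
            assignment[c] += [(topic, p) for p in range(start, start + n)]
            start += n
    return dict(assignment)

# 1 topic with 8 partitions, 3 consumers: c0 and c1 get 3 each, c2 gets 2
one = range_assign({"t0": 8}, ["c0", "c1", "c2"])
print([len(one[c]) for c in ("c0", "c1", "c2")])      # [3, 3, 2]

# 3 topics (8, 2, 2 partitions): c2 gets nothing from t1 or t2
three = range_assign({"t0": 8, "t1": 2, "t2": 2}, ["c0", "c1", "c2"])
print(three["c2"])                                    # [('t0', 6), ('t0', 7)]

# Extreme case: one 8-partition topic plus 99 single-partition topics
topics = {"t0": 8, **{f"t{i}": 1 for i in range(1, 100)}}
extreme = range_assign(topics, ["c0", "c1", "c2"])
print([len(extreme[c]) for c in ("c0", "c1", "c2")])  # [102, 3, 2]
```

Because every 1-partition topic is divided on its own, its single partition always lands on the first consumer in sorted order, which is exactly how c0 ends up with 102 partitions.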

The range assignor works on a per-topic basis. For each topic, …

Aha! Unfortunately, RangeAssignor is the default partition assignment strategy, and I had been using it without thinking in the production environment. Once I realized the Kafka consumers were not consuming messages fast enough, I just naively added more consumers, believing they would boost the throughput. Sadly, the extra consumers only help consumer 0 split the 8-partition topic; they never touch the 99 partitions assigned to consumer 0 😿
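Using the same simplified per-topic model (a sketch of the arithmetic, not Kafka's actual code), you can see why scaling out does not help: even with 8 consumers, all 99 single-partition topics still land on the first consumer:

```python
from collections import defaultdict

def range_assign(topics, consumers):
    """Simplified per-topic range assignment (same arithmetic as RangeAssignor)."""
    consumers = sorted(consumers)
    assignment = defaultdict(list)
    for topic in sorted(topics):
        per, extra = divmod(topics[topic], len(consumers))
        start = 0
        for i, c in enumerate(consumers):
            n = per + (1 if i < extra else 0)
            assignment[c] += [(topic, p) for p in range(start, start + n)]
            start += n
    return dict(assignment)

# One 8-partition topic plus 99 single-partition topics, now with 8 consumers
topics = {"t0": 8, **{f"t{i}": 1 for i in range(1, 100)}}
eight = range_assign(topics, [f"c{i}" for i in range(8)])
print(len(eight["c0"]))                            # 100: one slice of t0 plus all 99 singletons
print([len(eight[f"c{i}"]) for i in range(1, 8)])  # [1, 1, 1, 1, 1, 1, 1]
```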

After digging into the Kafka documentation and asking Google for help (as always, ask Google; even my daughter knows that 😘), I started setting partition.assignment.strategy to RoundRobinAssignor . For the one-topic scenario it works similarly to RangeAssignor . However, for multiple topics, the behaviors of the two assignors are completely different.

RoundRobinAssignor for 1 topic

Since the RoundRobinAssignor algorithm treats all topic-partitions as one whole partition group, Kafka will try to distribute all partitions evenly among the consumers, as shown in the figure using the same 3 topics with 3 consumers.

RoundRobinAssignor for 3 topics

RoundRobinAssignor gives each consumer an even number of partitions across multiple topics. As we can see, even under an extreme case like the one above, with 107 partitions over 100 topics and 3 consumers (c0, c1, c2), c0 and c1 will each get 36 partitions and c2 will be assigned 35 partitions.
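The round-robin idea can be sketched the same way (again a simplified model of the arithmetic, not Kafka's implementation): pool every topic-partition and deal them out to the consumers in turn:

```python
def round_robin_assign(topics, consumers):
    """Simplified RoundRobinAssignor: all topic-partitions form one pool,
    dealt to the (sorted) consumers in turn."""
    consumers = sorted(consumers)
    pool = [(t, p) for t in sorted(topics) for p in range(topics[t])]
    assignment = {c: [] for c in consumers}
    for i, tp in enumerate(pool):
        assignment[consumers[i % len(consumers)]].append(tp)
    return assignment

# The extreme case again: 107 partitions over 100 topics, 3 consumers
topics = {"t0": 8, **{f"t{i}": 1 for i in range(1, 100)}}
counts = [len(round_robin_assign(topics, ["c0", "c1", "c2"])[c])
          for c in ("c0", "c1", "c2")]
print(counts)   # [36, 36, 35]
```

With 107 = 3 × 35 + 2, the two leftover partitions go to the first two consumers, hence 36/36/35 instead of 102/3/2.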

So the take-away lesson: when subscribing to multiple topics, it is better to use RoundRobinAssignor if the numbers of partitions per topic are uneven. If every topic has the same number of partitions, or there is just one topic, you can use either RoundRobinAssignor or RangeAssignor . At this point, I guess I am still confused about why Kafka provides RangeAssignor at all ??? 😕 Well, it will be another fun journey to dig into the code, or Google it again.


Associate Attending at Memorial Sloan Kettering, formerly engineering at IBM and Yahoo; healthcare, AI, and everything related to data