Ping-Min Lin | Software Engineer, Logging Platform
In the first post of our “Using graph algorithms to optimize Kafka operations” series, we highlighted the leader skew issue, one of the pain points we’ve experienced while maintaining our 3,000+ node Kafka deployment, and we discussed how we modeled the problem using a directed graph. We then described the “chain-swapping” mechanism to move excessive traffic from an overloaded broker to a broker with less load, and we detailed why this method wasn’t as useful in the common case of multiple overloaded brokers in the cluster. In this article, we discuss how we use flow network algorithms to address the downsides of the previous solution and solve the problem in general.
One-shot leader rebalancing for a skewed topic: Maximum Flow Problem
Flow networks are often used to model optimization problems, especially in logistics. It turns out we can transform the rebalancing problem into a Maximum Flow problem and use existing flow algorithms to solve it.
The graph we established in the previous article treated partitions as an attribute on the edges between brokers. However, this doesn’t accurately reflect their relationship in Kafka: partitions are logical and brokers are physical, and neither belongs to the other. We therefore promote partitions to first-class citizens of the graph by making them another type of node. Now, instead of connecting the leader broker to follower brokers for each partition in the topic, we create an edge from the leader broker node to the partition node, and then we create edges with a capacity of 1 pointing from the partition node to the follower broker nodes. The result of applying this new graph formulation to our previous example is a bipartite graph like the following:
If we now set Broker 1 as the source and Broker 4 as the sink, we can find the maximum flow using algorithms available in most graph libraries (e.g., networkx flow module in Python, jgrapht’s alg.flow package in Java). One possible result goes through Broker 1, Partition 1, Broker 2, and Partition 2 before arriving at Broker 4.
This is similar to what we had in the directed graph from the first post: an edge in the flow from one broker to another broker via a partition indicates a leader swap on that partition. If a partition is not in the flow, no leader swap happens on that partition. Each partition has at most one edge flowing in, and the “one leader per partition” constraint will always hold since the capacity of each edge is 1.
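The single-swap formulation can be sketched with networkx as follows. This is only an illustration, not our production code: the three-partition replica assignment, the node names, and the `build_topic_graph` helper are hypothetical, and we assume the first replica in each list is the current leader.

```python
import networkx as nx


def build_topic_graph(assignment):
    """Build the broker/partition graph for one topic.

    `assignment` maps a partition name to its replica list; the first
    replica is assumed to be the current leader (illustrative convention).
    """
    graph = nx.DiGraph()
    for partition, replicas in assignment.items():
        leader, followers = replicas[0], replicas[1:]
        # Leader broker -> partition node.
        graph.add_edge(leader, partition, capacity=1)
        # Partition node -> each follower broker.
        for follower in followers:
            graph.add_edge(partition, follower, capacity=1)
    return graph


# Hypothetical assignment, chosen so the only route from broker-1 to
# broker-4 goes through partition-1, broker-2, and partition-2.
assignment = {
    "partition-1": ["broker-1", "broker-2"],
    "partition-2": ["broker-2", "broker-4"],
    "partition-3": ["broker-1", "broker-3"],
}

graph = build_topic_graph(assignment)
flow_value, flow = nx.maximum_flow(graph, "broker-1", "broker-4")

# A partition whose outgoing edge carries flow gets that broker as new leader.
swaps = {
    partition: new_leader
    for partition in assignment
    for new_leader, units in flow[partition].items()
    if units > 0
}
print(flow_value, swaps)
```

Here the flow passes through both partition-1 and partition-2, so each of them changes leader, while partition-3 is not in the flow and is left untouched.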
Knowing how a single leader swap works, let’s extend our graph to offload multiple leaders. We first compute the average number of leaders per broker, L_avg. Overloaded brokers have more leaders than L_avg, while underloaded brokers have fewer leaders than L_avg. We connect all the overloaded brokers to a dummy source node and all underloaded brokers to a dummy sink node. These two dummy nodes will be the source and sink of our maximum flow when we apply the flow algorithms. Assigning the capacity of the edges from the source to the brokers and from the brokers to the sink is a bit trickier. From the source to the overloaded brokers, we assign the capacity as the floored absolute difference between the number of leaders on that broker and L_avg; for example, if L_avg = 1.75 and Broker 2 has 4 leaders, the capacity on the edge between the source and Broker 2 would be floor(abs(4 - 1.75)) = 2. We apply the floor operation for two reasons: our Kafka operations are inherently discrete (i.e., there can’t be a 0.75 leader swap), and if the algorithm swaps too many leaders away from an overloaded broker, some underloaded broker has to absorb them and may itself become overloaded. For the edges from underloaded brokers to the sink, the capacity is assigned the same way.
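As a small worked example of the capacity rule, the sketch below computes the floored capacities for a made-up set of leader counts that happens to give L_avg = 1.75. The `terminal_capacities` helper and the broker names are hypothetical and exist only for illustration.

```python
import math


def terminal_capacities(leader_counts):
    """Return (L_avg, source capacities, sink capacities).

    Overloaded brokers get an edge from the source, underloaded brokers
    get an edge to the sink; both use floor(abs(count - L_avg)).
    """
    l_avg = sum(leader_counts.values()) / len(leader_counts)
    source_caps, sink_caps = {}, {}
    for broker, count in leader_counts.items():
        capacity = math.floor(abs(count - l_avg))
        if count > l_avg:
            source_caps[broker] = capacity
        elif count < l_avg:
            sink_caps[broker] = capacity
    return l_avg, source_caps, sink_caps


# Hypothetical leader counts: 7 leaders over 4 brokers, so L_avg = 1.75.
leader_counts = {"broker-1": 0, "broker-2": 4, "broker-3": 1, "broker-4": 2}
l_avg, source_caps, sink_caps = terminal_capacities(leader_counts)

print(l_avg)        # 1.75
print(source_caps)  # {'broker-2': 2, 'broker-4': 0}
print(sink_caps)    # {'broker-1': 1, 'broker-3': 0}
```

Note that brokers within one leader of the average (broker-3 and broker-4 in this made-up data) end up with a capacity of 0 on their terminal edge.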
Applying the maximum flow algorithm directly to this graph would produce a result that helps the skew a bit, but it wouldn’t be very effective. While we try not to overshoot the leader distribution by flooring the difference, we also lose the flexibility to adjust the brokers whose leader counts are close to L_avg. In the case above, since Broker 3 is allocated 0 capacity on its edge to the sink, it will not be able to increase its leader count and “absorb” a leader from the overloaded brokers, even if such a swap would reduce the overall skew across the partitions.
180 partitions spread across 122 brokers. Leaders aren’t fully balanced after applying the above method.
To mitigate this, we allocate two additional dummy nodes: the source_residue node and the sink_residue node. The source_residue has an edge from the source and an edge to each overloaded broker. The sink_residue is the direct opposite: each underloaded broker points to it, and it, in turn, points to the sink. The capacity between these two residue nodes and the brokers is always 1, but the edges between the terminals and residue nodes are a bit more complicated. As their names suggest, these two nodes exist to “collect” the residue that we lost when flooring the difference on the edges from the source to the brokers and from the brokers to the sink. The capacity from the source to the source_residue is the sum of the leftovers of the overloaded brokers, while the sum of the leftovers of the underloaded brokers becomes the capacity of the edge from the sink_residue to the sink. How does this help? It gives the brokers a second chance by providing the flexibility to nudge the leader count on brokers that are already near the average.
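The terminal and residue wiring might be sketched like this. Two things here are assumptions rather than details stated above: the summed leftovers are generally fractional, and this sketch rounds them up with `ceil` to get an integer capacity; and `Fraction` is used instead of floats purely to keep the arithmetic exact. The helper name and leader counts are hypothetical.

```python
import math
from fractions import Fraction

import networkx as nx


def add_terminals_with_residue(graph, leader_counts):
    """Add source/sink and the two residue nodes to `graph` in place."""
    l_avg = Fraction(sum(leader_counts.values()), len(leader_counts))
    over_leftover = Fraction(0)
    under_leftover = Fraction(0)
    for broker, count in leader_counts.items():
        diff = abs(count - l_avg)
        safe = math.floor(diff)  # capacity that cannot overshoot L_avg
        if count > l_avg:
            graph.add_edge("source", broker, capacity=safe)
            graph.add_edge("source_residue", broker, capacity=1)
            over_leftover += diff - safe
        elif count < l_avg:
            graph.add_edge(broker, "sink", capacity=safe)
            graph.add_edge(broker, "sink_residue", capacity=1)
            under_leftover += diff - safe
    # ASSUMPTION: round the summed leftovers up to an integer capacity.
    graph.add_edge("source", "source_residue", capacity=math.ceil(over_leftover))
    graph.add_edge("sink_residue", "sink", capacity=math.ceil(under_leftover))
    return graph


# Hypothetical leader counts with L_avg = 7/4.
leader_counts = {"broker-1": 0, "broker-2": 4, "broker-3": 1, "broker-4": 2}
graph = add_terminals_with_residue(nx.DiGraph(), leader_counts)

for u, v, capacity in sorted(graph.edges(data="capacity")):
    print(u, "->", v, capacity)
```

With this made-up data, broker-3 still has 0 capacity on its direct edge to the sink, but it can now accept one leader through the sink_residue node.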
We can almost always achieve perfect leader balance of a topic using the maximum flow algorithms after adding the shared residue mechanism to the graph (the exception is cases of extremely skewed topics that have low or no connectivity between overloaded and underloaded brokers):
Leaders are balanced, but the number of reassignments increased a lot.
Although the topic in the above example is now fully balanced, the number of reassignments is extremely high compared to the theoretical minimum. This is because many of the reassignments don’t actually improve the skew: the algorithm doesn’t know what the distribution looks like and only tries to push as much flow from the source to the sink as possible. This can cause a lot of “see-saw” reassignments that result in zero net skew change (e.g., moving a leader from a slightly overloaded broker to a slightly underloaded broker). We implemented two optimizations to help mitigate the issue.
The first one is to introduce a separate cost attribute to some edges, which will be incurred for every unit flow going through the edge. After applying the max-flow algorithm to the graph, we can apply minimum-cost-flow algorithms to solve for flows that minimize the cost. By setting the cost from the broker nodes to the partition nodes to 1, we penalize flows that are too long, favoring shorter reassignment chains. Placing a cost of 1 on edges from the source to the source_residue and from the sink_residue to the sink prioritizes the flow to use up the safe capacity that is allocated to the brokers before using the shared residue capacity, thus reducing see-saw reassignments.
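The effect of the per-edge cost on chain length can be illustrated with networkx’s `max_flow_min_cost`, which finds a maximum flow of minimum total cost. The topic below is hypothetical and has an integral L_avg, so the residue nodes are left out for brevity; in a graph that includes them, the same `weight=1` attribute would go on the source to source_residue and sink_residue to sink edges.

```python
import networkx as nx

# Hypothetical topic: the first replica is the leader. broker-1 leads three
# partitions, broker-2 leads one, broker-3 and broker-4 lead none (L_avg = 1).
assignment = {
    "p1": ["broker-1", "broker-3"],
    "p2": ["broker-1", "broker-2"],
    "p3": ["broker-1", "broker-4"],
    "p4": ["broker-2", "broker-4"],
}

graph = nx.DiGraph()
for partition, (leader, *followers) in assignment.items():
    # Cost 1 per unit of flow on broker -> partition edges penalizes long chains.
    graph.add_edge(leader, partition, capacity=1, weight=1)
    for follower in followers:
        graph.add_edge(partition, follower, capacity=1, weight=0)

# Terminal edges with floored capacities: broker-1 is 2 over the average,
# broker-3 and broker-4 are each 1 under it.
graph.add_edge("source", "broker-1", capacity=2, weight=0)
graph.add_edge("broker-3", "sink", capacity=1, weight=0)
graph.add_edge("broker-4", "sink", capacity=1, weight=0)

flow = nx.max_flow_min_cost(graph, "source", "sink")
cost = nx.cost_of_flow(graph, flow)

swaps = {
    partition: new_leader
    for partition in assignment
    for new_leader, units in flow[partition].items()
    if units > 0
}
print(cost, swaps)
```

In this example broker-4 can be reached either directly through p3 or through the longer chain p2, broker-2, p4. Both give the same maximum flow, but the direct route costs less, so the solver selects the two single-hop swaps on p1 and p3.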
The second optimization is pruning. If we remove the dummy nodes from the resulting flow, what’s left is the broker nodes and partition nodes. We can decompose the remaining graph into several weakly connected components. Each of these components represents a series of reassignments that involves only the brokers in that component, and those brokers will not be involved in any reassignments outside of that set of operations. If we calculate the “contribution” of each connected component to the resulting maximum flow, we can pick the ones that actually reduce the skew and prune those that aren’t helpful, thereby reducing unnecessary reassignments.
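A minimal sketch of the pruning step is shown below. How a component’s “contribution” is measured is an assumption made for this example: we use the reduction in the sum of absolute deviations from L_avg over the brokers in the component. The `prune_swaps` helper, the hand-written flow dictionary, and the leader counts are all hypothetical.

```python
from fractions import Fraction

import networkx as nx

DUMMY_NODES = {"source", "sink", "source_residue", "sink_residue"}


def prune_swaps(flow, leader_counts, partitions):
    """Keep only leader swaps from components that reduce the skew."""
    # Keep edges that carry flow and do not touch a dummy node.
    used = nx.DiGraph()
    for u, targets in flow.items():
        for v, units in targets.items():
            if units > 0 and u not in DUMMY_NODES and v not in DUMMY_NODES:
                used.add_edge(u, v)

    l_avg = Fraction(sum(leader_counts.values()), len(leader_counts))
    kept = {}
    for component in nx.weakly_connected_components(used):
        sub = used.subgraph(component)
        # A broker gains one leader per incoming edge, loses one per outgoing.
        delta = {
            node: sub.in_degree(node) - sub.out_degree(node)
            for node in component
            if node not in partitions
        }
        before = sum(abs(leader_counts[b] - l_avg) for b in delta)
        after = sum(abs(leader_counts[b] + d - l_avg) for b, d in delta.items())
        if after < before:  # ASSUMPTION: contribution = skew reduction
            for partition in component & partitions:
                (new_leader,) = sub.successors(partition)
                kept[partition] = new_leader
    return kept


# Hypothetical flow result: p1 moves from broker-1 to broker-2, and
# p2 moves from broker-3 to broker-4 through the residue nodes.
flow = {
    "source": {"broker-1": 1, "source_residue": 1},
    "source_residue": {"broker-3": 1},
    "broker-1": {"p1": 1}, "p1": {"broker-2": 1}, "broker-2": {"sink": 1},
    "broker-3": {"p2": 1}, "p2": {"broker-4": 1},
    "broker-4": {"sink_residue": 1},
    "sink_residue": {"sink": 1}, "sink": {},
}
leader_counts = {"broker-1": 3, "broker-2": 0, "broker-3": 2, "broker-4": 1}

kept = prune_swaps(flow, leader_counts, {"p1", "p2"})
print(kept)
```

With L_avg = 1.5 in this made-up data, the swap on p1 brings two brokers closer to the average and is kept, while the swap on p2 merely exchanges a 2 and a 1 between two brokers (a see-saw) and is pruned.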
After all these optimizations, the reassignment solution in the previous section can be improved to use only 15 reassignments. We think it is a reasonable result that uses only 4 more reassignments than the lower bound.
After the 2 optimizations. Leaders are balanced and operations are close to the lower bound.
Another example with a 120-partition topic covered by 135 brokers.
This started as a side project to automate the tedious manual rebalancing of troublesome imbalanced topics. However, it turned out to be a very useful tool for our Kafka operations. Our journey to explore and apply other interesting concepts in our Logging Platform team won’t stop here. At Pinterest Engineering, we are always looking for opportunities to combine different skills and backgrounds to create more value. If you are passionate about innovative solutions that solve interesting challenges, Pinterest Engineering would be a great playground for you to unleash your creativity!
Many thanks to Vahid Hashemian, Ambud Sharma, Henry Cai, Yu Yang, Heng Zhang and Eric Lopez on the Logging Platform team for their support and feedback on this project.