Apache BookKeeper Insights Part 1 — External Consensus and Dynamic Membership
The BookKeeper replication protocol is pretty interesting. It’s quite different from other replication protocols that people are used to in the messaging space, such as Raft (RabbitMQ Quorum Queues, Redpanda, NATS Streaming) or the Apache Kafka replication protocol. But being different means that people often don’t fully understand it and can either get tripped up when it behaves in a way they don’t expect or fail to use it to its full potential.
This series aims to give people some fundamental insights into what makes BookKeeper different and also dig into some of the nuances of the protocol. We’ll dig into why the protocol is the way it is and some of the ramifications of those design decisions.
One of the best ways I know to describe design decisions is via comparison. Comparing one thing against another is a great way to discuss trade-offs, strengths, weaknesses and many other aspects.
I’m going to use both Raft and Apache Kafka as comparison points. I am not going to try to persuade you that BookKeeper is better than other protocols; this is not a thinly veiled marketing piece. This post is about teaching the mechanics of the BookKeeper protocol and their ramifications.
Also note that this is not an in-depth look at Raft or Kafka. I will be describing enough of those protocols for my aims, but will be glossing over large amounts of complexity. If you want to understand Raft and Apache Kafka more, the protocols are well documented elsewhere.
This first post describes the biggest difference between BookKeeper and other replication protocols. This difference also informs most of the later posts on the nuances of the protocol.
Integrated vs External Coordination
Raft is an “integrated” protocol. What I mean by that is that the control plane and data plane are both integrated into the same protocol, and that protocol is exercised by the storage nodes, which are all equal peers. Each node has all the data stored locally in persistent storage.
The same is true of Apache Kafka albeit with the use of ZooKeeper for metadata, though this will be removed soon (KIP-500).
With Raft, we have a steady state where replication is being performed, then periods of perturbation which trigger elections. Once a leader is elected, the leader node handles all client requests and replication of entries to followers.
With Raft the leader learns where in the log each follower is and starts replicating data to each follower according to their position. Because the leader has all the state locally, it can retrieve that state and transmit it, no matter how far behind a follower is.
With Kafka, the followers send fetch requests to the leader; the requests include their current position. The leader, having all the state locally, simply retrieves the next data and sends it back to the follower.
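As a sketch of that interaction (the class and field names here are illustrative, not Kafka’s actual protocol or code), a stateful leader can serve any fetch position directly from its local log:

```python
# Toy model: a leader serving follower fetch requests from its local log.
# Because the leader is stateful, it can serve any position, no matter
# how far behind the follower is.

class Leader:
    def __init__(self, log):
        self.log = log            # full local copy of the log
        self.high_watermark = 0   # the commit index (Kafka's name for it)

    def handle_fetch(self, follower_pos, max_entries=2):
        # Return the next entries after the follower's position, plus the
        # current high watermark so the follower can advance its own copy.
        entries = self.log[follower_pos:follower_pos + max_entries]
        return entries, self.high_watermark

leader = Leader(log=["a", "b", "c", "d"])
leader.high_watermark = 3
entries, hw = leader.handle_fetch(follower_pos=1)
# entries == ["b", "c"], hw == 3
```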
A byproduct of replication being performed by stateful, fully integrated nodes is that cluster membership is relatively static. Yes, you can perform cluster operations to add and remove members, but these are very infrequent operations with limits. The membership of a Raft cluster and the replicas that form a Kafka topic can be considered fixed in terms of the normal operation of the protocol.
BookKeeper is different: it separates consensus from storage. The storage nodes are simple; they basically store and retrieve what they are told to and understand almost nothing of the replication protocol itself. The replication protocol is external to the storage nodes and lives in the BookKeeper client. It is the client that performs the replication to the storage nodes.
BookKeeper was designed to act as the distributed log storage sub-system of another data system, like a messaging system or database, for example Apache Pulsar. Pulsar brokers use BookKeeper to store topics and cursors and each broker uses the BookKeeper client to do the reading and writing to those BookKeeper nodes.
The client is external and stateless which has a number of cascading effects that inform the design of the rest of the protocol. For example, because the client doesn’t have the full state locally, it needs to treat failure differently.
With Raft, if one node becomes unavailable for an hour, we don’t have a big problem. When the node comes back, the stateful leader will simply resume replication to the follower from where it left off. The BookKeeper client doesn’t have that luxury: if it wants to continue to make progress, it can’t be storing the last X hours of data in memory; it must do something different.
Because the replication and coordination logic lives external to the storage nodes (in the client), the client is free to change the membership of a ledger when failure occurs. This dynamic membership is a fundamental differentiator and one of BookKeeper’s most compelling features.
A data system like Pulsar having a separate storage layer has its downsides, like an extra network hop before any data hits disk and having to operate a separate cluster of bookies. If BookKeeper didn’t offer some truly valuable features, then it would become more of a liability than an asset. Luckily for us, BookKeeper has many wonderful features that make it worth it.
Now that we’ve set the scene, we’ll dig further in to explore how an integrated, fixed membership protocol like Raft compares to an external consensus, dynamic membership protocol like BookKeeper.
Each of our three protocols has the concept of a commit index, though each gives it a different name. A commit index is an offset in the log where all entries at that point and before will survive a certain agreed number of node failures.
In each case, an entry must reach a certain replication factor to be considered committed:
- For Raft it is a cluster majority, and the guarantee is that committed entries will survive the permanent loss of any minority of nodes (up to N/2, rounding down). So Raft requires a majority quorum both to acknowledge an entry and to consider that entry committed.
- For Kafka it depends on various configs. Kafka supports majority quorum behaviour via the producer config acks=all and the broker config min.insync.replicas=[majority]. By default it only requires the leader to persist an entry before acknowledging it.
- For BookKeeper it is the Ack Quorum (AQ), and the guarantee is that committed entries will survive the permanent loss of any AQ-1 bookies.
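As a rough sketch (my own framing, not code from any of these projects), the common rule is that an entry acknowledged by a quorum of Q nodes is guaranteed to survive the permanent loss of Q-1 of them:

```python
# Toy arithmetic for the quorum guarantees described above.

def raft_commit_quorum(cluster_size):
    # Raft's commit quorum is a cluster majority.
    return cluster_size // 2 + 1

def tolerated_losses(commit_quorum):
    # A committed entry lives on at least commit_quorum nodes, so it
    # survives the permanent loss of commit_quorum - 1 of them.
    return commit_quorum - 1

# Raft, 3-node cluster: majority = 2, survives the loss of any 1 node.
assert raft_commit_quorum(3) == 2
assert tolerated_losses(raft_commit_quorum(3)) == 1

# BookKeeper with AQ=2: committed entries survive AQ-1 = 1 bookie loss.
assert tolerated_losses(2) == 1
```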
NOTE: Because each protocol is different I will refer to the quorum that is required for an entry to be considered “committed” as the Commit Quorum. This is my own invented term for this post.
Raft calls this point in the log the commit index, Kafka calls it the High Watermark and BookKeeper calls it the Last Add Confirmed (LAC). Each protocol relies on this commit index to deliver its consistency guarantees.
In Raft and Kafka this commit index is transmitted between the leader and followers, so each node has its own current knowledge of what the commit index is. The leader always knows the fully up-to-date value of the commit index, whereas the followers may have a stale value, but that is ok.
With Kafka, the leader includes the High Watermark in its fetch responses to followers.
With BookKeeper, the LAC is included with every entry that is sent to the storage nodes. The storage nodes themselves have little use for it, but it allows clients to retrieve this vital information at a later point. So the client that is writing a ledger knows the current LAC and the storage nodes may have a slightly stale view of the LAC, but this is also fine and the protocol handles that. More on this later.
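A minimal sketch of this piggybacking, with illustrative field names rather than BookKeeper’s actual wire format:

```python
# Hypothetical sketch: a writing client attaches its current Last Add
# Confirmed (LAC) to every entry it sends to a bookie, so bookies (and
# later readers) can learn the commit point lazily.

class WriterClient:
    def __init__(self, ledger_id):
        self.ledger_id = ledger_id
        self.lac = -1                     # no entries confirmed yet

    def make_add_request(self, entry_id, payload):
        # Each add request carries the writer's current LAC.
        return {"ledger": self.ledger_id, "entry": entry_id,
                "lac": self.lac, "payload": payload}

    def on_entry_confirmed(self, entry_id):
        # Simplified: real BookKeeper only advances the LAC contiguously.
        self.lac = max(self.lac, entry_id)

w = WriterClient(ledger_id=42)
req0 = w.make_add_request(0, b"first")   # carries lac = -1
w.on_entry_confirmed(0)
req1 = w.make_add_request(1, b"second")  # carries lac = 0
```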
Reads past the commit index would be dirty reads, with no guarantee that you’d be able to read the same entry again. Entries beyond the commit index could be lost or could be replaced with different entries. For that reason, none of the protocols allows readers to read past this point.
Raft/Kafka Properties and Behaviour
With a Raft based system, you’ll specify your replication factor and that will translate into a Raft cluster of that many Raft members. With Kafka, that translates into a topic with that many replicas.
Raft members and Kafka replicas are fixed in terms of the steady state replication. One cost of this fixed membership is the tension between replication factor, availability and latency.
In an ideal world we’d want each entry to be fully replicated before being acknowledged. But followers can go down or be slow. Having a cluster become unavailable for writes because a single node is unavailable is, with good reason, not acceptable to most people. So the compromise is to reduce safety a little in order to gain availability and lower latency: we allow a minority of members to be unavailable while still offering good data safety and continued availability.
That is why Raft and Kafka really need a commit quorum that is lower than the replication factor.
This reduction in safety can be mitigated by simply increasing the replication factor. So if you want guarantees that committed entries will survive the loss of 2 nodes, you’d need a replication factor of 5. You pay more for storage and network, and latency takes a small hit, but you only need the fastest 2 of your 4 followers to confirm an entry in order to acknowledge it to the client. So even with 2 slow nodes, you have acceptable latency and you reach the minimum rep factor you are comfortable with.
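The arithmetic above can be sketched as follows (a simplification, assuming a majority commit quorum):

```python
# To survive f permanent losses with a majority commit quorum, you need
# 2f + 1 replicas.

def rf_for_majority_quorum(f):
    return 2 * f + 1

def majority(n):
    return n // 2 + 1

f = 2                           # want committed entries to survive 2 losses
rf = rf_for_majority_quorum(f)
assert rf == 5

# Only the fastest majority must confirm each entry, so up to
# rf - majority(rf) = 2 slow nodes do not hurt latency.
assert majority(rf) == 3
assert rf - majority(rf) == f
```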
An invariant is something that must be true at all times. You can look at the state of a system at any time and verify that its state conforms to the invariant. For example, an invariant may state that no committed entries are lost.
Liveness properties tell us what must happen at some point, for example, eventually a leader must be elected given that a majority of nodes are eventually functional and can see each other.
Our integrated log replication protocols have, among others, the invariants:
- Entries are appended to the leader’s log in temporal order.
- The leader appends entries to follower logs in the same order as its own.
- Committed entries are never lost as long as no majority of nodes die with total data loss (only applies to Kafka with acks=all, min.insync.replicas=[majority]).
- The log on a follower node is identical to the current leader’s log from the follower’s commit index down.
One liveness property is that, given all nodes are functional and have visibility of each other, any given committed entry will eventually become fully replicated (as long as the prefix of the log is also fully replicated). In other words, the log tail will eventually reach the desired replication factor.
We can think of a replicated Raft log as being split into three zones of safety. At the head, beyond the commit index, we’re in the danger zone: these entries have no guarantees and may be lost. The committed prefix of the log can then be split into a head that has reached the majority quorum but is not yet fully replicated, and a tail that is fully replicated.
Prefix RF >= Entry RF >= Suffix RF
The rule is that for any given offset in the log, the prefix from that point must have reached the same or higher replication factor and the suffix after that point must have reached the same or lower replication factor.
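This rule can be expressed as a simple check over per-entry replication counts (a toy model, not code from any of these systems):

```python
# The invariant Prefix RF >= Entry RF >= Suffix RF means that walking the
# log from the start, per-entry replication counts never increase.

def replication_counts_valid(counts):
    # counts[i] is how many nodes currently hold entry i.
    return all(counts[i] >= counts[i + 1] for i in range(len(counts) - 1))

# Fully replicated tail (3), committed head (2), uncommitted tip (1): valid.
assert replication_counts_valid([3, 3, 3, 2, 2, 1])

# A dip in the middle of the log would violate the invariant.
assert not replication_counts_valid([3, 2, 3])
```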
What does all this mean for the administrator?
When everything is going well, we’d expect a small uncommitted zone, a small committed head and a very large committed tail. But things don’t always go well, and the committed head/tail can be of arbitrary length: the tail could be of length 0, meaning there are no fully replicated entries. This could happen because a follower is too slow (and has fallen behind data retention) or because a follower just died catastrophically and started up empty.
The point is that the replication factor is not a guarantee but a desired goal. The only guarantee is the commit quorum, so the commit quorum is the minimum guaranteed replication factor. As an administrator, you need to base your procedures and planning on that value, not just the replication factor. Hence why some people run Raft and Kafka with rep factors of 5.
Recovery from failure
Systems that use integrated replication protocols make recovery from total disk failure “relatively” simple. Any empty follower can be refilled from the current leader in exactly the same way as a follower that is mostly caught up. Replication saves the day.
Easy to reason about
All these properties make reasoning about the state of a Raft/Kafka log relatively simple:
- The members are fixed so we know where the data is.
- We know that only the head of the log might be below full replication.
- We know that if we lose a member it can get rehydrated by its stateful peers via the replication protocol.
- We also have to accept that the replication factor is a goal, not a guarantee, because the committed head and tail can be of arbitrary length, so we might need to increase the rep factor to a higher value.
Now let’s take the same look at BookKeeper.
BookKeeper Properties and Behaviour
BookKeeper has similar configurations for the desired replication factor and for the commit quorum.
NOTE: I will assume that the Ensemble Size is equal to our Write Quorum as striping lowers read performance and makes it not worthwhile in practice.
Write Quorum (WQ) is our replication factor and Ack Quorum (AQ) is our commit quorum. Most people simply set Ack Quorum to be the majority quorum, so with a Write Quorum of 3, the Ack Quorum is set to 2. It would be reasonable to expect that using the quorum values of WQ=3 and AQ=2 would translate to the same behaviour as Raft or Kafka.
In fact, WQ and AQ do not map onto their equivalents in Raft or Kafka, and to understand why we need to look more closely at the protocol, with its external consensus and dynamic membership.
External, Stateless Client
The replication and consensus logic lives in the client. The client is stateless; it cannot keep data in memory for an arbitrary length of time until a bookie becomes available. So it stays nimble and simply selects a new bookie to replace the one that it cannot write to, then continues on its way. This dynamic membership change is called an ensemble change.
This ensemble change is basically an operation to update the ledger metadata in ZooKeeper as well as resending all uncommitted entries to the new bookie.
The result of these ensemble changes is that a ledger can be considered a series of mini-logs (we’ll call them fragments) that constitute a larger log. Each fragment is a range of contiguous entries where every entry shares the same set of bookies (its ensemble). Each time a write to a bookie fails, the client performs an ensemble change and carries on, creating a ledger that is formed from 1 or more fragments.
If we were to look at each individual fragment, we’d see a similar pattern to a Raft log or Kafka topic partition. The current fragment can be split into the same three zones: committed tail, committed head and uncommitted zone.
When an ensemble change occurs, the current fragment terminates at the end of the committed head (those entries that have reached Ack Quorum). The new fragment starts at the beginning of the uncommitted zone.
This leaves non-active fragments with entries that can remain at the Ack Quorum. Unlike Raft or Kafka, the core BookKeeper replication protocol will not eventually bring those AQ entries up to WQ; they will remain at Ack Quorum. Those entries can only be brought up to WQ via a separate recovery process, but that process is not part of the core protocol (and, if enabled, runs daily by default).
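A toy model of this behaviour (not actual BookKeeper code) that represents a ledger as a series of fragments, each with its own ensemble:

```python
# Toy model: a ledger as a series of fragments, where an ensemble change
# starts a new fragment with a replacement bookie.

class Ledger:
    def __init__(self, ensemble):
        # Each fragment: (first_entry_id, ensemble of bookies).
        self.fragments = [(0, list(ensemble))]
        self.next_entry = 0

    def add_entry(self):
        self.next_entry += 1

    def ensemble_change(self, failed, replacement):
        _, ensemble = self.fragments[-1]
        new_ensemble = [replacement if b == failed else b for b in ensemble]
        # Simplified: the real protocol starts the new fragment at the
        # first uncommitted entry; here we use the next entry to be written.
        self.fragments.append((self.next_entry, new_ensemble))

ledger = Ledger(ensemble=["b1", "b2", "b3"])
for _ in range(5):
    ledger.add_entry()
ledger.ensemble_change(failed="b2", replacement="b4")
# Fragment 1: entries 0-4 on [b1, b2, b3]; fragment 2 starts at entry 5
# on [b1, b4, b3].
```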
This means that not only can the head of a ledger contain entries at AQ; there can be multiple sections throughout the ledger at this lower replication factor.
The fact that sections in the middle of a ledger can remain at AQ is a surprise to many. Most people probably expect a Raft/Kafka-like pattern where only the head sees this.
It is important to note that Raft and Kafka logs can have arbitrarily long committed heads where entries have only reached the commit quorum but not the replication factor. So whether you are an administrator of Kafka or of BookKeeper, the fact is that the commit quorum is what counts.
Ack Quorum Isn’t What You Probably Think It is
The fact that BookKeeper uses an external replicator (the client) makes a big difference to our choice of commit quorum. Essentially, the Ack Quorum isn’t really like its equivalents in Raft and Kafka.
As discussed earlier, because Raft and Kafka have fixed membership they really need a commit quorum that is lower than the replication factor or else suffer big availability and latency issues. The commit quorum is the compromise between safety and availability/latency.
A BookKeeper ledger is different though: it does not have fixed membership. If one bookie becomes unavailable, we swap it out for another and continue. This means the Ack Quorum is not equivalent to the Raft majority quorum or Kafka’s configured quorum.
With BookKeeper we can set the commit quorum to be equal to the replication factor, i.e. WQ=AQ. If we set WQ=3, AQ=3 and one bookie is down, we select a new bookie and carry on. Notice that when WQ=AQ we don’t have the three zones of committed head/tail and uncommitted: an entry is either fully replicated and committed, or it isn’t.
This also means we don’t have sections in the middle of a ledger at a lower rep factor anymore. The entire tail reaches WQ.
In terms of data safety this is great. BookKeeper doesn’t need a majority quorum to offer high availability; we can tell BookKeeper to only acknowledge fully replicated entries.
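A sketch of the acknowledgement rule (simplified; the real client tracks per-entry confirms with more machinery): with WQ=AQ, an acknowledged entry is by definition fully replicated:

```python
# An entry is acknowledged to the application once ack_quorum bookies
# have confirmed it; with WQ == AQ, acknowledged implies fully replicated.

def can_acknowledge(confirms, ack_quorum):
    return len(confirms) >= ack_quorum

WQ, AQ = 3, 3
confirms = {"b1", "b2"}
assert not can_acknowledge(confirms, AQ)   # 2 of 3 confirms -> not yet

confirms.add("b3")
assert can_acknowledge(confirms, AQ)       # all WQ bookies -> acked, at RF
```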
There are of course some limits and impacts that need to be considered before you switch your AQ from a majority quorum to your replication factor.
Firstly, using WQ=AQ without loss of availability only applies when you have enough bookies. If you only have 3 bookies and use WQ=3, then you have fixed membership just like Raft. If you have 4 bookies, then as soon as one bookie is down, you’re down to 3 again and back to fixed membership. So you would want many more than 3 bookies, opting for more, smaller bookies rather than fewer large ones. If you have 5 bookies or fewer, you may still want the wriggle room that AQ<WQ gives you.
Availability does take a small hit when using WQ=AQ, as availability now also depends on operations to ZooKeeper succeeding. As soon as a write to a bookie fails, we must be able to complete an ensemble change in order to resume and get entries acknowledged.
However, I consider that we’re already in that boat anyway. Ledgers are small, bounded logs unlike Raft and Kafka’s theoretical infinite logs. Ledgers act as log segments and so they are getting created and closed constantly, and this requires successful metadata operations, so you cannot go for long without metadata changes in any case.
Write latency will have more variance, as ensemble changes add write latency. Ensemble changes are normally extremely fast, but if ZooKeeper is under heavy load then slow ensemble changes could cause write latency spikes. So if constant low latency is very important to you, you’ll likely want to stick with AQ as a majority quorum.
Replication Factor of 2
Why can’t we have Raft clusters of 2 members? Because a single node going down makes the cluster unable to make progress: we still get redundancy, but we get worse availability than with a single node. Likewise with Kafka, we can offer a rep factor of 1 or of 3, but not a guaranteed 2. To guarantee a rep factor of 2 you need to set min.insync.replicas=2, and then if one replica goes down we have the same issue as Raft.
But with BookKeeper, we can use a rep factor 2 without an issue. We simply set WQ=2 and AQ=2. We get redundancy and also don’t lose availability if a single node goes down. That’s pretty neat.
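The availability argument can be sketched as follows (a simplification that ignores ZooKeeper and ensemble-change failures):

```python
# A 2-node Raft cluster needs both nodes to commit (majority of 2 is 2),
# while a WQ=AQ=2 ledger just needs any 2 healthy bookies from the pool.

def raft_can_commit(alive, cluster_size):
    return alive >= cluster_size // 2 + 1

def bookkeeper_can_commit(healthy_bookies, write_quorum):
    return healthy_bookies >= write_quorum

# One node down out of two: Raft stalls, BookKeeper swaps in another bookie.
assert not raft_can_commit(alive=1, cluster_size=2)
assert bookkeeper_can_commit(healthy_bookies=5, write_quorum=2)
```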
In this first post we’ve focused on BookKeeper’s external consensus and dynamic ledger membership and how that contrasts to a more traditional fully integrated protocol like Raft and Apache Kafka with fixed membership.
We’ve seen that BookKeeper’s dynamic membership allows it to sidestep the usual compromise between safety and availability/latency. Where a conservative configuration with Raft might choose a rep factor of 5 to ensure it can survive the loss of 2 nodes, with BookKeeper we can achieve similar results with only a replication factor of 3. We can even choose WQ=4, AQ=3 to reduce the extra latency from slow ensemble changes. You have a bit more freedom than you might think when setting your Write Quorum and Ack Quorum.
We also saw that when AQ < WQ you may have blocks in the middle of your ledger that only reach AQ replication, which can surprise people. In a later post we’ll look at potential tweaks to the protocol that could change this behaviour and why it might not be worth it or even safe.
This is by no means the end of the ways that BookKeeper differs from integrated protocols like Raft and Kafka. There are many more things to consider when trying to understand the BookKeeper replication protocol in detail.
In the next post we’re going to look at another aspect of the BookKeeper replication protocol that is necessary due to its external consensus: handling client failure and closing ledgers correctly. There be dragons, and many BookKeeper open source developers have fallen foul of this tricky area of the protocol.
Finally, as with everything, it’s all about trade-offs. Integrated protocols make different trade-offs from BookKeeper, and neither is “the best”; this post, and indeed this series, is not an attempt at a This-Versus-That comparison. The comparison is there as a vehicle for learning.