Apache Kafka leaves the Zoo

Apache Kafka has become the de facto standard system for brokering messages in highly available environments. Its promise of linear scalability and fault tolerance implies a distributed architecture, which typically involves coordination between nodes. Kafka's creators leveraged Apache ZooKeeper to guarantee strong consistency of cluster state and configuration, and to handle discovery of peers. The ZooKeeper dependency confuses newcomers and makes Kafka deployments more complex. This article describes the process of replacing ZooKeeper with Atomix, a reactive Java framework for building fault-tolerant distributed systems.


Strong consistency, required

Modern distributed systems often distinguish between data that requires strong consistency guarantees and data that may become consistent at some point in the future. Apache Kafka defines one protocol for handling updates of cluster-level metadata (e.g. cluster state, topic configuration, the ISR list) and a separate mechanism for replicating topic partitions between brokers. Cluster metadata is modified infrequently and its size does not exceed a few megabytes, but every node needs a consistent view of it at any point in time. Updates to this meta-information should stay off the common read and write request paths, so as not to degrade performance. By contrast, Kafka records stored in topics are constantly appended and a partition can grow beyond gigabytes, but only a subset of nodes manages a given topic-partition.

For the remainder of the article, let us focus on the sub-system that manages cluster metadata in a strongly consistent way. In terms of the CAP theorem, this module falls into the “CP” category: we trade off availability in favor of consistency and tolerance of network partitioning. It is better for the system to become briefly unavailable than to run into a split-brain situation, which usually causes data corruption. The solution requires a quorum of nodes (n/2 + 1) to be reachable in order to operate correctly.
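To make the quorum arithmetic concrete, here is a minimal sketch (the `Quorum` class name is mine, not part of Kafka or ZooKeeper) computing the majority size and the number of node failures an n-node ensemble can survive:

```java
// Quorum arithmetic for a CP system: a majority of n/2 + 1 nodes
// must be reachable, so the cluster survives up to n - (n/2 + 1) failures.
public class Quorum {

    // Smallest majority of an n-node ensemble.
    static int majority(int n) {
        return n / 2 + 1;
    }

    // Maximum number of node failures the ensemble tolerates.
    static int toleratedFailures(int n) {
        return n - majority(n);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 3, 5}) {
            System.out.printf("nodes=%d quorum=%d tolerated failures=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

This is also why ensembles have odd sizes: a 4-node cluster needs a quorum of 3 and tolerates a single failure, no better than a 3-node cluster.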

Network partitioning in Apache Kafka cluster

A few words on ZooKeeper

The creators of Apache Kafka decided to leverage Apache ZooKeeper as a cluster coordination service. ZooKeeper provides configuration management capabilities, distributed synchronization, group membership and leader election recipes. Its data model is a set of nodes (called “znodes”) organized in a hierarchical, tree-like structure. Every node can combine different characteristics (persistent, ephemeral or sequential) and contains an arbitrary payload stored as a binary byte array. Apache Kafka persists its data in human-readable JSON format. If you have not done so already, download the official ZooKeeper distribution, which provides a CLI to interact with the service. The example below presents the configuration of a Kafka topic: its number of partitions and their assignment to brokers:

[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/my-topic
{"version":1,"partitions":{"1":[0,1],"0":[1,0]}}
cZxid = 0x32
ctime = Wed Jan 30 22:11:49 PST 2019
mZxid = 0x32
mtime = Wed Jan 30 22:11:49 PST 2019
pZxid = 0x34
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 48
numChildren = 1

Nodes of the ephemeral type disappear once the client session (a Kafka broker) that owns the given znode disconnects from the cluster. ZooKeeper also provides one-time watches, which let clients receive a notification once the znode at a certain path, or any of its children, is created, updated or removed. Apache Kafka implements service discovery using ephemeral nodes and watches. Every broker subscribes to child notifications of /brokers/ids to receive an event whenever peers leave or join the cluster. Exactly one Kafka instance within the cluster acts as the controller, which manages the state of partitions and replicas and performs administrative tasks. Every broker is eligible to become the controller, and each one watches for removal of the ephemeral /controller node. Upon notification of the key's deletion, each server initiates the leader election process. The first broker that successfully creates the new znode wins the election.

[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0, 1]
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.1.8:9092"],"jmx_port":-1,"host":"192.168.1.8","timestamp":"1548914820968","port":9092,"version":4}
cZxid = 0x18
ctime = Wed Jan 30 22:07:01 PST 2019
mZxid = 0x18
mtime = Wed Jan 30 22:07:01 PST 2019
pZxid = 0x18
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1000005c2fd0000
dataLength = 192
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] get /controller
{"version":1,"brokerid":0,"timestamp":"1548914821132"}
cZxid = 0x1a
ctime = Wed Jan 30 22:07:01 PST 2019
mZxid = 0x1a
mtime = Wed Jan 30 22:07:01 PST 2019
pZxid = 0x1a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1000005c2fd0000
dataLength = 54
numChildren = 0
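The election described above relies entirely on create() being atomic: whichever broker creates /controller first wins. A toy model of that mechanism, with `ConcurrentHashMap.putIfAbsent()` standing in for the atomic znode creation (real brokers of course go through the ZooKeeper API, and the class below is purely illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;

// Toy model of Kafka's controller election: ZooKeeper's create() is atomic,
// so the first broker to create /controller wins. putIfAbsent() plays the
// role of the atomic create against a shared "znode tree" stub.
public class ControllerElection {

    // path -> id of the broker owning the ephemeral node
    private final ConcurrentHashMap<String, Integer> znodes = new ConcurrentHashMap<>();

    // Returns true if this broker won the election.
    boolean tryBecomeController(int brokerId) {
        return znodes.putIfAbsent("/controller", brokerId) == null;
    }

    // Fired when the ephemeral /controller node disappears,
    // i.e. the previous controller's session has expired.
    void onControllerDeleted() {
        znodes.remove("/controller");
    }

    public static void main(String[] args) {
        ControllerElection cluster = new ControllerElection();
        System.out.println("broker 0 elected: " + cluster.tryBecomeController(0)); // true
        System.out.println("broker 1 elected: " + cluster.tryBecomeController(1)); // false
        cluster.onControllerDeleted();
        System.out.println("broker 1 elected: " + cluster.tryBecomeController(1)); // true
    }
}
```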

Sequential znodes guarantee unique names by automatically appending a sequence number suffix. Apache Kafka uses such nodes to broadcast cluster state changes, e.g. /isr_change_notification for updating the ISR lists of topic-partitions. Sequential nodes guarantee entry ordering and form the foundation of the ZooKeeper queue recipe. An example notification about an ISR change, captured while running Kafka inside an IDE:

/isr_change_notification/isr_change_0000000015
{
"version": 1,
"partitions": [
{ "topic": "__consumer_offsets", "partition": 28 },
{ "topic": "__consumer_offsets", "partition": 22 }
]
}
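The isr_change_0000000015 name above shows the scheme: the server keeps a counter per parent node and appends it to the requested prefix as a zero-padded, ten-digit number, which makes child names both unique and lexicographically ordered. A sketch of that naming rule (my own simplification; ZooKeeper derives the counter from parent-node state server-side):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of sequential znode naming: a per-parent counter is appended to the
// requested prefix as a zero-padded 10-digit suffix, so names are unique and
// sort in creation order - the property the ZooKeeper queue recipe builds on.
public class SequentialNames {

    private final AtomicLong counter = new AtomicLong();

    String nextName(String prefix) {
        return String.format("%s%010d", prefix, counter.getAndIncrement());
    }

    public static void main(String[] args) {
        SequentialNames parent = new SequentialNames();
        System.out.println(parent.nextName("/isr_change_notification/isr_change_"));
        System.out.println(parent.nextName("/isr_change_notification/isr_change_"));
    }
}
```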

Kafka brokers heavily depend on connectivity with the ZooKeeper ensemble. If the link breaks, users may experience message loss, though luckily only with a relaxed setting of the acks property on the producer side (anything other than all). I recommend reading the article series by Jack Vanlightly, who performed extensive Kafka resilience testing.

To summarize, Apache ZooKeeper is an excellent tool for managing data in a strongly consistent fashion. It has been used by a number of big data projects in the Hadoop ecosystem (e.g. Hadoop MapReduce, HBase, Solr, Neo4j, Mesos). But in 2019, we may have other cool options…


Welcome Atomix, a Swiss Army knife for distributed systems

Atomix is an event-driven Java framework that solves problems commonly encountered by developers of distributed systems. It provides distributed versions of well-known Java primitives, such as maps, locks and counters, backed by one of several distributed protocols: Raft, Gossip or primary-backup. The library ships building blocks that enable the implementation of cluster management, group membership, leader election, distributed concurrency control, partitioning, and replication.

We require strong consistency guarantees from a ZooKeeper replacement, so the natural choice is the Raft protocol for storing Kafka metadata. Atomix implements an enhanced Multi-Raft algorithm that partitions the key set among a configurable number of independent consensus groups, and coalesces all heartbeats of a single node to minimize network traffic. Each group has its own Raft leader, whereas ZooKeeper maintains only one leader, which broadcasts updates to all replicas. A detailed explanation of ZAB (ZooKeeper Atomic Broadcast) and Raft is beyond the scope of this article, but the two protocols have much in common. In fact, the major difference between basic Raft and ZAB is that the latter synchronizes followers with a newly elected leader before the normal operation phase, whereas Raft simplifies the algorithm's states and performs that activity as part of normal operation.
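The core Multi-Raft idea of spreading keys over independent consensus groups can be sketched as simple key routing. The hash-modulo scheme below is my own illustration of the concept, not Atomix's actual partitioner:

```java
// Sketch of the Multi-Raft idea: keys are spread over several independent
// consensus groups, each with its own Raft leader, so no single leader has
// to sequence every write. Hash-modulo routing is an illustrative stand-in
// for the framework's real partitioning logic.
public class MultiRaftRouter {

    private final int groups;

    MultiRaftRouter(int groups) {
        this.groups = groups;
    }

    // Route a metadata key (e.g. a znode-style path) to a consensus group.
    int groupFor(String key) {
        return Math.floorMod(key.hashCode(), groups);
    }

    public static void main(String[] args) {
        MultiRaftRouter router = new MultiRaftRouter(3);
        for (String path : new String[] {"/controller", "/brokers/ids/0", "/brokers/topics/test"}) {
            System.out.println(path + " -> group " + router.groupFor(path));
        }
    }
}
```

Because each group elects its own leader, writes to keys in different groups proceed in parallel instead of funneling through a single broadcaster.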

Curious readers may wonder why replace ZooKeeper with Atomix at all. The best answer is simplicity of deployment and architecture. Kafka would no longer rely on an external service, which simplifies e.g. Kubernetes manifests. In small and medium-sized clusters, every node could have identical characteristics (broker and Raft node), so there would be no split between brokers and ZooKeeper servers when thinking about Kafka infrastructure. Large deployments may choose only a subset of hosts to participate as Raft nodes in consensus groups for performance reasons: the larger the consensus group, the longer it takes for a quorum (n/2 + 1) of its nodes to acknowledge write requests.


And so they code…

Making extensive changes to the code base of a big project such as Apache Kafka can be challenging. Luckily, interaction with ZooKeeper is mostly handled by two Scala classes: kafka.zk.KafkaZkClient and kafka.zookeeper.ZooKeeperClient. KafkaZkClient provides over 100 logical methods, such as setting the controller epoch, triggering an update of a partition's leader and ISR list, or discovering all reachable brokers in the cluster. ZooKeeperClient, on the other hand, is responsible for purely technical aspects: sending requests to ZooKeeper, notifying whenever a watch has fired, reconnecting to the ensemble upon network disruption, and so on. Following a pragmatic approach, I decided to reimplement ZooKeeperClient on top of the Atomix framework. Of course, this decision has its cons. From a design perspective it is far from perfect: the ZooKeeper JAR remains on the classpath, as Atomix response codes and exceptions have to be translated into the ZooKeeper domain. The main advantage is ease of maintenance when pulling changes from the upstream Kafka Git repository.

I decided to store Kafka metadata in io.atomix.core.map.AtomicMap, whose key corresponds to the path of the znode and whose value holds the binary data. Supporting additional ZooKeeper features required the introduction of a few helper data structures:

  • io.atomix.core.map.AtomicCounter to generate unique identifiers of client sessions, later assigned as owners of ephemeral data.
  • io.atomix.core.map.AtomicCounterMap to produce sequence numbers for sequential nodes.
  • io.atomix.core.multimap.AtomicMultimap to maintain the mapping of all ephemeral paths actively owned by a given broker.
  • io.atomix.core.lock.AtomicLock to guarantee exclusive write access to the AtomicMap and AtomicMultimap during deletion of stale ephemeral data.
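How the map and multimap cooperate to emulate ephemeral nodes can be sketched locally, with plain in-memory collections standing in for the Atomix primitives (the class and method names below are mine, not the actual implementation): `metadata` plays the AtomicMap role, `ephemeralPaths` the AtomicMultimap role.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Local sketch of ephemeral-node emulation: the metadata map stores znode-style
// path -> payload entries, while ephemeralPaths tracks which session (broker)
// owns which paths, so everything a dead broker owned can be swept at once.
public class EphemeralStore {

    private final Map<String, byte[]> metadata = new ConcurrentHashMap<>();
    private final Map<Long, Set<String>> ephemeralPaths = new ConcurrentHashMap<>();

    void createEphemeral(long sessionId, String path, byte[] data) {
        metadata.put(path, data);
        ephemeralPaths.computeIfAbsent(sessionId, id -> ConcurrentHashMap.newKeySet()).add(path);
    }

    // Invoked on a membership event signalling that the broker owning
    // the session has left the cluster: drop every path it owned.
    void onSessionClosed(long sessionId) {
        Set<String> paths = ephemeralPaths.remove(sessionId);
        if (paths != null) {
            paths.forEach(metadata::remove);
        }
    }

    boolean exists(String path) {
        return metadata.containsKey(path);
    }
}
```

In the real implementation the sweep in onSessionClosed() runs under the AtomicLock, so that concurrent cleaners do not race on the shared maps.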

The Atomix distributed map API provides the ability to conditionally modify the value of a given key using optimistic locking, which resolves the problem of concurrent updates. The biggest challenge was the implementation of ephemeral data, as Atomix does not ship this feature out of the box. Given the choice between setting a time-to-live on AtomicMap keys and periodically refreshing them before expiration, or manually removing stale data based on notifications from the Atomix membership protocol, I decided on the second option. Implementation details can be viewed in my GitHub repository — https://github.com/lukasz-antoniak/kafka.
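The optimistic-locking update pattern mentioned above amounts to a compare-and-set retry loop: read the current value, compute the replacement, and retry if another writer got in between. A minimal sketch, using `ConcurrentHashMap.replace(key, old, new)` as a stand-in for the conditional operations of the distributed map (the method and parameter names are mine):

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of an optimistic-locking update: no lock is held while computing the
// new value; instead the write succeeds only if the key still holds the value
// we read, and the whole operation is retried otherwise.
public class OptimisticUpdate {

    static void updateState(ConcurrentHashMap<String, String> map, String path, String newValue) {
        while (true) {
            String current = map.get(path);
            String updated = newValue; // in reality, derived from "current"
            boolean success = (current == null)
                    ? map.putIfAbsent(path, updated) == null
                    : map.replace(path, current, updated);
            if (success) {
                return; // no concurrent modification detected
            }
            // Another writer changed the key first: re-read and retry.
        }
    }
}
```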


Demo time!

The easiest way to play with the “Kafka-Atomix” distribution is to download the source code from the Git repository and navigate to the config/kafka-atomix folder. There you will find a simple Docker Compose manifest that provisions a three-node Kafka cluster. The script assumes that you have built the image locally from source; to pull ready-to-use binaries from Docker Hub instead, just replace all occurrences of image: 'kafka-atomix:latest' with image: 'lantonia/kafka-atomix:latest' in docker-compose.yml. Execute $ docker-compose up to begin the journey…

First, let us create a “test” topic, then send and receive some messages.

$ docker exec -it kafka0 /bin/bash
# ./bin/kafka-topics.sh --create --zookeeper atomix:///volume/kafka/config/atomix-admin.conf --replication-factor 2 --partitions 2 --topic test
# ./bin/kafka-console-producer.sh --broker-list kafka0:9092 --topic test --request-required-acks -1
# ./bin/kafka-console-consumer.sh --bootstrap-server kafka0:9092 --topic test --from-beginning

You may inspect the current state of the cluster by viewing the active brokers and the elected controller.

# ./bin/kafka-cluster.sh --atomix /volume/kafka/config/atomix-admin.conf --describe --keys /brokers/ids,/controller --include-children
10 /brokers/ids {"data":"[null]","version":0,"owner":0}
52 /brokers/ids/0 {"data":"{\"listener_security_protocol_map\":{\"PLAINTEXT\":\"PLAINTEXT\"},\"endpoints\":[\"PLAINTEXT://kafka0:9092\"],\"jmx_port\":-1,\"host\":\"kafka0\",\"timestamp\":\"1549061407166\",\"port\":9092,\"version\":4}","version":0,"owner":2}
30 /brokers/ids/1 {"data":"{\"listener_security_protocol_map\":{\"PLAINTEXT\":\"PLAINTEXT\"},\"endpoints\":[\"PLAINTEXT://kafka1:9092\"],\"jmx_port\":-1,\"host\":\"kafka1\",\"timestamp\":\"1549061406309\",\"port\":9092,\"version\":4}","version":0,"owner":1}
51 /brokers/ids/2 {"data":"{\"listener_security_protocol_map\":{\"PLAINTEXT\":\"PLAINTEXT\"},\"endpoints\":[\"PLAINTEXT://kafka2:9092\"],\"jmx_port\":-1,\"host\":\"kafka2\",\"timestamp\":\"1549061407431\",\"port\":9092,\"version\":4}","version":0,"owner":3}
37 /controller {"data":"{\"version\":1,\"brokerid\":1,\"timestamp\":\"1549061406676\"}","version":0,"owner":1}

Log in to the server currently acting as the controller and simulate a network partition.

$ docker exec -it kafka1 /bin/bash
# iptables -I INPUT -j DROP
# iptables -I OUTPUT -j DROP

Open a command prompt on the other two brokers and inspect the state of the cluster. You should notice only two active instances and a re-election of the controller. The in-sync replica set of our “test” topic has also shrunk.

$ docker exec -it kafka0 /bin/bash
# ./bin/kafka-cluster.sh --atomix /volume/kafka/config/atomix-admin.conf --describe --keys /brokers/ids,/controller --include-children
10 /brokers/ids {"data":"[null]","version":0,"owner":0}
52 /brokers/ids/0 {"data":"{\"listener_security_protocol_map\":{\"PLAINTEXT\":\"PLAINTEXT\"},\"endpoints\":[\"PLAINTEXT://kafka0:9092\"],\"jmx_port\":-1,\"host\":\"kafka0\",\"timestamp\":\"1549061407166\",\"port\":9092,\"version\":4}","version":0,"owner":2}
51 /brokers/ids/2 {"data":"{\"listener_security_protocol_map\":{\"PLAINTEXT\":\"PLAINTEXT\"},\"endpoints\":[\"PLAINTEXT://kafka2:9092\"],\"jmx_port\":-1,\"host\":\"kafka2\",\"timestamp\":\"1549061407431\",\"port\":9092,\"version\":4}","version":0,"owner":3}
219 /controller {"data":"{\"version\":1,\"brokerid\":0,\"timestamp\":\"1549061587798\"}","version":0,"owner":2}
# ./bin/kafka-cluster.sh --atomix /volume/kafka/config/atomix-admin.conf --describe --keys /brokers/topics/test --include-children
76 /brokers/topics/test {"data":"{\"version\":1,\"partitions\":{\"1\":[1,2],\"0\":[0,1]}}","version":0,"owner":0}
93 /brokers/topics/test/partitions {"data":"[null]","version":0,"owner":0}
80 /brokers/topics/test/partitions/0 {"data":"[null]","version":0,"owner":0}
213 /brokers/topics/test/partitions/0/state {"data":"{\"controller_epoch\":1,\"leader\":0,\"version\":1,\"leader_epoch\":0,\"isr\":[0]}","version":1,"owner":0}
97 /brokers/topics/test/partitions/1 {"data":"[null]","version":0,"owner":0}
241 /brokers/topics/test/partitions/1/state {"data":"{\"controller_epoch\":2,\"leader\":2,\"version\":1,\"leader_epoch\":1,\"isr\":[2]}","version":1,"owner":0}

Let us send a few more messages, then restore the network on the partitioned container. You should see the broker successfully rejoin the cluster, and the ISR list of the “test” topic expand.

$ docker exec -it kafka1 /bin/bash
# iptables -D INPUT 1
# iptables -D OUTPUT 1

To complete the demo, shut down the Kafka cluster by invoking $ docker-compose down --volumes.


Summary

Hope you liked the demo! Still, I cannot yet recommend decommissioning your reliable Apache Kafka cluster and provisioning the Kafka-Atomix distribution; more resilience tests need to be performed first. I am looking forward to checking out Blockade, which introduces randomized hardware failures into the cluster. The next step is to support Kafka 2.x, because the current modifications apply only to branch 1.1. Kafka 2.x takes advantage of ZooKeeper's multi() operation, which will most probably translate to Atomix transactions (two-phase commit protocol).

Are there any other attempts to replace ZooKeeper in Apache Kafka? Yes. Banzai Cloud maintains a version that depends on etcd (KIP-273), whereas the Jocko project aims to completely rewrite Kafka in Go, using Raft for consensus and Serf for service discovery.

Feel free to check out the project source code available on GitHub and play with the sample three-node “dockerized” environment! Hopefully the project will make Kafka clusters even easier to deploy and maintain.