Shagun Sodhani
Jan 11, 2016

Kafka is a distributed and scalable messaging system for real-time consumption of messages. It addresses several shortcomings of traditional messaging systems:

  1. Traditional systems provide very strong delivery guarantees, which increase system complexity and may not be needed in all cases.
  2. Throughput is not a primary design focus.
  3. Support for distributed setups is weak.
  4. Performance degrades as messages begin to accumulate.

To counter these issues, many specialized systems have come up. Facebook uses Scribe, where each frontend machine sends log data to a set of Scribe machines. These machines aggregate the log entries and dump them to HDFS. Yahoo’s Data Highway generates “minute files” based on the aggregated data. Cloudera’s Flume is more flexible and has better distributed support. However, all these systems are meant for the offline consumption of data and use a “push” model where the data is “pushed” to the consumers. Kafka uses a “pull” model, making it easier to rewind a consumer.

Architecture

In Kafka, a stream of messages of a particular type defines a topic. A producer can publish messages to a topic and these messages are stored in a set of servers called brokers. A consumer can subscribe to multiple topics and consume the messages from the subscribed topics by pulling from the brokers.

A message is simply a payload of bytes, and a set of messages can be sent together in a single publish request. A consumer creates message streams for the topic to which it wants to subscribe. Messages published to the topic are evenly distributed across these streams. Each stream provides an iterator over the messages being processed. Both the point-to-point delivery model and the publish-subscribe model are supported.

Single Partition Implementation

Storage

Topics are divided into multiple partitions, which are stored on brokers. Each partition corresponds to a logical log implemented as a set of segment files of approximately the same size. Published messages are appended to the latest segment file, which is flushed to disk only after a configurable number of messages has been published or a certain amount of time has elapsed. Only flushed messages are available to consumers.

Messages are addressed by their logical offset in the log, which serves as the message id. This avoids the overhead of maintaining an auxiliary, seek-intensive random-access index that maps message ids to their locations. Consumers pull data via asynchronous requests that specify the offset of the first message to be consumed and the number of bytes to fetch. Each broker maintains a sorted list of offsets, including the offset of the first message in each segment file, and uses this list to serve the request.
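
The lookup described above can be sketched with a binary search over the sorted list of segment base offsets. This is a minimal illustration, not Kafka's code: the names `segment_base_offsets` and `locate_segment` and the sample offsets are hypothetical.

```python
import bisect

# Hypothetical base offsets: the logical offset of the first message in each
# segment file, kept sorted in memory by the broker.
segment_base_offsets = [0, 1000, 2000, 3000]


def locate_segment(base_offsets, requested_offset):
    """Return (segment_index, position_within_segment) for a requested offset."""
    if not base_offsets or requested_offset < base_offsets[0]:
        raise ValueError("offset is older than the earliest retained segment")
    # bisect_right finds the first base offset strictly greater than the
    # request; the segment just before it contains the requested offset.
    index = bisect.bisect_right(base_offsets, requested_offset) - 1
    return index, requested_offset - base_offsets[index]


print(locate_segment(segment_base_offsets, 2345))
```

Because the list is sorted, finding the right segment costs a logarithmic number of comparisons and needs no per-message index.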

Efficient transfer

A producer can submit multiple messages in a single request. Similarly, the consumer pulls multiple messages in a single request even though it iterates over them one at a time. There is no caching at the Kafka layer; only the file system page cache is used, which reduces overhead. Kafka uses the sendfile API (of *nix operating systems) to transfer bytes from a file channel to a socket channel, thereby avoiding extra copies and system calls.
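
To make the sendfile idea concrete, here is a small Python sketch rather than Kafka's own code. It uses `socket.sendfile` from the standard library, which relies on `os.sendfile` where the platform supports it and otherwise falls back to ordinary `send()` calls. The helper `send_log_chunk`, the temporary file, and the socket pair standing in for a broker and a consumer are all assumptions made for the demo.

```python
import os
import socket
import tempfile


def send_log_chunk(path, sock, offset, count):
    """Send `count` bytes of the file at `path`, starting at `offset`, to `sock`."""
    with open(path, "rb") as segment:
        # The file is handed to the socket directly; the bytes are not read
        # into a Python-level buffer when os.sendfile is available.
        return sock.sendfile(segment, offset=offset, count=count)


# Demo: a temporary file plays the role of a segment file.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"msg-0msg-1msg-2")
    segment_path = f.name

broker_side, consumer_side = socket.socketpair()
sent = send_log_chunk(segment_path, broker_side, offset=5, count=10)
received = consumer_side.recv(1024)

broker_side.close()
consumer_side.close()
os.unlink(segment_path)
print(sent, received)
```

The request shape mirrors the fetch described earlier: a starting offset plus a number of bytes.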

Stateless broker

The consumer, not the broker, keeps track of how much it has consumed, so the broker is stateless. This reduces overhead on the broker and lets a consumer deliberately go back to an old offset and re-consume data after an error. For example, a consumer may flush its consumed data to persistent storage only periodically; if it crashes, the unflushed data is lost, but it can restart from the offset of the last flush and re-consume those messages. Using a pull model makes it easier to rewind a consumer.
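
A toy sketch of this arrangement, with the broker side keeping no per-consumer state and the consumer owning its position. The classes `PartitionLog` and `Consumer` are hypothetical, and offsets are simplified to list indices rather than byte positions.

```python
class PartitionLog:
    """Toy in-memory partition; it stores messages but no consumer state."""

    def __init__(self):
        self.messages = []

    def append(self, message):
        self.messages.append(message)

    def fetch(self, offset, max_messages):
        return self.messages[offset:offset + max_messages]


class Consumer:
    """Tracks its own position and can rewind to any earlier offset."""

    def __init__(self, log):
        self.log = log
        self.offset = 0

    def poll(self, max_messages=2):
        batch = self.log.fetch(self.offset, max_messages)
        self.offset += len(batch)
        return batch

    def rewind(self, offset):
        self.offset = offset


log = PartitionLog()
for i in range(5):
    log.append(f"event-{i}")

consumer = Consumer(log)
first = consumer.poll()        # processed and flushed
checkpoint = consumer.offset   # position saved together with the flush
lost = consumer.poll()         # consumed, but never flushed before a crash
consumer.rewind(checkpoint)    # after restart, resume from the checkpoint
replayed = consumer.poll()
print(first, replayed)
```

The replayed batch is exactly the one that was consumed but not flushed, which is why the application sees those messages a second time.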

A stateless broker makes it difficult to decide when to delete a message, since the broker does not know whether all subscribed consumers have consumed it. Kafka uses a simple time-based SLA: a message is deleted once it has been retained longer than a configured period (typically seven days in the paper). It is assumed that consumers will have consumed the data within this period. Such long retention is feasible because Kafka’s performance does not degrade as the amount of stored data grows.

Delivery Guarantee

Kafka guarantees at-least-once delivery. The application therefore needs to handle duplicate messages, although most of the time each message is delivered exactly once. Providing exactly-once semantics would require two-phase commits and is not always necessary. Ordering is guaranteed only among messages from a single partition. A CRC is stored with each message to detect I/O and network errors. Since replication is not supported, unconsumed messages stored on a failed broker become unavailable, and they are lost for good if the broker’s storage is permanently damaged.
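
A hedged sketch of what this means for an application: verify a per-message checksum and skip offsets that have already been applied. The record layout (a 4-byte CRC32 prefix) and the helpers `encode`, `decode`, and `consume_once` are assumptions for illustration, not Kafka's wire format. Skipping by offset relies on the in-order guarantee within a single partition.

```python
import zlib


def encode(payload: bytes) -> bytes:
    """Prefix the payload with a 4-byte big-endian CRC32 (assumed layout)."""
    return zlib.crc32(payload).to_bytes(4, "big") + payload


def decode(record: bytes) -> bytes:
    """Return the payload, or raise ValueError if the checksum does not match."""
    stored, payload = int.from_bytes(record[:4], "big"), record[4:]
    if zlib.crc32(payload) != stored:
        raise ValueError("CRC mismatch: record is corrupted")
    return payload


def consume_once(deliveries, last_applied_offset=-1):
    """Apply each (offset, record) at most once, skipping redelivered offsets."""
    applied = []
    for offset, record in deliveries:
        if offset <= last_applied_offset:
            continue  # duplicate caused by an at-least-once redelivery
        applied.append(decode(record))
        last_applied_offset = offset
    return applied, last_applied_offset


deliveries = [
    (0, encode(b"a")),
    (1, encode(b"b")),
    (1, encode(b"b")),  # redelivered after a consumer restart
    (2, encode(b"c")),
]
applied, last_offset = consume_once(deliveries)
print(applied, last_offset)
```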

Distributed Coordination

For publishing messages, producers can choose the partition either randomly or semantically by using a partitioning function.
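
The two choices can be sketched as follows. The function `choose_partition` is hypothetical, and hashing the key with CRC32 is just one possible partitioning function picked for the example.

```python
import random
import zlib


def choose_partition(num_partitions, key=None):
    """Pick a partition randomly, or by hashing a key when one is given."""
    if key is None:
        return random.randrange(num_partitions)
    # A stable hash sends every message with the same key to the same
    # partition, which preserves per-key ordering.
    return zlib.crc32(key) % num_partitions


print(choose_partition(8, b"user-42") == choose_partition(8, b"user-42"))
```

Python's built-in `hash` is avoided here because it is randomized per process for bytes and strings, so different producers would disagree on the partition.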

A consumer group is a set of consumers that jointly consume a set of subscribed topics. Each message in a topic is delivered to only one of the consumers in the group, and coordination is required only between consumers in the same consumer group.

A partition within a topic is the smallest unit of parallelism. At any time, all messages from a given partition are consumed by a single consumer within a consumer group. As a result, coordination is needed only when rebalancing the load. For good load balancing, the number of partitions in a topic should be larger than the number of consumers in the consumer group.

Consumers take a decentralized approach to coordination by using a highly available consensus service called ZooKeeper. Kafka uses ZooKeeper for:

  1. detecting the addition and the removal of brokers and consumers and triggering actions in consumers.
  2. keeping track of the consumed offset of each partition.

Consumers and brokers create registry entries in ZooKeeper to store their information. The broker registry contains the broker’s hostname and port, and the set of topics and partitions stored on it. The consumer registry includes the consumer group to which a consumer belongs and the set of topics that it subscribes to. Each consumer group is associated with an ownership registry and an offset registry. The ownership registry has one path for every subscribed partition, and the path value is the id of the consumer currently consuming from this partition. The offset registry stores the offset of the last consumed message for each subscribed partition.

When a broker fails, its partitions are removed from the broker registry. When a consumer fails, its entry in the consumer registry and the partitions it owns in the ownership registry are lost. When a consumer is started, or when a broker or consumer is added or removed, a rebalance process is initiated among consumers to determine the new subset of partitions that each of them will consume from. Each consumer first computes the set of partitions and the set of consumers available for the topic that it has subscribed to. Then, it deterministically picks one subset of partitions for itself and makes the appropriate changes in the ownership registry. It then starts pulling data from the offset stored in the offset registry, which is updated as messages are consumed.

It could be the case that a consumer tries to gain ownership of a partition still held by some other consumer (which has not yet received the rebalancing notification). In that case, the first consumer gives up ownership of all the partitions it owns and retries the rebalance process after some time. In practice, the process is seen to stabilize after a few retries.
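
The deterministic pick can be sketched as a range assignment over sorted lists, so that every consumer computes the same split without talking to the others. The function `assign_partitions` is hypothetical, and rounding the chunk size up so that no partition is left unowned is an assumption of this sketch rather than a detail taken from the paper.

```python
import math


def assign_partitions(consumer_id, consumers, partitions):
    """Return the partitions that `consumer_id` should own after a rebalance."""
    sorted_consumers = sorted(consumers)
    sorted_partitions = sorted(partitions)
    j = sorted_consumers.index(consumer_id)
    # Chunk size, rounded up so that every partition gets an owner.
    n = math.ceil(len(sorted_partitions) / len(sorted_consumers))
    return sorted_partitions[j * n:(j + 1) * n]


consumers = ["c2", "c1", "c3"]
partitions = list(range(7))
assignment = {c: assign_partitions(c, consumers, partitions) for c in consumers}
print(assignment)
```

Since the inputs are sorted before slicing, two consumers that read the same registries always agree on who owns what; conflicts arise only while their views differ, which is the retry case described above.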

Kafka at LinkedIn

At LinkedIn, a Kafka cluster is co-located with each data center, and the frontend servers publish their log data to it. Another Kafka cluster is set up in a separate data center for offline analysis. Consumers pull data from the Kafka clusters in the live data centers and then run jobs to load this data into a data warehouse and a Hadoop cluster. The end-to-end latency for this complete pipeline is about 10 seconds on average.

An auditing system is used to track the number of messages lost along the entire pipeline. Each message carries a timestamp and the name of the server where it was generated, and each producer periodically publishes monitoring events that record the number of messages it has published for each topic. Consumers can then compare the number of generated messages with the number of consumed messages.
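
The comparison can be sketched as a simple count reconciliation. The event and message shapes (dictionaries with `producer`, `topic`, and `count` fields) and the function `audit` are hypothetical, chosen only to illustrate the idea.

```python
from collections import Counter


def audit(monitoring_events, consumed_messages):
    """Return {topic: published - consumed} for topics where the counts differ."""
    published = Counter()
    for event in monitoring_events:
        published[event["topic"]] += event["count"]
    consumed = Counter(message["topic"] for message in consumed_messages)
    return {
        topic: published[topic] - consumed[topic]
        for topic in published
        if published[topic] != consumed[topic]
    }


# Hypothetical monitoring events published by two producers.
monitoring_events = [
    {"producer": "web-1", "topic": "clicks", "count": 3},
    {"producer": "web-2", "topic": "clicks", "count": 2},
    {"producer": "web-1", "topic": "views", "count": 1},
]
# Messages that actually reached the consumer: one click is missing.
consumed_messages = [{"topic": "clicks"}] * 4 + [{"topic": "views"}]

discrepancies = audit(monitoring_events, consumed_messages)
print(discrepancies)
```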

A special Kafka input format is implemented to allow MapReduce jobs to read directly from Kafka. Avro is used as the serialization protocol as it supports schema evolution. For each message, the id of its Avro schema and the serialized bytes are stored in the payload. A schema registry service maps the schema id to the actual schema, and the consumer queries this registry to retrieve the corresponding schema.

Experiments

The paper compares Kafka’s performance with Apache ActiveMQ and RabbitMQ. All the tests use a single broker and a single producer/consumer. For the producer test, the order of performance is:

Kafka with batching > Kafka without batching > RabbitMQ > ActiveMQ.

One reason for Kafka’s superior performance is that the Kafka producer does not wait for an acknowledgment from the broker. The paper argues that losing some log entries may be acceptable as long as the number of lost entries is very small, but it shows no numbers to support the claim that very few records are actually lost. Kafka also benefits from its efficient storage format, and batching messages amortizes the RPC overhead. However, Kafka in batch mode is not compared with the alternatives in batch mode. Moreover, Kafka’s performance graph shows many downward spikes that are not explained.

In the consumer test, Kafka again outperforms the alternatives. One reason is its efficient storage format and another is the stateless broker. It also benefits from the use of the sendfile API. However, no tests show how Kafka performs in a distributed environment as the number of producers and consumers increases.

Future Work

The paper mentions the following areas for improvement:

  1. Adding built-in replication of messages across brokers for data recovery in case of machine failure.
  2. Adding stream processing capabilities like support for windowing or join functions.