Kafka White Paper

Kam Singh
Aug 24, 2021


Kafka Introduction

Kafka was developed at LinkedIn in 2010 and has been a top-level Apache project since 2012. Written in Scala and Java, it is a highly scalable, distributed, durable, and fault-tolerant publish-subscribe event streaming and messaging system. It is maintained by the Apache Software Foundation.

The following diagram gives an overview of Kafka and its various types of clients.

Generally, there are two types of messaging patterns:

  1. Queuing (Point-to-Point)
  2. Publish-Subscribe (Pub/Sub)

Kafka supports both of these models through the “consumer group” concept, which makes processing scalable while still allowing multiple subscribers.

As with a queue, the consumer group allows you to divide up the processing over the members of the consumer group. As with publish-subscribe, Kafka allows you to broadcast messages to multiple consumer groups.

Key Concepts / Components of Kafka

Kafka is a multi-producer, multi-consumer system, and it looks something like the diagram below:

The Kafka architecture revolves around the following core components or concepts:

  • Broker,
  • Messages/Records,
  • Topic,
  • Producers,
  • Consumers,
  • Streams,
  • Connector

Broker

A Kafka cluster is formed by one or more Kafka servers, known as brokers, working together as a distributed messaging system. A broker is therefore a single server instance within a Kafka cluster.

Within a cluster of brokers, one broker is automatically elected as the controller. The controller is responsible for admin operations, such as assigning partitions to brokers and monitoring for broker failures.

Controller election

  1. The first broker to register with the cluster creates a controller node in Zookeeper (Zookeeper is explained later in this document).
  2. Other brokers that register with the cluster also try to create the controller node in Zookeeper, but they are notified that the controller already exists.
  3. These brokers then set a Zookeeper watch to monitor the controller node.
  4. If the controller broker goes down, the other brokers are notified. Each of them then tries to become the controller, but only one wins the race.
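The election steps above can be sketched as a small in-memory simulation. This is only an illustration under stated assumptions: `FakeZookeeper`, `create_ephemeral`, and `broker_failed` are hypothetical names, not Zookeeper or Kafka APIs, and the race is modelled simply as "first caller wins".

```python
class FakeZookeeper:
    """Hypothetical stand-in for Zookeeper: holds a single controller node."""

    def __init__(self):
        self.controller = None   # broker id that owns the controller node
        self.watchers = []       # broker ids watching the controller node

    def create_ephemeral(self, broker_id):
        """Try to create the controller node; only the first caller succeeds."""
        if self.controller is None:
            self.controller = broker_id
            return True
        # The node already exists, so the caller registers a watch instead.
        if broker_id not in self.watchers:
            self.watchers.append(broker_id)
        return False

    def broker_failed(self, broker_id):
        """Remove a failed broker; if it was the controller, watchers race for the role."""
        if broker_id in self.watchers:
            self.watchers.remove(broker_id)
        if broker_id == self.controller:
            self.controller = None
            candidates, self.watchers = self.watchers, []
            for candidate in candidates:
                self.create_ephemeral(candidate)  # the first candidate wins


zk = FakeZookeeper()
for broker in (1, 2, 3):
    zk.create_ephemeral(broker)
first_controller = zk.controller    # broker 1 registered first
zk.broker_failed(1)
second_controller = zk.controller   # one of the watchers takes over
```

In this sketch the surviving brokers end up watching the new controller node again, which mirrors step 3 of the list.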

The Kafka cluster durably persists all published records on the topic or topics, whether or not they have been consumed, until a configurable retention period elapses or a configured storage size is exceeded. Records cannot be modified or individually deleted once they are sent to Kafka (this is known as a “distributed commit log”).

The following diagram shows a Kafka cluster with three nodes, i.e. brokers. The diagram also shows the Zookeeper instances running on these servers, which are described later in this paper.

Messages / Records

The smallest entity in Kafka’s ecosystem is called a message or record. A “message” is a key/value pair of data along with metadata such as a timestamp. Messages are stored inside topics in a log-structured format. A log is a persistent, ordered data structure that only supports appends.

Records are written sequentially to a partition of the topic, and each record is assigned a sequential ID (known as the offset), starting from 0. You can neither modify nor delete individual records in the topic. A partition is read from the oldest record to the newest, which guarantees ordering within that partition. A message is uniquely identified by the tuple of topic, partition, and offset.
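A minimal sketch of the append-only log idea, assuming a single partition held in memory. `PartitionLog` and its methods are hypothetical names chosen for illustration and are not part of any Kafka client.

```python
class PartitionLog:
    """Hypothetical in-memory model of one partition: an append-only list."""

    def __init__(self):
        self._records = []

    def append(self, key, value):
        """Append a record and return the offset assigned to it."""
        offset = len(self._records)   # offsets start at 0 and grow by 1
        self._records.append((key, value))
        return offset

    def read(self, from_offset):
        """Return (offset, key, value) tuples in order, starting at from_offset."""
        return [(o, k, v) for o, (k, v) in enumerate(self._records) if o >= from_offset]


log = PartitionLog()
offsets = [log.append("user-1", f"event-{i}") for i in range(3)]
tail = log.read(from_offset=1)
```

The class deliberately offers no update or delete method, which is the property the text describes: readers can only move forward through the records in the order they were written.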

Topics

The topic is the structure that holds the messages, or records, published to the Kafka broker. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it. Consumers subscribe to a topic and then poll the brokers to receive new messages as they are added.

A topic can be divided into partitions, which are where the data is actually written. These partitions are distributed across the brokers in the cluster. Each partition is an ordered, immutable sequence of records that is continually appended to: a structured commit log. Each record in a partition is assigned a sequential ID number called the offset, which uniquely identifies the record within that partition. The number of partitions is generally chosen based on throughput needs. If records are published without a key, the producer spreads them across the partitions, so ordering is guaranteed only within a partition and not across the whole topic.
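The article does not spell out a formula for the partition count. One commonly cited rule of thumb, used here as an assumption, is to take the target throughput and divide it by the throughput a single partition can sustain on the producer side and on the consumer side, then use the larger result. The function name and the numbers below are hypothetical.

```python
import math


def estimate_partitions(target_mb_s, producer_mb_s_per_partition, consumer_mb_s_per_partition):
    """Rule-of-thumb estimate: enough partitions for both the write and the read side."""
    needed_for_producers = target_mb_s / producer_mb_s_per_partition
    needed_for_consumers = target_mb_s / consumer_mb_s_per_partition
    return math.ceil(max(needed_for_producers, needed_for_consumers))


# Assumed measurements: 100 MB/s target, 20 MB/s per partition when producing,
# 10 MB/s per partition when consuming.
partitions = estimate_partitions(100, 20, 10)
```

With these assumed figures the consumer side is the bottleneck, so it determines the estimate. Real sizing should be based on measurements from your own workload.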

Each partition has one server which acts as the “leader” and zero or more servers which act as “followers”. The leader handles all read and write requests for the partition while the followers passively replicate the leader.

If the leader fails, the Kafka controller will detect the failure and elect a new leader from the pool of followers. The leader assignment process is described further below.

Each server acts as a leader for some of its partitions and a follower for others, so load is well balanced within the cluster. Partitions allow Kafka to parallelize a topic by splitting its data across multiple brokers: each partition can be placed on a separate machine, so multiple consumers can read from a topic in parallel. This is how Kafka delivers high throughput. On the other hand, more partitions can sometimes lead to higher latency, because a message becomes available to consumers only once it is committed, that is, once it has been replicated.

In the following diagram, a topic is partitioned across three brokers. The leader is shown in green and the followers in grey.

Leader assignment

The assignment of a leader for a particular partition happens during a process called partition leader election. This process happens when the topic/partition is created or when the partition leader (i.e. the broker) is unavailable for any reason.

  1. Broker X goes down. All partitions whose leader is broker X need a new leader.
  2. The controller selects new leaders and followers for those partitions.
  3. The controller sends a request to each affected broker stating which broker is the new leader and which brokers are the followers for each partition.
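The reassignment steps above can be sketched as follows. This is a simplified model, not the controller's actual algorithm: `reassign_leaders` is a hypothetical function, and it assumes every listed follower is fully caught up and that the first surviving follower is promoted.

```python
def reassign_leaders(partitions, failed_broker):
    """Return new leader/follower assignments after a broker failure.

    partitions maps a partition name to {"leader": id, "followers": [ids]}.
    """
    updated = {}
    for name, replicas in partitions.items():
        leader = replicas["leader"]
        followers = [b for b in replicas["followers"] if b != failed_broker]
        if leader == failed_broker:
            if not followers:
                raise RuntimeError(f"{name} has no surviving replica")
            # Promote the first surviving follower (a simplification).
            leader, followers = followers[0], followers[1:]
        updated[name] = {"leader": leader, "followers": followers}
    return updated


cluster = {
    "orders-0": {"leader": 1, "followers": [2, 3]},
    "orders-1": {"leader": 2, "followers": [3, 1]},
}
after_failure = reassign_leaders(cluster, failed_broker=1)
```

Partitions led by the failed broker get a new leader, while partitions that only lost a follower keep their leader and simply have one replica fewer.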

Producers

Through the Producer API, producers connect to Kafka brokers and publish a stream of records to one or more Kafka topics without having to name a target partition. A producer can also direct records to a particular partition by using a message key: the partitioner hashes the key and maps it to a partition, so records with the same key land in the same partition.
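Key-based partition selection can be illustrated with a few lines of Python. This is a sketch of the general idea only: `choose_partition` is a hypothetical helper, and CRC32 is used here as a stand-in hash because it is deterministic and available in the standard library (the Java client's default partitioner uses a murmur2 hash instead).

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a message key to a partition index by hashing the key bytes."""
    # CRC32 is stable across runs, unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


first = choose_partition("customer-42", 6)
second = choose_partition("customer-42", 6)
```

Because the mapping depends only on the key and the partition count, all records for the same key go to the same partition and keep their relative order. Changing the number of partitions changes the mapping.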

Consumers

Consumers are the subscribers of Kafka topics. Through the Consumer API, an application connects to Kafka brokers and consumes a stream of records from one or more Kafka topics. Data is read from a topic by a consumer instance, which belongs to a consumer group. A consumer group is a collection of one or more consumers working in parallel to consume messages from topic partitions; its members can be threads in one process or processes on several machines.

Each consumer has a consumer group ID, and consumers registered with the same group ID are part of one group. Kafka stores the current offset per consumer group, topic, and partition, as it would for a single consumer. A consumer can consume only from the partitions of the topic that Kafka has assigned to it. No two consumers in the same group are assigned the same partition, so each partition is tied to exactly one consumer process per consumer group. In this way each record is delivered to only one member of the group and is not read twice within the group during normal operation.

Consumers acknowledge what they have read by committing offsets to Kafka. Keeping track of what has been consumed is therefore the responsibility of the consumer, which is one of the main reasons Kafka scales so well horizontally.

Because the offset of the last consumed record is stored for each partition, either in Zookeeper or in Kafka itself, a consumer can stop and restart without losing its place.

If there are more consumers than partitions, some consumers will be idle because they have no partitions to read from. If there are more partitions than consumers, some consumers will receive messages from multiple partitions. If the numbers are equal, each consumer reads messages in order from exactly one partition (i.e. the load is balanced across consumers as equally as possible). In the following diagram, two consumers in a consumer group are connected to three partitions, meaning one of the consumers receives messages from two partitions.
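The three cases above can be reproduced with a simple round-robin assignment. This is a sketch under stated assumptions: `assign_partitions` is a hypothetical function, and Kafka's real assignment strategies are more involved than this loop.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions over the consumers of one group, round-robin."""
    if not consumers:
        raise ValueError("a consumer group needs at least one consumer")
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)   # each partition gets exactly one owner
    return assignment


more_partitions = assign_partitions([0, 1, 2], ["c1", "c2"])
more_consumers = assign_partitions([0, 1], ["c1", "c2", "c3"])
balanced = assign_partitions([0, 1, 2], ["c1", "c2", "c3"])
```

In the first call one consumer owns two partitions, in the second call the third consumer is left idle, and in the third call every consumer owns exactly one partition.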

In another example below, server 1 holds partitions 0 and 3 and server 2 holds partitions 1 and 2. There are two consumer groups, A and B. Consumer group A has two consumers, so each consumer reads from two partitions. Consumer group B has four consumers, the same as the number of partitions, so each consumer reads from exactly one partition.

When multiple consumers subscribe to a topic under a common consumer group ID, as in this example, Kafka behaves like a queue for that group. However, although Kafka allows only one consumer per topic partition within a group, multiple consumer groups can read from the same partition. In that case Kafka acts as a pub/sub system, which gives it a fan-out feature.

Unlike message queues where messages are removed from the queue when they are consumed, consuming a message in Kafka doesn’t remove it from the partition. Instead each consumer maintains an offset of the messages it has already consumed. In case of failure, the subscriber can then restart processing from where it left off.

Streams

Streams consume an input stream from one or more topics and produce an output stream to one or more output topics, effectively transforming the input streams to output streams.

A use case for this is a stream processor that uses the Streams API to read data from a topic, run some form of analysis or data transformation, and finally write the data back to another topic or ship it to an external system.
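The read, transform, and write pattern can be sketched with plain Python lists standing in for topics. This is not the Kafka Streams API (which is a Java library); `process_stream` and the sample records are hypothetical and only show the shape of the computation.

```python
def process_stream(input_topic, transform, keep):
    """Read records from input_topic, keep the matching ones, and emit transformed records."""
    output_topic = []
    for record in input_topic:
        if keep(record):
            output_topic.append(transform(record))
    return output_topic


# Assumed sample data: page-load times in milliseconds.
clicks = [
    {"user": "a", "ms": 120},
    {"user": "b", "ms": 4000},
    {"user": "c", "ms": 95},
]
slow_requests = process_stream(
    clicks,
    transform=lambda r: {**r, "slow": True},
    keep=lambda r: r["ms"] > 1000,
)
```

A real stream processor would run this loop continuously as new records arrive rather than over a fixed list.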

Although the same can be done with regular producers and consumers, Streams makes it possible to process records in real time rather than in batches. Stream processors are independent of Kafka producers, consumers, and connectors.

Confluent offers a streaming SQL engine for Kafka called KSQL (now ksqlDB), which lets you work with Kafka streams in a SQL-like manner without having to write code in a language such as Java. KSQL allows you to transform data within Kafka streams, for example to prepare data for processing, run analytics and monitoring, and detect anomalies in real time.

Connector

Kafka Connect is a framework for importing data into Kafka from external data sources and exporting data from Kafka to external systems such as databases and applications. Connectors are reusable producers or consumers that let you move large amounts of data in and out of Kafka for many common external systems, and the framework also lets you create your own custom connectors. For example, a connector to a relational database might capture every change to a table. The following diagram highlights some of the connectors (source and sink) used to move data from one end to the other.

Connect workers, with the required plugins, are generally installed on a cluster separate from the Kafka cluster to avoid interference and performance problems. Some commercial offerings of Kafka ship with ready-to-use connectors.

Connectivity and Networking

Kafka uses a binary protocol over TCP between its native producer/consumer clients and the brokers. The native client is implemented in Java, and the binary protocol makes efficient use of the network.

Serialization and deserialization are required because disks and networks deal only in bytes and know nothing about Java objects. The state of a Java object in a message therefore has to be serialized into bytes before it can be sent over the network or stored on disk, and deserialized when it is read back. Avro is a popular serialization framework for Kafka keys and values.

Kafka clients handle load balancing, failover, and cluster expansion or contraction automatically, whereas REST clients typically need a third-party load balancer to achieve the same functionality. A further benefit is greater parallelism: a Kafka client typically opens TCP connections to multiple brokers in the cluster and sends or fetches data in parallel across multiple partitions of the same topic. Apache Kafka does include a REST API for configuring Kafka Connect.
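The serialization step described above can be illustrated in a few lines. The sketch uses JSON from the Python standard library purely as a stand-in for a format such as Avro; `serialize` and `deserialize` are hypothetical helper names.

```python
import json


def serialize(record):
    """Turn an in-memory record into bytes that can be sent or stored."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def deserialize(payload):
    """Rebuild the record from the bytes read off the network or disk."""
    return json.loads(payload.decode("utf-8"))


order = {"order_id": 17, "status": "shipped"}
payload = serialize(order)
restored = deserialize(payload)
```

Producer and consumer must agree on the format, which is why schema-based formats are popular for Kafka keys and values.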

There are a number of third-party REST Proxy implementations (such as the Confluent Kafka REST Proxy) which allow pub/sub over a REST interface, but these are separate projects outside of Apache Kafka.

What is the role of Zookeeper?

Kafka has traditionally required Zookeeper to run. Zookeeper can be understood as the resource management part of the whole ecosystem. It keeps metadata about the processes running in the system and handles health checks, controller election, and so on. It is a distributed key-value store used for distributed cluster management.

Producers and consumers do not interact with Zookeeper directly to learn the leader of a partition and other metadata. Instead, they send metadata requests to a Kafka broker, which in turn interacts with Zookeeper and returns a metadata response. From that response, producers and consumers learn which partitions they need to write to and read from respectively.

Zookeeper performs the following for Kafka:

  • Controller election: The controller is one of the brokers and is responsible for maintaining the leader/follower relationship for all the partitions. When a node shuts down, it is the controller that tells other replicas to become partition leaders to replace the partition leaders on the node that is going away. Zookeeper is used to elect a controller, make sure there is only one, and elect a new one if it crashes.
  • Cluster membership: Zookeeper maintains a list of all the brokers that are functioning and are a part of the cluster at any given moment.
  • Topic configuration: Which topics exist, how many partitions each has, where the replicas are, which replica is the preferred leader, and which configuration overrides are set for each topic.
  • Quotas: How much data each client is allowed to read and write.
  • Access Control Lists (ACL): Who is allowed to read from and write to which topic.
  • Consumer groups (old high-level consumer only): Which consumer groups exist, who their members are, and the latest offset each group has read from each partition.

Design Patterns

When setting up consumers and consumer groups, we have several options to suit different business needs. In all of them, a consumer can read only those messages that have been written to all the in-sync replicas.

Single partition / Single consumer

The producer publishes messages to the single-partition topic and the consumer consumes them from that partition.

Of course, as there is a single partition this solution doesn’t scale. However, if you don’t need scaling it provides a total ordering of the messages which might be really helpful depending on your use cases.

If there are multiple Kafka nodes, the partition can be replicated to provide fault tolerance. However, consumption is interrupted if the partition leader fails, until a new leader has been elected and the consumer has reconnected to it.

Single partition + Multiple consumers (Fan-out)

This is a simple variation of the above where multiple consumers consume from the partition. As messages are not removed from Kafka when they are consumed, it’s possible to add more than one consumer, each maintaining its own message offset. Of course, the consumers must be in different consumer groups to consume from the same partition. This pattern is also known as fan-out.

Multiple partitions + Multiple consumers

At-least-once delivery

Assume that the consumer fails after fetching messages from a topic but before committing the offset to Kafka. When the consumer recovers, it starts from the previously committed offset, which means reprocessing some messages that were already processed. This is at-least-once delivery.
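The effect of processing before committing can be shown with a toy consumer loop. This is a simulation under stated assumptions: `consume_at_least_once` is hypothetical, the log is a plain list, and a crash is modelled by returning early without saving the commit.

```python
def consume_at_least_once(log, committed_offset, crash_before_commit_at=None):
    """Process each record first and commit afterwards.

    Returns (processed_records, committed_offset).
    """
    processed = []
    for offset in range(committed_offset, len(log)):
        processed.append(log[offset])             # 1. process the record
        if offset == crash_before_commit_at:
            return processed, committed_offset    # crash: this commit is lost
        committed_offset = offset + 1             # 2. commit only after processing
    return processed, committed_offset


log = ["m0", "m1", "m2"]
first_run, committed = consume_at_least_once(log, 0, crash_before_commit_at=1)
second_run, committed = consume_at_least_once(log, committed)
```

Here "m1" is processed in both runs because the crash happened before its offset was committed. Nothing is lost, but the application has to tolerate duplicates.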

Exactly-once delivery

To achieve exactly-once delivery, no two consumers in a consumer group can pull messages from the same partition. This ensures that all messages are read and avoids duplication; however, it also reduces data throughput. Kafka balances load by distributing partitions evenly among the consumers. While processing messages, the application keeps track of its progress by committing the offset together with the processed message to a transactional system. After a consumer crash, it reads the last committed offset from the transactional system and resumes reading after that point. This leads to no data duplication and no data loss, but it can decrease throughput.
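Storing the offset together with the result in one transaction can be sketched with SQLite from the Python standard library, used here as a stand-in for the "transactional system". The table names, column names, and `process_exactly_once` are hypothetical, and the log is a plain list rather than a real partition.

```python
import sqlite3


def process_exactly_once(db, log, part_id):
    """Apply unseen records and store the new offset in the same transaction."""
    row = db.execute(
        "SELECT next_offset FROM offsets WHERE part_id = ?", (part_id,)
    ).fetchone()
    start = row[0] if row else 0
    for offset in range(start, len(log)):
        with db:  # commits both statements together, or rolls both back on error
            db.execute(
                "INSERT INTO results (part_id, rec_offset, payload) VALUES (?, ?, ?)",
                (part_id, offset, log[offset]),
            )
            db.execute(
                "INSERT OR REPLACE INTO offsets (part_id, next_offset) VALUES (?, ?)",
                (part_id, offset + 1),
            )


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE results (part_id INTEGER, rec_offset INTEGER, payload TEXT)")
db.execute("CREATE TABLE offsets (part_id INTEGER PRIMARY KEY, next_offset INTEGER)")

log = ["m0", "m1", "m2"]
process_exactly_once(db, log[:2], part_id=0)   # first run sees two records, then stops
process_exactly_once(db, log, part_id=0)       # the restart resumes at the stored offset
stored = [p for (p,) in db.execute("SELECT payload FROM results ORDER BY rec_offset")]
```

Because the result and the offset are written atomically, a restart can never see a result without its offset or an offset without its result, so each record is applied once.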

At-most-once delivery

Whilst Kafka guarantees at-least-once delivery by default, it also allows users to implement at-most-once delivery by disabling retries on the producer and by having the consumer commit its offset before processing a batch of messages. Consider a scenario where the consumer has committed the offset but crashes before it has finished processing the messages. When the consumer restarts, it receives messages from the last committed offset onwards, so the unprocessed messages are lost.
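Committing before processing can be simulated in the same toy style. `consume_at_most_once` is a hypothetical function, the log is a plain list, and a crash is modelled by returning early after the commit but before the record is handled.

```python
def consume_at_most_once(log, committed_offset, crash_before_processing_at=None):
    """Commit each offset first and process the record afterwards.

    Returns (processed_records, committed_offset).
    """
    processed = []
    for offset in range(committed_offset, len(log)):
        committed_offset = offset + 1             # 1. commit first
        if offset == crash_before_processing_at:
            return processed, committed_offset    # crash: record never processed
        processed.append(log[offset])             # 2. process afterwards
    return processed, committed_offset


log = ["m0", "m1", "m2"]
first_run, committed = consume_at_most_once(log, 0, crash_before_processing_at=1)
second_run, committed = consume_at_most_once(log, committed)
```

Here "m1" is never processed: its offset was already committed when the crash occurred, so the restarted consumer skips it. There are no duplicates, at the cost of possible loss.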

So …

When to use the same consumer group?

Consumers should be part of the same group when the operation they perform needs to be scaled up to run in parallel. Consumers in the same group are assigned different partitions. As said before, no two consumers with the same group ID are assigned the same partition. Hence, each consumer in a group processes different data from the other consumers in that group, which leads to parallel processing. This is one of the ways suggested by Kafka to achieve parallel processing in consumers. Here Kafka is acting as a queue/messaging platform.

When to use different consumer groups?

Consumers should not be in the same group when they perform different operations. Some consumers might update a database, while another set of consumers might do some computation with the consumed data. In this case, we want all these different consumers to read all the data from all the partitions. Hence, when consumers need to read data from all the partitions, we should register them with different group IDs. Here Kafka is acting as a pub/sub platform.

How would the offsets be maintained for consumers of different groups?

The offset, an indicator of how many messages a consumer has read, is maintained per consumer group ID and partition. When there are two different consumer groups, two different offsets are maintained per partition. Consumers in different consumer groups can pause and resume independently of the other groups, so there is no dependency between the consumers of different groups.
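Per-group offset tracking amounts to a lookup keyed by group, topic, and partition. The sketch below uses a plain dictionary; the `commit` and `position` helpers and the group names are hypothetical and do not correspond to Kafka client calls.

```python
committed = {}  # (group_id, topic, partition) -> next offset to read


def commit(group_id, topic, partition, offset):
    """Record how far a group has read in one partition."""
    committed[(group_id, topic, partition)] = offset


def position(group_id, topic, partition):
    """Return where a group should resume; unknown groups start at 0."""
    return committed.get((group_id, topic, partition), 0)


commit("db-writer", "orders", 0, 42)
commit("analytics", "orders", 0, 7)
writer_position = position("db-writer", "orders", 0)
analytics_position = position("analytics", "orders", 0)
new_group_position = position("audit", "orders", 0)
```

Each group has its own entry for the same partition, so one group pausing or falling behind has no effect on where another group resumes.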

What are some of the use cases?

  • Event Stream Processing (Log data, Financial data, clickstreams, real-time processing)
  • Microservice designs (CQRS, Event Sourcing, Pub/Sub etc.)
  • Decoupling of the systems
  • IoT Edge processing (Sensor data etc)
  • Big data ingestion (data pipelines)
  • ML Pipelines (AI and Data Science)

Who are the other competitors?

How is Kafka different from other pub/sub systems?

  • Exactly once semantics
  • Guaranteed Delivery
  • Ordered Delivery
  • Persistence

Kafka isn’t the only player in the game when it comes to message queues. There are plenty of alternatives to Kafka, many of which are easier to set up, especially if you’re locked into a particular cloud provider. Here are some choices:

Amazon SQS: One of the oldest alternatives to Kafka. Runs into issues when throughput increases significantly.

Amazon Kinesis: The closest thing AWS has to “Kafka as a service”. Kinesis supports partitions like Kafka, as well as replicas.

Google PubSub: A highly convenient alternative to Kafka. Perhaps one of the best options for anybody looking to hit the ground running, as there is no configuration needed. Can scale to massive volumes.

RabbitMQ: A popular message broker, written in Erlang and similarly open-source. RabbitMQ plays well with almost every programming language.

What Kafka doesn’t do?

  • Kafka is not push-based; consumers pull messages from the brokers.
  • Kafka does not have individual message IDs. Messages are simply addressed by their offset in the log.
  • Kafka does not track which messages each consumer has read; that is left up to the consumers. This is why it is known as the dumb broker / smart consumer model.
  • Consumers can’t delete messages. Kafka keeps all messages for a set period of time.

Kafka AWS deployment options

https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-kafka-on-aws/

Commercial offerings

There are some commercial players who also offer Kafka with additional benefits such as support, management, and configuration components. Hortonworks, Cloudera, and Confluent are some of the players in this space. The section below covers some useful information about the Kafka offering from Confluent.

Some of Confluent Kafka’s offerings are free under the Confluent Community License, and some are only available through Confluent’s enterprise license. The Confluent Kafka Platform is free to download and use under the Confluent Community License. Unlike the Apache 2.0 license under which Apache Kafka is available, the Confluent Community License is not open source and has a few restrictions. In return, it offers additional features not available with the core Apache Kafka product. Some of the features included under the Confluent Community License are pre-built connectors, REST Proxy, ksqlDB, and Schema Registry (which supports schema definition for various message serialization choices, e.g. Avro, Protobuf, and JSON).

Beyond the free-to-use Confluent Community License technology, Confluent also has a fee-based enterprise license that includes additional pre-built connectors and other tools such as Control Center. More information can be found here: https://www.confluent.io/confluent-community-license-faq/
