Data Streaming With Apache Kafka

Munish Goyal
The Startup
Published in
24 min read · Jul 6, 2020

Kafka is a streaming platform capable of handling trillions of events a day. At its core, it is a distributed, horizontally scalable (because of built-in partitioning), fault-tolerant (because of replication), low-latency (partly because reads and writes run in constant time on its ordered, immutable sequence data structure) commit log (records can be created, but not updated).

A streaming platform (stream of records) has three key capabilities:

  1. Streams of records are stored in a fault-tolerant durable way.
  2. Process streams of records as they occur, almost real-time with low latency.
  3. Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

Kafka is horizontally scalable, and can:

  • scale to 100s of brokers
  • scale to millions of messages per second

Kafka has low latency (less than 10ms), which makes it behave like a real-time system.

Kafka allows you to decouple data streams and systems: the source system pushes data to Kafka, and then the target system sources the data from Kafka. In this way, Kafka is used as a data transportation mechanism.

Because of this, it is generally used for two broad classes of applications:

  1. Building real-time streaming data pipelines that reliably get data between systems or applications
  2. Building real-time streaming applications that transform or react to the streams of data

Kafka allows you to have a huge amount of messages go through a centralized medium and store them without worrying about things like performance or data loss. This means that it is perfect for use as the heart of your system’s architecture, acting as a centralized medium that connects different applications. It can be the centerpiece of event-driven architecture and allows you to truly decouple applications from one another.

It allows you to easily decouple communication between different (micro)services. With the Streams API, it is easier than ever to write business logic which enriches Kafka topic data for service consumption.

The key reason Kafka has grown in popularity is that businesses nowadays benefit greatly from event-driven architecture. A single real-time event-broadcasting platform with durable storage is the cleanest way to achieve such an architecture.

Kafka as the heart of your system architecture:

Examples of data streams that you can have in Kafka:

  • Chat events
  • Website events
  • Pricing data
  • Financial transactions
  • User interactions

Kafka’s general use cases:

  • Messaging System
  • Activity Tracking
  • Application Logs gathering
  • De-coupling of system dependencies
  • Gather metrics from many different locations
  • Stream processing (with the Kafka Streams API or Spark for example)
  • Integration with Spark, Flink, Storm, Hadoop, and many other Big Data technologies

Some real-world examples:

  • Netflix uses Kafka to apply recommendations in real-time while you’re watching TV shows.
  • Uber uses Kafka to gather user, taxi, and trip data in real-time to compute and forecast demand, and compute surge pricing in real-time.
  • LinkedIn uses Kafka to prevent spam and to collect user interactions to make better connection recommendations in real-time.

Shared Message Queues vs. Kafka

Kafka works well as a replacement for more traditional message brokers (such as ActiveMQ or RabbitMQ). Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc.). In comparison to most shared messaging systems:

  • Once a consumer pulls a message, it is erased from the shared message queue, which is an issue if you have multiple consumers. While multiple consumers may connect to the shared queue, they must all fall in the same logical domain and execute the same functionality.
  • Kafka has better throughput with built-in partitioning (so, horizontally scalable) and replication (so, fault-tolerant), and has low latency which makes it a good solution for large scale message processing applications.

Traditional Publish-Subscribe Models vs. Kafka

The logical segregation of the publisher from the subscriber in traditional pub-sub models allows for a loosely-coupled architecture, but with a limited scale. Scalability is limited as each subscriber must subscribe to every partition in order to access the messages from all partitions. Thus, while traditional pub-sub models work for small networks, the instability increases with the growth in nodes. As every message is broadcasted to all subscribers, scaling the processing of the streams is difficult as the subscribers are not in sync with each other.

Kafka builds on the publish-subscribe model with the advantage of a message queuing system. It achieves this with:

  • the use of consumer-groups
  • message retention by brokers

Within a consumer-group, for a subscribed topic, only one consumer from the group actually consumes the message from a given partition of the topic, and so each particular message is read by only one of the consumers within a consumer-group. The messages are also retained by the brokers in their topic partitions, unlike traditional message queues.

Multiple consumer-groups can read from the same set of topics, and at different times catering to different logical application domains. Thus, Kafka provides both the advantage of high scalability via consumers belonging to the same consumer group and the ability to serve multiple independent downstream applications simultaneously.

Kafka Overview

An application, a producer, sends a particular type of message to Kafka, and these messages are stored as records in the associated topic (associated with the type of message sent). Each topic is generally divided into a number of partitions (saved on peer-to-peer nodes, known as brokers) and is subscribed to by multiple consumer-groups (each consumer-group is an application), with each partition read by only one consumer from a consumer-group.

A Topic (similar to a table in an RDBMS, but here each record is immutable) is a category or feed name to which records are published. It is identified by its name, and there is no limit on the number of topics or the number of records in a topic. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data (records of the topic) written to it.

For example, consider a topic named cabs_gps that contains the GPS position of each cab. Each cab periodically (say, every 30 seconds) sends a message containing cab_id, latitude, and longitude to Kafka. The messages get saved as records on the topic.
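As a rough sketch, a cab-side producer along these lines could be written with the kafka-python client (the broker address localhost:9092, the topic name cabs_gps, and the coordinate values are illustrative assumptions):

import json
import time
from kafka import KafkaProducer

# assumes a broker reachable at localhost:9092 and a topic named cabs_gps
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # JSON-encode message values
)

while True:
    position = {"cab_id": 42, "latitude": 12.9716, "longitude": 77.5946}  # would come from the GPS unit
    producer.send("cabs_gps", position)
    time.sleep(30)  # publish a position roughly every 30 seconds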

These records of a particular topic are later received by other applications, called consumer-groups, who have subscribed to that topic.

A topic can get quite big, so it is divided into smaller partitions for scalability. We need to choose the number of partitions (such as 8 for small topics and 32 for large topics) and the replication-factor (such as the recommended value of 3; note that replicas are placed on different nodes, called brokers, with one broker acting as the leader for each partition and the rest as followers) at topic-creation time. Instead of creating topics manually, we can also configure brokers to auto-create topics when a non-existent topic is published to.
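If you prefer creating topics from code instead of the CLI, kafka-python's admin client can do it. A minimal sketch (the topic name, partition count, and replication factor are example values, and a replication factor of 3 assumes at least 3 running brokers):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# 8 partitions, replicated 3 times (requires at least 3 running brokers)
admin.create_topics([
    NewTopic(name="cabs_gps", num_partitions=8, replication_factor=3),
])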

For each topic, the Kafka cluster maintains a partitioned log that looks like this:

Each partition is an ordered, immutable sequence of records that is continually appended to: a structured commit log. Once a record is committed to a partition, it can’t be changed and its position can’t be altered.

Records in a partition are each assigned a sequential ID called the offset (similar to the primary key of a record in an RDBMS table), which gets incremented for each new message in that partition. The offset uniquely identifies a record within the partition. Kafka guarantees that all the messages inside a partition are ordered in the sequence they came in, but note that there is no such ordering guarantee across partitions.

Similarly, Kafka guarantees that records are read (by consumer-groups) in order within each partition, but it does not guarantee the order in which a consumer-group reads across all partitions of a topic. This means the consumer-group application should expect records roughly in order, but possibly interleaved across partitions, and the amount of reordering can grow with the number of partitions of the topic. So the consumer-group application should either be tolerant of this slight reordering, or each group of records for which intra-group ordering matters should share a key, sent along with each message by the producer.

Kafka stores records for a set amount of time using a configurable retention period (default value of 7 days), such as one day or until some size threshold is met. Consumers themselves poll Kafka for new messages and say what records they want to read. This allows them to increment/decrement the offset they’re at as they wish, thus being able to replay and reprocess events.
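For illustration, a kafka-python consumer can manage its own position explicitly and replay a partition; this sketch assumes the first_topic topic used later in this article and a broker on localhost:9092:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
tp = TopicPartition("first_topic", 0)
consumer.assign([tp])           # manage the partition assignment manually

consumer.seek_to_beginning(tp)  # replay the partition from offset 0
# consumer.seek(tp, 42)         # ...or jump to an arbitrary offset

for record in consumer:
    print(record.partition, record.offset, record.value)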

Each consumer-group represents an application and has one or more consumer processes (simply called consumers) inside. To avoid two processes reading the same message twice, each partition of the topic is tied to only one consumer process per consumer-group.

Kafka consumers belonging to the same consumer-group share a consumer_group_id. The consumers in a group then divide the topic partitions fairly among themselves, so that each partition is consumed by exactly one consumer from the consumer-group. Each partition is connected to only one consumer from the consumer-group, whereas one consumer might be responsible for reading messages from multiple partitions.

Ideally, the number of partitions is equal to the number of consumers. Should the number of consumers be greater, the excess consumers are idle, wasting client resources. If the number of partitions is greater, some consumers will read from multiple partitions.

Commit Log

A commit log (also referred to as write-ahead log, transaction log) is a persistent ordered data structure which only supports appends. You cannot modify nor delete records from it. It is read from left to right and guarantees items ordering.

Kafka actually stores all of its messages on disk and having them ordered in the structure lets it take advantage of sequential disk reads.

  • Reads and writes are done in constant time O(1) (given the record’s offset), which, compared to the O(log N) operations of other on-disk structures, is a huge advantage, as each disk seek is expensive.
  • Reads and writes do not block one another: writing does not lock reading and vice versa.

These two points have huge performance benefits since the data size is completely decoupled from the performance. Kafka has the same performance whether you have 100KB or 100TB of data on your server.

Persistence to Disk

Kafka actually stores all of its records on disk and keeps nothing in RAM itself (it relies on the OS page cache, described below). There are numerous optimizations behind this that make it feasible:

  • Kafka has a protocol that groups messages together. This lets network requests batch messages and reduces network overhead; the server, in turn, persists chunks of messages in one go, and consumers fetch large linear chunks at once.
  • Linear reads/writes on a disk are fast. The notion that modern disks are slow comes from the cost of numerous disk seeks, which is not an issue in big linear operations.
  • These linear operations are heavily optimized by the OS, via read-ahead (prefetch large block multiples) and write-behind (group small logical writes into big physical writes) techniques.
  • Modern OSs cache the disk in free RAM (called pagecache).
  • Since Kafka stores messages in a standardized binary format unmodified throughout the whole flow (producer -> broker -> consumer), it can make use of the zero-copy optimization. That is when the OS copies data from the pagecache directly to a socket, effectively bypassing the Kafka broker application entirely.

All of these optimizations allow Kafka to deliver messages at near network speed.

Producers

Producers can choose to receive acknowledgment of data writes:

  • acks=0: Producer won’t wait for an acknowledgment (possible data loss)
  • acks=1: Producer waits for leader acknowledgment (limited data loss)
  • acks=all: Leader + replicas acknowledgment (no data loss)

Unless you need lower latency, it is highly recommended to use the acks=all setting.
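With the kafka-python client, the acknowledgment level is just a producer setting; a minimal sketch (broker address assumed):

from kafka import KafkaProducer

# wait for the leader and all in-sync replicas before a write is considered successful
producer = KafkaProducer(bootstrap_servers="localhost:9092", acks="all", retries=5)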

Producers can choose to send a key with the message. A key can be a string, a number, anything. Generally, it is some attribute, such as chat_id. If key=null, messages are sent to partitions in a round-robin fashion. But if a key is sent, then all messages with the same key will always go to the same partition, because the key’s hash determines the partition. This also means that all updates related to a given chat_id will always go to the same partition.

As long as the number of partitions of a topic remains constant (no new partitions are added), records with the same key (such as a given value of chat_id) will always go to the same partition.

This way, the producer can make sure that records related to a particular key are always sent to a single partition of the corresponding topic. This is important when the order in which records are read by the consumer-group application has to be correct for a given key (such as the time-ordered messages of a chat with a given value of chat_id).
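A small sketch of keyed sends with kafka-python (the chats topic and the chat_42 key are hypothetical): because the key is hashed to pick the partition, all messages of one chat land on the same partition and therefore stay in order relative to each other.

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=str.encode,
    value_serializer=str.encode,
)

# same key => same partition => per-chat ordering is preserved
for text in ["hi", "how are you?", "bye"]:
    producer.send("chats", key="chat_42", value=text)

producer.flush()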

Efficiently Producing Messages:

  • Kafka is optimized for transmitting messages (e.g., a JSON dump) in batches rather than individually, so there is a significant overhead and performance penalty in producing one message at a time.
  • Message delivery can fail in a number of ways, so the mechanism used to produce messages needs automatic retries.
  • The underlying synchronous method to produce messages is a blocking call, and hence makes the calling method inefficient.
  • So it makes sense to write a decorator around the underlying synchronous method so that messages are not sent directly but are just accumulated (for example, in Redis) and the call returns immediately; the accumulated messages are then sent in batches, with a retry mechanism to handle failures (see the configuration sketch after this list), either
    • every second (or some fraction of it) by a cron job, or
    • when the message buffer reaches a specific threshold.
  • The decorator should also support different acknowledgment settings for data writes.
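For reference, kafka-python's producer already implements much of this internally (buffering, batching, and automatic retries); the sketch below only shows the relevant settings with illustrative values, not a full decorator:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    linger_ms=500,         # wait up to 0.5 s to fill a batch before sending
    batch_size=64 * 1024,  # send a partition's buffer once it reaches ~64 KB
    retries=5,             # automatic retries on transient delivery failures
    acks="all",
)

producer.send("first_topic", b"buffered and sent as part of a batch")
producer.flush()           # force out anything still sitting in the buffer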

Depending on what kind of data you produce, enabling compression may yield improved bandwidth and space usage. Compression should be done on message sets (which are accumulated in the buffer and yet to be sent) rather than on individual messages. This improves the compression ratio; generally, compression works better the larger your buffer gets. Popular compression algorithms for this purpose are gzip, Snappy, LZ4, and Zstandard.
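In kafka-python this is a single producer setting (a small sketch, values assumed); note that the non-gzip codecs require extra Python packages to be installed:

from kafka import KafkaProducer

# whole batches are compressed, not individual messages
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    compression_type="gzip",  # or "snappy", "lz4", "zstd"
    linger_ms=500,            # larger batches generally compress better
)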

Brokers

Kafka receives, stores, and sends messages on different nodes (different server machines, like different EC2 instances) called brokers. Each broker is identified by its ID (an integer), broker.id.

A broker can contain multiple partitions of the same topic (as usually happens), and can also contain partitions from different topics (as usually happens). This is similar to automatic scheduling in a Kubernetes cluster, where a node can run multiple replicas of the same service as well as replicas of different services.

For a given topic, data from each partition is replicated across multiple brokers in order to preserve the data in case one broker dies. One of the brokers is elected as the partition leader (leader) for a particular partition of a given topic. It is the node through which applications both write to and read from that partition. It replicates the data it receives to N other brokers, called followers or replicas. The subset of replicas that is currently alive and caught up with the leader is called the in-sync replicas (ISR). They store the data as well and are ready to be elected as leader in case the leader node dies. The replication factor (replication-factor) is the number of times each partition is replicated.

This helps us to configure the guarantee that any successfully published message will not be lost. Having the option to change the replication factor lets you trade performance for stronger durability guarantees, depending on the criticality of the data.

Each broker acts as a leader for some of its partitions and a follower for others so that the load is well balanced within the cluster.

At the time of configuring a broker, we need to set its unique ID (broker.id), the URL (and port) it will listen on (listeners), and its logs directory (log.dirs).

Consumer Offsets

Kafka stores the offsets at which a consumer-group has been reading. The consumer offsets are committed in a Kafka topic named __consumer_offsets, and are NOT saved in metadata managed by Zookeeper. The consumer offset is the only metadata retained on a per-consumer basis, and it is controlled by the consumer. When a consumer in a group has processed data received from Kafka, it should be committing the offsets.

If a consumer dies, another consumer from the consumer-group will be able to read back from where the previous one left off, thanks to the committed consumer offsets!

Consumers can choose when to commit offsets. There are 3 delivery semantics:

  • At most once (each message is processed at most once):
    • Offsets are committed as soon as the messages are received.
    • If the processing goes wrong, the message is lost (it won’t be read again).
  • At least once (each message is processed at least once; usually preferred):
    • Offsets are committed after the message is processed.
    • If the processing goes wrong, the message will be read again.
    • This can result in duplicate processing of messages, so make sure your processing is idempotent (that is, processing the same message again won’t impact your systems). A minimal manual-commit sketch follows this list.
  • Exactly once (the gold standard):
    • Can be achieved in Kafka => Kafka workflows using the Kafka Streams API.
    • For Kafka => external-system workflows, we generally use an idempotent consumer.
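A minimal at-least-once sketch with kafka-python, assuming the first_topic topic and a hypothetical idempotent handle() function: auto-commit is disabled and the offset is committed only after processing succeeds.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "first_topic",
    bootstrap_servers="localhost:9092",
    group_id="my_first_application",
    enable_auto_commit=False,    # we commit explicitly, after processing
    auto_offset_reset="earliest",
)

def handle(value):
    ...                          # hypothetical, idempotent processing step

for record in consumer:
    handle(record.value)
    consumer.commit()            # commit only once processing has succeeded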

If a consumer is removed from the group (by being stopped, timing out, or losing its connection), or if a consumer is added to the group, partitions are rebalanced across the consumers in the group, so the assignment of a partition to a specific consumer may change.

Consumer Offsets are used to keep track of where a consumer group is up to for a given partition of a given topic. The key for a consumer offset is consumer-group:topic-name:partition-number, and the value is the offset within that partition that was last committed by a consumer in the group while processing messages from that partition.

Rebalancing

As a consumer group scales up and down, the running consumers split the partitions up amongst themselves. Rebalancing is triggered by a shift in partition ownership, which can be caused by the crash of a consumer or broker, or by the addition of a topic or partition. It allows the safe addition or removal of consumers from the system.

On startup, a broker is marked as the coordinator for a subset of consumer groups; it receives RegisterConsumer requests from consumers and returns RegisterConsumer responses containing the list of partitions they should own. The coordinator also runs failure detection to check whether the consumers are alive or dead. When a consumer fails to send a heartbeat to the coordinator broker before the session timeout, the coordinator marks the consumer as dead and a rebalance is set in motion. This session time period can be set using the session.timeout.ms property of the Kafka service. The heartbeat.interval.ms property makes healthy consumers aware of an ongoing rebalance so that they re-send RegisterConsumer requests to the coordinator.
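On the client side, kafka-python exposes the matching consumer settings; the values below are only illustrative (the heartbeat interval must stay well below the session timeout):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "first_topic",
    bootstrap_servers="localhost:9092",
    group_id="my_first_application",
    session_timeout_ms=10000,    # marked dead after 10 s without a heartbeat
    heartbeat_interval_ms=3000,  # send a heartbeat every 3 s
)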

For example, assume consumer C2 of Group A fails. C1 and C3 will briefly pause consumption from their partitions, the rebalancing process is triggered, and the partitions are reassigned to the remaining consumers in the group. Group B consumers remain unaffected by what happens in Group A.

Zookeeper

How does a producer/consumer know which broker is the leader of a particular partition (of a given topic)? Kafka stores such metadata in a service called Zookeeper.

Zookeeper is a distributed key-value store, highly optimized for reads but with slower writes. It stores metadata and handles the mechanics of clustering (heartbeats, distributing updates/configurations, etc.). It manages brokers and helps perform partition leader elections (note that elections take place only when they are required, such as when a broker hosting a partition leader goes down).

A Kafka client can be a consumer or a producer. Clients fetch metadata directly from Kafka brokers, which themselves talk to Zookeeper. A client can then connect to whichever brokers it needs.

This means that once you connect to any broker, you will be connected to the entire cluster. That is the reason every Kafka broker is also called a bootstrap server.

Zookeeper also allows its clients to subscribe to changes and have them delivered as they happen.

Zookeeper by design operates as a cluster with an odd number of servers, such as 3, 5, or 7. One of these servers is the leader (handling writes) and the rest are followers (handling reads). This makes Zookeeper extremely fault-tolerant, and it needs to be, as Kafka depends heavily on it.

Each server exposes two ports (by default 2888, and 3888):

  • Peers use the former, the follower port, to connect to other peers. Such a connection is necessary so that peers can communicate; more specifically, a ZooKeeper server uses this port to connect followers to the leader. When a new leader arises, a follower opens a TCP connection to the leader using this port.
  • Because the default leader election also uses TCP, another port is required for leader election. This is the second port, the leader-election port, in the server entry.

A Zookeeper cluster can have multiple servers, but it exposes a single port, clientPort, for clients to connect. It is by default 2181.

Quick Start

Installation of Zookeeper and Kafka

Standard Images for Kafka and Zookeeper:

Non-Docker based Setup:

Download the latest stable binary from Kafka Downloads.

Example of a typical installation on macOS using the Kafka binary:

# download binary and untar it
cd ~
mv Downloads/kafka_2.12-2.0.0.tgz .
tar -xzf kafka_2.12-2.0.0.tgz
cd kafka_2.12-2.0.0
# test kafka
java -version # Kafka needs Java
bin/kafka-topics.sh # should list man page for kafka-topics
# if you don't have Java 8, then install it
brew tap caskroom/versions
brew cask install java8

Example of a typical installation on Ubuntu:

# in Ubuntu 14.04, you can install Java 8 as:
sudo add-apt-repository ppa:webupd8team/java -y
sudo apt-get update
sudo apt-get install oracle-java8-installer
sudo apt-get install oracle-java8-set-default
java -version
# add following line to your ~/.bash_profile
export KAFKA_DIR="/Users/mugoyal/kafka_2.12-2.0.0"
export PATH="$PATH:$KAFKA_DIR/bin"
# test changes
source ~/.bash_profile
echo $KAFKA_DIR
# /Users/mugoyal/kafka_2.12-2.0.0
echo $PATH
# /Users/mugoyal/miniconda3/bin:/Library/Frameworks/Python.framework/Versions/3.6/bin:/Users/mugoyal/Code/zendesk/docker-images/dockmaster/bin:/Users/mugoyal/miniconda3/bin:/Library/Frameworks/Python.framework/Versions/3.6/bin:/Users/mugoyal/.nvm/versions/node/v6.12.3/bin:/Users/mugoyal/.rvm/gems/ruby-2.5.0/bin:/Users/mugoyal/.rvm/gems/ruby-2.5.0@global/bin:/Users/mugoyal/.rvm/rubies/ruby-2.5.0/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/Library/TeX/texbin:/opt/X11/bin:/Users/mugoyal/.rvm/bin:/Applications/MySQLWorkbench.app/Contents/MacOS:/Users/mugoyal/.rvm/bin:/Users/mugoyal/.rvm/bin:/Applications/MySQLWorkbench.app/Contents/MacOS:/Users/mugoyal/kafka_2.12-2.0.0/bin
# test kafka again
kafka-topics.sh # should list man page for kafka-topics

Starting Zookeeper and Kafka

Let’s see the contents of the Kafka directory:

ls -lhFa
# LICENSE
# NOTICE
# bin/
# config/
# libs/
# site-docs/

Before you start Zookeeper and Kafka Server, generally it is best to change the setting dataDir=/tmp/zookeeper of config/zookeeper.properties to point to some permanent place, such as <KAFKA_DIR>/data/zookeeper. Similarly, you can change the setting log.dirs=/tmp/kafka-logs from config/server.properties to log.dirs=<KAFKA_DIR>/data/kafka.

Now, first let’s start the Zookeeper server as:

# run `zookeeper-server-start.sh` for man page
zookeeper-server-start.sh config/zookeeper.properties
# INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Then, start the Kafka server as:

# run `kafka-server-start.sh` for man page
kafka-server-start.sh config/server.properties
# INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(10.18.2.94,9092,ListenerName(PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
# INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Listing, Creating, Deleting, Describing, or Changing Topics using Zookeeper

To deal with topics, we use kafka-topics --zookeeper <zookeeper_hosts> <subcommand> command. Here, <zookeeper_hosts> is a connection string for zookeeper in the form <host>:<port>. Multiple zookeeper hosts can be provided in a comma-separated form to allow fail-over.

Let’s create a topic named first_topic with 3 partitions and a replication-factor of 1. Note that for a replication-factor of N, you must have at least N brokers running.

# run `kafka-topics.sh` for man page
kafka-topics.sh --zookeeper localhost:2181 --topic first_topic --create --partitions 3 --replication-factor 1
# WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
# Created topic "first_topic"

Listing existing topics:

kafka-topics.sh --zookeeper localhost:2181 --list
# first_topic

Getting details of a topic:

kafka-topics.sh --zookeeper localhost:2181 --topic first_topic --describe
# Topic:first_topic PartitionCount:3 ReplicationFactor:1 Configs:
# Topic: first_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
# Topic: first_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
# Topic: first_topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0

Let a producer publish messages to Kafka

To add messages to a topic, we can use the console producer kafka-console-producer --broker-list <broker_hosts> --topic <topic_name> command. Here, <broker_hosts> is a connection string for brokers in the form <host>:<port>. Multiple broker hosts can be provided in a comma-separated form to allow fail-over.

Let’s add messages to a topic:

# run `kafka-console-producer.sh` for man page
kafka-console-producer.sh --broker-list localhost:9092 --topic first_topic

This gives you a > prompt to write messages. Once you are done, you can stop the producer using Ctrl-C. For example:

>my first message to kafka
>here goes the second message
>3
>4
>5
>6
>7
>^C%

Let a consumer-group read messages from Kafka

To read messages from a topic, we can use console consumer kafka-console-consumer --bootstrap-server <broker_hosts> --group <consumer-group-name> --topic <topic_name> OPTIONS command. Here, <broker_hosts> is a connection string for brokers in the form <host>:<port>. Multiple broker hosts can be provided in a comma-separated form.

Here,

--group <consumer_group_id>       # the consumer group id of the consumer
--topic <topic_id> # the topic id to consume on
--from-beginning # if the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message
--offset <offset_id> # the offset id to consume from or `'earliest'` which means from beginning, or `'latest'` which means from end (default: `'latest'`)

We can create a single consumer to subscribe to a topic as:

# run `kafka-console-consumer.sh` for man page
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning

If you run the above command without --from-beginning, it will receive (and list) only new messages as they are produced. Notice that no consumer-group name has been specified so far. If you stop the command and re-run it, the consumer will again receive all the messages from the beginning; this is because kafka-console-consumer creates a random consumer-group when no group is specified.

But, now let’s try it with a single consumer in a named group:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --group some_application_name --topic first_topic --from-beginning

This consumer will also receive all the messages from the beginning, but if you stop the above command and re-run it, it won’t receive the messages from the beginning again, because the consumer offset has already been committed in Kafka for the given named consumer-group. Even if you stop the consumer and re-run it later, it will receive only the messages produced since its last committed consumer-group offset position.

Let’s create multiple consumers as part of a group and subscribe to a topic:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --group my_first_application --topic first_topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --group my_first_application --topic first_topic

This creates two consumers, as part of a common consumer-group, listening to a given topic. You will notice that as new messages are produced, some of the messages are received by the first consumer and the rest by the second consumer. If we create more consumers, messages will be load-balanced among all of them.

Listing, Deleting, Describing, and Resetting Offsets for Consumer Groups

To list, delete, describe, or to reset offsets for consumer groups, use kafka-consumer-groups --bootstrap-server <broker_hosts> <subcommand> command. Here, <broker_hosts> is a connection string for brokers in the form <host>:<port>. Multiple broker hosts can be provided in a comma-separated form.

Here, some of the frequently used subcommands are:

--list                                 # list all consumer groups
--group <group_name> --describe # describe consumer group and list offset lag (number of messages not yet processed) related to given group
--group <group_name> --members # describe members of the group
--group <group_name> --reset-offsets # reset offsets of consumer group. You must choose one of the following reset specifications: `--to-datetime`, `--by-period`, `--to-earliest`, `--to-latest`, `--shift-by`, `--from-file`, `--to-current`

Now, let’s see what basic things we can do with kafka-consumer-groups:

# run `kafka-consumer-groups.sh` for man page
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# some_application_name
# my_first_application
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_first_application --describe
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
# first_topic 0 2 2 0 consumer-1-c5271eff-af4c-4a49-8344-c4f48192be28 /10.18.2.94 consumer-1
# first_topic 1 1 1 0 consumer-1-c5271eff-af4c-4a49-8344-c4f48192be28 /10.18.2.94 consumer-1
# first_topic 2 2 2 0 consumer-1-fe63a3eb-5fe6-48d5-a3b5-e0e7ecaf15d8 /10.18.2.94 consumer-1

It says the LAG (that is, LOG-END-OFFSET - CURRENT-OFFSET) is 0 for all the partitions, which means the consumers are fully caught up; if you stop and re-run them at this stage, they won’t receive any more messages unless new messages are produced. Also, looking at CONSUMER-ID, we know that one of the consumers is assigned to partitions 0 and 1, and the other to partition 2. If you stop and re-run these consumers, you might get a different mapping.

Using the --reset-offsets option, you can reset the offsets. For example:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_first_application --reset-offsets --to-earliest --execute --topic first_topic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_first_application --reset-offsets --shift-by -2 --execute --topic first_topic

Web-based Configuration Tools

Instead of using these CLI commands to inspect status, configuration, and data, you can use a web-based tool such as Kafka Manager, or even a desktop app such as Kafka Tool.

With our basic setup, Kafka Tool’s configuration would be:

Once configured, you can explore topics, their data, partitions, brokers, consumer-groups, etc.:

KafkaCat is an open-source alternative to using the Kafka CLI. It is a generic non-JVM producer and consumer for Apache Kafka. Think of it as a netcat for Kafka.

Dockerfile for Kafka: kafka-docker

Docker for Zookeeper: cp-zookeeper

Kafka APIs

In Kafka, communication between clients and servers is done with the TCP protocol. As part of the main Kafka project, only the Java clients are maintained, but clients in other languages are available as independent open-source projects.

Kafka includes five core APIs:

Producer API

Producer API allows an application to publish a stream of records to one or more Kafka topics.

Consumer API

Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.

Streams API

Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.

Connector API

Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

AdminClient API

AdminClient API allows managing and inspecting topics, brokers, and other Kafka objects.

Kafka Clients

Kafka Python Client: kafka-python

References:

Kafka Ruby Client: ruby-kafka

References:

Confluent Schema Registry

What is Confluent Schema Registry:

Confluent Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving your Avro, JSON Schema, and Protobuf schemas.

It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings, and allows the evolution of schemas according to the configured compatibility settings, with expanded support for these schema types.

It provides serializers that plug into Apache Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in any of the supported formats.

Schema Registry lives outside of and separately from your Kafka brokers. Your producers and consumers still talk to Kafka to publish and read data (messages) to and from topics. Concurrently, they can also talk to Schema Registry to send and receive the schemas that describe the data models of those messages.

Schema Registry is a distributed storage layer for schemas that uses Kafka as its underlying storage mechanism. Some key design decisions:

  • Assigns a globally unique ID to each registered schema. Allocated IDs are guaranteed to be monotonically increasing but not necessarily consecutive.
  • Kafka provides the durable backend, and functions as a write-ahead changelog for the state of Schema Registry and schemas it contains.
  • Schema Registry is designed to be distributed, with single-primary architecture, and Zookeeper/Kafka coordinates primary election (based on the configuration).

Subject, and Subject Name Strategy:

A serializer registers a schema in Schema Registry under a subject name, which defines a namespace in the registry:

  • Compatibility checks are per subject
  • Versions are tied to subjects
  • When schemas evolve, they are still associated with the same subject but get a new schema ID and version.

The subject name depends on the subject name strategy. The three supported strategies are TopicNameStrategy (the default), RecordNameStrategy, and TopicRecordNameStrategy.

Kafka topic vs. Schema vs. Subject:

A Kafka topic contains messages, and each message is a key-value pair. Either the message key or the message value, or both can be serialized as Avro, JSON, or Protobuf. A schema defines the structure of the data format. The Kafka topic name can be independent of the schema name. Schema Registry defines a scope in which schemas can evolve, and that scope is the subject. The name of the subject depends on the configured subject name strategy which by default is set to derive the subject name from the topic name.
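As a small illustration of the default TopicNameStrategy, the sketch below registers and fetches a value schema for the cabs_gps topic through Schema Registry's REST interface (the registry URL, topic name, and Avro schema are assumptions for this example):

import json
import requests

REGISTRY = "http://localhost:8081"  # assumed local Schema Registry

value_schema = {
    "type": "record",
    "name": "CabPosition",
    "fields": [
        {"name": "cab_id", "type": "long"},
        {"name": "latitude", "type": "double"},
        {"name": "longitude", "type": "double"},
    ],
}

# default TopicNameStrategy: the subject for message values is "<topic>-value"
resp = requests.post(
    f"{REGISTRY}/subjects/cabs_gps-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(value_schema)},
)
print(resp.json())  # e.g. {"id": 1} -- the globally unique schema ID

# fetch the latest registered version for that subject
latest = requests.get(f"{REGISTRY}/subjects/cabs_gps-value/versions/latest").json()
print(latest["version"], latest["schema"])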

Monitoring Tools

Kafka Consumer Lag Checking: Burrow

References:

Burrow lets you track the stored consumer-group offsets versus the latest offsets in each partition. The difference is reported as lag: the number of messages the group is behind.

The HTTP server in Burrow provides a convenient way to interact with both Burrow and the Kafka and Zookeeper clusters. Requests are simple HTTP calls, and all responses are formatted as JSON. One can periodically poll the HTTP API and populate metrics in Datadog.
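A rough sketch of such polling (the Burrow address, cluster name, and consumer-group name below are assumptions; the /v3 lag endpoint is the one documented in the Burrow wiki):

import requests

BURROW = "http://localhost:8000"  # Burrow's HTTP server, address assumed
CLUSTER = "local"                 # cluster name as configured in Burrow
GROUP = "my_first_application"

resp = requests.get(f"{BURROW}/v3/kafka/{CLUSTER}/consumer/{GROUP}/lag").json()
print(resp.get("status", {}).get("totallag"))  # total messages the group is behind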

Further Readings

  • Connector API helps you to connect various services to Kafka as a source or sink.
  • Log Compaction is an optimization that reduces log size. Extremely useful in changelog streams.
  • Exactly-once Message Semantics guarantees that messages are received exactly once. This is a big deal, and it is difficult to achieve.
