Apache Kafka CLI commands cheat sheet

Tim van Baarsen
9 min read · Sep 29, 2019

Do you make changes to your Kafka cluster using the CLI?
Do you always have to look up the CLI commands and options?
My Apache Kafka CLI cheat sheet might be helpful for you!

In this short blog post, you'll find my Kafka CLI cheat sheet (I’ll add more commands to the cheat sheet regularly).

Kafka CLI

All the commands used in this blog post are included in the Apache Kafka distribution. The scripts in the Apache Kafka distribution have the file extension .sh

Be aware that the Confluent Kafka distribution dropped the .sh file extension!
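Because the script names differ between the two distributions, a small shell function can pick whichever variant is installed. This is only a sketch: kafka_tool is a hypothetical helper name, and it assumes the scripts are either on your PATH or under $KAFKA_HOME/bin.

```shell
# Hypothetical helper: print the runnable name of a Kafka CLI tool.
# It tries the Confluent name (no extension) first, then the Apache name (.sh),
# first on the PATH and then under $KAFKA_HOME/bin.
kafka_tool() {
  tool="$1"
  home="${KAFKA_HOME:-/nonexistent}"
  for candidate in "$tool" "$tool.sh" "$home/bin/$tool" "$home/bin/$tool.sh"; do
    if command -v "$candidate" >/dev/null 2>&1; then
      printf '%s\n' "$candidate"
      return 0
    fi
  done
  echo "Kafka tool not found: $tool" >&2
  return 1
}

# Usage (commented out because it needs a Kafka installation):
# "$(kafka_tool kafka-topics)" --bootstrap-server localhost:9092 --list
```

With a helper like this, the same script can run against both the Apache and the Confluent distribution.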

Docker

For every command, I also provide a practical working example you can execute in a running Kafka Docker container. I’m using the Confluent Kafka distribution in this cheat sheet.

Kafka & Zookeeper Docker Compose file

You can start a Kafka broker (including Zookeeper) using this docker-compose example file:

Copy the content into a file named docker-compose.yml

docker-compose.yml file to run Kafka and Zookeeper. https://gist.github.com/j-tim/cb630c3646b5987e45e2b8cf847f4149

Start Kafka & Zookeeper using Docker Compose

Start the official Confluent Kafka and Zookeeper Docker containers using Docker Compose:

docker-compose up -d

If you don’t have the Docker images on your system yet, Docker will pull them from Docker Hub.

Attach to the Kafka Broker running in Docker

Attach to the Kafka Docker container to execute operations on your Apache Kafka cluster.

docker exec -it kafka bash

Unset the JMX port in the Kafka Docker container

In case you configured a JMX_PORT in your docker-compose file for the Kafka Docker container, you have to unset the JMX_PORT environment variable:

unset JMX_PORT

otherwise, you will run into issues when executing the CLI like:

JMX port already in use
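If you would rather not change the environment of your whole shell session, one option is to clear the variable for a single command only by running it in a subshell. The sketch below uses printf as a stand-in for a Kafka CLI tool so it can run anywhere; the port number 9101 is just an illustrative value.

```shell
# Simulate a container where JMX_PORT was set by docker-compose
# (9101 is an arbitrary example value).
export JMX_PORT=9101

# Command substitution runs in a subshell, so the unset only applies there
# and does not leak into the surrounding session.
inside=$( unset JMX_PORT; printf '%s' "${JMX_PORT:-unset}" )
echo "inside subshell: $inside"
echo "outside subshell: $JMX_PORT"

# The same pattern with a real tool would look like this (needs Kafka):
# ( unset JMX_PORT; ./usr/bin/kafka-topics --zookeeper zookeeper:2181 --list )
```

The first line printed shows the variable as unset inside the subshell, while the second shows it is still 9101 in the session itself.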

Stop Kafka & Zookeeper using Docker Compose

docker-compose down -v

Cluster and broker(s)

Show the version of the Kafka broker

$KAFKA_HOME/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 --version

🐳 Example to execute in Docker:

./usr/bin/kafka-broker-api-versions --bootstrap-server localhost:9092 --version

Connect to Zookeeper

Apache Kafka depends* on Apache Zookeeper for:

  • cluster membership
  • controller election
  • topic configuration
  • access control lists
  • quotas.

The metadata for those operations is stored outside of Kafka in a separate Zookeeper cluster.

* From version 2.8 onwards, Apache Kafka can run without Zookeeper using the built-in KRaft mode (early access in 2.8). For more details see: Apache Kafka Needs No Keeper: Removing the Apache ZooKeeper Dependency on the Confluent blog.

To see the metadata of the Kafka cluster in Zookeeper, first connect to Zookeeper using the zookeeper-shell command that ships with the Kafka distribution.

$KAFKA_HOME/bin/zookeeper-shell.sh $ZK_HOSTS:2181

🐳 Example to execute in Docker:

./usr/bin/zookeeper-shell zookeeper:2181

Output:

Connecting to zookeeper:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::WatchedEvent state:SyncConnected type:None path:null

You are now connected to the ZooKeeper cluster.

Show the Kafka cluster id

$KAFKA_HOME/bin/zookeeper-shell.sh $ZK_HOSTS:2181 get /cluster/id

🐳 Example to execute in Docker:

./usr/bin/zookeeper-shell zookeeper:2181 get /cluster/id

Output:

Connecting to zookeeper:2181
WATCHER::WatchedEvent state:SyncConnected type:None path:null
{"version":"1","id":"cDpWBoJpQraTnNqSwB_4Tg"}

In this example, the id of the Kafka cluster is: cDpWBoJpQraTnNqSwB_4Tg
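If you need the cluster id in a script, you can extract it from the JSON line in the output. The sketch below works on the sample line from the output above instead of calling Zookeeper, and it assumes the JSON is printed on a single line without spaces, as in that sample.

```shell
# Sample output line copied from the example above; in practice you would
# capture it from: zookeeper-shell zookeeper:2181 get /cluster/id
zk_output='{"version":"1","id":"cDpWBoJpQraTnNqSwB_4Tg"}'

# Pull out the value of the "id" field with sed (no JSON parser required).
cluster_id=$(printf '%s\n' "$zk_output" | sed -n 's/.*"id":"\([^"]*\)".*/\1/p')
echo "$cluster_id"
```

For anything more complex than this single field, a real JSON tool is the safer choice.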

List brokers in the Kafka cluster

Let’s list the brokers in the Kafka cluster (run this and the following commands inside the Zookeeper shell):

ls /brokers/ids

Output:

[1]

In my docker setup, there is only one Kafka broker in the cluster.
The id of the Kafka broker is: 1

Show details of a Kafka broker

get /brokers/ids/1

Show all the topics that exist in the cluster

ls /brokers/topics

Output:

[my-first-topic]

Show details of a specific topic

get /brokers/topics/my-first-topic

Exit the zookeeper shell

exit

From Kafka version 3.0.0 onwards, ‘exit’ doesn’t work; use ‘quit’ instead.

Topics

In this chapter, you'll find the CLI commands and options that are related to Kafka topics.

Create a Kafka topic

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --create --topic $TOPIC_NAME --partitions 3 --replication-factor 1

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --create --topic my-first-topic --partitions 3 --replication-factor 1

Create a Kafka topic in case the topic doesn’t exist

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --create --topic $TOPIC_NAME --partitions 3 --replication-factor 1 --if-not-exists

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --create --topic my-first-topic --partitions 3 --replication-factor 1 --if-not-exists

Note the CLI will not give an error in case the topic already exists.
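On newer clusters you may need a different connection option: kafka-topics accepts --bootstrap-server since Kafka 2.2, and the --zookeeper option was removed from it in Kafka 3.0. The sketch below only prints the equivalent command so you can review it before running it. The function name topic_create_cmd and the variables KAFKA_TOPICS and BOOTSTRAP_SERVER are hypothetical names chosen for this example.

```shell
# Print (do not run) a kafka-topics create command that talks to the broker
# directly instead of Zookeeper.
# Arguments: $1 = topic name, $2 = partitions, $3 = replication factor.
# KAFKA_TOPICS and BOOTSTRAP_SERVER are hypothetical override variables.
topic_create_cmd() {
  printf '%s --bootstrap-server %s --create --topic %s --partitions %s --replication-factor %s --if-not-exists\n' \
    "${KAFKA_TOPICS:-kafka-topics}" "${BOOTSTRAP_SERVER:-localhost:9092}" "$1" "$2" "$3"
}

topic_create_cmd my-first-topic 3 1
```

Set KAFKA_TOPICS to $KAFKA_HOME/bin/kafka-topics.sh when you use the Apache Kafka distribution.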

Create a Kafka topic with a short retention

By default, a Kafka topic has a retention period of 7 days.
This example shows the command to create a topic with a retention period of 10 seconds.

./usr/bin/kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic-with-short-retention-period --partitions 3 --replication-factor 1 --config retention.ms=10000 --config segment.ms=10000

List Kafka topics

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --list

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --list

List Kafka topics and exclude internal topics

Exclude internal topics like “__consumer_offsets” from the listing

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --list --exclude-internal

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --list --exclude-internal

Show the Kafka topic details

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --topic $TOPIC_NAME --describe

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --topic my-first-topic --describe

Increase the number of partitions of a Kafka topic

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --alter --topic my-first-topic --partitions 5

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --alter --topic my-first-topic --partitions 5

Note:

  • The number of partitions for a topic can only be increased.
  • If you increase the partitions of a topic whose messages have keys, the key-to-partition mapping changes, so the ordering of messages with the same key is affected.
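The reason is that the default partitioner picks the partition for a keyed message as hash(key) modulo the number of partitions, so changing the partition count can move a key to a different partition. The sketch below illustrates the principle only: it uses cksum as a stand-in hash (Kafka itself uses murmur2), and partition_for is a hypothetical helper, so the numbers it prints will not match a real cluster.

```shell
# Stand-in for the partitioner: hash the key, then take the hash modulo the
# partition count. cksum (a CRC) is used here only for illustration; it is
# NOT the hash Kafka uses.
partition_for() {
  key="$1"
  partitions="$2"
  hash=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
  echo $(( hash % partitions ))
}

# Compare where each key lands with 3 and with 5 partitions.
for key in foo bar anotherKey; do
  echo "$key: 3 partitions -> $(partition_for "$key" 3), 5 partitions -> $(partition_for "$key" 5)"
done
```

Any key whose two numbers differ would have its newer messages written to a different partition than its older ones.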

Change the retention time of a Kafka topic

259200000 ms = 3 days

$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=259200000

🐳 Example to execute in Docker:

./usr/bin/kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=259200000
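Since retention.ms is expressed in milliseconds, it is easy to get the number of zeros wrong. A small helper can do the conversion for you; days_to_ms is a hypothetical function name used for this sketch.

```shell
# Convert a number of days to milliseconds for use with retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 3   # prints 259200000, the value used above
days_to_ms 7   # prints 604800000
```

You could then pass the result straight into the command, for example --add-config retention.ms=$(days_to_ms 3).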

Purge a Kafka topic

At the time of writing, there is no single command to purge a topic.
As a workaround, you can purge a topic by temporarily changing its retention time to one second.

$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=1000

Wait one minute. Kafka will now remove records older than one second from the topic. If the records are still there, wait a little longer: the broker only checks for expired log segments periodically (every five minutes by default, see log.retention.check.interval.ms). Then delete the retention.ms configuration from the topic so it falls back to the retention configuration of the Kafka cluster. The default retention period of a Kafka topic is seven days (unless you configured it differently).

$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --alter --entity-type topics --entity-name my-first-topic --delete-config retention.ms

In case the topic had a specific retention period configured, you need to apply that retention period again (in this example, a retention period of 3 days):

$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=259200000

🐳 Example to execute in Docker:

./usr/bin/kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=1000

Wait one minute:

./usr/bin/kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name my-first-topic --delete-config retention.ms

List Kafka topics (with configuration values) that have specific configuration overrides

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --describe --topics-with-overrides

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topics-with-overrides

Show specific Kafka topic configuration overrides

$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --describe --entity-type topics --entity-name my-first-topic

🐳 Example to execute in Docker:

./usr/bin/kafka-configs --zookeeper zookeeper:2181 --describe --entity-type topics --entity-name my-first-topic

Note: in case there are no property overrides for the topic, this command will not show the default cluster properties applied to the topic.

Show partition offsets of a Kafka topic

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $TOPIC_NAME

🐳 Example to execute in Docker:

./usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-first-topic

Example output:

my-first-topic:0:3
my-first-topic:1:3
my-first-topic:2:3

Format is: topicname:partition-id:offset
In this example, the output means: each of the 3 partitions of the topic named my-first-topic contains 3 records.

By default, the latest offset for all partitions is shown.
Configuration options:

  • latest offset: --time -1
  • earliest offset: --time -2

To show the earliest offset:

./usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-first-topic --time -2

Example output:

my-first-topic:0:0
my-first-topic:1:0
my-first-topic:2:0
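Combining both outputs gives a rough record count for the topic: sum the latest offsets and subtract the sum of the earliest offsets. The sketch below works on the two sample outputs shown above rather than on a live cluster; sum_offsets is a hypothetical helper name. Treat the result as an approximation, since offsets are not always a one-to-one record count (for example on compacted topics).

```shell
# Sample outputs copied from the examples above; in practice you would capture
# them from GetOffsetShell with --time -1 and --time -2.
latest='my-first-topic:0:3
my-first-topic:1:3
my-first-topic:2:3'
earliest='my-first-topic:0:0
my-first-topic:1:0
my-first-topic:2:0'

# Sum the third colon-separated field (the offset) over all partitions.
sum_offsets() {
  awk -F: '{ sum += $3 } END { print sum + 0 }'
}

latest_total=$(printf '%s\n' "$latest" | sum_offsets)
earliest_total=$(printf '%s\n' "$earliest" | sum_offsets)
echo "approximate record count: $(( latest_total - earliest_total ))"
```

For the sample data this prints an approximate record count of 9 (3 records on each of the 3 partitions).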

Show offset for specific partition(s) of a Kafka topic

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $TOPIC_NAME --partitions partition-id,another-partition-id

🐳 Example to execute in Docker:

./usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-first-topic --partitions 0,2

Example output:

my-first-topic:0:3
my-first-topic:2:3

Delete a Kafka topic

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --delete --topic $TOPIC_NAME

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --delete --topic my-first-topic

Producers

Produce a message to a Kafka topic

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $TOPIC_NAME

A prompt will open:

>

Type in a message and press enter to publish it to the topic:

>Hello World

Note:

  • Repeat the same step to publish more messages
  • The message published to Kafka has a null key
  • Press Ctrl+C to exit

🐳 Example to execute in Docker:

./usr/bin/kafka-console-producer --broker-list localhost:9092 --topic my-first-topic

Produce messages to a Kafka topic from a file

Example file topic-input.txt (make sure each message is on a new line):

Hello World
Kafka Rocks!
Happy Streaming

Produce messages to the topic from the file:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic < topic-input.txt

🐳 Example to execute in Docker:

./usr/bin/kafka-console-producer --broker-list localhost:9092 --topic my-first-topic < topic-input.txt

Produce messages to Kafka with both key and value

By default, messages sent to a Kafka topic have null keys. In this example, the separator between the key and the value is: :

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $TOPIC_NAME --property parse.key=true --property key.separator=:

🐳 Example to execute in Docker:

./usr/bin/kafka-console-producer --broker-list localhost:9092 --topic some-topic --property parse.key=true --property key.separator=:

Example input:

>key:value
>foo:bar
>anotherKey:another value
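When you feed keyed messages from a file instead of typing them, a line without the separator will typically make the console producer fail, so it can help to check the file first. A minimal sketch, assuming the separator : and a hypothetical file name keyed-input.txt:

```shell
# Separator expected between key and value (matches key.separator above).
sep=':'

# Hypothetical input file holding the example input from above.
cat > keyed-input.txt <<'EOF'
key:value
foo:bar
anotherKey:another value
EOF

# Count the lines that do not contain the separator at all.
missing=$(awk -v sep="$sep" 'index($0, sep) == 0 { n++ } END { print n + 0 }' keyed-input.txt)

if [ "$missing" -eq 0 ]; then
  echo "all lines contain the separator"
else
  echo "$missing line(s) without separator"
fi
```

If the check passes, you can redirect the file into the console producer the same way as in the file example earlier in this post.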

Consumers

Consume a Kafka topic

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $TOPIC_NAME

🐳 Example to execute in Docker:

./usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-first-topic

If a consumer group id is not specified, the kafka-console-consumer generates a random consumer group.

Consume a Kafka topic and show key, value and timestamp

By default, the console consumer will only show the value of the Kafka record.
Using this command you can show the key, the value and the timestamp.

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic some-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true

🐳 Example to execute in Docker:

./usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 \
--topic some-topic \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true \
--property print.key=true \
--property print.value=true
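With these properties, each record is printed as a timestamp field, the key and the value, separated by tabs by default. The sketch below splits such a line into its parts. The sample line is hypothetical (it is not taken from a real topic) and the code assumes the default tab separator.

```shell
# Hypothetical sample line in the shape printed with print.timestamp=true and
# print.key=true: "CreateTime:<epoch millis><TAB><key><TAB><value>".
line=$(printf 'CreateTime:1569764045000\tfoo\tbar')

# Strip the timestamp type prefix (for example "CreateTime:") from field 1.
timestamp_ms=$(printf '%s\n' "$line" | awk -F'\t' '{ sub(/^[A-Za-z]+:/, "", $1); print $1 }')

# cut splits on tabs by default.
key=$(printf '%s\n' "$line" | cut -f 2)
value=$(printf '%s\n' "$line" | cut -f 3)

echo "timestamp (s): $(( timestamp_ms / 1000 )) key: $key value: $value"
```

Keep in mind that a value containing a tab itself would need more careful parsing than this.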

Consume a Kafka topic from the beginning

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-first-topic --from-beginning

🐳 Example to execute in Docker:

./usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-first-topic --from-beginning 

Miscellaneous

Find all the partitions where one or more of the replicas for the partition are not in-sync with the leader.

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions

🐳 Example to execute in Docker:

./usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --under-replicated-partitions

Any questions or feedback? Reach out to me on Twitter: @TimvanBaarsen
