Apache Kafka CLI commands cheat sheet
--
Do you make changes to your Kafka cluster using the CLI?
Do you always have to look up the CLI commands and options?
My Apache Kafka, CLI cheat sheet might be helpful for you!
In this short blog post, you find my Kafka CLI cheatsheet (I’ll add more commands to the cheat sheet regularly).
Overview
- Kafka & Zookeeper Docker Compose file
- Start Kafka & Zookeeper using Docker Compose
- Attach to the Kafka Broker running in Docker
- Unset the JMX port in the Kafka Docker container
- Stop Kafka & Zookeeper using Docker Compose
- Show the version of the Kafka broker
- Connect to Zookeeper
- Show the Kafka cluster id
- List brokers in the Kafka cluster
- Show details of a Kafka broker
- Show all the topics that exist in the cluster
- Show details of a specific topic
- Exit the zookeeper shell
- Create a Kafka topic
- Create a Kafka topic in case the topic doesn’t exist
- Create a Kafka topic with a short retention period
- List the Kafka topics
- List Kafka topics and exclude internal topics
- Show the Kafka topic details
- Increase the number of partitions of a Kafka topic
- Change the retention time of a Kafka topic
- Purge a Kafka topic
- List Kafka topics (with configuration values) that have specific configuration overrides
- Show specific Kafka topic configuration overrides
- Show partition offsets of a Kafka topic
- Show offset for specific partition(s) of a Kafka topic
- Delete a Kafka topic
Kafka CLI
All the commands used in this blogpost are included in the Apache Kafka distribution. The file extension of the scripts in the Apache Kafka distribution is .sh
Be aware that the Confluent Kafka distribution dropped the .sh
file extension!
Docker
For every command, I also provide a practical working example you can execute in a running Kafka Docker container. I’m using the Confluent Kafka distribution in this cheat sheet.
Kafka & Zookeeper Docker Compose file
You start a Kafka broker (including Zookeeper) using this docker-compose example file:
Copy the content in a file with the name: docker-compose.yml
Start Kafka & Zookeeper using Docker Compose
Start the official Confluent Kafka and Zookeeper Docker containers using Docker Compose:
docker-compose up -d
In case you don’t have the docker images on your system, docker will pull the images from Dockerhub.
Attach to the Kafka Broker running in Docker
Attach to the Kafka Docker container to execute operations on your Apache Kafka cluster.
docker exec -it kafka bash
Unset the JMX port in the Kafka Docker container
In case you configured a JMX_PORT in your docker-compose file for the Kafka Docker container you have to unset the JMX_PORT environment variable
unset JXM_PORT
otherwise, you will run into issues when executing the CLI like:
Stop Kafka & Zookeeper using Docker Compose
docker-compose down -v
Cluster and broker(s)
Show the version of the Kafka broker
$KAFKA_HOME/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 --version
🐳 Example to execute in Docker:
./usr/bin/kafka-broker-api-versions --bootstrap-server localhost:9092 --version
Connect to Zookeeper
Apache Kafka is depending* on Apache Zookeeper for:
- cluster membership
- controller election
- topic configuration
- access control lists
- quotas.
The meta-data for those operations are stored outside of Kafka in a separate Zookeeper cluster.
* from version 2.8 onwards Apache Kafka is not depending on Zookeeper anymore. For more details see: Apache Kafka Needs No Keeper: Removing the Apache ZooKeeper Dependency on the Confluent blog.
To be able to see metadata of the Kafka cluster from Zookeeper first connect to Zookeeper using the zookeeper-shell
command that ships with the Kafka distribution.
$KAFKA_HOME/bin/zookeeper-shell.sh $ZK_HOSTS:2181
🐳 Example to execute in Docker:
./usr/bin/zookeeper-shell zookeeper:2181
Output:
Connecting to zookeeper:2181
Welcome to ZooKeeper!
JLine support is disabledWATCHER::WatchedEvent state:SyncConnected type:None path:null
You are now connected to the ZooKeeper cluster.
Show the Kafka cluster id
$KAFKA_HOME/bin/zookeeper-shell $ZK_HOSTS:2181 get /cluster/id
🐳 Example to execute in Docker:
./usr/bin/zookeeper-shell zookeeper:2181 get /cluster/id
Output:
Connecting to zookeeper:2181WATCHER::WatchedEvent state:SyncConnected type:None path:null
{"version":"1","id":"cDpWBoJpQraTnNqSwB_4Tg"}
In this example, the id of the Kafka cluster is: cpWBoJpQraTnNqSwB_4Tg
List brokers in the Kafka cluster
Let’s list the broker in the Kafka cluster
ls /brokers/ids
Output:
[1]
In my docker setup, there is only one Kafka broker in the cluster.
The id of the Kafka broker is: 1
Show details of a Kafka broker
get /brokers/ids/1
Show all the topics that exist in the cluster
ls /brokers/topics
Output:
[my-first-topic]
Show details of a specific topic
get /brokers/topics/my-first-topic
Exit the zookeeper shell
exit
From Kafka version 3.0.0. onwards ‘exit’ doesn’t work ‘quit’ should be used instead.
Topics
In this chapter, you find the CLI command and options that are related to Kafka topics.
Create a Kafka topic
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --create --topic $TOPIC_NAME --partitions 3 --replication-factor 1
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --create --topic my-first-topic --partitions 3 --replication-factor 1
Create a Kafka topic in case the topic doesn’t exist
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --create --topic $TOPIC_NAME --partitions 3 --replication-factor 1 --if-not-exists
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --create --topic my-first-topic --partitions 3 --replication-factor 1 --if-not-exists
Note the CLI will not give an error in case the topic already exists.
Create a Kafka topic with a short retention
By default, a Kafka topic has a retention period of 7 days.
This example shows the command to create a topic with a retention period of 10 seconds.
./usr/bin/kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic-with-short-retention-period --partitions 3 --replication-factor 1 --config retention.ms=10000 --config segment.ms=10000
List Kafka topics
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --list
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --list
List Kafka topics and exclude internal topics
Exclude listing of external topics like “__consumer_offsets”
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --list --exclude-internal
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --list --exclude-internal
Show the Kafka topic details
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS
--topic $TOPIC_NAME --describe
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --topic my-first-topic --describe
Increase the number of partitions of a Kafka topic
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS
--alter --topic my-first-topic --partitions 5
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --alter --topic my-first-topic --partitions 5
Note:
- The number of partitions for a topic can only be increased.
- If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected.
Change the retention time of a Kafka topic
259200000 ms = 3 days
$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=259200000
🐳 Example to execute in Docker:
./usr/bin/kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=259200000
Purge a Kafka topic
At the time of writing, there is no single command to purge a topic.
As a workaround, you can purge a topic by changing the retention time of a topic to one minute.
$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=1000
Wait one minute. Kafka will now remove records older than one minute from the topic. Now delete the retention.ms configuration from the topic so it will default back to the retention configuration of the Kafka cluster. The default retention period of a Kafka topic is seven days (unless you configured it differently).
$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --alter --entity-type topics --entity-name my-first-topic --delete-config retention.ms
In case you had a specific retention period specified on the topic. You need to apply that retention period again (in this example 3 days retention period)
$KAFKA_HOME/bin/kafka-configs.sh --zookeeper $ZK_HOSTS --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=259200000
🐳 Example to execute in Docker:
./usr/bin/kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name my-first-topic --add-config retention.ms=1000
Wait one minute:
./usr/bin/kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name my-first-topic --delete-config retention.ms
List Kafka topics (with configuration values) that have specific configuration overrides
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --describe --topics-with-overrides
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topics-with-overrides
Show specific Kafka topic configuration overrides
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_HOSTS --describe --entity-type topics --entity-name my-first-topic
🐳 Example to execute in Docker:
./usr/bin/kafka-configs --zookeeper zookeeper:2181 --describe --entity-type topics --entity-name my-first-topic
Note in case there a no particular property overrides for the topic this command will not show default cluster properties applied to the topic.
Show partition offsets of a Kafka topic
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $TOPIC_NAME
🐳 Example to execute in Docker:
./usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-first-topic
Example output:
my-first-topic:0:3
my-first-topic:1:3
my-first-topic:2:3
Format is: topicname:partition-id:offset
In this example, the output means: on all 3 partitions of topic with the name my-first-topic
there are 3 records.
By default, the latest offset for all partitions is shown.
Configuration options:
- latest offset:
--time -1
- earliest offset:
--time -2
to show the earliest offset:
./usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-first-topic --time -2
Example output:
my-first-topic:0:0
my-first-topic:1:0
my-first-topic:2:0
Show offset for specific partition(s) of a Kafka topic
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $TOPIC_NAME --partitions partition-id, another-partition-id
🐳 Example to execute in Docker:
./usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-first-topic --partitions 0,2
Example output:
my-first-topic:0:3
my-first-topic:2:3
Delete a Kafka topic
$KAFKA_HOME/bin/kafka-topics.sh — zookeeper $ZK_HOSTS --delete
--topic $TOPIC_NAME
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --delete --topic my-first-topic
Producers
Produce a message to a Kafka topic
$KAFKA_HOME/usr/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $TOPIC_NAME
A prompt will open:
>
Type in a message and press enter to publish it to the topic:
>Hello World
Note:
- Repeat the same step to publish more messages
- The message published to Kafka has a
null
key - Press Ctrl+C to exit
🐳 Example to execute in Docker:
./usr/bin/kafka-console-producer --broker-list localhost:9092 --topic my-first-topic
Produce messages to a Kafka topic from a file
Example file topic-input.txt
(make sure each message is on a new line):
Hello World
Kafka Rocks!
Happy Streaming
Produce messages to the topic from the file:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic < topic-input.txt
🐳 Example to execute in Docker:
./usr/bin/kafka-console-producer --broker-list localhost:9092 --topic my-first-topic < topic-input.txt
Produce messages to Kafka with both key and value
By default messages sent to a Kafka topic will result in messages with null
keys. In this example, the separator between the key and the value is: :
$KAFKA_HOME/usr/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $TOPIC_NAME --property parse.key=true
--property key.separator=:
🐳 Example to execute in Docker:
./usr/bin/kafka-console-producer --broker-list localhost:9092 --topic some-topic --property parse.key=true --property key.separator=*
Example input:
>key:value
>foo:bar
>anotherKey:another value
Consumers
Consume a Kafka topic
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $TOPIC_NAME
🐳 Example to execute in Docker:
./usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-first-topic
If a consumer group id is not specified, the kafka-console-consumer
generates a random consumer group.
Consume a Kafka topic and show both key, value and timestamp
By default, the console consumer will only the value of the Kafka record.
Using this command you can show both the key and value.
$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic some-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true
🐳 Example to execute in Docker:
./usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 \ --topic some-topic \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true \
--property print.key=true \
--property print.value=true
Consume a Kafka topic from the beginning
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-first-topic --from-beginning
🐳 Example to execute in Docker:
./usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-first-topic --from-beginning
Miscellaneous
Find all the partitions where one or more of the replicas for the partition are not in-sync with the leader.
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions
🐳 Example to execute in Docker:
./usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --under-replicated-partitions
Tap the 👏 button if you found this article useful!
Any questions or feedback? Reach out to me on Twitter: @TimvanBaarsen