Running Kafka in Docker Machine

Marcelo Hossomi
12 min readMar 8, 2019

Recently, we decided to use Kafka in our project. It was something new to me, and as usual I immediately looked up in Docker Hub for a Kafka Docker image.

Sometimes I have trouble with Docker in this project, because due to some circunstances I have to work on Windows. I also have to use Virtual Box for various tasks, which means I’m forced to use Docker Toolbox or run Docker in a regular Virtual Box VM.

I quickly found out that this setup leads to some connectivity issues with Kafka. Here’s the story.

Bonus: though my use case involves Docker Machine, this article applies to any scenario in which Kafka and clients run in different hosts.

The failed attempt

Surprisingly, there isn’t an official Kafka image, though there were several options to choose from. At first, I went with Spotify’s because it included Zookeeper in the same container, which seemed convenient.

Warning: this image is unmaintained, don’t use it!

$ docker run --rm —-name kafka \
-p 2181:2181 -p 9092:9092 \
-e ADVERTISED_HOST=dockervm \
spotify/kafka

Initially, I didn’t understand what ADVERTISED_HOST was. The image documentation said to use the Docker Machine address, which I mapped to dockervm in my hosts file.

Logs showed that it started fine:

INFO success: zookeeper entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
INFO success: kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)

But when I tried to connect a producer:

$ kafka-console-producer.bat --broker-list dockervm:9092 \
--topic test
> Hello
WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[repeats infinitely]

I am used to be suspicious of my setup, so I tried running the producer from within the Kafka container. Unfortunately, the result was the same. As an extra blind effort, I tried ADVERTISED_HOST=localhost, but still no luck:

$ kafka-console-producer.bat --broker-list dockervm:9092 \
--topic test
> Hello
WARN [Producer clientId=console-producer] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
WARN [Producer clientId=console-producer] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[repeats infinitely]

I had to get Kafka working quickly to start developing, so for now I gave up on Docker and ran it manually following their own quickstart guide.

Understanding the problem

A while later I came back to this problem, now decided to really understand what was going on. Started by remembering a few points about Kafka:

Kafka is distributed. Given a replication factor greater than 1, each partition data is replicated in different brokers. Any of those broker can be elected the partition leader responsible for all IO, while the others will only replicate it.

Leader election, among other administrative tasks, is handled by the cluster controller, an additional role performed by one of the cluster brokers.

Each broker is also a bootstrap server. Clients may connect to any broker to immediately know about the whole cluster, such as who and where partition leaders are. Therefore, each broker must know where all the others are located.

Kafka uses Zookeeper to manage service discovery for brokers. Zookeper stores brokers’ endpoints and also notifies them about changes in the cluster. At startup, brokers register in Zookeeper with their endpoint.

As a consequence of the above, to connect to Kafka a client has to:

  1. Query any broker for cluster metadata;
  2. Fetch cluster metadata, which contains the registered endpoints of each broker in the cluster and the leader of each partition.
  3. Derive the partition leader’s endpoint from the metadata;
  4. Connect to the partition leader for further IO.

That last step holds the catch:

If the broker and the client are in different hosts, the endpoint registered by the broker is likely unaccessible from one of them.

I learned that the variable ADVERTISED_HOST is mapped to Kafka property advertised.host.name, which is the host the broker registers with, we can then experiment and understand better why my first attempt failed. First, I used ADVERTISED_HOST=dockervm as illustrated below.

Using ADVERTISED_HOST=dockervm. Red means error.

In this scenario, dockervm is resolvable from the client’s perspective because I mapped it manually in my hosts file (I could also have used the local IP). On the other hand, it’s unresolvable from Kafka’s perspective.

The client, however, fails to fetch metadata reporting LEADER_NOT_AVAILABLE. The documentation states that this error happens when the partition leader has not been elected yet.

Interestingly, the broker logs showed that it was trying to connect to itself through the advertised host (and obviously failing). This happens because the broker was also elected the cluster controller, so it was checking the registered brokers availability for leader election. As the only candidatedockervm:9092 is not resolvable, the election failed.

Next, I tried using ADVERTISED_HOST=localhost, illustrated below.

Using ADVERTISED_HOST=localhost. Red means error.

In this scenario, localhost is resolvable in both Kafka’s and client’s perspectives. The election succeeds and the client also successfully fetches metadata, but it is told that the partition leader is at localhost, which is incorrect. This explains why Connection to node could not be established.

Solving the problem

Note: Docker image properties map to Kafka properties in server.properties file. Here I will always state both, with the Kafka property in parenthesis. This is important if you are not running Kafka in Docker.

To solve this problem, the broker needs to listen to two endpoints: one for internal and other for external access, which also has to be advertised from the client’s perspective. This can be achieved by configuring named listeners:

  • KAFKA_LISTENERS (listeners): the list of endpoints to which the broker will bind to. A listener is defined as NAME://host:port.
  • KAFKA_ADVERTISED_LISTENERS (advertised.listeners): the endpoints the broker will register with. The names must be a subset of the listeners.

Firstly, I changed the Docker image. There are some better options, such as Confluent’s and wurstmeister’s (both authors also provide a Zookeeper image). I chose the latter for it has an advantage I will use later.

Then finally, here is a working docker-compose.yml sample (and a small hint on YAML):

version: "2.1"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS:
"INTERNAL://kafka:9090,\
EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS:
"INTERNAL://kafka:9090,\
EXTERNAL://dockervm:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
"INTERNAL:PLAINTEXT,\
EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

Notice the two named listeners:

  • INTERNAL: bound to kafka:9090 and advertised as the same. Note that Docker Compose DNS resolves kafka to the container address.
  • EXTERNAL: bound to 0.0.0.0:9092 but advertised as dockervm:9092.

KAFKA_INTER_BROKER_LISTENER_NAME (inter.broker.listener.name) defines which listener other brokers will use to communicate with this broker, so we set it to INTERNAL.

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP (listener.security.protocol.map) is used to configure encryption in each listener.

However, if there are two advertised listeners, how will the client know the correct one? Actually, not all advertised listeners are returned in the metadata:

When a client queries metadata through a specific listener, Kafka will only return advertised listeners with the same name. I will show this in action shortly.

So our client must connect through the EXTERNAL listener, and in fact we have no other option as it is the only one accessible from outside the Docker Machine. Our current setup is illustrated below:

Internal and external listeners configured. Names abbreviated.

And then, finally a success:

$ kafka-console-producer.bat --broker-list dockervm:9092 \
--topic test
> Hello
$ kafka-console-consumer.bat --bootstrap-server dockervm:9092 \
--topic test \
--from-beginning
Hello

A fewLEADER_NOT_AVAILABLE errors may occur at first because the partition leader election takes a short while.

Note: you “can” change your hosts file to map kafka to the Docker Machine and avoid having to set advertised.listeners. But that doesn’t mean you should! This is a terrible non-portable hack.

Multiple brokers

Note: there are several layers involved: host, Docker Machine and containers. To avoid confusion let’s define some names. In this section:

  • Container port is a port exposed by the container and accessible from the Docker Machine.
  • Machine port is a port exposed by the Docker Machine and accessible from the host.
  • Port mapping is essentially forwarding a machine port to a container port.

Kafka is more fun with multiple brokers. Well, we are using Docker Compose, so why not just scale up it with:

$ docker-compose up --scale kafka=3

You will immediately notice that Docker Compose will complain about port 9092 being already allocated. Of course, you cannot map three container ports to the same machine port. The solution:

# showing only modified properties
services:
kafka:
ports:
- "9092-9094:9092"

This allows Docker Compose to map the container port 9092 to any machine port from 9092 to 9094, as long as it’s available. Try again and:

$ # showing only relevant columns
$ docker ps
CONTAINER ID PORTS NAMES
2c2a186b6ea3 0.0.0.0:9092->9092/tcp kafka_1
acc192a36f53 0.0.0.0:9093->9092/tcp kafka_2
33aac9307d75 0.0.0.0:9094->9092/tcp kafka_3

Unfortunately, all but one of the brokers will quickly die. All listeners are configured to bind to a kafka hostname, but now that name does not represent a single container. Docker Compose DNS behavior in this case is somewhat undefined, causing some brokers to fail to start their listeners:

ERROR [KafkaServer id=1003] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to kafka:9090: Address not available.

Also, the cluster controller is unable to verify their availability, so leader election will also fail like we’ve seen before.

Also again, the advertised listeners ports are all but one wrong too. This is our current setup illustrated:

Multiple brokers and their advertised listeners with incorrect setup. The star marks the partition leader, the square marks the controller. Red means error or incorrect configuration.

Suppose the INTERNAL listeners were correct and that broker 3 was elected leader of a certain partition. When a client connects to the cluster, it will be told that the leader is at dockervm:9092 as broker 3 is advertising that listener.

However, the machine port 9092 is mapped to broker 1, so the client will connect to it instead of broker 3, causing a NOT_LEADER_FOR_PARTITION error. It would only work if the leader was broker 1.

In summary, this is the setup we need:

  • The INTERNAL listeners’ hostnames should be resolvable by the Docker Compose DNS, such as their container ID;
  • The EXTERNAL advertised listeners’ ports should match their container’s mapped machine port.
Multiple brokers and their advertised listeners with correct setup. The star marks the partition leader, the square marks the controller. Container IDs changed for readability.

This is why I chose wurstmeister’s image: it provides two variables that lets you define and inject commands in listeners: HOSTNAME_COMMAND and PORT_COMMAND. I’m not sure the others have such feature. Below, some modifications to make our docker-compose.yml file work:

# showing only modified properties
services:
kafka:
image: wurstmeister/kafka
environment:
HOSTNAME_COMMAND:
"echo $$HOSTNAME"
PORT_COMMAND:
"docker port $$(HOSTNAME) 9092/tcp | cut -d: -f2"
KAFKA_LISTENERS:
"INTERNAL://_{HOSTNAME_COMMAND}:9090,\
EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS:
"INTERNAL://_{HOSTNAME_COMMAND}:9090,\
EXTERNAL://dockervm:_{PORT_COMMAND}"
volumes:
- /var/run/docker.sock:/var/run/docker.sock

Notice how the listeners now have the placeholders _{HOSTNAME_COMMAND} and _{PORT_COMMAND}. The container start script will replace those by the result of their corresponding commands, allowing us to define listeners within the container’s environment.

The hostname is pretty easy. By default, the container hostname is the container ID, which the Docker Compose DNS is able to resolve. The $HOSTNAME environment variable is just what we need.

The port is trickier, though, as only Docker itself truly knows the container mapped machine port. As nicely explained here, I defined a volume that allows us to access the Docker daemon from inside a container. This way, we can use docker port <container> to retrieve the container port mapping and, with some fiddling, determine the machine port:

$ docker port kafka_3
9092/tcp -> 0.0.0.0:9094
$ # cut at : and get slice 2
$ docker port kafka_3 | cut -d: -f2
9094

Now I can create a topic with replication factor 3 and successfully connect to it. Notice how I even connect the consumer through another broker:

$ kafka-topics.bat --zookeeper dockervm:2181 --create --topic test \
--partitions 3 \
--replication-factor 3
$ kafka-console-producer.bat --broker-list dockervm:9092 \
--topic test
> Hello
$ kafka-console-consumer.bat --bootstrap-server dockervm:9094 \
--topic test \
--from-beginning
Hello

Exploring the cluster

When I finally got this working, everything was still a bit foggy in my head. Let’s take some time and see with our own eyes where we ended up.

First thing is to verify the topic configuration. As expected, there are 3 partitions with 3 replicas each. Also, each replica of a given partition is in a different broker. The leaders are also all in different brokers, but I believe that was a coincidence.

$ kafka-topics.bat --zookeeper dockervm:2181 \
--describe --topic test
Topic: test PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1001,1002,1003
Topic: test Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1003,1001
Topic: test Partition: 2 Leader: 1003 Replicas: 1003,1001,1002 Isr: 1003,1001,1002

Then, after producing a few messages on the topic, I started a consumer and verified its group state. Also as expected, the messages were distributed round-robin among all partitions and the consumer was able to receive all messages (remember that order is not guaranteed across partitions).

$ kafka-console-consumer.bat --bootstrap-server dockervm:9094 \
--topic test --from-beginning \
--group test-consumer
MSG 3
MSG 6
MSG 9
MSG 2
MSG 5
MSG 8
MSG 1
MSG 4
MSG 7
MSG 10
$ kafka-consumer-groups.bat --bootstrap-server dockervm:9092 \
--describe --group test-consumer
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test 1 4 4 0
test 0 3 3 0
test 2 3 3 0

As I promised, I used kafkacat to analyze cluster metadata provided by one of the brokers. Confluent has a Docker image with it.

It’s important to run this container in the same network as Kafka to use a listener, not advertised listener. Docker Compose usually creates a network (in my case it was kafka_default), which also allows me to use a container name as broker address.

It’s especially interesting to see how the returned endpoints change according to the listener used to connect. This is using the INTERNAL listener:

$ # INTERNAL listener
$ docker run --network kafka_default confluentinc/cp-kafkacat \
kafkacat -b kafka_1:9090 -L
Metadata for all topics (from broker -1: kafka_1:9090/bootstrap):
3 brokers:
broker 1001 at 2c2a186b6ea3:9090
broker 1003 at 33aac9307d75:9090
broker 1002 at acc192a36f53:9090
2 topics:
topic "test" with 3 partitions:
partition 0, leader 1001, replicas: 1001,1002,1003, isrs: 1001,1002,1003
partition 1, leader 1002, replicas: 1002,1003,1001, isrs: 1002,1003,1001
partition 2, leader 1003, replicas: 1003,1001,1002, isrs: 1003,1001,1002

Then using the EXTERNAL listener:

$ # EXTERNAL listener
$ docker run --network kafka_default confluentinc/cp-kafkacat \
kafkacat -b kafka_1:9092 -L
Metadata for all topics (from broker -1: kafka_1:9092/bootstrap):
3 brokers:
broker 1001 at dockervm:9092
broker 1003 at dockervm:9094
broker 1002 at dockervm:9093
2 topics:
topic "test" with 3 partitions:
partition 0, leader 1001, replicas: 1001,1002,1003, isrs: 1001,1002,1003
partition 1, leader 1002, replicas: 1002,1003,1001, isrs: 1002,1003,1001
partition 2, leader 1003, replicas: 1003,1001,1002, isrs: 1003,1001,1002

In the case above, when kafkacat connected through port 9092 it used the EXTERNAL listener, so Kafka returned all advertised EXTERNAL listeners.

In the multi-broker setup, we guaranteed the listener names matched across containers because we used Docker Compose scale mechanism, so the names were centralized in the docker-compose.yml file. If you configure each container individually, pay attention to the listener names.

Out of curiosity, I verified. Modified the docker-compose.yml file to explicitly start two Kafka containers without --scale. Notice the different listener names:

version: "2.1"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka_1:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS:
"INTERNAL://kafka_1:9090,\
A://:9092"
KAFKA_ADVERTISED_LISTENERS:
"INTERNAL://kafka_1:9090,\
A://dockervm:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
"INTERNAL:PLAINTEXT,\
A:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
kafka_2:
image: wurstmeister/kafka
ports:
- "9093:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS:
"INTERNAL://kafka_2:9090,\
B://:9092"
KAFKA_ADVERTISED_LISTENERS:
"INTERNAL://kafka_2:9090,\
B://dockervm:9093"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
"INTERNAL:PLAINTEXT,\
B:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

Then connecting kafkacat through listener A, listener B is not returned:

$ # A listener
$ docker run --network kafka_default confluentinc/cp-kafkacat \
kafkacat -b kafka_1_1:9092 -L
Metadata for all topics (from broker -1: kafka_1_1:9092/bootstrap):
1 brokers:
broker 1001 at dockervm:9092

Beyond the Docker Machine

Even though my use case was specifically for the Docker Machine, this article is applicable to any scenario in which Kafka and clients run in different hosts. Instead of a Docker Machine, it could be AWS or other remote host.

This post covers with more details how to work with AWS.

--

--