Starting a Kafka Cluster as a Single Docker Service — A Fallacy?

Prateek
6 min read · Apr 19, 2018


In the previous story, we created a Kafka cluster with 3 brokers. In that example, we set up each broker as a separate Docker service, each pinned to a specific node and exposing a different port.

The next question is: instead of defining each Kafka broker as a separate Docker service, can we set up the Kafka broker as a single service replicated across all swarm nodes?

Here is the docker-compose.yml for that setup:

version: '3'
services:
  zookeeper:
    image: kafka:latest
    volumes:
      - zoo-service-data:/tmp/zookeeper
    ports:
      - 2181:2181
    networks:
      - kafka-net
    deploy:
      mode: global
      placement:
        constraints:
          - node.labels.zoo==1
    command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties

  kafka:
    image: kafka:latest
    volumes:
      - kafka-service-logs:/tmp/kafka-logs
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - 9093:9093
    networks:
      - kafka-net
    deploy:
      mode: global
    command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties --override zookeeper.connect=zookeeper:2181 --override listeners=INT://:9092,EXT://0.0.0.0:9093 --override listener.security.protocol.map=INT:PLAINTEXT,EXT:PLAINTEXT --override inter.broker.listener.name=INT --override advertised.listeners=INT://:9092,EXT://node3:9093 --override broker.id=-1

networks:
  kafka-net:

volumes:
  kafka-service-logs:
  zoo-service-data:

Let’s go through the details:

1. The Zookeeper service definition is the same as before:
# Name of the service
zookeeper:
  # Image name
  image: kafka:latest

  # Map the volume zoo-service-data to /tmp/zookeeper.
  # /tmp/zookeeper is the data directory defined in zookeeper.properties.
  volumes:
    - zoo-service-data:/tmp/zookeeper

  # Expose port 2181. This is not strictly required.
  ports:
    - 2181:2181

  # Add to the same overlay network
  networks:
    - kafka-net

  deploy:
    # global mode
    mode: global

    # deploy to node3, which is labeled zoo=1
    placement:
      constraints:
        - node.labels.zoo==1

  # Run Zookeeper
  command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties
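As a quick sanity check (assuming the custom kafka:latest image keeps the Kafka scripts under /kafka/bin, as the command above suggests), we can query Zookeeper directly from its own container on node3:

# List the brokers that have registered themselves (empty until the Kafka service is up)
docker exec -it <zookeeper-container-id> \
  /kafka/bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids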

2. Let’s look at the more interesting definition: the Kafka service

# The service name
kafka:
  # Image
  image: kafka:latest

  # Map a volume to /tmp/kafka-logs.
  # /tmp/kafka-logs is the log directory defined in server.properties.
  volumes:
    - kafka-service-logs:/tmp/kafka-logs

  # Expose the service port
  ports:
    - 9093:9093

  # Add to the same overlay network
  networks:
    - kafka-net

  deploy:
    # Global mode: one instance is deployed to every swarm node,
    # i.e. an instance runs on node3, node4 and node5.
    mode: global

  # Run Kafka with override properties
  command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
    # Point zookeeper.connect to the zookeeper service
    --override zookeeper.connect=zookeeper:2181
    # Define two listeners:
    #   INT://:9092        => listens on the docker container hostname, port 9092
    #   EXT://0.0.0.0:9093 => listens on all IPs, port 9093
    --override listeners=INT://:9092,EXT://0.0.0.0:9093
    # Both listeners use plaintext communication
    --override listener.security.protocol.map=INT:PLAINTEXT,EXT:PLAINTEXT
    # Inter-broker communication happens on the INT listener
    --override inter.broker.listener.name=INT
    # Advertise the listeners:
    #   INT://:9092      => the internal listener is advertised on the container
    #                       hostname and port. This works because all brokers are
    #                       in the same overlay network.
    #   EXT://node3:9093 => the external listener is advertised on a swarm node.
    #                       There is no good way to know on which node a given
    #                       instance is running; in swarm mode the published port
    #                       is load-balanced across all swarm nodes.
    --override advertised.listeners=INT://:9092,EXT://node3:9093
    # Set broker.id to -1 so that each broker gets an auto-assigned broker id
    --override broker.id=-1
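One detail worth calling out: with this configuration, every broker registers exactly the same external endpoint. A hedged way to see this (same assumptions as above about script paths; broker id 1001 is just an example of an auto-assigned id) is to look at a broker's registration in Zookeeper once the stack is running:

# Inspect a broker registration (auto-assigned ids typically start at 1001)
docker exec -it <zookeeper-container-id> \
  /kafka/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/1001
# The JSON should contain something like:
#   "endpoints":["INT://<container-hostname>:9092","EXT://node3:9093"]
# i.e. all three brokers advertise the identical external address node3:9093.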

Let’s walk through what happens when we run the Docker stack:

  • The Kafka service gets deployed to all swarm cluster nodes (node3, node4 and node5) because of mode: global
  • Each swarm node creates a volume, which is mapped into the Kafka service instance running on that node
  • The Kafka service port 9093 is exposed on the swarm cluster
  • A request to any swarm node on port 9093 gets routed to any one of the service instances running on node3, node4 or node5
  • Inter-broker communication between the Kafka service instances (a.k.a. brokers) happens on the INT listener, which uses the docker container hostname. That works because all instances are in the same overlay network.
  • Each broker gets a unique id, which is saved in meta.properties in that broker’s log directory (the mapped volume). If the broker is restarted, it reuses the broker id saved in that file, so the id stays the same across restarts (the check right after this list shows one way to verify this).
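A quick way to confirm the broker-id behaviour (hypothetical commands; they assume the stack is deployed as kfk, as shown just below, and that the container keeps its data in /tmp/kafka-logs as configured above):

# On any swarm node running a Kafka task, read the persisted broker id from the volume
docker exec -it $(docker ps -q -f name=kfk_kafka) cat /tmp/kafka-logs/meta.properties
# => version=0
#    broker.id=1001   (a different id on each node, stable across restarts)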

Let’s test it out by deploying it with Docker Stack.

docker stack deploy -c docker-compose.yml kfk

The stack successfully starts the Zookeeper and Kafka services.
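To verify the deployment (service names assume the stack was deployed as kfk, as above):

node1 > docker stack services kfk        # should list kfk_zookeeper and kfk_kafka
node1 > docker service ps kfk_kafka      # one task per swarm node, because of global mode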

Let’s run a few experiments to see whether this setup actually works.

Experiment 1: Producing messages to a topic (test_1_1) with one partition and one replica

node1 > bin/kafka-topics.sh --create --zookeeper node3:2181 --topic test_1_1 --partitions 1 --replication-factor 1

The topic gets created!

node1 > bin/kafka-console-producer.sh --broker-list node3:9093 --topic test_1_1

If it works, try restarting the producer. We will find that the producer works only intermittently. We will see these warnings and then an error:

WARN [Producer clientId=console-producer] Received unknown topic or partition error in produce request on partition test-1-1-0. The topic/partition may not exist or the user may not have Describe access to it (org.apache.kafka.clients.producer.internals.Sender)
ERROR Error when sending message to topic test-1-1 with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic partition
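To confirm that the partition really lives on only one broker, we can describe the topic (broker ids in the output are illustrative):

node1 > bin/kafka-topics.sh --describe --zookeeper node3:2181 --topic test_1_1
# Topic: test_1_1  Partition: 0  Leader: 1001  Replicas: 1001  Isr: 1001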

Experiment 2: Producing messages to a topic (test_1_3) with one partition and three replicas

node1 > bin/kafka-topics.sh --create --zookeeper node3:2181 --topic test_1_3 --partitions 1 --replication-factor 3

The topic gets created!

node1 > bin/kafka-console-producer.sh --broker-list node3:9093 --topic test_1_3

If it works, try restarting the producer. We will find that the producer works only intermittently. We will see these warnings and then an error:

WARN [Producer clientId=console-producer] Got error produce response with correlation id X on topic partition test-1-3-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
ERROR Error when sending message to topic test-1-3 with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition
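Describing this topic (again, broker ids are illustrative) shows the difference from the first experiment: every broker now holds a replica, but only one of them is the leader:

node1 > bin/kafka-topics.sh --describe --zookeeper node3:2181 --topic test_1_3
# Topic: test_1_3  Partition: 0  Leader: 1002  Replicas: 1002,1001,1003  Isr: 1002,1001,1003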

Experiment 3: Producing messages to a topic (test_3_1) with three partitions and one replica

node1 > bin/kafka-topics.sh --create --zookeeper node3:2181 --topic test_3_1 --partitions 3 --replication-factor 1

The topic gets created!

node1 > bin/kafka-console-producer.sh --broker-list node3:9093 --topic test_3_1

If it works, try restarting the producer. We will find that the producer works only intermittently. We will see these warnings and then an error:

WARN [Producer clientId=console-producer] Received unknown topic or partition error in produce request on partition test-3-1-0. The topic/partition may not exist or the user may not have Describe access to it (org.apache.kafka.clients.producer.internals.Sender)
ERROR Error when sending message to topic test-3-1 with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic partition

Why is the producer not working most of the time, when the Kafka brokers seem to be running and inter-broker communication is working? After reading through the Kafka documentation, here is my understanding of why Kafka as a single replicated service does not work.

As per the documentation, here is the explanation of the two relevant errors:
UnknownTopicOrPartition: This request is for a topic or partition that does not exist on this broker.
NotLeaderForPartition: This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the client’s metadata is out of date.

Consider the test_1_1 scenario:

  • The producer connects to the broker (node3:9093) specified in the broker-list.
  • The swarm cluster, which acts as a load balancer, forwards the request to one of the Kafka service instances.
  • That broker returns metadata to the client telling it which broker hosts the topic partition and which broker is the leader.
  • Since all brokers advertise node3:9093 as the address of their EXT listener, the client sends its produce request to node3:9093.
  • Swarm load-balances that request. It may or may not land on the broker that actually hosts the partition (for test_1_1, only one broker has the partition, since the replication factor is 1).

Hence we see UnknownTopicOrPartition warnings, which finally lead to an error.
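A rough way to see how often this bites (assuming each run opens a fresh connection, so the swarm routing mesh may pick a different backend every time): with three instances and the partition on only one of them, roughly two out of three runs should hit a broker that does not host test_1_1 and fail:

node1 > for i in 1 2 3 4 5 6; do echo "message $i" | bin/kafka-console-producer.sh --broker-list node3:9093 --topic test_1_1; done
# Some runs succeed, the rest end with UnknownTopicOrPartitionException.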

Consider the test_1_3 scenario:

  • The producer connects to the broker (node3:9093) specified in the broker-list.
  • The swarm cluster, which acts as a load balancer, forwards the request to one of the Kafka service instances.
  • That broker returns metadata to the client telling it which broker hosts the topic partition and which broker is the leader.
  • Since all brokers advertise node3:9093 as the address of their EXT listener, the client sends its produce request to node3:9093.
  • Swarm load-balances that request. The request will reach a broker that has a replica of the partition, but it may not be the leader.

Hence we see NotLeaderForPartition warnings, which finally lead to an error.

Here are some of the details from the documentation which suggests the same:

These requests to publish or fetch data must be sent to the broker that is currently acting as the leader for a given partition. This condition is enforced by the broker, so a request for a particular partition to the wrong broker will result in the NotLeaderForPartition error code.

It seems that because of Docker Stack load balancing, we cannot guarantee that a request will be sent to the leader of the partition. For now, it looks like we cannot run the Kafka brokers as one replicated service :(
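One hedged way to double-check that the routing mesh is the culprit: run the producer from inside one of the broker containers against the internal listener, bypassing the mesh entirely. This assumes the stack is named kfk, the scripts live under /kafka/bin, and that container hostnames resolve on the overlay network, which the inter-broker INT listener already relies on. Under those assumptions, this path should work reliably:

# Produce from inside a broker container, using the INT listener instead of the
# published port (no swarm load balancing involved)
docker exec -it $(docker ps -q -f name=kfk_kafka) \
  /kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_1_3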

The previous post, where each broker was set up as a separate service, seems to be the way we have to proceed.
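For reference, here is a minimal sketch of what one of those per-broker services could look like, using docker service create rather than a compose file (assumptions: an attachable overlay network kafka-net exists, node labels such as kafka1=1 are set, and the same image and script paths are used):

# One service per broker, pinned to one node, advertising that node's own address
docker service create --name kafka1 --network kafka-net \
  --constraint node.labels.kafka1==1 --publish 9093:9093 kafka:latest \
  /kafka/bin/kafka-server-start.sh /kafka/config/server.properties \
  --override zookeeper.connect=zookeeper:2181 \
  --override listeners=PLAINTEXT://0.0.0.0:9093 \
  --override advertised.listeners=PLAINTEXT://node3:9093 \
  --override broker.id=1
# kafka2 and kafka3 would be analogous, each with its own label, published port and node address.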
