Pulsar on Kubernetes: Enabling Kafka-on-Pulsar (KoP) with Pulsar Operators

Sherlock Xu
Apr 25, 2023

In my previous blog posts, I demonstrated how to use Pulsar Operators to quickly spin up a Pulsar cluster on Kubernetes and leverage tiered storage to offload data to AWS S3 storage. Pulsar Operators provide a controllable and lightweight approach for installing and managing Pulsar instances on Kubernetes. This allows users to enjoy a consistent experience with native Pulsar features in a cloud-native environment, while benefiting from the enhanced scalability, observability, and security offered by Kubernetes.

In this post, I will demonstrate how to enable Kafka-on-Pulsar (KoP) on Kubernetes, another powerful and frequently used capability of Pulsar. I will also show how Pulsar Operators simplify the experience of using KoP.

A short intro to KoP

KoP is built on top of Pulsar’s protocol handler mechanism and adds a Kafka protocol handler to Pulsar brokers. In simpler terms, by adding the KoP protocol handler to an existing Pulsar cluster, you can migrate your Kafka applications to Pulsar without modifying their code.

I don’t think KoP is designed to persuade Kafka users to abandon Kafka. In fact, if you are using Kafka, you can keep your existing Kafka applications when you adopt KoP. This protocol handler allows you to use Kafka applications together with a variety of Pulsar’s powerful features, such as enterprise-grade multi-tenancy, infinite message retention with BookKeeper, tiered storage, and Pulsar functions. Both Kafka and Pulsar clients can work together to produce and consume messages. For more information about KoP, check out the KoP GitHub repository and this DZone blog post Understanding Kafka-on-Pulsar (KoP): Yesterday, Today, and Tomorrow.

Before you begin

Create a Kubernetes cluster (v1.16 <= Kubernetes version < v1.26) with kubectl installed. To provide persistent storage for BookKeeper and ZooKeeper, you must configure a default storage class. The following is my Amazon EKS environment for your reference.

kubectl get nodes -o wide

NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
ip-192-168-12-243.ap-southeast-1.compute.internal Ready <none> 119s v1.22.17-eks-a59e1f0 192.168.12.243 xx.xxx.xxx.xx Amazon Linux 2 5.4.238-148.347.amzn2.x86_64 docker://20.10.17
ip-192-168-32-103.ap-southeast-1.compute.internal Ready <none> 117s v1.22.17-eks-a59e1f0 192.168.32.103 xx.xxx.xxx.xxx Amazon Linux 2 5.4.238-148.347.amzn2.x86_64 docker://20.10.17
ip-192-168-84-233.ap-southeast-1.compute.internal Ready <none> 2m7s v1.22.17-eks-a59e1f0 192.168.84.233 xx.xxx.xxx.x Amazon Linux 2 5.4.238-148.347.amzn2.x86_64 docker://20.10.17

kubectl get sc

NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE
gp2 (default) kubernetes.io/aws-ebs Delete WaitForFirstConsumer false 15m
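
The prerequisites above can also be checked with a small script. The following is a minimal sketch, not part of kubectl or Pulsar Operators: k8s_version_supported is a hypothetical helper name, and the commented usage assumes that the kubelet version reported by each node is a reasonable proxy for the cluster version.

```shell
#!/bin/sh
# Hypothetical helper (an assumption for illustration): succeeds when a
# Kubernetes version string such as "v1.22.17-eks-a59e1f0" satisfies
# v1.16 <= version < v1.26.
k8s_version_supported() {
  version="${1#v}"            # drop the leading "v"
  major="${version%%.*}"      # text before the first dot
  rest="${version#*.}"        # text after the first dot
  minor="${rest%%[!0-9]*}"    # leading digits of the minor version
  [ "$major" = "1" ] && [ -n "$minor" ] && [ "$minor" -ge 16 ] && [ "$minor" -lt 26 ]
}

# Example usage against a live cluster (requires kubectl access):
#   for v in $(kubectl get nodes -o jsonpath='{.items[*].status.nodeInfo.kubeletVersion}'); do
#     k8s_version_supported "$v" || echo "unsupported kubelet version: $v"
#   done
#   kubectl get sc | grep -q '(default)' || echo "no default storage class found"
```

The function only parses a version string, so it can be tried without a cluster; the kubectl calls are left as comments because they depend on your environment.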

Installing Pulsar and KoP with Pulsar Operators

Follow the steps below to complete the installation. Note that some steps in this section are similar to those in my previous blog post, so I will not explain the details here.

1. Install Operator Lifecycle Manager (OLM).

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.24.0/install.sh | bash -s v0.24.0

2. Install Pulsar Operators and the CRDs of Pulsar components.

kubectl create -f https://raw.githubusercontent.com/streamnative/charts/master/examples/pulsar-operators/olm-subscription.yaml

3. Create a Kubernetes namespace called pulsar.

kubectl create ns pulsar

4. Create a Pulsar cluster with KoP enabled by running the following command. The YAML file in the command defines the ZooKeeper, broker, and BookKeeper custom resources without Pulsar proxies. In the broker manifest, spec.config.protocolHandlers.kop.enabled is set to true to enable KoP.

kubectl create -f https://raw.githubusercontent.com/streamnative/charts/master/examples/pulsar-operators/kop.yaml

Conventionally, using KoP involves setting a bunch of configurations, like messagingProtocols, allowAutoTopicCreationType, kafkaListeners and entryFormat in broker.conf or standalone.conf. In a containerized environment, you may use PULSAR_PREFIX_ to add these configurations to the broker CR’s manifest. With Pulsar Operators, all these configurations are already wrapped and you can enable KoP directly for the broker without configuring anything.
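
To make the conventional approach more concrete, here is a minimal sketch of how broker.conf-style settings map to PULSAR_PREFIX_ environment variables. The helper name to_pulsar_prefix_env is hypothetical, and the values in the sample are illustrative assumptions rather than the settings that Pulsar Operators apply for you.

```shell
#!/bin/sh
# Hypothetical helper (an assumption for illustration): read broker.conf-style
# "key=value" lines on stdin and print them as PULSAR_PREFIX_ assignments.
to_pulsar_prefix_env() {
  while IFS= read -r line || [ -n "$line" ]; do
    case "$line" in
      ''|'#'*) continue ;;    # skip blank lines and comments
    esac
    printf 'PULSAR_PREFIX_%s\n' "$line"
  done
}

# Illustrative KoP settings; the values are assumptions, not taken from kop.yaml.
to_pulsar_prefix_env <<'EOF'
# Settings named in the paragraph above
messagingProtocols=kafka
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://0.0.0.0:9092
entryFormat=pulsar
EOF
```

With Pulsar Operators, none of this is needed for KoP, which is the point of the comparison.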

5. Check the status of Pods.

kubectl get pods -n pulsar

NAME READY STATUS RESTARTS AGE
bookies-bk-0 1/1 Running 0 57s
bookies-bk-1 1/1 Running 0 57s
bookies-bk-2 1/1 Running 0 57s
bookies-bk-auto-recovery-0 1/1 Running 0 11s
brokers-broker-0 1/1 Running 0 61s
brokers-broker-1 1/1 Running 0 60s
zookeepers-zk-0 1/1 Running 0 2m59s
zookeepers-zk-1 1/1 Running 0 2m59s
zookeepers-zk-2 1/1 Running 0 2m59s
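
If you would rather not scan the Pod list by eye, a small filter can flag the Pods that still need attention. This is a minimal sketch; pods_not_ready is a hypothetical helper, and it assumes the default column layout of kubectl get pods (NAME, READY, STATUS, ...).

```shell
#!/bin/sh
# Hypothetical helper (an assumption for illustration): read
# `kubectl get pods --no-headers` output on stdin and print the names of Pods
# that are not fully ready or not in the Running state.
pods_not_ready() {
  awk '{
    split($2, ready, "/")    # READY column, for example "1/1"
    if (ready[1] != ready[2] || $3 != "Running") print $1
  }'
}

# Example usage (requires kubectl access):
#   kubectl get pods -n pulsar --no-headers | pods_not_ready
# Alternatively, block until every Pod reports Ready:
#   kubectl wait --for=condition=Ready pods --all -n pulsar --timeout=300s
```

An empty result means every listed Pod is Running and fully ready. Note that this simple check would also flag Pods in the Completed state, if your namespace contains any.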

6. Check the status of Services.

kubectl get svc -n pulsar

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
bookies-bk ClusterIP 10.100.128.28 <none> 3181/TCP,8000/TCP 4m20s
bookies-bk-auto-recovery-headless ClusterIP None <none> 3181/TCP,8000/TCP 4m20s
bookies-bk-headless ClusterIP None <none> 3181/TCP,8000/TCP 4m20s
brokers-broker ClusterIP 10.100.160.21 <none> 6650/TCP,8080/TCP,9092/TCP 6m22s
brokers-broker-headless ClusterIP None <none> 6650/TCP,8080/TCP,9092/TCP 6m22s
zookeepers-zk ClusterIP 10.100.73.123 <none> 2181/TCP,8000/TCP,9990/TCP 6m22s
zookeepers-zk-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP,8000/TCP,9990/TCP 6m22s

In the above output, you can see that the Pulsar broker Pods are exposed internally on three ports via a ClusterIP Service called brokers-broker. Port 9092 can be used to connect to KoP. In the next section, I will use an internal Kafka client to test the connection to KoP. Note that if your Kafka application needs to access KoP from outside the Kubernetes cluster, you may need to set up a gateway to route traffic with tools like Istio. For more information, see Configuring Istio.

Producing and consuming messages with KoP

KoP allows both Kafka clients and Pulsar clients to send messages to and consume messages from the Pulsar cluster that has KoP enabled.

1. Before testing sending and receiving messages, let’s obtain the endpoints that the brokers-broker Service uses to expose the broker Pods. Run the following command.

kubectl describe svc brokers-broker -n pulsar

In the output below, the endpoints 192.168.16.89:9092,192.168.57.39:9092 will be used later for connecting to KoP.

Name: brokers-broker
Namespace: pulsar
Labels: cloud.streamnative.io/app=pulsar
cloud.streamnative.io/cluster=brokers
cloud.streamnative.io/component=broker
Annotations: <none>
Selector: cloud.streamnative.io/app=pulsar,cloud.streamnative.io/cluster=brokers,cloud.streamnative.io/component=broker
Type: ClusterIP
IP Family Policy: SingleStack
IP Families: IPv4
IP: 10.100.160.21
IPs: 10.100.160.21
Port: tcp-pulsar 6650/TCP
TargetPort: 6650/TCP
Endpoints: 192.168.16.89:6650,192.168.57.39:6650
Port: http 8080/TCP
TargetPort: 8080/TCP
Endpoints: 192.168.16.89:8080,192.168.57.39:8080
Port: kop 9092/TCP
TargetPort: 9092/TCP
Endpoints: 192.168.16.89:9092,192.168.57.39:9092 # For internal connection to KoP
Session Affinity: None
Events: <none>
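
Instead of copying the endpoints by hand from the describe output, you could build the bootstrap list with a short script. This is a minimal sketch: join_bootstrap is a hypothetical helper, and the commented usage assumes the Endpoints object has the same name as the Service (brokers-broker), which is the Kubernetes default.

```shell
#!/bin/sh
# Hypothetical helper (an assumption for illustration): read whitespace-separated
# IP addresses on stdin and print a comma-separated host:port list.
join_bootstrap() {
  awk -v port="$1" '{
    for (i = 1; i <= NF; i++) { printf "%s%s:%s", sep, $i, port; sep = "," }
  } END { print "" }'
}

# Example usage (requires kubectl access):
#   BOOTSTRAP=$(kubectl get endpoints brokers-broker -n pulsar \
#     -o jsonpath='{.subsets[*].addresses[*].ip}' | join_bootstrap 9092)
#   echo "$BOOTSTRAP"
```

For the two broker Pods shown above, the helper would produce the same 192.168.16.89:9092,192.168.57.39:9092 string that is used in the following steps.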

2. Note that currently there is no topic in the default namespace public/default.

kubectl -n pulsar exec brokers-broker-0 -- bin/pulsar-admin topics list public/default
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
# No topic here.

3. Create a Kafka client Pod on Kubernetes. It will be deleted once you exit the Pod.

kubectl -n pulsar run kafka-client --rm -it --image bitnami/kafka:3.1.0 -- bash

Expected output:

If you don't see a command prompt, try pressing enter.
I have no name!@kafka-client:/$

4. There are some tools inside the Kafka client container that you can use.

ls /opt/bitnami/kafka/bin

kafka-acls.sh
kafka-cluster.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
...

5. Start a Kafka producer and send some messages. Use the endpoints you obtained above for --broker-list.

/opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list 192.168.16.89:9092,192.168.57.39:9092 --topic kop-test
> Message 1 from Kafka Producer
> Message 2 from Kafka Producer
> Message 3 from Kafka Producer
> Message 4 from Kafka Producer

6. Keep this producer running and open a new terminal to start a Kafka consumer. You should be able to see that the messages have been consumed successfully.

kubectl -n pulsar exec kafka-client -- /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.16.89:9092,192.168.57.39:9092 --topic kop-test --from-beginning
Message 1 from Kafka Producer
Message 2 from Kafka Producer
Message 3 from Kafka Producer
Message 4 from Kafka Producer

You can continue to send more messages. This is what I saw in my terminals.

Figure: Sending a message with the Kafka client

7. Open a third terminal and run the following command to check the topic created.

kubectl -n pulsar exec brokers-broker-0 -- bin/pulsar-admin topics list public/default
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
persistent://public/default/kop-test-partition-0
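
The topic appears with a -partition-0 suffix because it is a partitioned topic with a single partition; the Pulsar consumer log later in this post refers to it as persistent://public/default/kop-test. If you want to list base topic names only, a small filter like the one below may help. It is a minimal sketch, and base_topics is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper (an assumption for illustration): read Pulsar topic names
# on stdin, strip the "-partition-N" suffix, and print each base topic once.
base_topics() {
  sed -E 's/-partition-[0-9]+$//' | sort -u
}

# Example usage (requires kubectl access):
#   kubectl -n pulsar exec brokers-broker-0 -- bin/pulsar-admin topics list public/default | base_topics
```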

8. Now let’s try something interesting. In the third terminal, exec into a broker Pod, create a Pulsar producer, and send a message.

kubectl -n pulsar exec brokers-broker-0 -- ./bin/pulsar-client produce kop-test -n 1 -m "Message 1 from Pulsar Producer"

You should be able to see that the message has been received by the Kafka consumer.

Message 1 from Kafka Producer
Message 2 from Kafka Producer
Message 3 from Kafka Producer
Message 4 from Kafka Producer
Message 5 from Kafka Producer
Message 1 from Pulsar Producer # New message sent by the Pulsar Producer.

You can send more messages to test it. Here is what I saw in my terminals.

Figure: The Kafka consumer received the message sent by the Pulsar producer

9. Keep the three terminals above open. Open a fourth terminal to create a Pulsar consumer.

kubectl -n pulsar exec brokers-broker-0 -- ./bin/pulsar-client consume -s sub-name kop-test -n 0

This is the expected output. The consumer has successfully subscribed to the topic.

2023-04-25T03:56:49,952+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [kop-test] [sub-name] Success subscribe new topic persistent://public/default/kop-test in topics consumer, partitions: 1, allTopicPartitionsNumber: 1

10. Send a message from the Kafka producer. Both the Kafka consumer and the Pulsar consumer can receive the message.

Figure: Top-left: Kafka Producer; Top-right: Kafka Consumer; Bottom-left: Pulsar Producer; Bottom-right: Pulsar Consumer

11. Similarly, send a message from the Pulsar producer. Both the Kafka consumer and the Pulsar consumer can receive the message.

Figure: Top-left: Kafka Producer; Top-right: Kafka Consumer; Bottom-left: Pulsar Producer; Bottom-right: Pulsar Consumer

Conclusion

Pulsar Operators support not only the KoP protocol handler but also AoP (AMQP-on-Pulsar) and MoP (MQTT-on-Pulsar), which can be enabled in a similar way. I will provide more tutorials for using them and other Pulsar features on Kubernetes in future blog posts.
