Set up Kafka with Debezium using Strimzi in Kubernetes

Kubernetes automates the distribution and scheduling of application containers across a cluster in an efficient way. Strimzi provides an easy way to run Apache Kafka on Kubernetes or OpenShift, and Debezium provides Kafka connectors for change data capture.

This blog demonstrates the steps to deploy Debezium on Kubernetes using Strimzi. We will be using the unwrap-smt example from Debezium (https://github.com/debezium/debezium-examples/tree/master/unwrap-smt). The scenario: receive change events from a MySQL database (source) using the Debezium connector and save them in Elasticsearch (sink), using Strimzi in a Kubernetes development or test environment.

Flow diagram: MySQL (source) → Debezium MySQL connector → Kafka on Strimzi → Elasticsearch sink connector → Elasticsearch

Prerequisites:

  • A running Kubernetes cluster (for example, Minikube) with kubectl configured
  • Docker installed locally and a Docker Hub account
  • An Elasticsearch endpoint reachable from the cluster

Note: Replace the placeholder values (such as yourusername) with your corresponding values.

Step 1: Strimzi artifacts

Download the artifacts from the Strimzi repository (https://github.com/strimzi/strimzi-kafka-operator/releases) and extract the archive, strimzi-0.11.4.zip.

Step 2: Deploy the Strimzi Cluster Operator

1) Update the RoleBinding files with your namespace (I used default):

sed -i 's/namespace: .*/namespace: default/' install/cluster-operator/*RoleBinding*.yaml

2) Apply the operator:

kubectl apply -f install/cluster-operator
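
Verify that the Cluster Operator pod is running before moving on (the pod name is prefixed with the deployment name strimzi-cluster-operator from the install files):

kubectl get pods | grep strimzi-cluster-operator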

Step 3: Deploy the Kafka cluster

This deploys Kafka and ZooKeeper with the necessary configuration. We will deploy an ephemeral (non-persistent) cluster, which is suitable for development and testing, not for production.

kubectl apply -f examples/kafka/kafka-ephemeral.yaml
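
For reference, the cluster definition in that file looks roughly like this (an excerpt based on the Strimzi 0.11.x examples; check the file in your release for the full content):

apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    storage:
      type: ephemeral    # data is lost when the pod restarts
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral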

Check the pods and services using kubectl:
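
kubectl get pods
kubectl get svc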

Step 4: Test the Kafka setup

Producer: Use the console producer utility in the Kafka pod to send messages to the bootstrap service (here it is my-cluster-kafka-bootstrap). Run the command below and enter some messages:

kubectl exec -i my-cluster-kafka-0 -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic strimzi-my-topic

Consumer: Use the console consumer in the Kafka pod to receive messages. In another terminal, execute the command below and watch the messages arrive:

kubectl exec -i my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic strimzi-my-topic --from-beginning

To find the topics in the Kafka cluster, use the command below. (ZooKeeper is reachable at localhost:2181 from inside the pod; the my-cluster-zookeeper-client service is not accessible from outside the cluster.)

kubectl exec my-cluster-kafka-0 -- bin/kafka-topics.sh --list --zookeeper localhost:2181

Step 5: Deploy Kafka Connect with custom connectors

5.1) Add the required connectors to the Strimzi Kafka Connect image. In our case, add the Debezium MySQL connector and the Confluent Elasticsearch connector.

  • Create a Dockerfile with the content below and save it.
FROM strimzi/kafka-connect:0.11.4-kafka-2.1.0
USER root:root                           # switch to root so the plugins can be copied
COPY ./my-plugins/ /opt/kafka/plugins/   # add the connector plugins to the image
USER 1001                                # drop back to the non-root user

Place the connector plugins in a my-plugins directory next to the Dockerfile.

Folder structure
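
A typical layout might look like this (the plugin directory names below are illustrative; they depend on the connector archives you downloaded):

my-plugins/
├── debezium-connector-mysql/
│   └── (connector jars)
└── kafka-connect-elasticsearch/
    └── (connector jars)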

5.2) Build the Docker image

docker build -t deb_connectors .
docker images

Inspect the plugin directory of the image and check that the connector plugins were added:

docker run -it deb_connectors sh
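
Once inside the container, list the plugin directory (the path used in the Dockerfile above):

ls /opt/kafka/plugins/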

5.3) Tag and push to Docker Hub

Create an account on Docker Hub if you don't have one. Then execute the commands below on the command line to log in and push the new image.

docker login
docker images

Tag and push the image. The general form is docker tag image yourusername/repository:tag; replace f38eef0fb770 below with your image ID from the docker images output.

docker tag f38eef0fb770 yourusername/deb_connectors:part1
docker push yourusername/deb_connectors:part1

Refer to the Docker documentation for more details on building and pushing images.

5.4) Modify and deploy KafkaConnect in the cluster

Take the YAML file from the Strimzi examples folder (examples/kafka-connect/kafka-connect.yaml) and add your new image.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  # ...
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  image: yourusername/deb_connectors:part1
  tls:
    trustedCertificates:
      # ...

and apply it:

kubectl apply -f examples/kafka-connect/kafka-connect.yaml

Check the Pods and Services

kubectl get svc
kubectl get pods
kubectl describe pod connectorPodName (verify that the Connect pod uses your new image)

Execute the command below to list the available connector plugins:

kubectl exec -i my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connector-plugins

Step 6: Create a MySQL test DB and expose it as a service

kubectl create -f https://raw.githubusercontent.com/sincysebastian1/Kubernetes/master/DebeziumConnectors/mysql.yaml
kubectl expose deployment mysql --port=3306 --target-port=3306
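
Check that the MySQL pod and service are up:

kubectl get pods
kubectl get svc mysql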

Step 7: Start the connectors

Take the file source.json from the Debezium repository

https://raw.githubusercontent.com/debezium/debezium-examples/master/unwrap-smt/source.json

and modify the value of the field database.history.kafka.bootstrap.servers.

In my case it is "database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092".
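
For orientation, the modified source.json then looks roughly like this (a sketch; the connector name and the elided fields come from the Debezium example and may differ in your copy):

{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    ...
    "database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092"
  }
}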

Take es-sink.json from https://github.com/debezium/debezium-examples/blob/master/unwrap-smt/es-sink.json and modify the Elasticsearch endpoint to match yours.
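
The field to change is the connection URL; roughly (a sketch; the connector name and elided fields follow the example file):

{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    ...
    "connection.url": "http://your_elasticsearch_endpoint:9200"
  }
}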

Now execute the commands below:

cat source.json | kubectl exec -i my-cluster-kafka-0 -- curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-
cat es-sink.json | kubectl exec -i my-cluster-kafka-0 -- curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-
  • View the connectors
kubectl exec -i my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connectors
  • View all the topics
kubectl exec my-cluster-kafka-0 -- bin/kafka-topics.sh --list --zookeeper localhost:2181
  • View the messages from the MySQL connector in the Kafka topic "customers"
kubectl exec -i my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic customers --from-beginning
  • View the records in Elasticsearch
curl 'http://your_elasticsearch_endpoint:9200/customers/_search?pretty'

Step 8: Modify MySQL records and validate in Elasticsearch

  • Log in to the MySQL terminal (use your MySQL pod name from kubectl get pods) and run
kubectl exec -it mysql-5fc9f7d58-hvf9g  -- bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
insert into customers values(default, 'hello', 'strimzi', 'hello@example.com');
  • And see that the changes are reflected in Elasticsearch
curl 'http://your_elasticsearch_endpoint:9200/customers/_search?pretty'
