Configuring a Kafka Connect pipeline on Kubernetes — Step By Step

Sumant Rana
Published in The Startup · 6 min read · Apr 17, 2020

Recently I installed and configured Kafka Connect on GKE (using Helm charts) and created an end-to-end pipeline to transfer data from a MySQL database to a text file using the JDBC Connector and the File Connector.

In this article, I will walk you through the steps and also point to the configuration changes I had to make in order to get it working successfully on Kubernetes. The files are available in the repo https://github.com/sumantrana/kafka-connect-pipeline.

Kafka Connect

Kafka Connect is a framework for moving data between Kafka and other systems, with Kafka acting as the middleman. It comes with a variety of built-in connectors that connect to existing systems and transfer data to and from Kafka. Kafka Connect can be scaled as per requirements and is available in both a standalone and a distributed version.

This article does not intend to explain Kafka Connect, its architecture, and its advantages in detail; it focuses on installing and configuring it on Kubernetes and getting it to work end to end in a standalone environment. To learn about Kafka Connect in depth, please visit the official site.

In this article, we will create an end-to-end pipeline with MySQL as the source of data and a text file as the sink: the JDBC source connector loads rows from MySQL into a Kafka topic, and the file sink connector writes them from the topic into a text file.

Prerequisites:

  • Kubernetes cluster (GKE or equivalent)

I initially started with a 3-node (“n1-standard-1”) cluster, but eventually ran out of CPU, so I had to increase the node pool size to 4.

  • Helm

To be able to install Kafka Connect and MySQL using Helm charts.

  • MySQL client

To be able to connect to the MySQL server and create tables and insert data.

Install Kafka Connect

Clone Kafka Connect helm chart repository

git clone https://github.com/confluentinc/cp-helm-charts.git

Change default values

The default values.yaml is configured to install a 3-node Kafka cluster, which would require a lot more CPU and a bigger or better node pool. Therefore, for the sake of this exercise, we will scale Kafka down to a single-node cluster. The following configuration changes are required:

  • cp-helm-charts/values.yaml (servers is under the cp-zookeeper section, brokers under the cp-kafka section)
servers: 1
brokers: 1
  • cp-helm-charts/charts/cp-kafka/values.yaml
"offsets.topic.replication.factor": "1"
  • cp-helm-charts/charts/cp-kafka-connect/values.yaml
"config.storage.replication.factor": "1"
"offset.storage.replication.factor": "1"
"status.storage.replication.factor": "1"
  • cp-helm-charts/charts/cp-control-center/values.yaml
"replication.factor": "1"

Install Kafka Connect helm chart

helm install <connect-release-name> cp-helm-charts

<connect-release-name> can be any name given to the installation. This will be prefixed to all the Kubernetes artifacts deployed as a part of the installation.

Wait till all the artifacts are deployed. The following is sample output of kubectl get pods when the helm chart was installed with kafkaconnect as the <connect-release-name>:

NAME                                               READY   STATUS
kafkaconnect-cp-control-center-784846dd89-v88zs    1/1     Running
kafkaconnect-cp-kafka-0                            2/2     Running
kafkaconnect-cp-kafka-connect-6bcbd5cbbf-zjz4q     2/2     Running
kafkaconnect-cp-kafka-rest-864cc8c67f-9g7fc        2/2     Running
kafkaconnect-cp-ksql-server-7594f6d6b7-4bpxx       2/2     Running
kafkaconnect-cp-schema-registry-59f5887595-xdz8p   2/2     Running
kafkaconnect-cp-zookeeper-0                        2/2     Running
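
Rather than re-running kubectl get pods by hand, kubectl can block until everything is ready. A small sketch, assuming the charts label their pods with release=<connect-release-name> (they did in the chart version I used):

kubectl wait --for=condition=Ready pod -l release=kafkaconnect --timeout=600s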

Install and configure MySQL

Add the helm stable repository (which hosts the MySQL chart)

helm repo add stable https://kubernetes-charts.storage.googleapis.com/

Install MySQL chart

helm install <mysql-release-name> stable/mysql

As with Kafka Connect, <mysql-release-name> can be any string that uniquely identifies the installation. I have used mysql as the release name here.

Connect to MySQL

  • Create a port forward rule for MySQL service listening on port 3306
kubectl port-forward svc/<mysql-release-name> 3306
  • Export required environment variables
MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace default mysql -o jsonpath="{.data.mysql-root-password}" | base64 --decode; echo)
  • Connect to MySQL
mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD}
  • Using the MySQL command prompt, create a database and a table
create database test;
use test;
create table `students` (
`name` varchar(50) DEFAULT NULL,
`id` int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`id`)
);
  • Insert some default data into the table
insert into students (name) values ('Aaren');
insert into students (name) values ('Aarika');
insert into students (name) values ('Abagael');
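
Before wiring up the connector it is worth confirming that the rows are really there; a quick check from the local shell, reusing the variables exported earlier:

mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD} -e 'select * from students;' test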

Configure connect-standalone

  • Connect to the Kafka connect server
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Optional: Install an editor (for example nano) and unzip (they are not present by default). Alternatively, files can be edited on the local system and copied into the pod with kubectl cp
apt-get update
apt install nano
apt install unzip
  • Edit the standalone properties file
vi /etc/schema-registry/connect-avro-standalone.properties
  • Make the following changes according to the environment
bootstrap.servers=<connect-release-name>-cp-kafka:9092
key.converter.schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081
value.converter.schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081
rest.host.name=localhost
rest.port=9083
(rest.port is changed because the default port 8083 is already in use by the distributed Connect worker running in the pod)
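
If you would rather not install an editor at all, the same edits can be scripted. A minimal sketch, assuming the stock file still points at the localhost defaults (as it did in the image I used); substitute your own release name for kafkaconnect:

FILE=/etc/schema-registry/connect-avro-standalone.properties
RELEASE=kafkaconnect
# Point the worker at the in-cluster Kafka and Schema Registry services
sed -i \
  -e "s|^bootstrap.servers=.*|bootstrap.servers=${RELEASE}-cp-kafka:9092|" \
  -e "s|http://localhost:8081|http://${RELEASE}-cp-schema-registry:8081|g" \
  "$FILE"
# Move the standalone worker's REST listener off the busy default port 8083
printf 'rest.host.name=localhost\nrest.port=9083\n' >> "$FILE"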

Configure JDBC Source Connector

  • Connect to the Kafka connect server (if not already connected)
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Create JDBC Source Connector properties file:
vi jdbc-connect.properties
  • Add the following properties
name=test-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://<mysql-release-name>:3306/test?user=root&password=<mysql-root-password>
incrementing.column.name=id
mode=incrementing
topic.prefix=test-
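
As an aside, the pod is already running a distributed Connect worker (the process holding port 8083), so the same connector could be registered through the Connect REST API instead of a properties file. A hedged sketch, assuming the worker is reachable on its default port inside the pod (it still needs the MySQL driver jar installed, as described in the next section):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "test-jdbc",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://<mysql-release-name>:3306/test?user=root&password=<mysql-root-password>",
    "incrementing.column.name": "id",
    "mode": "incrementing",
    "topic.prefix": "test-"
  }
}'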

Run and Verify JDBC Source Connector

  • Connect to the Kafka connect server (if not already connected)
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Download and extract the MySQL Java connector
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.zip
unzip mysql-connector-java-5.1.48.zip
  • Copy the connector jar to the right location
cp mysql-connector-java-5.1.48/mysql-connector-java-5.1.48-bin.jar /usr/share/java/kafka-connect-jdbc/
  • Execute the standalone connector to load data from MySQL to Kafka using JDBC Connector
connect-standalone /etc/schema-registry/connect-avro-standalone.properties jdbc-connect.properties
  • Configure kafka-avro-console-consumer
vi /usr/bin/kafka-avro-console-consumer
DEFAULT_SCHEMA_REGISTRY_URL="--property schema.registry.url=http://localhost:8081"

Replace localhost with <connect-release-name>-cp-schema-registry
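
If you would rather leave the script untouched, the registry URL can instead be supplied on the command line each time; a sketch:

kafka-avro-console-consumer --bootstrap-server <connect-release-name>-cp-kafka:9092 \
  --topic test-students --from-beginning \
  --property schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081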

  • Verify that data has been loaded into Kafka topic
kafka-avro-console-consumer --bootstrap-server=<connect-release-name>-cp-kafka:9092 --topic=test-students --from-beginning

The output should look something like this:

{"name":{"string":"Aaren"},"id":1}
{"name":{"string":"Aarika"},"id":2}
{"name":{"string":"Abagael"},"id":3}

This confirms that the JDBC source connector ran successfully and the data has been loaded into the Kafka topic "test-students".

If we use the generic kafka-console-consumer instead, we will still be able to see the output, but because the records are Avro-encoded it won't be structured/formatted and may contain extra characters. An example:

Adara&
Adda(
Addi*

Configure File Sink Connector

  • Connect to the Kafka connect server (if not already connected)
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Create a properties file for File Sink connector
vi file-sink-connector.properties
  • Add the following properties to the file
name=test-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=test-students

Run and Verify File Sink Connector

  • Connect to the Kafka connect server (if not already connected)
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Execute the standalone connector to fetch data from the Kafka topic into a text file using the File Sink Connector
connect-standalone /etc/schema-registry/connect-avro-standalone.properties file-sink-connector.properties

If the process complains about the REST port ("Address already in use"), the standalone worker from the previous section is probably still holding port 9083; update the /etc/schema-registry/connect-avro-standalone.properties file, change rest.port to 9084, and rerun.
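
Alternatively, since connect-standalone accepts multiple connector property files, a single standalone worker can run both connectors at once, which avoids the port clash entirely:

connect-standalone /etc/schema-registry/connect-avro-standalone.properties \
  jdbc-connect.properties file-sink-connector.properties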

  • Verify the data
cat /tmp/test.sink.txt

The output should show the records present in the database table

Struct{name=Aaren,id=1}
Struct{name=Aarika,id=2}
Struct{name=Abagael,id=3}

Verify Live Updates

Connect to the Kafka connect server using three different shells.

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • On the first one, run the JDBC connector.
connect-standalone /etc/schema-registry/connect-avro-standalone.properties jdbc-connect.properties
  • On the second one, run the file sink connector.
connect-standalone /etc/schema-registry/connect-avro-standalone.properties file-sink-connector.properties
  • On the third one, tail the file /tmp/test.sink.txt
tail -f /tmp/test.sink.txt

Add new data

  • Open a new terminal and connect to MySQL
mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD}
  • Insert a new row in the MySQL database table.
insert into students (name) values ('LiveTest');

Verify results

  • Check the terminal that is tailing the file /tmp/test.sink.txt.
  • The newly inserted data should be visible.
