How to Integrate Kafka Connect With Mysql Server on Command Line Interface Over Multi-Node Multi-Broker Architecture

Suraj Saha
The Startup
Published in
7 min readAug 20, 2020
Photo by Luke Chesser on Unsplash

Before we start our progress one must look at the installation of Kafka into the system. Similar to the installation of Kafka blog we will be using Ubuntu 18.04 for the execution of our steps. We will follow step wise starting from creation of Multi-Node Multi-broker architecture over Docker.

1. Installation of Docker

First we need to install docker on our system. It is advisable to use ‘sudo’ command in every step as it provides us with administrator privileedges. One must be familiar with the password of the system admin in order to use ‘sudo’.

$ sudo apt-get update$ sudo apt-get install docker.io docker-compose

2. Creation of “.yml” file

Now one can create a directory naming “Docker_Kafka”

$ mkdir Docker_Kafka$ cd Docker_Kafka

Now we need to write down a yml file naming “docker-compose.yml” containg following code :

version: ‘2’services:zookeeper-1:image: confluentinc/cp-zookeeper:latesthostname: zookeeper-1ports:- “12181:12181”environment:ZOOKEEPER_SERVER_ID: 1ZOOKEEPER_CLIENT_PORT: 12181ZOOKEEPER_TICK_TIME: 2000ZOOKEEPER_INIT_LIMIT: 5ZOOKEEPER_SYNC_LIMIT: 2ZOOKEEPER_SERVERS:Zookeeper1:12888:13888;zookeeper2:22888:23888;zookeeper-3:32888:33888zookeeper-2:image: confluentinc/cp-zookeeper:latesthostname: zookeeper-2ports:- “22181:22181”environment:ZOOKEEPER_SERVER_ID: 2ZOOKEEPER_CLIENT_PORT: 22181ZOOKEEPER_TICK_TIME: 2000ZOOKEEPER_INIT_LIMIT: 5ZOOKEEPER_SYNC_LIMIT: 2ZOOKEEPER_SERVERS:Zookeeper1:12888:13888;zookeeper2:22888:23888;zookeeper-3:32888:33888zookeeper-3:image: confluentinc/cp-zookeeper:latesthostname: zookeeper-3ports:- “32181:32181”environment:ZOOKEEPER_SERVER_ID: 3ZOOKEEPER_CLIENT_PORT: 32181ZOOKEEPER_TICK_TIME: 2000ZOOKEEPER_INIT_LIMIT: 5ZOOKEEPER_SYNC_LIMIT: 2ZOOKEEPER_SERVERS:Zookeeper1:12888:13888;zookeeper2:22888:23888;zookeeper-3:32888:33888kafka-1:image: confluentinc/cp-kafka:latesthostname: kafka-1ports:- “19092:19092”depends_on:- zookeeper-1- zookeeper-2- zookeeper-3environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092kafka-2:image: confluentinc/cp-kafka:latesthostname: kafka-2ports:- “29092:29092”depends_on:- zookeeper-1- zookeeper-2- zookeeper-3environment:KAFKA_BROKER_ID: 2KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092kafka-3:image: confluentinc/cp-kafka:latesthostname: kafka-3ports:- “39092:39092”depends_on:- zookeeper-1-zookeeper-2- zookeeper-3environment:KAFKA_BROKER_ID: 3KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:12181,zookeeper-2:12181,zookeeper-3:12181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092

Basically we are creating three zookeeper nodes on port number 12181, 22181 and 32181. Over it we are creating three kafka broker on port numbers 19092, 29092 and 39092. On execution of the above .yml file as shown below Kafka Nodes will select one Leader Broker among the provided brokers. Other brokers will act as the followers. Whenever a producer-consumer process takes place, it is the work of the leader where to store and how much crunch of data needed to be stored at each location. Based on the replication factor and number of partition provided by the developer the leader acts acordingly. If for some reason the leader broker crashes, one of the follower broker will be elected as the leader and takes over the responsibility from where the previous leader fails. We need to add kafka-1, kafka-2 and kafka-3 broker hosts to the client /etc/hosts file. Use the following command:

$ sudo gedit /etc/hosts

The host file look something like shown below. Bold lines are the modification one have to make in the file. First few lines are shown below:

127.0.0.1 localhost192.168.1.231 kafka-1 kafka-2 kafka-3 (if running on different machine)or127.0.0.0.1 kafka-1 kafka-2 kafka-3 (if running on same machine)# The following lines are desirables for Ipv6 capable hosts::1 ip6-localhost ip6-loopback

Now open a terminal having home path as the path of the directory “Docker_Kafka” and run:

$ sudo docker-compose up

It will take some time to load the data. Make sure that internet is on as it might need to download some of the files.

3. Create a Database to act as a DataSource

Let’s create a table employee for the purpose of making our own Data Source. We will make it as simple as possible. After logining into the MySql server, let create a database name ‘exp’.

> CREATE DATABASE exp

Now create a table inside the ‘exp’ database. For learning purpose lets make the schema of the table as simple as possible. We need to make a column as AUTO_INCREMENT for any change in the database, Kafka Connector will publish data from the datasource to the Kafka Consumers observing this column only. In our example, let us make ‘eid’ column as both primary key and auto_increment column.

> CREATE TABLE employee(
eid int(11) NOT NULL AUTO_INCREMENT,
ename varchar(20) DEFAULT NULL,
esal int(8) DEFAULT NULL,
edep varchar(20) DEFAULT NULL,
PRIMARY KEY (`eid`)
)

4. Creation of .properties files for the connection to Data Source

In order to setup the architecture one has to first set up the Multi-broker achitecture using Docker-compose in second step (2). Apart from this we need to set up two more properties files. The first property file is the “mysql.properties” and second is “woker.properties”. The naming of this property files need not to be the same. The ‘worker.properties’ files contains the configuration set up and constraints that are required by the Kafka connector in order to understand the architecturre over which it is running and over which broker it needs to publish the data and its format. The other ‘mysql.properties files contains various information and variable over the data source used to fetch the data.

Worker.properties:

common-worker-configs
bootstrap.servers=127.0.0.1:29092, 127.0.0.1:39092, 127.0.0.1:19092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverterinternal.value.converter.schemas.enable=false
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000

In worker.properrties files, we set various aspect of variables required by the connector before connecting to the broker. Such as ‘bootstrap.servers’ store the port number and the ip address of all the brokers running on the architecture. The data will be published in the key-value pair. Thus, we can mention in which format we want our data to consume. Mostly, the systems uses JSON format, apart from it we can even use AVRO format for the consumption of the data. The ‘standalone.offsets’ file contains all the data and their offset value to which a consumer has consumed the data.

mysql.properties

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/bank?useSSL=false
connection.password=xander21
connection.user=XANDER
mode=incrementing
incrementing.column.name=eid
topic.prefix=Topic123-

Name of the user, database and password are the adjustment one has to make before connecting to the database. Username and password may vary from person to person. Incrementing coloumn name is the column that is strictly incrementing column in the table of one database to use to detect new rows. This column must not be nullable. If one does not have a column with these properties, one have to update one of the column with following SQL command:

ALTER TABLE <table_name> MODIFY COLUMN <coloumn_name> INT auto_incrementing ALTER TABLE <table_name> ADD PRIMARY KEY (<coloumn_name>)

topic.prefix: Prefix to append to table names to generate the name of the Kafka topic to publish data to, or in the case of a custom query, the full name of the topic to publsih to. Example: If topic.prefix=test- and if the name of the table is employee, then the topic name to which connector publishes the messages would be test-employee.

5. Download MySQL Connector

Now we have to download MySQL Connector for Java. It is required by the connector in order to connect to the MySQL database. We con download the .jar files from here. Now we need to copy the MySQL Connector jar files to existing Kafka Connect JDBC Jars. In Ubuntu system we can find it in /usr/share/java/kafka-connect-jdbc.

6. Generate a topic inside Kafka Broker

Lets generate a topic using Kafka Topic API under any one of the Kafka Broker with multiple replication factor and partitions. If there are n number of broker and n number of replicas then no two replicas will be stored under same broker. If the leader broker fails to provide support, then one of the follower broker with its own replica of data becomes the next leader and the respective replica is called primary replica and other replica becomes the secondary replicas. The secondary replicas update themselves with respect to the primary replica. This leader-follower architecture is what makes the Kafka fail-safe protected and robust in nature.

We are using the name of the topic as Test123-employee because ‘Test123-’ is the mentioned prefix by the us in one of the properties files. Thus which data from which datasource connector will be determined by the group of topics having the same prefix. Under same group of topics if certain consumer wants to display data only from certain table then for that reason ‘employee’ has been added in the postfix. The postfix of the topic name determines from which particular table the consumer is consuming the data or in other words kafka broker understands to which consumer process it must sends the data while subscribing to a particular topic by the respective consumer.

$usr/bin>kafka-topics --create --bootstrap-server localhost:39092 --replication-factor 3 --partitions 3 --topic Test123-employee

7. Execution of the Kafka Connect API

Execution begins from this part. Start the zookeeper and kafka broker list using docker. Set the path of the docker compose file into the terminal first and then run the command below. Let both the properties and the docker-compose file be in a folder name kafka so the path is

$cd /kafka$kafka> sudo docker-compose up

Then in another terminal connect the connector i.e, standalone-connector file with properties file. Make the /usr/bin file in the path

$ cd /usr/bin$usr/bin> sudo standalone-connect /kafka/worker.properties /kafka/worker.properties

The above command will take some to work. After the connection is established start a consumer process under any one of the zookeeper server with the topic “Topic123-employee” and then try to insert the data in employee table as stated in the mysql properties file under “exp” database name.

$usr/bin> kafka-consumer --broker-list 127.0.0.1:19092 --topic Topic123-employee --from-beginning

Now insert the data into the database and observe the consumer process on the terminal. In order to insert the data into the database we can write:

> INSERT INTO employee( ename, esal, edep) values (‘Barney’, 32596, “Big Data Developer”)

We haven’t inserted into the eid or ‘employee-id’ column as the value of the column will be auto inserted with each new insertion in an incrementing manner. The above mentioned process has been displayed in the video below.

Intergration of Kafka Connector with MySQL Database

Hopefully the above blog helps you and keep learning data architecture.

--

--