Streaming data changes in MySQL into Elasticsearch using Debezium, Kafka, and the Confluent Elasticsearch Sink Connector

How to stream data changes from MySQL into Elasticsearch using Debezium

Rizqi Nugroho
DANA Product & Tech
10 min read · Sep 4, 2020


Fast search capability and real-time integrated data are a necessity nowadays if we want to build something like an e-catalog or e-commerce site. We don't want our users to get angry because they spend a lot of time just to get information from our portal. We also want the product information entered by our product team in a different application to be immediately searchable by our users and customers.

Say the product team uses MySQL as the main data source, and we use Elasticsearch as the search engine service in our portal. We need every change in MySQL to be reflected immediately in the Elasticsearch index. How do we achieve that requirement?

In this, my first article, I will demonstrate how we can stream data changes in MySQL into Elasticsearch using Debezium, Kafka, and the Confluent Elasticsearch Sink Connector to meet the requirement above.

Solution high-level diagram

Solution HLD diagram

A short brief about the technologies

Debezium

Debezium is a distributed platform that turns your existing databases into event streams, so applications can quickly react to each row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely.

Debezium is open source under the Apache License, Version 2.0.

Kafka

Apache Kafka is an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.

Confluent Elasticsearch Sink Connector

The Kafka Connect Elasticsearch sink connector allows moving data from Apache Kafka® to Elasticsearch. It writes data from a topic in Apache Kafka® to an index in Elasticsearch, and all data for a topic have the same type. By default, each topic is written to an Elasticsearch index with the same name.

Let’s start the tutorial

In this tutorial we will use a separate container for each service and no persistent volumes. ZooKeeper and Kafka store their data locally inside their containers; persisting it would require mounting directories on the host machine as volumes. So in this tutorial, all persisted data is lost when a container is stopped.
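
If you did want the data to survive container restarts, the runs would look roughly like this. This is only a sketch: the host paths are placeholders, and the internal data directories are the ones documented for the Debezium images, so verify them for your image version.

> docker run -it --name zookeeperdbz -v /host/path/zk-data:/zookeeper/data -v /host/path/zk-txns:/zookeeper/txns -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1

> docker run -it --name kafkadbz -v /host/path/kafka-data:/kafka/data -p 9092:9092 --link zookeeperdbz:zookeeperdbz debezium/kafka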

Step 1 Run ZooKeeper

Start ZooKeeper in a container using the debezium/zookeeper image. The container will run with the name zookeeperdbz.

> docker run -it --name zookeeperdbz -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1

Check the run log to verify that ZooKeeper started successfully and is listening on port 2181:

Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
.
.
.
2020-05-13 17:18:28,564 - INFO [main:NIOServerCnxnFactory@686] - binding to port 0.0.0.0/0.0.0.0:2181

Step 2 Run Kafka

Start Kafka in a container using the debezium/kafka Docker image. The container will run with the name kafkadbz.

> docker run -it --name kafkadbz -p 9092:9092 --link zookeeperdbz:zookeeperdbz debezium/kafka

Verify that the Kafka server started:

Kafka started
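
In the log output you should see a line similar to the following (the broker id may differ):

INFO [KafkaServer id=1] started (kafka.server.KafkaServer)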

Step 3 Run MySQL

In this demonstration, I will use a pre-configured Docker image that also contains sample data provided by Debezium.

Start MySQL in a container using the debezium/example-mysql image. The container will run with the name mysqldbz.

> docker run -it -d --name mysqldbz -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql

The above command creates a container named mysqldbz.

Next, let's exec into the container and open an interactive bash shell:

> docker exec -it mysqldbz /bin/bash

To capture CDC events from MySQL, Debezium needs the binary log (binlog) enabled in our MySQL instance. Thanks to Debezium, since we're using a pre-configured MySQL Docker image, we don't need to configure it ourselves. Let's check the configuration.

# more /etc/mysql/conf.d/mysql.cnf
MySQL bin_log enabled

As you can see, the binlog is enabled; by default it is disabled.
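
For reference, the binlog-related settings in that file of the Debezium example image look roughly like this (exact values may differ between image versions):

[mysqld]
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 10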

Check the sample DB

# mysql -u root -p 
Enter password: <enter your password>
mysql> use inventory
Database changed
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

Step 4 Start Elasticsearch Service

Here we will run a single-node Elasticsearch, version 7.7. The container will run with the name elasticdbz.

> docker run -it --name elasticdbz -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.7.0
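
Before moving on, you can check that Elasticsearch is up (the cluster metadata in the response will vary, but it should end with the familiar "You Know, for Search" tagline):

> curl http://localhost:9200/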

Step 5 Start Debezium Kafka Connect service

This service exposes a REST API to manage the Debezium MySQL connector. The container will run with the name connectdbz.

> docker run -it --name connectdbz -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeperdbz:zookeeperdbz --link kafkadbz:kafkadbz --link mysqldbz:mysqldbz --link elasticdbz:elasticdbz debezium/connect

Don't forget to link this container with zookeeperdbz, kafkadbz, mysqldbz, and elasticdbz, since this service needs to communicate with all of them.

Check the status of the Debezium Kafka Connect service using curl. The response shows the worker is up and built against Kafka 2.4.0:

> curl -H "Accept:application/json" localhost:8083/
{"version":"2.4.0","commit":"77a89fcf8d7fa018","kafka_cluster_id":"XcbUOTN_TNG4hCftkY_j3w"}

Let's register a MySQL connector to monitor CDC on the inventory DB:

> curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysqldbz",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafkadbz:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'

Verify the connector is registered in the list of connectors (the database.server.name value, dbserver1, becomes the prefix of the topics Debezium writes to, e.g. dbserver1.inventory.customers):

> curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]

now, inventory-connector is registered in the list of connectors
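
You can also ask the Connect REST API for the connector's status to confirm its task is running:

> curl -H "Accept:application/json" localhost:8083/connectors/inventory-connector/status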

Step 6 Start a Kafka console consumer to watch changes on the DB

This step is only an example of watching the changes on the DB; if you want to consume the topic in a real application, you will have to write your own Kafka consumer.
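
For a quick look without writing any code, the stock console consumer that ships with Kafka also works. A sketch, assuming the standard /kafka layout of the debezium/kafka image:

> docker exec -it kafkadbz /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.customers --from-beginning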

After deploying the Debezium MySQL connector, it starts monitoring the inventory database for data-change events.

To watch the dbserver1.inventory.customers topic, we will start a Kafka console consumer. The container will run with the name watcher.

> docker run -it --rm --name watcher --link zookeeperdbz:zookeeperdbz --link kafkadbz:kafkadbz debezium/kafka watch-topic -a -k dbserver1.inventory.customers

After running the watcher, we can see that Debezium starts monitoring the inventory database and publishes the rows as events on the dbserver1.inventory.customers topic. The last snapshot event looks like this:
"payload":{"before":null,
"after":{
"id":1004,
"first_name":"Anne",
"last_name":"Kretchmar",
"email":"annek@noanswer.org"
},
"source":{
"version":"1.1.1.Final",
"connector":"mysql",
"name":"dbserver1",
"ts_ms":0,
"snapshot":"true",
"db":"inventory",
"table":"customers",
"server_id":0,
"gtid":null,
"file":"mysql-bin.000003",
"pos":154,
"row":0,
"thread":null,
"query":null
},
"op":"c",
"ts_ms":1589504913171,
"transaction":null
}

Let's compare with the inventory.customers table:

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

The last event from the Kafka topic matches the records in the customers table.

Let’s try to update the customer table.

mysql > UPDATE `inventory`.`customers` SET `last_name` = 'Kretchmar Kretchmer' WHERE `id` = 1004;

And here is the result in the watcher

...
"payload":{
  "before":{
    "id":1004,
    "first_name":"Anne",
    "last_name":"Kretchmar",
    "email":"annek@noanswer.org"
  },
  "after":{
    "id":1004,
    "first_name":"Anne",
    "last_name":"Kretchmar Kretchmer",
    "email":"annek@noanswer.org"
  },
...

What have we achieved up to this step?

So far we have integrated MySQL, Debezium, and Kafka: whenever data is created or changed in MySQL, we receive a stream of change events from a Kafka topic.

What’s Next?

Let's start the integration with Elasticsearch.

To integrate with Elasticsearch, we need the Kafka Connect Elasticsearch Sink Connector installed in our Debezium Kafka Connect container.

Step 7 Download the Kafka Connect Elasticsearch Sink Connector from https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch

Step 8 Extract the downloaded zip file

Extracted zip file

Step 9 Rename the lib folder to kafka-connect-jdbc

The lib folder after being renamed to kafka-connect-jdbc
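
On the command line, Steps 8 and 9 look roughly like this, assuming the zip was downloaded to the current directory under its default name:

> unzip confluentinc-kafka-connect-elasticsearch-5.5.0.zip
> cd confluentinc-kafka-connect-elasticsearch-5.5.0
> mv lib kafka-connect-jdbc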

Step 10 Copy kafka-connect-jdbc into the Debezium Kafka Connect container

> docker cp /path-to-file/confluentinc-kafka-connect-elasticsearch-5.5.0/kafka-connect-jdbc connectdbz:/kafka/connect/

Note that docker cp does not expand wildcards, so we copy the whole kafka-connect-jdbc folder rather than its individual files.

Step 11 Verify that all dependencies were copied

> docker exec -it connectdbz /bin/bash
$ cd connect/kafka-connect-jdbc/
$ ls -all
ls -all result

Step 12 Restart Debezium Kafka Connect container

We need to restart the Kafka Connect service so that it can detect the newly installed connector plugin.

> docker stop connectdbz
> docker start connectdbz
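
After the restart, the new plugin should appear in the list of installed connector plugins; look for io.confluent.connect.elasticsearch.ElasticsearchSinkConnector in the response:

> curl -H "Accept:application/json" localhost:8083/connector-plugins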

Step 13 Register ElasticsearchSinkConnector

> curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.inventory.customers",
    "connection.url": "http://elasticdbz:9200",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "type.name": "customer"
  }
}'
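
Two single message transforms (SMTs) do the heavy lifting here: unwrap extracts the flat after state from Debezium's change-event envelope, and key extracts the id field so it becomes the Elasticsearch document _id. Note that UnwrapFromEnvelope is deprecated; on newer Debezium versions the equivalent setting would be:

"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",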

Verify that the ElasticsearchSinkConnector is registered in the list of connectors:

> curl -H "Accept:application/json" localhost:8083/connectors/
["elastic-sink","inventory-connector"]

Step 14 Check MySQL ElasticSearch Synchronization

Let’s check if the databases and the search server are synchronized.

> curl 'http://localhost:9200/dbserver1.inventory.customers/_search?pretty'
{
  "took" : 12,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "dbserver1.inventory.customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "dbserver1.inventory.customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar Kretchmer",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "dbserver1.inventory.customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "dbserver1.inventory.customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}

As we can see, all the data in MySQL is now synchronized: every row of the customers table can be found in the Elastic index above.

Let's insert new data into the customers table and see what happens in the Elastic index.

mysql> insert into customers values(default, 'Rizqi', 'Nugrohon', 'rizqi.nugroho@example.com');
Query OK, 1 row affected (0.05 sec)
mysql> select * from customers;
+------+------------+---------------------+---------------------------+
| id   | first_name | last_name           | email                     |
+------+------------+---------------------+---------------------------+
| 1001 | Sally      | Thomas              | sally.thomas@acme.com     |
| 1002 | George     | Bailey              | gbailey@foobar.com        |
| 1003 | Edward     | Walker              | ed@walker.com             |
| 1004 | Anne       | Kretchmar Kretchmer | annek@noanswer.org        |
| 1005 | Rizqi      | Nugrohon            | rizqi.nugroho@example.com |
+------+------------+---------------------+---------------------------+

Check elastic index

> curl 'http://localhost:9200/dbserver1.inventory.customers/_search?pretty'
{
  "took" : 1476,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 5,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "dbserver1.inventory.customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      ...
      {
        "_index" : "dbserver1.inventory.customers",
        "_type" : "customer",
        "_id" : "1005",
        "_score" : 1.0,
        "_source" : {
          "id" : 1005,
          "first_name" : "Rizqi",
          "last_name" : "Nugrohon",
          "email" : "rizqi.nugroho@example.com"
        }
      }
    ]
  }
}

Voilà! The new data with first_name Rizqi has been inserted.

How about an UPDATE statement?

mysql> UPDATE `inventory`.`customers` SET `last_name` = 'Adhi Nugroho' WHERE `id` = 1005;
Query OK, 1 row affected (0.05 sec)
mysql> select * from customers;
+------+------------+---------------------+---------------------------+
| id   | first_name | last_name           | email                     |
+------+------------+---------------------+---------------------------+
| 1001 | Sally      | Thomas              | sally.thomas@acme.com     |
| 1002 | George     | Bailey              | gbailey@foobar.com        |
| 1003 | Edward     | Walker              | ed@walker.com             |
| 1004 | Anne       | Kretchmar Kretchmer | annek@noanswer.org        |
| 1005 | Rizqi      | Adhi Nugroho        | rizqi.nugroho@example.com |
+------+------------+---------------------+---------------------------+

Again, check the elastic index

> curl 'http://localhost:9200/dbserver1.inventory.customers/_search?pretty'
...
      {
        "_index" : "dbserver1.inventory.customers",
        "_type" : "customer",
        "_id" : "1005",
        "_score" : 1.0,
        "_source" : {
          "id" : 1005,
          "first_name" : "Rizqi",
          "last_name" : "Adhi Nugroho",
          "email" : "rizqi.nugroho@example.com"
        }
      }
    ]
  }
}

Mamma mia, the data is updated!

Summary

Finally, we have built an integration between MySQL and Elasticsearch using Debezium. I hope this demonstration helps you solve the data-latency problem between a MySQL database and Elasticsearch: every change in MySQL is now immediately reflected in the Elastic index. You can also try other databases such as PostgreSQL, Oracle, DB2, or MS SQL Server.

References:

https://medium.com/@erfin.feluzy/tutorial-streaming-cdc-mysql-ke-kafka-dengan-debezium-3a1ec9150cf8

https://debezium.io/documentation/reference/1.1/tutorial.html

https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/index.html
