Streaming data changes in MySQL into Elasticsearch using Debezium, Kafka, and the Confluent Elasticsearch Sink Connector
How to stream data changes from MySQL into Elasticsearch using Debezium
Fast search and real-time integrated data are a necessity nowadays if we want to build something like an e-catalog or e-commerce site. We don't want our users to get frustrated spending a lot of time just to find information on our portal. We also want product information entered by our product team in a different application to be immediately searchable by our users and customers.
Say the product team uses MySQL as the main data source, and we use Elasticsearch as the search engine in our portal. Every change in MySQL must be reflected immediately in the Elasticsearch index. How do we achieve that?
In this, my first article, I will demonstrate how we can stream data changes from MySQL into Elasticsearch using Debezium, Kafka, and the Confluent Elasticsearch Sink Connector to meet the requirement above.
Solution high-level diagram
A brief overview of the technology
Debezium
Debezium is a distributed platform that turns your existing databases into event streams, so applications can quickly react to each row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely.
Debezium is open source under the Apache License, Version 2.0.
Kafka
Apache Kafka is an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.
Confluent Elasticsearch Sink Connector
The Kafka Connect Elasticsearch sink connector moves data from Apache Kafka® to Elasticsearch. It writes data from a topic in Apache Kafka® to an index in Elasticsearch, and all data for a topic have the same type in Elasticsearch.
Let’s start the tutorial
In this tutorial, we run each service in a separate container and don't use persistent volumes. ZooKeeper and Kafka store their data locally inside the containers; persisting it would require mounting directories on the host machine as volumes. So in this tutorial, all data is lost when a container is stopped.
Step 1 Run Zookeeper
Start ZooKeeper in a container using the debezium/zookeeper image. The container will run with the name zookeeperdbz.
> docker run -it --name zookeeperdbz -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1
Check the run log to verify that ZooKeeper started successfully and is listening on port 2181.
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
...
2020-05-13 17:18:28,564 - INFO [main:NIOServerCnxnFactory@686] - binding to port 0.0.0.0/0.0.0.0:2181
Step 2 Run Kafka
Start Kafka in a container using the debezium/kafka image. The container will run with the name kafkadbz.
> docker run -it --name kafkadbz -p 9092:9092 --link zookeeperdbz:zookeeperdbz debezium/kafka
Verify that the Kafka server has started.
Step 3 Run MySQL
In this demonstration, I will use a pre-configured Docker image that also contains sample data provided by Debezium.
Start MySQL in a container using the debezium/example-mysql image. The container will run with the name mysqldbz.
> docker run -it -d --name mysqldbz -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql
The above command creates a container named mysqldbz.
Next, let’s execute the container and enter the interactive bash shell on the container.
> docker exec -it mysqldbz /bin/bash
To capture change data (CDC) from MySQL, Debezium needs the binlog enabled in our MySQL configuration. Thanks to Debezium, since we're using a pre-configured MySQL Docker image, we don't need to configure it ourselves. Let's check the configuration.
# more /etc/mysql/conf.d/mysql.cnf
As you can see, the binlog is enabled (by default it is disabled).
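For reference, the binlog-related settings baked into the debezium/example-mysql image look roughly like the fragment below (the exact values are illustrative; check the file in your own container):

```ini
[mysqld]
# Unique ID required for binlog replication/CDC
server-id        = 223344
# Enable the binary log and use row-based logging,
# which is what Debezium needs to see full row images
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL
```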
Check the sample DB
# mysql -u root -p
Enter password: <enter your password>
mysql> use inventory
Database changed
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
6 rows in set (0.00 sec)

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
Step 4 Start Elastic Search Service
Here we will use a single-node Elasticsearch, version 7.7. The container will run with the name elasticdbz.
> docker run -it --name elasticdbz -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.7.0
Step 5 Start Debezium Kafka Connect service
This service exposes REST API to manage the Debezium MySQL connector. The container will run with the name connectdbz.
> docker run -it --name connectdbz -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeperdbz:zookeeperdbz --link kafkadbz:kafkadbz --link mysqldbz:mysqldbz --link elasticdbz:elasticdbz debezium/connect
Don't forget to link this container with kafkadbz, zookeeperdbz, and elasticdbz, since this service needs to communicate with all of them.
Check the status of the Debezium Kafka Connect service using curl; the response shows we're running on Kafka 2.4.0.
> curl -H "Accept:application/json" localhost:8083/
{"version":"2.4.0","commit":"77a89fcf8d7fa018","kafka_cluster_id":"XcbUOTN_TNG4hCftkY_j3w"}
Let's register a MySQL connector to monitor CDC on the inventory DB.
> curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysqldbz",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafkadbz:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}'
Verify that the connector appears in the list of registered connectors.
> curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"]
Now inventory-connector is registered in the list of connectors.
Step 6 Start Kafka Console consumer to watch changes on DB
This step only demonstrates watching the changes on the DB; if you want to consume the topic in a real application, you have to write your own Kafka consumer.
After deploying the Debezium MySQL connector, it starts monitoring the inventory database for data change events. To watch the dbserver1.inventory.customers topic, we need to start a Kafka console consumer. The container will run with the name watcher.
> docker run -it --rm --name watcher --link zookeeperdbz:zookeeperdbz --link kafkadbz:kafkadbz debezium/kafka watch-topic -a -k dbserver1.inventory.customers
After running the watcher, we can see that Debezium is monitoring the inventory database and publishing change events to the dbserver1.inventory.customers topic:
"payload":{"before":null,
"after":{"id":1004,
"first_name":"Anne",
"last_name":"Kretchmar",
"email":"annek@noanswer.org"
},
"source":{"version":"1.1.1.Final",
"connector":"mysql",
"name":"dbserver1",
"ts_ms":0,
"snapshot":"true",
"db":"inventory",
"table":"customers",
"server_id":0,
"gtid":null,
"file":"mysql-bin.000003",
"pos":154,
"row":0,
"thread":null,
"query":null
},
"op":"c",
"ts_ms":1589504913171,
"transaction":null}
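The console watcher above is only for inspection. If you want to consume these envelopes from your own application, a minimal sketch might look like the following. This assumes the kafka-python package; `extract_after` is a hypothetical helper name, not part of any library:

```python
import json

def extract_after(event_bytes):
    """Pull the post-change row state ("after") out of a Debezium envelope."""
    envelope = json.loads(event_bytes)
    return envelope["payload"]["after"]

def consume_customers(bootstrap_servers="localhost:9092"):
    """Stream the customers topic; requires a reachable Kafka broker
    and the kafka-python package (both assumptions of this sketch)."""
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        "dbserver1.inventory.customers",
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",
    )
    for message in consumer:
        row = extract_after(message.value)
        print(row["id"], row["first_name"], row["last_name"])
```

In a real consumer you would also handle tombstones (messages whose value is null, emitted on deletes) before calling a helper like this.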
Let's compare this with the inventory.customers table:
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
The last event from the Kafka topic matches the records in the customers table.
Let’s try to update the customer table.
mysql > UPDATE `inventory`.`customers` SET `last_name` = 'Kretchmar Kretchmer' WHERE `id` = 1004;
And here is the resulting event in the watcher:
...
"payload":{
"before":{
"id":1004,
"first_name":"Anne",
"last_name":"Kretchmar",
"email":"annek@noanswer.org"
},
"after":{
"id":1004,
"first_name":"Anne",
"last_name":"Kretchmar Kretchmer",
"email":"annek@noanswer.org"
},
...
What have we achieved so far?
Up to this step, we have integrated MySQL, Debezium, and Kafka. We now get streamed data from a Kafka topic whenever data is created or changed in MySQL.
What’s Next?
Let's start the integration with Elasticsearch
To integrate with Elasticsearch, we need the Kafka Connect Elasticsearch sink connector installed in our Debezium Kafka Connect container.
Step 7 Download the Kafka Connect Elasticsearch sink connector from https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
Step 8 Extract the downloaded zip file
Step 9 Rename the extracted lib folder to kafka-connect-jdbc
Step 10 Copy the kafka-connect-jdbc folder into the Debezium Kafka Connect container
> docker cp /path-to-file/confluentinc-kafka-connect-elasticsearch-5.5.0/kafka-connect-jdbc connectdbz:/kafka/connect/
Step 11 Verify that all dependencies were copied
> docker exec -it connectdbz /bin/bash
$ cd connect/kafka-connect-jdbc/
$ ls -all
Step 12 Restart Debezium Kafka Connect container
We need to restart the Kafka Connect service so that it can detect the newly installed connector plugin.
> docker stop connectdbz
> docker start connectdbz
Step 13 Register ElasticsearchSinkConnector
> curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
"name": "elastic-sink",
"config": {
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "dbserver1.inventory.customers",
"connection.url": "http://elasticdbz:9200",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"key.ignore": "false",
"type.name": "customer"
}
}'
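Two single message transforms (SMTs) do the heavy lifting in this config: unwrap strips the Debezium envelope down to the row's new state, and the key transform pulls the id field out of the record key so it can become the Elasticsearch document _id. A rough Python illustration of what they do to each record (the function names here are mine, not a connector API):

```python
def unwrap(envelope):
    """Mimic the unwrap SMT: keep only the row's new state from the envelope."""
    return envelope["payload"]["after"]

def extract_key(key_struct, field="id"):
    """Mimic ExtractField$Key: reduce the record key struct to one field."""
    return key_struct[field]

# A change event as produced by the MySQL connector (abbreviated).
event = {"payload": {"before": None,
                     "after": {"id": 1004, "first_name": "Anne",
                               "last_name": "Kretchmar",
                               "email": "annek@noanswer.org"}}}
key = {"id": 1004}

doc = unwrap(event)        # the flat row that gets indexed as _source
doc_id = extract_key(key)  # becomes the Elasticsearch document _id
```

Because key.ignore is "false", the sink uses that extracted key as the document id, so updates to a row overwrite the same Elasticsearch document instead of creating duplicates.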
Verify that the elastic-sink connector appears in the list of connectors.
> curl -H "Accept:application/json" localhost:8083/connectors/
["elastic-sink","inventory-connector"]
Step 14 Check MySQL ElasticSearch Synchronization
Let’s check if the databases and the search server are synchronized.
> curl 'http://localhost:9200/dbserver1.inventory.customers/_search?pretty'
{
"took" : 12,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "dbserver1.inventory.customers",
"_type" : "customer",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "sally.thomas@acme.com"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "customer",
"_id" : "1004",
"_score" : 1.0,
"_source" : {
"id" : 1004,
"first_name" : "Anne",
"last_name" : "Kretchmar Kretchmer",
"email" : "annek@noanswer.org"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "customer",
"_id" : "1002",
"_score" : 1.0,
"_source" : {
"id" : 1002,
"first_name" : "George",
"last_name" : "Bailey",
"email" : "gbailey@foobar.com"
}
},
{
"_index" : "dbserver1.inventory.customers",
"_type" : "customer",
"_id" : "1003",
"_score" : 1.0,
"_source" : {
"id" : 1003,
"first_name" : "Edward",
"last_name" : "Walker",
"email" : "ed@walker.com"
}
}
]
}
}
As we can see, all data in MySQL is now synchronized; every row in MySQL can be found in the Elasticsearch index above.
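Rather than eyeballing the output, you could compare the rows you expect with the _source documents returned by the search. A small, purely illustrative helper (the function name is hypothetical):

```python
def in_sync(mysql_rows, es_response):
    """True if every MySQL row appears unchanged as an Elasticsearch _source doc."""
    sources = {hit["_source"]["id"]: hit["_source"]
               for hit in es_response["hits"]["hits"]}
    return all(sources.get(row["id"]) == row for row in mysql_rows)

# Rows as read from MySQL (abbreviated to one row).
rows = [{"id": 1001, "first_name": "Sally", "last_name": "Thomas",
         "email": "sally.thomas@acme.com"}]

# The _search response body, abbreviated to the matching hit.
response = {"hits": {"hits": [
    {"_id": "1001",
     "_source": {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
                 "email": "sally.thomas@acme.com"}}]}}

assert in_sync(rows, response)
```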
Let’s insert new data into the Customers table and see what happens in the elastic index.
mysql> insert into customers values(default, 'Rizqi', 'Nugrohon', 'rizqi.nugroho@example.com');
Query OK, 1 row affected (0.05 sec)

mysql> select * from customers;
+------+------------+---------------------+---------------------------+
| id   | first_name | last_name           | email                     |
+------+------------+---------------------+---------------------------+
| 1001 | Sally      | Thomas              | sally.thomas@acme.com     |
| 1002 | George     | Bailey              | gbailey@foobar.com        |
| 1003 | Edward     | Walker              | ed@walker.com             |
| 1004 | Anne       | Kretchmar Kretchmer | annek@noanswer.org        |
| 1005 | Rizqi      | Nugrohon            | rizqi.nugroho@example.com |
+------+------------+---------------------+---------------------------+
Check elastic index
> curl 'http://localhost:9200/dbserver1.inventory.customers/_search?pretty'
{
"took" : 1476,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 5,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "dbserver1.inventory.customers",
"_type" : "customer",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "sally.thomas@acme.com"
}
},
...
{
"_index" : "dbserver1.inventory.customers",
"_type" : "customer",
"_id" : "1005",
"_score" : 1.0,
"_source" : {
"id" : 1005,
"first_name" : "Rizqi",
"last_name" : "Nugrohon",
"email" : "rizqi.nugroho@example.com"
}
}
]
}
}
Voilà! New data with first_name Rizqi is inserted.
How about an UPDATE statement?
mysql> UPDATE `inventory`.`customers` SET `last_name` = 'Adhi Nugroho' WHERE `id` = 1005;
Query OK, 1 row affected (0.05 sec)

mysql> select * from customers;
+------+------------+---------------------+---------------------------+
| id   | first_name | last_name           | email                     |
+------+------------+---------------------+---------------------------+
| 1001 | Sally      | Thomas              | sally.thomas@acme.com     |
| 1002 | George     | Bailey              | gbailey@foobar.com        |
| 1003 | Edward     | Walker              | ed@walker.com             |
| 1004 | Anne       | Kretchmar Kretchmer | annek@noanswer.org        |
| 1005 | Rizqi      | Adhi Nugroho        | rizqi.nugroho@example.com |
+------+------------+---------------------+---------------------------+
Again, check the elastic index
> curl 'http://localhost:9200/dbserver1.inventory.customers/_search?pretty'
...
{
"_index" : "dbserver1.inventory.customers",
"_type" : "customer",
"_id" : "1005",
"_score" : 1.0,
"_source" : {
"id" : 1005,
"first_name" : "Rizqi",
"last_name" : "Adhi Nugroho",
"email" : "rizqi.nugroho@example.com"
}
}
]
}
}
Mamma mia, the data is updated!
Summary
Finally, we have built an integration between MySQL and Elasticsearch using Debezium. I hope this demonstration helps you solve the data-latency problem between a MySQL DB and Elasticsearch. Now every change in the MySQL DB is immediately reflected in the Elasticsearch index. You can also try this with another DB such as PostgreSQL, Oracle, DB2, MS SQL Server, etc.
References:
https://medium.com/@erfin.feluzy/tutorial-streaming-cdc-mysql-ke-kafka-dengan-debezium-3a1ec9150cf8
https://debezium.io/documentation/reference/1.1/tutorial.html
https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/index.html