How to Build Your First Real-Time Streaming (CDC) System (Setup, Part 2)

Rohan Mudaliar
Published in Analytics Vidhya
7 min read · Mar 11, 2020

In article 1 of this POC, we covered the problem statement, the key concepts, and the tech stack we are using to solve the problem. Here is a quick recap.

Note: If you need a fuller understanding of the problem, please read the first part of this series, which covers the details mentioned above.

Problem Statement Recap

We had an e-commerce company where the business team wanted reports containing real-time updates to support their decisions. Our backend is built in Java and uses microservices. The architecture is shown below:

system diagram

The business team wants to build a dashboard from this data and use it to make marketing decisions. The data lives in 3 different databases used by 3 different systems, and we need to pull our information from all of them.

Tech Stack:

We decided on the below tech stack for the project:

  • MySQL 8.0.18
  • Apache Kafka Connect
  • Apache Kafka
  • Apache Kafka Streams
  • Elasticsearch
  • Docker
  • Swagger UI
  • Postman

Overall Technical Tasks:

The overall technical tasks were as follows:

  1. Setting up local infrastructure using Docker.
  2. Data ingestion into Kafka from the MySQL database using Kafka Connect.
  3. Reading data using Kafka Streams in the Java backend.
  4. Creating indices on Elasticsearch for the aggregated views.
  5. Listening to the events in real time and updating the indices accordingly.
  6. Setting up your local environment and running the Java code.

Note: This article assumes that you already have MySQL, Docker, and the other basic components installed on your system, and covers only the setup for this project. I am using a Mac, so the setup might vary slightly on Windows. For detailed steps on installing Docker and MySQL, refer to their official installation guides.

1. Setting up local infrastructure using docker.

For readers who do not know what Docker is: “Docker is an open-source platform for developing, shipping, and running applications. Docker enables you to separate your applications from your infrastructure so you can deliver software quickly.” For more information, please check the Docker documentation.

I am using Docker to run the below images locally:

  • Elasticsearch 7.5.1
  • Kibana 7.5.1
  • ZooKeeper (Debezium image, tag 1.0)
  • Kafka (Debezium image, tag 1.0)
  • Kafka Connect (Debezium image, tag 1.0)

The Docker Compose file I used is attached below for reference:

version: "3.5"
services:
  # Install ElasticSearch
  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:7.5.1
    environment:
      - discovery.type=single-node
    ports:
      - 9200:9200
      - 9300:9300
  kibana:
    container_name: kibana
    image: docker.elastic.co/kibana/kibana:7.5.1
    environment:
      # Kibana 7.x reads ELASTICSEARCH_HOSTS; inside the Compose network,
      # Elasticsearch is reachable by its service name, not localhost.
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - 5601:5601

  # debezium
  zookeeper:
    image: debezium/zookeeper:1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
      - 2888:2888
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: debezium/kafka:1.0
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - ADVERTISED_HOST_NAME=192.168.0.250
      # If we wanted to connect to Kafka from outside of a Docker container, then we’d want Kafka to advertise its address via the Docker host,
      # which we could do by adding -e ADVERTISED_HOST_NAME= followed by the IP address or resolvable hostname of the Docker host,
      # which on Linux or Docker for Mac is the IP address of the host computer (not localhost).


  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
      - 8181:8181
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
      - zookeeper
  connect:
    image: debezium/connect:1.0
    ports:
      - 8083:8083
    links:
      - kafka
      - schema-registry
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

Assuming Docker is installed on your local machine, run the below command in a terminal:

docker-compose up

This will pull the images and bring up the above-mentioned services. To verify, we can manually check that the services are up:

Kafka Connect:

curl -H "Accept:application/json" localhost:8083/

Kibana should be accessible via the below URL.

http://localhost:5601/app/kibana#/management/kibana/index_pattern

You can manually check the other services by verifying that there are active processes listening on their respective ports (i.e., Kafka on 9092, ZooKeeper on 2181, Elasticsearch on 9200).
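The port check can also be scripted. The following is a minimal sketch in Python, assuming the services are published on localhost with the ports from the Compose file; the names `SERVICES`, `is_port_open`, and `check_services` are illustrative and not part of the project. An open port only shows that something is listening, not that the service is healthy.

```python
import socket

# Ports as published in the Compose file; the host (localhost) is an assumption.
SERVICES = {
    "kafka": 9092,
    "zookeeper": 2181,
    "elasticsearch": 9200,
    "kafka-connect": 8083,
    "kibana": 5601,
}


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its port accepts connections."""
    return {name: is_port_open(host, port) for name, port in SERVICES.items()}


if __name__ == "__main__":
    for name, up in check_services().items():
        print(f"{name:15s} {'UP' if up else 'DOWN'}")
```

Run it after `docker-compose up` has finished starting the containers; Elasticsearch and Kibana can take a little while before their ports respond.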

2. Data Ingestion into Kafka from MySQL database using Kafka connect.

In step 1 we created the required infrastructure and brought Kafka Connect up. In this step we need to do the following subtasks:

  • Prepare MySQL for real-time updates and give the user appropriate permissions to access the database

For real-time updates, it is not enough to just read the data from the different databases; we also need a record of the different inserts, updates, deletes, etc. performed on the different systems.

MySQL provides this through its binary logs (binlogs), and MongoDB through its oplog (https://docs.mongodb.com/manual/core/replica-set-oplog/).

Setting up MySQL is a DB task, so I am attaching a reference to an article I found that should help you with it:

https://www.anicehumble.com/2016/12/enabling-mysql-binary-logging-on-macos.html

A MySQL user must be defined that has all of the following permissions on all of the databases that the connector will monitor: SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT.

Read more about the MySQL configuration in the official Debezium MySQL documentation (https://debezium.io/documentation/reference/1.0/connectors/mysql.html).
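As an illustration of the user setup, here is a small Python sketch that builds the SQL statements for a dedicated connector user. It assumes the privilege list given in the Debezium MySQL connector documentation; the user name `debezium`, the host `%`, and the function name are hypothetical, and the password is left as a placeholder to fill in yourself.

```python
# Privileges listed in the Debezium MySQL connector documentation.
DEBEZIUM_PRIVILEGES = [
    "SELECT",
    "RELOAD",
    "SHOW DATABASES",
    "REPLICATION SLAVE",
    "REPLICATION CLIENT",
]


def grant_statements(user: str, host: str = "%") -> list:
    """Build SQL that creates a CDC user, grants privileges, and checks binlog settings."""
    account = f"'{user}'@'{host}'"
    return [
        # '<password>' is a placeholder, not a real credential.
        f"CREATE USER {account} IDENTIFIED BY '<password>';",
        f"GRANT {', '.join(DEBEZIUM_PRIVILEGES)} ON *.* TO {account};",
        # Sanity checks: Debezium needs binary logging enabled in ROW format.
        "SHOW VARIABLES LIKE 'log_bin';",
        "SHOW VARIABLES LIKE 'binlog_format';",
    ]


if __name__ == "__main__":
    # Hypothetical user name; print the statements and run them in a MySQL client.
    for statement in grant_statements("debezium"):
        print(statement)
```

The script only prints SQL; run the statements yourself with an account that is allowed to create users.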

  • Create a MySQL connector using Kafka Connect.

Once binlogs are enabled on your MySQL server, the next step is to create a connector. In simple terms, a connector either reads data from a source system and writes it to Kafka (a source connector), or reads from Kafka and pushes the data to a sink system (a sink connector).

We are using the Debezium connector (https://www.confluent.io/hub/debezium/debezium-connector-mysql) for our purpose. It reads the DDL and DML changes from the MySQL binlogs and pushes them into Kafka, creating Kafka topics along the way.

The simplest way to do this is via Postman. Send a POST request to localhost:8083/connectors/ with the below sample request body.

{
  "name": "test-order-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    "database.server.id": "1",
    "database.server.name": "test",
    "database.whitelist": "wms",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "test.wms"
  }
}

In the above JSON, most of the keys are self-explanatory. One thing to note is that you need to change database.server.id, database.server.name, and name every time you create a new connector.
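If you prefer a script over Postman, the same request can be sketched in Python with only the standard library. The helper names `build_connector_request` and `register_connector` are illustrative, and deriving the history topic as `<server name>.<database>` is an assumption based on the sample values above. The three fields that must differ per connector are passed as parameters.

```python
import json
import urllib.request


def build_connector_request(name: str, server_id: int, server_name: str,
                            database: str = "wms") -> dict:
    """Build the connector payload; name, server_id and server_name must be unique per connector."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "localhost",
            "database.port": "3306",
            "database.user": "root",
            "database.password": "password",
            "database.server.id": str(server_id),
            "database.server.name": server_name,
            "database.whitelist": database,
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            # Assumption: history topic named after server and database, as in the sample.
            "database.history.kafka.topic": f"{server_name}.{database}",
        },
    }


def register_connector(payload: dict,
                       connect_url: str = "http://localhost:8083/connectors/") -> int:
    """POST the payload to Kafka Connect and return the HTTP status code."""
    request = urllib.request.Request(
        connect_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError on 4xx/5xx responses.
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    body = build_connector_request("test-order-connector", 1, "test")
    print(register_connector(body))
```

Kafka Connect should answer with 201 when the connector is created; an error response, such as a conflict for an already registered name, surfaces as an `HTTPError`.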

After sending the above JSON via Postman or curl, run the below command to list the Kafka topics on your local machine:

kafka-topics --zookeeper localhost:2181 --list

The newly created topics will have names of the form:

<database.server.name>.<databasename>.<tablename>

For our example, a topic name would be something like:

test.wms.order

When you run the above command, you should see topics in this format.
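The naming convention is easy to express in code. This is a small sketch with illustrative helper names; it builds the expected topic name for a table and filters a topic listing down to the table topics of one database, which drops Kafka Connect's internal topics and the history topic.

```python
def topic_name(server_name: str, database: str, table: str) -> str:
    """Return the topic name for a table: <server>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def table_topics(topics: list, server_name: str, database: str) -> list:
    """Keep only the topics that carry table changes for the given server and database."""
    prefix = f"{server_name}.{database}."
    return [topic for topic in topics if topic.startswith(prefix)]


if __name__ == "__main__":
    # Example listing; the topic names come from the configuration used in this article.
    listing = [
        "my_connect_configs",
        "my_connect_offsets",
        "my_connect_statuses",
        "test",
        "test.wms",
        "test.wms.order",
    ]
    print(topic_name("test", "wms", "order"))
    print(table_topics(listing, "test", "wms"))
```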

Kafka event sample:

If the setup is done correctly, the Kafka event will be in the below format. Check the before and after fields in the event: before is null for inserts and non-null for updates.

"payload": {
  "before": null,
  "after": {
    "id": 2,
    "wmsrecord_id": "1",
    "item_type": null,
    "shipment_cost": "24.5",
    "created_at": 1580774645000000,
    "courier": "FETCHR",
    "order_id": "1"
  },
  "source": {
    "version": "1.0.3.Final",
    "connector": "mysql",
    "name": "localtest15031242pm",
    "ts_ms": 0,
    "snapshot": "last",
    "db": "wms",
    "table": "logistics_demo",
    "server_id": 0,
    "gtid": null,
    "file": "mysql-bin-changelog.248436",
    "pos": 511,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1584342852164
}
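To make the before/after semantics concrete, here is a minimal Python sketch that interprets an event of this shape. It assumes the event has already been deserialized into a dict with a top-level "payload" key; the function names and the human-readable labels in `OPERATIONS` are illustrative, not something the connector emits.

```python
# Debezium operation codes; "r" (snapshot read) is emitted only by some connectors and versions.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(event: dict) -> dict:
    """Summarize a change event: operation, source table, and row images."""
    payload = event["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], "unknown"),
        "table": f'{source["db"]}.{source["table"]}',
        "before": payload["before"],
        "after": payload["after"],
    }


def changed_fields(event: dict) -> dict:
    """Return {column: (old, new)} for every column whose value differs."""
    payload = event["payload"]
    before = payload.get("before") or {}  # null for inserts
    after = payload.get("after") or {}    # null for deletes
    columns = set(before) | set(after)
    return {
        column: (before.get(column), after.get(column))
        for column in columns
        if before.get(column) != after.get(column)
    }


if __name__ == "__main__":
    # Trimmed version of the sample event above.
    sample = {
        "payload": {
            "before": None,
            "after": {"id": 2, "courier": "FETCHR", "item_type": None},
            "source": {"db": "wms", "table": "logistics_demo"},
            "op": "c",
        }
    }
    print(describe_change(sample)["operation"])
    print(changed_fields(sample))
```

For an insert, every non-null column shows up as changed from None; for an update, only the columns that actually differ are reported.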

Troubleshooting tips:

There are times when the topic is not created in the given format, which means the data has not flowed from MySQL to the Kafka topic. The possible reasons are:

  • MySQL binlogs have not been configured properly. Check the binlogs in MySQL manually to verify that they are being written.
  • The database user does not have the required permissions to access the binlogs. This is another common problem that I ran into. Check the Debezium documentation (https://debezium.io/documentation/reference/1.0/connectors/mysql.html) for the particular permissions required.
  • database.server.id has to be unique for every connector.
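When a topic is missing, the connector's status usually says why. Kafka Connect exposes it at /connectors/<name>/status. Below is a minimal Python sketch that fetches the status and lists anything that is not running; the helper names are illustrative, and the Connect URL assumes the port mapping from the Compose file.

```python
import json
import urllib.request


def fetch_status(name: str, connect_url: str = "http://localhost:8083") -> dict:
    """Fetch the status document for a connector from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as response:
        return json.load(response)


def find_problems(status: dict) -> list:
    """Return a readable line for the connector and each task that is not RUNNING."""
    problems = []
    connector = status.get("connector", {})
    if connector.get("state") != "RUNNING":
        problems.append(f"connector is {connector.get('state')}")
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            # Failed tasks carry a stack trace; its first line usually names the cause.
            trace_lines = task.get("trace", "").splitlines()
            detail = trace_lines[0] if trace_lines else "no trace"
            problems.append(f"task {task.get('id')} is {task.get('state')}: {detail}")
    return problems


if __name__ == "__main__":
    for line in find_problems(fetch_status("test-order-connector")):
        print(line)
```

A permissions or binlog problem typically appears as a FAILED task whose trace mentions the MySQL error.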

Summary of the tasks:

Below is a summary of what we have done in this article:

  • Set up the MySQL database for binlog events.
  • Used the Debezium connector on Kafka Connect to create topics in local Kafka.

The next step is to read the events, create indices on Elasticsearch, and build our real-time system. Let’s see how we can do this in article 3.

If you like this article, please read the subsequent articles and share your feedback. Find me on LinkedIn at rohan_linkedIn.
