Syncing SQL Server database using Kafka (Part 2/3)
Running Kafka Connect
Continuing from the previous part, where we installed Kafka using the Confluent distribution. In this part we will run Kafka Connect to automatically capture changes on SQL Server.
Test Run Kafka
First we’ll try out Kafka using its native Consumer and Producer.
Let’s start Kafka; this can be done with the Confluent CLI:
$ confluent start
Your terminal should show something like this
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
This shows that the command actually starts five different services one by one, each with its own purpose.
Starting Producer
The Confluent distribution puts the Kafka executables in its /bin/ directory.
A Kafka Producer creates messages to be queued in Kafka:
$ /bin/kafka-console-producer --broker-list localhost:9092 --topic newtopic
Kafka is a subscription-based message queue and it is pull-based: to get a message you have to subscribe to a topic. In this particular example we create a new topic called ‘newtopic’ to be subscribed to later.
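The pull model can be illustrated with a toy in-memory version in plain Python (a sketch only; the `Topic` class and its methods are invented for illustration, not Kafka’s API):

```python
# Toy illustration of a pull-based topic: the broker never pushes;
# each consumer tracks its own offset and asks for the next messages.
class Topic:
    def __init__(self):
        self.log = []          # append-only message log

    def append(self, msg):
        # What a producer does: add a message to the end of the log.
        self.log.append(msg)

    def poll(self, offset):
        # What a consumer does: pull everything from its own offset onward.
        return self.log[offset:]

topic = Topic()
topic.append("asdf")
topic.append("hello world")

offset = 0                     # a consumer starting "from the beginning"
messages = topic.poll(offset)
offset += len(messages)        # the consumer advances its own position
print(messages)                # ['asdf', 'hello world']
```

Note that the broker keeps no per-consumer state here: each consumer decides where to start reading, which is exactly what the `--from-beginning` flag controls later in this post.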
After that you will be able to type messages; let’s enter a couple:
>asdf
>hello world
>
Starting Consumer
A Kafka Consumer subscribes to the topic and reads messages from it:
$ /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic newtopic --from-beginning
The ‘--from-beginning’ argument makes the consumer read messages from the very beginning of the topic. If the argument is left out, the consumer will only read messages that arrive in Kafka from the moment it subscribes to the topic.
The above command should show something like this on your terminal
asdf
hello world
If we add a message on the producer side, the consumer will pick up and print the new message as well.
Kafka Connect
A Kafka Connector integrates another system with Kafka; in this particular case we want to connect to a SQL Server table and create a topic for that table.
Kafka Connect has two kinds of connectors: sources and sinks. A source reads from the database table and produces a message to Kafka for each table row, while a sink consumes messages from Kafka and writes them to another system.
Connect Source
Let’s load a source first. The Kafka Connect properties files (specifically those using JDBC) are usually located in /etc/kafka-connect-jdbc/:
$ confluent load testsource -d /etc/kafka-connect-jdbc/source-sqlserver-user.properties
The terminal should show something like this
{
  "name": "testsource",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:sqlserver://localhost:1433;database=test;username=admin;password=adminpassword;",
    "mode": "timestamp",
    "timestamp.column.name": "LastUpdate",
    "topic.prefix": "my-timestamp-",
    "table.whitelist": "user",
    "validate.non.null": "false",
    "name": "source-sqlserver-user"
  },
  "tasks": []
}
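For reference, the properties file loaded by the command above would hold the same settings in Java-properties form (a sketch reconstructed from the JSON echoed back; the connection details and credentials are this example’s placeholders, yours will differ):

```properties
name=source-sqlserver-user
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://localhost:1433;database=test;username=admin;password=adminpassword;
mode=timestamp
timestamp.column.name=LastUpdate
topic.prefix=my-timestamp-
table.whitelist=user
validate.non.null=false
```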
Run this command to see the Connect log:
$ confluent log connect
To check whether the messages got to Kafka, run a consumer on the topic:
$ /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-timestamp-user --from-beginning
Connect Sink
Now let’s try running a sink; in this particular example we sink the data into a PostgreSQL database:
$ confluent load testsink -d /etc/kafka-connect-jdbc/sink-postgres-user.properties
The terminal should show something like this
{
  "name": "testsink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "my-timestamp-user",
    "connection.url": "jdbc:postgresql://localhost:5432/test?user=admin&password=adminpassword",
    "insert.mode": "insert",
    "table.name.format": "user",
    "pk.mode": "record_value",
    "pk.fields": "userid",
    "name": "sink-postgres-user"
  },
  "tasks": []
}
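The corresponding sink-postgres-user.properties would carry the same settings (again a sketch mirroring the JSON above; host, database, and credentials are this example’s placeholders):

```properties
name=sink-postgres-user
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my-timestamp-user
connection.url=jdbc:postgresql://localhost:5432/test?user=admin&password=adminpassword
insert.mode=insert
table.name.format=user
pk.mode=record_value
pk.fields=userid
```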
If everything goes as expected, the PostgreSQL database should now be synced with the SQL Server database: any insert or update in SQL Server will be propagated to PostgreSQL.
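Under the hood, the timestamp mode used by the source boils down to repeatedly selecting rows whose timestamp column exceeds the largest value seen so far. A toy Python sketch of that polling loop (an in-memory row list stands in for the `user` table; this is an illustration of the idea, not the connector’s actual code):

```python
# Toy version of timestamp-mode polling: fetch only rows whose
# LastUpdate is newer than the last timestamp we have seen.
rows = [
    {"userid": 1, "name": "alice", "LastUpdate": 100},
    {"userid": 2, "name": "bob",   "LastUpdate": 200},
]

def poll_new_rows(rows, last_seen):
    """Return rows updated after last_seen, plus the new watermark."""
    fresh = [r for r in rows if r["LastUpdate"] > last_seen]
    new_watermark = max([last_seen] + [r["LastUpdate"] for r in fresh])
    return fresh, new_watermark

fresh, watermark = poll_new_rows(rows, 0)      # first poll: everything is new
print([r["userid"] for r in fresh])            # [1, 2]

# An update bumps LastUpdate, so the next poll picks that row up again.
rows[0]["LastUpdate"] = 300
fresh, watermark = poll_new_rows(rows, watermark)
print([r["userid"] for r in fresh])            # [1]
```

This is also why the table needs a reliably maintained timestamp column like LastUpdate: a row whose timestamp is not bumped on update will never be re-read by the source.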
Sources:
https://docs.confluent.io/current/connect/quickstart.html