Syncing SQL Server database using Kafka (Part 2/3)

Running Kafka Connect

Adrian Edbert Luman
3 min read · Feb 21, 2018

Continuing from the previous part, where we installed Kafka using the Confluent platform.

In this part we will run Kafka Connect to automatically capture changes in SQL Server.

Test Run Kafka

First, we’ll try out Kafka using its native console producer and consumer.

Let’s start by running Kafka. This can be done using the Confluent CLI:

$ confluent start

Your terminal should show something like this

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

This shows that the command actually runs five different services one by one, each with its own purpose.
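If you want to verify the services later, the same CLI can report their state; the status subcommand below comes from the 2018-era Confluent CLI (newer versions have renamed these commands):

$ confluent status

This prints the same [UP]/[DOWN] state for each of the five services.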

Starting Producer

The Confluent distribution puts the Kafka command-line tools in its bin/ directory, so the commands below are run from the Confluent installation directory.

A Kafka producer creates messages to be queued in Kafka:

$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic newtopic

Kafka is a subscription-based message queue, and it is pull based: to get a message, you have to subscribe to a topic. In this particular example we publish to a new topic called ‘newtopic’, which we will subscribe to later.
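The console producer will auto-create the topic if it does not exist yet. If you prefer to create it explicitly first, the bundled kafka-topics tool can do it; a sketch for this ZooKeeper-era Kafka version, with partition and replication values chosen for a single-node setup:

$ ./bin/kafka-topics --zookeeper localhost:2181 --create --topic newtopic --partitions 1 --replication-factor 1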

Back in the producer, you will be able to type messages; let’s enter a couple:

>asdf
>hello world
>

Starting Consumer

A Kafka consumer will subscribe to the topic and read messages from it:

$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic newtopic --from-beginning

The --from-beginning argument is there so the consumer reads messages from the very beginning of the topic. If the argument is left out, the consumer will only read messages that are passed to Kafka from the moment it subscribes to the topic.

The above command should show something like this on your terminal

asdf
hello world

If we add a message in the producer, the consumer will pick it up as well.
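For example, typing one more line into the producer terminal:

>one more message

makes it appear in the consumer terminal right away:

one more message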

Kafka Connect

A Kafka connector integrates another system with Kafka. In this particular case we want to connect to a SQL Server table and then create a topic for that table.

Kafka Connect has two kinds of connectors, sources and sinks. A source reads from the database table and produces a message to Kafka for each table row, while a sink consumes messages from Kafka and writes them to another system.

Connect Source

Let’s load a source first. The Kafka Connect properties files (specifically those using JDBC) are usually located in /etc/kafka-connect-jdbc/
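If source-sqlserver-user.properties does not exist there yet, create it yourself. A minimal sketch of what it could contain, matching the config echoed below; the connection details are placeholders for your own environment:

name=source-sqlserver-user
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://localhost:1433;database=test;username=admin;password=adminpassword;
mode=timestamp
timestamp.column.name=LastUpdate
topic.prefix=my-timestamp-
table.whitelist=user
validate.non.null=false

In timestamp mode, the connector detects new and updated rows by watching the LastUpdate column, and publishes each row to a topic named after the table with the my-timestamp- prefix. Once the file is in place, load it: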

$ confluent load testsource -d /etc/kafka-connect-jdbc/source-sqlserver-user.properties

The terminal should show something like this

{
  "name": "testsource",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:sqlserver://localhost:1433;database=test;username=admin;password=adminpassword;",
    "mode": "timestamp",
    "timestamp.column.name": "LastUpdate",
    "topic.prefix": "my-timestamp-",
    "table.whitelist": "user",
    "validate.non.null": "false",
    "name": "source-sqlserver-user"
  },
  "tasks": []
}

Run this command to see the log

$ confluent log connect
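You can also list the loaded connectors and check the state of a specific one; these subcommands again come from the 2018-era Confluent CLI:

$ confluent status connectors
$ confluent status testsource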

To check whether the messages got to Kafka or not, run a consumer on the topic:

$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-timestamp-user --from-beginning

Connect Sink

Now let’s try running a sink. In this particular example we sink the topic into a PostgreSQL database.
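As with the source, create the properties file first. A sketch of what sink-postgres-user.properties could contain, matching the config echoed below; again, the connection details are placeholders:

name=sink-postgres-user
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my-timestamp-user
connection.url=jdbc:postgresql://localhost:5432/test?user=admin&password=adminpassword
insert.mode=insert
table.name.format=user
pk.mode=record_value
pk.fields=userid

Then load it: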

$ confluent load testsink -d /etc/kafka-connect-jdbc/sink-postgres-user.properties

The terminal should show something like this

{
  "name": "testsink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "my-timestamp-user",
    "connection.url": "jdbc:postgresql://localhost:5432/test?user=admin&password=adminpassword",
    "insert.mode": "insert",
    "table.name.format": "user",
    "pk.mode": "record_value",
    "pk.fields": "userid",
    "name": "sink-postgres-user"
  },
  "tasks": []
}

If everything goes as expected, the PostgreSQL database should be synced with the SQL Server database.

Any insertion or update in the SQL Server database will be synced to the PostgreSQL database.
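When you are done experimenting, the connectors can be removed and the services stopped with the same CLI (again, 2018-era subcommands):

$ confluent unload testsink
$ confluent unload testsource
$ confluent stop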

Sources:
https://docs.confluent.io/current/connect/quickstart.html
