Replicate your relational data into Kafka

Josh Hicks
Data Weekly by Jumpmind
Apr 4, 2018

Kafka has proven to be a reliable and popular tool for streaming data to a variety of destinations. It is simple to set up and configure, and it can be scaled out as needed.

The Problem

How do you send a steady stream of data changes from one or more relational databases into your Kafka pipeline?

The Solution

Capture data changes in your source databases with a change data capture tool such as SymmetricDS. SymmetricDS can then stream those changes in near real time to one or more Kafka topics.

Set Up Change Capture Against The Source Database

Download and install SymmetricDS. Then set up a master node that connects to your source databases. The quick config wizard walks you through selecting the tables you would like to replicate to Kafka.

Set Up A Second Node To Receive Changes

Once your source database is configured, you can set up (add) a second node to your replication scenario. For this node I usually choose an H2 database, since it's lightweight and requires no additional installation. This H2 database only holds the SymmetricDS runtime tables that replication needs in order to run properly.
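For reference, each SymmetricDS node is described by an engine properties file. The sketch below builds one for the H2-backed receiving node with java.util.Properties. The keys (engine.name, group.id, external.id, db.driver, db.url, db.user, db.password, registration.url, sync.url) are standard SymmetricDS engine settings, but the class name H2ReceiverEngine, the file path in db.url, and the example group, external id, and URLs are assumptions for illustration. The console normally writes this file for you when you add the node.

```java
import java.util.Properties;

/**
 * Sketch (hypothetical helper): engine properties for a SymmetricDS node backed
 * by an embedded H2 database. Paths, ids, and URLs are illustrative assumptions.
 */
final class H2ReceiverEngine {

    private H2ReceiverEngine() {}

    static Properties build(String engineName, String groupId, String externalId,
                            String registrationUrl, int httpPort) {
        Properties p = new Properties();
        p.setProperty("engine.name", engineName);
        p.setProperty("group.id", groupId);
        p.setProperty("external.id", externalId);

        // Embedded H2: nothing extra to install; it only holds SymmetricDS runtime tables.
        p.setProperty("db.driver", "org.h2.Driver");
        p.setProperty("db.url", "jdbc:h2:file:./tmp/h2/" + engineName); // assumed location
        p.setProperty("db.user", "sa");
        p.setProperty("db.password", "");

        // The master node's sync URL, which this node registers with.
        p.setProperty("registration.url", registrationUrl);
        // The URL other nodes use to reach this node.
        p.setProperty("sync.url", "http://localhost:" + httpPort + "/sync/" + engineName);
        return p;
    }
}
```

For example, H2ReceiverEngine.build("kafka-000", "kafka", "000", "http://localhost:31415/sync/corp-000", 31415) describes a node in a hypothetical "kafka" group that registers with a master engine named corp-000. Saving it with Properties.store into the engines directory of a typical install would give the node its engine file.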

Set Up A Load Filter to Publish Changes to Kafka

SymmetricDS publishes changes to Kafka using a load filter extension. Extensions are points in the SymmetricDS process where users can plug in their own code to override default behavior.

In this instance we want to override the default behavior of writing to a target database and instead connect to Kafka and send the changes to a topic.
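To make "overriding the default behavior" concrete, here is a small, self-contained sketch of the pattern. It is not the SymmetricDS API: ChangeEvent, WriteFilter, PublishingFilter, and Loader are hypothetical names, and the real extension receives SymmetricDS's own context and row objects. What it shows is the contract: a filter that returns false from its before-write hook tells the loader to skip the default database write, so the filter can hand the change to a publisher (such as a Kafka producer) instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical change event: table name, event type ('I', 'U', 'D'), and column values. */
record ChangeEvent(String table, char eventType, Map<String, String> columns) {}

/** Hypothetical before-write hook, loosely modeled on a load filter extension point. */
interface WriteFilter {
    /** Return true to let the default write proceed, false to skip it. */
    boolean beforeWrite(String channelId, ChangeEvent event);
}

/** Hands every change to a publisher and suppresses the default database write. */
final class PublishingFilter implements WriteFilter {
    private final BiConsumer<String, ChangeEvent> publisher;

    PublishingFilter(BiConsumer<String, ChangeEvent> publisher) {
        this.publisher = publisher;
    }

    @Override
    public boolean beforeWrite(String channelId, ChangeEvent event) {
        publisher.accept(channelId, event); // e.g. send to a Kafka topic
        return false;                       // skip "write to target database"
    }
}

/** Minimal loader showing where filters intercept the default behavior. */
final class Loader {
    private final List<WriteFilter> filters;
    final List<ChangeEvent> writtenToDatabase = new ArrayList<>();

    Loader(List<WriteFilter> filters) {
        this.filters = filters;
    }

    void load(String channelId, ChangeEvent event) {
        for (WriteFilter filter : filters) {
            if (!filter.beforeWrite(channelId, event)) {
                return; // a filter handled the change; skip the default write
            }
        }
        writtenToDatabase.add(event); // stand-in for the default database write
    }
}
```

With no filters registered, every change lands in writtenToDatabase; with a PublishingFilter registered, changes go to the publisher and the database write never happens, which is the same trade the Kafka load filter makes.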

Follow this documentation to set up the Kafka load filter extension.

Note the three lines of the extension code listed here; they let you configure the extension for your environment.

  • Line 124: change your Kafka URL (the bootstrap servers)
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  • Line 127: change your producer's client id (optional)
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "symmetricds-producer");
  • Line 131: change your Kafka topic
String topic = "test";

Or set it to the batch's SymmetricDS channel, so that tables on different channels publish to different topics:

String topic = context.getBatch().getChannelId();
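If you would rather not edit the source for each environment, one option is to resolve these values at startup. This sketch builds the producer settings as a plain map keyed by "bootstrap.servers" and "client.id" (the strings that ProducerConfig.BOOTSTRAP_SERVERS_CONFIG and ProducerConfig.CLIENT_ID_CONFIG stand for) and picks the topic either from a fixed setting or from the channel id. The KafkaSettings class and the variable names KAFKA_BOOTSTRAP_SERVERS, KAFKA_CLIENT_ID, and KAFKA_TOPIC are assumptions, not part of the extension.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (hypothetical helper): producer settings and topic resolved from
 * environment-style variables instead of hard-coded values.
 */
final class KafkaSettings {

    private KafkaSettings() {}

    /** Producer settings keyed by the strings behind ProducerConfig's constants. */
    static Map<String, Object> producerConfigs(Map<String, String> env) {
        Map<String, Object> configs = new HashMap<>();
        // Defaults match the values hard-coded in the extension.
        configs.put("bootstrap.servers", env.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
        configs.put("client.id", env.getOrDefault("KAFKA_CLIENT_ID", "symmetricds-producer"));
        return configs;
    }

    /**
     * Uses KAFKA_TOPIC if it is set; otherwise uses the batch's channel id, with any
     * character Kafka does not allow in topic names (anything other than ASCII
     * letters, digits, '.', '_', '-') replaced by '_'.
     */
    static String topicFor(String channelId, Map<String, String> env) {
        String fixed = env.get("KAFKA_TOPIC");
        if (fixed != null && !fixed.isEmpty()) {
            return fixed;
        }
        return channelId.replaceAll("[^A-Za-z0-9._-]", "_");
    }
}
```

In the extension you might call KafkaSettings.producerConfigs(System.getenv()) and pass the result, together with whatever serializer settings the extension already uses, to the KafkaProducer constructor, then compute the topic with KafkaSettings.topicFor(context.getBatch().getChannelId(), System.getenv()). The character replacement only matters if a channel id contains something Kafka would reject, such as a space.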

Verify

You're all set. Make some changes to your source database and verify that they are published to your Kafka topics.
