Kafka Connect Cluster: An Introduction

Seyed Morteza Mousavi · Published in clay-one · Dec 19, 2018

Apache Kafka is a streaming platform that allows developers to process streams of data easily and in real time. These data come from a variety of sources. In the simplest form, you have a producer application that generates a message and publishes it to a Kafka cluster and, on the other hand, another application that consumes this message. This is typical for many scenarios, but sometimes you need to process streams of data that are not yet in your Kafka cluster. These data may be located in a SQL database such as SQL Server or MySQL, or in a simple CSV file. In order to process those data, you have to move them from your storage system to the Kafka cluster. To this end, you have several options, two of which are:

  1. Create an application that reads data from your source storage system and produces it to the Kafka cluster.
  2. Or use Kafka Connect to move your data easily from the source storage system to your Kafka cluster.

If you choose the first option, you need to write code that moves your data to the Kafka cluster. Your code must deal with failures of your application (for example, it must store the offset of the last record of each table that was moved to Kafka, so it can resume copying the records that were not yet inserted into Kafka), scalability, polling, and so on.

But if you choose the second option, you can move data without writing any code. In fact, Kafka Connect does the same job as the first option, but in a scalable and fault-tolerant way. The process of copying data from a storage system into a Kafka cluster is so common that the Kafka Connect tool was created specifically to address this problem.

Notice that the transfer can also go in the opposite direction: you can move data from a Kafka cluster to a database system using Kafka Connect. In this article, we focus on copying data from external storage to Kafka. I share my understanding of some of the concepts of a Kafka Connect cluster, and after that I show you how I created a Kafka Connect cluster.

Kafka Connect Concepts

Kafka Connect uses workers for moving data. Workers are just simple Linux (or any other OS) processes. Kafka Connect can create a cluster of workers to make the data copying process scalable and fault tolerant. Workers need to store some information about their status, their progress in reading data from external storage, and so on. To store that data, they use Kafka itself as their storage. Note that a Kafka Connect cluster (a cluster of workers) is completely different from a Kafka cluster (a cluster of Kafka brokers).

More workers mean that your copying process is more fault tolerant. Consider a scenario in which you want to move data from your SQL Server database to Kafka. You can have a cluster with a single worker (a single process), and your data will be copied until that worker process or its machine is restarted (for example, for maintenance) or crashes. No data will be transferred until the worker comes back up. But if you have 2 workers and one of them stops, the copying process is continued by the second worker. This way you achieve high availability.

Here you can see that 4 workers running on 3 physical machines form a Kafka Connect cluster of 4 workers, each of which copies data from a database in parallel.

In the above image, if we have 12 tables inside the SQL Server database, each worker might read from 3 tables of the database. When one worker becomes unavailable, the other 3 workers take over the responsibility of the unavailable worker:

The 3 remaining workers now have to share worker 1's responsibility, so each of them reads 1 additional table (each of them now reads from 4 tables).

There are actually two types of workers, and what we have discussed so far is the distributed mode, which allows you to create a cluster of workers. If you just need to run Kafka Connect with a single worker, you can use the standalone mode, in which the worker stores its state on disk. I don't cover standalone mode today.

Workers are a physical concept: they are processes that run inside a JVM. A job, in Kafka Connect terms, is called a connector. A connector is something like this:

  • Copy records from table ‘accounts’ of my MySQL database to the Kafka topic ‘accounts’. This is called a source connector because you move data from external storage to Kafka.
  • Copy each message from the Kafka topic ‘product-events’ to a CSV file ‘myfile.csv’. This is called a sink connector because you move data from Kafka to external storage.

A connector does not define which worker is responsible for doing the job; workers cooperate with each other to do it. Each connector has a set of one or more tasks, and these tasks can copy data in parallel. Having multiple workers allows you to distribute these tasks across several machines.

Because there are many storage systems, Kafka Connect must somehow know how to copy data from a given source to Kafka or vice versa. For this purpose, when you want to submit a connector to the Kafka Connect cluster, you must specify a connector plugin. A connector plugin is typically one or more jar files that know how to copy data between a specific storage system and Kafka. For example, when you want to load data from a SQL database (such as SQL Server or MySQL), you need the JDBC connector plugin.

The JDBC connector is included by default in the Confluent installation package, but if you need a connector that is not installed by default, you first need to install the Confluent Hub client; using this client, you can install any connector that is available in Confluent Hub. In the next section, I show you how you can view the list of already installed connector plugins. If you can't find your connector plugin in Confluent Hub, you can implement your own.

To run a worker, you must provide a configuration file (which is a .properties file), like this:

connect-distributed worker.properties

Here worker.properties contains the worker configuration. We will describe some of the main configuration options in the next section, but for now you should know that you can set the plugin directory path with the plugin.path configuration. Confluent ships with sample worker configuration files that you can look at to find your installed plugin path. For example, looking at the /etc/kafka/connect-distributed.properties sample:
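A quick way to check is to grep the sample file (a minimal sketch; the exact line may be commented out or differ depending on your installation):

# print the plugin.path setting from the sample worker configuration
grep plugin.path /etc/kafka/connect-distributed.properties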

This shows that my plugin path is /usr/share/java.

Now we are ready to create a Kafka Connect cluster.

Setup Machines

First, we need to prepare our machines. I have 3 virtual machines, named:

  • Kafka: A standalone Kafka broker that I want to move my data to and that also keeps the state of my Connect cluster; it is reachable at 192.168.80.30:9092. An instance of ZooKeeper is also running on this machine.
  • Worker1: First worker instance
  • Worker2: Second worker instance

All of them run Ubuntu, and my SQL Server is located at 192.168.80.1 on my Windows 10 machine.

Configure Workers

On both Worker1 and Worker2, I did the following steps. First, create a directory named worker in the home directory:
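A minimal command for this step (assuming a standard bash shell):

# create a directory to hold the worker configuration
mkdir ~/worker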

Now I want to create the worker configuration file. connect-distributed.properties is a good sample file to start from. Copy /etc/kafka/connect-distributed.properties to ~/worker/myworker.properties:
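Something like this (assuming the sample lives in /etc/kafka, as above):

# start from the distributed-mode sample configuration
cp /etc/kafka/connect-distributed.properties ~/worker/myworker.properties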

Then open it in your editor. I changed the following properties:

  1. Change bootstrap.servers=localhost:9092 to point to your Kafka servers; in my case it is bootstrap.servers=192.168.80.30:9092.
  2. Change group.id=connect-cluster to group.id=my-example-connect-cluster. Note that workers with the same group.id join the same cluster, so you have to ensure that all your workers have the same group.id value. The resulting lines are shown below.
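After the edits, the relevant lines of ~/worker/myworker.properties look like this (only these two settings differ from the sample):

# the Kafka broker that Connect uses for data and for its own state
bootstrap.servers=192.168.80.30:9092
# workers sharing this group.id form one Kafka Connect cluster
group.id=my-example-connect-cluster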

Now you need to place your JDBC driver inside the JDBC plugin directory. In my case, because I want to move data from SQL Server, I downloaded the SQL Server JDBC jar file from the Maven repository.

After downloading the JDBC driver, you must move it to the JDBC connector plugin directory. For example, because my JRE version is 10, I downloaded the mssql-jdbc-7.0.0.jre10.jar file and copied it to the plugin folder.

So copy the SQL Server JDBC driver to the JDBC plugin folder:
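A sketch of this step (assuming the jar was downloaded to the current directory and the JDBC plugin directory is /usr/share/java/kafka-connect-jdbc, as shown below):

# place the SQL Server JDBC driver next to the JDBC connector plugin
sudo cp mssql-jdbc-7.0.0.jre10.jar /usr/share/java/kafka-connect-jdbc/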

These are the contents of my JDBC connector plugin directory (/usr/share/java/kafka-connect-jdbc) now:
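You can check this on your own machine with a simple listing (the exact files depend on your Confluent version):

# list the jars available to the JDBC connector plugin
ls /usr/share/java/kafka-connect-jdbc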

If you don't place this driver file inside the plugin directory, you will receive an SQLException: No suitable driver found error. Note that the JDBC drivers for PostgreSQL and SQLite already exist in the directory. For other databases, like MySQL, you have to copy their drivers yourself. Also, the driver must be installed on all workers; otherwise you might receive another error when you try to submit your connector to the Kafka Connect cluster.

Now you are ready to join your workers to the Kafka Connect cluster by running the following command (on both Worker1 and Worker2):
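Using the connect-distributed command shown earlier, pointed at the configuration file we just created:

# start a distributed-mode worker with our configuration
connect-distributed ~/worker/myworker.properties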

Some logs appear on the screen, and in the last lines you will see a message saying that Kafka Connect has started.

This indicates that your workers are running successfully.

Move Your Data From SQL Server to Kafka Cluster

Now you have a Kafka Connect cluster. You can view your worker information by running the following command (on each worker), which calls the worker REST API:
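For example, with curl against the REST API's default port 8083 (adjust the port if you changed it in the worker configuration):

# the root endpoint returns the worker's Connect version and Kafka cluster id
curl http://localhost:8083/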

You can also view the list of supported plugins using this REST API:
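The connector-plugins endpoint lists every connector plugin installed on the worker (again assuming the default port 8083):

# list the connector plugins this worker has loaded
curl http://localhost:8083/connector-plugins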

Now let's create a simple database and table in SQL Server whose data we want to copy to Kafka. I create a database called ConnectSampleDb which has a single table, ConnectSampleTable, as shown below.
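The original shows this only as a screenshot, so the exact columns here are my assumption; a table with an auto-incrementing id and a name column matches the data used later in the article:

-- create the sample database and table (column layout assumed)
CREATE DATABASE ConnectSampleDb;
GO
USE ConnectSampleDb;
GO
CREATE TABLE ConnectSampleTable (
    id   INT IDENTITY(1,1) PRIMARY KEY, -- strictly increasing key
    name NVARCHAR(100) NOT NULL
);
GO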

And fill ConnectSampleTable with a few rows, as in the picture:
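For example (the actual values used in the article are not visible here, so these rows are only illustrative):

-- insert a couple of illustrative sample rows
INSERT INTO ConnectSampleTable (name) VALUES ('Alice'), ('Bob');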

Now I create a file named sample-sql-server-jdbc-connector.json inside the worker folder of Worker1 with the following content:
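The original file is shown only as an image, so the following is a reconstruction: the connector class and topic prefix follow from the topic names we see later (test-sql-server-jdbc-...), while the connector name, mode, incrementing column, credentials, and tasks.max are my assumptions, chosen to be consistent with the behavior described in the rest of the article:

{
  "name": "sample-sql-server-jdbc-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://192.168.80.1:1433;databaseName=ConnectSampleDb",
    "connection.user": "<your-user>",
    "connection.password": "<your-password>",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "test-sql-server-jdbc-",
    "tasks.max": "2"
  }
}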

This file contains the body of the HTTP request that we want to send to the Kafka Connect REST API to create a source connector. Run the following command to create the connector; on success, the REST API answers with a JSON response describing it:
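Using curl from Worker1's worker directory (assuming the default REST port 8083; the response echoes back the connector name and configuration):

# submit the connector definition to the Connect cluster
curl -X POST -H "Content-Type: application/json" \
  --data @sample-sql-server-jdbc-connector.json \
  http://localhost:8083/connectors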

The above request and response show that our connector was submitted to the Kafka Connect cluster successfully. You can also view the list of connectors using the following curl command:
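The connectors endpoint returns the names of all submitted connectors:

# list the names of all connectors known to the cluster
curl http://localhost:8083/connectors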

To get the status of a specific connector, you can run the following command:
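Using the connector name from the JSON above (which, again, is my assumed name):

# show whether the connector and its tasks are RUNNING, FAILED, etc.
curl http://localhost:8083/connectors/sample-sql-server-jdbc-connector/status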

This indicates that your connector is working. You can use the status API to monitor your connector, and you can call the REST API on any of the workers (Worker1 or Worker2).

So let’s view the list of topics:
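One way to do this, assuming the Kafka command-line tools and the ZooKeeper instance running on the Kafka machine at its default port 2181 (newer Kafka versions would use --bootstrap-server instead):

# list all topics in the Kafka cluster
kafka-topics --zookeeper 192.168.80.30:2181 --list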

As you can see, test-sql-server-jdbc-ConnectSampleTable is the topic that contains the data from ConnectSampleTable. We can view the moved data using the Kafka console consumer:
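Reading the topic from the beginning against the broker on the Kafka machine:

# dump every record the connector has produced so far
kafka-console-consumer --bootstrap-server 192.168.80.30:9092 \
  --topic test-sql-server-jdbc-ConnectSampleTable --from-beginning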

As you can see, the result contains both the schema and the payload. If you want just the payload, you can change myworker.properties before running the worker:
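These are the standard JSON converter settings for that; add (or change) the following lines in myworker.properties:

# drop the embedded schema and emit plain JSON payloads
key.converter.schemas.enable=false
value.converter.schemas.enable=false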

The result then should be like this:

I added another record with the name ‘Harry’ to ConnectSampleTable, and it was added to the topic successfully:

Let's continue and create another table inside the ConnectSampleDb database. I create a table named ConnectSecondTable:
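Again, the exact columns are not visible in the original, so this layout is an assumption mirroring the first table:

-- second sample table (column layout assumed)
CREATE TABLE ConnectSecondTable (
    id   INT IDENTITY(1,1) PRIMARY KEY,
    name NVARCHAR(100) NOT NULL
);
GO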

Insert one item into the table:
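For example (the value is only illustrative):

-- add a single illustrative row
INSERT INTO ConnectSecondTable (name) VALUES ('Carol');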

View the topic list again:
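Using the same command as before:

# list all topics again to see the newly created one
kafka-topics --zookeeper 192.168.80.30:2181 --list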

As you can see, the test-sql-server-jdbc-ConnectSecondTable topic has been created for the new table. Viewing the new topic shows that our ConnectSecondTable records were copied successfully.

As you saw, the JDBC connector plugin by default watches for any changes in the database, including newly created tables. If you want to move data from only one table or a small number of tables, you can add the table.whitelist configuration to the sample-sql-server-jdbc-connector.json file before submitting your connector, as described along with the other configuration options in the official documentation. Note that you can also update your connector configuration through the REST API, as sketched below.
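For example, to restrict the connector to the first table only, you could PUT a new configuration through the REST API; PUT replaces the whole configuration, so all settings must be repeated (the connector name and credentials here are the same assumptions as before):

# replace the connector configuration, adding a table whitelist
curl -X PUT -H "Content-Type: application/json" \
  --data '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://192.168.80.1:1433;databaseName=ConnectSampleDb",
    "connection.user": "<your-user>",
    "connection.password": "<your-password>",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "test-sql-server-jdbc-",
    "tasks.max": "2",
    "table.whitelist": "ConnectSampleTable"
  }' \
  http://localhost:8083/connectors/sample-sql-server-jdbc-connector/config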

To test that our Kafka Connect cluster is fault tolerant, I killed the Worker1 process and inserted another record into ConnectSecondTable:

And viewing the topic data shows that the new record was copied:

Interestingly, you can also view the data of the connect-offsets topic:
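connect-offsets is the default name of the internal topic where distributed workers store source connector offsets; it can be read like any other topic:

# read the internal topic where Connect stores source offsets
kafka-console-consumer --bootstrap-server 192.168.80.30:9092 \
  --topic connect-offsets --from-beginning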

But to get more useful information, you need to print the keys as well (separated from the values by ‘-’):
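The console consumer can print record keys with a couple of extra properties:

# also print each record's key, separated from the value by '-'
kafka-console-consumer --bootstrap-server 192.168.80.30:9092 \
  --topic connect-offsets --from-beginning \
  --property print.key=true --property key.separator=-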

As you can see, there is much more information in the keys:

  • Name of connector
  • Name of database and table.

This way Kafka Connect can manage different connector offsets.

Conclusion

In this article, we introduced the Kafka Connect cluster and some of its concepts, like workers, connectors, and connector plugins. We learned how to configure a set of workers that coordinate connector tasks inside the cluster. We saw that, by default, the JDBC connector watches for any changes in the database and moves them to Kafka. I hope this article is helpful for you. If you feel that you can add more information to this article, or that some of the information is not correct, please feel free to comment below.
