Making Kafka & RabbitMQ Integration Easier with Spring Cloud Stream: Part 1

Integrating messaging systems can be a pain.

There are so many things that have to be taken into account when systems of this kind need to be integrated with other systems.

In my opinion, two of the main factors of system integration are the conversion of messages between the system producing them and the system consuming them, i.e. from a customer model to a canonical model, and how the two systems actually communicate with one another.

We could have a system that consumes an XML message and produces a message in JSON. We could have one that consumes Avro and produces XML; the list goes on.

I’ve been pondering for a while now what the quickest and most pain-free way would be of consuming an Avro-formatted message from a Kafka Topic, converting it into JSON and sending it to a RabbitMQ queue.

Interest in Apache Kafka is growing rapidly. More and more companies are utilising Kafka and the ecosystem it provides. Because of this interest, integration between Kafka and legacy systems usually happens at some point as companies move onto a newer system and off a legacy one.

A useful part of the Kafka ecosystem is Kafka Connect.

Kafka Connect is a framework for connecting databases, key-value stores and external systems to Kafka. It provides source connectors that have the ability to stream database changes to Kafka Topics. It also provides sink connectors that have the ability to stream records from Kafka Topics and store them in ElasticSearch, HDFS, S3, RabbitMQ and many more.

For example, say we have a MySQL database and every time we add a row we want to send it to ElasticSearch. We would create a configuration JSON file that contains fields such as the MySQL database URL, the table name and the name of the Topic where the data needs to be sent. Kafka Connect exposes an HTTP endpoint that this configuration can be posted to for it to be created. We would create another configuration file for this data to be retrieved from the Topic and inserted into ElasticSearch.
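As a rough sketch of what the source side of that might look like (this assumes the Confluent JDBC source connector, and the connection details, connector name and column are made up for illustration), the JSON posted to Kafka Connect's REST API (by default on port 8083, at /connectors) could be something like:

{
  "name": "mysql-customers-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/shop?user=user&password=pass",
    "table.whitelist": "customers",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "mysql-"
  }
}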

The only issue with this solution is that we are providing a Source and Sink for our data but we can't perform the transformation we need.

Another way is using Kafka Streams.

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka Topics. So we could have an application streaming near-real-time data into a Kafka Topic; our Kafka Streams application would consume the messages, transform them and send them back to Kafka in a new Topic, or straight to RabbitMQ. The one and only problem with this solution is that it involves a bit of leg work to get it up and running.
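To give a feel for that leg work, here's a minimal sketch of the sort of Kafka Streams application we'd have to write, configure and run ourselves (the Topic names and the transformation are placeholders):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import java.util.Properties

fun main() {
    // Minimal configuration: an application id and the broker address.
    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "log-transformer")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    }

    // Consume from one Topic, transform each record, write to another Topic.
    val builder = StreamsBuilder()
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues { value -> "transformed: $value" } // placeholder transformation
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()))

    KafkaStreams(builder.build(), props).start()
}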

Spring Cloud Stream can help us out here.


Spring Cloud Stream is a framework that makes building event- & message-driven microservices a walk in the park. It’s super simple to get up & running and have events & messages flowing between systems.

Spring Cloud Stream is made up of three parts: Destination Binders, Destination Bindings and Messages.

The Destination Binders provide the necessary configuration and implementation to facilitate integration with external messaging systems. So there would be a Destination Binder for Kafka, RabbitMQ, GCP PubSub etc.

Destination Bindings bridge between the external messaging systems and application provided Producers and Consumers of messages. Think of the Destination Binders as the means of our application integrating with the messaging system and the Destination Bindings as the means of our application utilising the Destination Binder by being able to send and receive records to the messaging system.

Last but not least are Messages. Messages contain data relevant to a domain and are used by producers and consumers to communicate with Destination Binders.
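As a tiny illustration (assuming spring-messaging is on the classpath; the payload here is just a made-up JSON string), a Message is simply a payload plus headers:

import org.springframework.messaging.support.MessageBuilder

fun main() {
    // A Message wraps a domain payload together with headers; this is the unit
    // that producers and consumers exchange with the Destination Binders.
    val message = MessageBuilder
        .withPayload("""{"serial_number": "S300Z4N3", "failure": 0}""")
        .setHeader("contentType", "application/json")
        .build()

    println(message.payload)
    println(message.headers)
}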

If you’re reading this article I presume you understand the basics of Spring, Kotlin, RabbitMQ and Kafka. But this article is pretty straightforward, so people with basic knowledge will be just fine.


The first part of this article is going to focus on getting records out of Kafka. I use the Landoop Fast Data Dev Docker image when I run Kafka locally. It comes with the full Kafka suite and is up and running in seconds. Here’s the Docker Compose file.
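A Compose file along these lines does the job (the image is landoop/fast-data-dev; the tag, the ADV_HOST setting and the exact port mappings are my assumptions, so adjust them to your setup):

version: '2'

services:
  kafka-cluster:
    image: landoop/fast-data-dev:latest
    environment:
      ADV_HOST: 127.0.0.1       # advertise the broker on localhost
      SAMPLEDATA: 1             # create Topics pre-loaded with sample data
      RUNNING_SAMPLEDATA: 1     # keep a steady stream of records flowing in
      RUNTESTS: 0               # skip the integration tests for a faster start-up
    ports:
      - "2181:2181"             # ZooKeeper
      - "3030:3030"             # Landoop web UI
      - "8081-8083:8081-8083"   # Schema Registry, REST Proxy, Kafka Connect
      - "9092:9092"             # Kafka broker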

Let’s talk about the environment variables.

SAMPLEDATA: 1 creates a handful of Topics with some realistic data inside, RUNNING_SAMPLEDATA: 1 ensures that when we consume from these Topics there will be a steady stream of records returned, and RUNTESTS: 0 disables the integration tests from running for a quicker start-up.

If everything is running as it should be you will see an output similar to the below.

kafka-cluster_1  | 2018-11-24 11:44:00,795 INFO success: broker entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-11-24 11:44:00,795 INFO success: connect-distributed entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
kafka-cluster_1 | 2018-11-24 11:44:00,797 INFO success: logs-to-kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)

To make sure Kafka is up and running we can consume some records from one of the test Topics. I’d recommend downloading KafkaCat to carry this out. Running the below command returns the metadata associated with the specified broker.

kafkacat -L -b localhost:9092

The part we’re interested in is a small paragraph with the Topic name “backblaze_smart” and some partition information.

topic "backblaze_smart" with 2 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
partition 1, leader 0, replicas: 0, isrs: 0

If you’re seeing this, everything has worked as it should so far, so give yourself a pat on the back…and give yourself a bigger pat on the back if you see a steady stream of records when running the below command.

kafkacat -b localhost:9092 -t backblaze_smart

The Topic named backblaze_smart contains log entries, which I think come from a hard drive array or something of that sort. Either way, the entries are strings, which is perfect for our solution.

So now that we have Kafka up and running and a Topic ready to stream records from, we need a project to create a Consumer in. The best place to head to is https://start.spring.io/. Using Spring Initializr we can have a project up and running in no time. We need two dependencies: Cloud Stream and Kafka. Kafka is the Destination Binder that we are using in this part, but we’ll be using a couple of the others in future parts.
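If you're using Gradle with the Kotlin DSL, those two choices roughly boil down to a dependency block like this sketch (it assumes the Spring Cloud BOM that Initializr sets up is applied, so no versions are needed here):

// build.gradle.kts (sketch): the Kafka binder starter pulls in Spring Cloud Stream
// itself along with the Kafka binder; versions are managed by the Spring Cloud BOM.
dependencies {
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
}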

Now that we have a project to work with, we need to write a Consumer. The Consumer in this example is written in Kotlin, as it will be in future parts, but it can be translated to Java easily enough.
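Based on the walkthrough below, a minimal sketch of that sink looks like this (the class is the BasicKafkaSink referenced later; the main function is just the standard Spring Boot entry point):

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.StreamListener
import org.springframework.cloud.stream.messaging.Sink

@SpringBootApplication
@EnableBinding(Sink::class)
class BasicKafkaSink {

    // Invoked for every record arriving on the "input" binding.
    @StreamListener(Sink.INPUT)
    fun handle(log: String) {
        println(log)
    }
}

fun main(args: Array<String>) {
    runApplication<BasicKafkaSink>(*args)
}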

Right, so this block of code is enough to stream records from a Topic. The only other file that we need (at the moment) is a configuration file containing the name of the Topic that we want to consume from. So let’s walk through it.

The first annotation to talk about is @EnableBinding.

This annotation is responsible for providing connectivity to the Destination Binder in the project, which in our case is Kafka. It also takes one or more Interfaces as an argument. Spring Cloud Stream provides three Interfaces for us: Sink, Source and Processor. If these Interfaces aren’t sufficient for your needs you can roll your own.

As we’re using the Sink Interface here, that’s the one I’m going to go into in a bit more detail. I’ll cover the others in future parts.

If we have a look inside the Sink Interface it will make more sense how things work.

public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();

}

The @Input annotation identifies a SubscribableChannel, which is how records enter our application.

The next annotation in our BasicKafkaSink file is @StreamListener.

We add this to a function that we want to receive events from the middleware for processing. It takes an argument which is the value of the binding target, in our case Sink.INPUT, which has a value of input. This constant names the binding that our stream of records comes in on. We’ll use the value of this constant later to set our Topic value. But if for some reason we swapped out Kafka for RabbitMQ, the only changes we would have to make are configuration ones, which is pretty neat! The function itself is extremely basic at the moment: it takes the log as a String and just prints it to stdout.

Lastly, there’s the configuration file.
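Assuming the Kafka binder's defaults (a single broker on localhost:9092), the whole of application.properties is just this one line:

spring.cloud.stream.bindings.input.destination=backblaze_smart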

There’s no mistake there, that really is the whole file!

All we have to do is specify the Topic that we want to stream the records from. Spring is very opinionated, so a handful of default values are set behind the scenes.

As this project grows we’ll see more values enter the configuration file.
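For example (these keys are standard Spring Cloud Stream properties, but the values are assumptions about what we might override later rather than part of the project yet), the broker list and the consumer group could be set explicitly like so:

spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.bindings.input.group=basic-kafka-sink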

Take a look at the property key, spring.cloud.stream.bindings.input.destination. See how it contains input?

The value of the constant in the interface…the value of the @Input annotation on the SubscribableChannel…lightbulb moment?!?!

So say for example we rolled our own Interface and we set the value of the INPUT constant to “kafka”, we would refer to “kafka” in our configuration file.

spring.cloud.stream.bindings.kafka.destination=backblaze_smart
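Such a hypothetical Interface might look something like this in Kotlin (the name KafkaSink and its channel name are made up for illustration); we'd then pass KafkaSink::class to @EnableBinding and KafkaSink.INPUT to @StreamListener.

import org.springframework.cloud.stream.annotation.Input
import org.springframework.messaging.SubscribableChannel

interface KafkaSink {

    companion object {
        // The binding is named "kafka", so it's configured via
        // spring.cloud.stream.bindings.kafka.* properties.
        const val INPUT = "kafka"
    }

    @Input(INPUT)
    fun input(): SubscribableChannel
}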

And that’s how it’s all wired up. This is what makes Spring Cloud Stream so cool!

With the help of some configuration and a couple of annotations we have a tiny service which is hooked up to some middleware, listening to a Topic and streaming records from it.

If everything has worked correctly, you should see some output like this, and lots of it.

{"date":"2017-01-01","serial_number":"S300Z4N3","model":"ST4000DM000","capacity_bytes":4000787030016,"failure":0,"smart_1_normalized":112,"smart_1_raw":47623472,"smart_2_normalized":null,"smart_2_raw":null,"smart_3_normalized":98,"smart_3_raw":0,"smart_4_normalized":100,"smart_4_raw":2,"smart_5_normalized":100,"smart_5_raw":0,"smart_7_normalized":87,"smart_7_raw":489605003,"smart_8_normalized":null,"smart_8_raw":null,"smart_9_normalized":86,"smart_9_raw":12802,"smart_10_normalized":100,"smart_10_raw":0,"smart_11_normalized":null,"smart_11_raw":null,"smart_12_normalized":100,"smart_12_raw":2,"smart_13_normalized":null,"smart_13_raw":null,"smart_15_normalized":null,"smart_15_raw":null,"smart_22_normalized":null,"smart_22_raw":null,"smart_183_normalized":100,"smart_183_raw":0,"smart_184_normalized":100,"smart_184_raw":0,"smart_187_normalized":100,"smart_187_raw":0,"smart_188_normalized":100,"smart_188_raw":0,"smart_189_normalized":100,"smart_189_raw":0,"smart_190_normalized":74,"smart_190_raw":26,"smart_191_normalized":100,"smart_191_raw":0,"smart_192_normalized":100,"smart_192_raw":0,"smart_193_normalized":77,"smart_193_raw":47475,"smart_194_normalized":26,"smart_194_raw":26,"smart_195_normalized":null,"smart_195_raw":null,"smart_196_normalized":null,"smart_196_raw":null,"smart_197_normalized":100,"smart_197_raw":0,"smart_198_normalized":100,"smart_198_raw":0,"smart_199_normalized":200,"smart_199_raw":0,"smart_200_normalized":null,"smart_200_raw":null,"smart_201_normalized":null,"smart_201_raw":null,"smart_220_normalized":null,"smart_220_raw":null,"smart_222_normalized":null,"smart_222_raw":null,"smart_223_normalized":null,"smart_223_raw":null,"smart_224_normalized":null,"smart_224_raw":null,"smart_225_normalized":null,"smart_225_raw":null,"smart_226_normalized":null,"smart_226_raw":null,"smart_240_normalized":100,"smart_240_raw":12457,"smart_241_normalized":100,"smart_241_raw":37459596632,"smart_242_normalized":100,"smart_242_raw":13999128574,"smart_250_normalized":null,"smart_250_raw":null,"smart_251_normalized":null,"smart_251_raw":null,"smart_252_normalized":null,"smart_252_raw":null,"smart_254_normalized":null,"smart_254_raw":null,"smart_255_normalized":null,"smart_255_raw":null}

I’ve run this code on a MacBook Pro and a Mac Mini Server and it worked with no issues, so if it doesn't work for you drop me a comment or a tweet and I’ll get back to you.

This is the first part of many in this Spring Cloud Stream series and I’m extremely excited for what’s coming up.

If you want to see the project for this part, it’s on GitHub.

Many thanks as always for reading my articles, it’s really appreciated. Any thoughts, comments or questions drop me a tweet.

Cheers 👍🏻

Danny

https://twitter.com/danieljameskay