Using RabbitMQ and Kafka to send messages between applications

Ed Halliwell
8 min read · Oct 20, 2023


How to get started with RabbitMQ and Kafka to set up simple message queues between separate applications

Image credit: Gavin Allanwood via Unsplash

Introduction

I’ve been keen to have a play with message queues for a while as they are not something I’ve used before. Queues can bring lots of benefits when it comes to decoupling applications whilst retaining reliability and enabling scalability — so let’s dive in!

RabbitMQ vs. Kafka

Kafka and RabbitMQ are both messaging systems, and I wanted to get some experience with each as I’m not sure which one I’ll need to use (maybe both!). There is plenty written about how to choose between them (see this piece by Eran Stiller), and this useful article on AWS explains how they’re both quite similar and quite different:

“RabbitMQ is a distributed message broker that collects streaming data from multiple sources to route it to different destinations for processing. Whereas Apache Kafka is a streaming platform for building real-time data pipelines and streaming applications. Kafka provides a highly scalable, fault-tolerant, and durable messaging system with more capabilities than RabbitMQ.”

Architecturally, they’re obviously very different, but in essence both “RabbitMQ and Apache Kafka allow producers to send messages to consumers.”

Project Plan

My plan was to create two applications, then connect them using queues (rather than have one app directly call the other), so I made:

  • A producer app that would publish events or messages to a queue
  • A consumer app that would read from the queue and do ‘something’ (probably just store the message somewhere)

To connect the apps, I set up a cloud-based RabbitMQ queue and then a Docker-based Kafka topic.

These projects (GitHub: events-consumer and events-producer) are written in TypeScript and use Prettier, ESLint and nodemon for development ease.

Creating the producer

I used Express to create a simple endpoint which I could use to send messages to the queue. I wanted to trigger the messages myself (to mimic an event in the UI), so I created a basic form to render on the home page:

<!DOCTYPE html>
<html>
  <head>
    <title>Event Testing</title>
  </head>
  <body>
    <h1>Event Testing!</h1>
    <form action="/" method="post" id="form1">
      <label for="message">Message:</label>
      <input type="text" id="message" name="message">
      <input type="submit" value="OK">
    </form>
  </body>
</html>

Submitting this form sends a POST request to the root path with the message data:

app.post("/", (req: Request, res: Response) => {
console.log("Firing event... ");

// get the message from the form
const { message } = req.body;

return res.redirect("/");
});
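One gotcha worth flagging: req.body is only populated from an HTML form if Express’s urlencoded body parser is enabled. The repos handle this in their own setup; a minimal sketch of the bootstrap this handler assumes:

import express from "express";

const app = express();

// parse application/x-www-form-urlencoded bodies (what a plain HTML form sends)
app.use(express.urlencoded({ extended: true }));

// ...routes from the snippets in this article go here...

app.listen(4000, () => console.log("Producer listening on port 4000"));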

Getting started with (CloudAMQP) RabbitMQ

I wanted to create a decoupled producer and consumer to more closely mimic a real-world setup, so I used CloudAMQP (a hosted RabbitMQ service; AMQP stands for Advanced Message Queuing Protocol) to host my message queue.

I signed up at cloudamqp.com and created a development instance (using the free ‘Little Lemur’ plan), a queue, and then utilised the RabbitMQ Getting Started Documentation (Javascript) to write the producer and consumer code.
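CloudAMQP then provides an AMQP connection URL for the instance. I passed this, along with the queue name, to the apps as environment variables (the QUEUE_NAME and QUEUE_URL values read in the code further down); illustratively, with placeholder values:

# .env (placeholder values, not real credentials)
QUEUE_NAME=events
QUEUE_URL=amqps://user:password@host.rmq.cloudamqp.com/vhost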

On the producer side, I created this class:

import amqplib, { Channel } from "amqplib";

export class MyRabbitMqProducer {
  private queueName: string;

  private queueUrl: string;

  private queue!: Channel;

  constructor(queueName: string, queueUrl: string) {
    this.queueName = queueName;
    this.queueUrl = queueUrl;
  }

  async connect() {
    console.log("BEGINNING OF CONNECT FUNCTION");
    const connection = await amqplib.connect(
      this.queueUrl ?? "amqp://localhost"
    );
    console.log("CONNECTION: ", connection);

    const channel = await connection.createChannel();
    // make sure the queue exists before we publish to it
    await channel.assertQueue(this.queueName);

    this.queue = channel;
  }

  async sendMessage(msg: string): Promise<void> {
    // sendToQueue is fire-and-forget in amqplib (it returns a boolean, not a promise)
    this.queue.sendToQueue(this.queueName, Buffer.from(msg));
  }
}

I used a connect() function to get around some async/await issues I was having with the setup described in the docs. The class is instantiated in the index.ts file and called inside the POST handler:

const queueName = process.env.QUEUE_NAME ?? "";
const queueUrl = process.env.QUEUE_URL ?? "";

// RABBIT MQ
const messageQueue = new MyRabbitMqProducer(queueName, queueUrl);

async function connectToRabbitQueue() {
  await messageQueue.connect();
  console.log("RabbitMQ connected successfully");
}
connectToRabbitQueue();

interface Log {
  sentAt: Date;
  message: string;
}

...

app.post("/", (req: Request, res: Response) => {
  const { message } = req.body;
  const log: Log = { sentAt: new Date(Date.now()), message };
  console.log("Firing event... ", log);
  messageQueue.sendMessage(JSON.stringify(log));
  return res.redirect("/");
});

I added a timestamp to the message object to keep track of exactly when the message was created and sent from the producer (I’ll use this later to work out how long messages sit in the queue before reaching the consumer).

Creating the consumer

Similar to the producer app, the consumer also uses Express, but this time it’s enough to have one GET endpoint to display the messages. The connection to RabbitMQ ‘listens’ for new messages automatically, but we’ll need to refresh the root page to see any new messages that have been consumed (though we can see them arrive in real time in the console).

I created this class to interact with my queue in CloudAMQP:

import amqplib, { Channel } from "amqplib";

export class MyRabbitMqConsumer {
  private queueName: string;

  private queueUrl: string;

  constructor(queueName: string, queueUrl: string) {
    this.queueName = queueName;
    this.queueUrl = queueUrl;
  }

  async createRabbitConnection(): Promise<Channel> {
    const connection = await amqplib.connect(this.queueUrl);

    const channel = await connection.createChannel();

    // assert the same queue the producer publishes to
    await channel.assertQueue(this.queueName, { durable: true });

    return channel;
  }

  async consumeMessages(channel: Channel) {
    await channel.consume(
      this.queueName,
      (message) => {
        if (message) {
          console.log(` [/] Message successfully received from RabbitMQ`);
          console.log(JSON.parse(message.content.toString()));
        }
      },
      { noAck: true }
    );
  }
}
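A quick note on { noAck: true }: it tells RabbitMQ not to expect acknowledgements, so a message counts as delivered the moment it is pushed to the consumer (at-most-once delivery). If losing a message mattered, you would acknowledge manually instead; a minimal sketch of how consumeMessages might look with manual acks:

async consumeMessages(channel: Channel) {
  await channel.consume(
    this.queueName,
    (message) => {
      if (message) {
        console.log(JSON.parse(message.content.toString()));
        // only remove the message from the queue once it's been processed
        channel.ack(message);
      }
    },
    { noAck: false }
  );
}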

Linking the producer and consumer with RabbitMQ

The above class is instantiated in the consumer’s main index.ts file before setting up the Express route:

// RABBITMQ CONSUMER
async function connectToRabbit() {
  const rabbit = new MyRabbitMqConsumer(queueName, queueUrl);
  const channel = await rabbit.createRabbitConnection();
  await rabbit.consumeMessages(channel);
}
connectToRabbit();

...

app.get("/", async (req: Request, res: Response) => {
  res.send("...you can render something here");
});
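To actually show messages on the root page, the consumed messages need to end up somewhere the GET route can read them. The repo has its own wiring; one simple approach (a sketch, not necessarily the repo’s) is an in-memory array:

// naive in-memory store for consumed messages (illustrative sketch)
const receivedMessages: unknown[] = [];

// inside the consume callback, instead of just logging:
//   receivedMessages.push(JSON.parse(message.content.toString()));

app.get("/", async (req: Request, res: Response) => {
  res.json(receivedMessages);
});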

I added some decoration to the logs (not shown in the code samples here, but it’s in the GitHub repos) to timestamp when each message was created in the producer and when it arrived in the consumer, as I was interested to see how long a message takes to do the round trip to the queue and on to the consumer.

With both apps running locally, I can fill out and submit the form in the producer. The message body (along with a sentAt timestamp) is then sent to the RabbitMQ queue, which holds the message until the consumer application ‘consumes’ it. The consumer then adds a receivedAt timestamp and calculates how long the journey took (delayInSeconds), giving the following output:

[
  {
    "source": "rabbit",
    "receivedAt": "2023-10-19T12:55:00.257Z",
    "sentAt": "2023-10-19T12:54:11.198Z",
    "delayInSeconds": 49,
    "message": "hello"
  }
]
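The delayInSeconds calculation itself is straightforward (the repo has its own version); the idea is just the difference between the two timestamps, where parsed here stands for the JSON-parsed message content:

// how long the message spent in transit, in whole seconds (sketch)
const sentAt = new Date(parsed.sentAt);
const receivedAt = new Date();
const delayInSeconds = Math.round((receivedAt.getTime() - sentAt.getTime()) / 1000);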

There is far more I could look into on RabbitMQ but this was as far as I’d planned to go to get my feet a little wet. So, onto Kafka…

Adding Kafka to the producer

I wasn’t aware of a hosted Kafka offering quite like CloudAMQP (there are managed options such as Confluent Cloud), so I opted to run Kafka in a Docker container on my machine. This meant ‘Dockerising’ both the producer and consumer, so I added a Dockerfile and a docker-compose.yaml to both apps.

  • On the consumer side, Docker just runs the consumer app as a service and attaches it to an events network (to allow the apps to connect to Kafka).
  • On the producer side, it sets up the producer app as a service and attaches it to the same events network. It then adds kafka, zookeeper and akhq services and links them all together on the same network.

We use Apache ZooKeeper to coordinate distributed processes and services, and AKHQ to visualise the data in the Kafka ‘topic’ (roughly the equivalent of a ‘queue’ in RabbitMQ).
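The repos’ compose files have the full details, but here is a trimmed-down sketch of the shape the producer-side docker-compose.yaml might take (service names match the article; image tags and most settings are illustrative assumptions, not the repo’s exact file):

# docker-compose.yaml (producer side) -- illustrative sketch
version: "3"
services:
  producer:
    build: .
    ports:
      - "4000:4000"
    networks:
      - events
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - events
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # matches the brokers: ["kafka:9092"] setting in the app code
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - events
  akhq:
    image: tchiotludo/akhq
    ports:
      - "8080:8080"
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            local:
              properties:
                bootstrap.servers: "kafka:9092"
    networks:
      - events

networks:
  # the consumer's compose file joins this same network (declared as external there)
  events: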

I added the following class for the Kafka producer, using the kafkajs library and its Getting Started documentation:

import { Kafka, Partitioners, Producer } from "kafkajs";

export class MyKafkaProducer {
  private clientId: string;

  private topicName: string;

  private producer: Producer;

  constructor(clientId: string, topicName: string) {
    this.clientId = clientId;
    this.topicName = topicName;
    this.producer = this.#createProducer();
  }

  async connect(): Promise<void> {
    try {
      await this.producer.connect();
    } catch (error) {
      console.log("Error connecting the producer: ", error);
    }
  }

  async sendMessage(msg: string): Promise<void> {
    await this.producer.send({
      topic: this.topicName,
      messages: [{ value: msg }],
    });
  }

  async shutdown(): Promise<void> {
    await this.producer.disconnect();
  }

  #createProducer(): Producer {
    const kafka = new Kafka({
      clientId: this.clientId,
      // "kafka" resolves via the shared Docker network
      brokers: ["kafka:9092"],
    });

    // setting a partitioner explicitly silences the kafkajs v2 warning
    // about the default partitioner change
    return kafka.producer({
      createPartitioner: Partitioners.DefaultPartitioner,
    });
  }
}

This class is then instantiated in the index.ts file (just after setting up the RabbitMQ connection, but before the Express routes):

// KAFKA
const kafka = new MyKafkaProducer(kafkaClientId, kafkaTopicName);

async function connectToKafka() {
  await kafka.connect();
  console.log("Kafka connected successfully", {
    kafkaClientId,
    kafkaTopicName,
  });
}
connectToKafka();

...

app.post("/", (req: Request, res: Response) => {
  const { message } = req.body;

  const log: Log = { sentAt: new Date(Date.now()), message };

  console.log("Firing event... ", log);
  messageQueue.sendMessage(JSON.stringify(log)); // RABBIT MQ
  kafka.sendMessage(JSON.stringify(log)); // KAFKA

  return res.redirect("/");
});
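Note that both sends are fire-and-forget here: the handler redirects without waiting for the publishes to complete. If you wanted the request to surface publish failures, you could await them instead; a sketch:

// illustrative variant: wait for both brokers to accept the message
app.post("/", async (req: Request, res: Response) => {
  const { message } = req.body;
  const log: Log = { sentAt: new Date(Date.now()), message };

  try {
    await Promise.all([
      messageQueue.sendMessage(JSON.stringify(log)),
      kafka.sendMessage(JSON.stringify(log)),
    ]);
  } catch (error) {
    console.log("Failed to publish message: ", error);
    return res.status(500).send("Failed to publish message");
  }

  return res.redirect("/");
});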

Adding Kafka to the consumer

As before, I made the following class in the consumer to manage Kafka:

import {
  Consumer,
  ConsumerSubscribeTopics,
  EachMessagePayload,
  Kafka,
} from "kafkajs";

export class MyKafkaConsumer {
  private kafkaConsumer: Consumer;

  private clientId: string;

  private topicName: string;

  public constructor(clientId: string, topicName: string) {
    this.clientId = clientId;
    this.topicName = topicName;
    this.kafkaConsumer = this.#createKafkaConsumer();
  }

  public async startConsumer(): Promise<void> {
    const topic: ConsumerSubscribeTopics = {
      topics: [this.topicName],
      fromBeginning: false,
    };

    try {
      await this.kafkaConsumer.connect();
      await this.kafkaConsumer.subscribe(topic);

      await this.kafkaConsumer.run({
        eachMessage: async (messagePayload: EachMessagePayload) => {
          const { message } = messagePayload;
          console.log(` [/] Message successfully received from Kafka`);
          // message.value can be null, so guard before parsing
          const value = message.value?.toString();
          if (value) {
            console.log(JSON.parse(value));
          }
        },
      });
    } catch (error) {
      console.log(" [X] Error: ", error);
    }
  }

  public async shutdown(): Promise<void> {
    await this.kafkaConsumer.disconnect();
  }

  #createKafkaConsumer(): Consumer {
    const kafka = new Kafka({
      clientId: this.clientId,
      brokers: ["kafka:9092"],
    });
    const consumer = kafka.consumer({ groupId: "consumer-group" });
    return consumer;
  }
}

And again, adding the connection setup to the index.ts file:

// KAFKA CONSUMER
async function connectToKafka() {
  const kafka = new MyKafkaConsumer(kafkaClientId, kafkaTopicName);
  await kafka.startConsumer();
}
connectToKafka();
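Both Kafka classes also expose a shutdown() method that nothing above calls. A sketch of how it could be wired to process signals, so the consumer leaves its group cleanly when stopped (this assumes the instance is hoisted out of the function):

// illustrative variant: disconnect cleanly on Ctrl-C / docker stop
const kafkaConsumer = new MyKafkaConsumer(kafkaClientId, kafkaTopicName);
kafkaConsumer.startConsumer();

process.on("SIGINT", async () => {
  await kafkaConsumer.shutdown();
  process.exit(0);
});
process.on("SIGTERM", async () => {
  await kafkaConsumer.shutdown();
  process.exit(0);
});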

Bringing it all together

Running docker-compose up -d in both the consumer and producer apps, I can then visit my producer (running on localhost:4000) and submit the form in the UI. The same message is then sent both to the RabbitMQ queue on CloudAMQP and to the Kafka topic I have running in a Docker container.

Then in the consumer app I can see (either in the console or by refreshing localhost:4001) the following output:

[
  {
    "source": "kafka",
    "receivedAt": "2023-10-19T12:54:11.259Z",
    "sentAt": "2023-10-19T12:54:11.198Z",
    "delayInSeconds": 0,
    "message": "hello"
  },
  {
    "source": "rabbit",
    "receivedAt": "2023-10-19T12:55:00.257Z",
    "sentAt": "2023-10-19T12:54:11.198Z",
    "delayInSeconds": 49,
    "message": "hello"
  }
]

It’s interesting to see that the delay for Kafka is barely any time at all (it’s all running locally), while the round trip to CloudAMQP takes a little longer.

I can also view the Kafka topic in AKHQ at localhost:8080.

Wrapping up

There are loads more features and configuration options I could have explored in both systems (exchanges, dead-letter queues, retries, retention policies and more), and I hope to revisit them in future.

For this project, I enjoyed getting some (simple) exposure to RabbitMQ and Kafka, and I learned a lot about how they both work, as well as getting some useful practice Dockerising my apps.
