Integrating RabbitMQ with Node.js: Building Real-time Applications

Raheel Butt
9 min readMay 7, 2024

--

node-rabbitMQ-docker

In today’s fast-paced world, time is everything. Real-time data streaming has become incredibly important for modern applications. Just imagine being able to process large volumes of data instantly, gaining valuable insights in the blink of an eye! Thanks to powerful tools like RabbitMQ and frameworks like Node.js, developers can now build robust applications that handle real-time data seamlessly.

RabbitMQ allows applications to send and receive real-time data feeds, while Node.js offers exceptional performance and scalability, making them a perfect pair. The possibilities are truly limitless, opening doors for innovation like never before. Get ready for an exciting journey into the world of real-time data streaming with RabbitMQ and Node.js!

By the end of this tutorial, you’ll have a solid understanding of how to harness the power of RabbitMQ and Node.js to create real-time data streaming applications that cater to the needs of modern users. Let’s dive in!

Prerequisites: Before we dive into the exciting world of real-time data streaming with RabbitMQ and Node.js, let’s make sure you have everything set up on your environment.

Project Setup:

To start, we have to initialize a node project.

npm init --yes

where — — yes will set the default template values.

Now, we have to install dependencies to implement the RabbitMQ.

npm i amqplib

and yes, that’s the only dependency we need to run on servers.

In the package.json file, we would write one script.

"scripts": {
"start": "node index.js"
}

After creating the project, design this directory structure in src folder:

directory structure

Constants.js — src/constant.js

const YOUR_USERNAME='REPLACE_WITH_YOUR_USERNAME'
const YOUR_PASSWORD='REPLACE_WITH_YOUR_PASSWORD'

export const CONNECTION_STRING = `amqp://${YOUR_USERNAME}:${YOUR_PASSWORD}@localhost:5672/`;

// Define queue properties and configurations
export const QUEUE_OPTIONS = {
durable: true, // Make the queue durable
exclusive: false, // Not exclusive
autoDelete: false, // Don't auto-delete the queue
arguments: {
'x-message-ttl': 30000, // Message TTL of 30 seconds
'x-max-length': 1000, // Maximum queue length of 1000 messages
},
};

export const ACKNOWLEGMENT = {noAck: false};

Config.js — src/config.js

// Established connection to the RabbitMQ

import amqp from 'amqplib';
import {CONNECTION_STRING} from "./constants.js";

const connectToRabbitMQ = async () => {
try {
const connection = await amqp.connect(CONNECTION_STRING);
const channel = await connection.createChannel();

return {connection, channel}
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
}
}

export default connectToRabbitMQ

index.js — src/modules/create_queue/index.js

// Create a queue with a specified name

import connectToRabbitMQ from '../config.js';
import {QUEUE_OPTIONS} from "../constants.js";

const createQueue = async (queueName) => {
try {
const {connection, channel} = await connectToRabbitMQ();

await channel.assertQueue(queueName, QUEUE_OPTIONS);
console.log('Queue created successfully.');

return {connection,channel}
} catch (error) {
console.error('Error creating queue:', error);
}
}

export default createQueue

index.js — src/modules/creating_exchange/index.js

import {QUEUE_OPTIONS} from "../../constants.js";

const declareExchange = async (channel, exchangeType, exchangeName) => {
try {
await channel.assertExchange(exchangeName, exchangeType, QUEUE_OPTIONS);
console.log(`${exchangeName} exchange declared....`);
} catch (error) {
console.error(`Error declaring ${exchangeName} exchange:`, error);
}
}

export default declareExchange

index.js — src/modules/bind_queue_with_exchange/index.js

import {QUEUE_OPTIONS} from "../../constants.js";

const bindQueueWithExchange = async (channel, queueName, exchangeName,queueOptions=QUEUE_OPTIONS) => {
try {
await channel.assertQueue(queueName, queueOptions);
await channel.bindQueue(queueName, exchangeName, '');
console.log('Queue declared and bound to exchange:', queueName);
} catch (error) {
console.error('Error declaring and binding queue:', error);
}
}

export default bindQueueWithExchange

Now, we are going to create the core implementation for asserting messages into queues (Producers) without exchange and with exchanges i.e. Direct, Topic, Fanout.

index.js — src/modules/message_consumer/index.js

import {ACKNOWLEGMENT} from "../../constants.js";

const consumer =
async (queueName, channel, exchangeName = '') => {
try {
await channel.consume(queueName, (message) => {
if (message) {
console.log(`Message consumed from queue ${queueName}:`, message.content.toString());

channel.ack(message)
} else {
console.error('Something went wrong. Try again!')
}
}, ACKNOWLEGMENT);

if (exchangeName) {
console.log('Consumer started for exchange:', exchangeName);
}
} catch (error) {
console.error('Error consuming message:', error);
}
}

export default consumer

Creating Queues: Without Exchanges:

without_exchange.js — src/modules/queue_asserting/without_exchange.js

import createQueue from "../create_queue/index.js";

const sendMessageToQueueWithoutExchange = async (channel, message) => {
try {
const queueName = 'queue_without_exchange'
await createQueue(channel, queueName);

await channel.sendToQueue(queueName, Buffer.from(JSON.stringify({content: message})));
console.log(`Sent ${message}`);
} catch (error) {
console.error('An error occurred:', error);
}
}

export default sendMessageToQueueWithoutExchange

Creating Queues: With Exchanges:

  1. Create a queue with direct exchange:

In the direct technique, one queue (e.g. queue A) is bonded/subscribed with the exchange and the producer will publish the message on that exchange only once and the queue (queue A) which is bound with this exchange will receive the messages.

direct_exchange.js — src/modules/queue_asserting/direct_exchange.js

import declareExchange from "../creating_exchange/index.js";
import bindQueueWithExchange from "../bind_queue_with_exchange/index.js";

const sendMessageToQueueWithDirectExchange = async (channel, message) => {
try {
const exchangeName = 'my_direct_exchange';
const queueName = 'my_direct_queue'

// Declare the direct exchange
await declareExchange(channel, 'direct', exchangeName)
await bindQueueWithExchange(channel, queueName, exchangeName, {durable: true})

// Publish the message to the direct exchange with the routing key
await channel.sendToQueue(queueName, Buffer.from(JSON.stringify({content: message})));
console.log(`Message sent to exchange '${exchangeName} and ${queueName}' : ${message}`);
} catch (error) {
console.error('Error setting up direct exchange:', error);
}
}

export default sendMessageToQueueWithDirectExchange;

2. Create a queue with topic exchange:

In the topic technique, multiple queues (e.g. queues A, B, Not C) are bonded/subscribed with the exchange by a routing pattern/key and the producer will publish the message on that exchange only once and the queues (queue A, B, Not C) which are bound with this exchange will receive the messages.

topic_exchange.js — src/modules/queue_asserting/topic_exchange.js

import declareExchange from "../creating_exchange/index.js";
import bindQueueWithExchange from "../bind_queue_with_exchange/index.js";
import {QUEUE_OPTIONS} from "../../constants.js";

const sendMessageToQueueWithTopicExchange = async (channel, message) => {
try {
const queueName = 'my_topic_queue';
const exchangeName = 'my_topic_exchange';
const routingKey = 'topic.routing.key';

// Declare the topic exchange
await declareExchange(channel, 'topic', exchangeName)
await bindQueueWithExchange(channel, queueName, exchangeName,QUEUE_OPTIONS , routingKey)

// Publish the message to the topic exchange with the routing key
const res = await channel.sendToQueue(queueName, Buffer.from(JSON.stringify({content: message})));
if (res) {
console.log(`Message sent to exchange '${exchangeName}' with routing key '${routingKey}': ${message}`);
} else {
console.error('Something went wrong. Try again!')
}
} catch (error) {
console.error('Error setting up topic exchange:', error);
}
}

export default sendMessageToQueueWithTopicExchange;

3. Create a queue with fanout exchange:

In the fanout technique, multiple queues (e.g. queues A, B) are bound/subscribed with the exchange, and the producer will publish the message on that exchange only once and the queues (queues A, B) which are bound with this exchange will receive the messages.

fanout_exchange.js — src/modules/queue_asserting/fanout_exchange.js

import declareExchange from "../creating_exchange/index.js";
import bindQueueWithExchange from "../bind_queue_with_exchange/index.js";

const sendMessageToQueueWithFanoutExchange = async (channel, message1, message2) => {
try {
const firstQueueName = 'first_fanout_exchange_queue';
const secondQueueName = 'second_fanout_exchange_queue';
const exchangeName = 'fanout_exchange'

await declareExchange(channel, 'fanout', exchangeName)
await bindQueueWithExchange(channel, firstQueueName, exchangeName, {durable: true})
await bindQueueWithExchange(channel, secondQueueName, exchangeName, {durable: true})

await channel.sendToQueue(firstQueueName, Buffer.from(JSON.stringify({content: message1})));
await channel.sendToQueue(secondQueueName, Buffer.from(JSON.stringify({content: message2})));

console.log(`Sent ${message1}`);
console.log(`Sent ${message2}`);
} catch (error) {
console.error('An error occurred:', error);
}
}

export default sendMessageToQueueWithFanoutExchange

index.js — index.js (Usage):

Message Consumption/Acknowledgment:

Message acknowledgments in message queues are like confirmations sent by consumers to the server, saying, “Yes, I received and processed this message.” This acknowledgment process is crucial for ensuring that messages are reliably handled.

In the provided code, the setting { noAck: false } means that after a consumer processes a message, it must explicitly send an acknowledgment to the server. This is important because it ensures that messages are only removed from the queue after they have been successfully processed.

If noAck were set to true, the server would remove messages from the queue immediately after delivering them, without waiting for confirmation from the consumer. This configuration is useful for scenarios where speed is more important than ensuring every message is processed.

So, with { noAck: false }, we must send acknowledgments to the server channel.ack(message) to confirm successful processing. If we don't acknowledge messages, they will remain in the queue, potentially causing issues with message duplication or queue congestion.

  1. Consume messages from a queue without exchange:

without_exchange.js — src/modules/queue_consumption/without_exchange.js

import consumer from "../message_consumer/index.js";

const consumeMessageFromQueueWithDirectExchange = async (channel) => {
try {
const exchangeName = 'my_direct_exchange';
const queueName = 'my_direct_queue'

await consumer(queueName, channel, exchangeName)
} catch (error) {
console.error('Error consuming message:', error);
}
}

export default consumeMessageFromQueueWithDirectExchange

2. Consume messages from a queue with direct exchange:

direct_exchange.js — src/modules/queue_consumption/direct_exchange.js

import consumer from "../message_consumer/index.js";

const consumeMessageFromQueueWithDirectExchange = async (channel) => {
try {
const exchangeName = 'my_direct_exchange';
const queueName = 'my_direct_queue'

await consumer(queueName, channel, exchangeName)
} catch (error) {
console.error('Error consuming message:', error);
}
}

export default consumeMessageFromQueueWithDirectExchange

3. Consume messages from a queue with the topic exchange:

topic_exchange.js — src/modules/queue_consumption/topic_exchange.js

import consumer from "../message_consumer/index.js";

const consumeMessageFromQueueWithTopicExchange = async (channel) => {
try {
const queueName = 'my_topic_queue';
const exchangeName = 'my_topic_exchange';

await consumer(queueName, channel, exchangeName)
} catch (error) {
console.error('Error consuming message:', error);
}
}

export default consumeMessageFromQueueWithTopicExchange

4. Consume messages from a queue with the fanout exchange:

fanout_exchange.js — src/modules/queue_consumption/fanout_exchange.js

import consumer from "../message_consumer/index.js";

const consumeMessageFromQueueWithFanoutExchange = async (channel) => {
try {
const firstQueueName = 'first_fanout_exchange_queue';
const secondQueueName = 'second_fanout_exchange_queue';
const exchangeName = 'fanout_exchange'

await consumer(firstQueueName, channel, exchangeName)
await consumer(secondQueueName, channel, exchangeName)
} catch (error) {
console.error('Error consuming message:', error);
}
}

export default consumeMessageFromQueueWithFanoutExchange

index.js — src/modules/consumers/index.js (Usage):

import consumeMessageFromQueueWithoutExchange from "../queue_consumption/without_exchange.js";
import consumeMessageFromQueueWithDirectExchange from "../queue_consumption/direct_exchange.js";
import consumeMessageFromQueueWithTopicExchange from "../queue_consumption/topic_exchange.js";
import consumeMessageFromQueueWithFanoutExchange from "../queue_consumption/fanout_exchange.js";

const messageConsumers = async (channel) => {
try {
// consume message from queue without any exchange
await consumeMessageFromQueueWithoutExchange(channel)

// create queue and send message with direct exchange
await consumeMessageFromQueueWithDirectExchange(channel);

// create queue and send message with topic exchange
await consumeMessageFromQueueWithTopicExchange(channel);

// create queues and send message with fanout exchange
await consumeMessageFromQueueWithFanoutExchange(channel)

} catch (error) {
console.error('An error occurred:', error);
}
}

export default messageConsumers

index.js (Usage):

import connectToRabbitMQ from "./src/config.js";

import express from "express";
import bodyParser from "body-parser";
import sendMessageWithoutExchange from "./src/modules/controllers/without_exchange_controller.js";
import sendMessageWithDirectExchange from "./src/modules/controllers/direct_exchange_controller.js";
import sendMessageWithTopicExchange from "./src/modules/controllers/topic_exchange_controller.js";
import sendMessageWithFanoutExchange from "./src/modules/controllers/fanout_exchange_controller.js";
import messageConsumers from "./src/modules/consumers/index.js";

const app = express();
const jsonParser = bodyParser.json();

const {channel} = await connectToRabbitMQ();


app.get("", (req, res, next) => {
console.log('Hello to Node Express and RabbitMQ')
return res.send('<h1>Hello to Node Express and RabbitMQ</h1>')
});

app.post("/api/without_exchange", jsonParser, sendMessageWithoutExchange(channel));
app.post("/api/direct_exchange", jsonParser, sendMessageWithDirectExchange(channel));
app.post("/api/topic_exchange", jsonParser, sendMessageWithTopicExchange(channel));
app.post("/api/fanout_exchange", jsonParser, sendMessageWithFanoutExchange(channel));

await messageConsumers(channel)

app.listen(8080, () => {
console.log(`Server is running on port 8080.`);
});

To explore the full code, visit the node-rabbitmq-docker repository. Follow me on GitHub for additional insightful content and updates.

Conclusion:

In this tutorial, we’ve explored how to integrate RabbitMQ with Node.js to build real-time applications. We’ve learned how to create a RabbitMQ producer to send messages and create a RabbitMQ consumer to receive messages. With RabbitMQ and Node.js, developers can build scalable, fault-tolerant, and real-time applications that process data streams efficiently.

That is all for this post, Apache Kafka is quite an extensive topic to cover in a single post, and I’m excited to see you in my other couple of posts with the following topics.

  1. Kafka vs RebitMQ

If you found this blog post useful then clap, comment, and follow.

🤝 Let’s connect on LinkedIn: Raheel Butt

--

--