Realtime Pub/Sub meets Amazon SQS Elasticity

Rolando Santamaría Masó
4 min readJan 3, 2024

--

Introduction

In the fast-paced world of modern applications, the ability to ingest and deliver real-time information is a game-changer.

Realtime Pub/Sub emerges as a dynamic Real-time Messaging Gateway, offering seamless capabilities to elevate your application’s communication to new heights.

https://realtime.21no.de

In this blog post, we will explore one standout feature: real-time message forwarding to Amazon SQS queues.
We’ll delve into the details and provide a hands-on example of how to process and respond to your WebSocket clients requests using an Amazon SQS queue as a proxy layer between your application clients and the backend services.

Amazon SQS queues for High-Throughput Real-time Message Forwarding

Effortless Integration with Amazon SQS

Realtime Pub/Sub takes real-time communication to the next level by allowing seamless integration with Amazon SQS queues. This feature enables developers to leverage SQS scalability, consumer parallelism, and reliability, providing a robust solution for handling real-time messages.

What is Amazon SQS?

Use Cases for SQS Queues Forwarding

  • Scalable Backend Processing: Handle heavy real-time data influx efficiently by forwarding messages to Amazon SQS queues for backend processing.
  • Decoupling Components: Achieve component decoupling by using SQS queues as intermediary buffers for real-time messages, ensuring a resilient and scalable architecture.
    After processing incoming messages, respond to clients using the platform HTTP publishing endpoint.

Demo

Example Node.js service that processes incoming WebSocket messages via SQS Queue

const { getAuthToken } = require('./utils')
const AWS = require('@aws-sdk/client-sqs')

const {
APP_ID,
ADMIN_SIGNING_KEY, // For HS* signing algorithms, value should be "Admin Clients/Verification Key"
ALGORITHM,
CLUSTER_HOSTNAME,
AMAZON_SQS_QUEUE_URL,
AWS_REGION,
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY
} = require('./config') // Import configuration constants

// Define the permissions for the publisher
const PUBLISHER_PERMISSIONS = {
roles: ['Publisher'],
allowedTopics: ['*']
}
// Define the publishing endpoint
const PUBLISHING_ENDPOINT = `https://${CLUSTER_HOSTNAME}/api/topics/${APP_ID}/publish`

// Initialize the AWS SQS service with the provided credentials
const sqs = new AWS.SQS({
region: AWS_REGION,
credentials: {
accessKeyId: AWS_ACCESS_KEY_ID,
secretAccessKey: AWS_SECRET_ACCESS_KEY
}
})

// Define the main function
async function init () {
// Start processing SQS Queue messages
while (true) {
try {
// Receive messages from the SQS queue
const { Messages } = await sqs.receiveMessage({
QueueUrl: AMAZON_SQS_QUEUE_URL,
MaxNumberOfMessages: 5,
WaitTimeSeconds: 20
})

// If there are any messages
if (Messages?.length > 0) {
// Process each message
for (const Message of Messages) {
const { Body } = Message

// Parse the message body
const { msg } = JSON.parse(Body)
const { client, payload, id } = msg
// Define responding topic
const topic = `priv/${client.subject}`

// Log the received message payload
console.log('Received message payload: ', payload)

// MESSAGE PROCESSING LOGIC GOES HERE

// Prepare the options for the acknowledgement request
const options = {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${getAuthToken(ADMIN_SIGNING_KEY, 5, PUBLISHER_PERMISSIONS, ALGORITHM, 'backend')}`
},
body: JSON.stringify({
topic,
message: {
type: 'response',
ack: id,
status: 'OK'
}
})
}

// Send the acknowledgement request
const response = await fetch(PUBLISHING_ENDPOINT, options)
// Optionally log the response status
console.log({
statusText: response.statusText,
status: response.status
})

// Delete the processed message from the SQS queue
await sqs.deleteMessage({
QueueUrl: AMAZON_SQS_QUEUE_URL,
ReceiptHandle: Message.ReceiptHandle
})
}
}
} catch (err) {
// Log any errors
console.log(err)
}
}
}

// Call the main function
init()

Enabling SQS Forwarding on your applications

  • Configure SQS Forwarding: Set up SQS forwarding in your application by navigating to Modify Application / WebSocket Inbound Messaging / Amazon SQS Forwarding Target Configuration.
  • Customize Node.js Service: Adapt the provided Node.js service according to your application’s logic. It can also be implemented in any other programming language with supported SQS and HTTP client libraries.

Conclusions

Integrating Amazon SQS queues as the ingress layer for your application’s WebSocket messaging allows your backend application to scale horizontally, addressing a critical use-case for enterprise or high-throughput scenarios.

Additionally, the platform also supports Amazon SNS, providing developers with access to a broader range of forwarding targets, including HTTP webhooks.

Explore more about Realtime Pub/Sub today: https://realtime.21no.de

Study other available demos in GitHub: https://github.com/BackendStack21/realtime-forum

--

--