Azure EventHubs for Microservices

Krishnan Sriram
6 min read · Apr 5, 2022

Message queues have long been at the center of architectures that enable loosely coupled solutions, and have traditionally been used to connect disparate systems. In today's world we extend that loosely coupled architecture with events and asynchronous channels, which in turn allows for smaller, simpler services that we call microservices. As we move into the microservices world, event/message brokers facilitate communication between those services. Tools like Istio, Eureka, and AppMesh take center stage for orchestration, while products like Kafka, RabbitMQ, and others play different roles in the event broker space.

AMQP (Advanced Message Queuing Protocol) is the standard for message-oriented development, and it is a step up from simple pub/sub capability. RabbitMQ is a great on-prem choice that implements AMQP, while Kafka uses its own binary protocol; both can also run in the cloud (SaaS and PaaS choices on all three public clouds). Think of Azure Event Hubs as something like Kafka, but offered as a cloud-native service in Azure that speaks AMQP 1.0, HTTPS, and even the Kafka protocol.

What is Azure Event Hubs?

Azure Event Hubs is an event streaming service capable of handling millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters. You can read more about Azure Event Hubs here.

Why consider Event Hubs over Apache Kafka?

Apache Kafka is software you typically need to install and operate, unless you use Kafka on Azure.

Event Hubs is a fully managed, cloud-native service. There are no servers, disks, or networks to manage and monitor and no brokers to consider or configure, ever. You create a namespace, which is an endpoint with a fully qualified domain name, and then you create Event Hubs (topics) within that namespace. Event Hubs uses a single (Virtual) IP address as the endpoint, so clients don’t need to know about the brokers or machines within a cluster.

For throughput information, check out this site

As in any message queue system, there are two parties: the producer (sender) and the consumer (receiver).

Important terms

Namespace

An Event Hubs namespace is a management container for event hubs

Courtesy — https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features

Event Hub

Topics in Kafka parlance

Events

Events can be published using AMQP 1.0, the Kafka protocol, or HTTPS. You can publish events individually or batched. A single publication has a limit of 1 MB, regardless of whether it's a single event or a batch. Publishing events larger than this threshold will be rejected.
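
For example, here is a minimal sketch (using the @azure/event-hubs JavaScript SDK, with a hypothetical sendAll helper) of how a publisher can stay under that limit by checking the return value of tryAdd and flushing the batch whenever it fills up:

const { EventHubProducerClient } = require('@azure/event-hubs');

// Hypothetical helper: sends `events` in as many batches as needed,
// so no single publication exceeds the event hub's size limit.
const sendAll = async (producer, events) => {
  let batch = await producer.createBatch();
  for (const event of events) {
    if (!batch.tryAdd({ body: event })) {
      // The batch is full; send what we have and start a new one.
      await producer.sendBatch(batch);
      batch = await producer.createBatch();
      if (!batch.tryAdd({ body: event })) {
        throw new Error('A single event exceeds the maximum batch size');
      }
    }
  }
  if (batch.count > 0) {
    await producer.sendBatch(batch);
  }
};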

Event Hubs throughput is scaled by using partitions and throughput-unit allocations. It’s a best practice for publishers to remain unaware of the specific partitioning model chosen for an event hub and to only specify a partition key that is used to consistently assign related events to the same partition.

Courtesy — https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features
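
Following that guidance, here is a minimal sketch of publishing with a partition key instead of a hard-coded partition ID (the publishWithKey helper is hypothetical; the environment variable names match the producer code further below):

const { EventHubProducerClient } = require('@azure/event-hubs');

const publishWithKey = async (user) => {
  const producer = new EventHubProducerClient(
    process.env.AZURE_EVENTHUB_CONNECTION_STRING,
    process.env.AZURE_EVENTHUB_NAME
  );
  // Events that share a partition key always land on the same partition,
  // but the publisher never needs to know which partition that is.
  const batch = await producer.createBatch({ partitionKey: user.userId });
  batch.tryAdd({ body: user });
  await producer.sendBatch(batch);
  await producer.close();
};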

Event publisher

Any entity that sends data to an event hub is an event publisher

Event retention

Published events are removed from an event hub based on a configurable, time-based retention policy. Here are a few important points, followed by a short sketch of setting retention programmatically:

  • The default value and shortest possible retention period is 1 day (24 hours).
  • For Event Hubs Standard, the maximum retention period is 7 days.
  • For Event Hubs Premium and Dedicated, the maximum retention period is 90 days.
  • If you change the retention period, it applies to all messages including messages that are already in the event hub.
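
Retention is a property of the event hub itself, so it is configured through the portal, ARM/Bicep templates, or the management SDK rather than the data-plane client. As a rough sketch, assuming the @azure/arm-eventhub management package and the resource names shown here (all placeholders), it could look like this:

const { DefaultAzureCredential } = require('@azure/identity');
const { EventHubManagementClient } = require('@azure/arm-eventhub');

// Placeholder names for illustration only.
const subscriptionId = process.env.AZURE_SUBSCRIPTION_ID;
const resourceGroup = 'my-resource-group';
const namespaceName = 'my-eventhub-namespace';
const eventHubName = 'users';

const setRetention = async (days) => {
  const client = new EventHubManagementClient(new DefaultAzureCredential(), subscriptionId);
  // createOrUpdate on an existing event hub updates its properties,
  // including the retention period in days.
  await client.eventHubs.createOrUpdate(resourceGroup, namespaceName, eventHubName, {
    messageRetentionInDays: days,
  });
};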

How do you set up Event Hubs for action?

Head over to Azure, open the Event Hubs service, and create a new namespace. You can create a new resource group too, if needed; I do that all the time for segregation.

The next step is to create an event hub (a topic, in Kafka terms) inside the namespace.

Tap on Create and you are all set for the next steps. You can find the complete code for the producer and consumer on GitHub.
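
Both the producer and the consumer below read their configuration from environment variables. A .env file for them might look like this (the variable names come from the code; the values are placeholders you replace with your own):

# Event Hubs
AZURE_EVENTHUB_CONNECTION_STRING="Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy>;SharedAccessKey=<key>"
AZURE_EVENTHUB_NAME="<your-event-hub>"
AZURE_EVENTHUB_PARTITION_KEY="<optional-partition-key>"

# Blob Storage used by the consumer for checkpointing
AZURE_BLOB_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net"
AZURE_BLOB_CONTAINER_NAME="<your-checkpoint-container>"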

Producer code

'use strict';
const uuid = require('uuid');
const { EventHubProducerClient } = require('@azure/event-hubs');

const connectionString = process.env.AZURE_EVENTHUB_CONNECTION_STRING;
const eventHubName = process.env.AZURE_EVENTHUB_NAME;
const users = [];

const add_user = async (firstName, lastName, email) => {
  const producer = new EventHubProducerClient(connectionString, eventHubName);
  const user = {
    userId: uuid.v4(),
    firstName,
    lastName,
    email,
  };
  // Pin the event to partition '0' for demonstration purposes;
  // in most cases you would pass a partitionKey instead, or nothing at all.
  const batch = await producer.createBatch({ partitionId: '0' });
  batch.tryAdd({ body: user });
  // Send the batch to the event hub.
  await producer.sendBatch(batch);
  await producer.close();
  console.log(`User with ID ${user.userId} has been sent over to Event Hub.`);
  users.push(user);
  return user;
};

const list_users = () => {
  return users;
};

module.exports = {
  add_user,
  list_users,
};

The producer starts by creating an EventHubProducerClient, prepares the payload, and publishes it with the createBatch/sendBatch methods. In the example above we push the event to a specific partition; you can drop the partitionId option and let Azure manage partition assignment for you if that is more control than your use case needs.
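
Assuming the producer module above is saved as service/producer_service.js (a hypothetical path for this sketch), using it from an entry point looks like this:

'use strict';

require('dotenv').config();
const { add_user, list_users } = require('./service/producer_service'); // hypothetical path

const main = async () => {
  await add_user('Ada', 'Lovelace', 'ada@example.com');
  console.dir(list_users());
};

main().catch((err) => console.error('ERROR occurred', err));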

Receiver code

'use strict';

require('dotenv').config();
const { start_subscription, stop_subscription } = require('./service/user_service');

const main = async () => {
  await start_subscription();
};

// Shut down the subscription cleanly when the process is asked to stop.
process.on('SIGTERM', async () => {
  console.log('PROCESS killed - SIGTERM');
  await stop_subscription();
  process.exit(0);
});
process.on('SIGINT', async () => {
  console.log('PROCESS killed - SIGINT');
  await stop_subscription();
  process.exit(0);
});

main().catch((err) => {
  console.error('ERROR occurred', err);
});

We fetch the needed configuration to connect and listen. The code above is as straightforward as it can be: start listening for events, and clean up when it is time to stop.

'use strict';

const { EventHubConsumerClient, earliestEventPosition } = require('@azure/event-hubs');
const { ContainerClient } = require('@azure/storage-blob');
const { BlobCheckpointStore } = require('@azure/eventhubs-checkpointstore-blob');

const connectionString = process.env.AZURE_EVENTHUB_CONNECTION_STRING;
const eventHubName = process.env.AZURE_EVENTHUB_NAME;
const storageConnectionString = process.env.AZURE_BLOB_CONNECTION_STRING;
const containerName = process.env.AZURE_BLOB_CONTAINER_NAME;
const consumerGroup = '$Default'; // name of the default consumer group
let consumerClient = null;
let subscription = null;

const start_subscription = async () => {
  // Create a consumer client for the event hub by specifying the checkpoint store.
  const containerClient = new ContainerClient(storageConnectionString, containerName);
  const checkpointStore = new BlobCheckpointStore(containerClient);
  consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName, checkpointStore);
  const props = await consumerClient.getEventHubProperties();
  console.log(props.name);
  console.log(props.partitionIds);
  console.log('Start subscription.....');
  subscription = consumerClient.subscribe(
    {
      processEvents: async (events, context) => {
        if (events.length === 0) {
          console.log(`No events received within wait time. Waiting for next interval`);
          return;
        }
        events.forEach((event) => {
          console.log(`Received event from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`);
          console.dir(event.body);
        });
        // Update the checkpoint so these events are not delivered again.
        await context.updateCheckpoint(events[events.length - 1]);
      },
      processError: async (error, context) => {
        console.error(`Error : ${error}`);
      },
    },
    { startPosition: earliestEventPosition }
  );
};

const stop_subscription = async () => {
  await subscription.close();
  await consumerClient.close();
  consumerClient = null;
  subscription = null;
  console.log('All subscriptions closed and cleaned up. Good job!');
};

module.exports = {
  start_subscription,
  stop_subscription,
};

The receiver's start_subscription creates an EventHubConsumerClient with a reference to a storage container (the checkpoint store) used to mark messages as read. We then call the subscribe method to listen for messages. The processEvents handler is an async function; each event contains the payload, metadata about the payload, and information about the namespace, partition, and more.

The stop_subscription method closes the subscription and the client to ensure a clean shutdown.

We use Blob Storage to checkpoint read messages so the receiver does not read the same messages over and over again until they expire. Try this same example without Blob Storage: you will see the same messages read repeatedly. By default messages expire in a day, and you can customize retention to keep them longer. Compared to an always-on listener, this approach lets the receiver go into hibernation and come back at different intervals to pick up messages.
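
One practical note: the checkpoint container needs to exist before BlobCheckpointStore can write to it. A small sketch to create it up front, using createIfNotExists from @azure/storage-blob (the helper name is my own):

const { ContainerClient } = require('@azure/storage-blob');

// Create the checkpoint container once, before the consumer starts.
const ensureCheckpointContainer = async () => {
  const containerClient = new ContainerClient(
    process.env.AZURE_BLOB_CONNECTION_STRING,
    process.env.AZURE_BLOB_CONTAINER_NAME
  );
  await containerClient.createIfNotExists();
};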

What next

  • We can extend this example to exchange larger data, like files
  • Dump messages into a DB like Cosmos DB
  • Prepare a dead-letter queue
  • Much more…..


Krishnan Sriram

Tech enthusiast, life long learner and an enterprise architect