Azure Event Hub — Micro Course

Sivakumar Mahalingam ⚡
5 min read · Sep 24, 2021


Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.

Use Cases

· Anomaly detection (fraud/outliers)

· Application logging

· Analytics pipelines, such as clickstreams

· Live dashboarding

· Archiving data

· Transaction processing

· User telemetry processing

· Device telemetry streaming

Key architecture components

· Event producers: Any entity that sends data to an event hub. Event publishers can publish events using HTTPS, AMQP 1.0, or Apache Kafka (1.0 and above).

· Partitions: Each consumer only reads a specific subset, or partition, of the message stream.

· Consumer groups: A view (state, position, or offset) of an entire event hub. Consumer groups enable consuming applications to each have a separate view of the event stream. They read the stream independently at their own pace and with their own offsets.

· Event receivers: Any entity that reads event data from an event hub. All Event Hubs consumers connect over an AMQP 1.0 session, and the Event Hubs service delivers events through the session as they become available. Kafka consumers connect via the Kafka protocol (1.0 and later).

· Throughput units or processing units: Pre-purchased units of capacity that control the throughput capacity of Event Hubs.
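To build intuition for partitions and partition keys, the routing idea can be sketched in plain Python: events that share a partition key always hash to the same partition, which is how per-key ordering is preserved. This is an illustrative model only, not the service's actual hash algorithm, and the partition count and device names are made up:

```python
import hashlib

# Illustrative model of partition-key routing (NOT the real
# Event Hubs hash): same key -> same partition, every time.
PARTITION_COUNT = 4

def route(partition_key: str) -> int:
    # Stable hash of the key, mapped onto the partition space.
    digest = hashlib.md5(partition_key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % PARTITION_COUNT

# Distribute a few telemetry events across partitions by device ID.
partitions = {p: [] for p in range(PARTITION_COUNT)}
for device, reading in [("device-1", 20), ("device-2", 31), ("device-1", 22)]:
    partitions[route(device)].append((device, reading))

# All "device-1" readings sit in one partition, in send order.
target = route("device-1")
assert [r for d, r in partitions[target] if d == "device-1"] == [20, 22]
```

Because routing is deterministic, a consumer reading one partition sees every event for its keys in order, which is the property the "number of concurrent readers" guidance below builds on.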

Create an Event Hubs namespace

1. In the Azure portal, select Create a resource at the top left of the screen.

2. Search for Event Hubs and select it.

3. Click the Create button on Event Hubs.

4. On the Create namespace page, choose the subscription, resource group, and pricing tier, enter a namespace name, and pick a location.

5. On the Review + Create page, review the settings, and select Create. Wait for the deployment to complete.

6. On the Deployment page, select Go to resource to navigate to the page for your namespace.
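If you prefer scripting the setup, the namespace can also be created with the Azure CLI. This is a sketch with placeholder names (`my-rg`, `my-eh-namespace`); exact flag names can vary across CLI versions, so check `az eventhubs namespace create --help`:

```shell
# Create a resource group, then an Event Hubs namespace in it.
# Resource names and location below are illustrative placeholders.
az group create --name my-rg --location eastus

az eventhubs namespace create \
  --resource-group my-rg \
  --name my-eh-namespace \
  --location eastus \
  --sku Standard
```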

Create an event hub

To create an event hub within the namespace, follow these steps:

1. On the Event Hubs Namespace page, select Event Hubs in the left menu.

2. At the top of the window, select + Event Hub.

3. Type a name for your event hub, configure the options below, then select Create.

* Partition Count: Partitions are a data organization mechanism that relates to the downstream parallelism required in consuming applications. The number of partitions in an event hub directly relates to the number of concurrent readers you expect to have.

* Message Retention: This is the retention period for events. In the Standard tier, you can set the retention period between 1 and 7 days.

4. After the event hub is created, you see it in the list of event hubs.
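The same event hub can be created from the Azure CLI, which makes the two settings above explicit. Resource names are placeholders, and flag names may differ across CLI versions (the retention flag in particular has been renamed in newer releases), so verify with `az eventhubs eventhub create --help`:

```shell
# Create an event hub in the namespace, setting partition count
# and retention (in days) explicitly. Names are placeholders.
az eventhubs eventhub create \
  --resource-group my-rg \
  --namespace-name my-eh-namespace \
  --name my-event-hub \
  --partition-count 4 \
  --message-retention 1
```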

Create a connection string for the event hub

1. Click the Shared access policies menu on the created event hub.

2. Click the Add button and create a SAS policy for the sender.

3. The SAS policy is created successfully.

4. In the same way, create a SAS policy for the receiver.
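The connection string you copy from a SAS policy always follows the same `key=value;` shape. A small sketch that parses it, using placeholder values rather than real credentials:

```python
# Shape of an Event Hubs SAS connection string (placeholder values):
# semicolon-separated key=value pairs.
conn_str = (
    "Endpoint=sb://my-eh-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=sender_policy;"
    "SharedAccessKey=PLACEHOLDER_KEY;"
    "EntityPath=my-event-hub"
)

def parse_connection_string(conn: str) -> dict:
    # Split on ';', then on the first '=' of each pair, skipping empties.
    pairs = (part.split("=", 1) for part in conn.split(";") if part)
    return {k: v for k, v in pairs}

props = parse_connection_string(conn_str)
print(props["SharedAccessKeyName"])  # sender_policy
```

Note that the `EntityPath` segment appears when the policy is created on the event hub itself; a namespace-level connection string omits it, which is why the scripts below also take an `eventhub_name` parameter.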

Send Events

  1. Create a sender script with the following Python code:
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('John Wick Chapter 1'))
        event_data_batch.add(EventData('John Wick Chapter 2'))
        event_data_batch.add(EventData('John Wick Chapter 3'))
        event_data_batch.add(EventData('John Wick Chapter 4'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

2. Provide the sender SAS policy connection string and event hub name.

3. Execute the sender Python script.
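A batch created by `create_batch` has a maximum size, and `add` raises `ValueError` when an event no longer fits; at that point you send the full batch and start a new one. A plain-Python simulation of that flush-and-retry pattern (the byte budget and event payloads here are made up for illustration):

```python
# Simulation of the send_batch flush pattern: a batch has a byte
# budget; when an event doesn't fit, "send" the batch and start fresh.
class SimulatedBatch:
    def __init__(self, max_bytes: int):
        self.max_bytes = max_bytes
        self.events: list[str] = []
        self.size = 0

    def add(self, event: str) -> None:
        if self.size + len(event) > self.max_bytes:
            raise ValueError("batch full")  # mirrors EventDataBatch.add
        self.events.append(event)
        self.size += len(event)

sent_batches = []

def send_all(events, max_bytes=32):
    batch = SimulatedBatch(max_bytes)
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            sent_batches.append(batch.events)  # flush the full batch
            batch = SimulatedBatch(max_bytes)
            batch.add(event)
    if batch.events:
        sent_batches.append(batch.events)      # flush the remainder

send_all(["John Wick Chapter %d" % i for i in range(1, 5)])
# Every event was sent exactly once, in order, across the batches.
assert sum(sent_batches, []) == ["John Wick Chapter %d" % i for i in range(1, 5)]
```

The sender script above sends only one batch, so it never hits this case; a long-running producer should handle it.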

Receive Events

  1. Create a receiver script with the following Python code:
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())

2. Provide the receiver SAS policy connection string and event hub name.

3. Since I already have a storage account with a container, I provide that container name and the storage account connection string.

4. Execute the receiver Python script.
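The checkpoint store is what lets a restarted receiver resume where it left off instead of reprocessing from `starting_position="-1"` every time. A plain-Python sketch of that idea, where an in-memory dict stands in for the blob container (offsets and event bodies are made up):

```python
# In-memory stand-in for BlobCheckpointStore: after each processed
# event, record its offset; on restart, resume past the checkpoint.
events = [(offset, f"event-{offset}") for offset in range(5)]
checkpoints: dict[str, int] = {}   # partition_id -> last processed offset
processed = []

def receive(partition_id: str) -> None:
    # Resume after the checkpointed offset, or from the beginning (-1).
    start = checkpoints.get(partition_id, -1)
    for offset, body in events:
        if offset <= start:
            continue  # already processed on a previous run
        processed.append(body)
        checkpoints[partition_id] = offset  # update_checkpoint analogue

receive("0")   # first run reads everything
receive("0")   # a "restart" reads nothing new
assert processed == [f"event-{i}" for i in range(5)]
```

In the real receiver script, `BlobCheckpointStore` plays the role of the `checkpoints` dict, and `partition_context.update_checkpoint(event)` persists the offset durably in the blob container.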

Metrics

You can monitor event hub metrics, such as incoming and outgoing messages, in the Overview section of the namespace page.
