Azure Functions & Event Hub

Mohit Maheshwari
Published in Analytics Vidhya · Apr 20, 2020

In my last article I explained what Azure Functions is and its components. In this article we will talk about what Event Hub is and how to process events from an Event Hub using Azure Functions. The code will be in Python.

To view my previous article on Azure Functions, click below:

What is Event Hub?

Azure Event Hub is a big data streaming platform and event ingestion service. It is based on the publish-subscribe model and is similar to Apache Kafka. Event Hub provides a distributed stream-processing platform with low latency and seamless integration with data and analytics services inside and outside Azure.

Key Terminology & Components of Event Hub.

  1. Event Hub Namespace → The container that holds one or more Event Hubs; a single Event Hub namespace can contain multiple Event Hubs. In Apache Kafka terms, a namespace is roughly comparable to a Kafka cluster.
  2. Event Hub → The entity to which records are published. It is similar to a topic in Kafka. It consists of partitions, whose number we define when creating the Event Hub on the Azure Portal.
  3. Event Data → Contains the body of the event, which is a binary stream. Events are stored in binary form inside the partitions.
  4. Publisher → Any entity that sends/publishes data/events to an Event Hub. A publisher can use one of two protocols to send data: HTTPS or AMQP. Use HTTPS for low volumes of data and AMQP for high volumes.
  5. Partitions → Partitions are like large blocks within an Event Hub to which publishers publish events. In Event Hubs, a partition retains the data even after it has been read by a consumer. It also has a configurable retention period, and manual deletion of events is not possible.
  6. Consumer Groups → A consumer group is a view of the Event Hub. Each consumer group lets its consumers maintain a separate view of the event stream and read it at their own pace with their own offsets (see the consumer sketch after this list). A single Event Hub can have multiple consumer groups.
  7. Capture Data → While creating an Event Hub we can enable the Capture option. It is helpful if we want to retain the events sent by producers so they are still available at a later point in time, even after the retention period is over.
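
To make the consumer-group idea concrete, here is a minimal sketch (not part of the original tutorial) of a standalone consumer reading from the default consumer group with the azure-eventhub Python SDK; the connection string and event hub name are placeholders you would substitute.

# A hedged sketch, assuming the azure-eventhub package (pip install azure-eventhub).
# <event-hub-namespace-connection-string> and <event-hub-name> are placeholders.
from azure.eventhub import EventHubConsumerClient

def on_event(partition_context, event):
    # Each consumer group tracks its own position (offset) in every partition
    print(partition_context.partition_id, event.body_as_str())

client = EventHubConsumerClient.from_connection_string(
    conn_str="<event-hub-namespace-connection-string>",
    consumer_group="$Default",
    eventhub_name="<event-hub-name>")

with client:
    # starting_position="-1" means read from the beginning of the stream
    client.receive(on_event=on_event, starting_position="-1")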

In this post we will see how to code and deploy an Azure Function that triggers as soon as any event is published by a producer to the Event Hub. The event data will then be stored in Blob Storage, either through an output binding or through explicit code in the function.

Prerequisites →

  • Azure Functions Core Tools
  • Azure CLI
  • Python
  • A function app [on your local system] [check my previous article on how to create a function app]

Follow the steps below →

  1. Open the function app that you created on your local system.
  2. Open the local.settings.json file.
{"IsEncrypted": false,"Values": {"FUNCTIONS_WORKER_RUNTIME": "python","AzureWebJobsStorage": [Paste the Storage account connection string which is linked with your Azure function that you have Created on Azure Portal],"FUNCTIONS_EXTENSION_VERSION": "~2","receiverConnectionString": [Paste the Endpoint Connection String of the EventHubNamespace here in double quotes and remove these brackets.],"MyStorageConnectionString": [Paste the Endpoint Connection String of the blob storage account here in double quotes and remove these brackets.]},"ConnectionStrings": {}}

3. Open the function.json file.

In this file we will write the configuration that binds the Azure Function to the Event Hub, so that the function runs automatically whenever a new event is published by the producer, and that specifies the output binding for Blob Storage.

{"scriptFile": "__init__.py","bindings": [{"type": "eventHubTrigger","name": "events","direction": "in","eventHubName": [Enter the name of your event hub in double quotes and remove these brackets.],"connection": "receiverConnectionString","cardinality": "many","consumerGroup": "$Default","dataType": "binary"},
{
"name": "outputblob",
"type": "blob",
"path": [Enter the path of the folder where you want to store the data in double quotes and remove these brackets.],
"connection": "MyStorageConnectionString",
"direction": "out"
}]}
  • In the above code, "type": "eventHubTrigger" specifies that the Azure Function is triggered by an Event Hub.
  • "eventHubName": "Value" specifies which Event Hub the events will be received from, so that the Azure Function triggers.
  • Make sure that the value of "connection", i.e. "connection": "receiverConnectionString", is present as a key in the local.settings.json file. If you scroll up and check the local.settings.json code, you will see that the "receiverConnectionString" key is present there.
  • "name": "outputblob" is the name of the output binding for Blob Storage, which will be used in the code to refer to the output blob. You can give it any name; just make sure to use the same name in the code, i.e. in the __init__.py file.
  • "path": "Value" is the path of the folder or directory where you want to store the output data processed by the function.
  • Make sure that the value of "connection", i.e. "connection": "MyStorageConnectionString", is present as a key in the local.settings.json file. If you scroll up and check the local.settings.json code, you will see that the "MyStorageConnectionString" key is present there.

4. Open the __init__.py file.

The code below is triggered as soon as any new event is published by the producer and writes the event data to the output blob using the output blob binding variable “outputblob”.

Note → Here I am assuming that the events published to the Event Hub by the producer are in JSON form.

import json
import logging
from typing import List

import azure.functions as func

def main(events: List[func.EventHubEvent], outputblob: func.Out[str]):
    for event in events:
        # The event body is a binary stream
        event_data = event.get_body()
        logging.info(event_data)
        # Decode the bytes and normalize quotes so the payload parses as JSON
        my_json = event_data.decode('utf8').replace("'", '"')
        event_data_json = json.loads(my_json)
        # Write the (re-serialized) event data to the blob defined by the output binding
        outputblob.set(json.dumps(event_data_json))

Note → The problem with the above code is that the output blob is tied to the single path specified in the function.json file. So each time the function triggers, the data is rewritten to the same file, and very little customization is possible. If you need a more customizable approach, check step 5.

5. Remove the previous code from the __init__.py file and paste the code below.

import json
import logging
import uuid
from typing import List

import azure.functions as func
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient

def main(events: List[func.EventHubEvent]):
    blob_service_client = BlobServiceClient.from_connection_string(
        conn_str=[Enter the endpoint connection string for the Blob Storage account in double quotes and remove these brackets.])
    # Container names must be lowercase; create the container if it does not exist yet
    try:
        container_client = blob_service_client.create_container("mycontainer")
    except ResourceExistsError:
        container_client = blob_service_client.get_container_client("mycontainer")
    for event in events:
        event_data = event.get_body()
        logging.info(event_data)
        my_json = event_data.decode('utf8').replace("'", '"')
        event_data_json = json.loads(my_json)
        # Write each event to its own blob with a unique name
        container_client.upload_blob(name=str(uuid.uuid4()), data=json.dumps(event_data_json))

6. Save all the files.

7. Run the command func start and enjoy 🙂.
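
To see the function fire locally, you can publish a test event from a separate script. Below is a minimal producer sketch (not from the original article) using the azure-eventhub Python SDK; the connection string, event hub name, and JSON payload are placeholders/examples.

# A hedged sketch, assuming the azure-eventhub package (pip install azure-eventhub).
# <event-hub-namespace-connection-string> and <event-hub-name> are placeholders.
import json
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    conn_str="<event-hub-namespace-connection-string>",
    eventhub_name="<event-hub-name>")

with producer:
    # Add a JSON-encoded test event to a batch and send it
    batch = producer.create_batch()
    batch.add(EventData(json.dumps({"deviceId": 1, "temperature": 21.5})))
    producer.send_batch(batch)

Once the event arrives, the running function should log the event body and write it to Blob Storage.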

Thank you for reading this tutorial. I hope you enjoyed it and now understand what Event Hub is and how to use it as a trigger for an Azure Function.
