Building an End-to-End Real-Time Streaming Data Pipeline with Azure: Part 2 — Data Ingestion with Azure Functions

Shijun Ju
7 min read · Mar 3, 2025


In this second installment of our series, we explore data ingestion using Azure Functions, an alternative to the Azure Databricks approach covered in Part 1. Azure Functions provides a lightweight, serverless solution for fetching weather data at regular intervals and streaming it to an Azure Event Hubs instance. This method excels in cost efficiency and simplicity, making it ideal for event-driven workloads; Azure Databricks, by contrast, offers more robust but resource-intensive processing.

Why Azure Functions?

Azure Functions operates on a serverless architecture, meaning you only pay for the compute resources used during execution, unlike Azure Databricks, which requires a persistent cluster and incurs higher costs. Functions are perfect for lightweight tasks like periodic API calls, while Databricks shines in scenarios requiring complex data transformations or large-scale processing. For this project, we leverage Azure Functions’ Timer Trigger to fetch weather data every 30 seconds, offering a cost-effective complement to Databricks. For pricing details, refer to the Azure Pricing Calculator.

[Diagram: streaming data ingestion with an Azure Functions app]

Step-by-Step Implementation

1. Set Up Local Development Environment with VS Code

To develop and deploy the Azure Function locally, we use Visual Studio Code (VS Code) with the Azure Functions extension.

  • Prerequisites:
    - Download and install VS Code.
    - Install the Python extension for coding support.
    - Install the Azure Functions extension: Open VS Code, go to Extensions (Ctrl+Shift+X), search for “Azure Functions,” and click Install.
    - Locate the Azure icon in the left sidebar.
[Screenshot: the Azure icon in the VS Code sidebar]

Sign In:

  • Click the Azure icon, then select “Sign in to Azure” and follow the prompts to authenticate.

Create Function Project:

  • In the Azure sidebar, under Workspace, click “Create Function Project.”
  • Choose a folder for your project, select Python as the language, and opt for Model V2 (the latest programming model).
  • Skip the virtual environment for simplicity (you can create one manually if needed).
  • Select Timer Trigger and press Enter.
  • Set the cron expression to */30 * * * * * (runs every 30 seconds).
[Screenshots: Timer Trigger cron expression examples and prompt in VS Code; the created project with its virtual environment]
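
If you prefer the command line, the same scaffolding can be done with Azure Functions Core Tools. A sketch, assuming the tools are installed (the project name here is arbitrary):

func init weather-ingestion --python   # new Python project (V2 model in recent Core Tools)
cd weather-ingestion
func new --name weatherapifunction --template "Timer trigger"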

Note: The cron expression */30 * * * * * breaks down as: every 30 seconds, any minute, any hour, any day, any month, any day of the week. See the Azure Functions Timer Trigger documentation for more details.
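For comparison, a few other NCRONTAB schedules in the same six-field format (second, minute, hour, day, month, day-of-week):

"*/30 * * * * *"   # every 30 seconds (the schedule used here)
"0 */5 * * * *"    # at second 0 of every 5th minute
"0 0 * * * *"      # at the top of every hour
"0 30 9 * * 1-5"   # at 9:30 AM, Monday through Friday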

2. Configure Access for Azure Functions

To securely interact with Azure Event Hubs and Key Vault, we assign the Function App a managed identity and grant it appropriate permissions. The portal steps follow; an equivalent Azure CLI sketch appears after them.

  • Enable Managed Identity:
    - In the Azure Portal, navigate to your Function App > Settings > Identity.
    - Toggle Status to On and click Save.
  • Grant Event Hubs Access:
    - Go to your Event Hubs Instance > Access Control (IAM) > + Add > Add Role Assignment.
    - Select Azure Event Hubs Data Sender, click Next, then choose Managed Identity.
    - Click + Select Members, pick your Function App, and click Select > Review + Assign.
[Screenshots: adding the Azure Event Hubs Data Sender role assignment for the Function App in Access Control (IAM)]
  • Grant Key Vault Access:
    - Navigate to your Key Vault > Access Control (IAM) > + Add > Add Role Assignment.
    - Search for Key Vault Secrets User, select it, and click Next.
    - Choose Managed Identity, select your Function App, and complete the assignment.
[Screenshot: adding the Key Vault Secrets User role assignment for the Function App in Access Control (IAM)]
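
If you prefer scripting over portal clicks, roughly equivalent Azure CLI commands are sketched below (placeholders are yours to fill in; the role names are the built-in Azure roles used above):

# Enable the system-assigned managed identity; note the principalId it returns
az functionapp identity assign --name <your-function-app> --resource-group <your-resource-group>

# Grant the identity send rights on the Event Hubs instance
az role assignment create --assignee <principal-id> --role "Azure Event Hubs Data Sender" --scope <event-hub-instance-resource-id>

# Grant the identity read access to Key Vault secrets
az role assignment create --assignee <principal-id> --role "Key Vault Secrets User" --scope <key-vault-resource-id>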

3. Modify the Function Code

We enhance the default Timer Trigger code to fetch weather data, process it, and send it to Event Hubs.

  • Update requirements.txt:
    Add the following libraries to enable Event Hubs and Key Vault integration:
[Screenshot: the completed requirements.txt in VS Code]
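    The exact list isn’t reproduced here, but based on the imports in function_app.py below, requirements.txt needs roughly these packages:

azure-functions
requests
azure-eventhub
azure-identity
azure-keyvault-secrets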
  • Edit function_app.py:
    Replace the default code with the following, customizing placeholders with your values:
[Diagram: Function App code logic]
import logging
import azure.functions as func
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Initialize the Function App
app = func.FunctionApp()

# Timer Trigger: Runs every 30 seconds
@app.timer_trigger(schedule="*/30 * * * * *", arg_name="myTimer", run_on_startup=False, use_monitor=False)
def weatherapifunction(myTimer: func.TimerRequest) -> None:
    """Fetches weather data and sends it to Event Hubs every 30 seconds."""
    if myTimer.past_due:
        logging.info("The timer is past due!")

    logging.info("Python timer trigger function executed.")

    # Event Hub Configuration
    EVENT_HUB_NAME = "<your-event-hub-instance-name>"  # e.g., "weatherstreamingeventhub"
    EVENT_HUB_NAMESPACE = "<your-event-hubs-namespace-host-name>"  # e.g., "eventhub-weather-streaming.servicebus.windows.net"

    # Authenticate using Managed Identity
    credential = DefaultAzureCredential()
    producer = EventHubProducerClient(
        fully_qualified_namespace=EVENT_HUB_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        credential=credential
    )

    # Send event to Event Hubs
    def send_event(event):
        event_data_batch = producer.create_batch()
        event_data_batch.add(EventData(json.dumps(event)))
        producer.send_batch(event_data_batch)

    # Handle API response
    def handle_response(response):
        if response.status_code == 200:
            return response.json()
        else:
            return f"Error: {response.status_code}, {response.text}"

    # Fetch current weather data with air quality
    def get_current_weather(base_url, api_key, location):
        current_weather_url = f"{base_url}/current.json"
        params = {"key": api_key, "q": location, "aqi": "yes"}
        response = requests.get(current_weather_url, params=params)
        return handle_response(response)

    # Fetch forecast data
    def get_forecast_weather(base_url, api_key, location, days):
        forecast_url = f"{base_url}/forecast.json"
        params = {"key": api_key, "q": location, "days": days}
        response = requests.get(forecast_url, params=params)
        return handle_response(response)

    # Fetch weather alerts
    def get_alerts(base_url, api_key, location):
        alerts_url = f"{base_url}/alerts.json"
        params = {"key": api_key, "q": location, "alerts": "yes"}
        response = requests.get(alerts_url, params=params)
        return handle_response(response)

    # Flatten and merge weather data
    def flatten_data(current_weather, forecast_weather, alerts):
        location_data = current_weather.get("location", {})
        current = current_weather.get("current", {})
        condition = current.get("condition", {})
        air_quality = current.get("air_quality", {})
        forecast = forecast_weather.get("forecast", {}).get("forecastday", [])
        alert_list = alerts.get("alerts", {}).get("alert", [])

        return {
            "name": location_data.get("name"),
            "region": location_data.get("region"),
            "country": location_data.get("country"),
            "lat": location_data.get("lat"),
            "lon": location_data.get("lon"),
            "localtime": location_data.get("localtime"),
            "temp_c": current.get("temp_c"),
            "is_day": current.get("is_day"),
            "condition_text": condition.get("text"),
            "condition_icon": condition.get("icon"),
            "wind_kph": current.get("wind_kph"),
            "wind_degree": current.get("wind_degree"),
            "wind_dir": current.get("wind_dir"),
            "pressure_in": current.get("pressure_in"),
            "precip_in": current.get("precip_in"),
            "humidity": current.get("humidity"),
            "cloud": current.get("cloud"),
            "feelslike_c": current.get("feelslike_c"),
            "uv": current.get("uv"),
            "air_quality": {
                "co": air_quality.get("co"),
                "no2": air_quality.get("no2"),
                "o3": air_quality.get("o3"),
                "so2": air_quality.get("so2"),
                "pm2_5": air_quality.get("pm2_5"),
                "pm10": air_quality.get("pm10"),
                "us-epa-index": air_quality.get("us-epa-index"),
                "gb-defra-index": air_quality.get("gb-defra-index"),
            },
            "alerts": [
                {
                    "headline": alert.get("headline"),
                    "severity": alert.get("severity"),
                    "description": alert.get("desc"),
                    "instruction": alert.get("instruction"),
                }
                for alert in alert_list
            ],
            "forecast": [
                {
                    "date": day.get("date"),
                    "maxtemp_c": day.get("day", {}).get("maxtemp_c"),
                    "mintemp_c": day.get("day", {}).get("mintemp_c"),
                    "condition": day.get("day", {}).get("condition", {}).get("text"),
                }
                for day in forecast
            ],
        }

    # Retrieve secret from Key Vault
    def get_secret_from_keyvault(vault_url, secret_name):
        credential = DefaultAzureCredential()
        secret_client = SecretClient(vault_url=vault_url, credential=credential)
        retrieved_secret = secret_client.get_secret(secret_name)
        return retrieved_secret.value

    # Main logic to fetch and send weather data
    def fetch_weather_data():
        base_url = "http://api.weatherapi.com/v1/"
        location = "Toronto"  # Customize as needed
        vault_url = "<your-key-vault-uri>"  # e.g., "https://<your-key-vault-name>.vault.azure.net/"
        api_key_secret_name = "<weather-api-key-name-in-key-vault>"  # e.g., "weatherAPIKey"

        # Fetch API key from Key Vault
        weather_api_key = get_secret_from_keyvault(vault_url, api_key_secret_name)

        # Get weather data
        current_weather = get_current_weather(base_url, weather_api_key, location)
        forecast_weather = get_forecast_weather(base_url, weather_api_key, location, 3)
        alerts = get_alerts(base_url, weather_api_key, location)

        # Process and send data
        merged_data = flatten_data(current_weather, forecast_weather, alerts)
        send_event(merged_data)

    # Execute the main logic
    fetch_weather_data()

Key Changes:

  • Added docstring and explanatory comments.
  • Corrected typo in function name (weatherapifuncton → weatherapifunction).
  • Used consistent naming conventions and improved readability with proper spacing.
  • Ensured placeholders (<your-…>) are clearly marked for user customization.
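
Optionally, you can smoke-test the function locally before deploying. A sketch, assuming Azure Functions Core Tools are installed and you have signed in with az login (DefaultAzureCredential then falls back to your CLI identity, so your own account needs the same Event Hubs and Key Vault roles as the managed identity):

pip install -r requirements.txt
func start   # runs the Timer Trigger locally; watch the logs for the 30-second cycle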

4. Deploy to Azure

Deploy the local code to run in the cloud:

  • In VS Code, under Workspace, click the “Deploy to Azure” icon.
  • Select your Azure subscription and Function App, then click Deploy.
[Screenshot: Deploy to Azure from the VS Code Workspace panel]
  • Verify Deployment:
    - In the Azure Portal, go to your Function App and confirm the function is listed.
[Screenshots: the deployed function listed in the Azure Portal, with code details]
  • Check Event Hubs:
    - Navigate to your Event Hubs Instance > Data Explorer > View Events to confirm data is streaming every 30 seconds.
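
If you’d rather verify from code than from the portal, a minimal consumer sketch using the same azure-eventhub package might look like this (it assumes your identity holds the Azure Event Hubs Data Receiver role; the names are placeholders):

from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential

# Connect with the default consumer group; the credential works as in the function
consumer = EventHubConsumerClient(
    fully_qualified_namespace="<your-event-hubs-namespace-host-name>",
    eventhub_name="<your-event-hub-instance-name>",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
)

def on_event(partition_context, event):
    # Print each weather event as it arrives
    print(event.body_as_str())

with consumer:
    # starting_position="-1" reads from the beginning of the stream
    consumer.receive(on_event=on_event, starting_position="-1")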

Key Takeaways

  • Cost Efficiency: Azure Functions’ serverless model minimizes costs compared to Databricks for simple ingestion tasks.
  • Security: Managed Identity and Key Vault eliminate the need to hardcode sensitive information.
  • Ease of Use: VS Code integration simplifies local development and deployment.

In the next part, we’ll explore processing and loading this streamed data into a data warehouse for analysis, followed by data reporting and configuring alerts for extreme weather. Stay tuned!
