Building Real-Time Streaming Data Pipeline In Azure Cloud — Part1

Murat_Aydin
9 min readJan 24, 2024

--

Tech Stack

  • Azure Stream Analytics (Event hubs, Stream Analytics Jobs)
  • Azure Synapse Analytics (Dedicated SQL Pool, Synapse Pipelines, External Tables)
  • Azure Blob Storage
  • Azure Data Factory (Linked Services, Datasets, Triggers, Custom, Copy and Delete activities)
  • Azure Batch (VM’s, Creating a custom environment)
  • PowerBI

Introduction

This project is the cloud implementation of one of my personal projects which was running in Docker. The details of the project can be found in below link:

In short, in this project I imagined a taxi-calling app for which I created fake data and served it in flask webserver in real time. The aim was to create a real-time streaming data pipeline where we ingest the fake data using Kafka, develop ETL pipelines and report the result with a dashboard in real time in Kibana. The pipeline was orchestrated by Airflow and was running in Docker.

While in this project, I wanted to make this pipeline cloud-native and moved everything to Azure Cloud. In this article, we are going to see step by step the implementation of the pipeline.

HOW TO IMPLEMENT THIS PROJECT FOR FREE?

If you have a university mail for which you did not get an azure account before, you can get a student account where you will have 200$ of free credits.

You can also use your personal mail but as far as I know the credit amount is lower in that case. It is still possible to get a free account for trial but you need to contact the sales in azure website and they will prepare a custom link for you where you can have around $200 worth of credits.

HOW DOES IT WORK?

Here is a high level view of the pipeline. We will ingest the data in real time in stream analytics, write everything to blob storage, using synapse pipelines we load the files to a dedicated SQL pool in Synapse Analytics, then using Azure Batch we do machine learning in taxi calls to predict the status through a python script, write back everything to Synapse Analytics and visualize the resulsts in real time in PowerBI.

Here is our data transformation pipeline:

We will create fake data using a python script and serve it in localhost using a lightweight flask webserver. Through a stream analytics job, we will ingest this data in real time. We will load the data to synapse analytics, to the taxi_calls_table. From this table, we will create two views, ml_mart and calls_result. ml_mart view will be used to do machine learning on taxi calls to predict the result which is either canceled or completed. Calls_result will have the true status of a taxi call. Drivers, users and locations are metadata, I used external tables for them in synapse. Finally, all the results will be transformed and stored in metrics_real_time view which we will use to do real time visualization in POWERBI.

PART 1

In this part of the project, we will be creating a stream analytics job and try to ingest the data in real time in azure stream analytics and write them to blob storage.

SETTING UP THE ENVIRONMENT

First, create a resource group. A resource group is a collection of resources that share the same lifecycle, permissions, and policies.

Then create a storage account.

We then need to create an eventhub service. Go to azure portal and create a resource.

Create an event hubs namespace service choosing the appropriate RG, subscription and pricing tier. Let’s call it taxidataeevnthub.

Note : Some of the names has to be globally unique in Azure. In that case, try to correctly substitute the names so that you do not get any errors.

Now go to the resource and create an event hub.

In a single azure eventhub namespace, we can have multiple events. This is the same concept as the topics in Kafka. We can have multiple topics and different number of listeners for that topic. Now clicking on the Event Hub button above, let’s create an event hub and let’s call it taxi_data_event.

Then go to shared access policies of this new event hub (topic) that we just created. To go there, go to eventhub name spaces from the home. From the menu on the left, find the event hubs and choose the taxi_data_event.

We have all the necessary connection details of this hub from a third party application like python. Once our namespace and hubs is ready, we can create the stream analytics job.

Give a name, choose the RG etc.

Here is the stream analytics job that we created above where the event hub is going to send the data in real time. What we need to do is to add the topics that we created in the eventhub. Click on the add input.

Give a name and choose the eventhub space that we created in the first point. Choose the authenticaton mode as connection string. Now we are ready to ingest the data in real time but what are we going to do with it?

We want to write the data somewhere. In our case, it will be blob storage. We want to write the data in a container in blob storage. To do it, we need to create an output. Before creating the output, let’s create a container in the storage account we created above and let’s call it taxistreamdata.

Coming back to the stream analytics job, still under the Query section

click on output

Choose blob storage

Enter your own storage account, container name and for the maximum waiting time which is the last parameter enter 20 seconds.

Now that we both have the input and output. We need to write the query.

Save the query, refresh the page and start the job. You will be asked when to start now or custom. So we can schedule it to anytime but we want to start it now.

We now need to start the py script that we generate fake data. Again, for the details, please refer to the project shared in the introduction. The below script will generate fake taxi calls for us and serve it in http://localhost:8081/fake_taxi_data

from flask import Flask, jsonify, request
from faker import Faker
import random
from apscheduler.schedulers.background import BackgroundScheduler
import pandas as pd
from geopy.geocoders import GoogleV3
import os
from dotenv import load_dotenv
load_dotenv()


#google_api_key = os.getenv("google_api_key")

app = Flask(__name__)
fake = Faker()


# Define the number of users, taxis, and drivers
num_users = 100
num_taxis = 30
num_drivers = 30
num_locations = 500

# Create a list of user IDs, taxi IDs, and driver IDs
user_ids = [f"User_{i}" for i in range(1, num_users + 1)]
taxi_ids = [f"Taxi_{i}" for i in range(1, num_taxis + 1)]
driver_ids = [f"Driver_{i}" for i in range(1, num_drivers + 1)]
location_ids = [i for i in range(1, num_locations + 1)]


fake_taxi_data_list = []

def generate_random_coordinates():
# Define latitude and longitude boundaries for Milan
min_lat, max_lat = 45.43344, 45.495083 # Approximate latitude boundaries
min_lon, max_lon = 9.145565, 9.18951 # Approximate longitude boundaries

# Generate random latitude and longitude within the boundaries
latitude = round(random.uniform(min_lat, max_lat), 8) # 6 decimal places
longitude = round(random.uniform(min_lon, max_lon), 8) # 6 decimal places

return latitude, longitude

def get_address(current_location,api_key):
latitude,longitude = current_location
geolocator = GoogleV3(api_key=api_key)
location = geolocator.reverse((latitude, longitude), language="en")
return location.address




# Generate a single fake Airbnb data entry
def generate_fake_taxi_data():
user = random.choice(user_ids)
taxi = random.choice(taxi_ids)
driver = random.choice(driver_ids)

passenger_current_location = generate_random_coordinates()
passenger_current_lat, passenger_current_lon = passenger_current_location

passenger_arrival_location = generate_random_coordinates()
passenger_arrival_lat, passenger_arrival_lon = passenger_arrival_location

drivers_current_location = generate_random_coordinates()
drivers_current_lat, drivers_current_lon = drivers_current_location


drivers_location = random.choice(location_ids)
pickup_location = random.choice(location_ids)
destination_location = random.choice(location_ids)

drivers_review = random.uniform(1.0, 5.0) # Lower driver reviews
booking_timestamp = pd.Timestamp(
f"2023-{random.randint(1, 12)}-{random.randint(1, 28)} {random.randint(0, 23)}:{random.randint(0, 59)}"
)
booking_source = random.choice(["website", "app"])
n_passengers = random.randint(1, 4)
estimated_fare = random.uniform(10, 50)
estimated_arrival_time = random.randint(5, 15)
vehicle_details = random.choice(["suv", "sedan","hatchback","minivan"])
trip_duration = random.randint(10, 60)
trip_distance = random.uniform(1, 20)
trip_rating = random.uniform(1.0, 5.0)
trip_id = fake.uuid4()



# Derive time_of_day and day_of_week from booking_timestamp
booking_hour = booking_timestamp.hour
if 6 <= booking_hour < 9:
time_of_day = "morning"
elif 9 <= booking_hour < 12:
time_of_day = "forenoon"
elif 12 <= booking_hour < 17:
time_of_day = "afternoon"
elif 17 <= booking_hour < 21:
time_of_day = "evening"
else:
time_of_day = "night"

day_of_week = booking_timestamp.strftime("%A") # Get the day of the week

weather_conditions = random.choice(["clear", "rainy", "snowy", "cloudy","sunny","windy"])
traffic_conditions = random.choice(["light", "moderate", "heavy",'normal'])
passenger_pick_up_time = booking_timestamp + pd.Timedelta(minutes=random.randint(1, 30))

# Calculate the difference in seconds between the current time and the booking timestamp

# Generate a random additional waiting time before cancellation (1 to 10 minutes)
additional_waiting_time = random.randint(1, 10)

# Introduce patterns for cancellations
if (drivers_review < 2 or estimated_arrival_time > (estimated_arrival_time + additional_waiting_time) or
(time_of_day == "morning" and 6.5 <= booking_timestamp.hour <= 9 and
estimated_arrival_time > random.randint(1, 10))
) and (weather_conditions == "rainy" or
time_of_day == "night" or
day_of_week in ["Saturday", "Sunday"] or
booking_source == "app"):
status = "cancelled"
cancelled_time = booking_timestamp + pd.Timedelta(minutes=random.randint(1, 10))
# Order completion time for cancellations
order_completion_time = cancelled_time
passenger_pick_up_time = None
trip_duration = None
trip_distance = None
else:
status = "completed"
cancelled_time = None
order_completion_time = booking_timestamp + pd.Timedelta(minutes=trip_duration)
passenger_pick_up_time = booking_timestamp + pd.Timedelta(minutes=(estimated_arrival_time + random.randint(-5,5)))
trip_duration = order_completion_time - passenger_pick_up_time

keys = ["user_id", "taxi_id", "driver_id","passenger_current_lat","passenger_current_lon","passenger_arrival_lat", "passenger_arrival_lon",
"drivers_current_lat", "drivers_current_lon", "drivers_location", "pickup_location", "destination_location", "drivers_review",
"booking_timestamp", "booking_source", "n_passengers", "estimated_fare",
"estimated_arrival_time_in_m", "vehicle_details", "trip_duration", "trip_distance","trip_rating",
'trip_id',"time_of_day", "day_of_week", "weather_conditions", "traffic_conditions",
"passenger_pick_up_time", "status", "order_completion_time"]

values = [user, taxi, driver,passenger_current_lat, passenger_current_lon,passenger_arrival_lat, passenger_arrival_lon, drivers_current_lat, drivers_current_lon,drivers_location,pickup_location, destination_location, drivers_review, str(booking_timestamp), booking_source,
n_passengers, estimated_fare, estimated_arrival_time, vehicle_details, str(trip_duration), trip_distance,trip_rating,
trip_id,time_of_day, day_of_week, weather_conditions, traffic_conditions, str(passenger_pick_up_time),
status, str(order_completion_time)]
my_dict = dict(zip(keys, values))
return my_dict

def generate_entries(data_list):
data_list.clear()
for _ in range(20):
data_list.append(generate_fake_taxi_data())

# Set up a route to access the generated data
@app.route('/fake_taxi_data', methods=['GET'])
def get_fake_taxi_data():
return jsonify(fake_taxi_data_list)


# Create a scheduler to regenerate data every 20 seconds
scheduler = BackgroundScheduler()
scheduler.add_job(generate_entries, 'interval', seconds=20, args=[fake_taxi_data_list])
scheduler.start()

if __name__ == '__main__':
generate_entries(fake_taxi_data_list) # Generate initial data
app.run(host='localhost', port=8081)

Run the above script in a terminal and it will start producing 20 fake data each 20 seconds.

We now need to get the fake data from http://localhost:8081/fake_taxi_data and send it to stream analytics with the below script. You need to provide your own connection string of the taxidataeventhub event hubs namespace service that we created above.

import requests
import asyncio
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient


def get_data():
data_response = requests.get('http://localhost:8081/fake_taxi_data')
return data_response.text


connection_str = ''
eventhub_name = 'taxi_data_event'



async def run():
# Create a producer client to send messages to the event hub.
# Specify a connection string to your event hubs namespace and
# the event hub name.
while True:

await asyncio.sleep(20)

producer = EventHubProducerClient.from_connection_string(
conn_str=connection_str, eventhub_name=eventhub_name)
async with producer:
# Create a batch.
event_data_batch = await producer.create_batch()
# Add events to the batch.
event_data_batch.add(EventData(get_data()))
# Send the batch of events to the event hub.
await producer.send_batch(event_data_batch)
print('Successfully sent to azure event hubs.')

asyncio.run(run())

Above script will send each 20 seconds the upcoming data to the stream analytics. After 20 seconds, go to stream analytics job where you should see an output similar to this:

The data is successfully ingested in stream analytics which is then written to blob storage as a batch of 20 records.

You can now go to taxistreamdata container that we created above to write the files and observe the outputs.

We have successfully ingested in real time 40 fake records and wrote them in blob storage. You can now stop the job in stream analytics not to incur unneccessary cost.

This is the end of part 1. See you in Part 2!

--

--