Structured streaming with Azure Databricks and Event Hub
Use case:
In this article, we will see how to stream real-time taxi data from Azure Event Hubs using Structured Streaming in Azure Databricks, transform it, and load it into Delta Lake for downstream consumers.
Solution Architecture:
Below are the services to be used for this POC.
- Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
- Azure Event Hubs is a cloud native data streaming service that can stream millions of events per second, with low latency, from any source to any destination. Azure Event Hubs provides an Apache Kafka endpoint on an event hub, which enables users to connect to the event hub using the Kafka protocol. You can often use an event hub’s Kafka endpoint from your applications without any code changes
Using Event Hubs to ingest and store streaming data, businesses can harness the power of streaming data to gain valuable insights, drive real-time analytics, and respond to events as they happen, enhancing overall efficiency and customer experience.
Apache Kafka and Azure Event Hubs conceptual mapping
Conceptually, Kafka and Event Hubs are very similar. They’re both partitioned logs built for streaming data, whereby the client controls which part of the retained log it wants to read. The following mapping shows how the concepts translate between Kafka and Event Hubs:
- Kafka cluster → Event Hubs namespace
- Kafka topic → Event hub
- Partition → Partition
- Consumer group → Consumer group
- Offset → Offset
Azure Event Hubs is a multi-protocol event streaming engine that natively supports Apache Kafka, so we can bring Kafka workloads to Azure Event Hubs without any code changes. You don’t need to set up, configure, and manage your own Kafka clusters or use a Kafka-as-a-Service offering that’s not native to Azure.
- Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc.
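To get a feel for the programming model before diving into the POC, here is a minimal, self-contained sketch (not part of the solution) that streams from Spark's built-in rate source and maintains a running windowed count, using the same DataFrame API you would use for batch data:
# Minimal Structured Streaming illustration using the built-in "rate" source
from pyspark.sql.functions import window, count

rate_df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

windowed_counts = rate_df\
    .groupBy(window("timestamp", "10 seconds"))\
    .agg(count("*").alias("events"))

query = windowed_counts.writeStream\
    .outputMode("complete")\
    .format("memory")\
    .queryName("rate_counts")\
    .start()

# spark.sql("SELECT * FROM rate_counts").show()  # the result table is updated incrementally
# query.stop()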
Prerequisites:
An Azure subscription, an Azure Databricks workspace, an Azure Event Hubs namespace, and working knowledge of PySpark and Databricks.
Implementation:
1. Create and configure the Azure Event Hubs namespace in the Azure portal and note the namespace name, event hub name and connection string.
2. Create and configure the Azure Databricks workspace and cluster to run the streaming workload.
3. Mount the Azure Data Lake Storage containers into DBFS to store and subsequently process the streaming data. Unity Catalog is out of scope for this POC.
# Databricks notebook source
# MAGIC %md
# MAGIC ### Mount Azure Data Lake using Service Principal
# MAGIC #### Steps to follow
# MAGIC 1. Get client_id, tenant_id and client_secret.
# MAGIC 2. Set Spark Config with App/ Client Id, Directory/ Tenant Id & Secret
# MAGIC 3. Call file system utility mount to mount the storage
# COMMAND ----------
client_id = "<client-id from the app registration>"
tenant_id = "<tenant-id from the app registration>"
client_secret = "<client-secret>"
# COMMAND ----------
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": client_id,
"fs.azure.account.oauth2.client.secret": client_secret,
"fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"}
# Mount the bronze, silver, gold and checkpoint containers
for container in ["bronze", "silver", "gold", "checkpoint"]:
    dbutils.fs.mount(
        source = f"abfss://{container}@streamingpocdl.dfs.core.windows.net",
        mount_point = f"/mnt/streamingpocdl/{container}",
        extra_configs = configs
    )
It is recommended to use Azure Key Vault or a Databricks secret scope to store secrets securely; in this POC, we have not used a vault service. A sketch of the secret-scope approach is shown below.
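For reference, here is a minimal sketch of how the credentials could be read from a Databricks secret scope instead of being hard-coded; the scope name and key names below are hypothetical placeholders:
# Example only: the secret scope "streaming-poc-scope" and its key names are placeholders
client_id = dbutils.secrets.get(scope="streaming-poc-scope", key="client-id")
tenant_id = dbutils.secrets.get(scope="streaming-poc-scope", key="tenant-id")
client_secret = dbutils.secrets.get(scope="streaming-poc-scope", key="client-secret")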
4. Create respective databases for bronze, silver and gold layers.
CREATE DATABASE IF NOT EXISTS streaming_bronze
LOCATION '/mnt/streamingpocdl/bronze/';
CREATE DATABASE IF NOT EXISTS streaming_silver
LOCATION '/mnt/streamingpocdl/silver/';
CREATE DATABASE IF NOT EXISTS streaming_gold
LOCATION '/mnt/streamingpocdl/gold/';
5. Read data streams from Azure Event Hubs using the Kafka protocol.
from pyspark.sql.functions import col
from pyspark.sql.types import *
# Event Hub details
EH_CONN_STR= "my-event-hubs-connection-string"
EH_NAMESPACE = "streaming-poc"  # my-event-hubs-namespace
EH_KAFKA_TOPIC = "Taxi"  # Taxi_stream
EH_BOOTSTRAP_SERVERS = f"{EH_NAMESPACE}.servicebus.windows.net:9093"
EH_SASL_WRITE = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";"
# standard configuration options
topic_name = EH_KAFKA_TOPIC
eh_namespace_name = EH_NAMESPACE
eh_sasl = EH_SASL_WRITE
bootstrap_servers = EH_BOOTSTRAP_SERVERS
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "30000",
    "startingOffsets": "latest",
    "kafka.sasl.jaas.config": eh_sasl,
    "subscribe": topic_name,
}
6. Convert the input stream’s value column from binary to text so it can be ingested in the upcoming steps.
df = spark\
.readStream\
.format("kafka")\
.options(**kafka_options)\
.load()\
.withColumn("value_text", col("value").cast("string"))
7. Create an external Delta table in the bronze layer to ingest the streaming data. For the bronze layer, the idea is to use external tables so that both batch and streaming data can be ingested.
%sql
CREATE TABLE IF NOT EXISTS streaming_bronze.Taxi
(
Key BINARY,
Value BINARY,
Topic STRING,
Partition INT,
Offset LONG,
Timestamp TIMESTAMP,
TimestampType INT,
Value_text STRING
) USING DELTA
LOCATION '/mnt/streamingpocdl/bronze/Taxi'
8. Write the data stream into the bronze external Delta table using append mode. The idea is to continuously append the data streams to the Delta lake as they arrive in the event hub; there are no UPSERT operations, only inserts on every execution.
# Trigger once to process whatever data is currently available; for near-real-time
# processing, use .trigger(processingTime="5 minutes") instead (see the sketch below).
df.writeStream\
    .outputMode("append")\
    .option("checkpointLocation", "/mnt/streamingpocdl/checkpoint")\
    .format("delta")\
    .trigger(once=True)\
    .start("/mnt/streamingpocdl/bronze/Taxi")
9. Define a schema manually and use it while consuming the streaming data from the bronze layer.
from pyspark.sql.functions import col, current_timestamp, from_json, lit, from_unixtime, to_timestamp, sum, avg, when, count, round, max
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DoubleType, LongType, TimestampType
taxi_schema = StructType(fields =
[
StructField("vendorID", StringType(), False),
StructField("tpepPickupDateTime", LongType(), True),
StructField("tpepDropoffDateTime", LongType(), True),
StructField("passengerCount", IntegerType(), True),
StructField("tripDistance", DoubleType(), True),
StructField("puLocationId", StringType(), True),
StructField("doLocationId", StringType(), True),
StructField("startLon", IntegerType(), True),
StructField("startLat", IntegerType(), True),
StructField("endLon", IntegerType(), True),
StructField("endLat", IntegerType(), True),
StructField("rateCodeId", IntegerType(), True),
StructField("storeAndFwdFlag", StringType(), True),
StructField("paymentType", StringType(), True),
StructField("fareAmount", DoubleType(), True),
StructField("extra", IntegerType(), True),
StructField("mtaTax", DoubleType(), True),
StructField("improvementSurcharge", StringType(), True),
StructField("tipAmount", DoubleType(), True),
StructField("tollsAmount", IntegerType(), True),
StructField("totalAmount", DoubleType(), True)
]
)
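To sanity-check the schema before applying it to the stream, you can parse a single hand-written JSON record with from_json; the sample values below are made up purely for illustration:
# Illustration only: validate taxi_schema against a made-up sample payload
sample_json = '{"vendorID": "1", "tpepPickupDateTime": 1687791600000, "tpepDropoffDateTime": 1687792500000, "passengerCount": 2, "tripDistance": 3.4, "fareAmount": 14.5, "totalAmount": 18.2}'

sample_df = spark.createDataFrame([(sample_json,)], ["Value_text"])\
    .withColumn("JSON", from_json(col("Value_text"), taxi_schema))

sample_df.select("JSON.vendorID", "JSON.passengerCount", "JSON.totalAmount").show(truncate=False)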
10. Perform the following data transformations on the bronze table:
a. Drop the key, value, partition, offset, timestamp and timestampType columns.
b. Split the consolidated data stream (value_text) into individual columns such as vendor_Id, pickup_Datetime, dropoff_Datetime, passenger_Count, trip_Distance, fair_Amount and total_Amount_Incl_Tax_Tip_Tolls (tax, tip and tolls included).
taxi_df= spark.sql("SELECT * FROM streaming_bronze.Taxi")\
.drop("key","value","partition","offset","Timestamp","Timestamptype")\
.withColumn("JSON", from_json(col("Value_text"), taxi_schema))\
.withColumn("vendor_Id",col("JSON.vendorID"))\
.withColumn("pickup_Datetime", to_timestamp(from_unixtime(col("JSON.tpepPickupDateTime")/1000)))\
.withColumn("dropoff_Datetime", to_timestamp(from_unixtime(col("JSON.tpepDropoffDateTime")/1000)))\
.withColumn("passenger_Count", col("JSON.passengerCount"))\
.withColumn("trip_Distance", col("JSON.tripDistance"))\
.withColumn("fair_Amount", col("JSON.fareAmount"))\
.withColumn("total_Amount_Incl_Tax_Tip_Tolls", col("JSON.totalAmount"))\
.withColumn("ingestion_Date", current_timestamp())\
.drop("JSON","value_text")
11.
a. Create a managed silver table, stored under the silver database location, to hold the transformed data.
%sql
CREATE TABLE IF NOT EXISTS streaming_silver.Taxi
(
topic STRING,
vendor_Id STRING,
pickup_Datetime TIMESTAMP,
dropoff_Datetime TIMESTAMP,
passenger_Count INT,
trip_Distance DOUBLE,
fair_Amount DOUBLE,
total_Amount_Incl_Tax_Tip_Tolls DOUBLE,
ingestion_Date TIMESTAMP
) USING DELTA
b. Append the transformed data into the silver table for downstream users (e.g., ML workloads).
taxi_df.write.mode("append")\
.option("mergeSchema", "True")\
.saveAsTable("streaming_silver.Taxi")
12. Aggregate the total amount, trip distance and passenger count for each vendor.
taxi_silver_df = spark.sql("SELECT * FROM streaming_silver.Taxi")\
.groupBy("vendor_id")\
.agg(round(sum("total_Amount_Incl_Tax_Tip_Tolls"),2).alias("Total_amount_per_vendor"),\
round(sum("trip_Distance"),2).alias("Total_trip_distance"),\
sum("passenger_Count").cast('Int').alias("Total_passengers"),\
max("ingestion_Date").alias("ingestion_Date"))
13.
a. Create a managed gold table, stored under the gold database location, to hold the aggregated data.
%sql
CREATE TABLE IF NOT EXISTS streaming_gold.Taxi
(
vendor_Id STRING,
Total_amount_per_vendor DOUBLE,
Total_trip_distance DOUBLE,
Total_passengers INT,
ingestion_Date TIMESTAMP
) USING DELTA
b. Save the aggregated data into a gold table for downstream analytics users.
taxi_silver_df.write.mode("append")\
.option("mergeSchema", "True")\
.saveAsTable("streaming_gold.Taxi")
14. Schedule the notebook to run continuously, or on a frequent schedule, using a Databricks Workflows job; one possible approach via the Jobs API is sketched below.
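One possible way to automate this is to create the job through the Databricks Jobs API 2.1. The sketch below uses a frequent cron schedule as an approximation of continuous execution; the workspace URL, token secret, cluster ID and notebook path are all placeholders:
# Sketch: create a scheduled Databricks job for the notebook via the Jobs API 2.1
# All identifiers below (workspace URL, secret scope/key, cluster id, notebook path) are placeholders
import requests

workspace_url = "https://<your-workspace>.azuredatabricks.net"
token = dbutils.secrets.get(scope="streaming-poc-scope", key="databricks-pat")

job_payload = {
    "name": "taxi-streaming-poc",
    "tasks": [
        {
            "task_key": "run_streaming_notebook",
            "notebook_task": {"notebook_path": "/Repos/streaming-poc/taxi_streaming"},
            "existing_cluster_id": "<cluster-id>"
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 0/15 * * * ?",  # every 15 minutes
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED"
    }
}

response = requests.post(
    f"{workspace_url}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=job_payload
)
print(response.json())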
15. Use the Event Hubs "Generate data (preview)" feature in the Azure portal to generate the taxi streaming data in real time.
Conclusion:
In this article, we explored how to stream real-time taxi data from Azure Event Hubs to Azure Databricks, transform it, and load it into Delta Lake in near real time.
If you like this article, please follow me on LinkedIn at https://www.linkedin.com/in/kaviprakash-selvaraj/