Real-Time Data Streaming With Spark Structured Streaming in Azure Databricks

Arunkumar Sooridayalan
9 min read · Feb 16, 2023


Here, we’ll explore the exciting world of real-time data streaming with Spark Structured Streaming in Azure Databricks. With the help of Spark Structured Streaming, we can efficiently process and analyze large volumes of data in real-time, enabling us to derive actionable insights and make informed decisions.

In this blog post, we will be using a free weather API to read live weather information and feed it to Azure Event Hub. We will configure Databricks to read the Event Hub, implement a micro-batch process, and store the data in a Delta table. This data will then be read in Power BI via a direct query, and real-time data will be plotted on a map for consumption.

The high-level flow of the use case is as follows.

  1. Producer code is written in Python and reads weather information from https://openweathermap.org/api
  2. Azure Event Hubs ingests the weather JSON.
  3. Azure Databricks is the analytics platform. Structured Streaming is a scalable and fault-tolerant stream-processing engine built on the Spark SQL engine; it lets us express streaming computations with the same semantics used for batch processing (see the sketch after this list).
  4. The landing and transformed data is stored in Delta Lake, with Azure Data Lake Storage Gen2 as the underlying storage.
  5. Lastly, Power BI is used to visualize the data via DirectQuery.
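
To make point 3 concrete, here is a minimal, illustrative sketch (not part of the pipeline built below) of how the streaming API mirrors the batch DataFrame API; the path and the event_hub_conf dictionary are placeholders:

# Illustrative only: batch and streaming reads share the same DataFrame API.
batch_df = spark.read.format("json").load("/tmp/weather-sample/")  # hypothetical batch path

stream_df = (spark.readStream                 # streaming read
                  .format("eventhubs")        # Event Hubs connector, configured later in the post
                  .options(**event_hub_conf)  # placeholder for the connection settings
                  .load())

# The same transformations (select, withColumn, ...) then apply to either DataFrame.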

Prerequisites

  1. An Azure Event Hubs namespace and event hub (topic) are created, with the data format set to JSON.
  2. A Databricks workspace is created on the Premium tier for role-based access control.
  3. A storage account with the desired container is created to host the Delta Lake.
  4. A Key Vault is created to store the ADLS Gen2 access credentials used for the Databricks mount. It can also hold the Event Hubs endpoint and access key, to be retrieved later in the code.

Event Producer code

ChatGPT was used to write the core logic, which was then customized with the Azure parameters. The producer below uses the azure-eventhub async producer client to push each weather payload to the event hub.

import asyncio
import json
import logging

import aiohttp
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

async def get_weather(city, state, api_key):
    # Fetch the current weather for one city from the OpenWeatherMap API
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city},{state}&appid={api_key}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()

async def send_to_event_hub(producer, weather_data):
    # Send one weather payload to the event hub as a single-event batch
    batch = await producer.create_batch()
    batch.add(EventData(json.dumps(weather_data).encode("utf-8")))
    await producer.send_batch(batch)

async def main(api_key, producer):
    # List of cities in the USA
    cities = [
        {"city": "New York", "state": "NY"},
        {"city": "Los Angeles", "state": "CA"},
        {"city": "Chicago", "state": "IL"},
        {"city": "Houston", "state": "TX"},
        {"city": "Phoenix", "state": "AZ"},
        {"city": "Philadelphia", "state": "PA"},
        {"city": "San Antonio", "state": "TX"},
        {"city": "San Diego", "state": "CA"},
        {"city": "Dallas", "state": "TX"},
        {"city": "San Jose", "state": "CA"}
    ]

    # Fetch all cities concurrently, then forward each result to the event hub
    tasks = [asyncio.create_task(get_weather(c["city"], c["state"], api_key)) for c in cities]
    results = await asyncio.gather(*tasks)
    for weather_data in results:
        await send_to_event_hub(producer, weather_data)

# Set up logging
logging.basicConfig(level=logging.INFO)

# Event Hub configuration
event_hub_name = "eh_sample_01"
event_hub_connection_string = "<Your Azure Event Hub namespace connection string>"

api_key = "<Your OpenWeatherMap API Key>"
interval = 5 * 60  # 5 minutes in seconds

async def run_producer():
    producer = EventHubProducerClient.from_connection_string(
        event_hub_connection_string, eventhub_name=event_hub_name)
    async with producer:
        while True:
            await main(api_key, producer)
            await asyncio.sleep(interval)

asyncio.run(run_producer())

We can also use a time-scheduled Azure Function as the producer, sending weather events to the event hub on a schedule.
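
As a rough sketch, assuming the Azure Functions Python v1 programming model, where function.json defines a timerTrigger binding named mytimer with a CRON schedule such as 0 */5 * * * *, the function body can simply run one pass of the producer logic above:

# Sketch of __init__.py for a timer-triggered Azure Function (Python v1 model).
# function.json is assumed to bind a timerTrigger named "mytimer" on "0 */5 * * * *".
import asyncio
import azure.functions as func

# "weather_producer" is a hypothetical module holding get_weather, send_to_event_hub
# and main from the producer script above.
import weather_producer

def main(mytimer: func.TimerRequest) -> None:
    # Run a single fetch-and-send pass; the timer handles the five-minute cadence.
    asyncio.run(weather_producer.run_once())  # hypothetical one-shot wrapper around main()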

Databricks

Create a Databricks cluster with the Event Hubs library

I have used Spark 3.3.1 for my cluster.

Cluster creation details

To connect to Azure Event Hubs, we’ll use the com.microsoft.azure:azure-eventhubs-spark_2.12 library, which implements the Azure Event Hubs Spark connector. The library is published as a Maven coordinate; simply add the most recent coordinate to your Databricks cluster, as shown below:

Add the Event Hubs library to your cluster from the Maven repository:

com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17

Mount ADLS Gen2 to the Databricks cluster

The next step is to mount the storage account to the Databricks cluster for easy access. Key Vault holds the secret values, which are retrieved in the notebook for mounting.

storage_account_name = "realtimelanding"

# Service principal credentials are stored in Key Vault and surfaced through a Databricks secret scope
client_id = dbutils.secrets.get(scope="realtime-databricks-scope", key="realtime-databricks-app-client-id")
tenant_id = dbutils.secrets.get(scope="realtime-databricks-scope", key="realtime-databricks-app-tenant-id")
client_secret = dbutils.secrets.get(scope="realtime-databricks-scope", key="realtime-databricks-app-client-secret")

# OAuth configuration for ABFS (ADLS Gen2) access via the service principal
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": f"{client_id}",
    "fs.azure.account.oauth2.client.secret": f"{client_secret}",
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
}

def mount_adls(container_name):
    dbutils.fs.mount(
        source=f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/",
        mount_point=f"/mnt/{storage_account_name}/{container_name}",
        extra_configs=configs)

# Unmount first if the container was mounted by a previous run
# (this call fails if the mount point does not exist yet, so skip it on the first run)
dbutils.fs.unmount("/mnt/realtimelanding/dbkslanding/")

mount_adls("dbkslanding")
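
To confirm the mount worked, list the mount point; the container will be empty until the Delta table is created in the next step:

# Quick sanity check: the mount point should now resolve to the ADLS Gen2 container.
display(dbutils.fs.ls(f"/mnt/{storage_account_name}/dbkslanding"))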

Create a Delta Lake table with a schema definition

# Import the necessary libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Function to create a Delta table
def createDeltaTable(database_name, table_name, table_schema, table_location):
    """Create a Delta table with the passed-in name, schema and location."""

    # Create a simple empty DataFrame with the desired schema
    df = spark.createDataFrame(spark.sparkContext.emptyRDD(), table_schema)
    df.printSchema()

    # Create the underlying storage for our table
    df.write.format("delta").mode('overwrite').save(table_location)

    # Drop the table and the database if they already exist,
    # then re-create the database
    spark.sql(f"DROP TABLE IF EXISTS {database_name}.{table_name}")
    spark.sql(f"DROP DATABASE IF EXISTS {database_name}")
    spark.sql(f"CREATE DATABASE {database_name}")

    # Re-create the table on top of the Delta files
    spark.sql(f"CREATE TABLE {database_name}.{table_name} USING DELTA LOCATION '{table_location}'")

    # Verify that everything worked by selecting from the table; we should get
    # an empty DataFrame back with the correct schema
    emptyDF = spark.sql(f"SELECT * FROM {database_name}.{table_name}")
    display(emptyDF)

    # List the details of the table
    describe_df = spark.sql(f"DESCRIBE {database_name}.{table_name}")
    display(describe_df)

In this example, we first create an empty DataFrame with the passed-in schema. We then write this DataFrame to the specified Delta file. Using an empty DataFrame like this is a nice trick to create a Delta file with a specified schema.

We’re assuming that we create a dedicated Hive database for our solution, so we create the Hive database and the Delta table on top of our Delta files. Notice the CREATE TABLE ... USING DELTA LOCATION syntax: it lets the Hive metastore automatically inherit the schema, partitioning and table properties of the existing data, effectively importing that data into the metastore.

Define the input schema and read the data into a streaming DataFrame

Sample JSON for Detroit, MI:

{
  "coord": { "lon": -83.0458, "lat": 42.3314 },
  "weather": [
    { "id": 800, "main": "Clear", "description": "clear sky", "icon": "01n" }
  ],
  "base": "stations",
  "main": {
    "temp": 281.86,
    "feels_like": 278.08,
    "temp_min": 279.84,
    "temp_max": 283.19,
    "pressure": 1013,
    "humidity": 64
  },
  "visibility": 10000,
  "wind": { "speed": 8.23, "deg": 260, "gust": 14.4 },
  "clouds": { "all": 0 },
  "dt": 1676508661,
  "sys": {
    "type": 2,
    "id": 2006979,
    "country": "US",
    "sunrise": 1676464149,
    "sunset": 1676502225
  },
  "timezone": -18000,
  "id": 4990729,
  "name": "Detroit",
  "cod": 200
}
# For the Delta table we need a database name, a Delta table name and the
# location where the files will be stored.

database_name = 'realtime_streaming_db'
delta_table_name = 'weather_landing_dlt_tbl'
delta_location = f'/mnt/realtimelanding/dbkslanding/dbks_realtime_landing/{delta_table_name}.delta'

# Define the Delta table schema
schema = StructType() \
    .add("name", StringType(), False) \
    .add("country_cd", StringType(), True) \
    .add("id", IntegerType(), True) \
    .add("weather_dt", TimestampType(), True) \
    .add("timezone", StringType(), False) \
    .add("lat", DoubleType(), True) \
    .add("lon", DoubleType(), True) \
    .add("temp", DoubleType(), True) \
    .add("feels_like", DoubleType(), True) \
    .add("temp_min", DoubleType(), True) \
    .add("temp_max", DoubleType(), True) \
    .add("pressure", DoubleType(), True) \
    .add("humidity", DoubleType(), True) \
    .add("wind_speed", DoubleType(), True) \
    .add("wind_degree", DoubleType(), True)

# Create the Delta table
createDeltaTable(database_name, delta_table_name, schema, delta_location)

Calling the createDeltaTable function above creates the empty Delta table with the defined schema.

Construct the Azure Event Hubs endpoint and read the input events with the readStream function. The body is always provided as a byte array. In the next step, we’ll use Spark’s withColumn function to convert the fields to Spark-compatible types. We’ll only be working with the body column going forward, but I’ve included the appropriate conversions for each column below in case you need to use the other columns:

# Read the Event Hubs stream
conf = {}

# The connector expects the connection string to be encrypted
conf["eventhubs.connectionString"] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
    "Endpoint=sb://<Eventhub name>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<shared access key>=;EntityPath=ehub_databricks_read_01")

input_stream_df = (
    spark
    .readStream
    .format("eventhubs")
    .options(**conf)
    .load()
)

from pyspark.sql.types import *
import pyspark.sql.functions as F

# Reshape the input stream so we have the correct data types for all our columns.
# Convert the message body to a string so we can work with it as JSON.
messages_df = input_stream_df \
    .withColumn("Offset", F.col("offset").cast(LongType())) \
    .withColumn("Time (readable)", F.col("enqueuedTime").cast(TimestampType())) \
    .withColumn("Timestamp", F.col("enqueuedTime").cast(LongType())) \
    .withColumn("Body", F.col("body").cast(StringType())) \
    .select("Offset", "Time (readable)", "Timestamp", "Body")

Use a micro-batch to transform the data and merge it into the target Delta table

Micro-batch processing is the practice of collecting data in small groups (aka “batches”) for the purpose of immediately processing each batch. Micro-batch processing is a variation of traditional batch processing where the processing frequency is much higher and, as a result, smaller “batches” of events are processed.

In Spark Structured Streaming, the .foreachBatch() function is used for this. We pass in a reference to a function (referred to as the foreachBatch sink function) which will receive each micro-batch:

  • The micro-batch sink function receives the data as a standard (non-streaming) Spark DataFrame. This means we can use batch DataFrame operations such as count, which cannot be used on a streaming DataFrame.
  • A foreachBatch sink lets you write to sinks that Structured Streaming does not support natively, and you can write to multiple sinks from the same stream.
  • The batch_id defaults to a zero-based integer (0, 1, 2, …); you can map it to a custom format such as a timestamp if you need one.

from pyspark.sql.types import *
import pyspark.sql.functions as F
from delta.tables import *
import datetime

database_name = 'realtime_streaming_db'
delta_table_name = 'weather_landing_dlt_tbl'


def process_weatherdata_micro_batch(micro_batch_df, micro_batch_id):
    micro_batch_df.printSchema()
    print(f"Method process_weatherdata_micro_batch, batch id: {micro_batch_id}, number of records: {micro_batch_df.count()}")

    # Schema of the OpenWeatherMap payload (note that "weather" is an array of structs)
    schema = StructType([
        StructField("coord", StructType([
            StructField("lon", DoubleType()),
            StructField("lat", DoubleType())
        ])),
        StructField("weather", ArrayType(StructType([
            StructField("id", IntegerType()),
            StructField("main", StringType()),
            StructField("description", StringType()),
            StructField("icon", StringType())
        ]))),
        StructField("base", StringType()),
        StructField("main", StructType([
            StructField("temp", DoubleType()),
            StructField("feels_like", DoubleType()),
            StructField("temp_min", DoubleType()),
            StructField("temp_max", DoubleType()),
            StructField("pressure", IntegerType()),
            StructField("humidity", IntegerType())
        ])),
        StructField("visibility", IntegerType()),
        StructField("wind", StructType([
            StructField("speed", DoubleType()),
            StructField("deg", IntegerType())
        ])),
        StructField("clouds", StructType([
            StructField("all", IntegerType())
        ])),
        StructField("dt", IntegerType()),
        StructField("sys", StructType([
            StructField("type", IntegerType()),
            StructField("id", IntegerType()),
            StructField("country", StringType()),
            StructField("sunrise", IntegerType()),
            StructField("sunset", IntegerType())
        ])),
        StructField("timezone", IntegerType()),
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("cod", IntegerType())
    ])

    # Read the event body into a DataFrame, using the above schema
    body_df = micro_batch_df.select(F.from_json(F.col("body"), schema).alias("data")) \
        .select("data.*")

    # Do the final select into a new DataFrame
    weather_data_df = body_df \
        .withColumn('WEATHER_DT', F.from_unixtime(body_df.dt.cast(StringType()), "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
        .withColumn('LAT', body_df.coord.lat.cast(DoubleType())) \
        .withColumn('LON', body_df.coord.lon.cast(DoubleType())) \
        .withColumn('NAME', body_df.name.cast(StringType())) \
        .withColumn('COUNTRY_CD', body_df.sys.country.cast(StringType())) \
        .withColumn('ID', body_df.sys.id.cast(IntegerType())) \
        .withColumn('TIMEZONE', body_df.timezone.cast(IntegerType())) \
        .withColumn('TEMP', body_df.main.temp.cast(DoubleType())) \
        .withColumn('FEELS_LIKE', body_df.main.feels_like.cast(DoubleType())) \
        .withColumn('TEMP_MIN', body_df.main.temp_min.cast(DoubleType())) \
        .withColumn('TEMP_MAX', body_df.main.temp_max.cast(DoubleType())) \
        .withColumn('PRESSURE', body_df.main.pressure.cast(DoubleType())) \
        .withColumn('HUMIDITY', body_df.main.humidity.cast(DoubleType())) \
        .withColumn('WIND_SPEED', body_df.wind.speed.cast(DoubleType())) \
        .withColumn('WIND_DEGREE', body_df.wind.deg.cast(DoubleType())) \
        .select("NAME", "COUNTRY_CD", "ID", "WEATHER_DT", "TIMEZONE", "LAT", "LON", "TEMP", "FEELS_LIKE",
                "TEMP_MIN", "TEMP_MAX", "PRESSURE", "HUMIDITY", "WIND_SPEED", "WIND_DEGREE")

    # Each batch can contain more than one reading per city; keep one row per city
    weather_data_deduped_df = weather_data_df.dropDuplicates(['NAME'])

    print(f"Number of records in weather_data_deduped_df: {weather_data_deduped_df.count()}")

    # Upsert the batch into the target Delta table, using NAME as the merge key
    deltaTable = DeltaTable.forName(spark, f"{database_name}.{delta_table_name}")

    deltaTable.alias('t') \
        .merge(weather_data_deduped_df.alias('s'), "s.NAME = t.NAME") \
        .whenNotMatchedInsertAll() \
        .whenMatchedUpdateAll() \
        .execute()

We’re using the Name column as the primary key for our Delta table, so we can update our Delta table with a simple merge statement above.
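
As a side note, the same upsert can be expressed as a SQL MERGE statement; here is a sketch of the equivalent, assuming the deduplicated DataFrame is first registered as a temporary view:

# Sketch: SQL equivalent of the DeltaTable merge above.
weather_data_deduped_df.createOrReplaceTempView("weather_updates")
spark.sql(f"""
    MERGE INTO {database_name}.{delta_table_name} AS t
    USING weather_updates AS s
    ON s.NAME = t.NAME
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")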

messages_df.writeStream \
    .option("checkpointLocation", "/mnt/realtimelanding/dbkslanding/dbks_realtime_landing/weather_landing_dlt_tbl.checkpoint") \
    .foreachBatch(process_weatherdata_micro_batch) \
    .start()
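
By default a new micro-batch starts as soon as the previous one finishes. If you would rather align processing with the producer’s five-minute cadence, a processing-time trigger can be added; the interval below is an assumption, not something the pipeline requires:

# Optional: run a micro-batch every 5 minutes instead of as fast as possible.
messages_df.writeStream \
    .option("checkpointLocation", "/mnt/realtimelanding/dbkslanding/dbks_realtime_landing/weather_landing_dlt_tbl.checkpoint") \
    .foreachBatch(process_weatherdata_micro_batch) \
    .trigger(processingTime="5 minutes") \
    .start()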

Delta table output

Visualization with Power BI

The Power BI DirectQuery functionality

We’ll be using the DirectQuery functionality of Power BI to connect to the Delta table. Unlike the import functionality, which copies (or imports) the tables and columns into Power BI, DirectQuery doesn’t perform any copying of the data. At both design time and runtime, Power BI will directly query the underlying data source.

Using DirectQuery has several advantages:

  • DirectQuery enables us to build visualizations over very large datasets, where it would otherwise be unfeasible to first import all the data.
  • With imported data, changes in the underlying source require a dataset refresh, and for reports that must show current data this can mean large, impractical data transfers. DirectQuery reports, by contrast, always use current data.
  • Certain dataset size limitations in Power BI do not apply to DirectQuery.

Below is just one way of representing the data. The intention is not to derive deep insights from the weather data :) but to demonstrate DirectQuery as a capability. Adapt this to your own data needs.
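
For example, the map visual can be driven by a query along these lines (hypothetical; OpenWeatherMap returns temperatures in Kelvin by default, so it also converts with C = K - 273.15):

# Hypothetical query behind the map visual: one row per city (the merge keeps the
# latest reading), with the Kelvin temperature converted to Celsius.
display(spark.sql("""
    SELECT NAME, LAT, LON, WEATHER_DT,
           ROUND(TEMP - 273.15, 1) AS TEMP_C
    FROM realtime_streaming_db.weather_landing_dlt_tbl
"""))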

Summary

By following the steps in this tutorial, you’ll be able to combine cloud-based services with Azure Databricks and Spark Structured Streaming to create a robust, fault-tolerant, near-real-time experience.
