Azure Synapse Spark with Azure Event Hubs
Process Streaming or event driven data using Event hub into Azure Synapse Analytics Workspace
Pre requisite
Steps
- Create a new spark pool in Azure Synapse workspace
- GO to Azure Event hub create a new event hub called synapseincoming
- Set the partition to 1 as this is for testing
- Go to Shared access policy and create a key to write and copy the connection string
- Go to Azure Keyvault and store the key
- Go to Eventhub name space and copy the connections string
- Copy the event hub name
- The above information is used for data generator
- Now lets write the code
- Go to Azure Synapse Analytics workspace
- Go to manage and credentials
- Add the new eventhub synapseincoming connection string to credential
- We are getting the keys from keyvault stored above
- Now lets create the code to read the events/messages from event hub and write to serverless sql table
- destination is serverless sql table
- Get the connection string securely from credentials
- Create a new notebook and select pyspark as language
Code
keyVaultName = "keyvaultname";
secretName = "synapseeventhub";secret = mssparkutils.credentials.getSecret(keyVaultName, secretName)connectionString = secret
ehConf = {
'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}def write2table(df2, epoch_id):
df2.write.mode("append").saveAsTable("default.eventhubdata")df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()df1 = df.withColumn("body", df["body"].cast("string"))df1.writeStream \
.outputMode("update") \
.trigger(processingTime='5 seconds') \
.option("checkpointLocation","abfss://eventhubdata@accsynapsestorage.dfs.core.windows.net/evetcheckpoint/") \
.foreachBatch(write2table) \
.start() \
.awaitTermination()
- Execute each cell
- Once the write stream is ran, now we are ready to send data
Event hub Data generator
Azure Synapse Serverless SQL
- Go to Serverless SQL
- Create a new query
- Let’s do a count
SELECT count(*) FROM [default].[dbo].[eventhubdata]Select top 300 * from dbo.eventhubdata order by enqueuedTime DESC;
Originally published at https://github.com.