Azure Synapse Spark with Azure Event Hubs

Balamurugan Balakreshnan
Published in Analytics Vidhya, May 14, 2021

Process streaming or event-driven data from Azure Event Hubs in an Azure Synapse Analytics workspace

Prerequisites

Steps

  • Create a new Spark pool in the Azure Synapse workspace
  • Go to Azure Event Hubs and create a new event hub called synapseincoming
  • Set the partition count to 1, since this is for testing
  • Go to Shared access policies, create a key with write (Send) access, and copy the connection string
  • Go to Azure Key Vault and store the key as a secret
  • Go to the Event Hubs namespace and copy the connection string
  • Copy the event hub name
  • The data generator uses the information above
  • Now let's write the code
  • Go to the Azure Synapse Analytics workspace
  • Go to Manage, then Credentials
  • Add the connection string for the new synapseincoming event hub as a credential
  • The keys come from the Key Vault secret stored above
  • Now let's create the code to read the events/messages from the event hub and write them to a serverless SQL table
  • The destination is a serverless SQL table
  • Get the connection string securely from the credentials
  • Create a new notebook and select PySpark as the language
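Before storing the connection string, it can help to check that it names the event hub. A namespace-level connection string has no EntityPath entry, and the Spark connector generally needs to know which event hub to read. The sketch below is a minimal illustration and not part of the original steps: the helper names parse_connection_string and with_entity_path are hypothetical, the namespace, policy, and key values are placeholders, and it assumes the usual Key=Value; layout of an Event Hubs connection string.

```python
def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' into a dict, keeping '=' inside values."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue
        # Split on the first '=' only, since keys often end with '=' padding.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def with_entity_path(conn_str, eventhub_name):
    """Return conn_str with EntityPath=<eventhub_name> appended if absent."""
    if "EntityPath" in parse_connection_string(conn_str):
        return conn_str
    return conn_str.rstrip(";") + ";EntityPath=" + eventhub_name


# Placeholder values only; substitute the real namespace, policy, and key.
namespace_conn = (
    "Endpoint=sb://mynamespace.servicebus.windows.net/;"
    "SharedAccessKeyName=mypolicy;"
    "SharedAccessKey=abc123="
)
full_conn = with_entity_path(namespace_conn, "synapseincoming")
print(full_conn)
```

The resulting string is what would be stored as the Key Vault secret, so the notebook can use it without further changes.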

Code

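# mssparkutils is preloaded in Synapse notebooks; the import makes the dependency explicit
from notebookutils import mssparkutils

keyVaultName = "keyvaultname"
secretName = "synapseeventhub"

# Read the Event Hubs connection string from Key Vault
secret = mssparkutils.credentials.getSecret(keyVaultName, secretName)
connectionString = secret

# The Event Hubs connector expects an encrypted connection string
ehConf = {
    'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}

# Append each micro-batch to the Spark table
def write2table(df2, epoch_id):
    df2.write.mode("append").saveAsTable("default.eventhubdata")

# Open a streaming read against the event hub
df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# The body arrives as binary; cast it to a readable string
df1 = df.withColumn("body", df["body"].cast("string"))

# Write every 5 seconds; awaitTermination() blocks the cell while the stream runs
df1.writeStream \
    .outputMode("update") \
    .trigger(processingTime='5 seconds') \
    .option("checkpointLocation","abfss://eventhubdata@accsynapsestorage.dfs.core.windows.net/evetcheckpoint/") \
    .foreachBatch(write2table) \
    .start() \
    .awaitTermination()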
  • Execute each cell
  • Once the write stream is running, we are ready to send data
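If the stream should pick up only the events sent after it starts, the reader configuration can be extended. The following is a rough sketch, assuming the option names eventhubs.startingPosition and eventhubs.consumerGroup and the "@latest" offset marker as described in the Event Hubs Spark connector documentation. The helper name with_stream_options is hypothetical. Its result would be passed to .options(**...) in place of the plain ehConf.

```python
import json


def with_stream_options(conf, from_latest=True, consumer_group="$Default"):
    """Return a copy of conf with starting position and consumer group set."""
    position = {
        # "@latest" means end of stream, "-1" means start of stream (assumed
        # from the connector docs; verify against the installed version).
        "offset": "@latest" if from_latest else "-1",
        "seqNo": -1,            # not used when an offset is given
        "enqueuedTime": None,   # not used when an offset is given
        "isInclusive": True,
    }
    updated = dict(conf)
    # The connector expects the position as a JSON string.
    updated["eventhubs.startingPosition"] = json.dumps(position)
    updated["eventhubs.consumerGroup"] = consumer_group
    return updated


# In the notebook, pass the real ehConf; an empty dict is used here for preview.
preview = with_stream_options({})
print(preview)
```

A dedicated consumer group is worth considering if other readers share the same event hub, so that they do not compete on $Default.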

Event hub Data generator
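
The generator code is not included here. One possible sketch follows, assuming the azure-eventhub Python package is installed where the generator runs. The payload fields (deviceId, temperature, eventTime) and the function names make_event and send_events are made up for illustration and are not from the original generator.

```python
import json
import random
from datetime import datetime, timezone


def make_event(device_id):
    """Build one JSON payload; the fields are illustrative only."""
    return json.dumps({
        "deviceId": device_id,
        "temperature": round(random.uniform(15.0, 35.0), 2),
        "eventTime": datetime.now(timezone.utc).isoformat(),
    })


def send_events(connection_string, eventhub_name, count=10):
    """Send `count` events to the event hub in a single batch."""
    # Imported here so the payload helper works without the SDK installed.
    from azure.eventhub import EventData, EventHubProducerClient

    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_string, eventhub_name=eventhub_name
    )
    with producer:
        batch = producer.create_batch()
        for i in range(count):
            # add() raises ValueError once the batch size limit is reached;
            # a handful of small events stays well below it.
            batch.add(EventData(make_event("device-%d" % i)))
        producer.send_batch(batch)


# Preview a payload locally; call send_events(...) with the real
# connection string and "synapseincoming" to send data.
sample = make_event("device-0")
print(sample)
```

Each event body lands in the body column of the Spark table as the JSON string built by make_event.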

Azure Synapse Serverless SQL

  • Go to Serverless SQL
  • Create a new query
  • Let's do a count, then view the 300 most recent events
SELECT COUNT(*) FROM [default].[dbo].[eventhubdata];
SELECT TOP 300 * FROM dbo.eventhubdata ORDER BY enqueuedTime DESC;

Originally published at https://github.com.
