Structured Streaming with Azure Databricks from IoT Hub to Cosmos DB
In this article I’m going to explain how to build a data ingestion architecture using Azure Databricks, enabling us to stream data with Spark Structured Streaming from IoT Hub to Cosmos DB.
Audience:
Database Developer, Analyst
Tags:
Azure, Big Data, Cosmos DB, IoT Hub, Event Hubs, Spark, Spark Structured Streaming
To build this architecture you will need an Azure account. You can create a free account over here.
For the purpose of this article, we will simulate a device through the Microsoft Azure device SDK for Node.js. Our example will simulate a simple telemetry device, with the following payload structure:
{ "id": "ironmaiden", "temperature": 20 + (Math.random() * 15) }
IotHub configuration
First, we have to configure our entry point in the Azure portal: provision an IoT Hub service and choose our configuration.
Once the hub is initialized, you have to create an IoT device identity for the device we want to push from.
We are just creating one device for the purpose of our example, but for an industrial project, you can go here to create multiple devices automatically.
Databricks configuration
Our active cluster is launched with the following configuration:
- Databricks Runtime 4.X
- Spark 2.3.1
- Scala 2.11
After launching Azure Databricks and starting our cluster, we have to import the different connectors:
- azure-eventhubs-spark_2.11–2.3.4
- azure-cosmosdb-spark_2.3.0_2.11–1.2.2
Azure Event Hubs Connector for Apache Spark
All complementary information can be found in the connector’s documentation.
Azure Cosmos DB Connector for Apache Spark
The Use of Azure Cosmos DB Spark Connector page is currently not up to date, so instead of getting the library directly from its Maven coordinates, you have to download the uber JAR from Maven and import it directly.
All complementary information can be found in the connector’s documentation.
Our whole Databricks environment is now ready to work; before streaming into it, we have to create our Cosmos DB collection.
Create our Cosmos DB collection
In order to push to Cosmos DB, we have to create our Cosmos DB collection.
Once our Cosmos DB instance is launched, we can use Cosmos DB Explorer to manage our new database.
Go to https://cosmos.azure.com and fill in your connection string, which can be found in the “Keys” panel of your Azure Cosmos DB resource page.
Once connected, we will create a new collection named “telemetry” under a new database named “devices”.
Our new collection is ready to receive our devices’ stream.
Databricks notebook
Before going further, we have to gather the required connection strings.
IoT Hub connection
- Go to the Azure Portal and find your IoT Hub instance
- Click on Endpoints under Messaging. Then click on Events.
- Find your EventHub-compatible name and EventHub-compatible endpoint.
Cosmos DB connection
As shown in the previous part:
- Go to the Azure Portal and find your Cosmos DB SQL instance
- Click on Keys.
- Find your Cosmos DB connection string.
Notebook
First, we have to initialize our stream using the Event Hubs connector (which also works with IoT Hub through its Event Hubs-compatible endpoint).
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.{ explode, split }
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Build the connection string with the above information
val connectionString = ConnectionStringBuilder("YOUR.EVENTHUB.COMPATIBLE.ENDPOINT")
  .setEventHubName("YOUR.EVENTHUB.COMPATIBLE.NAME")
  .build

val eventHubsConf = EventHubsConf(connectionString)

// followed by the different options available
val eventhubs = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .option("eventhubs.partition.count", "4")
  .load()
Our stream is now loaded and has the following structure:
eventhubs.printSchema()
root
|-- body: binary (nullable = true)
|-- offset: long (nullable = true)
|-- seqNumber: long (nullable = true)
|-- enqueuedTime: long (nullable = true)
|-- publisher: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Don’t hesitate to take a look at the connector’s documentation to discover all the available options.
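For instance, EventHubsConf exposes a few commonly used settings directly. A minimal sketch (check the exact option names against the connector version you imported):

```scala
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }

val connectionString = ConnectionStringBuilder("YOUR.EVENTHUB.COMPATIBLE.ENDPOINT")
  .setEventHubName("YOUR.EVENTHUB.COMPATIBLE.NAME")
  .build

// Start reading from the end of the stream and cap the number
// of events processed per micro-batch
val tunedConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setMaxEventsPerTrigger(10000)
```

Starting from the end of the stream is handy while developing, since each run only sees freshly sent telemetry instead of replaying the whole retention window.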
Note: We chose IoT Hub as the entry connector, but using Event Hubs directly is also possible. You just have to replace the previous connection code with this one:
val connectionString = "YOUR.EVENTHUB.COMPATIBLE.ENDPOINT"
val eventHubsConf = EventHubsConf(connectionString)
We will now modify our structure to get the desired columns:
val streamdata = eventhubs.select(
  from_unixtime(col("enqueuedTime").cast(LongType)).alias("enqueuedTime"),
  get_json_object(col("body").cast(StringType), "$.temperature").alias("temperature"),
  get_json_object(col("body").cast(StringType), "$.id").alias("id")
)
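Alternatively, instead of calling get_json_object once per field, the body can be parsed in a single pass with from_json under an explicit schema. A sketch, assuming the payload fields of our simulated device:

```scala
import org.apache.spark.sql.functions.{ col, from_json, from_unixtime }
import org.apache.spark.sql.types._

// Explicit schema matching the simulated device payload
val deviceSchema = new StructType()
  .add("id", StringType)
  .add("temperature", DoubleType)

// Parse the JSON body once, then flatten the struct into columns
val streamdataTyped = eventhubs
  .select(
    from_unixtime(col("enqueuedTime").cast(LongType)).alias("enqueuedTime"),
    from_json(col("body").cast(StringType), deviceSchema).alias("payload")
  )
  .select(col("enqueuedTime"), col("payload.id"), col("payload.temperature"))
```

This also gives temperature a proper numeric type instead of the string that get_json_object returns.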
You can check your stream this way:
streamdata.createOrReplaceTempView("dataStreamsView")
%sql
SELECT * FROM dataStreamsView
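If you prefer to check it outside SQL, Spark’s built-in console sink gives a quick look at the micro-batches. A debugging sketch; remember to stop the query once you are done:

```scala
// Print each micro-batch to the driver's stdout for inspection
val debugQuery = streamdata.writeStream
  .format("console")
  .outputMode("append")
  .start()

// Stop it once you have seen enough output
// debugQuery.stop()
```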
The last step is to push our stream to the desired collection, using the Cosmos DB Spark connector:
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider
import com.microsoft.azure.cosmosdb.spark.config.Config
val ConfigMap = Map(
"Endpoint" -> "YOUR.COSMOSDB.ENDPOINT",
"Masterkey" -> "YOUR.COSMOSDB.MASTERKEY",
"Database" -> "YOUR.COSMOSDB.DATABASE",
"Collection" -> "YOUR.COSMOSDB.COLLECTION",
"Upsert" -> "true"
)
streamdata.select("id","temperature","enqueuedTime")
.writeStream
.format(classOf[CosmosDBSinkProvider].getName)
.outputMode("append")
.options(ConfigMap)
.option("checkpointLocation", "/tmp/streamingTelemetry")
.start
We set the Upsert option to true, so each document is overwritten based on its id.
Everything is updated in Cosmos DB
All device messages are now pushed to our database. In our example, we can see the temperature updating.
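To verify the sink end to end, the same connector also offers a batch read path. A sketch reusing the ConfigMap defined above (spark.read.cosmosDB is provided by the connector’s implicits, so check the imports against the version you installed):

```scala
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Batch-read the collection we just streamed into
val telemetry = spark.read.cosmosDB(Config(ConfigMap))
telemetry.select("id", "temperature", "enqueuedTime").show()
```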
Links
To go deeper, don’t hesitate to visit this link:
Originally published at https://www.linkedin.com.