Structured streaming with Azure Databricks from IotHub to Cosmos DB

Alexandre Bergere · Published in datalex · Aug 30, 2019

In this article I’m going to explain how to build a data ingestion architecture using Azure Databricks, enabling us to stream data with Spark Structured Streaming from IoT Hub to Cosmos DB.

Audience:

Database Developer, Analyst

Tags:

Azure, Big Data, Cosmos DB, IotHub, Eventhub, Spark, Spark Structured Streaming

To build this architecture you will need an Azure account. You can create a free account over here.

For the purpose of this article, we will simulate a device through the Microsoft Azure device SDK for Node.js. Our example will simulate a simple telemetry device with the following structure:

{ "id": "ironmaiden", "temperature": 20 + (Math.random() * 15) }

IotHub configuration

First, we have to configure the entry point of our architecture. In the Azure portal, provision an IoT Hub service and choose its configuration.

Once the hub is initialized, you have to register the IoT device we want to push telemetry from.

We are just creating one device for the purpose of our example, but for an industrial project you can go here to create multiple devices automatically.

Databricks configuration

Our cluster is launched with the following configuration:

  • Databricks Runtime 4.X
  • Spark 2.3.1
  • Scala 2.11

After launching Azure Databricks and starting our cluster, we have to import the different connectors:

  • azure-eventhubs-spark_2.11-2.3.4
  • azure-cosmosdb-spark_2.3.0_2.11-1.2.2

Azure Event Hubs Connector for Apache Spark

All complementary information can be found in the connector’s documentation.

Azure Cosmos DB Connector for Apache Spark

The Use of Azure Cosmos DB Spark Connector page is currently not up to date, so instead of getting the library directly from its Maven coordinates, you have to download the uber JAR from Maven and import it manually.

All complementary information can be found in the connector’s documentation.

Our Databricks environment is now ready to work; we just have to create our Cosmos DB collection before streaming into it.

Create our Cosmos DB collection

In order to push to Cosmos DB, we first have to create our Cosmos DB collection.

Once our Cosmos DB instance is launched, we can use Cosmos DB Explorer to manage our new database.

Go to https://cosmos.azure.com and fill in your connection string, which can be found in the “Keys” panel of your Azure Cosmos DB resource page.

Once connected, we will create a new collection, named “telemetry”, under a new database named “devices”.

Our new collection is ready to receive our devices’ stream.

Databricks notebook

Before going further, we have to get the required connection information.

IotHub connection

  1. Go to the Azure Portal and find your IoT Hub instance
  2. Click on Endpoints under Messaging. Then click on Events.
  3. Find your EventHub-compatible name and EventHub-compatible endpoint.

Cosmos DB connection

As shown in the previous section:

  1. Go to the Azure Portal and find your Cosmos DB SQL instance
  2. Click on Keys.
  3. Find your CosmosDB connection string.

Notebook

First, we have to initialize our stream using the Event Hubs connector (which also works with IoT Hub).

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.{ explode, split }
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Build connection string with the above information
val connectionString = ConnectionStringBuilder("YOUR.EVENTHUB.COMPATIBLE.ENDPOINT")
  .setEventHubName("YOUR.EVENTHUB.COMPATIBLE.NAME")
  .build

val eventHubsConf = EventHubsConf(connectionString)
// can be followed by the different options available

val eventhubs = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .option("eventhubs.partition.count", "4")
  .load()

Our stream is now loading and has the following structure:

eventhubs.printSchema()

root
 |-- body: binary (nullable = true)
 |-- offset: long (nullable = true)
 |-- seqNumber: long (nullable = true)
 |-- enqueuedTime: long (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Don’t hesitate to take a look at the connector’s documentation to see all the available options.
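For instance, here is a minimal sketch of two commonly used EventHubsConf options, the starting position and a cap on events per trigger (the name tunedEventHubsConf and the values below are illustrative assumptions; adjust them to your workload):

// Hypothetical tuning sketch: read from the end of the stream and cap the batch size
val tunedEventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setMaxEventsPerTrigger(1000)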

Note: we chose IoT Hub as the input connector, but using Event Hubs instead is possible. You just have to replace the previous code with this one:

val connectionString = "YOUR.EVENTHUB.COMPATIBLE.ENDPOINT"
val eventHubsConf = EventHubsConf(connectionString)

We will now modify our structure to get the desired columns:

val streamdata = eventhubs.select(
  from_unixtime(col("enqueuedTime").cast(LongType)).alias("enqueuedTime"),
  get_json_object(col("body").cast(StringType), "$.temperature").alias("temperature"),
  get_json_object(col("body").cast(StringType), "$.id").alias("id")
)
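If you prefer declaring an explicit schema over repeated get_json_object calls, a minimal sketch using from_json could look like this (the schema and names below are assumptions matching our simulated payload):

// Alternative sketch: parse the JSON body with an explicit schema
val telemetrySchema = new StructType()
  .add("id", StringType)
  .add("temperature", DoubleType)

val streamdataTyped = eventhubs.select(
    from_unixtime(col("enqueuedTime").cast(LongType)).alias("enqueuedTime"),
    from_json(col("body").cast(StringType), telemetrySchema).alias("payload")
  )
  .select(col("enqueuedTime"), col("payload.id").alias("id"), col("payload.temperature").alias("temperature"))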

You can check your stream this way:

streamdata.createOrReplaceTempView("dataStreamsView")

%sql
SELECT * FROM dataStreamsView
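Alternatively, in a Databricks notebook you can preview the streaming DataFrame directly with the display helper (a Databricks notebook utility, not part of open-source Spark):

display(streamdata)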

The last step is to push our stream, using the Cosmos DB Spark connector, to our desired collection:

import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider
import com.microsoft.azure.cosmosdb.spark.config.Config


val ConfigMap = Map(
  "Endpoint" -> "YOUR.COSMOSDB.ENDPOINT",
  "Masterkey" -> "YOUR.COSMOSDB.MASTERKEY",
  "Database" -> "YOUR.COSMOSDB.DATABASE",
  "Collection" -> "YOUR.COSMOSDB.COLLECTION",
  "Upsert" -> "true"
)

streamdata.select("id", "temperature", "enqueuedTime")
  .writeStream
  .format(classOf[CosmosDBSinkProvider].getName)
  .outputMode("append")
  .options(ConfigMap)
  .option("checkpointLocation", "/tmp/streamingTelemetry")
  .start()

We set the Upsert option to true so that each document is overwritten based on its id.
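Note that start() returns a StreamingQuery handle; here is a minimal sketch of capturing it to monitor or stop the stream (same write as above, reusing the ConfigMap defined earlier; the name query is illustrative):

// Capture the handle returned by start() to monitor or stop the streaming query
val query = streamdata.select("id", "temperature", "enqueuedTime")
  .writeStream
  .format(classOf[CosmosDBSinkProvider].getName)
  .outputMode("append")
  .options(ConfigMap)
  .option("checkpointLocation", "/tmp/streamingTelemetry")
  .start()

query.status    // current state of the streaming query
// query.stop() // stop the stream once you are done testing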

Everything is updated in Cosmos DB

All device messages are now pushed to our database. In our example, we can see that the temperature is updating.
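To double-check from the notebook, here is a small sketch reading the collection back as a batch DataFrame with the same connector (reusing the ConfigMap defined earlier; the data source name follows the azure-cosmosdb-spark documentation):

// Read the telemetry collection back in batch mode to verify the upserted documents
val cosmosData = spark.read
  .format("com.microsoft.azure.cosmosdb.spark")
  .options(ConfigMap)
  .load()

cosmosData.select("id", "temperature", "enqueuedTime").show(10, truncate = false)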

Links

To go deeper, don’t hesitate to visit this link:

Originally published at https://www.linkedin.com.
