TRANSIT: Flexible pipeline for IoT data with Bluemix and OpenWhisk

Alex Glikson
Apache OpenWhisk
Published in
10 min readJan 24, 2017

In continuation to the previous post where we introduce the principles of using OpenWhisk for serverless transformation of IoT data in motion, here we provide the ‘nitty-gritty details’ of setting up such a pipeline end-to-end — spanning the IoT devices (simulated, in this example) and IBM Bluemix.

Meet TRANSIT (stands for ‘serverless TRANSformation of IoT data’) — an example of such a pipeline, available on github. All you need to set it up is a Bluemix account (which you can get for free), and something that can run Docker to simulate few devices (and a bit of your time).

In a nutshell, TRANSIT demonstrates collection, processing, storage and analytics of events generated by a fleet of (simulated) taxi vehicles. Each taxi is equipped with a device that reports vehicle’s location and velocity. Sensor readings are generated every second, and, for efficiency, are aggregated into small batches and sent to the cloud every minute, in a compressed (non-JSON) format. TRANSIT uses Docker and Node-RED to simulate these devices. On the cloud side, messages are ingested by the IoT Platform, forwarded to Message Hub (by the ‘historical data storage extension’ bridge), and then transformed by OpenWhisk and persisted in Object Storage (by a dedicated bridge). Then Spark and Data Science Experience services are used to apply analytics on the data.

The following diagrams outlines the overall architecture of the solution.

Example solution architecture: TRANSIT

Setting up an ‘instance’ of TRANSIT involves configuration of the following components:

  1. Watson IoT Platform
  2. Devices (simulated with Node-RED)
  3. Message Hub and the bridge from IoT Platform
  4. OpenWhisk
  5. Object Storage and the bridge from Message Hub
  6. Spark and the Data Science Experience

Let’s briefly review each of them.

Watson IoT Platform

First, let’s set up the Watson IoT Platform, which is the core of the above architecture.

  • Provision an instance of the Watson IoT Platform (e.g., using the Bluemix UI). You will be assigned an orgId(6-character alphanumerical string), which determines the URL of the Dashboard, as well as of the MQTT broker dedicated to your org.
IoTP service instance (orgID: 54l5hz)
  • Register a device type (a group of devices sharing similar behavior and event schema — e.g., taxi) and one or more devices in the IoT Platform (e.g., taxi001, taxi002, taxi003). Take a note of the id and security token of each device — they are needed in order to configure the MQTT client on the devices, so that they can securely connect to the Watson IoT Platform.
    Important: the scripts (for Node-RED configuration) provided in this project assume that the tokens follow the pattern token-${deviceId} — e.g., token-taxi001, token-taxi002, token-taxi003. Obviously, this is done for demonstration purposes only — you should use more secure tokens otherwise (e.g., generated by the IoT Platform).
Summary of a device registered with the IoT Platform

Devices

Next step is to set up one or more simulated devices using Node-RED. In order to do this, we are using a general-purpose Linux machine with Docker.

  • Make a copy of the ‘transit’ project on the Linux machine
$ git clone https://github.com/glikson/transit.git
$ cd transit
  • Update the IOTP_ORG_ID parameter in iotp.env (after creating it by cloning iotp.env.template), e.g.:
IOTP_ORG_ID=5415hz
  • Run a taxi simulator for each of the devices you defined in the IoT Platform in the previous step, e.g.:
$ ./run-taxi.sh taxi001 50
$ ./run-taxi.sh taxi002 60
$ ./run-taxi.sh taxi003 30

Notes:

  • Under the covers, the script creates a dedicated copy of the template data directory (within node_red directory), customizes the Node-RED flow in it (injecting credentials and other paramters of the device), and passes it as a data directory to the standard Node-RED docker image from docker hub.
  • You can use ./rm-taxi.sh taxi003 to stop/remove simulators
  • The second argument of run-taxi.sh is the maximal velocity that the simulated device will report. Give different values to different taxi’s to make the analytics results more interesting.
  • The script creates a docker container on the local machine. You can use regular docker commands to list, browse and troubleshoot them (e.g., sudo docker ps to see running containers and their port mapping).
  • Using the port mapping details, you can connect to the Web interface of Node-RED to see the Node-RED flow simulating the taxi device.
Node-RED flow simulating a taxi device

In a nutshell, the flow is as follows:

  1. start with an inject node that triggers the flow every second
  2. generate a JSON comprising random location and velocity change
  3. concatenate events over 60 seconds
  4. augment the events with device details
  5. compress the data [*]
  6. publish to the specified MQTT topic (of the broker associated with the specified IoTP org)

[*] due to a limitation of the Kafka feed in OpenWhisk, the compressed data needs to be base64-encoded before sending it to the IoT Platform. Consequently, the OpenWhisk action needs to decode messages back.

Message Hub and the bridge from IoT Platform

Now we need to configure the bridge between the Watson IoT Platform and Message Hub:

  • Provision an instance of the Message Hub service (e.g., named ‘kafka’), create two Kafka topicsiotp and transformed.
Message Hub setup
  • In the Watson IoT Platform, configure ‘Extensions’ → ‘Historical data storage’ to publish historical data to the above Message Hub service instance. For convenience, you can configure custom forwarding rules that would make the Watson IoT Platform publish events of different device types and/or event types to different Kafka topics (in this example we use just a single iotp topic). Confirm granting access to the selected service in the displayed pop-up window (tip: make sure pop-ups are enabled!).
Configuration of the bridge from IoT Platform to Message Hub

OpenWhisk

Next step is to configure OpenWhisk to perform the transformation, subscribing to the iotp topic and publishing to transformed topic. This can be achieved by copying messagehub.env.template into messagehub.env and updating it with proper credentials (from VCAP_SERVICES or the “Credentials” tab in Message Hub UI), and running the owdeploy.sh script provided in this project. Notice that the script relies on the wsk CLI, which can be installed and configured following instructions online.

$ ./owdeploy.shSetting up Message Hub package: ok: created package kafka
Done creating kafka package
ok: created action kafka/mhpost
Done creating mhpost action
ok: created binding kafka-transformed
Done creating package binding
Setting up trigger: ok: invoked
[...]
ok: created trigger iotp-trigger
Done
Setting up sequence: ok: created action iotp2flat
Done creating iotp2flat
ok: created action iotp2transformed
Done creating sequence
Setting up rule: ok: created rule iotp-to-transformed
Done
All Set!

In a nutshell, the script performs the following steps:

  • Configure kafka-iotp package binding for the iotp topic, and kafka-transformed package binding for the transformed topic, specifying proper credentials in parameters (both associated with the above Message Hub instance called kafka). The first package binding will be used to define the trigger associated with the feed of incoming messages in the iotp topic, while the second will be used to post transformed messages to the transformed topic.
  • Configure the kafka-iotp-trigger trigger associated with the Message Hub feed (under the kafka-iotp package binding) and the iotp topic
  • Create the iotp2flat action transforming messages from the raw format forwarded by the IoT Platform (aggregated, compressed, non-JSON) to format compatible with the Object Storage bridge in Message Hub (flat JSON, with timestamp field used for date-based partitioning). Here is an implementation of the transformation action:
OpenWhisk action performing the transformation in TRANSIT
  • Create the iotp2transformed sequence comprising the above transformation action iotp2flat, followed by the action publishing [*] to the transformed Kafka topic (under the respective package binding created above). Notice that since mhpost requires a topic parameter, and given that the ‘transformed’ topic is not one of the outputs of iotp2flat, the name of the topic must be included in the action itself — or the corresponding package binding (like in our case).

[*] Until the ‘publish’ action becomes part of the official Kafka package, a simplified version is available as part of this project.

Object Storage and the bridge from Message Hub

Furthermore, we need to configure Object Storage as well as the bridge between Message Hub and Object Storage:

  • Provision an instance of the Object Storage service (e.g., using Bluemix UI). E.g., call it “swift”, using the free plan. Create a container (folder) that will hold the archived data (e.g., taxi). Tip: make sure the container name does not contain “_”, as it may cause troubles later on.
  • Populate objectstorage.env configuration file by making a copy of objectstorage.env.template and updating the details of the created service instance.
  • Create an Object Storage bridge in Message Hub, configured to archive messages from the transformed Kafka topic into the taxi Object Storage container, specifying the desired time/size thresholds for batching (in our example 1MB/1 hour), as well as the name of the timestamp field for date-based partitioning (timestamp in our case). You can do it by running the obstorbr.sh script provided in this project:
./obstorbr.sh

The following figure shows an example of the generated files in object storage (with 3 taxi devices, the first batch is expected to appear roughly 45 minutes after the devices start sending data):

Objects in object storage generated by the bridge, partitioned by date

Spark and the Data Science Experience

The last (optional) step is to configure the Data Science Experience service, to perform analytics on the collected historical data.

  • Create an instance of Spark Service, linked to the above object storage instance and container
  • Create a project associated with the above Spark Service and Object Storage instances (tip: you currently can not choose project name identical to one of the container names in the selected object storage service).
  • Create a Notebook that reads data from the above container using Spark APIs. Note that due to special naming convention, Spark can seamlessly access all the data, without the need to target individual files created by the bridge. Moreover, the date partitioning alows to reduce the access just to the dates specified in the SQL query. You can find an example notebook as part of this project.

Finally, the resulting visualization will look like this:

Sample visualization of velocity of vehicles

Furthermore, given the expressiveness of Spark in general and Spark SQL in particular, it is very easy to develop many different analyses, generating business insights from the extensive historical data collected from IoT devices over time.

End-to-End Flow

To summarize, now that we understand the role of individual services, the configuration needed to wire them together, and the role of OpenWhisk in the pipeline, let’s review the flow of how the end-to-end pipeline actually works (more or less).

  1. Node-RED (simulated device): the ‘inject node’ triggers the flow at given intervals (e.g., every second). A sample event JSON is generated (by a custom ‘function node’). It is then aggregated using the ‘join’ node, compressed (using a custom function node), and sent to the Watson IoT Platform (using MQTT with properly configured orgId, device type, event type, device id and security token).
  2. Watson IoT Platform (with historical data storage extension to Message Hub): receives MQTT events from devices (after proper auth validation), augments them (e.g., adding timestamp and context metadata) and publishes in small batches to a Kafka topic (Message Hub).
  3. OpenWhisk (with custom transformation action): the Message Hub feed provider in OpenWhisk subscribes to messages in the iotp Kafka topic, aggregates small batches of messages and sends them as a payload to the OpenWhisk trigger, which in turn triggers a sequence of actions performing the message transformation and publishing the results to the transformed Kafka topic.
  4. Message Hub (bridge to Object Storage): aggregates messages published to the transformed topic according to a specified policy (e.g., files of up to 1 hour or 1MB of data), and uploads the files to Object Storage according to the specified layout (e.g., applying date-based partitioning).
  5. Spark Service (part of Data Science Experience platform): retrieves historical data from Object Storage, applying SQL or other interfaces — according to the needs of the particular batch analytics job.

Now you are invited to try this out, but more importantly — to think of your own ways of using OpenWhisk for serverless IoT-driven data pipelines, and to share your insights!

All the source code used in the post is available on github under Apache v2 license.

--

--