Nutanix Data Pipelines and AI for Developers

Bao Phan
nutanix-iot
Published in
7 min readMay 7, 2019

by Sai Nanduri, Engineer, Xi IoT R&D

IoT edge computing is all about real-time analysis and understanding of how users are interacting with applications. In big data IoT scenarios, anything you can do to improve the transform stage in the extract, transform, load (ETL) process is going to be helpful because your edge data sources can number in the hundreds or even thousands. Each edge pushing new messages and data payloads to your service expects a high throughput and low latency.

The article “ Nutanix Xi IoT Application Containers for Developers “ discusses how Xi IoT lets you use containerized applications to configure and deploy custom services and runtime environments for your edge computing solution.

In this post, I’ll take it a step further by abstracting away the container components entirely, and walking you through a demonstration of our native data pipelines for a real-world AI implementation. Data pipelines make hooking your application functions up to data sources extremely easy so you can focus on developing transformation logic, rather than building runtimes.

Data Pipelines on Xi IoT

Data pipelines on the Xi IoT Platform support a complete end-to-end ETL solution. A data pipeline is a collection of the input streams, transformation functions and output endpoints. A pipeline enables you to connect to your data sources, regardless of their message data type, physical location, message sending velocity and so on.

To see configured data pipelines in the Xi IoT management console, click Apps and Data > Data Pipelines. The List tab displays a listing of current pipelines. The Visualization tab provides a card view that shows the connections between data sources, pipelines and external cloud environments. The Alerts tab displays any current connectivity alerts related to configured pipelines.

When you create a new data pipeline you are provided with a visual editor that you can use to attach the input, transformation and output elements to each other and create a pipeline.

Data pipelines work in a sequential manner. They capture the input from your sources and pass the payload along to transformation functions. The transformation functions only have to focus on transformation logic or analysis, and then can pass their output on to additional transformation functions, data pipelines, storage or other services.

Data sources such as sensors and other devices are configured in the Infrastructure part of the Xi IoT service and are then exposed as data sources and data streams for a pipeline. For a data pipeline, the data source or stream is selected in the Input area of the editor.

All of the data source specific configuration for pipeline ingest is completed when setting up the data source itself, rather than the pipeline. This means each data source is configured only once and can be used by multiple pipelines. As an example, you may configure:

  • Type of data source, such as a sensor or gateway.
  • Protocol to be used to capture the messages.
  • Authentication modes-if any.
  • Source’s IP Address.

A key feature of configuring data sources and edges in Xi IoT is defining attributes and categories that help you query and select them when creating data pipelines. Under Infrastructure > Categories you can create custom categories and assign attribute values for the data source. It can be anything meaningful to your environment: a description of the sensor type or use, a data protocol, a location or anything that enables easy configuration of your solution. It’s these attributes that are used as the input criteria to a pipeline, rather than each individual sensor. These means they are dynamic and it becomes very easy to scale.

Developing Transformation Functions

As a developer, your primary work involves writing the functions that process the incoming raw data inside the pipeline. Functions are similar to serverless computing functions you may already be familiar with, but can be much more powerful. In Xi IoT, you can write (or upload) transformation functions directly to the service on the Apps and Data > Functions page, creating a library of reusable functions. Xi IoT supports multiple languages in which you can write your functions, currently including Go, Node (JavaScript) and Python.

We not only provide core runtimes for these languages, but optimized runtimes with the most widely used platforms and libraries, too. For example, we have a runtime configured for TensorFlow, where you only need to write the code that makes use of TensorFlow, rather than managing the installation of all required packages.

We plan to have more runtimes and languages supported in future as well. At the time of this authoring, we have support for TensorFlow, but you can utilize external AI services-RESTful APIs, or packages for Python or Go-to perform these actions, too (more on that in a future post).

We provide you with an online editor to write your scripts, but if you have a local function written-that conforms to our standards and requirements for functions-then you can simply select the file for upload.

While developing functions, just keep in mind the signature used by our platform to call your function. Your functions need to have a main function that accepts two parameters, a context, and the message that will be passed by the platform. In Python, for example, it looks like this:

def main(ctx,msg):

The context ( ctx) parameter is used to return the output back to the pipeline once you have processed the message ( msg). Here is an example for you to easily understand the relationship between these parameters. In the following, we count the number of objects in the image that have been detected by AI.

import json
import logging
def main(ctx,msg):
input_data = json.loads(msg)
detections = input_data[‘detections’]
result = {}
object_counts = {}
for i in range(len(detections)):
detection = detections[i]
label = detection[‘label’]
if label in object_counts:
object_counts[label] = object_counts[label] + 1
else:
object_counts[label] = 1
result[‘image’] = input_data[‘data’]
result[‘metadata’] = object_counts
output = json.dumps(result)
return ctx.send(output)

We capture the data payload from the msg parameter and process it within the body of the function. This is where data pipelines help you in developing the functions, as you only need to check the parameters passed, not the source or raw data type. Data Pipelines take care of forwarding the payloads to their respective transformation functions.

Once we are done with the processing of the message, we forward the message over to the next function or middleware using the send function in the context parameter. This enables the pipeline to pass around the payload from one transformation to another.

Apart from the main function signature itself, you can go ahead and add as many helper functions as you wish. Not only can you add sub functions within a function, but you can also chain a function to another function. As you can see in our sample project, we have utilized multiple functions in a pipeline to convert the image from raw content to JPEG, process the image to find objects, then draw a bounding box around the detected region.

Of course, we could have done that in a single-go, but that would increase the overall complexity of the function and its runtime for processing of messages. By using multiple functions, we can have the platform orchestrate them and take care of scalability for us. In addition, we can write more individual specialized functions that can be reused as needed on other pipelines.

Output Data on the Cloud

After our functions transform the data, we can output the data to an external service where it can be used for other purposes-reporting, downloading, storing and so on. Our platform supports data output to:

When connecting a cloud instance, we currently support direct connection to several Amazon Web Services and Google Cloud Platform services using standard APIs.

In the sample project, our output is delivered to a service on the edge itself.

You can configure output services under Infrastructure > Cloud Profiles in Xi IoT. Once you have configured your Cloud Profile, you can utilize the Output pane to submit the content over to your cloud storage.

If using AWS, we support S3, Kinesis, and SQS to ingest the data in your cloud storage. It depends on your choice, as well as the business use case to select an edge for data streaming or to utilize a cloud storage to ingest the processed data.

Conclusion

As you can see from this walkthrough, a strength of the Xi IoT platform is the ability to abstract away some of the plumbing details of your solution and instead focus on connecting the important elements of your data pipeline. Data sources and output locations or services can be configured once, as infrastructure, then connected as simple inputs and outputs as you build your data transformation pipeline. The transformations themselves are also configured as a library of pluggable, reusable functions. Once you’ve set up the building blocks of an Xi IoT solution, building or upgrading data transformation pipelines is as simple as hooking up the pieces.

In addition to the management interface demonstrated here, Xi IoT also provides a RESTful API enabling programmatic access to input data streams, pipelines, functions, and output cloud profiles.

All of these features give you the ability to easily configure, deploy, and manage IoT-based real-time analysis applications at scale.

About the Author

Sai Nanduri, Engineer, Xi IoT R&D

Sai has 6 years of experience working in various components of Linux Kernel, storage drivers and most recently in Xi IoT’s planet scale stack based on Kubernetes. Sai has previously worked for Solaris at Oracle and PernixData and holds a Master’s Degree in Computer Engineering from Carnegie Mellon University.

Originally published at https://developer.nutanix.com on May 7, 2019.

--

--