Developing IBM Streams applications with the Python API (Version 1.6)

The IBM Data Science Experience (DSX) platform now integrates Streaming Analytics services using version 1.6 of the Python Application API, which enables application development and monitoring entirely in Python. The currently supported Python version is Python 3.5.

Python developers can use the streamsx package to:

  • Create IBM Streams applications in DSX Jupyter notebooks.
  • Create apps that are run in the Streaming Analytics service.
  • Access data streams from views defined in any app that is running on the service.

Furthermore, Python developers can now monitor submitted jobs with the Python REST API. This is particularly interesting for developers who want to retrieve and visualize streaming data in Jupyter notebooks, for example, for debugging or extra logging.

To develop streaming applications with Python 3.5 in DSX Jupyter notebooks, you can use the STREAMING_ANALYTICS_SERVICE context to submit a Python application to the IBM Streaming Analytics service.
Sample DSX Jupyter notebooks for Python applications that process streams are available on the community page of DSX:

  • Hello World!: Create a simple Hello World! application to get started and deploy this application to the Streaming Analytics service.
  • Healthcare Demo: Create an application that ingests and analyzes streaming data from a feed, and then visualizes the data in the notebook. Finally, you submit this application to the Streaming Analytics service.
  • Neural Net Demo: Create a sample data set, create a model for the sample data, use that model in a streaming application, visualize the streaming data, and finally submit the streaming application to the Streaming Analytics service.

Example: The Neural Net notebook

To illustrate the workflow of building a streaming application in DSX, we can walk through the Neural Net demo listed above. The workflow consists of three essential steps:

  1. Use the Python API to compose the streaming application.
  2. Submit the application to be run in a Streaming Analytics service.
  3. Retrieve data back into the notebook for visualization.

The purpose of the Neural Net notebook is to demonstrate how a data scientist can train a model on a set of data, and then immediately incorporate that model into a streaming application.

Creating a sample data set

First, we create a sample data set comparing the temperature of an engine to the probability that it will fail within the next hour:

import numpy as np
xvalues = np.linspace(20, 100, 100)
yvalues = np.array([(np.cos((x-50)/100)*100 + np.sin(x/100)*100 + np.random.normal(0, 13, 1)[0])/150.0 for x in xvalues])
# Shift the series so that its minimum is zero
yvalues = [y - np.amin(yvalues) for y in yvalues]
create_plot(xvalues, yvalues, title="Engine Temp Vs. Probability of Failure", xlabel = "Engine Temp in Degrees Celsius", ylabel = "Probability of Failure", xlim = (20,100), ylim = (0,1))

For brevity, several imports and function definitions were removed; the full code is shown in the notebook itself.

Training a model

Given the data set we created, we use the PyBrain library to train a Feed Forward Neural Network (FFN) as a model to predict failure probabilities given a temperature.

from pybrain.tools.shortcuts import buildNetwork
from pybrain.structure import SigmoidLayer, LinearLayer
from pybrain.datasets import SupervisedDataSet
from pybrain.supervised.trainers import BackpropTrainer
# The neural net to be trained
net = buildNetwork(1,100,100,100,1, bias = True, hiddenclass = SigmoidLayer, outclass = LinearLayer)
# Construct a data set of the training data
ds = SupervisedDataSet(1, 1)
for x, y in zip(xvalues, yvalues):
    ds.addSample((x,), (y,))
# The training harness. Used to train the model.
trainer = BackpropTrainer(net, ds, learningrate = 0.0001, momentum=0, verbose = False, batchlearning=False)
# Train the model, one epoch per call (loop body assumed; see the notebook for the full code).
for i in range(50):
    trainer.train()
# Display the model in the plot.
fig, ax = create_plot(xvalues, yvalues, title="Engine Temp Vs. Probability of Failure", xlabel = "Engine Temp in Degrees Celsius", ylabel = "Probability of Failure", xlim = (20,100), ylim = (0,1))
ax.plot(xvalues, [net.activate([x]) for x in xvalues], linewidth = 2, color = 'blue', label = 'NN output')

The fully trained model, net, is a simple Python object, which, when provided with a temperature value, produces a probability reading. Above, we can see the output of the model (in blue) plotted against the data set.
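
To make concrete what "provided with a temperature value, produces a probability reading" means, here is a minimal sketch of a feed-forward pass that uses only the standard library. It is not PyBrain's implementation: TinyNet, its layer sizes, and its random weights are assumptions made for illustration, and the network is untrained. It only mirrors the shape of the computation (sigmoid hidden layers, a linear output layer, bias terms) and the activate call used above.

```python
import math
import random


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


class TinyNet:
    """Hypothetical stand-in for a trained net: sigmoid hidden layers, linear output."""

    def __init__(self, sizes, seed=0):
        rng = random.Random(seed)
        # One (weights, biases) pair per transition between consecutive layers.
        self.layers = [
            ([[rng.uniform(-1, 1) for _ in range(n_in)] for _ in range(n_out)],
             [rng.uniform(-1, 1) for _ in range(n_out)])
            for n_in, n_out in zip(sizes, sizes[1:])
        ]

    def activate(self, inputs):
        values = list(inputs)
        last = len(self.layers) - 1
        for i, (weights, biases) in enumerate(self.layers):
            sums = [sum(w * v for w, v in zip(row, values)) + b
                    for row, b in zip(weights, biases)]
            # Hidden layers pass through a sigmoid; the output layer stays linear.
            values = sums if i == last else [sigmoid(s) for s in sums]
        return values


tiny = TinyNet([1, 4, 4, 1])
prediction = tiny.activate([72.5])  # a one-element list, like the [0] index used later
```

Because the output layer is linear, nothing in this structure forces the result into the range 0 to 1; in the notebook, that behavior has to come from training on the data set.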

Using the model in a Streaming application

Having the net model in the DSX notebook is not enough; we might want to send it into production to predict failures in real time. To insert the model into a real-time streaming application with the streamsx.topology Python API, you must use classes that create and manipulate streaming data.

The following two classes create and manipulate the data, and are necessary components of the streaming application. The PeriodicSource class emits a random number between 20 and 100 every 0.1 seconds, and is used to simulate sample temperature readings.

The NeuralNetModel class simply takes a data item, feeds it as input to the neural net, and returns the output onto a stream.

import random
import time
# The source of our data. Every 0.1 seconds, a number between 20-100 will be inserted into the stream
# INPUT: None
# OUTPUT: A float with range [20,100]
class PeriodicSource(object):
    def __call__(self):
        while True:
            time.sleep(0.1)
            yield random.uniform(20,100)
# A class which runs the neural net on data it is passed.
# INPUT: the input to the neural net, in this case a floating point number
# OUTPUT: an array containing the input to the neural net, followed by the output of the neural net.
class NeuralNetModel(object):
    def __init__(self, net):
        self.net = net
    def __call__(self, num):
        return [num, self.net.activate([num])[0]]
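
Because both classes are plain Python callables, they can be exercised locally before anything is submitted to the service. The following is a minimal sketch of such a dry run, not part of the notebook: StubNet is a hypothetical placeholder for the trained net, and the 0.1-second pause is left out of the source so that the check finishes instantly.

```python
import itertools
import random


class PeriodicSource(object):
    """Yields simulated temperature readings in the range [20, 100]."""
    def __call__(self):
        while True:
            yield random.uniform(20, 100)


class NeuralNetModel(object):
    """Pairs each input with the model's first output value."""
    def __init__(self, net):
        self.net = net

    def __call__(self, num):
        return [num, self.net.activate([num])[0]]


class StubNet(object):
    """Hypothetical placeholder for the trained net: a linear ramp from 0 to 1."""
    def activate(self, inputs):
        return [(inputs[0] - 20.0) / 80.0]


source = PeriodicSource()
model = NeuralNetModel(StubNet())
# Pull five readings from the otherwise endless generator and score each one.
results = [model(reading) for reading in itertools.islice(source(), 5)]
```

Each entry in results has the same [temperature, probability] shape as the tuples that the streaming application places on its output stream.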

Building the Streaming application

The application uses the PeriodicSource class to generate a stream of temperature readings, which are then processed by an instance of the NeuralNetModel class to create a stream of probability readings. Since we are interested in viewing these probability readings, we make the stream viewable with the view() method.

# Define operator
periodic_src = PeriodicSource()
nnm = NeuralNetModel(net)
# Build Graph
top = topology.Topology("myTop")
stream = top.source(periodic_src)
# Run the temp readings through the neural net and mark the
# output as viewable.
view = stream.transform(nnm).view()

Now that we have defined the application, we submit it to be run on a Streaming Analytics service on Bluemix using a call to submit.

vs = {'streaming-analytics': [{'name': service_name, 'credentials': json.loads(credentials)}]}
cfg = {context.ConfigParams.VCAP_SERVICES : vs, context.ConfigParams.SERVICE_NAME : service_name}
job = context.submit(context.ContextTypes.STREAMING_ANALYTICS_SERVICE, top, config=cfg)

You’ll notice that the credentials and service_name values are used to define a cfg object used for authentication. Both of these can be obtained from the Streaming Analytics service management page on Bluemix.
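
The vs dictionary is ordinary nested Python data, so it can be built and checked before it is handed to submit. The sketch below assumes only what the snippet above shows: build_vcap_services is a hypothetical helper name, and the credential content is a placeholder rather than the real fields issued by the service.

```python
import json


def build_vcap_services(service_name, credentials_json):
    """Return the dictionary layout shown above for a single service entry."""
    credentials = json.loads(credentials_json)
    if not isinstance(credentials, dict):
        raise ValueError("credentials must be a JSON object")
    return {"streaming-analytics": [{"name": service_name, "credentials": credentials}]}


# Placeholder values only; real ones come from the service management page.
example_credentials = '{"placeholder_key": "placeholder_value"}'
vs = build_vcap_services("my-streaming-service", example_credentials)
```

Parsing the credentials up front means that a malformed paste from the management page fails in the notebook with a clear error instead of during submission.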

Viewing Streaming data

Once the call to submit has completed successfully, the application is running. We can view its output in DSX using the view object that was created earlier.

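fig, ax = create_plot([], [], title="Engine Temp Vs. Probability of Failure", xlabel = "Engine Temp in Degrees Celsius", ylabel = "Probability of Failure", xlim = (20,100), ylim = (0,1))
xdata = []
ydata = []
queue = view.start_data_fetch()
# Each item is [temperature, probability], as returned by NeuralNetModel.
# The loop body is an assumed minimum; the notebook contains the full plotting code.
for line in iter(queue.get, None):
    xdata.append(line[0])
    ydata.append(line[1])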

Each dot in the above graph represents a live temperature reading used to predict likelihood of failure. Every time a new temperature reading is sent through the model, its output is reflected in the graph.
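
The loop above relies on Python's two-argument iter(callable, sentinel) form, which keeps calling queue.get until it returns None. The following self-contained sketch shows the idiom with a standard-library queue and a simulated producer thread. The readings are made-up values, and whether or when the real view queue delivers None is not shown in this article, so treat the termination behavior here as an assumption.

```python
import queue
import threading


def feed(q, items):
    """Simulated producer: push each tuple, then None to signal the end."""
    for item in items:
        q.put(item)
    q.put(None)


q = queue.Queue()
readings = [[25.0, 0.05], [60.0, 0.40], [95.0, 0.90]]
producer = threading.Thread(target=feed, args=(q, readings))
producer.start()

xdata, ydata = [], []
# q.get() blocks until an item is available; iteration stops at the None sentinel.
for line in iter(q.get, None):
    xdata.append(line[0])
    ydata.append(line[1])
producer.join()
```

Because q.get blocks while the queue is empty, the consuming loop needs no polling or sleep of its own.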

Closing the loop on DSX

Data visualization is becoming an increasingly important part of data science. After creating a model, a data scientist needs immediate visual feedback on its effectiveness both in and out of a production environment. Whether with static or real-time data, DSX is a tool that helps developers achieve this.

Originally published on May 4, 2017 by William Marshall.