Serverless Data Flow Sequencing with Watson Data API and IBM Cloud Functions

Damian Cummins
IBM watsonx Assistant
Apr 9, 2018

The complete code for this tutorial and other Watson Data API Data Flow samples can be found here.

In a previous tutorial, you saw how data flows could be run one after another by polling with a simple shell script. This tutorial demonstrates how to deploy the same functionality as a serverless action. IBM Cloud Functions lets you deploy a simple, repeatable function and run it periodically by using the alarm package.

Again, a data flow can read data from a large variety of sources, process that data in a runtime engine using pre-defined operations or custom code, and then write it to one or more targets.

For example, if you have two data flows (data_flow_1 and data_flow_2) and you always want to run data_flow_2 after a data_flow_1 run completes, you can write an IBM Cloud Function to check the status of the latest data_flow_1 run. If the status is completed, the function should start a run of data_flow_2.

Creating a Node.js function

First, clone this repository and run npm install to install the dependencies. Once this completes, set your project ID and the IDs of the two data flows you want to monitor and run in index.js, for example:

// Parameters
const projectId = 'c2254fed-404d-4905-9b8c-5102f195cc0d';
const dataFlowId1 = '37bd30f0-dd3f-4052-988d-69c8fb2bf40a'; // Data Flow Ref to check status of latest run
const dataFlowId2 = 'd31116c7-854f-404c-9e7a-de274a8bb2d6'; // Data Flow Ref to trigger run for

The project ID can be retrieved from the browser URI between /projects/ and /assets in Watson Studio or Watson Knowledge Catalog when viewing the project:

Similarly, the data flow ID can be retrieved from the browser URI between /refinery/ and /details in Watson Studio or Watson Knowledge Catalog when viewing the data flow:
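If you prefer not to copy the IDs by hand, the two URI patterns above can be matched with a small helper. This is an illustrative sketch, not part of the repository's code, and the URLs below are made up to mirror the patterns described:

```javascript
// Hypothetical helper: pull an ID out of a Watson Studio /
// Watson Knowledge Catalog URL, given the path segments that
// surround it (e.g. /projects/<id>/assets or /refinery/<id>/details).
function extractId(url, prefix, suffix) {
  const match = url.match(new RegExp(`${prefix}/([0-9a-f-]+)/${suffix}`));
  return match ? match[1] : null;
}

// Example project URL (host and path are illustrative):
const projectUrl =
  'https://dataplatform.ibm.com/projects/c2254fed-404d-4905-9b8c-5102f195cc0d/assets';
console.log(extractId(projectUrl, 'projects', 'assets'));
// c2254fed-404d-4905-9b8c-5102f195cc0d

// Example data flow URL:
const flowUrl =
  'https://dataplatform.ibm.com/refinery/37bd30f0-dd3f-4052-988d-69c8fb2bf40a/details';
console.log(extractId(flowUrl, 'refinery', 'details'));
// 37bd30f0-dd3f-4052-988d-69c8fb2bf40a
```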

The main function is the one that will be called each time the action is invoked. The function creates a new authentication token, retrieves the latest run for dataFlowId1, and then either creates a new dataFlowId2 run or simply returns, depending on the state and completed_date.

Because the function is configured to run every 20 seconds, we only start a new run for dataFlowId2 if the latest run for dataFlowId1 completed within the last 20 seconds. This avoids starting dataFlowId2 every time we retrieve the latest finished run for dataFlowId1.
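The decision the main function makes on each invocation can be sketched as a small pure function. The names and object shape below are illustrative (based on the entity.state and entity.summary.completed_date fields mentioned in this tutorial), not the repository's exact code:

```javascript
// Poll interval of the alarm trigger; a completed run older than this
// has already been handled by a previous invocation.
const POLL_INTERVAL_MS = 20 * 1000;

// Returns true only when the latest dataFlowId1 run has completed
// within the last polling interval, so a single completion is not
// re-triggered on every subsequent poll.
function shouldStartNextFlow(latestRun, now = Date.now()) {
  if (!latestRun || latestRun.entity.state !== 'completed') {
    return false;
  }
  const completedAt = Date.parse(latestRun.entity.summary.completed_date);
  return now - completedAt <= POLL_INTERVAL_MS;
}

// A run that completed 5 seconds ago triggers the next flow...
console.log(shouldStartNextFlow({
  entity: {
    state: 'completed',
    summary: { completed_date: new Date(Date.now() - 5000).toISOString() }
  }
})); // true

// ...but one that completed a minute ago, or is still running, does not.
console.log(shouldStartNextFlow({
  entity: {
    state: 'running',
    summary: {}
  }
})); // false
```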

To deploy this Node.js function to IBM Cloud using the IBM Cloud Functions CLI, package it as a .zip archive that includes the node_modules directory, index.js, and package.json.

Getting started with IBM Cloud Functions CLI

First, follow the instructions here to install the IBM Cloud Functions CLI.

In a terminal window, upload the .zip file containing the Node.js action as a Cloud Function by using the following command: bx wsk action create packageAction --kind nodejs:default action.zip.

You can test the action you have just created manually by using the following command: bx wsk action invoke --blocking --result packageAction.

Trigger: every-20-seconds

You can include a trigger that uses the built-in alarm package feed to fire events every 20 seconds. This is specified through cron syntax in the cron parameter.

[Optional] The maxTriggers parameter ensures that it only fires for five minutes (15 times), rather than indefinitely.

Create the trigger with the following command: bx wsk trigger create every-20-seconds --feed /whisk.system/alarms/alarm --param cron "*/20 * * * * *" --param maxTriggers 15.

Rule: invoke-periodically

This rule shows how the every-20-seconds trigger can be declaratively mapped to the packageAction.

Create the rule with the following command: bx wsk rule create invoke-periodically every-20-seconds packageAction

Next, open a terminal window to start polling the activation log; the console.log statements in the action will be logged here. You can stream them with the following command: bx wsk activation poll

Monitoring logs

Before running your data flow, you should see entries similar to the following:

The first entry shows the IAM Authorization token being obtained, retrieving the data flow run, and then returning because the entity.summary.completed_date is earlier than the lookback date.

At this point, run dataFlowId1 from either Watson Studio or Watson Knowledge Catalog. You can do this using the Refine action for the data flow in the project assets page.

This entry is very similar, but in this case the entity.state is running, so the function returns again.

In this entry, you can see that the run for the data flow with an ID of 37bd30f0-dd3f-4052-988d-69c8fb2bf40a finished so the data flow with an ID of d31116c7-854f-404c-9e7a-de274a8bb2d6 starts.

To Summarize…

In summary, we have created a serverless action that polls the status of a data flow’s most recent run and, on completion, runs another data flow. This demonstrates the ability to chain or sequence the running of data flows using Watson Data APIs in the IBM Cloud.

Damian Cummins is a Cloud Application Developer with the Data Refinery and IBM Watson teams at IBM.
