Serverless Data Flow Sequencing with Watson Data API and IBM Cloud Functions
The complete code for this tutorial and other Watson Data API Data Flow samples can be found here.
In a previous tutorial, you saw how data flows could be run one after another by polling with a simple shell script. This tutorial demonstrates how to deploy the same functionality as a serverless action. IBM Cloud Functions enables you to deploy a simple, repeatable function and run it periodically by using the alarm package.
Again, a data flow can read data from a large variety of sources, process that data in a runtime engine using pre-defined operations or custom code, and then write it to one or more targets.
For example, if you have two data flows (data_flow_1 and data_flow_2) and you always want to run data_flow_2 after a data_flow_1 run completes, you can write an IBM Cloud Function to check the status of the latest data_flow_1 run. If that run has completed, the function starts a run of data_flow_2.
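That check-then-trigger decision can be sketched as a small pure helper. This is a sketch only; the 'finished' state string and the shape of the run payload are assumptions, so verify them against the runs your project actually returns:

```javascript
// Sketch of the chaining decision: given the latest run of data_flow_1,
// decide whether a run of data_flow_2 should be started.
// The 'finished' state string is an assumption; check your run payloads.
function shouldStartNextFlow(latestRun) {
  if (!latestRun || !latestRun.entity) {
    return false; // no runs yet, nothing to chain
  }
  return latestRun.entity.state === 'finished';
}

// Hypothetical run payloads for illustration
console.log(shouldStartNextFlow({ entity: { state: 'running' } }));  // false
console.log(shouldStartNextFlow({ entity: { state: 'finished' } })); // true
```

Keeping this decision in a pure function makes it easy to unit test without calling the Watson Data API.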
Creating a Node.js function
First, clone this repository and run npm install to install the dependencies. Once this completes, be sure to include your project ID and the IDs of the two data flows you want to monitor and run in index.js, for example:
// Parameters
const projectId = 'c2254fed-404d-4905-9b8c-5102f195cc0d';
const dataFlowId1 = '37bd30f0-dd3f-4052-988d-69c8fb2bf40a'; // Data Flow Ref to check status of latest run
const dataFlowId2 = 'd31116c7-854f-404c-9e7a-de274a8bb2d6'; // Data Flow Ref to trigger run for
The project ID can be retrieved from the browser URI between /projects/ and /assets in Watson Studio or Watson Knowledge Catalog when viewing the project.
Similarly, the data flow ID can be retrieved from the browser URI between /refinery/ and /details in Watson Studio or Watson Knowledge Catalog when viewing the data flow.
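Since both IDs sit between a pair of known markers in the URI, a tiny helper can illustrate the pattern. The example URI below is hypothetical:

```javascript
// Extract the segment of a URI that sits between two markers,
// e.g. the project ID between '/projects/' and '/assets',
// or the data flow ID between '/refinery/' and '/details'.
function extractId(uri, before, after) {
  const start = uri.indexOf(before);
  if (start === -1) return null; // marker not present
  const rest = uri.slice(start + before.length);
  const end = rest.indexOf(after);
  return end === -1 ? rest : rest.slice(0, end);
}

// Hypothetical Watson Studio URI for illustration
const uri = 'https://dataplatform.cloud.ibm.com/projects/c2254fed-404d-4905-9b8c-5102f195cc0d/assets';
console.log(extractId(uri, '/projects/', '/assets'));
// c2254fed-404d-4905-9b8c-5102f195cc0d
```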
The main function is the one that will be called each time the action is invoked. The function creates a new authentication token, retrieves the latest run for dataFlowId1, and then either creates a new dataFlowId2 run or simply returns, depending on the run's state and completed_date.
The function is configured to run every 20 seconds, so we only start a new run for dataFlowId2 if the latest run for dataFlowId1 completed in the last 20 seconds. This avoids starting dataFlowId2 every time we retrieve the latest finished run for dataFlowId1.
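The lookback check can be sketched as follows. This is a sketch; completed_date is assumed to be an ISO-8601 timestamp on the run, which you should confirm against your own run payloads:

```javascript
// Only chain the next flow if the latest run completed within the
// polling interval, so repeated polls of the same finished run do
// not trigger duplicate runs of dataFlowId2.
const LOOKBACK_MS = 20 * 1000; // matches the 20-second alarm trigger

function completedWithinLookback(completedDate, nowMs = Date.now()) {
  const completedMs = Date.parse(completedDate); // ISO-8601 assumed
  if (Number.isNaN(completedMs)) return false;
  return nowMs - completedMs <= LOOKBACK_MS;
}

// Illustration with a fixed "now" of 00:00:30
const now = Date.parse('2019-01-01T00:00:30Z');
console.log(completedWithinLookback('2019-01-01T00:00:15Z', now)); // true  (15s ago)
console.log(completedWithinLookback('2019-01-01T00:00:05Z', now)); // false (25s ago)
```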
To deploy this Node.js function to IBM Cloud using the IBM Cloud Functions CLI, package it as a .zip archive containing the node_modules directory and the index.js and package.json files.
Getting started with IBM Cloud Functions CLI
First, follow the instructions here to install the IBM Cloud Functions CLI.
In a terminal window, upload the .zip file containing the Node.js action as a Cloud Function by using the following command: bx wsk action create packageAction --kind nodejs:default action.zip
You can test the action you have just created manually by using the following command: bx wsk action invoke --blocking --result packageAction
Trigger: every-20-seconds
You can include a trigger that uses the built-in alarm package feed to fire events every 20 seconds. This is specified through cron syntax in the cron parameter.
[Optional] The maxTriggers parameter ensures that it only fires for five minutes (15 times), rather than indefinitely.
Create the trigger with the following command: bx wsk trigger create every-20-seconds --feed /whisk.system/alarms/alarm --param cron "*/20 * * * * *" --param maxTriggers 15
Rule: invoke-periodically
This rule shows how the every-20-seconds trigger can be declaratively mapped to the packageAction.
Create the rule with the following command: bx wsk rule create invoke-periodically every-20-seconds packageAction
Next, open a terminal window to start polling the activation log. The console.log statements in the action will be logged here. You can stream them with the following command: bx wsk activation poll
Monitoring logs
Before running your data flow, you should see entries similar to the following:
The first entry shows the IAM authorization token being obtained, the latest data flow run being retrieved, and the function then returning because the entity.summary.completed_date is earlier than the lookback date.
At this point, run dataFlowId1 from either Watson Studio or Watson Knowledge Catalog. You can do this using the Refine action for the data flow in the project assets page.
This entry is very similar, but in this case the entity.state is running, so the function returns again.
In this entry, you can see that the run for the data flow with an ID of 37bd30f0-dd3f-4052-988d-69c8fb2bf40a finished, so the data flow with an ID of d31116c7-854f-404c-9e7a-de274a8bb2d6 starts.
To Summarize…
In summary, we have created a serverless action that polls the status of a data flow’s most recent run and, on completion, runs another data flow. This demonstrates the ability to chain or sequence the running of data flows using Watson Data APIs in the IBM Cloud.
Damian Cummins is a Cloud Application Developer with the Data Refinery and IBM Watson teams at IBM.