Understanding WSO2 Stream Processor — Part 2

Chanaka Fernando
WSO2 Best Practices
5 min readJun 1, 2018

--

In the first part of this tutorial, I have explained about the concepts around WSO2 Stream Processor and how they are correlated with each other and which components users can use to implement their streaming analytics requirements. It laid out the platform for this tutorial (part 2) where we get our hands dirty with WSO2 SP.

The first thing you have to do is download the WSO2 SP runtime from WSO2 website.

https://wso2.com/analytics/install

Once you download the product distribution, you can extract that into a directory and run the product from the bin directory. You need to set the “JAVA_HOME” environment variable to your java installation (1.8 or higher) before starting the product. In this part of the tutorial, we are going to implement some streaming analytics use cases with WSO2 SP. Hence we need to start the SP in “editor” mode using the following command (for linux).

$ sh bin/editor.sh

This command will start the editor profile of the WSO2 SP and prints the URL of the editor in the console similar to below.

http://localhost:9390/editor

Now you can click on the above link and it will open up the editor in a browser window.

This is your playgorund where you can implement your streaming analytics use cases and test, debug and deploy into the runtime. All these activities can be done without moving away from the editor. Editor comes with so many samples which are self-explanatory and easy to execute. Let’s open up an existing sample to get things going.

Let’s start with the sample “ReceiveAndCount” by clicking on the sample. This will open the source file of this siddhi application. If you ignore the comments section, the code looks like below. You can save this file with the name “ReceiveAndCount.siddhi”.

Let’s go through this code and understand what we are doing here. First we define the name of the siddhi application and a description about the use case.

@App:name(“ReceiveAndCount”)

@App:description(‘Receive events via HTTP transport and view the output on the console’)

Then we define the source of the events with the following code segment. Here we are specifying the protocol as “http” and the data type as “json”. Also we specify the URL of the exposed service and the format of data which is coming (schema).

@Source(type = ‘http’,

receiver.url=’http://localhost:8006/productionStream',

basic.auth.enabled=’false’,

@map(type=’json’))

define stream SweetProductionStream (name string, amount double);

After that we define the sink where we specify action on the output and the format of the output stream. Here we are pushing the result to “log” file.

@sink(type=’log’)

define stream TotalCountStream (totalCount long);

Finally we have the processing logic where we give the name “query1” through the @info annotation for identification of this query. Here we are taking events from input stream which we have defined in the source section and then using the “count()” function to count the number of events and push the result into output stream which we have defined within the sink section.

— Count the incoming events

@info(name=’query1')

from SweetProductionStream

select count() as totalCount

insert into TotalCountStream;

With this understanding, let’s run this siddhi application from the editor by saving this file and clicking on the “Run” button or select the relevant menu item. If it is deployed and started successfully, you will see the below log message in the editor console.

ReceiveAndCount.siddhi — Started Successfully!

Now let’s send some events to this siddhi application. You can either use a tool like PostMan/SOAPUI or the built in event simulation feature of the editor. Here I’m using the event simulator which is coming with the editor. You can click on the “event simulator” icon which is on the left side panel (second icon) and it will expand that panel and open the event simulation section.

Here you need to select the following values.

  • Siddhi App Name = ReceiveAndCount
  • Stream Name — SweetProductionStream
  • name(STRING) — Flour (sample value)
  • amount(DOUBLE) — 23 (sample value)

Once you select those values, you can click on “Send” button and it will send an event with following JSON format

{ name: “Flour”, amount: 23}

If you observe the console which you start the editor at the beginning, you will see the following line getting printed.

[2018–06–01 10:57:01,776] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} — ReceiveAndCount : TotalCountStream : Event{timestamp=1527830821771, data=[1], isExpired=false}

If you click on send event 2 more times, you will see that “data” element of the above log line is aggregating to number of events you have sent.

[2018–06–01 10:58:51,500] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} — ReceiveAndCount : TotalCountStream : Event{timestamp=1527830931494, data=[2], isExpired=false}

[2018–06–01 10:58:52,846] INFO {org.wso2.siddhi.core.stream.output.sink.LogSink} — ReceiveAndCount : TotalCountStream : Event{timestamp=1527830932845, data=[3], isExpired=false}

Congratulations! You have run your first siddhi application with WSO2 SP which counts the number of events received to a given http service.

Let’s do something meaningful with the next sample. Let’s say we want to implement a fraud detection use case where if someone is spending more than 100K within a 10 minute time interval from one credit card, that needs to be considered as a red flag and send an email to the user. We can implement this use case with the following siddhi application.

The above application sends an email when there is a fraudulent event occurs. The execution flow and the application logic can be explained using the below figure.

Figure: Fraud detection siddhi application

Here we create a partition of the event stream using a given credit card number. Within that partition, we check for a 10 minute time window and within that period, we do an aggregation and check the value ot be greater than 100K. If all those conditions are satisfied, we choose the last arrived event and send those details through an email to the relevant user.

You can save the above siddhi application as “AlertsAndThresholds.siddhi” file within the editor and then send a series of events from event simulation section and observe that when there are transactions which sums up to 100K for a given credit card number, it will send an email to the configured email address. The email will look similar to below.

Alert for large value transaction: cardNo:444444

creditCardNo:”444444",

country:”lk”,

item:”test”,

lastTransaction:50000.0

That’s it. You just wrote a siddhi application to detect fraudulent activities. You can extend this application based on your conditions.

--

--

Chanaka Fernando
WSO2 Best Practices

Writes about Microservices, APIs, and Integration. Author of “Designing Microservices Platforms with NATS” and "Solution Architecture Patterns for Enterprise"