Publishing Events to WSO2 DAS and Performing Real-Time Analytics Using WSO2 Siddhi

Sajith Abeywardhana
9 min read · Apr 5, 2018


(Connecting AWS IoT via the WSO2 ESB MQTT inbound endpoint, publishing events to WSO2 DAS, and performing analytics using WSO2 Siddhi: Part 2)

Introduction

Nowadays, a typical IoT application works like this: devices publish their status to an IoT platform such as AWS IoT, a real-time analytics platform evaluates those statuses against a predefined rule set, and when an anomaly is found, a notification is published to another system or to users through channels such as email, SMS, and push events.

This is the second part of the article on this topic. If you haven’t gone through the first part yet, I strongly suggest you read it here.

In this article I will discuss how to configure a WSO2 ESB event sink, how to publish wso2events to WSO2 DAS, the wso2event receiver in WSO2 DAS, capturing events with event streams, persisting events using the event store, and finally how to write a Siddhi query to perform real-time analytics.

Implementations in the WSO2 ESB

Event Sink

Event sinks contain information about transport endpoints provided by other systems. Events can be published to these endpoints. Endpoint transport and authentication URLs and credentials are captured in event sinks.

Configuring the event sink

Navigate to the event sink creation page using the path Home > Configure > Event Sinks > Event Sink Editor. On the “Add Event Sink” page you need to configure the event sink properties such as the event sink name, username, password, and the receiver URL of the DAS. (I started DAS with a port offset of 10, so I’m using port 7621 for this demo.)

1. Event sink configuration.
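If you are running both servers locally and DAS keeps its default admin/admin credentials, the values should look roughly like this (7611 is the default DAS Thrift receiver port, which becomes 7621 with the offset of 10; adjust for your environment):

Name: DASEventSink
Receiver URL: tcp://localhost:7621
Username: admin
Password: admin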

For more details about event sinks, please refer to the WSO2 documentation here.

Sequence

Implementing a sequence to perform event sinking

This sequence is responsible for extracting the device sensor values from the JSON payload and calling the event sink so that DAS will be able to perform the analytics. In this sequence we use the publishEvent mediator, as shown in the code block below. The event sink name, stream name, stream version, and payload must be specified when implementing the publishEvent mediator.

Note: We will be creating an event stream in the DAS named com.demo.device.data.event.stream to capture the events coming from the ESB.
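For reference, the incoming device payload would look something like this (the field names match the json-eval expressions in the sequence below; the values are purely illustrative):

{
  "deviceId": "device-001",
  "eventTime": 1522912500000,
  "ambientTemperature": 55.5,
  "lightValue": 310.0,
  "soundValue": 25.4,
  "motion": 1
}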

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="DASEventSinkSequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <property expression="json-eval($.deviceId)" name="deviceId" scope="default" type="STRING"/>
    <property expression="json-eval($.eventTime)" name="eventTime" scope="default" type="LONG"/>
    <property expression="json-eval($.ambientTemperature)" name="ambientTemperature" scope="default" type="FLOAT"/>
    <property expression="json-eval($.lightValue)" name="lightValue" scope="default" type="FLOAT"/>
    <property expression="json-eval($.soundValue)" name="soundValue" scope="default" type="FLOAT"/>
    <property expression="json-eval($.motion)" name="motion" scope="default" type="INTEGER"/>
    <publishEvent>
        <eventSink>DASEventSink</eventSink>
        <streamName>com.demo.device.data.event.stream</streamName>
        <streamVersion>1.0.0</streamVersion>
        <attributes>
            <meta/>
            <correlation/>
            <payload>
                <attribute defaultValue="" expression="get-property('deviceId')" name="deviceId" type="STRING"/>
                <attribute defaultValue="" expression="get-property('eventTime')" name="eventTime" type="LONG"/>
                <attribute defaultValue="" expression="get-property('ambientTemperature')" name="ambientTemperature" type="FLOAT"/>
                <attribute defaultValue="" expression="get-property('lightValue')" name="lightValue" type="FLOAT"/>
                <attribute defaultValue="" expression="get-property('soundValue')" name="soundValue" type="FLOAT"/>
                <attribute defaultValue="" expression="get-property('motion')" name="motion" type="INTEGER"/>
            </payload>
            <arbitrary/>
        </attributes>
    </publishEvent>
</sequence>

To add the sequence, navigate via the path Home > Manage > Service Bus > Sequences > Design Sequence in the ESB management console, and on the design page click “switch to source view”. Paste the above code block into the editor and save it.

Configuring the MQTT Inbound Endpoint

The inbound endpoint should be configured to use the above sequence as its injecting sequence. We can simply set the sequence name to “DASEventSinkSequence” in the inbound endpoint, as shown below. (Please note that the inbound endpoint configuration was discussed in the first part of this story.)

2. Inbound endpoint configuration.
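As a sketch, the updated inbound endpoint source would look something like the following. The endpoint name here is a placeholder, and the MQTT parameters are elided; use the values you created in part 1:

<inboundEndpoint name="AwsIotMqttInboundEndpoint" protocol="mqtt"
        sequence="DASEventSinkSequence" onError="fault" suspend="false"
        xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <!-- MQTT connection parameters (broker host, port, topic, etc.) as configured in part 1 -->
    </parameters>
</inboundEndpoint>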

Now we are done with the ESB configuration and implementation, so next we need to do the configuration and implementation in WSO2 DAS.

Implementations in the WSO2 DAS

You may need to download WSO2 DAS 3.1.0 if you don’t have it on your local machine.

Start the DAS server by following the guide here. You may need to apply a port offset if you run both the ESB and DAS servers on the same machine.
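The port offset is the standard Carbon setting in carbon.xml; to reproduce the offset of 10 I used above, a minimal sketch would be:

<!-- <DAS_HOME>/repository/conf/carbon.xml -->
<Ports>
    <Offset>10</Offset>
    <!-- leave the rest of the Ports section unchanged -->
</Ports>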

When event sinking happens, DAS receives the events from the ESB. To capture those events, DAS must have a wso2event receiver; the receiver maps each incoming event to an event stream, and through the event store we can persist the events in the event database.

Using a Siddhi query we will analyze the input events in real time for various use cases. After analysis, the processed data is sent to a different stream, and from that stream an email is sent to the end user via the email event publisher.

What we have in the DAS:

  • wso2event event receiver (DemoWso2eventReceiver)
  • Stream to capture input events (com.demo.device.data.event.stream)
  • Stream to hold processed events (com.demo.processed.data.event.stream)
  • Siddhi query (demo-ambient-temp)
  • Email event publisher (DemoEmailPublisher)

Use case:

For each device event we need to monitor the ambient temperature value.

If the ambient temperature is ≥ 50, an “alert:high-temperature” email should be sent.

If the ambient temperature is ≥ 30 and < 50, a “warning:high-temperature” email should be sent.

If the ambient temperature is > 12 and ≤ 20, a “warning:low-temperature” email should be sent.

If the ambient temperature is ≤ 12, an “alert:low-temperature” email should be sent.

The email body should read: For the device id {{deviceId}}, ambient temperature of {{ambientTemperature}} detected.

Note: To send emails, you should configure the email output adapter as documented here. Follow the steps in the documentation to configure the WSO2 DAS server.

Configuring event streams and persisting event data

First, we need to create an event stream. To do so, navigate via the path Home > Manage > Event > Stream in the DAS management console and then click the “Add Event Stream” button.

Define the event stream configuration as per the screenshot below to capture the input events. Just make sure to give it the same name we configured in the sequence (com.demo.device.data.event.stream).

3. Event stream details.

Under the stream attributes section, we can define the payload data attributes. The attribute names and data types must be the same as what we configured in the sequence.

4. Stream attributes.
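If you open the saved stream in source view, the definition should look roughly like this sketch (note that DAS stream definitions use INT for the integer type):

{
  "name": "com.demo.device.data.event.stream",
  "version": "1.0.0",
  "nickName": "",
  "description": "",
  "payloadData": [
    {"name": "deviceId", "type": "STRING"},
    {"name": "eventTime", "type": "LONG"},
    {"name": "ambientTemperature", "type": "FLOAT"},
    {"name": "lightValue", "type": "FLOAT"},
    {"name": "soundValue", "type": "FLOAT"},
    {"name": "motion", "type": "INT"}
  ]
}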

Now we can persist the event stream in the built-in H2 database by clicking the “Next[Persist]” button. Select all the attributes and then click the “Save Event Stream” button as per the screenshot below.

5. Persist event stream.

We need another stream to capture the processed data, as shown in the screenshots below. Just create a new stream and use the persist option. As per our use case, this stream needs the deviceId, ambientTemperature, and statusMessage attributes.

6. Processed data stream definition.
7. Processed data event persistence option.
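Its source-view definition should be roughly the following sketch (the attributes match the outputStream defined in the Siddhi query later):

{
  "name": "com.demo.processed.data.event.stream",
  "version": "1.0.0",
  "nickName": "",
  "description": "",
  "payloadData": [
    {"name": "deviceId", "type": "STRING"},
    {"name": "ambientTemperature", "type": "FLOAT"},
    {"name": "statusMessage", "type": "STRING"}
  ]
}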

Now that we have implemented both event streams, we can go ahead and implement an event receiver.

Configuring the wso2event receiver

Navigate via the path Home > Manage > Event > Receivers in the DAS management console and then click the “Add Event Receiver” button. Give it a name, select wso2event as the “Input Event Adapter Type”, select “com.demo.device.data.event.stream:1.0.0” as the event stream, and then save the event receiver.

8. Event receiver configuration.
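For reference, the saved receiver should produce a configuration roughly like this sketch of its source view (the duplicated-events property shown is the default):

<eventReceiver name="DemoWso2eventReceiver" statistics="disable" trace="disable"
        xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="wso2event">
        <property name="events.duplicated.in.cluster">false</property>
    </from>
    <mapping customMapping="disable" type="wso2event"/>
    <to streamName="com.demo.device.data.event.stream" version="1.0.0"/>
</eventReceiver>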

Now we should be able to see the events published by the AWS IoT test service, via the ESB, in the DAS Data Explorer window. To make sure the AWS IoT, ESB, and DAS integrations are working as expected, we can publish a sample JSON payload using the AWS IoT test service. Once you publish the event, the DAS Data Explorer should show it as in the screenshots below.

9. Sample JSON event simulating.
10. WSO2 DAS data explorer.

Implementing the Siddhi query

Navigate to the path Home > Manage > Streaming Analytics > Execution Plans in the DAS management console and click the “Add Execution Plan” button. Copy and paste the code block below and save the execution plan.

@Plan:name('demo-ambient-temp')

@Import('com.demo.device.data.event.stream:1.0.0')
define stream inputStream (deviceId string, eventTime long, ambientTemperature float, lightValue float, soundValue float, motion int);

@Export('com.demo.processed.data.event.stream:1.0.0')
define stream outputStream (deviceId string, ambientTemperature float, statusMessage string);

@info(name = 'at-out-query')
from inputStream
select deviceId, ambientTemperature,
    ifThenElse(ambientTemperature >= 50, 'alert:high-temperature',
        ifThenElse(ambientTemperature >= 30 and ambientTemperature < 50, 'warning:high-temperature',
            ifThenElse(ambientTemperature > 12 and ambientTemperature <= 20, 'warning:low-temperature',
                ifThenElse(ambientTemperature <= 12, 'alert:low-temperature', 'undefined')
            )
        )
    ) as statusMessage
insert into outputStream;

Configuring email event publisher

Note: You need to configure DAS to send emails, as documented here.

Navigate to the path Home > Manage > Event > Publishers in the DAS management console and click the “Add Event Publisher” button. Name the publisher, select “com.demo.processed.data.event.stream” (the processed event stream) as the Event Source, and select email as the “Output Event Adapter Type”. Configure the email address and email properties as per the screenshots below.

11. Email event publisher configurations.
12. Email event publisher configurations.
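Putting it together, the publisher's source view should look roughly like this sketch. The recipient address is a placeholder, the inline text mapping reuses the email body from our use case, and the subject maps to statusMessage:

<eventPublisher name="DemoEmailPublisher" statistics="disable" trace="disable"
        xmlns="http://wso2.org/carbon/eventpublisher">
    <from streamName="com.demo.processed.data.event.stream" version="1.0.0"/>
    <mapping customMapping="enable" type="text">
        <inline>For the device id {{deviceId}}, ambient temperature of {{ambientTemperature}} detected.</inline>
    </mapping>
    <to eventAdapterType="email">
        <!-- user@example.com is a placeholder; set your own recipient -->
        <property name="email.address">user@example.com</property>
        <property name="email.subject">{{statusMessage}}</property>
        <property name="email.type">text/plain</property>
    </to>
</eventPublisher>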

Now we have configured and implemented all the necessary pieces in WSO2 DAS, AWS IoT, and WSO2 ESB, so the whole integration should work as depicted in the diagram below. Once we publish a sample message to the topic via the AWS IoT test service, you should receive emails according to the ambient temperature value.

13. High Level Architecture Diagram.

Testing

For alert:high-temperature

14. Sample payload.
15. Email notification.

For warning:high-temperature

16. Sample payload.
17. Email notification.

For warning:low-temperature

18. Sample payload.
19. Email notification.

For alert:low-temperature

20. Sample payload.
21. Email notification.

Conclusion

In this article I have discussed how to use the WSO2 ESB event sink to pass wso2events to WSO2 DAS, how to capture the events via the WSO2 DAS event receiver, how to perform real-time analytics using WSO2 Siddhi, and finally how to configure the email publisher so that end users get notifications according to the analytics results.

Finally, we tested each use case on our solution, which consists of AWS IoT, WSO2 ESB, and WSO2 DAS.

A task for you: if you publish an ambientTemperature value between 20 and 30, you will get an email with the subject “undefined”. Your task is to figure out how to stop this email by enhancing our Siddhi query :)

Thanks for reading…


Sajith Abeywardhana

AWS Certified Solutions Architect — Associate | WSO2 APIM Certified | Integration Consultant at Yenlo