Integrating Event-Driven Systems using WSO2 Stream Processor and Siddhi

Enterprise systems communicate with each other using APIs, events, and data streams. APIs allow systems to communicate in a synchronous manner where they call other services to retrieve data or to perform tasks. At the same time, systems also asynchronously publish events or stream data to other systems, as a mean of informing other systems without expecting any response from the receiver. Notification, surveillance, and monitoring systems are some that fall into this category. These systems are built using the enterprise architecture, which is famously known as the event-driven architecture (EDA), where all the systems communicate with each other by passing messages in an asynchronous manner.

This post will be an insight into how we can efficiently integrate events and data streams to build an effective enterprise system based on event-driven architecture.

Components of Event-Driven Architecture (EDA)

Although the systems in the event-driven architecture can directly send messages to each other, in most of the cases, message passing is done, through a centralized message queue such as Kafka, or a JMS broker like ActiveMQ, IBM MQ, etc. These message queues become very handy as they provide guaranteed message delivery by allowing the consumer to be offline when the publisher publishes the message and lets you scale the system by publishing the message to one or more consumers.

To integrate systems using event-driven architecture only adding a message queue to your solution will not solve the problem. This is because the systems that consume and publish the message from the message queues should also be written in an efficient way to get the complete benefit of the overall solution. This is where the stream processors become very useful since they eliminate the need to write complex codes, to publish & consume messages from event queues, process, transform and integrate them with various other systems.

Stream Processors

With lightweight and feature-rich stream processing systems you can:

  • Collect events from various streaming sources, log files, and data stores using techniques like Change Data Capture (CDC).
  • Transform events to & from JSON, XML, text and other formats.
  • Enrich data from databases & services.
  • Perform filtering, aggregations and other complex event processing operations like pattern matching, and anomaly detection.
  • Do real-time prediction using machine learning.
  • Send events as notifications to external systems or load the data to various data stores.

Unfortunately, all stream processing systems are not lightweight or have the capability to performable all the above-mentioned operations by default. Therefore, to demonstrate the necessary capabilities we chose WSO2 Stream Processor.

WSO2 Stream Processor.

WSO2 Stream Processor is a cloud-native, lightweight, highly scalable system. It’s built using the high-performance Siddhi streaming engine providing rich features for building stream processing logic. It also provides great developer experience with its graphical and Streaming SQL-based query editor allowing developers to build, test, debug, and simulate their use cases in the editor.

Sample Event-Driven Data Integration Scenario

To demonstrate how we can integrate event-driven systems, let’s take a notification management system as a sample use case. Here, suppose we have to send notifications to the buyers when the items are shipped from the store, acknowledging that their items are shipped, and keeping them informed on when to expect the delivery. Here, the shipment information will be consumed from a message queue/broker, the buyer details are enriched by querying a database and the expected delivery date is added by calling an external service, before sending the notification as an email.

Sample integration scenario.

For this example, we will be using Kafka as the message broker. Avro as the message format used by Kafka, MySQL database to store buyer information, a mock Siddhi HTTP service to retrieve the expected delivery date, JSON as the message format to communicate with the HTTP service, and finally Gmail for email notifications.


This can be easily implemented and visualized through the WSO2 Stream Processor Studio as shown below.

Design view of the WSO2 Stream Processor Editor.

The 46 lines of Siddhi Streaming SQL query for building the sample notification scenario is given below:

Siddhi Streaming SQL of the sample notification scenario.
Code view of the WSO2 Stream Processor Editor.

As shown in lines 4~9, Siddhi application consumes messages having Avro messaging format from a Kafka topic called ‘shipment-info’ and passes that to a stream called ‘ShipmentInfoStream’. As of lines 42~46, this ‘ShipmentInfoStream’ joins with an RDBMS backed ‘BuyerInfoTable’ (defined in lines 38~40) to enrich the event stream by retrieving username, address, and email fields from the database. The concatenated result is then fed into ‘ShipmentAndBuyerInfoStream’ which converts the events to JSON messages and calls an HTTP service running on URL http://localhost:8080/delivery-info/ through the http-request sink given in lines 11~15. As this http-request sink and http-response source defined in lines 18~23 are correlated by the same, ‘delivery-info’, the JSON response coming from the HTTP service call is retrieved by the http-response source, converted into an event and pushed to ‘DeliveryInfoResponseStream’ along with the other event properties present while calling the HTTP service. Since log sink and email sink are registered to the ‘DeliveryInfoResponseStream’ they immediately get the message from the stream. Here the log sink logs the message to the console while the email sink defined in line 25~33 sends the constructed email payload to the buyer’s email address.

Email notification

In a nutshell, integrating event-driven systems and processing streaming data has now become seamless with graphical editing and a powerful streaming SQL language like Siddhi. To implement the above scenario download the latest stable version of WSO2 Stream Processor and follow the step by step guide given here. You can get more details about the WSO2 Stream Processor by visiting its official website. For further assistance refer to the documentation and the Siddhi Query Guide.

Happy streaming integration!