7 Streaming Integration Patterns with WSO2

Chanaka Fernando
WSO2 Best Practices
10 min read · Dec 16, 2020


Let’s build real-time event processing use cases with example code

Introduction

Streaming integration is becoming increasingly popular within the enterprise landscape, where vast amounts of data are generated daily. Organizations with better real-time event processing and artificial intelligence capabilities are taking giant leaps towards becoming leaders of their respective industries. Even though AI (Artificial Intelligence) and ML (Machine Learning) are the hot topics of late, one of the fundamental requirements for both of those capabilities is the processing of real-time data (or events). Both AI and ML become useless without proper data. In this article, we discuss this fundamental, infrastructure-level capability that most enterprises will need sooner rather than later: streaming integration.

At a 33,000-foot level, streaming integration consists of four main stages, as depicted in the figure below.

Figure: Streaming Integration Pattern

Event source

It all starts with an event source, which generates a continuous stream of events (or data). This can be:

  • an interaction by a user on a website (a user click becomes an event)
  • a temperature sensor sending temperature readings every minute
  • stock market price updates
  • orders placed by customers on an e-commerce site

Event consumer

This is where events are consumed based on their format and underlying protocol and converted into a canonical model that can be processed later. These consumers can receive events from:

  • HTTP clients like web browsers
  • IoT devices
  • Message brokers like Kafka

Event processing

This is where the processing of events happens. Based on the requirements of the particular enterprise, various processing operations can be performed. Some examples are:

  • Transforming
  • Enriching
  • Cleansing
  • Filtering
  • Correlating
  • Summarizing

Event target

Once the data is read and processed, it needs to be pushed into a target system so that the business can make decisions based on it and move forward. The target can be:

  • A database where summary information is stored
  • A WebSocket endpoint that shows real-time data on a dashboard
  • Another service that is triggered to initiate a new operation in the workflow (e.g. initiating the shipping process)

A more detailed view of streaming integration is depicted in the figure below, which is taken from the WSO2 website.

Figure: Streaming integration with WSO2

Let’s dive into some of the basic streaming integration patterns that can be used to build the more complex patterns required in enterprise use cases. Each pattern has an associated example with instructions to try it out.

Before trying out the examples in the sections below, I recommend following the getting started guide in the WSO2 Streaming Integrator documentation linked below.

https://ei.docs.wso2.com/en/latest/streaming-integrator/quick-start-guide/quick-start-guide/

Streaming Integration Patterns

Pattern 1 — Consume real-time data

The simplest pattern of streaming integration is to consume data from an event source and print a log message with that consumed data. This is the first pattern we are going to implement here.

Figure: consume real-time data pattern

As depicted in the above figure, we use an HTTP client (cURL) to simulate the event source and read the events using an HTTP listener (source) configured in WSO2 Streaming Integrator. We then read the data as a JSON message and simply log it to the console without doing any processing. The source code to implement this use case can be found in the gist below.

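Since the gist itself is not embedded here, the following is only a minimal sketch of what such a Siddhi app could look like; the app name, receiver URL, and stream/field names are illustrative assumptions rather than the exact gist contents.

  @App:name('ConsumeDataApp')

  -- Receives JSON events over HTTP (URL and fields are assumed for illustration).
  @source(type = 'http', receiver.url = 'http://localhost:8006/InputStream',
          @map(type = 'json'))
  define stream InputStream (name string, amount double);

  -- Logs every event that reaches this stream to the console.
  @sink(type = 'log', prefix = 'Consumed event')
  define stream LogStream (name string, amount double);

  -- Copy each incoming event as-is to the log stream.
  from InputStream
  select *
  insert into LogStream;

With the default JSON mapping, a test event could then be sent with something like: curl -X POST http://localhost:8006/InputStream -H "Content-Type: application/json" -d '{"event": {"name": "widget", "amount": 10.5}}'
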
In the above sample code, we send events with a set of fields and read them as a JSON message at the source. Then the event is copied (select *) to the output stream, which is a log stream.

You can find the instructions to try out this example at the link below.

https://github.com/chanakaudaya/streaming-integration-samples/tree/main/consume-data

Pattern 2 — Transform real-time data

Once you understand the basic data consumption pattern, the next step is to apply some transformation to the data according to your needs. In this pattern, we use the same event source as before (HTTP) and apply a simple transformation using a built-in function of WSO2 SI.

Figure: real-time data transform pattern

As depicted in the above figure, we use the HTTP client to generate the event and an HTTP listener (source) in WSO2 SI to read it. We then use a built-in function to calculate the average value of a certain data field and include the calculated average in the output data format, which is logged to the console. You can find the source code for the above pattern in the gist below.

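The gist is not shown inline, but a sketch using Siddhi's built-in avg() aggregation over a sliding window might look like the following; the window size and the stream/field names are assumptions made for illustration.

  @App:name('TransformDataApp')

  @source(type = 'http', receiver.url = 'http://localhost:8006/InputStream',
          @map(type = 'json'))
  define stream InputStream (name string, amount double);

  @sink(type = 'log', prefix = 'Average')
  define stream AverageStream (name string, avgAmount double);

  -- Compute the running average of 'amount' per 'name' over the last 10 events
  -- using the built-in avg() function, and emit it in a new output format.
  from InputStream#window.length(10)
  select name, avg(amount) as avgAmount
  group by name
  insert into AverageStream;
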
You can find the instructions to try out this example at the link below.

https://github.com/chanakaudaya/streaming-integration-samples/tree/main/transform-data

Pattern 3 — Transform real-time data with custom function

In most cases, the built-in functions for transforming data can be utilized when implementing real-world use cases. In some cases, users may need to transform data based on certain custom requirements which cannot be fulfilled with the built-in functions. In such cases, users need to implement their own transformation logic. This pattern is designed for those types of use cases.

Figure: real-time data transformation with custom function pattern

As per the above figure, the event source and the consumer are the same as in the previous cases. The only difference in this pattern is that it uses a custom-written function to do the actual data transformation from the input data format to the output data format. After the transformation, the message is logged to the console. You can find the source code for this use case in the gist below.

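As a rough sketch, such an app could define the custom logic as a JavaScript script function (here named concatFn, as in the sample); the stream and field names are illustrative assumptions.

  @App:name('TransformCustomFunctionApp')

  @source(type = 'http', receiver.url = 'http://localhost:8006/InputStream',
          @map(type = 'json'))
  define stream InputStream (firstName string, lastName string, amount double);

  @sink(type = 'log', prefix = 'Transformed event')
  define stream OutputStream (fullName string, amount double);

  -- Custom transformation logic written in JavaScript; 'data' holds the
  -- arguments passed to the function, in order.
  define function concatFn[javascript] return string {
      return data[0] + ' ' + data[1];
  };

  from InputStream
  select concatFn(firstName, lastName) as fullName, amount
  insert into OutputStream;
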
As you can see in the above code, the transformation logic is implemented using a function called “concatFn”, in which you write simple JavaScript code to execute the use case. You can find the instructions on how to run and test this sample at the link below.

https://github.com/chanakaudaya/streaming-integration-samples/tree/main/transform-custom-function

Pattern 4 — Enrich real-time data with a database pattern

There are multiple mechanisms available to transform a given message. One such option is to enrich the message (data) with information coming from a different source. This source can be a constant embedded in the code or an external store such as a database. In this pattern, we are going to discuss how to enrich an incoming event (message) with data that is stored in a MySQL database.

Figure: enrich real-time data with a database pattern

As depicted in the above figure, once the event is sent from the source, it is consumed by the listener; during the processing stage, the database is queried, the relevant data is pulled from it, and the event is enriched with that information. Once that is done, the enriched data is printed to the console log. The source code for this use case can be found in the gist below.

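As a sketch, the enrichment can be expressed as a join between the event stream and an RDBMS-backed table; the connection details, table name, and fields below are placeholders, not the values used in the actual sample.

  @App:name('EnrichDataApp')

  @source(type = 'http', receiver.url = 'http://localhost:8006/OrderStream',
          @map(type = 'json'))
  define stream OrderStream (productId string, quantity int);

  -- Table backed by a MySQL database (placeholder connection details).
  @store(type = 'rdbms',
         jdbc.url = 'jdbc:mysql://localhost:3306/products_db',
         username = 'user', password = 'password',
         jdbc.driver.name = 'com.mysql.cj.jdbc.Driver')
  define table ProductTable (productId string, productName string, unitPrice double);

  @sink(type = 'log', prefix = 'Enriched event')
  define stream EnrichedOrderStream (productId string, productName string, quantity int, total double);

  -- Join each incoming order with the matching row in the database table and
  -- enrich the event with the product name and a computed total value.
  from OrderStream as o join ProductTable as t
      on o.productId == t.productId
  select o.productId as productId, t.productName as productName,
         o.quantity as quantity, o.quantity * t.unitPrice as total
  insert into EnrichedOrderStream;
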
In this use case we use the “join” operator to enrich the event stream with data from the database table based on a certain condition. You can find the instructions to run and try out this example at the link below.

https://github.com/chanakaudaya/streaming-integration-samples/tree/main/enrich-data

Pattern 5 — Real-time ETL with file tailing pattern

One of the oldest methods of moving data from one system to another is ETL (Extract, Transform, Load). There is a considerable amount of such requirements in enterprises, and dedicated tools have been built for this purpose. As business requirements have advanced, these data migrations across systems increasingly need to happen in real time. That motivation created the term real-time ETL, an evolution of the traditional ETL process where data migrations happened in batch mode at night. One of the fundamental models of ETL is to read from a file and move that data into another system. In this pattern, we discuss how to read information from a file in real time by tailing the file and moving that data to a target system. In this case, the target system is a log entry in the console. Users can extend this to any other target system such as a database, a file system, or a real-time event target such as a WebSocket endpoint.

Figure: real-time ETL with file tailing pattern

As depicted in the above figure, the source of the events in this use case is a file that stores information in CSV format. It is important to understand that the file needs to adhere to the CSV standard for this use case to work as intended. We use a file listener (source) in WSO2 SI to read the file in “tailing” mode so that it reads the file line by line as and when lines are added to it in real time. The information is then read in CSV format and printed to the console log as the target output. You can find the source code to implement this use case in the gist below.

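A minimal sketch of such a file-tailing app is shown below; the file path and field names are assumptions.

  @App:name('FileTailingApp')

  -- Tails a CSV file line by line and emits each new line as an event.
  @source(type = 'file', mode = 'LINE',
          file.uri = 'file:/tmp/productions.csv',
          tailing = 'true',
          @map(type = 'csv'))
  define stream FileStream (name string, amount double);

  @sink(type = 'log', prefix = 'File event')
  define stream LogStream (name string, amount double);

  from FileStream
  select *
  insert into LogStream;
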
You can find the instructions to try out this code at the link below.

https://github.com/chanakaudaya/streaming-integration-samples/tree/main/etl-files/tail-line

Pattern 6 — Real-time ETL with file regex pattern

There are some use cases in enterprises where external parties such as partners or dealers upload their information as files without adhering to the format that the enterprise specifies. In such cases, it is easier to have a mechanism built into the enterprise system to read such files rather than going back to the external party and asking them to rework and resend the file. In such a scenario, you can use the regex-based file reading pattern to migrate data from such files to a target system. One thing to note here is that there should be some sort of pattern, identifiable through a regular expression, that can be used to read the file.

Figure: real-time ETL with file regex pattern

As depicted in the above figure, the source in this use case is a file containing noisy data that does not adhere to a common standard like CSV or XML. Instead, it has a set of noisy data that needs to be read by the consumer. We use the file listener to read data from this file as events according to a defined regular expression that matches a certain data segment within the file as an event. You can find the source code for this use case in the gist below.

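As a sketch, such an app could combine the file source in REGEX mode with the text mapper's custom regex mapping; the begin/end markers, capture expressions, and field names below are illustrative assumptions rather than the exact ones used in the sample.

  @App:name('FileRegexApp')

  -- Reads event chunks from a noisy file delimited by begin/end markers, then
  -- extracts fields from each chunk using regular expression capture groups.
  @source(type = 'file', mode = 'REGEX',
          file.uri = 'file:/tmp/noisy-data.txt',
          begin.regex = '<event>', end.regex = '</event>',
          tailing = 'true',
          @map(type = 'text',
               fail.on.missing.attribute = 'false',
               regex.A = 'symbol=(\w+)',
               regex.B = 'price=([0-9.]+)',
               regex.C = 'volume=([0-9]+)',
               @attributes(symbol = 'A[1]', price = 'B[1]', volume = 'C[1]')))
  define stream StockStream (symbol string, price double, volume long);

  @sink(type = 'log', prefix = 'Stock event')
  define stream LogStream (symbol string, price double, volume long);

  -- Forward only events where a symbol was captured, converting it to upper case.
  from StockStream[not(symbol is null)]
  select str:upper(symbol) as symbol, price, volume
  insert into LogStream;
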
As per the code above, the file source is configured to read in tailing mode with regular expressions that look for certain begin and end sections, along with separate regular expressions to capture the event data (e.g. symbol, price, and volume). Then, if the symbol is found for a certain event, it is logged to the console with the symbol in upper case along with the price and volume. The instructions to try out this example can be found at the link below.

https://github.com/chanakaudaya/streaming-integration-samples/tree/main/etl-files/tail-regex

Pattern 7 — Real-time change data capture (CDC) with MySQL pattern

Another common ETL use case is moving data from databases to another system. One of the challenges here is that data residing in databases keeps changing over time, and when migrating data from a database to a target system, only the modified data should be checked for, which is a time-consuming and complex operation for a large data set. For this reason, modern databases provide a mechanism to read data changes as and when they occur. This is called change data capture (CDC), and this approach removes a lot of complexity from the consumer side. In this pattern, we use MySQL as the data source, read any data changes as events, and publish those changes to a target system, in this case a log. In a real-world scenario, the target system can be another database.

Figure: real-time change data capture pattern

As depicted in the above figure, the source system in this use case is a database (MySQL), and the consumer is a special type of listener which is capable of listening for data change events at the database. In this case, we use the MySQL CDC listener that is available with WSO2 SI. Data can be changed in the database through multiple operations such as insert, update, and delete. You can configure the CDC listener to listen for a particular operation type such as insert, update, or delete. Once an event is read, it is printed to the console log accordingly. You can find the source code for the “Insert” operation in the gist below.

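A sketch of the insert case could look like the following; the database URL, credentials, and table/field names are placeholders.

  @App:name('CDCInsertApp')

  -- Listens for insert operations on a MySQL table via change data capture.
  @source(type = 'cdc',
          url = 'jdbc:mysql://localhost:3306/production_db',
          username = 'user', password = 'password',
          table.name = 'products', operation = 'insert',
          @map(type = 'keyvalue', @attributes(name = 'name', amount = 'amount')))
  define stream InsertStream (name string, amount double);

  @sink(type = 'log', prefix = 'Inserted row')
  define stream LogStream (name string, amount double);

  from InsertStream
  select *
  insert into LogStream;
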
You can find further sample code for the “update” and “delete” operations, as well as instructions to try out this code, at the link below.

https://github.com/chanakaudaya/streaming-integration-samples/tree/main/change-data-capture/mysql

Summary

In this article, we discussed some fundamental patterns that can be used in enterprise systems to process real-time event streams with streaming integration technologies using WSO2 Streaming Integrator. These patterns can be extended and combined to build more complex patterns and use cases according to the requirements of the user. You can find all the sample code and instructions in the GitHub repository below. If you like the work we have done here, don’t forget to give it a star.

https://github.com/chanakaudaya/streaming-integration-samples

References:

https://ei.docs.wso2.com/en/latest/streaming-integrator/overview/overview/

https://ei.docs.wso2.com/en/latest/streaming-integrator/quick-start-guide/quick-start-guide/

https://stackoverflow.com/questions/50093144/mysql-8-0-client-does-not-support-authentication-protocol-requested-by-server

https://www.digitalocean.com/community/tutorials/how-to-create-a-new-user-and-grant-permissions-in-mysql

https://debezium.io/documentation/reference/1.3/connectors/mysql.html#create-a-mysql-user-for-cdc_debezium
