How to process large data set stored in a database in a reliable manner through WSO2 EI
WSO2 Enterprise Integrator (a.k.a WSO2 EI) is a magic bullet in the integration space which provides capabilities like
- Enterprise Service Bus
- Data Integration
- Message Broker
- Business Process Server
In this post, I’m going to explain about using the “Integration” capabilities of the product to process a large data set stored in a database in a reliable and guaranteed manner. WSO2 EI comes with a separate runtime component (a.k.a profile) which is capable of fulfilling both ESB as well as Data integration requirements.
Let’s take a hypothetical example where you have a set of orders which needs to be sent to 2 of your warehouses in the same order they arrived. Let’s say this information is stored in a table which contains 100k+ rows. Here are the challenges you need face when processing this data set
- Data needs to be processed batch wise — otherwise, the server will go OOM
- Orders need to be sent to 2 endpoints (systems)
- The sequence of the orders needs to be guaranteed at the receiver side (needs to be the same as stored in the database table)
Let’s see how we can implement this scenario with WSO2 EI. The below diagram explains the process flow of the implementation.
Here we are using the following EI artefacts to implement this scenario.
- Proxy Service (CMProxy) — this is the entry point of the message processing flow. When a request comes into this service, the process will start. You can define this logic in a sequence and use a scheduled task to trigger this process periodically if needed
- Data service (SampleDS) — this data service provides 2 operations. One to get the count of all the records and another one to get a selected set of rows from the database table
- Class mediator (TestMediator) — this mediator implements the logic to iterate through the entire data set by calling the data service in a loop
- DB calling sequence (dblookupsequence) — this sequence calls the data service to select a set of data from the table and iterate through each data element and send that to
- Message Store (Store1, Store2) — these message stores are used to store the messages which are generated from the iterator.
- FailedMessageStore (FailedMessageStore1,2) — these message stores are used to store the messages which got 500 SC responses. There is a limitation in the message processor that it will consider 500 SC responses are successful responses and remove the message from the message store.
- MessageProcessor (Processor1, 2) — message processors are taking the message from the store and sending it to the backend for processing
- ReplyHandler sequence— when the message processor get a response, it will go through the reply handler. Since message processor considers 500 responses as normal responses, it is checked and put into a failedMessageStore within this sequence
- storeFaultHandler sequence — this sequence is executed when there is a failure occurred when processing the message from message processor
- endpoint (endpoint1, endpoint2) — these are the actual backend services which these messages need to be delivered in an orderly manner
The full source code for this implementation can be found in the below GitHub repository.
This simple class mediator can be used in a database paging requirement related use case …github.com
When a request is received to the proxy service, it starts executing in the below mentioned order.
- This implementation starts with a proxy service (this can be an API or Sequence) which calls a data service to get the count of all the rows available at the database table.
- Then this count is set to a synapse property.
- After that, there is a class mediator (written in Java) which reads the “count” value and take a parameter called “limit” which will decide the number of records read at once from the database when processing. This parameter allows users to get rid of any OOM errors and provide more control to the processing step. Within this class mediator, it will loop through the chunks of data of “limit” size and call a sequence which is stored in registry under “conf” section.
- Within the sequence, it will take the “startValue” and “limitValue” parameters which were set in the class mediator and call the dataservice with these parameters to read exactly that segment of the table (from “startValue” a chunk of “limitValue”) and then iterate through this data set, clone and store them one by one in 2 different message stores. These message stores are backed by message queues created in WSO2 EI Broker profile which is running with offset 3. You should run this within your local machine to test this implementation. This process will be repeated from the class mediator until the table data is fully read.
- The message store will store these messages in the same order which were present in the database table. Then a message processor is configured to consume these messages in the same order (FIFO) and send to an endpoint.
- This endpoint will process the message and give a response. If there is a failure in the endpoint, the message processor will retry 4 times and then deactivate without processing further messages. This will prevent messages from losing.
- Once the process is completed, the messages are delivered to the endpoint in the same order which was stored in the database table and you can verify that by looking at the log file.
- Make sure to save the “dblookupsequence” in the registry under “conf” path since that is the path referred by the class mediator.
A sample database table used for this implementation is similar to below table.
That’s all you need to do. This scenario covered a lot of different aspects of integration. Here are some of the key points
- Iterating through messages
- Cloning messages
- Implement pagination when reading from a table
- Guaranteed delivery
- In-order delivery
- Processing a large set of data without going OOM
You can use sections of this implementation to fulfil the aforementioned use cases separately.