EDA implementation — Integration scenarios

Use cases and methods for integrating business systems with events backbone

Chathura Ekanayake
Nerd For Tech
May 6, 2021


In a previous post, we discussed a generic architecture for event-driven architecture (EDA) based systems. In this article, we explore implementation approaches for such event-driven systems, focusing on specific products and their interactions.

Figure 1 depicts a generic architecture of an EDA based system (details about this were discussed in the previous post). Here, we will narrow down to a few interesting sections of this architecture (highlighted in the diagram below) and explore possible implementation methods.

Figure 1: Generic architecture for an EDA based system, highlighting the sections this article focuses on

The focus of this article is mainly on the events backbone, specifically its interactions with the rest of the system. There are multiple choices for the events backbone, both commercial and open source, including RabbitMQ, Kafka, ActiveMQ, NATS, and IBM MQ.

The events backbone facilitates loose coupling among services and provides reliable messaging. In addition, it supports the publish/subscribe pattern, which allows multiple services to receive events published by other services.

From the implementation perspective, we need to consider integration approaches for connecting the events backbone with the relevant services. Many event brokers support multiple messaging protocols such as AMQP, MQTT, and STOMP. If a business service can communicate via those protocols, it can connect directly to the event broker to publish and consume messages.

However, if a business service communicates only via protocols such as HTTP, or if it is necessary to integrate file servers or databases into the event-driven system, an integration layer is needed. Furthermore, there are many scenarios where events have to be processed before publishing to the events backbone or after consuming from it. In such situations, the integration layer becomes an important component for performing these intermediate message processing operations. In this article, we will use Kafka as the events backbone and WSO2 Micro Integrator as the integration layer to illustrate some of these implementation approaches.

Integrating HTTP based business services with events backbone

Let’s consider the scenario depicted in figure 2:

Figure 2: Integrating events backbone with business systems over HTTP
  1. A customer places an order in the shopping portal
  2. The shopping portal sends the sales order to the integration layer as a JSON request over HTTP
  3. The integration layer processes the request according to the given policies:
    3a: If the quantity is less than 100, it sends the order to the “sales-orders” topic in Kafka
    3b: If the quantity is greater than 100, it calls the inventory service over HTTP to get the ordered item’s unit price and available stock
    3c: It calculates the total order value, prepares a message combining all relevant information for order approval, and sends it to the “order-approvals” topic in Kafka
    3d: It constructs an appropriate JSON response message and replies to the shopping portal over HTTP

In this scenario, multiple HTTP based services (i.e. the shopping portal and the inventory service) and the events backbone (i.e. Kafka) participate in the business process. Furthermore, the integration layer performs message processing activities such as conditional routing (3a), service orchestration (3b), and message enrichment (3c), in addition to bridging the HTTP services with the Kafka event broker.

Figure 3 shows this integration flow modeled in WSO2 Integration Studio (which is a graphical editor for creating integration flows for WSO2 Micro Integrator).

Figure 3: Visual integration flow for integrating HTTP services with Kafka

XML based integration code for the same flow is shown in Figure 4 (below). It first extracts the necessary data from the incoming message using the property group mediator. Then the filter mediator is used to perform conditional routing based on the order quantity. If the order quantity is greater than 100, the flow invokes the inventory service using the call mediator. Then the payload factory mediator is used to construct a new JSON payload by combining the necessary information. This enriched message is published to the “order-approvals” Kafka topic using the Kafka connector.

If the order quantity is less than 100, the message is directly published to the “sales-orders” topic using the Kafka connector. In both cases, the flow uses the payload factory mediator to construct an appropriate response message and send it to the shopping portal.

Figure 4: Integration code (DSL) for integrating HTTP services with Kafka
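
Since Figure 4 appears as an image, a rough sketch of what such a Synapse configuration could look like is given below. This is not the exact code from the figure: the API context, endpoint URLs, field names, and Kafka connection values are illustrative assumptions, and the Kafka connector operations (kafkaTransport.init / kafkaTransport.publishMessages) should be verified against the connector version in use.

    <api name="SalesOrderAPI" context="/orders" xmlns="http://ws.apache.org/ns/synapse">
      <resource methods="POST">
        <inSequence>
          <!-- extract the fields needed for routing from the incoming JSON order -->
          <propertyGroup>
            <property name="quantity" expression="json-eval($.quantity)"/>
            <property name="itemId" expression="json-eval($.itemId)"/>
            <property name="uri.var.itemId" expression="json-eval($.itemId)"/>
          </propertyGroup>
          <!-- Kafka connection details for the connector (values are placeholders) -->
          <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
          </kafkaTransport.init>
          <filter xpath="$ctx:quantity &gt; 100">
            <then>
              <!-- orchestration: fetch unit price and available stock from the inventory service -->
              <call>
                <endpoint>
                  <http method="get" uri-template="http://inventory.local:8080/items/{uri.var.itemId}"/>
                </endpoint>
              </call>
              <!-- enrichment: combine order and inventory details into an approval request -->
              <payloadFactory media-type="json">
                <format>{"itemId":"$1","quantity":$2,"unitPrice":$3}</format>
                <args>
                  <arg evaluator="xml" expression="$ctx:itemId"/>
                  <arg evaluator="xml" expression="$ctx:quantity"/>
                  <arg evaluator="json" expression="$.unitPrice"/>
                </args>
              </payloadFactory>
              <kafkaTransport.publishMessages>
                <topic>order-approvals</topic>
              </kafkaTransport.publishMessages>
            </then>
            <else>
              <!-- small orders go straight to the sales-orders topic -->
              <kafkaTransport.publishMessages>
                <topic>sales-orders</topic>
              </kafkaTransport.publishMessages>
            </else>
          </filter>
          <!-- build the HTTP response for the shopping portal -->
          <payloadFactory media-type="json">
            <format>{"status":"order accepted"}</format>
            <args/>
          </payloadFactory>
          <respond/>
        </inSequence>
      </resource>
    </api>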

Persisting processed events in a database

Now let’s consider a scenario where we need to record completed orders in a database. We have created a topic named “completed-orders” in Kafka to receive details of completed orders in JSON format. Order handling systems from multiple departments may publish completed order information to this topic. We can configure our integration layer to listen to this topic and persist received events in database tables as needed. In this scenario, let’s assume that we need to extract order details and shipment related details separately from the JSON message and store them in two tables named “orders” and “shipments”. This use case is depicted in Figure 5.

Figure 5: Extracting details from a Kafka event and persisting those in multiple tables

As we need to access a database, we are using WSO2 MI Data Services in this integration scenario. Data services make it possible to expose SQL backed database operations as REST services. Therefore, when such a REST data service method is invoked, the underlying SQL operations are performed using the REST payload fields as SQL parameters. Figure 6 shows the data service configuration for two REST methods, “/hmartdata/orders” and “/hmartdata/shipments”, as well as their associated SQL operations.

Figure 6: WSO2 MI Data Services configuration for inserting order and shipment data
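
As Figure 6 is an image, the following is only a hedged sketch of a data service (.dbs) configuration along these lines. The database connection details, table columns, SQL statements, and parameter names are assumptions made for illustration.

    <data name="hmartdata" transports="http https">
      <!-- database connection; driver, URL and credentials are placeholders -->
      <config id="hmartDB">
        <property name="driverClassName">com.mysql.jdbc.Driver</property>
        <property name="url">jdbc:mysql://localhost:3306/hmart</property>
        <property name="username">hmart_user</property>
        <property name="password">hmart_pass</property>
      </config>
      <!-- SQL operations backing the two REST resources -->
      <query id="insertOrderQuery" useConfig="hmartDB">
        <sql>INSERT INTO orders (order_id, item_id, quantity) VALUES (?, ?, ?)</sql>
        <param name="orderId" sqlType="STRING"/>
        <param name="itemId" sqlType="STRING"/>
        <param name="quantity" sqlType="INTEGER"/>
      </query>
      <query id="insertShipmentQuery" useConfig="hmartDB">
        <sql>INSERT INTO shipments (order_id, address, shipping_method) VALUES (?, ?, ?)</sql>
        <param name="orderId" sqlType="STRING"/>
        <param name="address" sqlType="STRING"/>
        <param name="shippingMethod" sqlType="STRING"/>
      </query>
      <!-- REST resources exposed as /hmartdata/orders and /hmartdata/shipments -->
      <resource path="orders" method="POST">
        <call-query href="insertOrderQuery">
          <with-param name="orderId" query-param="orderId"/>
          <with-param name="itemId" query-param="itemId"/>
          <with-param name="quantity" query-param="quantity"/>
        </call-query>
      </resource>
      <resource path="shipments" method="POST">
        <call-query href="insertShipmentQuery">
          <with-param name="orderId" query-param="orderId"/>
          <with-param name="address" query-param="address"/>
          <with-param name="shippingMethod" query-param="shippingMethod"/>
        </call-query>
      </resource>
    </data>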

Now, we need a way to subscribe to the “completed-orders” topic in Kafka. For that we use a construct called an inbound endpoint, which can listen for messages/events coming from various sources including Kafka, RabbitMQ, MQTT, and WebSockets. Below is the configuration of the Kafka inbound endpoint, which specifies the Kafka topic to listen to and the integration flow to be triggered for each incoming event.

Figure 7: Kafka inbound endpoint configuration
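
Since Figure 7 is an image, a rough sketch of a Kafka inbound endpoint configuration is shown below. The consumer class and parameter names differ between versions of the Kafka inbound endpoint, so treat them as placeholders to be checked against the documentation rather than the exact configuration used in the article.

    <inboundEndpoint name="CompletedOrdersInbound"
                     class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer"
                     sequence="PersistOrderSequence" onError="PersistOrderErrorSequence"
                     suspend="false" xmlns="http://ws.apache.org/ns/synapse">
      <parameters>
        <!-- polling behavior and Kafka consumer settings (illustrative values) -->
        <parameter name="interval">10</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="topic.name">completed-orders</parameter>
        <parameter name="group.id">order-persistence</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
      </parameters>
    </inboundEndpoint>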

Now we can consider the actual integration flow, which will be triggered for each incoming message (Figure 8). First, it extracts the order related information from the incoming Kafka message and calls the “/hmartdata/orders” data service method to persist it in the “orders” table. Then it extracts the shipment related information and calls the “/hmartdata/shipments” method to store the shipment data in the “shipments” table.

Figure 8: Integration flow to extract information from completed orders messages and persist those in database tables via data services
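
A hedged sketch of such a sequence is shown below. The event field names, the data service URLs, and the JSON wrapper expected by the data service resources are assumptions for illustration and need to be adjusted to the actual message format.

    <sequence name="PersistOrderSequence" xmlns="http://ws.apache.org/ns/synapse">
      <!-- keep the fields of the incoming completed-order event in properties,
           since the payload is replaced by the payload factory and call mediators below -->
      <propertyGroup>
        <property name="orderId" expression="json-eval($.orderId)"/>
        <property name="itemId" expression="json-eval($.itemId)"/>
        <property name="quantity" expression="json-eval($.quantity)"/>
        <property name="address" expression="json-eval($.shipment.address)"/>
        <property name="shippingMethod" expression="json-eval($.shipment.method)"/>
      </propertyGroup>
      <!-- persist order data; the "_postorders" wrapper follows the JSON convention for
           data service POST resources, but verify it against the data services documentation -->
      <payloadFactory media-type="json">
        <format>{"_postorders":{"orderId":"$1","itemId":"$2","quantity":$3}}</format>
        <args>
          <arg evaluator="xml" expression="$ctx:orderId"/>
          <arg evaluator="xml" expression="$ctx:itemId"/>
          <arg evaluator="xml" expression="$ctx:quantity"/>
        </args>
      </payloadFactory>
      <call>
        <endpoint>
          <http method="post" uri-template="http://localhost:8290/services/hmartdata/orders"/>
        </endpoint>
      </call>
      <!-- persist shipment data in the same way -->
      <payloadFactory media-type="json">
        <format>{"_postshipments":{"orderId":"$1","address":"$2","shippingMethod":"$3"}}</format>
        <args>
          <arg evaluator="xml" expression="$ctx:orderId"/>
          <arg evaluator="xml" expression="$ctx:address"/>
          <arg evaluator="xml" expression="$ctx:shippingMethod"/>
        </args>
      </payloadFactory>
      <call>
        <endpoint>
          <http method="post" uri-template="http://localhost:8290/services/hmartdata/shipments"/>
        </endpoint>
      </call>
    </sequence>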

The graphical view of the above DSL based integration flow is shown in Figure 9.

Figure 9: Graphical integration flow designed in WSO2 Integration Studio for persisting orders

Integrating file systems with the events backbone

As the third use case, let’s assume that a partner organization wants to place bulk orders with a warehouse. For this, they put all their individual orders in a single XML file and send it to a file server hosted by the warehouse. The warehouse systems then have to pick up such bulk order files and process the individual orders. Also, let’s assume that the warehouse systems can work only with JSON messages.

We can use the events backbone (i.e. Kafka) to hold such individual orders until they are processed. Then each warehouse system that needs to process order requests can subscribe to the Kafka topic and consume orders. In this scenario, we can use the integration layer to bridge the gap between the file server and Kafka by performing the following tasks:

  1. Fetch bulk order files from the file server
  2. Validate bulk order contents by checking if all necessary sections are available (using an XML schema)
  3. Split the bulk order into individual orders and convert them to JSON
  4. Publish each individual order to a Kafka topic named “sales-orders”

This scenario is depicted in Figure 10.

Figure 10: Integrating file servers with Kafka

Similar to the previous scenario, we are using an inbound endpoint to fetch files into the integration layer. Below is the configuration for the file inbound endpoint containing the folder to fetch files from and the integration flow to be triggered for each incoming file (note that in this example, we are fetching files from the local file system instead of a file server):

Figure 11: WSO2 MI Inbound endpoint for fetching files and triggering the relevant integration flow
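
Since Figure 11 is an image, the sketch below shows the general shape of a file inbound endpoint based on VFS parameters. The folder paths, polling interval, and sequence names are assumptions.

    <inboundEndpoint name="BulkOrderFileInbound" protocol="file"
                     sequence="ProcessBulkOrderSequence" onError="BulkOrderErrorSequence"
                     suspend="false" xmlns="http://ws.apache.org/ns/synapse">
      <parameters>
        <!-- poll the local folder for XML bulk order files every 5 seconds -->
        <parameter name="interval">5000</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="transport.vfs.FileURI">file:///home/user/bulk-orders</parameter>
        <parameter name="transport.vfs.FileNamePattern">.*\.xml</parameter>
        <parameter name="transport.vfs.ContentType">application/xml</parameter>
        <!-- remove files after successful processing, move them aside on failure -->
        <parameter name="transport.vfs.ActionAfterProcess">DELETE</parameter>
        <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
        <parameter name="transport.vfs.MoveAfterFailure">file:///home/user/bulk-orders/failed</parameter>
      </parameters>
    </inboundEndpoint>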

Now we can consider the integration flow for processing each file fetched from the file system. It validates the incoming file against a given XML schema (using the validate mediator), splits the file content into separate orders (using the iterate mediator), and publishes each order to a Kafka topic. The graphical view and the integration code for this integration flow are shown in Figure 12 and Figure 13 respectively.

Figure 12: Graphical view of the file processing integration flow
Figure 13: Integration code for the file processing flow
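
As Figures 12 and 13 are images, a hedged sketch of such a file processing sequence is given below. The schema registry path, the XPath expression used to split orders, and the Kafka connection values are illustrative assumptions.

    <sequence name="ProcessBulkOrderSequence" xmlns="http://ws.apache.org/ns/synapse">
      <!-- validate the bulk order document against a schema stored in the registry
           (the registry path is an assumption) -->
      <validate>
        <schema key="conf:/schemas/bulk-order.xsd"/>
        <on-fail>
          <log level="custom">
            <property name="error" value="Bulk order file failed schema validation"/>
          </log>
          <drop/>
        </on-fail>
      </validate>
      <!-- split the bulk order into individual orders; the XPath depends on the bulk order format -->
      <iterate expression="//orders/order">
        <target>
          <sequence>
            <!-- switch the message type so each order is serialized as JSON when published -->
            <property name="messageType" value="application/json" scope="axis2"/>
            <kafkaTransport.init>
              <bootstrapServers>localhost:9092</bootstrapServers>
              <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
              <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
              <topic>sales-orders</topic>
            </kafkaTransport.publishMessages>
          </sequence>
        </target>
      </iterate>
    </sequence>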

As we have discussed in this article, enterprise systems can be integrated with events backbones in various ways to fulfill EDA related business use cases. In upcoming articles, we will see how stream processing techniques can be applied in EDA, as well as how events can be exposed to external consumers as managed APIs.
