Implementing the Competing Consumers Pattern with WSO2 Micro Integrator and RabbitMQ
Learn how to use multiple Micro Integrator instances to drain a RabbitMQ queue faster
The Competing Consumers pattern describes how multiple consumers compete for messages on the same message channel so that several messages can be processed concurrently.
The pattern is not new; it has been popular in the enterprise integration domain since the early days of messaging systems. It is mostly applicable when you want to process a discrete set of tasks asynchronously by distributing them among parallel consumers. In return, you get a scalable, reliable, and resilient message processing system.
I’ve described how this pattern works and its benefits in a previous article, “Competing Consumers Pattern Explained”. I recommend reading it for more context.
In this article, let’s build a task queue implementation that consists of two WSO2 Micro Integrator instances consuming messages from a RabbitMQ queue. To get the most out of this, you need to have a fair understanding of how RabbitMQ works and the basics of WSO2 Micro Integrator.
Why the Micro Integrator?
WSO2 Micro Integrator (MI) is an open-source, cloud-native integration framework with a graphical drag-and-drop integration flow designer and a configuration-based runtime for integrating systems, data, and services.
It has been optimised for container-native deployments based on Docker and Kubernetes and supports both centralised (ESB style) and decentralised (microservices, cloud-native) architectural styles.
With the Micro Integrator, you can build a RabbitMQ consumer quickly with just a handful of configurations. That way, you avoid writing and maintaining consumer code in a specific programming language such as Java.
MI also has many built-in features that help you build your integration logic in a reliable and resilient manner, so you don’t have to code them from scratch.
Let’s try to understand the pattern with a real-world example: an e-commerce order processing system.
Orders are critical to the business. Loss of an order or erroneous processing can cause financial damage. Thus, the system in our example attempts to achieve the following goals in its architecture.
1. The orders must be asynchronously processed by loosely coupled components. A producer should not wait until a consumer completes an order.
2. An order must be processed at least once. The business can’t afford to lose any orders during the processing.
3. An order must be processed exactly once, not more than that.
4. The order processing components should dynamically scale to adapt to the fluctuations in the workload.
Being loosely coupled and asynchronous
We can start with a simple architecture like the one below, which makes order producers and consumers loosely coupled and asynchronous.
In the figure above, an instance of WSO2 Micro Integrator (MI) consumes messages from a RabbitMQ queue into which order-producing applications (web apps, microservices) put orders. The queue decouples order producers from order consumers; neither party is aware of the other’s presence. Producers also don’t wait for a response. MI receives orders from the queue and processes them asynchronously at its own pace.
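To make the decoupling concrete, here is a minimal in-process sketch. It is not MI or RabbitMQ code: Python’s standard queue.Queue stands in for the RabbitMQ queue, and the function names produce and consume are hypothetical. The producer finishes before any consumer even exists, and the consumer later drains the queue at its own pace.

```python
import queue
import threading


def produce(order_queue, order_ids):
    """Put orders on the queue and return at once; no consumer has to be running."""
    for order_id in order_ids:
        order_queue.put(order_id)
    return len(order_ids)


def consume(order_queue, processed):
    """Take orders at the consumer's own pace until a None sentinel arrives."""
    while True:
        order_id = order_queue.get()
        if order_id is None:
            break
        processed.append(order_id)  # stand-in for real order processing


if __name__ == "__main__":
    orders = queue.Queue()
    sent = produce(orders, ["order-1", "order-2", "order-3"])
    print(f"producer finished after sending {sent} orders")  # no consumer exists yet
    orders.put(None)  # sentinel: tells the consumer to stop once the queue is drained
    processed = []
    worker = threading.Thread(target=consume, args=(orders, processed))
    worker.start()
    worker.join()
    print(processed)
```

The sentinel value is only a convenience for stopping the toy consumer; a real broker-backed consumer simply keeps listening.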
But having only one consumer can have several downsides. First, it introduces a single point of failure to order processing. Also, a single consumer can be easily overwhelmed by a sudden influx of orders.
Providing improved availability and on-demand scalability
To avoid the drawbacks mentioned above, let’s introduce another consumer to the solution shown in Figure 02. We now have two MI instances consuming orders from the same RabbitMQ queue.
This architecture can be beneficial in many ways. First, it makes the order processing highly available — if a consumer crashes, the remaining one can continue the processing.
Two consumers also give better processing throughput than a single consumer. You can scale out the processing horizontally by adding more MI instances that consume from the same queue. When the workload drops, you can remove the additional instances.
Having this kind of dynamic scalability helps consumers adapt to fluctuations in the order processing workload.
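A rough back-of-the-envelope model shows why adding consumers helps. The sketch below assumes every order takes the same time and ignores broker and network overhead, so the numbers are idealised; the function name and the 1,000-order, 2-second workload are illustrative assumptions, not measurements.

```python
import math


def drain_time_seconds(orders, consumers, seconds_per_order):
    """Idealised time to empty a queue when all consumers work in parallel.

    Assumes every order takes the same time and ignores broker/network overhead.
    """
    if consumers < 1:
        raise ValueError("at least one consumer is required")
    # With even round-robin distribution, the busiest consumer handles
    # ceil(orders / consumers) orders one after another.
    return math.ceil(orders / consumers) * seconds_per_order


for n in (1, 2, 3):
    print(n, drain_time_seconds(1000, n, 2.0))
```

In practice the gain flattens out once a shared dependency (a database or downstream service) becomes the bottleneck.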
Implementing exactly-once message processing
With two consumers consuming from the same queue, how do we make sure that an order is processed only by a single consumer, to ensure the exactly-once processing semantics?
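When a queue has multiple consumers, RabbitMQ dispatches messages among them in a round-robin manner, so the workload is balanced across consumers and each message is delivered to only one consumer at a time. When processing succeeds, the consumer acknowledges the message, and RabbitMQ deletes it from the queue. This is how we ensure that an order is processed only once. Strictly speaking, if a consumer crashes after processing an order but before acknowledging it, RabbitMQ redelivers that order, so the processing logic should also be idempotent to rule out duplicates.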
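The dispatching rule can be sketched in a few lines. This is a simplified model, not RabbitMQ’s implementation: it assumes all consumers are always ready, whereas the real broker’s distribution also depends on acknowledgments and prefetch settings. The consumer names mi-1 and mi-2 are hypothetical.

```python
from itertools import cycle


def round_robin_dispatch(messages, consumers):
    """Give each message to exactly one consumer, taking consumers in turn."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    assignment = {consumer: [] for consumer in consumers}
    for message, consumer in zip(messages, cycle(consumers)):
        assignment[consumer].append(message)
    return assignment


orders = [f"order-{i}" for i in range(1, 6)]
print(round_robin_dispatch(orders, ["mi-1", "mi-2"]))
# {'mi-1': ['order-1', 'order-3', 'order-5'], 'mi-2': ['order-2', 'order-4']}
```

Every order appears in exactly one consumer’s list, which is the property the competing consumers rely on.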
Reliable message processing with manual acknowledgments
In our example, let’s assume that processing an order can take a few seconds or even minutes. A consumer can crash in the middle of processing, leaving an order partly processed. By default, MI’s RabbitMQ listener acknowledges each message automatically as soon as it receives it, so RabbitMQ deletes a message from the queue the moment it is delivered.
If a consumer dies, we will lose the message it was just processing. We’ll also lose all the messages sent to this particular consumer but not yet handled.
But we don’t want to lose any orders. If a consumer dies, we’d like the order to be delivered to another consumer.
To make sure an order is never lost, we set the acknowledgment mode to manual in MI, so that MI explicitly tells RabbitMQ when a message can be deleted. If MI crashes during processing, RabbitMQ detects that the consumer is gone and redelivers the unacknowledged message to the remaining consumer.
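The difference between the two acknowledgment modes can be illustrated with a toy broker. ToyBroker and its methods are hypothetical and only model the behaviour described above; they are not a RabbitMQ client API. In the scenario, mi-1 takes the first order and crashes before finishing it, and mi-2 drains what is left.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a single RabbitMQ queue (hypothetical, not a client API)."""

    def __init__(self, messages):
        self.ready = deque(messages)  # messages waiting to be delivered
        self.unacked = {}             # delivery tag -> (consumer, message)
        self._next_tag = 1

    def deliver(self, consumer, auto_ack):
        """Hand the next message to `consumer`; return (tag, message) or None."""
        if not self.ready:
            return None
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        if not auto_ack:
            # Manual mode: the broker keeps the message until it is acknowledged.
            self.unacked[tag] = (consumer, message)
        # Auto mode: the message is forgotten the moment it is delivered.
        return tag, message

    def ack(self, tag):
        """Consumer confirms successful processing; the message is deleted for good."""
        del self.unacked[tag]

    def consumer_died(self, consumer):
        """Requeue, in delivery order, every unacked message the dead consumer held."""
        requeued = []
        for tag in sorted(self.unacked):
            owner, message = self.unacked[tag]
            if owner == consumer:
                requeued.append(message)
                del self.unacked[tag]
        self.ready.extendleft(reversed(requeued))


def run_scenario(auto_ack):
    """mi-1 crashes mid-order; mi-2 drains whatever is left. Returns what mi-2 processed."""
    broker = ToyBroker(["order-1", "order-2"])
    broker.deliver("mi-1", auto_ack)  # mi-1 takes order-1 ...
    broker.consumer_died("mi-1")      # ... and crashes before finishing it
    processed = []
    while (delivery := broker.deliver("mi-2", auto_ack)) is not None:
        tag, message = delivery
        processed.append(message)     # stand-in for real order processing
        if not auto_ack:
            broker.ack(tag)           # acknowledge only after processing succeeds
    return processed


print(run_scenario(auto_ack=True))   # ['order-2']: order-1 is lost
print(run_scenario(auto_ack=False))  # ['order-1', 'order-2']: order-1 is redelivered
```

With automatic acknowledgments the crashed consumer takes order-1 with it; with manual acknowledgments the order goes back to the queue and the surviving consumer picks it up.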
Now we have a solution that meets all architectural goals stated previously. Let’s have a closer look at various configurations in the implementation section.
The implementation is a Docker Compose project that contains two container definitions: a RabbitMQ broker and an MI-based order consumer.
The following figure shows the high-level deployment architecture.
Clone or download the project
A complete, working implementation is available in the accompanying Git repository. Clone or download it to your local workstation to experiment with it.
Understanding the configurations
Now, let’s walk through relevant configuration files one by one.
RabbitMQ transport listener
MI needs the RabbitMQ transport listener to connect to RabbitMQ and consume messages. In the project’s deployment.toml file, you’ll notice that the following configuration block is uncommented to enable the listener.
```toml
[[transport.rabbitmq.listener]]
name = "AMQPConnectionFactory"
parameter.hostname = "rabbitmq"
parameter.port = 5672
parameter.username = "guest"
parameter.password = "guest"
```
Notice that the hostname is rabbitmq, which is exactly the name given to the RabbitMQ service in docker-compose.yml. We’ll come back to that in just a moment.
The order consumer itself is an MI proxy service. It consumes messages from a RabbitMQ queue and logs the ID of each received message.
The parameter rabbitmq.queue.name specifies the name of the RabbitMQ queue to consume from. If the queue doesn’t exist, the proxy creates a new durable queue.
The parameter rabbitmq.message.content.type is set to application/xml because we are consuming XML messages from the queue. The parameter rabbitmq.queue.auto.ack is set to false to disable automatic acknowledgments. Along with manual acknowledgments, the rabbitmq.channel.consumer.qos parameter is set to 1. This is how a consumer tells RabbitMQ not to send it a new message until it has acknowledged the one it is currently processing. The RabbitMQ documentation on consumer prefetch explains this setting in more detail.
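The effect of setting the prefetch (QoS) value to 1 can be sketched with a simplified in-memory model. It assumes the broker pushes messages round-robin but skips any consumer that already holds `prefetch` unacknowledged messages; the consumer names slow and fast are hypothetical, and using a large prefetch value to stand in for "no limit" is a simplification.

```python
from collections import deque


def dispatch(ready, in_flight, consumers, prefetch):
    """Push messages round-robin, skipping consumers already holding `prefetch` unacked ones."""
    delivered = []
    progress = True
    while ready and progress:
        progress = False
        for consumer in consumers:
            if ready and len(in_flight[consumer]) < prefetch:
                message = ready.popleft()
                in_flight[consumer].append(message)
                delivered.append((consumer, message))
                progress = True
    return delivered


def simulate(prefetch):
    """'slow' never finishes its first order; 'fast' acks each order as soon as it gets it."""
    ready = deque(["o1", "o2", "o3", "o4"])
    in_flight = {"slow": [], "fast": []}
    log = dispatch(ready, in_flight, ["slow", "fast"], prefetch)
    while in_flight["fast"]:
        in_flight["fast"].pop(0)  # fast consumer acknowledges its oldest message
        log += dispatch(ready, in_flight, ["slow", "fast"], prefetch)
    return log


print(simulate(prefetch=1))   # slow holds only o1; fast processes o2, o3, o4
print(simulate(prefetch=10))  # slow hoards o1 and o3 while fast sits idle
```

With a prefetch of 1, a busy consumer cannot hoard orders that an idle consumer could be processing, which keeps the competing consumers evenly loaded.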
The docker-compose.yml file resides in the root directory of the repository and defines two services.
The service rabbitmq wraps a standard RabbitMQ container image with the management plugin so that you can visually inspect the created queue and publish messages when testing.
The service consumer refers to the local Docker image that contains the consumer. Make sure to replace the image name with the name of your own image.
When running the solution, you can scale the number of consumer replicas up or down with the --scale flag, for example, docker-compose up --scale consumer=3.
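Docker Compose automatically creates a new network when you deploy these services. That network enables communication between the consumer and rabbitmq containers, which is why the consumer can reach the broker using the service name rabbitmq as its hostname.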
Building and running the project
The Git repository mentioned above contains a detailed step-by-step guide on how to build and run the sample project with Docker Compose.
When to use this pattern
To keep things simple, our consumer only logged the messages it received. In reality, you can use this pattern to implement much more complex use cases.
These are a few examples of asynchronous tasks where you can apply the pattern.
- The consumer sends out the received message to a downstream HTTP service.
- The consumer persists the received message into a database by calling a data service.
- The consumer triggers a workflow with the received message.
- The consumer processes the message and puts it in another queue.
There are many more possibilities. However, not all asynchronous tasks are created equal. Before applying this pattern, check that your tasks have the following two characteristics.
1. The application workload is divided into tasks that can run asynchronously
This pattern works well if the task producer and task consumer communicate asynchronously, so the task-producing logic doesn’t have to wait for a task to complete before continuing. If the task producer expects a synchronous response from the task consumer, this pattern is not a good option.
2. Tasks are independent and can run in parallel
The tasks should be discrete and self-contained. There shouldn’t be a high degree of dependence between tasks.
What could be done better
You could take this reference implementation to production straight away, but I suggest paying attention to the points highlighted below first. They are critical for building a reliable and highly scalable task queue implementation with the WSO2 platform.
1. Consider requeueing the message if an error occurs during processing
There can be situations where you can’t process a message immediately for some reason, for example, when the downstream service you are trying to invoke is not responsive. In that case, you must not lose the message.
The recommendation is to put the message back in the queue so that it can be processed later, possibly by another consumer.
2. Detect poison messages as early as possible and put them into a DLQ
Erroneous or malformed messages should be discarded immediately or put into a Dead Letter Queue (DLQ). If they are returned to the queue, they cycle endlessly among consumers and waste valuable CPU time.
3. Consider using a container orchestration engine like Docker Swarm or Kubernetes
For fine-grained scalability of consumer processes, consider deploying the solution in a container orchestration engine. That way, you can control the scale-out and scale-in policies dynamically.
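The first two recommendations above can be combined into one decision rule: requeue on transient failures, dead-letter on malformed input, and give up after a bounded number of attempts. The sketch below models that rule in memory; it is not MI configuration. In RabbitMQ terms, requeueing corresponds to a negative acknowledgment (basic.nack or basic.reject) with requeue enabled, and dead-lettering is typically set up through a dead letter exchange on the queue. TransientError, MAX_ATTEMPTS, process_queue, and the downstream callback are hypothetical names, and tracking the attempt count alongside the message is an assumption of this sketch (a real consumer would need to carry it, for example in a message header).

```python
from collections import deque
import xml.etree.ElementTree as ET


class TransientError(Exception):
    """Hypothetical error raised when a downstream service is temporarily unavailable."""


MAX_ATTEMPTS = 3  # assumed retry budget before a message is dead-lettered


def process_queue(messages, call_downstream):
    """Return (processed order IDs, dead-lettered message bodies)."""
    ready = deque((body, 1) for body in messages)  # (body, attempt number)
    done, dead_letters = [], []
    while ready:
        body, attempt = ready.popleft()
        try:
            order = ET.fromstring(body)  # malformed XML raises ParseError
        except ET.ParseError:
            dead_letters.append(body)    # poison message: never requeue it
            continue
        try:
            call_downstream(order)
            done.append(order.findtext("id"))
        except TransientError:
            if attempt < MAX_ATTEMPTS:
                ready.append((body, attempt + 1))  # requeue for a later retry
            else:
                dead_letters.append(body)          # retry budget exhausted


    return done, dead_letters


calls = {}


def flaky_downstream(order):
    """Hypothetical downstream service that fails on the first call for order 3."""
    order_id = order.findtext("id")
    calls[order_id] = calls.get(order_id, 0) + 1
    if order_id == "3" and calls[order_id] == 1:
        raise TransientError("downstream service not responding")


done, dlq = process_queue(
    ["<order><id>1</id></order>", "<order><id>2", "<order><id>3</id></order>"],
    flaky_downstream,
)
print(done)  # ['1', '3']
print(dlq)   # ['<order><id>2']
```

Checking for malformed input before calling the downstream service means poison messages never consume retries, while the attempt cap stops a persistently failing order from circulating forever.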