AMQP: Headers filtering & Competing Consumers

Olivier Mallassi
2 min read · Oct 12, 2015

I have to admit that I like AMQP because of the deployment patterns it supports. Let's imagine you want to filter messages based on their headers. You can use a JMS selector, or you can use AMQP headers (AMQP 0-10) to do this on the broker side.

On the producer side:

  • You declare your JNDI context as below
// amq.match is the pre-declared headers exchange
String amqpAddr = "amq.match";

// brokerList (the broker address list) is assumed to be defined elsewhere
Hashtable<Object, Object> env = new Hashtable<Object, Object>();
env.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
env.put("connectionfactory.qpidConnectionfactory", "amqp://guest:guest@/default?brokerlist='" + brokerList + "'");
env.put("destination.myDest", amqpAddr);
  • and before sending the message, add the JMS properties (which Qpid maps to AMQP headers)
ObjectMessage message = session.createObjectMessage(nextMD);
message.setStringProperty("loc", "CA");

producer.send(message);

On the consumer side:

  • for each JVM that needs to consume the messages, the binding is the following:
amqpAddr = "ConsumerModule-B; {create: always, node: {x-declare:{auto-delete:true},x-bindings:[{exchange:'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}]}}";
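
To make the effect of x-match concrete, here is a minimal plain-Java sketch of how a headers exchange decides whether a message matches a binding. It is only an illustration of the matching rule, not Qpid code: the class and method names are hypothetical, and it compares values as simple strings, which is a simplification of what a real broker does.

```java
import java.util.Map;

// Simplified model of headers-exchange matching (illustration only, not Qpid code).
final class HeadersMatchSketch {

    // Returns true when the message headers satisfy the binding arguments.
    // xMatch is "any" (at least one pair matches) or "all" (every pair matches).
    static boolean matches(String xMatch, Map<String, String> binding, Map<String, String> headers) {
        int hits = 0;
        for (Map.Entry<String, String> e : binding.entrySet()) {
            // A pair matches when the message carries the same key with the same value.
            if (e.getValue().equals(headers.get(e.getKey()))) {
                hits++;
            }
        }
        return "all".equals(xMatch) ? hits == binding.size() : hits > 0;
    }

    public static void main(String[] args) {
        // Binding arguments taken from the address string above.
        Map<String, String> binding = Map.of("dep", "sales", "loc", "CA");
        // The producer above only sets the "loc" property.
        Map<String, String> message = Map.of("loc", "CA");
        System.out.println("any: " + matches("any", binding, message));
        System.out.println("all: " + matches("all", binding, message));
    }
}
```

With the binding above, a message that carries only loc=CA is accepted because x-match is any; with x-match set to all, the same message would be rejected because it has no dep header.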

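Now let’s imagine you have different modules (each made of several JVMs) that all need to consume every message, but within each module the messages must be load-balanced. You can achieve this simply by giving each module its own queue name, that is, by changing ConsumerModule-B to something else for the other modules. JVMs that share a queue name compete for the messages of that queue, while each distinct queue bound to the exchange receives its own copy.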

You will then see something like below:

In that case, the first three consumers belong to the same module and the other two to another module. Each module gets all the events that pass its AMQP headers filter but, within a given module, each JVM receives only a subset of the events (because of the load-balancing).
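
The delivery pattern described here can be mimicked with a small plain-Java model. This is a sketch under stated assumptions, not broker code: the queue name ConsumerModule-A and the consumer names are hypothetical, and strict round-robin stands in for whatever distribution the broker actually applies among competing consumers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: every queue bound to the exchange gets a copy of each message;
// consumers sharing a queue take turns (round-robin is an assumption).
final class CompetingConsumersSketch {

    // queues: queue name -> names of the consumers (JVMs) attached to it.
    // Consumer names are assumed to be unique across queues.
    // Returns: consumer name -> messages that consumer received.
    static Map<String, List<String>> deliver(Map<String, List<String>> queues, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (List<String> consumers : queues.values()) {
            for (String consumer : consumers) {
                received.put(consumer, new ArrayList<>());
            }
        }
        for (List<String> consumers : queues.values()) {
            if (consumers.isEmpty()) {
                continue; // nobody is listening on this queue
            }
            for (int i = 0; i < messages.size(); i++) {
                // Each message goes to exactly one consumer of this queue.
                String consumer = consumers.get(i % consumers.size());
                received.get(consumer).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, List<String>> queues = new LinkedHashMap<>();
        queues.put("ConsumerModule-A", List.of("a1", "a2", "a3")); // hypothetical module with three JVMs
        queues.put("ConsumerModule-B", List.of("b1", "b2"));       // module with two JVMs
        List<String> messages = List.of("m1", "m2", "m3", "m4", "m5", "m6");
        deliver(queues, messages).forEach((c, m) -> System.out.println(c + " <- " + m));
    }
}
```

In this model each module as a whole sees all six messages, while each JVM inside a module sees only its share, which is the behaviour the two queue names are meant to produce.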
