Distributed Sinks & Force publishing in WSO2 Stream Processor — 2 node minimum HA deployment

Senthuran Ambalavanar
Feb 24, 2018


Each Siddhi application deployed in WSO2 Stream Processor listens to external event sources, receives events from them, and processes them.

Why 2 node minimum HA?

WSO2 SP runs on a node, that is, a machine or a VM. When only a single node is used and that node goes down for any reason, we can encounter event loss in certain situations.

The 2 node minimum HA deployment pattern is a solution for handling event loss in such scenarios.

What is 2 node minimum HA deployment?

HA stands for High Availability. High availability for processing events is enabled by employing two WSO2 SP nodes that listen to the same event source.

Just like having a spare wheel for a vehicle, an additional node is employed as a backup. It is an exact copy of the first node (apart from some cluster coordination and configuration details).

This node is always kept ready, but it is not always the one that publishes events.

That means events are received by both nodes, but only one node publishes them. We call that node the active node. The other node just keeps the received events without publishing them. We call that the passive node.

The two nodes communicate with each other periodically. During this synchronization, the passive node cleans up the events buffered in it, after ensuring that the active node has published them.

If the active node goes down for any reason, the passive node becomes the new active node and starts publishing events. If the failed node is restarted, or a new node is started as a replacement, it starts as the passive node.

Read the documentation for more information about setting up 2 node minimum HA with WSO2 SP.
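For reference, the coordination between the two nodes is configured in the deployment.yaml file under <SP_HOME>/conf/worker on both nodes. The sketch below is only illustrative; the exact keys and values vary between SP versions, so treat all of them as assumptions and follow the official documentation:

deployment.config:
  type: ha    # assumption: selects the 2 node minimum HA pattern

cluster.config:
  enabled: true
  groupId: sp    # assumption: both nodes must share this group id
  coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
  strategyConfig:
    datasource: WSO2_CLUSTER_DB    # assumption: a datasource shared by both nodes
    heartbeatInterval: 1000
    heartbeatMaxRetry: 2
    eventPollingInterval: 1000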

Note: source refers to sources in Siddhi applications. Event source refers to external event sources such as Kafka topics, JMS queues, etc.

Event sources with easy setup

When the event source is something like a Kafka topic or a JMS topic, implementing 2 node minimum HA is quite easy.

The Siddhi applications deployed in the two nodes are exactly the same in this case. They have the same source, name, queries, streams, and so on.

When an event arrives at the event source, the sources of both Siddhi applications can read it, so each Siddhi app gets its own copy of the event.
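For example, a JMS topic delivers each message to every subscriber. So an identical JMS source like the sketch below (the topic name and stream attribute are made up for illustration) can be placed in both nodes' Siddhi apps, and each app receives its own copy of every event:

-- Identical in both nodes; 'sensor_topic' is an illustrative topic name
@source(type='jms',
        factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory',
        provider.url='tcp://localhost:61616',
        destination='sensor_topic',
        connection.factory.type='topic',
        connection.factory.jndi.name='TopicConnectionFactory',
        @map(type='keyvalue'))
define stream SensorStream (message string);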

The need for distributed sinks

If the event source is a JMS queue or similar, we can't expect this behavior.

After an event arrives at the queue, it waits until it's read by a consumer. As soon as the event is received by any consumer, it is removed from the queue.

Only the Siddhi application whose source reads the event first gets that particular event. The event is removed from the queue then and there, since it has been consumed.

It doesn't matter whether the Siddhi application is on the active node or the passive node; if it's the first consumer, it gets the event.

To overcome this, we have to make sure that the event is duplicated to the other node as well.

For this purpose, we are going to have two Siddhi applications deployed in each node. The first one is called the Distributor, whose sole purpose is to duplicate the events. The other one is called the Executor, which actually does the processing.

The Distributor will have a distributed sink to distribute the events to two destinations.

Distributed Sink

A sink in a Siddhi application has options such as the transport type (HTTP, TCP, …) and the destination (publisher.url for HTTP, url for TCP, …). There can be only one destination within a sink. For example, the following HTTP sink publishes events only to a single URL.

@sink(type='http',
      publisher.url='http://localhost:8009/foo',
      method='POST',
      @map(type='xml', @payload('{{payloadBody}}')))
define stream FooStream (payloadBody string);

Note: publisher.url is the destination for an HTTP sink. Other sinks have their own kind of destination, such as topic for a Kafka sink.
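As a quick sketch, a Kafka sink with a single destination could look like this (the topic name and server address are illustrative):

-- 'topic' plays the role that 'publisher.url' plays for an HTTP sink
@sink(type='kafka',
      topic='sweet_output',
      bootstrap.servers='localhost:9092',
      @map(type='json'))
define stream OutputStream (message string);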

The distributed sink allows a sink to have multiple destinations, with the help of @destination annotations inside the @distribution annotation.

If we want to introduce two destinations for the above sink, we do it like this:

@sink(type='http',
      method='POST',
      @map(type='xml', @payload('{{payloadBody}}')),
      @distribution(
          strategy='broadcast',
          @destination(publisher.url='http://localhost:8009/foo'),
          @destination(publisher.url='http://localhost:8009/bar')))
define stream FooStream (payloadBody string);

strategy='broadcast' states that each event should be broadcast to all of the destinations. strategy can be 'roundRobin' if you wish to send events to the destinations in a round-robin manner, but for our case, it's 'broadcast'.
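For comparison, a round-robin version of the same sink would alternate events between the two URLs instead of sending every event to both:

@sink(type='http',
      method='POST',
      @map(type='xml', @payload('{{payloadBody}}')),
      @distribution(
          strategy='roundRobin',
          @destination(publisher.url='http://localhost:8009/foo'),
          @destination(publisher.url='http://localhost:8009/bar')))
define stream LoadBalancedStream (payloadBody string);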

Distributing events & receiving them from Executor

We will be distributing the events to two TCP URLs, using the Distributor Siddhi application.

The source of the Executor Siddhi application deployed in one of the two WSO2 SP nodes listens on the first TCP URL.

The source of the other WSO2 SP node's Executor Siddhi application listens on the second TCP URL.

Since a Distributor Siddhi application is deployed in each node, whichever node consumes an event, it is ensured that both nodes get a copy of that event for processing.

Note: Only the Executor Siddhi applications in the two nodes differ slightly, because of the TCP URLs they listen on.

It might look alright, but still, we have a problem.

Since the passive node is not supposed to publish any events, the distributed sink of the passive node’s Distributor Siddhi application won’t publish any events.

[Image: before enabling force publish]

Force Publish

In order to make the passive node's sink publish as well, we have to enable force publishing in the distributed sink. This can be done by setting forcePublish='true' inside the @distribution annotation.

@sink(type=<sink_type>,
      @distribution(
          forcePublish='true',
          strategy='broadcast',
          @destination(publisher.url='http://localhost:8009/foo'),
          @destination(publisher.url='http://localhost:8009/bar')))

[Image: after enabling force publish]

We have to enable force publishing in the distributed sinks of both Distributor Siddhi applications, since either of the two nodes can become active or passive at any time.

With this, events from an event source such as a JMS queue can be duplicated to both nodes.

That is how distributed sinks, along with force publishing, help us implement the 2 node minimum HA deployment pattern for WSO2 Stream Processor.

An example

Note: Each node should be configured with the custom host and port mentioned in its TCP URL. These are system parameters of the Siddhi IO TCP extension. To do this, add the following to the deployment.yaml file found in <SP_HOME>/conf/worker.

siddhi:
  extensions:
    - extension:
        name: tcp
        namespace: source
        properties:
          host: 0.0.0.0
          port: 5511
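On the second node, the same block applies with the port changed to match the second destination URL of the Distributor:

siddhi:
  extensions:
    - extension:
        name: tcp
        namespace: source
        properties:
          host: 0.0.0.0
          port: 5512    # matches the second @destination URL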

The Siddhi apps deployed in both nodes look like this:

Distributor.siddhi

@App:name("Distributor")
@App:description("Receives events from a JMS Queue and distributes to two TCP sinks")
-- JMS SOURCE REFERRING TO A JMS QUEUE
@source(type='jms',
factory.initial=
'org.apache.activemq.jndi.ActiveMQInitialContextFactory',
provider.url='tcp://localhost:61616',
destination='jms_result_queue',
connection.factory.type='queue',
connection.factory.jndi.name='QueueConnectionFactory',
@map(type='keyvalue'))
define stream SourceStream (message String);
-- DISTRIBUTED TCP SINK REFERRING TO TWO TCP URLS
@sink(type='tcp',
sync='true',
@map(type='json'),
@distribution(
forcePublish= 'true',
strategy='broadcast',
@destination(
url='tcp://0.0.0.0:5511/ExecutorSourceStream'),
@destination(
url='tcp://0.0.0.0:5512/ExecutorSourceStream')))
define stream DistributionStream (message String);
from SourceStream
select *
insert into DistributionStream

Executor.siddhi

@App:name("Executor")
@App:description("Receives events from a TCP source and processes them")
/* TCP SOURCE LISTENING TO ONE OF THE TCP URL CONTEXTS
(host & port are configured) */

@Source(type = 'tcp',
context = 'ExecutorSourceStream',
@map(type='json'))
define stream ExecutorSourceStream (message String);
-- Define WhateverStreamYouLikefrom ExecutorSourceStream
select *
insert into WhateverStreamYouLike
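As a concrete sketch, the placeholder processing logic could be a simple filter; the 'ALERT' condition and the AlertStream name below are made up for illustration:

-- Hypothetical processing logic in place of WhateverStreamYouLike
from ExecutorSourceStream[message == 'ALERT']
select message
insert into AlertStream;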
