Publish and Subscribe with Ballerina

What is Ballerina?

Ballerina is a new open-source programming language that lets you represent your code graphically: you can draw while you code, or code while you draw. Sequence diagrams are traditionally used only to describe scenarios; Ballerina goes further and lets you implement an integration scenario by drawing one. This post explains how the Publish-Subscribe Channel Enterprise Integration Pattern (EIP) can be implemented and graphically represented using Ballerina.

Introduction to Publish-Subscribe Channel

A Publish-Subscribe Channel has one input channel that splits into multiple output channels, one per subscriber. When a message is published to the input channel, a copy of it is delivered to each output channel. Each output channel has exactly one subscriber, which is allowed to consume a given message only once. For more information on the Publish-Subscribe EIP, refer here.
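To make the pattern concrete, here is a minimal in-memory sketch in Python. It is not Ballerina and not a JMS client; the class name and subscriber names are made up for illustration. It only shows the two properties described above: every subscriber gets its own copy of a published message, and each copy can be consumed once.

```python
from collections import deque


class PublishSubscribeChannel:
    """In-memory sketch: one input, one output queue per subscriber."""

    def __init__(self):
        self._outputs = {}  # subscriber name -> its own output queue

    def subscribe(self, name):
        self._outputs[name] = deque()

    def publish(self, message):
        # Deliver a copy of the message to every subscriber's output channel.
        for queue in self._outputs.values():
            queue.append(message)

    def consume(self, name):
        # Each copy can be consumed only once; None means nothing is pending.
        queue = self._outputs[name]
        return queue.popleft() if queue else None


channel = PublishSubscribeChannel()
subscribers = ("facebook", "twitter", "linkedin")  # hypothetical subscriber names
for subscriber in subscribers:
    channel.subscribe(subscriber)

channel.publish("Get started with Ballerina.")

first_reads = {name: channel.consume(name) for name in subscribers}
second_read = channel.consume("facebook")
print(first_reads)
print(second_read)
```

In the rest of this post, the JMS topic in the broker plays the role of this channel, and each Ballerina subscriber service plays the role of one output end.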

Scenario

With the emergence of digital marketing in the enterprise world, organizations need the ability to publish company updates and news on social media. WSO2 similarly announced the Ballerina language on social media at the WSO2 conference. Instead of manually updating each social media channel, an organization can publish the message to a queue or topic to which subscribers are listening. Each subscriber has a service that connects to a social media channel such as Facebook, Twitter or LinkedIn and posts or tweets the message published to the queue or topic. Let's see how easily this can be achieved with Ballerina.

I use WSO2 Enterprise Integrator as the JMS provider and a Topic as the destination. Ballerina is used to set up the publishers and subscribers.

How to do it

Setting up the environment

First, go to the Ballerinalang website and download the latest Ballerina Tools + Runtime distribution, which includes the runtime, Composer, Docerina and Testerina.

Note: This scenario was tested on the Ballerina 0.8 release.

Since you need two Ballerina runtimes running at the same time, one for the subscriber and one for the publisher, make a second copy of the tools distribution and, in one of the two copies, change the default port (9090) that Ballerina services listen on. You can change it in the following file.

ballerina-tools-<version>/bre/conf/netty-transports.yml

listenerConfigurations:
 -
  id: "default"
  host: "0.0.0.0"
  port: 9091

Then, download any JMS message broker. WSO2 Enterprise Integrator (EI) is used as the message broker in this sample. Go to the WSO2 website, download the latest EI server and start it.

The Ballerina Tools distribution does not include any JMS implementation jars by default. These must be copied from the EI server to both Ballerina distributions.

The required libraries can be found in the wso2ei-<version>/wso2/broker/client-lib directory. Copy the jars into the ballerina-tools-<version>/bre/lib directory of each distribution.
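If you prefer to script the copy step, the following Python sketch shows one way to do it. The function name and the placeholder file names are assumptions made for this example, and the demonstration runs against temporary directories; for a real setup you would pass the client-lib directory and the two bre/lib directories mentioned above.

```python
import shutil
import tempfile
from pathlib import Path


def copy_client_jars(client_lib_dir, ballerina_lib_dirs):
    """Copy every .jar in client_lib_dir into each Ballerina bre/lib directory."""
    copied = []
    for jar in sorted(Path(client_lib_dir).glob("*.jar")):
        for target in ballerina_lib_dirs:
            shutil.copy2(jar, Path(target) / jar.name)
            copied.append((jar.name, str(target)))
    return copied


# Demonstration with temporary directories standing in for the real ones.
with tempfile.TemporaryDirectory() as root:
    client_lib = Path(root, "client-lib")
    publisher_lib = Path(root, "publisher", "bre", "lib")
    subscriber_lib = Path(root, "subscriber", "bre", "lib")
    for directory in (client_lib, publisher_lib, subscriber_lib):
        directory.mkdir(parents=True)
    (client_lib / "example-client.jar").write_bytes(b"")  # placeholder jar
    (client_lib / "README.txt").write_text("not a jar")   # must be skipped

    copied = copy_client_jars(client_lib, [publisher_lib, subscriber_lib])
    publisher_files = sorted(p.name for p in publisher_lib.iterdir())
    subscriber_files = sorted(p.name for p in subscriber_lib.iterdir())

print(publisher_files)
print(subscriber_files)
```

Only files ending in .jar are copied, so anything else in the source directory is left alone.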

Now we are ready to get the system going.

Subscribing to a Topic

For our scenario to work, the subscriber should be able to consume the messages published to the topic and post each message to a social media channel. For the sake of this example, we describe the configuration for Facebook only; the same approach applies to the other social media channels.

To achieve that, we first create a jndi.properties file and save it somewhere on the system. Instead of specifying the connection URL directly in the source code, we keep the connection factories and destination names in this one file and point to it from the provider URL property in the source code. The details in this file are used to establish the connection between the Ballerina JMS subscriber and the EI server.

A sample jndi.properties file is provided below.

# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.QueueConnectionfactory=amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'
connectionfactory.TopicConnectionFactory=amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyQueue = MyQueue

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = MyTopic

Note: admin/admin are the default credentials of the EI server, and the broker listens on port 5675.
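As a quick sanity check on the file, the sketch below parses the sample above and groups the entries by prefix. It is a plain Python illustration, not part of Ballerina or the broker; the function name is made up, and it handles only the `key = value` lines and `#` comments used in this sample rather than the full Java properties syntax.

```python
SAMPLE = """\
# register some connection factories
connectionfactory.QueueConnectionfactory=amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'
connectionfactory.TopicConnectionFactory=amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'
queue.MyQueue = MyQueue
#topic.MyTopic = MyTopic
"""


def parse_jndi_properties(text):
    """Group entries by prefix (connectionfactory, queue, topic)."""
    entries = {"connectionfactory": {}, "queue": {}, "topic": {}}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split on the first "=" only; the connection URL contains "=" too.
        key, _, value = line.partition("=")
        prefix, _, jndi_name = key.strip().partition(".")
        if prefix in entries and jndi_name:
            entries[prefix][jndi_name] = value.strip()
    return entries


parsed = parse_jndi_properties(SAMPLE)
print(sorted(parsed["connectionfactory"]))
print(parsed["queue"])
print(parsed["topic"])
```

Because the topic.MyTopic line is commented out in the sample, the sketch reports no topics registered through JNDI; the connection factory name TopicConnectionFactory is the one the Ballerina code refers to later.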

Then, open the Ballerina Composer or whichever IDE or tool you use to write Ballerina code. The Ballerina distribution provides plugins for IntelliJ IDEA, Vim, Atom, Sublime and VS Code. If you are using the Composer, you can either draw the program on the canvas or write the code in the source view.

If you draw it from scratch in the Composer, you will see a design similar to the one below. I have created a service (TopicSubscriberService) and a function (postToFacebook) to implement the scenario.

The TopicSubscriberService listens to the topic and prints the details of each message it receives. Then, inside a try-catch block, it calls the postToFacebook function with the received message, and finally it acknowledges the message.

The postToFacebook function creates a connection to the Facebook account and is capable of publishing messages to Facebook. In this function, you can clearly see the integration point with Facebook. It verifies the message is successfully published on Facebook and throws an error if not.

Following is the code that is generated for the above design.

import ballerina.net.jms;
import org.wso2.ballerina.connectors.facebook;
import ballerina.lang.jsons;
import ballerina.lang.messages;
import ballerina.lang.system;
import ballerina.lang.exceptions;

@jms:JMSSource {
    factoryInitial : "org.wso2.andes.jndi.PropertiesFileInitialContextFactory",
    providerUrl : "file://<path>/jndi.properties",
    connectionFactoryType : "topic"}
@jms:ConnectionProperty{key:"destination", value:"MyTopic"}
@jms:ConnectionProperty{key:"useReceiver", value:"true"}
@jms:ConnectionProperty{key:"connectionFactoryJNDIName", value:"TopicConnectionFactory"}
@jms:ConnectionProperty{key:"sessionAcknowledgement", value:"CLIENT_ACKNOWLEDGE"}
service TopicSubscriberService {
    resource onMessage(message m) {
        //using the native functions in jms and messages packages
        system:println("Message type - " + messages:getProperty(m, "JMS_MESSAGE_TYPE"));
        system:println("Message ID - " + messages:getHeader(m, "JMS_MESSAGE_ID"));
        system:println("Message received - " + messages:getStringPayload(m));

        //get the string payload of the message
        string statusMessage = messages:getStringPayload(m);
        try {
            //call the postToFacebook function
            postToFacebook(statusMessage);
        }
        catch (exception e) {
            string errMsg = exceptions:getMessage(e);
            system:println("Error when publishing to Facebook. Error message - " + errMsg);
        }
        //acknowledge the message delivery
        jms:acknowledge(m, "SUCCESS");
    }
}

function postToFacebook(string statusMessage) {
    message facebookResponse;
    json facebookJSONResponse;
    exception fbError = {};
    string token = "<token>";
    //creating a new facebook connector
    facebook:ClientConnector facebookConnector = create facebook:ClientConnector(token);
    //calling the createPost function
    facebookResponse = facebook:ClientConnector.createPost(facebookConnector, "me", statusMessage, "", "");
    //get the json payload of the response
    facebookJSONResponse = messages:getJsonPayload(facebookResponse);
    //get the value of id of the response. If successful print the id. If an error, throw a user defined exception
    try {
        string id = jsons:getString(facebookJSONResponse, "$.id");
        system:println("Message is published on Facebook. ID - " + id);
    }
    catch (exception e) {
        exceptions:setMessage(fbError, jsons:getString(facebookJSONResponse, "$.error.message"));
        throw fbError;
    }
}

Notes:

  • The “ballerina.net.jms” package is imported in order to refer to the JMS functions.
  • The details required to establish the connection are given in the service-level annotations (@jms:JMSSource and @jms:ConnectionProperty in this sample). The following table describes the properties used in the sample. More details regarding the properties can be found here.
  • The following functions in the JMS package are used to check the message type and acknowledge the message in the sample. Apart from these, there are other useful functions, which can be found in the Ballerina API Documentation.
  • To create a connection with Facebook and use its actions, refer to the samples and documentation here.
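One detail of the subscriber's control flow is worth spelling out: the message is acknowledged after the try-catch block, so it is acknowledged whether or not the post to Facebook succeeded. The Python sketch below mirrors that flow with stand-in callables. It is only an illustration of the ordering; the function names and the simulated failure are hypothetical, and it calls neither JMS nor Facebook.

```python
def on_message(payload, post_to_channel, acknowledge, log):
    """Mirror of the subscriber flow: log, try to post, log any error, acknowledge."""
    log("Message received - " + payload)
    try:
        post_to_channel(payload)
    except Exception as error:
        log("Error when publishing. Error message - " + str(error))
    # Acknowledgement happens whether or not the post succeeded.
    acknowledge()


def failing_post(payload):
    raise RuntimeError("token expired")  # hypothetical failure


events = []
# First delivery: the post succeeds.
on_message("hello", lambda p: events.append("posted " + p),
           lambda: events.append("ack"), events.append)
# Second delivery: the post fails, but the message is still acknowledged.
on_message("hello", failing_post,
           lambda: events.append("ack"), events.append)
print(events)
```

If you would rather have the broker redeliver a message whose post failed, the acknowledgement would need to move inside the try block; whether that suits you depends on how you want failed posts handled.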

Now we are ready to subscribe to the topic. Run the following command from the <ballerina_home>/bin folder to deploy and run the service, or run the service from the Composer.

./ballerina run service <fileName>.bal

The console logs a message like the following, and the service starts listening to the topic.

ballerina: deploying service(s) in '<filePath>/<fileName>.bal'
ballerina: started server connector http-9090

If you check the Message Broker of the EI server, you will notice that the topic has been created automatically.

Publishing to a Topic

Now, let’s create the JMS Publisher using Ballerina.

Here we use the main function, but depending on your scenario a service can be used as well. A service keeps on running after it is started until it is stopped whereas a main function exits by itself after executing a given task. In the design below, you can clearly identify the integration point with the JMS provider.

Following is the corresponding source code for the above design.

import ballerina.net.jms;
import ballerina.lang.messages;
import ballerina.lang.system;

function main(string[] args) {
    //initialize a JMS connector instance
    jms:ClientConnector jmsConnector = create jms:ClientConnector("org.wso2.andes.jndi.PropertiesFileInitialContextFactory", "file://<path>/jndi.properties");
    message textMessage = {};
    messages:setStringPayload(textMessage, "Get started with Ballerina. http://ballerinalang.org");
    map dataMap = {};
    map mapMessage = {"MapData":dataMap};
    //invoke the send action of the jms client connector
    jms:ClientConnector.send(jmsConnector, "TopicConnectionFactory", "MyTopic", "topic", "TextMessage", textMessage, mapMessage);
    system:println("====Text Message sent to the Topic====");
}

Notes:

  • The “ballerina.net.jms” package needs to be imported in order to refer to the JMS functions.
  • When initializing the JMS client connector, pass the initial context factory class and the provider URL (here, the path to the jndi.properties file), as in the sample.
  • There are different types of messages that we can send in JMS, namely TextMessage, MapMessage, ObjectMessage and BytesMessage. Here we send a TextMessage. Depending on the type of message and the destination you are publishing to, the parameters of the send action should be changed. The following table describes the parameters used in this action. More details can be found here.

Now things are getting interesting. We are going to run the publisher and verify that the message is published on Facebook.

Run the publisher as follows to publish the message to the topic.

./ballerina run main <fileName>.bal

Check the console of the subscriber. It prints the details of the message consumed and the response of the createPost request as follows.

Message type - TextMessage
Message ID - ID:9668fa36-7be5-314c-ae0b-c9a10a08cca3
Message received - Get started with Ballerina. http://ballerinalang.org
Message is published on Facebook. ID - 1202285316_10212656442904965

Finally, you can verify whether the message got published on Facebook.


What makes Ballerina Flexible, Powerful and Beautiful?

Ballerina is not just another programming language. It is designed for the specific purpose of making integration more flexible and easier than before. This post shows how easily Ballerina lets you achieve a common enterprise scenario: publishing company news and updates to multiple social media channels through a JMS message broker. Similarly, you can use any of the built-in connectors, or write your own connector in Ballerina to extend your application or service.

Furthermore, Ballerina provides a powerful set of tools, such as the Ballerina Composer, an API doc generator, a test tool and plugins for many popular IDEs, so you can code or draw your application in whichever mode you prefer. The Composer is one of the most powerful features of Ballerina: it lets you draw your entire integration scenario to life. Ballerina also makes it easy for newcomers and integration experts alike to implement an integration scenario.

There is more to it than meets the eye. Read how Ballerina is different from other programming languages or visit https://medium.com/ballerinalang to discover more about Ballerina.