An Introduction to Pub/Sub in Ballerina

Nuvindu Dias
Ballerina Swan Lake Tech Blog
5 min read · Sep 30, 2022

Pub/Sub is an event-transmission model in which events flow asynchronously from publishers to subscribers. Events are published to the Pub/Sub at one end and received at the other. To receive events, users must first subscribe to a specific topic. When an event is published on that topic, it is broadcast to all of the topic’s subscribers. With that rough idea in mind, let’s go through the details in order.

Introduction

Generally, every Pub/Sub model has the following basic concepts.

Topic: A logical channel that provides a medium for publishers and subscribers to transmit data.

Publisher: The process which can send data to the topic.

Subscriber: The process which can receive data from the topic.

As discussed earlier, publishers publish events on a specific topic, and every subscriber of that topic receives them. The model is closely related to the observer design pattern and is commonly known as the publish-subscribe pattern. In this design, publishers and subscribers are loosely coupled: publishers do not know which subscribers will receive the events they publish, and subscribers do not know which publishers produced them.
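
To make the fan-out concrete, here is a minimal sketch of a single topic written in plain Ballerina. It is a hypothetical, single-threaded model (the Topic class and its methods are invented for illustration) and not how the nuvindu/pubsub library is implemented.

```ballerina
import ballerina/io;

// Hypothetical, single-threaded model of one topic. It only illustrates
// the broadcast idea; it is not the nuvindu/pubsub implementation.
class Topic {
    // One inbox per subscriber.
    private string[][] inboxes = [];

    // Registers a subscriber and hands back its private inbox.
    function subscribe() returns string[] {
        string[] inbox = [];
        self.inboxes.push(inbox);
        return inbox;
    }

    // Broadcasts the event by appending it to every inbox.
    // The publisher never learns who the subscribers are.
    function publish(string event) {
        foreach string[] inbox in self.inboxes {
            inbox.push(event);
        }
    }
}

public function main() {
    Topic news = new;
    string[] firstInbox = news.subscribe();
    string[] secondInbox = news.subscribe();

    news.publish("Election results announced");

    // Each subscriber now holds its own copy of the same event.
    io:println(firstInbox);
    io:println(secondInbox);
}
```

The publisher only talks to the topic and the subscribers only read their own inbox, which is the loose coupling described above.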

Ballerina Pub/Sub Library

Currently, Pub/Sub is available in Ballerina through a third-party library, nuvindu/pubsub. It provides APIs to use a Pub/Sub instance in Ballerina, and an event can be any type of data. Because events flow asynchronously, some API calls may have to wait for a while, so a timeout can be set for these APIs. If the timeout elapses while the call is still waiting, the call returns an error and execution moves on to the next line of code.

The subscribe API returns a stream of the type specified in the code. Each subscriber is backed by a Pipe instance, so the pipe plays a significant role in the Pub/Sub library.
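
As a rough illustration of the typed stream and the timeout, here is a minimal sketch. The topic name, the event text, and the exact parameter form (a named timeout argument in seconds) are assumptions; check the library documentation for the actual signatures.

```ballerina
import ballerina/io;
import nuvindu/pubsub;

public function main() returns error? {
    // With the default configuration, topics are created on first use.
    pubsub:PubSub pubsub = new ();

    // The declared stream type selects the type the events are delivered as.
    stream<string, error?> alerts = check pubsub.subscribe("news");

    // Assumption: the timeout is a named argument given in seconds.
    check pubsub.publish("news", "Storm warning issued", timeout = 5);

    // next() waits for an event; an error comes back if the wait times out.
    record {|string value;|}? next = check alerts.next();
    if next is record {|string value;|} {
        io:println("received: ", next.value);
    }
}
```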

Ballerina Pipe Library

A pipe is a transmission medium for sending and receiving events concurrently. Unlike Pub/Sub, it does not directly support broadcasting data to multiple users. At its core, it has a non-blocking queue that supports any type of data. Producers push data into the pipe and consumers take data out of it. A limit can be set on the pipe to cap the number of items it can hold at once. If the limit is reached while producing events, the produce() API waits until an event is consumed or the timeout elapses. Conversely, if the pipe is empty while consuming events, the consume() API waits until an event is produced or the timeout elapses. A pipe offers two ways to consume: the consume() API returns the next element in the pipe, and the consumeStream() API returns a stream that can be used to extract data from the pipe continuously.
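
The produce and consume behaviour can be sketched as follows. The package name nuvindu/pipe, the constructor argument (taken here to be the limit), and the named timeout arguments are assumptions rather than confirmed signatures.

```ballerina
import ballerina/io;
import nuvindu/pipe;

public function main() returns error? {
    // Assumption: the constructor argument is the limit (at most 2 events).
    pipe:Pipe pipe = new (2);

    // Assumption: timeouts are named arguments given in seconds.
    check pipe.produce("first", timeout = 5);
    check pipe.produce("second", timeout = 5);
    // The pipe is full here: another produce() would wait for a consumer
    // and return an error once its timeout elapses.

    // consume() returns the next event held in the pipe.
    string event = check pipe.consume(timeout = 5);
    io:println(event);
}
```
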
When a user subscribes to a Pub/Sub, it internally creates a new pipe instance for that user and returns a stream obtained from the consumeStream() API of that pipe. When an event is published, it is produced to every pipe instance in the topic, and the subscribers then receive it via their streams.

Now let’s try an example using the nuvindu/pubsub library to get more familiar with it.

Use Case: News Alert System

This is a simulation of a common news alert system. Some users have subscribed to the system to receive the latest news alerts. Whenever a news item is published, it must be delivered to every subscriber. This type of scenario can be easily implemented using a Pub/Sub system.

Let’s Start

First, we need to create a new Ballerina package.

bal new ballerina_pubsub

Now, we need to import the nuvindu/pubsub library which provides APIs related to a Pub/Sub in Ballerina.

Creating Topics

By default, topics are created automatically when a publish or subscribe API is invoked. To create topics manually instead, set the autoCreateTopics parameter to false when creating a new instance. Then use the createTopic() API and specify the name of the topic, as below.

import nuvindu/pubsub;

public function main() returns error? {
    // pubsub:PubSub pubsub = new(); // to automatically create topics
    pubsub:PubSub pubsub = new(autoCreateTopics = false);
    check pubsub.createTopic("topic");
}

Then we need to create subscribers for the system using the subscribe() API. After that, we will use some dummy data as news alerts and publish them to the Pub/Sub, with a time gap between them, using the publish() API. While events are being published, the subscribers will try to receive them concurrently. To add this concurrency, we use workers in Ballerina. The whole scenario can be implemented as follows.
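
A minimal sketch of such a program is shown below. The topic name, the alert texts, the helper readAlerts(), and the worker names are hypothetical, and the sketch assumes that subscribe() and publish() accept the topic name (and the event) as positional arguments with default timeouts.

```ballerina
import ballerina/io;
import ballerina/lang.runtime;
import nuvindu/pubsub;

// Hypothetical helper: reads `count` alerts from a subscriber's stream.
function readAlerts(string name, stream<string, error?> alerts, int count) returns error? {
    foreach int i in 0 ..< count {
        record {|string value;|}? next = check alerts.next();
        if next is record {|string value;|} {
            io:println(name, " received alert ", i + 1, ": ", next.value);
        }
    }
}

public function main() returns error? {
    final pubsub:PubSub pubsub = new (autoCreateTopics = false);
    check pubsub.createTopic("news");

    final stream<string, error?> subscriber1 = check pubsub.subscribe("news");
    final stream<string, error?> subscriber2 = check pubsub.subscribe("news");

    // Dummy data standing in for real news alerts.
    final string[] newsAlerts = ["Alert one", "Alert two", "Alert three"];

    // Publishes the alerts with a one-second gap between them.
    worker publisher returns error? {
        foreach string alert in newsAlerts {
            check pubsub.publish("news", alert);
            runtime:sleep(1);
        }
    }

    // Each subscriber reads its own stream concurrently.
    worker reader1 returns error? {
        check readAlerts("subscriber 1", subscriber1, newsAlerts.length());
    }

    worker reader2 returns error? {
        check readAlerts("subscriber 2", subscriber2, newsAlerts.length());
    }

    // Wait for every worker and surface the first failure, if any.
    _ = check wait publisher;
    _ = check wait reader1;
    _ = check wait reader2;
}
```

Each reader stops after the known number of alerts so that the program ends without waiting for a further event.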

Now we can run the project using the bal run command. If it is successfully executed, you can see the following output logs in the terminal.

If you need to unsubscribe a user, you can just close the stream. It will automatically unsubscribe the user from the subscribed topic. To check this, the code can be modified as follows.

If you run the file again, you will see that after the second news alert, ‘subscriber 2’ no longer receives any news events.
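
The unsubscribe step on its own can be sketched like this. It is a simplified, sequential variant with hypothetical names (printNext(), the topic, and the alert texts), not the modified listing referred to above.

```ballerina
import ballerina/io;
import nuvindu/pubsub;

// Hypothetical helper: prints the next alert from a subscriber's stream.
function printNext(string name, stream<string, error?> alerts) returns error? {
    record {|string value;|}? next = check alerts.next();
    if next is record {|string value;|} {
        io:println(name, " received: ", next.value);
    }
}

public function main() returns error? {
    pubsub:PubSub pubsub = new ();
    stream<string, error?> subscriber1 = check pubsub.subscribe("news");
    stream<string, error?> subscriber2 = check pubsub.subscribe("news");

    check pubsub.publish("news", "First alert");
    check printNext("subscriber 1", subscriber1);
    check printNext("subscriber 2", subscriber2);

    // Closing the stream unsubscribes 'subscriber 2' from the topic.
    check subscriber2.close();

    // Only 'subscriber 1' is still subscribed when this alert is published.
    check pubsub.publish("news", "Second alert");
    check printNext("subscriber 1", subscriber1);
}
```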

Shutdown

When a Pub/Sub is shut down, publishers and subscribers can no longer use it. Every topic and its subscribers are removed from the Pub/Sub, and any further attempt to use an API results in a pubsub:Error. There are two approaches to shutting down a Pub/Sub.

  • Force Shutdown: Removes all the topics and subscribers immediately.
  • Graceful Shutdown: Provides a grace period for the subscribers to retrieve any remaining events in the Pub/Sub. After that, it removes all the topics and subscribers.
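
The two approaches might look like the sketch below. The method names forceShutdown() and gracefulShutdown() and the timeout parameter are assumptions that do not appear in this post, so verify them against the library documentation.

```ballerina
import ballerina/io;
import nuvindu/pubsub;

public function main() returns error? {
    pubsub:PubSub pubsub = new ();
    check pubsub.publish("news", "Final alert");

    // Assumed method name: removes all topics and subscribers immediately.
    check pubsub.forceShutdown();
    // Assumed graceful alternative with a 5-second grace period:
    // check pubsub.gracefulShutdown(timeout = 5);

    // After shutdown, further API calls are expected to fail with pubsub:Error.
    pubsub:Error? result = pubsub.publish("news", "Too late");
    if result is pubsub:Error {
        io:println("publish rejected: ", result.message());
    }
}
```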

I hope this gave you some insight into the Pub/Sub package in Ballerina. If any part is unclear, mention it in a comment below.

The complete code for the example use case can be found here.

Both the Pub/Sub and Pipe libraries are open-source projects, and you can find their documentation at the following links.

Ballerina Pub/Sub Library: https://github.com/Nuvindu/module-pubsub

Ballerina Pipe Library: https://github.com/Nuvindu/module-pipe

Feel free to contribute by reporting issues, requesting new features, or even sending pull requests.
