Handy-Messaging-Framework — Messaging Systems Portability

Aron Sajan Philip
7 min read · Sep 12, 2024


Preface

These days, as AI systems get increasingly smarter, a quintessential part of the process, be it in the training or the prediction phase, is the movement of data between systems. When organizations build such systems, there are multiple messaging channels to choose from, such as Apache Kafka, Google Pubsub, and Azure EventHub. Though these messaging systems have different implementations, the core of what they offer stays much the same: a publisher produces a message, and one or more consumers can subscribe to it through a topic or a queue; on receiving the message, the consumers process it. This fundamental behavior holds across most of these messaging systems. Yet, while each system adds its own features on top of this fundamental behavior, there is no standard framework for developers to interface with these messaging systems. This is the motivation for the Handy-Messaging-Framework.

Messaging Systems Portability — Walkthrough

This section walks through sample code using Handy-Messaging-Framework4J (HMF4J) to demonstrate portability between the Apache Kafka and Google Pubsub messaging systems. Apache Kafka and Google Pubsub have been chosen for demonstration purposes, but they could be any messaging systems that HMF4J supports.

The Plot

In this example, we are going to build a consumer that consumes a message from a message channel and prints its contents.

We will also host an endpoint that generates a binary serialized form of a message (conforming to the SimpleProtoMessage format). This message will be sent to the message channel.

Let's dive into the details now.

Repository

Clone the repository here — Portability Example Code. The walkthrough below is based on this repository.

The Infrastructure

In this section, we will set up the infrastructure needed for this illustration: running instances of the Apache Kafka and Google Pubsub messaging systems. First, from the root directory of the example (example_portability), run:

docker compose up

This should bring up an instance of Apache Kafka and one of Google Pubsub. Now we need to define the queue (aka topic) in each messaging system.
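The repository ships its own docker-compose.yml; as a purely hypothetical sketch of what such a file might contain (images, service names, and options are assumptions and may differ from the actual file), wiring the ports to match the HMF4J configuration used later:

```yaml
# Hypothetical sketch only; see the repository's docker-compose.yml for the real file.
services:
  kafka:
    image: apache/kafka:latest        # single-node KRaft broker
    ports:
      - "9092:9092"                   # matches bootstrap.servers in hmf4j-conf.yml
  pubsub-emulator:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
    command: gcloud beta emulators pubsub start --project=sample-pubsub --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"                   # matches host.port in hmf4j-conf.yml
```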

Apache Kafka

Create a topic in Kafka named test_topic with partitions = 1 and replication-factor = 1. You can follow the instructions here.
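Assuming the Kafka CLI tools are available inside the broker container and the Compose service is named kafka (both assumptions; adjust to match the repository's compose file), one way to create the topic is:

```shell
# create test_topic with 1 partition and replication factor 1
docker compose exec kafka /opt/kafka/bin/kafka-topics.sh --create \
  --topic test_topic --partitions 1 --replication-factor 1 \
  --bootstrap-server localhost:9092
```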

Google Pubsub

For a Google Pubsub consumer to pull data from a queue, it needs the following three things:

  • ProjectID
  • Topic/Queue Name
  • Topic Subscription

Topic — First, we need to create a topic named test_topic in Google Pubsub. Execute the curl request below to do that:

curl --location --request PUT 'http://localhost:8085/v1/projects/sample-pubsub/topics/test_topic'

Note that the project ID used here is sample-pubsub. This will be needed later when configuring HMF4J.

Subscription — Now we need to create a subscription for that topic. This can be done using the curl request below:

curl --location --request PUT 'http://localhost:8085/v1/projects/sample-pubsub/subscriptions/test_sub' \
--header 'Content-Type: application/json' \
--data '{"topic":"projects/sample-pubsub/topics/test_topic"}'

Here we are creating a new subscription named test_sub for the topic test_topic.
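To confirm the subscription was created, you can list the project's subscriptions; this GET endpoint is part of the same Pub/Sub REST API that the emulator exposes:

```shell
# list all subscriptions under the sample-pubsub project
curl --location 'http://localhost:8085/v1/projects/sample-pubsub/subscriptions'
```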

The configuration

Now that we are done with the infrastructure needed for this example, let's look at HMF4J's configuration file. The configuration file, named hmf4j-conf.yml, gives HMF4J the parameters needed for establishing a producer or a consumer. The file defines one or more profiles, where each profile holds the configuration needed to connect to a messaging system. Under each profile, the details pertaining to that profile's producer and consumer are defined.

Now, let's take a look at the configuration file for this example.

hmf4j:
  profiles:
    - profileName: kafka_profile
      system: kafka
      consumer:
        properties:
          bootstrap.servers: localhost:9092
          group.id: test_app
          max.messages.per.batch: 3
          max.poll.duration.millis: 10000
    - profileName: pubsub_profile
      system: google-pubsub
      consumer:
        properties:
          emulator.exec.flag: true
          host.name: "localhost"
          host.port: 8085
          project.id: "sample-pubsub"
          subscription.id: "test_sub"
          max.messages.per.batch: 3
          max.poll.duration.millis: 15000
The configuration file here has two profiles, `kafka_profile` and `pubsub_profile`, for interfacing with the Kafka channel and the Google Pubsub channel respectively. Each profile's system field defines which messaging system it interfaces with. The consumer section under each profile specifies that profile's consumer properties; when a profile is picked up for use, these properties tell HMF4J how to create a consumer for the messaging system. More details on the configuration file for supported messaging systems can be found here — HMF4J Supported Platforms

Note that the configuration parameters align with the parameters we used when creating the infrastructure in the previous step.

The Message Handler Class

The message handler class is the one invoked for each message read from the incoming message channel.

@Component
public class DataProcessor extends MessageHandlerFoundation {

    private static final Logger LOGGER = LoggerFactory.getLogger(DataProcessor.class);

    @Override
    protected String getQueueName() {
        return "test_topic";
    }

    @Override
    protected String getProfileName() {
        return "kafka_profile";
    }

    @Override
    protected String getMessageTypeClass() {
        return "io.github.handy.messaging.types.simplemessage.SimpleMessage";
    }

    @Override
    public void handleMessage(Message message) {
        SimpleMessage msg = (SimpleMessage) message;
        LOGGER.info(String.format("Message Version - %s", msg.getVersion()));
        LOGGER.info(String.format("Message ID - %s", msg.getId()));
        LOGGER.info(String.format("Sender - %s", msg.getSender()));
        LOGGER.info(String.format("Content Schema - %s", msg.getContentSchema()));
        LOGGER.info(String.format("Payload - %s", new String(msg.getPayload())));
        LOGGER.info(String.format("Sent at TS - %s", msg.getDateTimestamp().toString()));
    }

    @Override
    public Optional<MessageHandler> getNewInstance() {
        return Optional.empty();
    }
}
  • getQueueName() — Tells HMF4J which queue to pull messages from. Here we use test_topic
  • getProfileName() — Tells HMF4J which profile to use for this consumer. In this case, we use the kafka_profile
  • getMessageTypeClass() — Tells HMF4J how to deserialize the messages read from this topic. HMF4J deserializes each message before delivering it to the message handler. For this example, we use the SimpleMessage class
  • handleMessage(Message message) — Invoked for every message read from the incoming channel (queue). The application-level logic for processing the message goes here. In this example, we simply log the contents of the message.
  • getNewInstance() — Optionally tells HMF4J how to create a new instance of the message handler class for each message. If the Optional is empty, HMF4J reuses the same handler instance for every message. Note that HMF4J invokes the message handler for each message on a different thread, so if the handler class is mutable, it is recommended to return a value from getNewInstance(); otherwise it can return Optional.empty(), as in this example.
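The last point can be illustrated with a small, self-contained sketch. The Handler and StatefulHandler types below are hypothetical stand-ins, not HMF4J classes; the point is only to show why a handler with mutable state should hand out fresh instances:

```java
import java.util.Optional;

// Hypothetical stand-ins for the handler contract, to illustrate the
// getNewInstance() rule; these are not HMF4J's real types.
interface Handler {
    void handle(String message);
    Optional<Handler> getNewInstance();
}

// A handler with mutable per-message state.
class StatefulHandler implements Handler {
    private final StringBuilder buffer = new StringBuilder(); // mutable state
    public void handle(String message) { buffer.append(message); }
    // Hand out a fresh copy so concurrently processed messages never share the buffer.
    public Optional<Handler> getNewInstance() { return Optional.of(new StatefulHandler()); }
    public String contents() { return buffer.toString(); }
}

public class DispatchSketch {
    // Mirrors the rule described above: use the fresh instance when one is
    // provided, otherwise fall back to reusing the shared handler.
    static Handler pick(Handler shared) {
        return shared.getNewInstance().orElse(shared);
    }

    public static void main(String[] args) {
        StatefulHandler shared = new StatefulHandler();
        Handler first = pick(shared);
        Handler second = pick(shared);
        first.handle("a");
        second.handle("b");
        // Each message got its own handler; the shared instance stayed untouched.
        System.out.println(first != second);             // prints "true"
        System.out.println(shared.contents().isEmpty()); // prints "true"
    }
}
```

Had getNewInstance() returned Optional.empty() here, both messages would have appended to the same buffer from different threads.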

The mechanism of routing messages

When a message is read from the channel named test_topic, HMF4J first deserializes it into the type returned by the getMessageTypeClass() function, in this case an instance of SimpleMessage (aka SimpleProtoMessage). The deserialized message is then passed to the handleMessage(Message message) function, which decides what to do with its content. In this case, we simply log it.

HMF4J uses Akka Actors to invoke handleMessage, so each invocation is handled by a separate actor in its own thread space. An efficient dispatching mechanism sits between reading a message from the queue and dispatching it to the handleMessage(…) function. More on this can be read here.

Running the application

  • In the root directory of the project (example_portability), run the application:
mvn spring-boot:run
  • Next, generate a sample message for this demo by running the following curl command:
curl -X GET http://localhost:8080/message > message.bin

The generated message will be saved to the file message.bin.

Handling the message for the Kafka profile

For publishing messages to the Kafka queue, I am using the tool Offset Explorer. You can use any tool you prefer for this.

Using Offset Explorer, I am posting the message.bin file created earlier to the test_topic topic.

Once the message is posted, you will see the following log statements in the application console, indicating that the message has been consumed and delivered to the DataProcessor class.

The contents of the message logged by DataProcessor class with Apache Kafka Messaging System

Switching over to Google Pubsub system

Now that we have seen the message handled properly by the Kafka messaging system, let's check how the system handles the message through the Google Pubsub messaging system.

Data Processor Changes

We can redirect our DataProcessor to use pubsub_profile instead of kafka_profile. This can be done by changing the getProfileName() function as:

@Override
protected String getProfileName() {
    return "pubsub_profile";
}

Well! That’s the extent of changes needed.

Now let's run the application. Before that, we need a base64 encoded version of the message.bin we generated earlier, since Google's Pubsub REST API for publishing messages accepts binary data only as a base64 encoded string. You can use any terminal utility to generate the base64 string of message.bin.
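For example, with GNU coreutils (the -w 0 flag, which disables line wrapping, is a GNU option; on macOS use base64 -i message.bin instead):

```shell
# emit message.bin as a single-line base64 string
base64 -w 0 message.bin
```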

Run the message-consuming service again using the command:

mvn spring-boot:run

Post the message to the Google PubSub topic — test_topic as:

curl --location 'http://localhost:8085/v1/projects/sample-pubsub/topics/test_topic:publish' \
--header 'Content-Type: application/json' \
--data '{"messages":[{"data": "<base64 encoded string>"}]}'

The placeholder <base64 encoded string> should be replaced with the base64 string corresponding to message.bin.

Publish this message to Google Pubsub and inspect the running logs of the example service. You should see the same log messages as when connected to the Kafka messaging system, as shown below:

The contents of the message logged by DataProcessor class with Google Pubsub Messaging System

Conclusion

We have seen with this example that, with minimal changes, HMF4J enables a system to switch from one messaging system to another. To view the list of supported messaging systems, check here — HMF4J Supported Messaging Systems
