Building a Simple Review Filtering System Using Ballerina Kafka

Thisaru Guruge
Ballerina Swan Lake Tech Blog
5 min read · Feb 12, 2019

Previously, I explained how to configure the Ballerina Kafka module to work with Apache Kafka. Now let’s see how to implement a simple review filtering system using Ballerina and Kafka.

If you’re not familiar with Ballerina, please take a look at the Ballerina language first before continuing.

Consider the following scenario:

Develop a system that receives reviews about online courses on an educational website and then decides whether any of them are fraudulent. If there are, filter them out and send the *honest* reviews somewhere else.

Kafka and Ballerina suit this scenario really well.

First, we need to set up an endpoint that receives all the reviews. Using Ballerina, you can do this easily.

Create the System to Filter Reviews

Following is the design of the application we’re going to build.

  • We have a Kafka broker, which is handling all the messages.
  • PreProcessReviews: This is the application that retrieves and pre-processes the reviews. All the reviews are directed here through a web service. We retrieve those reviews (in JSON format) and publish them to a Kafka topic named pre-processed-reviews, so that others can consume them.
  • FilterReviews: This is where we filter out reviews. We decide whether a review is fraudulent or not and, depending on that decision, send it to one of two topics: accepted-reviews or rejected-reviews.
  • HandleAcceptedReviews / HandleRejectedReviews: These two endpoints handle the accepted and rejected reviews respectively.

In this sample, I am not going to explain how to decide whether a review is fraudulent or not. We’re just looking at how we can use Kafka and Ballerina together to build a system that filters the reviews.

Following is our directory structure:

.
├── README.md
└── src
    └── ballerina
        ├── FilterReviews
        │   └── FilterReviews.bal
        └── PreProcessReviews
            └── PreProcessReviews.bal

We create a directory for each of our components, and inside each directory we create the corresponding .bal file. After creating this structure, execute the following command inside the src/ballerina directory:

ballerina init

This will initialize a Ballerina project in that directory.

Now let’s create the PreProcessReviews.bal component:

First, we create a Kafka producer using a few configurations. Let’s look into those configurations (a sketch of the producer setup follows below):

  • bootstrapServers: This configuration parameter sets the Kafka broker addresses to which the producer should send data.

You can use more than one Kafka broker for a producer. If there are multiple brokers, use the following syntax in your configuration: bootstrapServers: "host1:port1, host2:port2"

  • clientID: This configuration parameter is used for identifying the producer. This should be unique.
  • acks: This configuration denotes the number of acknowledgments the producer requires the leader to have received before considering a request complete.
  • noRetries: This configuration parameter sets the number of times the producer retries sending a message if the send fails.

To see all the available configurations for a Ballerina Kafka producer, please refer to the API doc of the wso2/kafka module.
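Putting those together, a minimal producer setup might look like the following. This is only a sketch: it assumes the wso2/kafka module of that time (the kafka:ProducerConfig and kafka:SimpleProducer types), and the broker address, client ID, and retry count are illustrative values.

import wso2/kafka;

// Kafka producer configuration (illustrative values).
kafka:ProducerConfig producerConfigs = {
    bootstrapServers: "localhost:9092", // comma-separated list for multiple brokers
    clientID: "review-pre-processor",
    acks: "all",   // acknowledgments required before a request is considered complete
    noRetries: 3   // retry a failed send up to 3 times
};

// The producer we use to publish pre-processed reviews.
kafka:SimpleProducer reviewProducer = new(producerConfigs);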

Then we create an HTTP service that listens on port 30005. Inside the service, we read the request (which carries the reviews) and process them.

Make sure to handle errors, as Ballerina encourages you to do. For this example, I have ignored some of the resulting values (using _ =), which is not good practice.

As you can see, we have two record types: Review and ProcessedReview. The reviews we get from the request are mapped to the Review record type, while the ProcessedReview type is built from the received review plus its isFraud status. You can set the isFraud flag after deciding whether a given Review is fraudulent or not. Then we convert the ProcessedReview into a byte array and send it through the Kafka producer.

The producer->send() function returns an error if sending fails; otherwise, it returns nil.
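A rough sketch of the whole component, built on top of the reviewProducer defined above, could look like this. The record fields, the resource name addReview, and the decideIsFraud() helper are all hypothetical, and the convert/serialization calls follow the 0.990-era syntax, so treat this as an outline rather than a drop-in implementation.

import ballerina/http;
import ballerina/log;
import wso2/kafka;

// The review as it arrives from the website (field names are illustrative).
type Review record {
    string courseId;
    string reviewer;
    string comment;
};

// The pre-processed review, carrying the fraud-check flag.
type ProcessedReview record {
    string courseId;
    string reviewer;
    string comment;
    boolean isFraud;
};

listener http:Listener reviewEndpoint = new(30005);

service preProcessReviews on reviewEndpoint {
    resource function addReview(http:Caller caller, http:Request request) {
        var payload = request.getJsonPayload();
        if (payload is json) {
            var review = Review.convert(payload);
            if (review is Review) {
                ProcessedReview processed = {
                    courseId: review.courseId,
                    reviewer: review.reviewer,
                    comment: review.comment,
                    // decideIsFraud() is a hypothetical helper; plug in your own logic.
                    isFraud: decideIsFraud(review)
                };
                // Serialize the processed review and publish it to pre-processed-reviews.
                var processedJson = json.convert(processed);
                if (processedJson is json) {
                    byte[] serializedMsg = processedJson.toString().toByteArray("UTF-8");
                    var result = reviewProducer->send(serializedMsg, "pre-processed-reviews");
                    if (result is error) {
                        log:printError("Failed to publish the review", err = result);
                    }
                }
            }
        }
        // Ignoring the respond() result keeps the sample short (see the note above).
        _ = caller->respond("Review received");
    }
}

// Hypothetical placeholder; the actual fraud-detection logic is out of scope here.
function decideIsFraud(Review review) returns boolean {
    return false;
}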

Now let’s create our FilterReviews component:

Here we have both a Kafka consumer and a Kafka producer.

First, we define the configurations for the Kafka consumer. In the consumer configuration, we set the following:

  • bootstrapServers: The host and port of the Kafka brokers to which the consumer should connect. As mentioned for the producer configuration, the consumer configuration can also have multiple brokers; just provide them comma separated (e.g., bootstrapServers: "host1:port1, host2:port2"), as we did in the producer config.
  • groupId: This group ID is used to identify different consumer groups in the Kafka cluster. This is helpful when Kafka’s group management functionality is used.
  • topics: This configuration holds the array of topics to which the Consumer should listen. You can provide multiple topics as an array of strings.

To see all the available configurations for a Ballerina Kafka consumer, please refer to the API doc of the wso2/kafka module.
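Assuming the same module, the consumer configuration for this component might look roughly like this (the group ID is an arbitrary example, and your module version may require additional fields such as a polling interval):

// Kafka consumer configuration for the FilterReviews component (illustrative values).
kafka:ConsumerConfig consumerConfigs = {
    bootstrapServers: "localhost:9092",
    groupId: "review-filter-group",
    topics: ["pre-processed-reviews"]
};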

I just said the Kafka consumer can *listen* to particular topics! What is *listening* to a topic? How can a consumer *listen* to a topic?

Well, the Kafka consumer can be used as a listener in Ballerina. (This listener model is a common practice in Ballerina, and it is really handy.) Using this approach, you can *listen* to a given Kafka topic, and whenever someone publishes to that topic, our listener will retrieve the values. To do so, all you need is to define the Kafka consumer as a listener.

All we have to do is create a listener with the topics to which it should listen, and then create a service on that listener object, as shown below.
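For example, with the consumerConfigs sketched above, the listener can be declared as follows (again assuming the kafka:SimpleConsumer type from the wso2/kafka module):

// The Kafka consumer acting as a listener on the pre-processed-reviews topic.
listener kafka:SimpleConsumer reviewConsumer = new(consumerConfigs);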

service filterReviewService on reviewConsumer {
    resource function onMessage(kafka:SimpleConsumer consumer,
                                kafka:ConsumerRecord[] records) {
        // Logic to process the reviews
    }
}

This service (created on the Kafka consumer) has only one resource function, which can have either this simple signature:

resource function onMessage(kafka:SimpleConsumer consumer,
                            kafka:ConsumerRecord[] records) {
    // Logic to process reviews.
}

or this more complex signature:

resource function onMessage(kafka:SimpleConsumer consumer,
                            kafka:ConsumerRecord[] records,
                            kafka:PartitionOffset[] offsets,
                            string groupId) {
    // Logic to process the review.
}

Note that the onMessage() resource can return either error, nil (()), or the union of those two types, error?. If your onMessage function returns anything else, there will be a compilation error.

If the resource name is wrong or the function signature is invalid, there will be a compilation error.

The ConsumerRecord[] array passed into the resource function provides the records our listener retrieved. Then we can handle each received review and publish it to one of the two topics, accepted-reviews or rejected-reviews. (I’m not going to explain the producer again.)

We categorize the reviews and send them to two different topics to be handled afterwards. We use our producer to do so: it sends each review to either accepted-reviews or rejected-reviews, depending on the status of the review. A rough sketch of this logic is shown below.
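In this sketch, reviewProducer is a Kafka producer defined in the FilterReviews component (configured the same way as in the PreProcessReviews sketch), and isFraudulent() is a hypothetical helper that inspects the pre-processed review (for example, its isFraud flag):

resource function onMessage(kafka:SimpleConsumer consumer,
                            kafka:ConsumerRecord[] records) {
    foreach var kafkaRecord in records {
        // Each record value is the serialized review published earlier.
        byte[] serializedReview = kafkaRecord.value;

        // Route the review based on the fraud decision.
        string topic = isFraudulent(serializedReview) ? "rejected-reviews" : "accepted-reviews";

        var result = reviewProducer->send(serializedReview, topic);
        if (result is error) {
            log:printError("Failed to publish the review to " + topic, err = result);
        }
    }
}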

That’s it!

This is a small example showing how to work with Ballerina Kafka. If you have any problems regarding Ballerina Kafka, feel free to ask. You can catch me on Twitter.

Let’s see how we can play more with Kafka offsets and committing records in upcoming posts.

Note: Apache Kafka and Ballerina logos are trademarks of the respective companies.
