Ballerina Kafka Serialization with Avro

Thisaru Guruge
Ballerina Swan Lake Tech Blog
7 min read · May 5, 2020

Note: This feature is now deprecated in Ballerina Swan Lake.

Apache Kafka is a widely used message streaming platform for handling real-time messaging in network applications. Ballerina, on the other hand, is a new programming language focused mainly on network applications.

The Ballerina language comes with a handy set of libraries (or modules, if you speak Ballerina) that help you create cloud-era applications easily. The Ballerina Kafka module is one of them: it makes it easy to handle Kafka messages in Ballerina applications.

Apache Avro is a data serialization system that is frequently used to serialize and deserialize Kafka messages.

This article will demonstrate how to use Apache Avro to serialize / deserialize the messages in Ballerina Kafka producers and consumers.

You can find the source code used in this article on GitHub. Check it out.

First, let’s set up our environment. If you already know how to install Ballerina, configure Kafka, and configure a Schema Registry for Kafka, you can skip these steps.

Configure The Environment

  • Download and install Ballerina:
    First you need to download and install Ballerina. You can easily download it from the Ballerina website. For this article, I am using Ballerina 1.2.3. Any Ballerina 1.2.x version should work fine.
  • Download Kafka:
    You need to download and run a Kafka server instance to make this work. Apache Kafka can be downloaded from the Kafka website. For this article, I am using Kafka 2.3.1, but any of the Kafka 1.x and 2.x versions should work fine.
  • Download Confluent (for the Schema Registry):
    To use Avro serialization, we need a schema registry. Confluent provides a schema registry for Kafka. Confluent can be downloaded from the Confluent website. I am using Confluent 5.5.0 for this article, but Confluent 5.4.x will also work fine.
  • Start ZooKeeper:
    Open a terminal and go inside the downloaded Kafka directory. Then enter the following command to start ZooKeeper.
    bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start the Kafka server:
    Open another terminal and go inside the downloaded Kafka directory. Then enter the following command to start the Kafka server.
    bin/kafka-server-start.sh config/server.properties
  • Start the Schema Registry:
    Open another terminal and go inside the downloaded Confluent directory. Then enter the following command to start the Schema Registry.
    bin/schema-registry-start etc/schema-registry/schema-registry.properties
  • A code editor:
    You can use any code editor for this. I am using VSCode, since it has a pretty decent Ballerina plugin. You may use IntelliJ IDEA as well.

Creating A Ballerina Project

We are going to create a Ballerina project with two modules. One module is for the producer, and the other is for the consumer.

To create the Ballerina project, open a terminal and type the following:

ballerina new kafka_avro_sample

This will create a new Ballerina project. Now go inside the kafka_avro_sample directory and enter the following two commands:

ballerina add producer
ballerina add consumer

This will create two modules inside our Ballerina project. Each module includes resources and tests directories as well as a Module.md file. To keep things simple, I am going to remove the files and directories that the project does not need in order to work. If you want to know more about the Ballerina project structure, read about how to structure Ballerina code.

├── Ballerina.toml
└── src
    ├── consumer
    │   └── main.bal
    └── producer
        └── main.bal

Now the project looks much simpler.

Adding The Dependencies

To run Ballerina Kafka clients with Avro, we need to add some dependencies. To do so, let’s first create a directory inside our project called javalibs to keep our dependencies.

mkdir javalibs

We need the following dependencies to run the producer:

  • io.confluent:kafka-avro-serializer:5.4.1
  • io.confluent:kafka-schema-registry-client:5.4.1
  • io.confluent:common-config:5.4.1
  • io.confluent:common-utils:5.4.1
  • org.apache.avro:avro:1.9.2
  • com.fasterxml.jackson.core:jackson-core:2.10.3
  • com.fasterxml.jackson.core:jackson-databind:2.10.3
  • com.fasterxml.jackson.core:jackson-annotations:2.10.3

You need to download the corresponding jar files and copy them to the previously created javalibs directory. The mentioned versions are the ones used for this article. You may use different versions.

Now we have to declare the dependencies in our Ballerina.toml file. To learn more about adding Java dependencies in Ballerina, please read How to Call Java Code from Ballerina. Following is an example dependency entry in the Ballerina.toml file.

[platform]
target = "java8"

[[platform.libraries]]
modules = ["producer"]
path = "./javalibs/kafka-avro-serializer-5.4.1.jar"
artifactId = "kafka-avro-serializer"
version = "5.4.1"
groupId = "io.confluent"

Creating The Producer

Now we can create our producer. To create the producer, open the src/producer/main.bal file in your editor.

You need to import the ballerina/kafka module first.

import ballerina/kafka;

Now we have to set the necessary properties of the producer. For this, the Ballerina Kafka module has a record type called kafka:ProducerConfiguration.

kafka:ProducerConfiguration producerConfiguration = {
    bootstrapServers: "localhost:9092",
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "http://localhost:8081"
};

Here, we first provide the Kafka server host and port in the bootstrapServers field.

Then we provide the serializer type. You can set both valueSerializerType and keySerializerType for your Kafka messages. Here, I only set valueSerializerType. Since we do not set keySerializerType explicitly, it takes the value kafka:SER_BYTE_ARRAY, which is the default for both keySerializerType and valueSerializerType. Setting valueSerializerType to kafka:SER_AVRO makes Apache Avro the serializer for the Kafka values.

Then we provide schemaRegistryUrl, which points to the Schema Registry we started earlier. Since we use the default Confluent configuration, the schema registry URL is http://localhost:8081.

Then we can create our Producer object as below:

kafka:Producer producer = new(producerConfiguration);
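
As a small variation on the configuration above, the key serializer can also be set explicitly instead of relying on the byte-array default. The following is only a sketch that assumes string keys. The names keyedProducerConfiguration and keyedProducer are made up for illustration, and kafka:SER_STRING is, as far as I know, the string counterpart of the serializer constants in the 1.2.x Kafka module.

```ballerina
import ballerina/kafka;

// Hypothetical variation of the article's configuration (assumption: string keys).
kafka:ProducerConfiguration keyedProducerConfiguration = {
    bootstrapServers: "localhost:9092",
    // Keys are serialized as strings; values still go through Avro.
    keySerializerType: kafka:SER_STRING,
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "http://localhost:8081"
};

// Created the same way as the producer above, only with the keyed configuration.
kafka:Producer keyedProducer = new (keyedProducerConfiguration);
```

The rest of this article keeps the default key serializer, so nothing else in the sample depends on this variation.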

To demonstrate the serialization, I am defining two record types:

public type Account record {
    int accountNumber;
    float balance;
};

public type Person record {
    string name;
    int age;
    Account account;
};

In order to serialize these records, we need a schema. I am defining the schema here inside the code for ease. You can use a separate file and read it.

string schema = "{\"type\" : \"record\"," +
    "\"namespace\" : \"Thisaru\"," +
    "\"name\" : \"person\"," +
    "\"fields\" : [" +
        "{ \"name\" : \"name\", \"type\" : \"string\" }," +
        "{ \"name\" : \"age\", \"type\" : \"int\" }," +
        "{ \"name\" : \"account\"," +
            "\"type\" : {" +
                "\"type\" : \"record\"," +
                "\"name\" : \"account\"," +
                "\"fields\" : [" +
                    "{ \"name\" : \"accountNumber\", \"type\" : \"int\" }," +
                    "{ \"name\" : \"balance\", \"type\" : \"double\" }" +
                "]}" +
        "}" +
    "]}";

Now we create an example Person record to send using Kafka.

Account account = {
    accountNumber: 19930808,
    balance: 123.23
};

Person person = {
    name: "Lahiru Perera",
    age: 28,
    account: account
};

To send this record, we need to create a kafka:AvroRecord. This record consists of two fields: schemaString, a string that provides the Avro schema, and dataRecord, an anydata value that provides the record to serialize. We can create it as below.

kafka:AvroRecord avroRecord = {
    schemaString: schema,
    dataRecord: person
};

Now we can send the message using the send() function of kafka:Producer.

var result = producer->send(avroRecord, "add-person-with-account");

Following is the complete code for the producer.
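
Putting the snippets above together, src/producer/main.bal could look roughly like the sketch below. The main function wrapper, the ballerina/io import, and the error handling around send() are my assumptions about how the pieces fit; the success message simply mirrors the text shown later in the run output.

```ballerina
import ballerina/io;
import ballerina/kafka;

public type Account record {
    int accountNumber;
    float balance;
};

public type Person record {
    string name;
    int age;
    Account account;
};

// Producer settings from the article: Avro values, default (byte array) keys.
kafka:ProducerConfiguration producerConfiguration = {
    bootstrapServers: "localhost:9092",
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "http://localhost:8081"
};

kafka:Producer producer = new (producerConfiguration);

public function main() {
    // Avro schema matching the Person and Account records.
    string schema = "{\"type\" : \"record\"," +
        "\"namespace\" : \"Thisaru\"," +
        "\"name\" : \"person\"," +
        "\"fields\" : [" +
            "{ \"name\" : \"name\", \"type\" : \"string\" }," +
            "{ \"name\" : \"age\", \"type\" : \"int\" }," +
            "{ \"name\" : \"account\"," +
                "\"type\" : {" +
                    "\"type\" : \"record\"," +
                    "\"name\" : \"account\"," +
                    "\"fields\" : [" +
                        "{ \"name\" : \"accountNumber\", \"type\" : \"int\" }," +
                        "{ \"name\" : \"balance\", \"type\" : \"double\" }" +
                    "]}" +
            "}" +
        "]}";

    Account account = {
        accountNumber: 19930808,
        balance: 123.23
    };
    Person person = {
        name: "Lahiru Perera",
        age: 28,
        account: account
    };

    // Wrap the schema and the data so the Avro serializer can use both.
    kafka:AvroRecord avroRecord = {
        schemaString: schema,
        dataRecord: person
    };

    var result = producer->send(avroRecord, "add-person-with-account");
    if (result is error) {
        // Assumed error handling: print the reason and return.
        io:println("Failed to send the message: ", result.reason());
    } else {
        // Message text mirrors the one shown in the run output.
        io:println("Successfuly sent");
    }
}
```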

Creating The Consumer

Now we are going to create a consumer to consume Kafka messages, using Avro deserialization. First open the src/consumer/main.bal file in your editor.

We have to import ballerina/kafka here too, along with ballerina/log, which we use to print the received values.

import ballerina/kafka;
import ballerina/log;

Now we can provide the necessary properties for the consumer. The following are the minimum required values for the consumer to use Avro deserialization.

kafka:ConsumerConfiguration consumerConfiguration = {
    bootstrapServers: "localhost:9092",
    groupId: "avro-consumer-group",
    topics: ["add-person-with-account"],
    valueDeserializerType: kafka:DES_AVRO,
    schemaRegistryUrl: "http://localhost:8081/"
};

Then we can create a listener using these configurations.

listener kafka:Consumer consumer = new(consumerConfiguration);

Now, we can create a service on the above created listener. Inside the service, we have our resource function onMessage, which is the resource function required to consume Kafka messages inside a service.

service KafkaService on consumer {
    resource function onMessage(kafka:Consumer consumer, kafka:ConsumerRecord[] records) {
        foreach var kafkaRecord in records {
            anydata value = kafkaRecord.value;
            if (value is kafka:AvroGenericRecord) {
                log:printInfo(value.toString());
            } else {
                log:printError("Invalid record type received.");
            }
        }
    }
}

Whenever new messages are pushed to the topics the consumer is subscribed to, the onMessage resource is dispatched. Then we can iterate through the received records using the foreach loop.

The value field of the kafka:ConsumerRecord holds the deserialized value of the received message.

Note that Ballerina Kafka consumers currently support only generic Avro record deserialization. Therefore, you have to read the fields by their names.
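
To illustrate reading fields by name, here is a minimal sketch of a helper that could be called from onMessage once the value has passed the kafka:AvroGenericRecord type check. The function name logPerson is hypothetical, and the sketch assumes that kafka:AvroGenericRecord behaves like an open record whose fields can be looked up with a string key. It only touches the top-level name and age fields from our schema.

```ballerina
import ballerina/kafka;
import ballerina/log;

// Hypothetical helper: logs two top-level fields of a deserialized person.
function logPerson(kafka:AvroGenericRecord person) {
    // Fields are looked up by the names used in the Avro schema.
    anydata name = person["name"];
    anydata age = person["age"];
    // toString() is used so the sketch makes no assumption about the
    // exact Ballerina types the deserializer produces.
    log:printInfo("name: " + name.toString() + ", age: " + age.toString());
}
```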

Following is the complete code for the consumer.
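
Assembled from the snippets above, src/consumer/main.bal could look roughly like the sketch below. The only deliberate difference is that the first resource parameter is named kafkaConsumer here, which is my choice to keep it distinct from the module-level listener variable; everything else follows the snippets as shown.

```ballerina
import ballerina/kafka;
import ballerina/log;

// Minimum settings for Avro deserialization, as described in the article.
kafka:ConsumerConfiguration consumerConfiguration = {
    bootstrapServers: "localhost:9092",
    groupId: "avro-consumer-group",
    topics: ["add-person-with-account"],
    valueDeserializerType: kafka:DES_AVRO,
    schemaRegistryUrl: "http://localhost:8081/"
};

listener kafka:Consumer consumer = new (consumerConfiguration);

service KafkaService on consumer {
    // Dispatched whenever new records arrive on the subscribed topics.
    resource function onMessage(kafka:Consumer kafkaConsumer, kafka:ConsumerRecord[] records) {
        foreach var kafkaRecord in records {
            anydata value = kafkaRecord.value;
            if (value is kafka:AvroGenericRecord) {
                // Print the deserialized generic record.
                log:printInfo(value.toString());
            } else {
                log:printError("Invalid record type received.");
            }
        }
    }
}
```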

Running The Project

Now that we have completed the implementation, we can run the project. Make sure ZooKeeper, your Kafka server, and the Schema Registry are up and running.

To run the project, open a terminal window and go inside our project directory (kafka_avro_sample). We run our consumer first. To run it, enter the following command in the terminal.

ballerina run consumer

This will start the consumer. If the consumer starts without any issue, the following lines will be printed in the terminal. Let it run in that terminal.

[ballerina/kafka] kafka servers: localhost:9092
[ballerina/kafka] subscribed topics: add-person-with-account
[ballerina/kafka] started kafka listener

Then we can run our producer. To do so, open another terminal, and enter the following command.

ballerina run producer

This will run our producer. If the producer was able to send the message successfully, the following line will be printed in the terminal.

Successfuly sent

Now, if you check the terminal where we ran our consumer, something similar to the following lines should be printed.

2020-05-06 18:44:26,671 INFO  [thisaru/consumer] - name=Lahiru Perera age=28 accountNumber=19930808 balance=123.23

That’s it! We successfully sent a message using Kafka with Avro serialization and received it back.

Hope you learned something from this.

Note: Apache Kafka and Ballerina logos are trademarks of the respective companies.

The complete project can be found as a GitHub project. Feel free to check it out.
