Data flow diagram of Avro serialization

1. Set up the environment for Kafka (Kafka server, Zookeeper, Schema Registry) and Docker.

Imagine you are assigned the task of building a data streaming pipeline in which the properties of the flowing data change over time. Such changes often force us to modify large parts of the codebase, and after those modifications the system may no longer accept data in the old format.

To solve such problems, we can use the Confluent Schema Registry and stream data in Avro format. This approach allows consumers to handle both the old and the new data streams.

Since the Confluent Platform does not run natively on Windows, I use Docker containers to run the Kafka server, Zookeeper, and the Schema Registry. The other important reason for using Docker is that this project can easily be reproduced on any operating system with Docker installed.

Prerequisite — Install docker

Make sure you have supported versions of Docker and Docker Compose installed.

Note that all the commands used in this project run on top of Docker.

docker --version
docker-compose --version

First, we need to create a docker-compose file and add the Kafka, Zookeeper, and Schema Registry images to it. The Docker images used in this project are provided by confluentinc.

confluentinc/cp-zookeeper:5.3.1
confluentinc/cp-enterprise-kafka:5.3.1
confluentinc/cp-schema-registry:5.3.1

I used version 5.3.1; you can use whichever version is the latest at the time you follow this article.
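For reference, here is a minimal docker-compose.yml sketch along these lines. Treat it as an illustration under assumptions: the host networking mode and the environment variables (ports, hostnames) are my choices, made to stay consistent with the localhost addresses used later in this article, and they may differ from your setup.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    network_mode: host
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181            # port the Kafka broker uses to reach Zookeeper

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    network_mode: host
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: localhost:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1   # single-node cluster

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    network_mode: host
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: localhost
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: localhost:2181
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081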

Open a command prompt in the directory where your docker-compose.yml file is located and run the following command, which will start Zookeeper, a Kafka server, and the Schema Registry.

Note that the file should be named “docker-compose” with the “yml” extension; otherwise, the command will not find it.

docker-compose -f docker-compose.yml up -d

To check the status of the Docker containers, run the following command. It gives you the details of each container, such as its name, its current state, and the ports it exposes for interaction.

docker-compose ps

Creating a topic

After making sure that Kafka and Zookeeper are in good condition, we can create a topic to publish data. I named the topic “Topic-A” and set the number of partitions and the replication factor to 1.

The docker-compose file above creates a single-node cluster. Since we are doing this project on a single-node cluster, the replication factor for each partition of a topic must be one.

docker run --net=host --rm confluentinc/cp-kafka:5.3.1 kafka-topics --create --topic Topic-A --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:2181

List the topic

This command will list all the existing topics in the cluster.

docker run --net=host --rm confluentinc/cp-kafka:5.3.1 kafka-topics --list --zookeeper  localhost:2181

Describe the topic

Using the describe command, we can see which broker holds the leader and which brokers hold the follower replicas. In our case, there is only one leader and no followers.

docker run --net=host --rm confluentinc/cp-kafka:5.3.1 kafka-topics --zookeeper localhost:2181  --describe --topic Topic-A

If you have followed my approach so far, you now have Zookeeper, a Kafka server, and a Schema Registry running on top of Docker, as well as a topic to stream data to.

2. Set up the programming environment

Now it is time to create a producer, a consumer, and an Avro schema. You can use any development tool that supports Java for this project; I use IntelliJ IDEA.
First, create a Maven project and define all the required project dependencies in the “pom.xml” file.
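As a rough sketch, a project like this typically needs the Kafka client, Avro, and Confluent’s Avro serializer (the Confluent artifacts come from the Confluent Maven repository), plus the avro-maven-plugin to generate Java classes from the schema files. The exact versions and coordinates below are assumptions chosen to match the versions mentioned elsewhere in this article, so adapt them to your project.

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.2</version>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.3.1</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <!-- Generates Java classes from the .avsc files kept in src/main/resources -->
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.9.2</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>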

3. Create a schema for the object to be sent.

In this project I decided to send an employee object. Here is the Avro schema for the employee object.

Name it “employee-version1.avsc” and put it into your resources directory.
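The original schema is not reproduced here, so the following is only a minimal sketch of what it could look like. The employee_address field is included because it is removed in the updated schema later in this article; the record name, namespace, and the remaining field names and defaults are assumptions for illustration.

{
  "type": "record",
  "name": "Employee",
  "namespace": "com.example.avro",
  "fields": [
    { "name": "employee_name",    "type": "string", "default": "" },
    { "name": "employee_age",     "type": "int",    "default": 0  },
    { "name": "employee_address", "type": "string", "default": "" }
  ]
}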

Here we don’t need to create separate classes for serializing and deserializing the objects like we used to do in my previous article.

When we use a third-party serializer like Avro, the generation of the serializable class is taken care of for us.

Now let’s learn how to generate the serializable class using Avro. If you use IntelliJ IDEA, you can simply generate it by running the package option, which is available under the Maven lifecycle.

This creates an object class based on the input schema. Once the file is generated, you will see the message “BUILD SUCCESS”.

You will find the automatically generated file in your generated-sources directory.

If you prefer not to rely on a development tool, you can use avro-tools-1.9.2.jar to compile your schema manually.

Go to the following link and download the avro-tools-1.9.2.jar file.

Once you have downloaded the tool, use the following command to generate your schema class.

java -jar <path/to/avro-tools-1.9.2.jar> compile schema <path/to/schema-file> <destination-folder>

4. Create the producer and send the Avro-serialized object to the topic.

Now let’s create a producer to publish our custom objects.

Create a producer class as follows and name it KafkaAvroProducerVersion1. I named this class version 1 because we are going to use a version 2 producer class later to send an object with different attributes to the same topic.

If you look at the code, you will see that “value.serializer” is set to KafkaAvroSerializer. Moreover, I use the generated Employee class as the value type of the KafkaProducer.
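Since the full class is not reproduced here, the sketch below shows the general shape of such a producer. It is an illustration under assumptions: the package of the generated Employee class, the constructor arguments, and the localhost addresses are placeholders that should match your own project and docker-compose setup.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import com.example.avro.Employee;   // class generated from employee-version1.avsc (assumed namespace)

public class KafkaAvroProducerVersion1 {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaAvroSerializer registers the schema with the Schema Registry and writes the value in Avro format
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        // The generated Employee class is the producer's value type
        KafkaProducer<String, Employee> producer = new KafkaProducer<>(props);

        Employee employee = new Employee("John Doe", 28, "Colombo");   // all-args constructor generated by Avro
        ProducerRecord<String, Employee> record = new ProducerRecord<>("Topic-A", employee);

        // The returned metadata describes how the record was stored (topic, partition, offset)
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("Record stored in partition " + metadata.partition() + " at offset " + metadata.offset());

        producer.close();
    }
}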

5. Create the consumer and consume data published by the producer.

Once the producer sends messages to the Kafka topic, we need a consumer to consume the published messages. Here is the consumer code I created.
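Again, the original class is not shown here, so this is only a hedged sketch of a typical consumer for this setup. The group id, the addresses, and the package of the generated Employee class are assumptions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import com.example.avro.Employee;   // class generated from employee-version1.avsc (assumed namespace)

public class KafkaAvroConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "employee-consumer-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // KafkaAvroDeserializer looks up the writer's schema in the Schema Registry
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");
        // Deserialize into the generated Employee class instead of a GenericRecord
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaConsumer<String, Employee> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("Topic-A"));

        while (true) {
            ConsumerRecords<String, Employee> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, Employee> record : records) {
                System.out.println("Consumed: " + record.value());
            }
        }
    }
}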

Let’s run the producer and publish some data to the topic we created.

If the data is successfully published to the Kafka topic, we get a response from Kafka. The response includes the details of how the record was stored in Kafka.

We can use the consumer to consume this record from the other end.

6. Create another producer, send an Avro-serialized object with different attributes, and check whether the consumer can consume both records.

Let’s create a schema for the updated version of the employee object. I update the employee object by removing employee_address and adding three new properties: employee_blood_group, employee_email, and employee_phone.
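With the same caveats as before, here is a hedged sketch of what the updated schema could look like. The record name and namespace are assumptions (a separate namespace keeps the two generated classes from clashing in one project), and the new fields are given default values; the actual schema may differ.

{
  "type": "record",
  "name": "Employee",
  "namespace": "com.example.avro.version2",
  "fields": [
    { "name": "employee_name",        "type": "string", "default": "" },
    { "name": "employee_age",         "type": "int",    "default": 0  },
    { "name": "employee_blood_group", "type": "string", "default": "" },
    { "name": "employee_email",       "type": "string", "default": "" },
    { "name": "employee_phone",       "type": "string", "default": "" }
  ]
}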

Here is the new producer class that we are going to use to produce the updated employee object.
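The version 2 producer is essentially the same as KafkaAvroProducerVersion1; only the value type and the record construction change. A hedged fragment (the field values and the version 2 class location are assumptions):

// Same configuration as in KafkaAvroProducerVersion1, but typed with the version 2 Employee class
KafkaProducer<String, com.example.avro.version2.Employee> producer = new KafkaProducer<>(props);

com.example.avro.version2.Employee employee = new com.example.avro.version2.Employee(
        "Jane Doe", 30, "O+", "jane@example.com", "0771234567");   // all-args constructor from the v2 schema
producer.send(new ProducerRecord<>("Topic-A", employee));
producer.flush();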

After creating the second producer, we can send the new Avro-serialized data to the same topic.

Console output after sending data from producer 1
Console output after sending data from producer 2

After verifying that the data has been sent to the topic, we can check whether the consumer consumes both the old and the new data.

Console output after the consumer consumes the published data

You will notice that some fields are missing in this consumer’s output, because this consumer was originally designed to consume data from the old employee object.

To get the latest employee object data, we can reuse the same consumer class code; just be sure to pass the new employee class as the value type of the KafkaConsumer.
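In terms of the sketches above, that change would look roughly like this (the version 2 package name is an assumption):

// Reuse the same consumer configuration, but deserialize into the version 2 Employee class
KafkaConsumer<String, com.example.avro.version2.Employee> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("Topic-A"));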

I hope you gain some knowledge by following my approach. I look forward to seeing you in another article.

The source code of this project will be available in this repository.
