Optimizing Data Storage Using Compact Binary Formats

Harshita Singh
The Startup


Looking for a simple Kafka Streams (KStreams) implementation to help you convert your schema-less JSON records into Avro? Or maybe Parquet?

There are quite a few articles out there that tell you how to produce Avro records for Kafka. However, I had a hard time finding a solution for transforming an existing JSON record into Avro or Parquet.

Let’s see if I can help some of you find a solution to this problem. In this tutorial, I’ll be showing you how to create a Kafka Streams application in Java using Spring Boot. Using this application, you’ll be able to read a JSON record, transform it into Avro and write it back to a new topic. Once your data is in Avro format, Kafka Connect can read it and write it out in either Avro or Parquet format.

If you are new to Kafka and want to understand some basic concepts, Apache Kafka provides a good introduction to get you started.

Prerequisites:
1. Confluent Platform (with or without Docker), which includes Kafka and Schema Registry among other cool tools
2. Kafka S3 sink connector
3. Java 8+
4. Spring Boot
5. Maven 3.6.3+
6. IntelliJ IDEA, or any editor of your choice :)

Avro vs Parquet:

Avro and Parquet are both compact binary storage formats that require a schema to structure the data being encoded. The difference is that Avro stores data row by row, while Parquet stores it column by column.
In my experience, the two work well together. In fact, Parquet’s Java tooling includes Avro support (the parquet-avro module), so records described by an Avro schema can be written to and read back from Parquet files.
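
To make the row-versus-column distinction concrete, here is a toy sketch in plain Java. It does not use the Avro or Parquet libraries, and the class and method names are made up for illustration; it only shows how the same student records would be grouped under each layout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration only: real Avro and Parquet files add schemas, encoding and compression.
final class LayoutSketch {

    // Row layout (Avro-style): all fields of one record are stored together.
    static List<Object> rowLayout(List<Object[]> records) {
        List<Object> out = new ArrayList<>();
        for (Object[] record : records) {
            out.addAll(Arrays.asList(record));
        }
        return out;
    }

    // Columnar layout (Parquet-style): all values of one field are stored together.
    static Map<String, List<Object>> columnLayout(String[] fieldNames, List<Object[]> records) {
        Map<String, List<Object>> columns = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            List<Object> column = new ArrayList<>();
            for (Object[] record : records) {
                column.add(record[i]);
            }
            columns.put(fieldNames[i], column);
        }
        return columns;
    }

    public static void main(String[] args) {
        List<Object[]> records = Arrays.asList(
                new Object[] {"Asha", 21},
                new Object[] {"Ben", 23},
                new Object[] {"Chen", 22});
        System.out.println(rowLayout(records));
        System.out.println(columnLayout(new String[] {"student_name", "age"}, records));
    }
}
```

A query that only needs the ages can read a single column in the second layout, whereas the first layout hands over one complete record at a time, which is a natural fit for message-by-message streaming.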

Side note: this conversion can be done quite easily using KSQL streams. You can find out how here.
However, there could be other reasons why you might want to pick KStreams over KSQL.

Alright, let’s get started!

On my quest for working code that creates Avro data, I often found myself asking, “But how do you register the schema??” Everyone spoke about what changes you need to make to the code, but nobody really explained how a schema-less Kafka topic suddenly had a registered schema (yes, by default schemas are registered per topic). Well, it’s kinda automatic. You’ll see how.

Let’s first generate a skeleton of our Spring Boot application using Spring Initializr. This is what mine looks like:

This will download a zip file that you can then import into Eclipse or IntelliJ IDEA.

First, let’s define our schema. Let’s create a schema that has a ‘student_name’ field and an ‘age’ field. Create an “avro” directory under src/main/resources in the imported project (the same path that pom.xml points to as the Avro source directory) and add the following code in a schema_v1.avsc file inside it.

{
  "type": "record",
  "namespace": "com.kafkaprocessor",
  "name": "json_to_avro",
  "version": "1",
  "fields": [
    { "name": "student_name", "type": "string", "doc": "Student Name" },
    { "name": "age", "type": "int", "doc": "age of student" }
  ]
}

Our Avro record schema has the following attributes:
1. type
2. namespace
3. name
4. version
5. fields
Note that the namespace we are using is the same as the group name for the project skeleton that we created earlier.
The name attribute defines the name of the schema. The version attribute is not part of the Avro specification; Avro keeps it as custom metadata, and Schema Registry assigns its own version numbers when a schema changes. You can have multiple versions of the same schema. Read schema evolution and compatibility.
The fields section declares the name and type of each piece of data that a record carries.
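
Before wiring anything into Kafka, it can help to see what this schema demands of an incoming JSON record. The sketch below is plain Java with no Avro dependency; RecordCheck and problemWith are hypothetical names, and the check simply mirrors the two fields declared in schema_v1.avsc, neither of which is nullable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-check that mirrors schema_v1.avsc; it is not part of the Avro library.
final class RecordCheck {

    // Returns null when the record fits the schema, otherwise a short reason.
    static String problemWith(Map<String, Object> record) {
        Object name = record.get("student_name");
        if (!(name instanceof String)) {
            return "student_name must be a string";
        }
        Object age = record.get("age");
        if (!(age instanceof Integer || age instanceof Long)) {
            return "age must be a whole number";
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<>();
        ok.put("student_name", "Asha");
        ok.put("age", 21);
        System.out.println(problemWith(ok));

        Map<String, Object> bad = new HashMap<>();
        bad.put("student_name", "Ben");
        bad.put("age", "twenty");
        System.out.println(problemWith(bad));
    }
}
```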

Great! You have defined your schema. Let’s now tell our application where to look for this schema and where to create appropriate resources. For this, we will add the following lines to our pom.xml file.

<properties>
<!-- Keep versions as properties to allow easy modification -->
<avro.version>1.10.1</avro.version>
<kafka.version>2.6.0</kafka.version>
<confluent.version>6.0.1</confluent.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.6.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.3.6.RELEASE</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>3.0.10.RELEASE</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-examples</artifactId>
<version>0.10.0.1</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
<stringType>String</stringType>
<createSetters>false</createSetters>
<enableDecimalLogicalType>true</enableDecimalLogicalType>
<fieldVisibility>private</fieldVisibility>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>target/generated-sources/avro</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Let’s focus on the plugins here, specifically org.apache.avro:avro-maven-plugin and org.codehaus.mojo:build-helper-maven-plugin. The avro-maven-plugin section has three goals (‘schema’, ‘protocol’ and ‘idl-protocol’) and a source directory, which is where the plugin looks for the corresponding input files. Since we do not have protocol and idl-protocol files, we can skip those two goals. During the generate-sources phase, the ‘schema’ goal turns each .avsc file into a Java class under target/generated-sources/avro, the plugin’s default output directory. The build-helper-maven-plugin, with the goal ‘add-source’, then adds the specified <source> directory, i.e., target/generated-sources/avro, as a source root so that the generated classes are compiled along with the rest of the project.

Now, let’s get to writing actual code. We want to build a stream that reads data from the input topic and define the properties our Streams application needs. We then build each Avro record from the schema we created and the JSON data we’re reading, and push it to an output topic through another stream.

So we’re going to define the main method accordingly:

// Imports used here: java.util.Properties, java.util.concurrent.CountDownLatch,
// org.apache.kafka.streams.KafkaStreams, org.apache.kafka.streams.StreamsBuilder
public static final String INPUT_TOPIC = "intopic";
public static final String OUTPUT_TOPIC = "outtopic";

public static void main(final String[] args) {
    final Properties props = getStreamsConfig();

    final StreamsBuilder builder = new StreamsBuilder();
    createAvroStream(builder);
    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-avroconverter-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (final Throwable e) {
        // Print the failure instead of exiting silently
        e.printStackTrace();
        System.exit(1);
    }
    System.exit(0);
}
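
The shutdown-hook-plus-latch pattern in main is independent of Kafka. The following sketch uses only the JDK and shows the same idea with a generic AutoCloseable standing in for KafkaStreams; the class and method names are illustrative, not taken from the project.

```java
import java.util.concurrent.CountDownLatch;

final class GracefulShutdownSketch {

    // Builds the thread that a JVM shutdown (for example Ctrl-C) would run:
    // close the resource first, then release whoever is waiting on the latch.
    static Thread shutdownHook(AutoCloseable resource, CountDownLatch latch) {
        return new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                System.err.println("Error while closing: " + e);
            } finally {
                latch.countDown();
            }
        }, "sketch-shutdown-hook");
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AutoCloseable resource = () -> System.out.println("resource closed");
        Runtime.getRuntime().addShutdownHook(shutdownHook(resource, latch));
        System.out.println("Running; press Ctrl-C to stop");
        latch.await(); // blocks the main thread until the hook has run
    }
}
```

The latch keeps the main thread parked while the stream threads do the work, and the hook gives the application a chance to close cleanly before the JVM exits.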

getStreamsConfig() sets the properties of our Streams application: where Kafka and Schema Registry live, and which serializer-deserializers to use by default.

static Properties getStreamsConfig() {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-avroconverter");
    // By default, bootstrap server runs on localhost:9092
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    // Each record has a Key and a Value. The Key in this case would be a String/null, so we use String serializer-deserializer
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    // Using Generic Avro serializer-deserializer for values
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
    // Schema registry, by default, runs on localhost:8081
    props.put("schema.registry.url", "http://127.0.0.1:8081");

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    // Note: To re-run the demo, you need to use the offset reset tool:
    // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}
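
The hostnames above are hard-coded for a local setup. If you would rather not edit the code for each environment, one option is to read them from environment variables and fall back to the local defaults. This is only a sketch: the variable names BOOTSTRAP_SERVERS and SCHEMA_REGISTRY_URL are my own assumption, and the string keys are the plain-text forms of the configuration names used above.

```java
import java.util.Map;
import java.util.Properties;

final class ConfigOverrideSketch {

    // Builds the connection settings, preferring values from env (hypothetical variable names)
    // and falling back to the local defaults used in this tutorial.
    static Properties connectionProps(Map<String, String> env) {
        Properties props = new Properties();
        props.put("bootstrap.servers", env.getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092"));
        props.put("schema.registry.url", env.getOrDefault("SCHEMA_REGISTRY_URL", "http://127.0.0.1:8081"));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(connectionProps(System.getenv()));
    }
}
```

The resulting entries could be copied into the Properties object that getStreamsConfig() returns, for example with props.putAll(...).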

You can read more about SerDes here.

Let’s now run $ mvn clean package. Don’t worry about any java: cannot find symbol errors yet. Once you run the command, a “target” directory gets created under the project directory. When you navigate to the target/generated-sources/avro folder (as specified in pom.xml), you will find the com/kafkaprocessor/json_to_avro.java class, which contains all the methods you might need to work with the Avro schema that we have created. Because createSetters is set to false in pom.xml, setStudentName() and setAge() live on the class’s Builder, which you obtain through json_to_avro.newBuilder(). Notice that the class name is the same as the name of the schema in the avsc file.

Time to convert JSON to Avro.
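
The location of the generated file follows from the schema itself: the namespace becomes the package directory and the schema name becomes the class name. A small sketch of that mapping, with a hypothetical helper name and the output directory used in this project:

```java
final class GeneratedPathSketch {

    // Maps an Avro namespace and record name to the path of the generated Java source file.
    static String generatedSourcePath(String outputDir, String namespace, String name) {
        return outputDir + "/" + namespace.replace('.', '/') + "/" + name + ".java";
    }

    public static void main(String[] args) {
        System.out.println(
                generatedSourcePath("target/generated-sources/avro", "com.kafkaprocessor", "json_to_avro"));
    }
}
```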

static json_to_avro avro_converter(JSONObject json_record) {
    // Create a record of json_to_avro type using json data.
    // Going through Number avoids a ClassCastException if the parser hands back the age as a Long instead of an Integer.
    json_to_avro av_record = json_to_avro.newBuilder()
            .setStudentName((String) json_record.get("student_name"))
            .setAge(((Number) json_record.get("age")).intValue())
            .build();
    return av_record;
}

static void createAvroStream(final StreamsBuilder builder) {
    // There is no inbuilt support for Json serdes. In order to read json data from input topic, we need to create one using JsonPOJOClass
    // https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes#json
    Map<String, Object> serdeProps = new HashMap<>();
    serdeProps.put("JsonPOJOClass", JSONObject.class);

    final Serializer<JSONObject> jsonSerializer = new JsonPOJOSerializer<>();
    jsonSerializer.configure(serdeProps, false);

    final Deserializer<JSONObject> jsonDeserializer = new JsonPOJODeserializer<>();
    jsonDeserializer.configure(serdeProps, false);

    final Serde<JSONObject> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    // Building KStream from source topic
    final KStream<String, JSONObject> source = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), jsonSerde));

    // Building KStream with transformed avro records
    final KStream<String, json_to_avro> avro_stream = source
            .mapValues(value -> avro_converter(value));

    // Sending the avro KStream data to the output topic
    avro_stream.to(OUTPUT_TOPIC);
}
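
One detail worth a closer look is the age value. JSON has a single number type, and the Java type you get back depends on the parser: json-simple’s own parser yields a Long for whole numbers, for example, while Jackson’s untyped mapping yields an Integer for small values. The sketch below, in plain Java with a hypothetical helper name, shows one defensive way to narrow such a value to the int that the schema’s age field expects.

```java
final class AgeConversionSketch {

    // Narrows a parsed JSON number to the 32-bit int that an Avro "int" field holds.
    static int toAvroInt(Object jsonValue) {
        if (!(jsonValue instanceof Number)) {
            throw new IllegalArgumentException("not a number: " + jsonValue);
        }
        Number number = (Number) jsonValue;
        long whole = number.longValue();
        // Reject fractions such as 21.5 and NaN, which longValue() would silently truncate.
        if (number.doubleValue() != (double) whole) {
            throw new IllegalArgumentException("not a whole number: " + jsonValue);
        }
        if (whole < Integer.MIN_VALUE || whole > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("out of int range: " + jsonValue);
        }
        return (int) whole;
    }

    public static void main(String[] args) {
        System.out.println(toAvroInt(21));   // Integer input
        System.out.println(toAvroInt(21L));  // Long input
    }
}
```

Whether to fail loudly like this or to drop bad records earlier in the stream is a design choice; the point is only that the narrowing step should be explicit.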

The line final KStream<String, json_to_avro> avro_stream = source.mapValues(value -> avro_converter(value)) is where we specify the type of the value inside each record in avro_stream, namely the generated json_to_avro class. So, when we run the KStreams application and avro_stream is written to the output topic, the Avro serializer registers the record’s schema with Schema Registry on the first write. That is the “automatic” registration I promised earlier, and it is how the schema ends up associated with the topic.
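
If you are wondering where exactly the schema lands: with Schema Registry’s default subject naming strategy (TopicNameStrategy), a topic’s value schema is stored under the subject <topic>-value and its key schema under <topic>-key. The helper below is a hypothetical stand-in that only mimics that naming rule; it is not Confluent’s implementation.

```java
// Hypothetical helper that mimics the naming rule of the default TopicNameStrategy.
final class SubjectNameSketch {

    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        // Subject that holds the value schema of this tutorial's output topic
        System.out.println(subjectFor("outtopic", false));
    }
}
```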

Starting Confluent Platform
As discussed earlier, there are two ways to set up Confluent Platform on your system: with and without Docker. Let’s start the services using either one of these methods.
With Docker:
$ cd <path_to_confluent_dockerfile>
$ docker-compose up -d
$ docker-compose ps # Verify that the services are up
Without Docker:
$ confluent local services start

‘Go’ time!
Produce the JSON records to ‘intopic’ and run the KStreams application. You can view the Confluent Control Center from your browser at http://localhost:9021. Go to cluster->topics->outtopic->schema and you will see the registered schema. You can also see the incoming messages in the messages section.

Congratulations!! You’ve built your JSON to Avro converter using KStreams. Using Kafka Connect, you can easily read this Avro data and store it in either Avro or Parquet format.
You can find the full code on my Github.

Let’s connect on LinkedIn!
