Streaming data to Elasticsearch using Confluent connect tools and Avro Schema

Brachi Packter
3 min read · Nov 20, 2018


In this guide, we will walk through ten steps for streaming data in Avro format, using the Confluent Schema Registry to make sure the data always meets its schema contract, and then connecting it to Elasticsearch with Kafka Connect and the Confluent connector plugins.

In this article I assume you are already familiar with:

  1. Kafka producer/consumer
  2. Kafka connect
  3. Kafka streams
  4. Elasticsearch and Kibana

What I will explain is:

How to produce data from a Kafka Streams application to a topic, using the Schema Registry and Avro types, and then use those records with the Elasticsearch connector.

When I worked on this, it was hard to find a good explanation of how to do it. I found this tutorial, but it uses CLI tools rather than working interactively from a Kafka Streams Java application, so I decided to write up an explanation myself.

Before we dive in just make sure you have:

  1. Kafka and ZooKeeper running
  2. Elasticsearch and Kibana running
  3. the Confluent Hub client installed

Let’s start

1. Configure the Elasticsearch connector

In the end, I want to use the Elasticsearch sink connector, which is installed through the Confluent Hub client:
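A minimal sketch, assuming the connector plugin is installed with the Confluent Hub client (the version tag here is just an example):

    confluent-hub install confluentinc/kafka-connect-elasticsearch:latest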

Before running it, let's make sure everything is configured for our use case:

  1. Adjust quickstart-elasticsearch.properties. The important property for us is topics: we need to set it to the stream's output topic (see the first sketch below).

  2. Adjust connect-avro-standalone.properties to use the correct key and value converters. The default is io.confluent.connect.avro.AvroConverter for both key and value, but sometimes your key is a primitive type like a string, so make sure you change it in the properties (see the second sketch below):
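Here is a rough sketch of the relevant lines of quickstart-elasticsearch.properties. The topic name attack is my assumption, based on the index we query later; the rest follows the connector's quickstart defaults:

    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    # assumption: the stream's output topic is called "attack"
    topics=attack
    connection.url=http://localhost:9200
    type.name=kafka-connect
    # quickstart default; with key.ignore=false the record key becomes the document id
    key.ignore=true

And the relevant lines of connect-avro-standalone.properties, assuming string keys and Avro values with a local Schema Registry:

    bootstrap.servers=localhost:9092
    # keys are plain strings in this example, so use the string converter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    # values stay in Avro, backed by the Schema Registry
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081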

Now we can start the connector (the Confluent Hub client only installs the plugin; the connector itself runs in a Kafka Connect worker):
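One way to do that is the standalone Connect worker that ships with the Confluent Platform (the exact paths depend on your installation):

    ./bin/connect-standalone \
        etc/schema-registry/connect-avro-standalone.properties \
        etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties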

2. Start the Schema Registry
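The Schema Registry ships with the Confluent Platform; assuming a local installation, it can be started with the bundled script:

    ./bin/schema-registry-start etc/schema-registry/schema-registry.properties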

3. Configure a new schema:

3.1 Create a schema file, for example login-attack-count.avsc:
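The exact fields depend on your stream. A hypothetical login-attack-count.avsc could look like this (the record and field names are my assumptions, not necessarily the real schema used here):

    {
      "namespace": "com.example.attacks",
      "type": "record",
      "name": "LoginAttackCount",
      "fields": [
        {"name": "ip", "type": "string"},
        {"name": "window_start", "type": "long"},
        {"name": "window_end", "type": "long"},
        {"name": "attack_count", "type": "long"}
      ]
    }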

3.2 Register it:
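One way to register it programmatically is with the Schema Registry Java client, roughly like this (the URL, subject name, and file path are assumptions; the subject attack-value follows the default topic-name-value convention):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import org.apache.avro.Schema;

    import java.io.File;

    public class RegisterSchema {
        public static void main(String[] args) throws Exception {
            // parse the .avsc file created in step 3.1
            Schema schema = new Schema.Parser().parse(new File("login-attack-count.avsc"));
            // assumption: the Schema Registry runs locally on its default port
            CachedSchemaRegistryClient client =
                    new CachedSchemaRegistryClient("http://localhost:8081", 100);
            int id = client.register("attack-value", schema);
            System.out.println("Registered schema with id " + id);
        }
    }

Note that the Confluent Avro serializer also auto-registers the schema the first time it serializes a record, which is what shows up in the logs in step 6.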

You can also try doing it from the CLI (this example uses a different schema):
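A sketch with curl against the Schema Registry REST API (the subject and the tiny schema are only for illustration):

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      --data '{"schema": "{\"type\": \"record\", \"name\": \"Page\", \"fields\": [{\"name\": \"url\", \"type\": \"string\"}]}"}' \
      http://localhost:8081/subjects/page-value/versions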

4. Define a SerDe matching the schema we have just created
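A minimal sketch, assuming the Confluent kafka-streams-avro-serde dependency and a local Schema Registry (the class and method names here are mine):

    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.common.serialization.Serde;

    import java.util.Collections;

    public class AvroSerdes {

        // value SerDe for generic Avro records, fetching/registering schemas in the Schema Registry
        public static Serde<GenericRecord> attackValueSerde(String schemaRegistryUrl) {
            Serde<GenericRecord> serde = new GenericAvroSerde();
            serde.configure(
                    Collections.singletonMap(
                            AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                            schemaRegistryUrl),
                    false); // false = this SerDe is used for record values, not keys
            return serde;
        }
    }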

5. Build an Avro record and send it to the output topic.

In this example I used the windowedBy API: .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(1))).

Now my keys are wrapped in a window object, and I need to extract the relevant data from that key.
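A rough sketch of that part of the topology, assuming the input is a stream of login attempts keyed by IP (the topic names and helper classes are mine, not necessarily the article's):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    import java.util.concurrent.TimeUnit;

    public class AttackTopology {

        // hypothetical topic names
        static final String INPUT_TOPIC = "login-attempts";
        static final String OUTPUT_TOPIC = "attack";

        static void build(StreamsBuilder builder, Serde<GenericRecord> attackValueSerde) {
            // input keyed by IP; default String SerDes are configured in step 7
            KStream<String, String> loginAttempts = builder.stream(INPUT_TOPIC);

            // count login attempts per IP in 1-second windows
            KTable<Windowed<String>, Long> counts = loginAttempts
                    .groupByKey()
                    .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(1)))
                    .count();

            // the key is now Windowed<String>: unwrap it and pull out the window bounds
            KStream<String, GenericRecord> attacks = counts.toStream()
                    .map((windowedKey, count) -> KeyValue.pair(
                            windowedKey.key(),        // the original key, e.g. the IP
                            AttackRecords.build(      // Avro record builder, see step 6
                                    windowedKey.key(),
                                    windowedKey.window().start(),
                                    windowedKey.window().end(),
                                    count)));

            attacks.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), attackValueSerde));
        }
    }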

6. Build the record:
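A hypothetical helper that builds a GenericRecord against the schema from step 3.1 (the field names match my assumed .avsc above):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    import java.io.IOException;
    import java.io.InputStream;

    public class AttackRecords {

        // parse login-attack-count.avsc once, from the classpath (e.g. src/main/resources)
        private static final Schema SCHEMA = loadSchema();

        private static Schema loadSchema() {
            try (InputStream in = AttackRecords.class.getResourceAsStream("/login-attack-count.avsc")) {
                return new Schema.Parser().parse(in);
            } catch (IOException e) {
                throw new RuntimeException("could not load Avro schema", e);
            }
        }

        // build one Avro record for a single window of a single key
        public static GenericRecord build(String ip, long windowStart, long windowEnd, long count) {
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("ip", ip);
            record.put("window_start", windowStart);
            record.put("window_end", windowEnd);
            record.put("attack_count", count);
            return record;
        }
    }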

When the stream first serializes a record with this schema, the Avro serializer makes a REST call to the Schema Registry to register (or look up) the schema; you can see it in the registry logs below. The schema travels as JSON in the request body, but the record itself is written to the topic in Avro binary format, not as JSON.

[2018-11-20 15:32:15,138] INFO 127.0.0.1 - - [20/Nov/2018:13:32:15 +0000] "POST /subjects/attack-value/versions HTTP/1.1" 200 9 23 (io.confluent.rest-utils.requests:60)
[2018-11-20 15:32:17,949] INFO 127.0.0.1 - - [20/Nov/2018:13:32:17 +0000] "POST /subjects/attack-value/versions HTTP/1.1" 200 9 4 (io.confluent.rest-utils.requests:60)

7. Run the stream application

Simply run the Kafka Streams Java application, and the topic's records are now automatically sent to an Elasticsearch index. The index name is the same as the topic name.
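A minimal sketch of the application entry point, wiring together the helpers sketched above (the application id and URLs are placeholders):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    import java.util.Properties;

    public class AttackStreamApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "attack-detector");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // default SerDes for the String-keyed, String-valued input stream
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            // windowed topology from step 5, Avro SerDe from step 4
            AttackTopology.build(builder, AvroSerdes.attackValueSerde("http://localhost:8081"));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            // stop cleanly on Ctrl+C
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }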

8. Find the Elasticsearch index

To query the index, just run this in Kibana Dev Tools:

GET /attack/_search?pretty

9. Visualizing in Kibana

Add visualizations and a dashboard on top of this index.

10. Enjoy your dashboard and be proud of yourself 👏 💪

And a small tip: right now you have many open tabs in your terminal, and it is easy to lose track of which tab is running which command. If you use iTerm (like me), just hit Cmd+I, give the session a title, and now each tab has a name 🔥

And if you want the source code: https://github.com/brachi-wernick/kafka-stream-fuse-day.
