How to Build Your First Real-Time Streaming (CDC) System (Kafka Streams and Aggregation, Part 3)

Rohan Mudaliar
Published in Analytics Vidhya
Mar 16, 2020

In article 1 of this POC, we looked at the key concepts required to build a real-time streaming system, and in article 2 we looked at the MySQL setup and the local infrastructure setup. In this article, we read the real-time data ingested into Kafka using Kafka Streams, perform some aggregations on it, and create indices in Elasticsearch. As usual, we start with some basic concepts, then look at the technical tasks, and finally walk through the implementation of the system.

Note: If you need a better understanding of the entire problem, please read the first and second parts of this series. They contain detailed information about the problem statement, the architecture, the infrastructure, and the MySQL setup.

Basic Kafka Streams Concepts:

Before we dive into the implementation, let us cover some Kafka Streams basics.

What is Kafka Streams?

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.

Kafka Streams DSL

To interact with streams in Java, Kafka provides the Kafka Streams DSL (Domain Specific Language).

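The two main abstractions we use from the Streams DSL are KStream and KTable. The descriptions below follow Apache's official documentation.

KStream

A KStream is an abstraction of a record stream: each record is an independent event, so a new record with an existing key does not replace the earlier one.

KTable

A KTable is an abstraction of a changelog stream: each record is an update, so a new record with an existing key replaces the earlier value for that key.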
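As a rough illustration of the difference between the two abstractions, here is a plain-Java sketch (no Kafka dependency) that sums the values per key for the records ("alice", 1) followed by ("alice", 3), the pair used in the Apache documentation. The class and method names are hypothetical and only mimic the semantics; they are not Kafka Streams APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that mimics KStream vs KTable semantics with plain collections.
class StreamVsTableSketch {

    // KStream view: every record is an independent event, so values for a key add up.
    static Map<String, Integer> sumAsStream(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            sums.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return sums;
    }

    // KTable view: every record is an upsert, so a later value replaces the earlier one.
    static Map<String, Integer> sumAsTable(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            latest.put(r.getKey(), r.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records =
                List.of(Map.entry("alice", 1), Map.entry("alice", 3));
        System.out.println("as stream: " + sumAsStream(records)); // {alice=4}
        System.out.println("as table:  " + sumAsTable(records));  // {alice=3}
    }
}
```

Read as a stream, the two records sum to 4; read as a table, the second record overwrites the first and the sum is 3. This is why the topics in this article, which carry row-level changes from MySQL, are a natural fit for tables.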

Overall Technical Tasks:

The overall technical tasks were as follows:

  1. Setting up local infrastructure using Docker.
  2. Data ingestion into Kafka from the MySQL database using Kafka Connect.
  3. Reading data using Kafka Streams in the Java backend.
  4. Creating indices on Elasticsearch for the aggregated views.
  5. Listening to the events in real time and updating the indices.
  6. Setting up your local environment and running the Java code.

We finished tasks 1 and 2 in the earlier articles of this series, along with a detailed look at the concepts needed for the problem. Now let's jump to the rest of the tasks.

3. Reading data using Kafka Streams in the Java backend for the first time: index creation.

Now that we have our Kafka topics created, the next step is to read the streaming data, perform the required aggregations using Kafka Streams, and store the result in Elasticsearch.

So let's look at what we want our final index to contain and its sources before we proceed with any code.

To achieve the above, let me break this down into steps:

  1. Decide on the topics you want to listen to. These will be used to create the aggregated views.
  2. Decide on the tables (topics) you need to join, or the transformations you need to perform on them, to arrive at your final aggregated tables.
  3. Once you have your final aggregated view constructed, call Elasticsearch to create an index.

Let's look at points 1 and 2 via code.

We are listening to logisticsTopic, orderTopic, and wmsTopic to begin with.

We initialize a new StreamsBuilder and create an orderKtable, logisticsKtable, and wmsKtable from the corresponding topics. As of now, we are not doing anything with the streamed data.

To construct our final aggregated view, we need to create a few intermediate aggregations.

We first process the orderKTable and wmsKTable, setting orderId as the key of each.

OrderStream
WmsStream
ShipmentStream

Now that we have our individual tables ready, the next step is to create the intermediate aggregate tables. We first join the order and wms tables on orderId to create an aggregated view with shipmentId as the key.

The createOrderWmsStream method takes the merged stream and sets shipmentId as the key. The merging itself is done in the assignValues method.

createOrderWmsStream
assignValues

assignValues takes the two incoming records, extracts the JSON from each, and uses it to build the result DTO object.
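The join-then-re-key idea can be modelled without Kafka at all. The following is a minimal plain-Java sketch in which each table is a map keyed by orderId; it is not the article's actual code, and all field names (order_id, order_status, shipment_id, item_qty) are assumptions made up for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of a table-table inner join followed by a re-key.
// All field names are hypothetical.
class OrderWmsJoinSketch {

    // Counterpart of assignValues: copy the needed fields from both sides into one row.
    static Map<String, Object> assignValues(Map<String, Object> order, Map<String, Object> wms) {
        Map<String, Object> merged = new LinkedHashMap<>();
        merged.put("order_id", order.get("order_id"));
        merged.put("order_status", order.get("order_status"));
        merged.put("shipment_id", wms.get("shipment_id"));
        merged.put("item_qty", wms.get("item_qty"));
        return merged;
    }

    // Counterpart of createOrderWmsStream: inner join on orderId, result keyed by shipmentId.
    static Map<String, Map<String, Object>> joinOnOrderId(
            Map<String, Map<String, Object>> ordersByOrderId,
            Map<String, Map<String, Object>> wmsByOrderId) {
        Map<String, Map<String, Object>> byShipmentId = new HashMap<>();
        for (Map.Entry<String, Map<String, Object>> e : ordersByOrderId.entrySet()) {
            Map<String, Object> wms = wmsByOrderId.get(e.getKey());
            if (wms == null) {
                continue; // inner join: skip orders that have no WMS row yet
            }
            Map<String, Object> merged = assignValues(e.getValue(), wms);
            byShipmentId.put(String.valueOf(merged.get("shipment_id")), merged);
        }
        return byShipmentId;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> orders = Map.of(
                "o-1", Map.of("order_id", "o-1", "order_status", "CREATED"),
                "o-2", Map.of("order_id", "o-2", "order_status", "CREATED"));
        Map<String, Map<String, Object>> wms = Map.of(
                "o-1", Map.of("shipment_id", "s-9", "item_qty", 2));
        System.out.println(joinOnOrderId(orders, wms).keySet()); // [s-9]
    }
}
```

Order o-2 has no matching WMS row, so it is absent from the result. In Kafka Streams the same row would appear later, once the matching WMS record arrives.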

Our next step is to join this table with shipmentKtable to arrive at our final aggregate stream.

The apendValues method is used here to construct the final aggregated view.

apendvalue

The assignValues and apendValue methods are used purely for setting values on the aggregated views.

So with this, we have our final aggregated table ready.

4. Creating indices on Elasticsearch for the aggregated views.

Let's look at the controller class method.

We read the final aggregated stream and build a map from it in the controller. The map's keys are the field names of the DTO and its values are the corresponding DTO values. This map is what we store in Elasticsearch.

Once we have created the map, we call the service class and pass the name of the index that we want to create. This is done in the line below.
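One way to turn a DTO into such a map is reflection over its fields. The sketch below assumes a hypothetical OutboundReportDto with four made-up fields; the article's real DTO and its conversion code are not shown here, so treat this only as an illustration of the shape of the map.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

class DtoToMapSketch {

    // Hypothetical stand-in for the aggregated DTO; the field names are assumptions.
    static class OutboundReportDto {
        String id;
        String orderId;
        String shipmentId;
        String wmsStatus;

        OutboundReportDto(String id, String orderId, String shipmentId, String wmsStatus) {
            this.id = id;
            this.orderId = orderId;
            this.shipmentId = shipmentId;
            this.wmsStatus = wmsStatus;
        }
    }

    // Builds a map of field name -> field value, skipping static, synthetic and null fields.
    static Map<String, Object> toMap(Object dto) {
        Map<String, Object> map = new HashMap<>();
        for (Field field : dto.getClass().getDeclaredFields()) {
            if (field.isSynthetic() || Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            try {
                field.setAccessible(true);
                Object value = field.get(dto);
                if (value != null) {
                    map.put(field.getName(), value);
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> inputMap = toMap(new OutboundReportDto("1", "o-1", "s-9", "PACKED"));
        System.out.println(inputMap.get("shipmentId")); // s-9
    }
}
```

In a real project a JSON mapper would likely do the same job with less code; reflection is used here only to keep the sketch dependency-free.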

elasticSearchService.pushToEs(ElasticSearchTopics.OURBOUND_REPORT_TOPIC, inputMap);

Let's look at the service and the DAO class, which are used to persist data into Elasticsearch.

ElasticsearchService create index

We check whether the index already exists. If it does, we don't create the documents.

OrderReportDAO.insertindex

Elasticsearch provides a Java high-level REST client for working with the cluster. Here we use IndexRequest to index a document; with default settings, Elasticsearch creates the target index on the first write if it does not already exist.

IndexRequest indexRequest = new IndexRequest(tableName + "_es")
        .type("_doc")
        .id(payloadMap.get("id").toString())
        .source(payloadMap);

The above statement builds the request that indexes the document, using the id field of the payload as the document ID. We create an instance of RestHighLevelClient, as mentioned earlier, to interact with Elasticsearch from Java.

restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

The above statement sends the request, which creates the document (and the index, if needed) in Elasticsearch.

5. Enabling real-time event-based updates to the system

Now that we have written the code to create the aggregate views required by the business, the next step is to update the created index in real time as individual events arrive. Let us look at how we do this for one of the events from the wms system. The same approach can be replicated for the other systems.

We first listen to the wmsTopic and create a stream from it.

Once we have this stream, we need to check whether each record is an update or just an insert. We figure this out by checking whether the record has a before object. The existence of a before object means that the current record is an update.
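A minimal sketch of that check, assuming the Debezium payload has already been parsed into a Map with before and after entries (the class name and sample fields are hypothetical). Debezium also populates before on delete events, so this sketch additionally requires after to be present; that extra condition is my assumption and not necessarily what the article's code does.

```java
import java.util.HashMap;
import java.util.Map;

class CdcEventKindSketch {

    // An update carries both the old row image (before) and the new one (after).
    // Inserts have no before; deletes have no after.
    static boolean isUpdate(Map<String, Object> payload) {
        return payload.get("before") != null && payload.get("after") != null;
    }

    public static void main(String[] args) {
        Map<String, Object> insert = new HashMap<>();
        insert.put("before", null);
        insert.put("after", Map.of("id", 1, "wms_status", "PICKED"));

        Map<String, Object> update = new HashMap<>();
        update.put("before", Map.of("id", 1, "wms_status", "PICKED"));
        update.put("after", Map.of("id", 1, "wms_status", "PACKED"));

        System.out.println(isUpdate(insert)); // false
        System.out.println(isUpdate(update)); // true
    }
}
```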

We now traverse through each record and we call the service class to do our updates.

The above piece of code does three things:

  • listen to the wms event,
  • once we have that event, we update the existing document with the new information from the event and
  • update the particular document in Elasticsearch.
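The three steps above can be sketched in plain Java as a diff-and-merge: work out which fields changed between the before and after row images, then overlay only those fields on the existing document, which is roughly what a partial-document update does. The class, method, and field names here are hypothetical, and the article's own update logic may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

class PartialUpdateSketch {

    // Fields whose value differs between the before and after row images.
    static Map<String, Object> changedFields(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> changed = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : after.entrySet()) {
            if (!Objects.equals(before.get(e.getKey()), e.getValue())) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }

    // Partial-update semantics: overwrite only the supplied fields, keep everything else.
    static Map<String, Object> applyUpdate(Map<String, Object> existingDoc, Map<String, Object> changed) {
        Map<String, Object> updated = new LinkedHashMap<>(existingDoc);
        updated.putAll(changed);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> before = Map.of("id", 1, "wms_status", "PICKED");
        Map<String, Object> after = Map.of("id", 1, "wms_status", "PACKED");
        Map<String, Object> existingDoc = Map.of("order_id", "o-1", "wms_status", "PICKED");

        Map<String, Object> changed = changedFields(before, after);
        System.out.println(changed); // {wms_status=PACKED}
        System.out.println(applyUpdate(existingDoc, changed).get("wms_status")); // PACKED
    }
}
```

Sending only the changed fields keeps the update small and avoids overwriting fields in the aggregated document that came from the other topics.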

The above is the controller for working with updates. Let's look at the DAO for the same.

UpdateIndex DAO

Here I am using UpdateRequest, the Java high-level client class for updating documents in Elasticsearch indices. We again use RestHighLevelClient to send the update.

Let's Visualize

We have now read from the stream, performed some aggregation, created an index and performed some updates. Now let's picture the entire thing.

SUMMARY:

Now that we have reached the final step, let's recap everything that we have done in this exercise:

  • We first enabled Binlogs on the MySQL database.
  • We created the required services in our local for the application using docker.
  • Next, we created a connector using Debezium-Kafka connect which would listen to the creates/updates in our MySQL database and push the changes to Kafka.
  • We wrote a Kafka streams application that would listen to the Kafka events in real-time and create aggregated views.
  • We created an index on Elasticsearch using RestHighLevelClient.
  • We wrote another listener using Kafka Streams that listens to individual Kafka events and pushes the updates.

Our last step is to verify the results in Elasticsearch. I am also covering the local code setup for those who want to try this out.

If you do like this article, please do read the subsequent articles and share your feedback. Find me on LinkedIn at rohan_linkedIn.
