How to Join Two Separate Json Kafka Topics Using Apache Flink and Scala

Yousef Alkhanafseh
Published in TurkNet Technology
7 min read, Oct 6, 2023

Explores deserialization, aggregation, and serialization techniques for Apache Kafka JSON messages, all implemented with Apache Flink Scala and Apache Flink SQL.

Figure 1. Main Architecture.

Introduction

When starting to learn Apache Flink, it is easy to overlook some straightforward built-in methods, because documentation and code-related questions about Apache Flink are scarce. Deprecated methods and misunderstandings among developers add to the difficulty of grasping the core concepts of Apache Flink. As a result, predefined methods are missed and unnecessary code gets written. With this in mind, this tutorial discusses three main subjects that, to our knowledge, have not been explained before:

  • Consuming JavaScript Object Notation (JSON) events that come from two different Apache Kafka topics.
  • Applying aggregations to consumed events.
  • Producing JSON events back to Apache Kafka.

Therefore, this tutorial covers deserialization, aggregation, and serialization methods for JSON messages (see Figure 1). These operations are carried out using both Apache Flink and Apache Flink SQL. The environment used here can be installed completely by following the instructions found in our article.

Data-sets

The data-sets used in this tutorial were explained previously in our article, which also includes a case study using Apache Flink. Additionally, please refer to this article to become more familiar with NetFlow data. In this tutorial, however, the NetFlow and CDR data-sets are simply converted to JSON format. Use the following command to clone the GitHub repository that contains the data:

git clone https://github.com/yousef-alkhanafseh/flink-kafka-scala-tutorial.git

An example record from the NetFlow data-set is presented below; the data file is flink-kafka-scala-tutorial/data/netflowjson.json.

{
"NETFLOW_DATETIME": "1694086940",
"SOURCE_ADDRESS": "222.161.138.90",
"SOURCE_PORT":"58587",
"IN_BYTES":90
}

In addition, a record from the used CDR data-set is displayed below, and the data file is flink-kafka-scala-tutorial/data/cdrjson.json.

{
"CUSTOMER_ID": "Customer-6975",
"PRIVATE_IP": "222.161.138.90",
"START_REAL_PORT":57784,
"END_REAL_PORT":59073,
"START_DATETIME":"1693568679",
"END_DATETIME":"1694091906"
}

First, the Apache Kafka topics must be prepared. Three topics should be created, named “cdrjson”, “netflowjson”, and “resultjson”, which hold the CDR data, the NetFlow data, and the obtained result, respectively. These topics can be created using the following commands:

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic cdrjson
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic netflowjson
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic resultjson

To keep this tutorial simple, the two data-sets are produced from JSON files rather than from live sources. They are found under the flink-kafka-scala-tutorial/data folder and can be written to their topics using the following commands:

cd flink-kafka-scala-tutorial/data
kafka-console-producer.sh --broker-list localhost:9092 --topic cdrjson < cdr.json
kafka-console-producer.sh --broker-list localhost:9092 --topic netflowjson < netflow.json

Coding

In this section, we provide explanations for two distinct coding approaches: Apache Flink Scala and Apache Flink SQL.

  • Apache Flink

Several online resources give a misleading picture of how JSON messages are deserialized in Apache Flink; most of them state that a custom deserializer must be written in order to read JSON messages. However, Apache Flink has a built-in class that deserializes JSON messages automatically, JSONKeyValueDeserializationSchema(), which takes a boolean value as an argument [1]. This argument controls whether the metadata of the message is included. If it is set to true, the metadata, which contains the offset, topic, and partition, is added to each record as a nested object, as in "metadata":{"offset":172940,"topic":"netflowjson","partition":0}.

If it is set to false, only the message itself is returned, without the metadata. In both cases the message payload is placed under the "value" field, which is why the code below reads every column through get("value").
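
To make the effect of the boolean argument concrete, the following is a minimal sketch that runs the deserializer by hand on a single record instead of inside a job. It assumes the flink-connector-kafka and kafka-clients libraries of the Flink 1.14 line are on the classpath; the hand-built ConsumerRecord and the shortened payload are illustrative only. Later Flink versions may require calling open() on the schema before deserialize().

```scala
import java.nio.charset.StandardCharsets
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

// A hand-built Kafka record standing in for one message of the netflowjson topic
// (illustrative payload; the record key is left empty).
val payload = """{"SOURCE_ADDRESS":"222.161.138.90","IN_BYTES":90}"""
  .getBytes(StandardCharsets.UTF_8)
val record =
  new ConsumerRecord[Array[Byte], Array[Byte]]("netflowjson", 0, 172940L, null, payload)

// Same record, deserialized with and without metadata.
val withMetadata = new JSONKeyValueDeserializationSchema(true).deserialize(record)
val withoutMetadata = new JSONKeyValueDeserializationSchema(false).deserialize(record)

// The payload always sits under "value".
println(withMetadata.get("value").get("SOURCE_ADDRESS").asText())
// The "metadata" object is only present when the flag is true.
println(withMetadata.get("metadata").get("offset").asLong())
println(withoutMetadata.has("metadata"))
```

Since the job in this tutorial never reads the offset, topic, or partition, passing false keeps each record smaller.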

The same code that was developed in the IMPLEMENTATION DETAILS section of this article is applied in this tutorial. However, the goal here is to reduce the amount of code by reading and writing the data-sets in JSON format, which in turn reduces the complexity of the code and the execution time. The code is given below; note that the map operations for both netflowConsumer and cdrConsumer are eliminated. The flow of the job is displayed in Figure 2 below.

%flink

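import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
// provides StreamExecutionEnvironment and the implicit TypeInformation used by the Scala API
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema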

val env = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "flink-consumer-group")
properties.setProperty("auto.offset.reset", "earliest") // set as earliest so that earliest events are consumed

val netflowTopic = "netflowjson"
val cdrTopic = "cdrjson"
val resultTopic = "resultjson"

val includeMetadata = false

val netflowConsumer = new FlinkKafkaConsumer(netflowTopic, new JSONKeyValueDeserializationSchema(includeMetadata), properties)
val netflowStream= env.addSource(netflowConsumer)

val cdrConsumer = new FlinkKafkaConsumer(cdrTopic, new JSONKeyValueDeserializationSchema(includeMetadata), properties)
val cdrStream = env.addSource(cdrConsumer)


val joinedDataSet = netflowStream
  .join(cdrStream)
  .where(netflow => netflow.get("value").get("SOURCE_ADDRESS").asText())
  .equalTo(cdr => cdr.get("value").get("PRIVATE_IP").asText())
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
  .apply { (netflow, cdr) =>
    // (NETFLOW_DATETIME, SOURCE_PORT, IN_BYTES, CUSTOMER_ID,
    //  START_REAL_PORT, END_REAL_PORT, START_DATETIME, END_DATETIME)
    (netflow.get("value").get("NETFLOW_DATETIME").asInt(),
     netflow.get("value").get("SOURCE_PORT").asInt(),
     netflow.get("value").get("IN_BYTES").asInt(),
     cdr.get("value").get("CUSTOMER_ID").asText(),
     cdr.get("value").get("START_REAL_PORT").asInt(),
     cdr.get("value").get("END_REAL_PORT").asInt(),
     cdr.get("value").get("START_DATETIME").asInt(),
     cdr.get("value").get("END_DATETIME").asInt())
  }

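// keep the observations that meet the following criteria:
// A. NETFLOW_DATETIME between START_DATETIME and END_DATETIME
// B. SOURCE_PORT between START_REAL_PORT and END_REAL_PORT
// then group the observations by CUSTOMER_ID and sum their IN_BYTES column

// keyBy uses the fourth tuple field (CUSTOMER_ID); sum(2) adds up the field at
// zero-based position 2 (IN_BYTES). After the aggregation, _3 holds the summed
// IN_BYTES and _4 holds the CUSTOMER_ID.
val resultStream = joinedDataSet
  .filter { x =>
    x._1 >= x._7 && x._1 <= x._8 && // criterion A
    x._2 >= x._5 && x._2 <= x._6    // criterion B
  }
  .keyBy(_._4)
  .sum(2)
  .map { line =>
    // quote the keys and the string value so that the output is valid JSON
    "{\"CUSTOMER_ID\":\"" + line._4 + "\",\"sumOutBytes\":" + line._3 + "}"
  }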

// prepare kafka producer properties
val kafkaProducerProperties = new Properties()
kafkaProducerProperties.setProperty("bootstrap.servers", "localhost:9092")

// define kafka producer
val kafkaProducer = new FlinkKafkaProducer[String](
resultTopic,
new SimpleStringSchema(),
kafkaProducerProperties)

// sink the resulting data to the kafka "resultjson" topic
resultStream.addSink(kafkaProducer)

resultStream.print()

env.execute("How to Read and Write Json Messages From and To Apache Kafka Using Apache Flink and Scala")
Figure 2. Apache Flink Running Jobs Interface
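
The matching rules of the job above can also be checked without a cluster. The following is a minimal sketch on plain Scala collections, not on Flink streams: the case classes NetFlow and Cdr and the helpers bytesPerCustomer and toResultJson are hypothetical names introduced only for this illustration, and the second NetFlow record is made up to show a non-matching port. It applies the same join key, the same two range criteria, and the same per-customer sum, then renders the result as a JSON object with quoted keys.

```scala
// Hypothetical case classes mirroring the two JSON records shown earlier.
final case class NetFlow(datetime: Long, sourceAddress: String, sourcePort: Int, inBytes: Long)
final case class Cdr(customerId: String, privateIp: String,
                     startPort: Int, endPort: Int, startDatetime: Long, endDatetime: Long)

// Join on the IP address, keep the pairs whose timestamp and port fall inside
// the CDR ranges (criteria A and B), then sum IN_BYTES per customer.
def bytesPerCustomer(netflows: Seq[NetFlow], cdrs: Seq[Cdr]): Map[String, Long] = {
  val matched = for {
    n <- netflows
    c <- cdrs
    if n.sourceAddress == c.privateIp
    if n.datetime >= c.startDatetime && n.datetime <= c.endDatetime
    if n.sourcePort >= c.startPort && n.sourcePort <= c.endPort
  } yield (c.customerId, n.inBytes)
  matched.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).sum }
}

// Render one result as JSON. Only backslashes and double quotes are escaped,
// which is enough for simple identifiers but not for arbitrary text.
def toResultJson(customerId: String, sumBytes: Long): String = {
  val escaped = customerId.replace("\\", "\\\\").replace("\"", "\\\"")
  s"""{"CUSTOMER_ID":"$escaped","sumOutBytes":$sumBytes}"""
}

val netflows = Seq(
  NetFlow(1694086940L, "222.161.138.90", 58587, 90L), // sample record from this tutorial
  NetFlow(1694086940L, "222.161.138.90", 10, 500L)    // made-up record: port outside the range
)
val cdrs = Seq(
  Cdr("Customer-6975", "222.161.138.90", 57784, 59073, 1693568679L, 1694091906L)
)

val totals = bytesPerCustomer(netflows, cdrs)
totals.foreach { case (id, sum) => println(toResultJson(id, sum)) }
// prints {"CUSTOMER_ID":"Customer-6975","sumOutBytes":90}
```

Unlike the streaming job, this sketch sees all records at once; in the job, two records can only be matched when they arrive within the same one-minute processing-time window.
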
  • Apache Flink SQL

Unfortunately, the flink-sql-connector-kafka dependency is not included automatically when installing Apache Flink 1.14.3. Therefore, to enable Apache Flink SQL over Apache Kafka, this dependency must be downloaded first by following these steps:

  1. Go to the home directory

cd ~

  2. Go to the Apache Flink library (lib) directory

cd flink14/lib

  3. Download the flink-sql-connector-kafka dependency

wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.3/flink-sql-connector-kafka_2.11-1.14.3.jar

In Apache Flink SQL, each Apache Kafka topic must be defined as a separate SQL table. These tables are created using the CREATE TABLE command, which takes the column names and their types, followed by a WITH clause that contains the Apache Kafka topic properties. The most important properties, based on [2], [3] and [4], are:

  • connector: specifies which connector to use,
  • topic: defines the topic name(s),
  • properties.bootstrap.servers: lists the Kafka brokers,
  • properties.group.id: sets the id of the consumer group for the Kafka source,
  • format: describes the format of the messages,
  • scan.startup.mode: defines the startup mode of the Kafka consumer,
  • json.timestamp-format.standard: specifies the input and output timestamp format,
  • json.fail-on-missing-field: determines whether to fail if a field is missing,
  • json.ignore-parse-errors: determines whether fields and rows with parse errors are skipped instead of failing.

As a result, the cdr and netflow tables can be created as follows:

  • CDR Table
%flink.ssql

CREATE TABLE cdr(
CUSTOMER_ID STRING,
PRIVATE_IP STRING,
START_REAL_PORT BIGINT,
END_REAL_PORT BIGINT,
START_DATETIME STRING,
END_DATETIME STRING
) WITH (
'connector' = 'kafka',
'topic' = 'cdrjson',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'ISO-8601',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
  • NetFlow Table
%flink.ssql

CREATE TABLE netflow(
NETFLOW_DATETIME STRING,
SOURCE_ADDRESS STRING,
SOURCE_PORT BIGINT,
IN_BYTES BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'netflowjson',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'ISO-8601',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)

These two tables can be joined and grouped using the following SQL query. Note that no window is used here, as it does not play an important role in this query.

SELECT CUSTOMER_ID, SUM(IN_BYTES) AS sumOutBytes
FROM netflow
INNER JOIN cdr
ON netflow.SOURCE_ADDRESS = cdr.PRIVATE_IP
WHERE netflow.SOURCE_PORT > cdr.START_REAL_PORT
AND netflow.SOURCE_PORT < cdr.END_REAL_PORT
AND netflow.NETFLOW_DATETIME > cdr.START_DATETIME
AND netflow.NETFLOW_DATETIME < cdr.END_DATETIME
GROUP BY CUSTOMER_ID
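
One detail of the query above is worth a quick check: NETFLOW_DATETIME, START_DATETIME, and END_DATETIME are declared as STRING, so the WHERE clause compares them as text, character by character, rather than as numbers. The following plain Scala sketch (no Flink involved) uses the sample values from this tutorial plus one made-up 9-digit timestamp to show when the two orderings agree and when they do not.

```scala
// Epoch seconds stored as strings, as in the cdr and netflow tables.
val netflowDatetime = "1694086940"
val startDatetime   = "1693568679"
val endDatetime     = "1694091906"

// Text comparison: strings are ordered character by character.
val insideAsText = netflowDatetime > startDatetime && netflowDatetime < endDatetime

// Numeric comparison after an explicit conversion.
val insideAsNumber =
  netflowDatetime.toLong > startDatetime.toLong && netflowDatetime.toLong < endDatetime.toLong

// A made-up 9-digit timestamp shows where the two orderings diverge.
val shortStamp = "999999999"
val textSaysLater   = shortStamp > netflowDatetime               // '9' sorts after '1'
val numberSaysLater = shortStamp.toLong > netflowDatetime.toLong // 999999999 is the smaller number

println(s"inside range: text=$insideAsText number=$insideAsNumber")
println(s"short stamp later: text=$textSaysLater number=$numberSaysLater")
```

For the data in this tutorial both comparisons give the same answer, because all timestamps have ten digits. If timestamps of different lengths could appear, one option would be to declare the columns as BIGINT or to wrap them in CAST(... AS BIGINT) inside the query.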

The results obtained are presented in both Figure 3 and Figure 4. It is clear from Figure 4 that Apache Zeppelin can display the results in various formats, including line, area, and pie charts.

Figure 3. Flink SQL table result.
Figure 4. Flink SQL line result.

Conclusion

In summary, learning Apache Flink can be challenging for beginners due to limited documentation, the abundance of deprecated methods, and misunderstandings among developers. This tutorial has addressed these issues by focusing on three previously unexplained aspects: consuming JSON events from multiple Kafka topics, applying aggregations, and producing JSON events. It is prepared using both Apache Flink Scala and Apache Flink SQL.

References

[1] Flink (n.d). Class JSONKeyValueDeserializationSchema. Accessed on [23.09.2023]. Retrieved from https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.html

[2] Flink (n.d). JSON Format. Accessed on [25.09.2023]. Retrieved from https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/

[3] Flink (n.d). Apache Kafka SQL Connector. Accessed on [23.09.2023]. Retrieved from https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/

[4] Flink (n.d). Formats. Accessed on [24.09.2023]. Retrieved from https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/overview/
