Real-Time Stream Processing Using Flink and Kafka with Scala and Zeppelin (Part 2): Case Study

Yousef Alkhanafseh
TurkNet Technology
Sep 14, 2023

This article examines a case study in which data is consumed from two distinct Kafka topics, processed in real time, and sunk to another Apache Kafka topic using Apache Flink. The task is carried out with both the Scala programming language and Apache Zeppelin.

INTRODUCTION

The rising demand for real-time data processing has led to the widespread adoption of tools such as Apache Flink and Apache Kafka, which handle data processing tasks efficiently. Apache Flink is a stream processing framework that enables organizations to process and analyze data in real time. Apache Kafka is an open-source distributed event streaming platform used for building real-time data pipelines and streaming applications. In this tutorial, we explore how to use Apache Flink in Scala to read NetFlow data and Call Detail Record (CDR) data from two distinct Kafka topics and then join them based on specific criteria. The installation of all the services used here is covered in the first part of this article. NetFlow data typically contains information about network traffic and was discussed in a previous tutorial, while CDR data holds details of phone call records together with customers' private and real IPs and ports. Combining these two datasets yields valuable insights, which in turn can increase the potential of the prepared reports. This project is published as a GitHub repository.

Data-sets

This project relies on two distinct datasets, NetFlow and CDR, which are taken from Turknet ISP Company. Note that both datasets used in this tutorial have been anonymized and do not contain real data.

  • CDR Data-set

The CDR dataset (Call Detail Records dataset) contains information about customers' internet connection activity, including customer ID, private IP, start real port, end real port, start datetime, and end datetime, which are used for various telecommunications and network analysis purposes. The data is taken from the Turknet database, and only the records of 30,000 different customers are included. The data features and their types are listed below:

cdr.csv
|-- CUSTOMER_ID: string
|-- PRIVATE_IP: string
|-- START_REAL_PORT: integer
|-- END_REAL_PORT: integer
|-- START_DATETIME: timestamp
|-- END_DATETIME: timestamp

  • NetFlow Data

NetFlow is a network protocol used for collecting and analyzing network traffic data. Its records include flow start time, duration, source address, destination address, source port, destination port, in bytes, and out bytes. By tracking flows of data packets in a network, it can provide insights into network utilization, security, and performance. Refer to the earlier tutorial for more information about how to process NetFlow data with Nfdump. The data used here is captured from Turknet internet routers and has been modified to contain only Ingress (Download) observations. In addition, only 1,000,000 observations are captured for this tutorial. The data features and their types are listed below:

netflow.csv
|-- NETFLOW_DATETIME: string
|-- SOURCE_ADDRESS: string
|-- SOURCE_PORT: integer
|-- IN_BYTES: integer

IMPLEMENTATION DETAILS

The primary objective of this article is to demonstrate real-time stream processing using Apache Kafka and Apache Flink in Scala, with both the sbt build tool and Apache Zeppelin. This tutorial uses two datasets, NetFlow and CDR, which are generated by separate Kafka producers and sent to distinct Kafka topics, namely “cdr” and “netflow”. Apache Flink then processes the data on a server equipped with 64 GB RAM, 24 CPU cores, and a 1 TB disk. The processed data is sent to another Kafka topic, “result”. Finally, insightful reports and visualizations can be created from it based on specific requirements. The main architecture of this tutorial is shown below in Figure 1.

Figure 1. Main Architecture

To begin with, each service, including Java, Scala, SBT, Apache Flink, Apache Kafka, and Apache Zeppelin, must be installed. Their installation is discussed in detail in the first part of this article; please make sure to read it and follow its steps before diving into this case study. Once the services are installed and ready, the ZooKeeper and Kafka services must be started first using the following commands.

systemctl start zookeeper
systemctl start kafka

Then, three Apache Kafka topics (cdr, netflow, result) should be created to receive data from the CDR producer, the NetFlow producer, and Flink, respectively. They can be created using the following three commands, in which --replication-factor and --partitions are set to 1 and 3, respectively.

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic cdr
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic netflow
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic result

Next, the Apache Flink cluster must be started. This can be done using the following commands:

cd
flink/bin/start-cluster.sh

At this point, the services are installed, started, and ready to run computations. The repository of this tutorial is published on GitHub and can be cloned using the following command:

git clone https://github.com/yousef-alkhanafseh/flink-kafka-scala-tutorial.git

Now, the NetFlow and CDR datasets should be sent to the Kafka topics (netflow and cdr). This can be achieved using the following commands:

cd flink-kafka-scala-tutorial/data/
kafka-console-producer.sh --broker-list localhost:9092 --topic cdr < cdr.csv
kafka-console-producer.sh --broker-list localhost:9092 --topic netflow < netflow.csv

You can verify that the datasets have been produced to the Kafka topics using the following commands:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdr --from-beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic netflow --from-beginning

Now the coding part starts. Before preparing the Apache Flink code, however, we wrote a script with pandas, a library popular among Python developers, to illustrate the tasks that we will undertake using Apache Flink. In the Python code below, the pandas library is used to manipulate and analyze the two CSV datasets, “netflow.csv” and “cdr.csv”, which are provided under flink-kafka-scala-tutorial/data/. It starts by importing the pandas library and reading the data from these CSV files into separate DataFrames named “netflowDf” and “cdrDf”, respectively. The code then performs an inner join on these DataFrames, merging them on the "SOURCE_ADDRESS" column of “netflowDf” and the "PRIVATE_IP" column of “cdrDf”. Afterward, it specifies a set of columns and converts them to integers so that they can be compared numerically. It filters the data based on specific criteria related to datetime and port values, retaining only the relevant observations. It also converts the "IN_BYTES" column to float type for the subsequent sum aggregation. The code then groups the resulting DataFrame by the "CUSTOMER_ID" column and calculates the sum of "IN_BYTES" for each customer. Finally, it prints the first 5 rows of the result, as shown in Figure 2, and saves the final DataFrame to a CSV file called "result.csv". This script can be found at /flink-kafka-scala-tutorial/scripts/python/pandasScript.py.

import os
import pandas as pd

# Read Netflow Data
netflowDf = pd.read_csv(os.getcwd().split("flink-kafka-scala-tutorial/scripts/python")[0] + "flink-kafka-scala-tutorial/data/netflow.csv")

# Read CDR Data
cdrDf = pd.read_csv(os.getcwd().split("flink-kafka-scala-tutorial/scripts/python")[0] + "flink-kafka-scala-tutorial/data/cdr.csv")

# Join netflowDf with cdrDf based on SOURCE_ADDRESS and PRIVATE_IP columns, respectively.
final_df = netflowDf.merge(cdrDf, left_on = "SOURCE_ADDRESS", right_on = "PRIVATE_IP", how="inner")

# Specify integer columns
int_cols = ["START_DATETIME", "NETFLOW_DATETIME", "END_DATETIME", "SOURCE_PORT", "START_REAL_PORT", "END_REAL_PORT"]

# Convert the int_cols columns to int so that comparison operators (<=, >=) can be applied to them
for intCol in int_cols:
    final_df[intCol] = final_df[intCol].astype(int)

# Keep only the observations that meet the following criteria:
# A. NETFLOW_DATETIME between START_DATETIME and END_DATETIME
# B. SOURCE_PORT between START_REAL_PORT and END_REAL_PORT
final_df = final_df[(final_df["START_DATETIME"] <= final_df["NETFLOW_DATETIME"]) & (final_df["NETFLOW_DATETIME"] <= final_df["END_DATETIME"])]
final_df = final_df[(final_df["START_REAL_PORT"] <= final_df["SOURCE_PORT"]) & (final_df["SOURCE_PORT"] <= final_df["END_REAL_PORT"])]

# Change the IN_BYTES column format to float in order to be able to apply sum aggregation on it
final_df["IN_BYTES"] = final_df["IN_BYTES"].astype(float)

# Group by the df based on CUSTOMER_ID column and sum the IN_BYTES column
final_df = final_df.groupby(["CUSTOMER_ID"], as_index=False).agg(sumOutBytes = ("IN_BYTES", "sum"))

# Print the first 5 rows of the obtained result
print(final_df.head(5))

# Save the aggregated result next to the input data
final_df.to_csv(os.getcwd().split("flink-kafka-scala-tutorial/scripts/python")[0] + "flink-kafka-scala-tutorial/data/result.csv", index=False)

Figure 2. Example output obtained from Pandas code.

Our goal, therefore, is to translate the preceding pandas code into Apache Flink Scala code. In Apache Flink, however, we will read (consume) the data from Apache Kafka topics and write (sink) the result to an Apache Kafka topic.
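Before moving to Flink, it can help to restate the per-record logic of the pandas script without any library, since this is the form in which a streaming job sees the data: one comma-separated line at a time. The following is only a minimal sketch. The field order follows the two schemas listed earlier and the datetimes are treated as integers, as the pandas script does; the record classes, function names, and sample values are hypothetical and are not taken from the repository.

```python
from typing import NamedTuple


class CdrRecord(NamedTuple):
    customer_id: str
    private_ip: str
    start_real_port: int
    end_real_port: int
    start_datetime: int  # treated as an integer, as in the pandas script
    end_datetime: int


class NetflowRecord(NamedTuple):
    netflow_datetime: int
    source_address: str
    source_port: int
    in_bytes: int


def parse_cdr(line: str) -> CdrRecord:
    """Parse one line of cdr.csv (field order as in the CDR schema)."""
    f = line.strip().split(",")
    return CdrRecord(f[0], f[1], int(f[2]), int(f[3]), int(f[4]), int(f[5]))


def parse_netflow(line: str) -> NetflowRecord:
    """Parse one line of netflow.csv (field order as in the NetFlow schema)."""
    f = line.strip().split(",")
    return NetflowRecord(int(f[0]), f[1], int(f[2]), int(f[3]))


def matches(flow: NetflowRecord, cdr: CdrRecord) -> bool:
    """Join key plus the two range criteria (A: datetime, B: port)."""
    return (
        flow.source_address == cdr.private_ip
        and cdr.start_datetime <= flow.netflow_datetime <= cdr.end_datetime
        and cdr.start_real_port <= flow.source_port <= cdr.end_real_port
    )


def total_in_bytes(flows, cdrs):
    """Sum IN_BYTES per CUSTOMER_ID over all matching (flow, cdr) pairs."""
    totals = {}
    for flow in flows:
        for cdr in cdrs:
            if matches(flow, cdr):
                totals[cdr.customer_id] = totals.get(cdr.customer_id, 0) + flow.in_bytes
    return totals


# Hypothetical sample lines, invented for illustration only.
cdrs = [parse_cdr("C1,10.0.0.1,1000,2000,100,200")]
flows = [
    parse_netflow("150,10.0.0.1,1500,512"),  # matches C1
    parse_netflow("160,10.0.0.1,1600,488"),  # matches C1
    parse_netflow("250,10.0.0.1,1500,999"),  # datetime outside the CDR interval
    parse_netflow("150,10.0.0.2,1500,777"),  # different address
]
print(total_in_bytes(flows, cdrs))  # {'C1': 1000}
```

The nested loop is quadratic and only meant to make the criteria explicit; the pandas merge and the Flink join both avoid it by grouping records on the join key first.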

To successfully build and run the Scala code using sbt, you must ensure that the dependencies you import are compatible with the versions of the other services in your project. The following dependencies are defined based on the services mentioned in the first part of this tutorial. The full script is found in the build.sbt file under flink-kafka-scala-tutorial/scripts/scala/flink-project:

ThisBuild / scalaVersion := "2.11.8"

val flinkVersion = "1.14.3"

libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "provided"
libraryDependencies += "org.apache.flink" % "flink-core" % flinkVersion % "provided"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "3.5.1"
libraryDependencies += "org.apache.flink" %% "flink-clients" % flinkVersion

Compile / mainClass := Some("org.apache.flink.flinkKafkaScalaTutorial")

The Apache Flink Scala code can be found inside this file → /flink-kafka-scala-tutorial/scripts/scala/flink-project/src/main/scala/org/example/flinkKafkaScalaTutorial.scala

After compiling it with the sbt tool, its snapshot jar file is created under /flink-kafka-scala-tutorial/scripts/scala/flink-project/target. Before running the jar file, ensure that Apache Flink is running:

cd
cd flink14/bin/
./start-cluster.sh

Then the code can be run using the following commands:

cd
cd flink-kafka-scala-tutorial/scripts/scala/flink-project/target
flink run -detached flink-project-0.1-SNAPSHOT.jar

Note: If the source code is changed, the following commands must be run afterwards to apply the changes:

cd
cd flink-kafka-scala-tutorial/scripts/scala/flink-project/
sbt clean && sbt compile && sbt package

In addition, the same code is also implemented in Apache Zeppelin. It can be found in flink-kafka-scala-tutorial/scripts/zeppelin/Flink-Kafka-Scala-Zeppelin-CaseStudy_2JBQ3NG8Y.zpln and is also reproduced below:

%flink

// import required libraries
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import java.util.Properties

// define kafka topics
val netflow_topic = "netflow"
val cdr_topic = "cdr"
val result_topic = "result"

// prepare apache flink environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// prepare kafka consumers properties
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "flink-consumer-group")
properties.setProperty("auto.offset.reset", "earliest")

// prepare kafka cdr consumer
val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)
val cdrSource: DataStream[String] = env.addSource(cdrConsumer)

// modify the schema of cdr stream (each record is one comma-separated line)
val cdrStream = cdrSource
  .map { line =>
    val fields = line.split(",")
    val CUSTOMER_ID = fields(0)
    val PRIVATE_IP = fields(1)
    val START_REAL_PORT = fields(2)
    val END_REAL_PORT = fields(3)
    val START_DATETIME = fields(4)
    val END_DATETIME = fields(5)
    (CUSTOMER_ID, PRIVATE_IP, START_REAL_PORT, END_REAL_PORT, START_DATETIME, END_DATETIME)
  }

// prepare kafka netflow consumer
val netflowConsumer = new FlinkKafkaConsumer[String](netflow_topic, new SimpleStringSchema(), properties)
val netflowSource: DataStream[String] = env.addSource(netflowConsumer)

// modify the schema of netflow stream (each record is one comma-separated line)
val netflowStream = netflowSource
  .map { line =>
    val fields = line.split(",")
    val NETFLOW_DATETIME = fields(0)
    val SOURCE_ADDRESS = fields(1)
    val SOURCE_PORT = fields(2)
    val IN_BYTES = fields(3)
    (NETFLOW_DATETIME, SOURCE_ADDRESS, SOURCE_PORT, IN_BYTES)
  }

// join netflow and cdr streams based on their 2nd fields (SOURCE_ADDRESS and PRIVATE_IP, respectively)
// output tuple: (NETFLOW_DATETIME, SOURCE_PORT, IN_BYTES, CUSTOMER_ID, START_REAL_PORT, END_REAL_PORT, START_DATETIME, END_DATETIME)
val joinedDataSet = netflowStream
  .join(cdrStream)
  .where(netflow => netflow._2)
  .equalTo(cdr => cdr._2)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
  .apply { (netflow, cdr) =>
    (netflow._1.toInt, netflow._3.toInt, netflow._4.toInt, cdr._1, cdr._3.toInt, cdr._4.toInt, cdr._5.toInt, cdr._6.toInt)
  }

// keep only the observations that meet the following criteria:
// A. NETFLOW_DATETIME between START_DATETIME and END_DATETIME
// B. SOURCE_PORT between START_REAL_PORT and END_REAL_PORT
// then key the stream by CUSTOMER_ID (tuple index 3) and sum IN_BYTES (tuple index 2)
val resultStream = joinedDataSet.filter { x =>
  x._1 >= x._7 &&
  x._1 <= x._8 &&
  x._2 >= x._5 &&
  x._2 <= x._6
}.keyBy(3).sum(2).map { line =>
  (line._4, line._3.toString) // (CUSTOMER_ID, summed IN_BYTES)
}

// print the result
resultStream.print()

// prepare kafka producer properties
val kafkaProducerProperties = new Properties()
kafkaProducerProperties.setProperty("bootstrap.servers", "localhost:9092")

// define kafka producer
val kafkaProducer = new FlinkKafkaProducer[String](
  result_topic,
  new SimpleStringSchema(),
  kafkaProducerProperties)

// sink resulted data to kafka "result" topic
resultStream.map(x => x._1 + "," + x._2).addSink(kafkaProducer)

// execute this code with the title of "Real-Time Stream Processing Using Flink and Kafka with Scala and Zeppelin (Part 2): Case Study"
env.execute("Real-Time Stream Processing Using Flink and Kafka with Scala and Zeppelin (Part 2): Case Study")

The graph of the job that we executed using Apache Zeppelin and Apache Flink is illustrated in Figure 3 below. After running the job in Apache Zeppelin, navigate to ‘localhost:8081’ in your web browser and open the ‘Jobs >> Running Jobs’ section. The figure shows that the job has two Apache Kafka sources, each undergoing two map operations before the streams converge in a TumblingProcessingTimeWindows join with a 1-minute interval. As the data sources of this tutorial are small, a 1-minute tumbling window was found to be sufficient. After that, a filter aligned with our specific criteria is applied. Finally, the records are sunk to the Apache Kafka “result” topic.
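The effect of the window on the join can be illustrated with a small simulation. A Flink window join pairs elements that share a key and fall into the same window, and with processing-time windows the window is determined by when a record reaches the operator, not by a timestamp inside the record. The sketch below is plain Python and does not use any Flink API; the function names, arrival times, and payloads are hypothetical and serve only to show why two records with the same address are not joined when they arrive in different one-minute windows.

```python
from collections import defaultdict

WINDOW_SECONDS = 60  # mirrors Time.minutes(1) in the Flink job


def window_index(arrival_time_s, size_s=WINDOW_SECONDS):
    """Index of the tumbling window that contains the given arrival time."""
    return int(arrival_time_s // size_s)


def tumbling_window_join(left, right, size_s=WINDOW_SECONDS):
    """Join (arrival_time_s, key, payload) records that share a key AND a window."""
    right_by_bucket = defaultdict(list)
    for t, key, payload in right:
        right_by_bucket[(window_index(t, size_s), key)].append(payload)

    joined = []
    for t, key, payload in left:
        w = window_index(t, size_s)
        for other in right_by_bucket.get((w, key), []):
            joined.append((w, key, payload, other))
    return joined


# Hypothetical arrival times in seconds since the job started.
netflow = [(5.0, "10.0.0.1", "flow-a"), (70.0, "10.0.0.1", "flow-b")]
cdr = [(10.0, "10.0.0.1", "cust-1")]

print(tumbling_window_join(netflow, cdr))
# [(0, '10.0.0.1', 'flow-a', 'cust-1')]
```

Here "flow-b" is dropped because it arrives in the second window while the only CDR record with the same address arrived in the first. When both topics are already filled before the job starts, as in this tutorial, the records are read quickly enough that a 1-minute window is usually sufficient; for continuously arriving data the window size would need more thought.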

Figure 3. Apache Flink Job Interface

Finally, verify that the resulting data has been written to the Apache Kafka “result” topic using the following command:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic result --from-beginning
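When reading the “result” topic, note that a keyed sum in a Flink DataStream job is a rolling aggregate: it emits an updated running total each time a new record arrives for a key, so the same customer can appear several times in the topic. The sketch below shows one way a downstream consumer could reduce that output to a single total per customer. It assumes the "CUSTOMER_ID,sum" line format produced by the job above and non-negative byte counts, so the largest value seen per customer is the most recent total; the function name and sample lines are hypothetical.

```python
def latest_totals(lines):
    """Reduce rolling 'CUSTOMER_ID,sum' lines to one final total per customer."""
    totals = {}
    for raw in lines:
        line = raw.strip()
        if not line:
            continue  # skip empty lines
        customer_id, value = line.rsplit(",", 1)
        total = int(value)
        # Running totals only grow when IN_BYTES is non-negative,
        # so the maximum seen so far is the latest total.
        totals[customer_id] = max(total, totals.get(customer_id, 0))
    return totals


# Hypothetical lines as they might be printed by the console consumer.
sample = ["c1,100", "c2,40", "c1,250", "c2,90", "c1,400"]
print(latest_totals(sample))  # {'c1': 400, 'c2': 90}
```

Taking the maximum rather than the last value keeps the result correct even if lines from different partitions are interleaved when the topic is read.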

CONCLUSION

In conclusion, this tutorial has explored a real-time stream processing case involving two distinct datasets, NetFlow and CDR. These datasets are consumed from separate Apache Kafka topics and processed using Apache Flink with both Scala and Apache Zeppelin. The datasets are then joined, filtered, and aggregated, and the processed data is sent to another Apache Kafka topic. It is essential to refer back to the first part of this tutorial, as it provides a comprehensive description of the requirements and installation procedures for the services used here, including Java, Scala, SBT, Apache Flink, Apache Kafka, and Apache Zeppelin. Additionally, because pandas is popular among data practitioners, and to make the Apache Flink code easier to understand, equivalent pandas code is presented first and then translated into Apache Flink Scala code. The Apache Flink part is covered using both the SBT tool and Apache Zeppelin. This tutorial is conducted in standalone mode. In the next tutorial, the same process can be replicated in Yet Another Resource Negotiator (YARN) cluster mode. Furthermore, more complex operations can be performed in the Apache Flink code.

