Building a Real-time Data Pipeline: A Comprehensive Guide with Kafka, Flink, and Elasticsearch

Kishan Modasiya
Simform Engineering
7 min read · Jan 3, 2024

Construct an agile, scalable, real-time pipeline with Kafka, Flink, and Elasticsearch as the connective foundation.

Real-time data analytics gives businesses timely insights and actionable intelligence, but building robust pipelines that deliver it is hard. In this blog, I demonstrate an end-to-end solution that connects three technologies: Apache Kafka, Apache Flink, and Elasticsearch. Kafka handles event streaming, Flink handles stream processing, and Elasticsearch handles indexing and querying, so together they let you ingest, process, and visualize data in real time. The rest of this blog is a hands-on walkthrough of how these components fit together.

Apache Kafka: Apache Kafka is a distributed event store and stream-processing platform. It is an open-source system developed by the Apache Software Foundation and written in Java and Scala. The project aims to provide a unified, high-throughput, low-latency platform for real-time data feeds.

Apache Flink: Apache Flink is an open-source, unified stream-processing and batch-processing framework developed by the Apache Software Foundation. The core of Apache Flink is a distributed streaming data-flow engine written in Java and Scala. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner.

Elasticsearch: Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents.

Here’s the overview of the pipeline that we will create in this blog:

Data Pipeline with Kafka, Flink, and Elasticsearch

First, a Python script acts as the data producer: it generates records and writes them to Kafka topics. Apache Flink then reads this data from the Kafka topics, transforms it, and writes the processed data to Elasticsearch.

Let’s get started:

This pipeline uses the following components:

  • Confluent Kafka 7.5
  • Apache Flink 1.15.2
  • Elasticsearch 7.10 (We are using Elasticsearch on Docker)

For installation steps, refer to the official Confluent Kafka and Apache Flink documentation.

We need a connector between Kafka and Flink, which requires two jar files: flink-connector-kafka-1.15.2.jar and kafka-clients-2.8.1.jar. We also need a connector between Flink and Elasticsearch: flink-sql-connector-elasticsearch7-1.15.2.jar. Put these jar files in the path/to/flink/lib/ folder so that Flink loads them at startup.

Also, change Flink's port. The Schema Registry that ships with Confluent Kafka listens on port 8081, which is also Flink's default, so one of them has to move. To change Flink's port, open path/to/flink/conf/flink-conf.yaml and change rest.port from 8081 to 8084.
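If you prefer to script this change, the edit can be sketched in Python. This is a minimal sketch, not part of the original setup: the helper name set_flink_option is hypothetical, and it assumes the option appears at most once as a plain `key: value` line, possibly commented out with a leading `#`.

```python
import re


def set_flink_option(conf_text: str, key: str, value: str) -> str:
    """Return conf_text with `key: value` set.

    An active entry is replaced first; failing that, a commented-out entry
    is replaced; failing that, the option is appended at the end.
    """
    new_line = f"{key}: {value}"
    active = re.compile(rf"^{re.escape(key)}\s*:.*$", re.MULTILINE)
    commented = re.compile(rf"^#\s*{re.escape(key)}\s*:.*$", re.MULTILINE)
    for pattern in (active, commented):
        if pattern.search(conf_text):
            # A function replacement avoids backslash processing in the value.
            return pattern.sub(lambda _match: new_line, conf_text, count=1)
    separator = "" if conf_text.endswith("\n") or not conf_text else "\n"
    return f"{conf_text}{separator}{new_line}\n"
```

To apply it, read path/to/flink/conf/flink-conf.yaml, pass its contents through set_flink_option(text, "rest.port", "8084"), and write the result back before starting the cluster.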

Now start Kafka with the following command:

confluent local services start

Start Flink with the following command:

path/to/flink/bin/start-cluster.sh

For Elasticsearch, we will use Docker. Here’s the docker-compose file for Elasticsearch and Kibana:

version: '2'
services:

  elasticsearch:
    container_name: elastic
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.2
    ports:
      - '9200:9200'
      - '9300:9300'
    environment:
      xpack.security.enabled: "false"
      discovery.type: single-node
      ES_JAVA_OPTS: "-Xms1g -Xmx1g"

  kibana:
    container_name: kibana
    image: docker.elastic.co/kibana/kibana:7.10.2
    ports:
      - '5601:5601'
    depends_on:
      - elasticsearch
    environment:
      SERVER_NAME: kibana.example.org
      ELASTICSEARCH_HOSTS: '["http://elastic:9200"]'

Then start Elasticsearch and Kibana in Docker with the following command:

docker-compose up -d

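To check that everything is running, open each service's web interface on localhost: the Flink dashboard at http://localhost:8084 (the port configured above), Elasticsearch at http://localhost:9200, and Kibana at http://localhost:5601. The Kafka dashboard (Confluent Control Center) listens on port 9021 by default.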

Dashboard of Kafka, Flink, and Elasticsearch
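The same check can be done from a terminal. The following is a minimal sketch that only tests whether each port accepts a TCP connection; the port numbers come from this guide, while the function names are illustrative. An open port shows that something is listening, not that the service is healthy.

```python
import socket

# Ports used in this guide: Kafka broker, Flink REST/dashboard (after the
# change from 8081), Elasticsearch, and Kibana.
SERVICES = {
    "Kafka broker": 9092,
    "Flink dashboard": 8084,
    "Elasticsearch": 9200,
    "Kibana": 5601,
}


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def report(host: str = "localhost") -> dict:
    """Map each service name to whether its port accepts connections."""
    return {name: is_port_open(host, port) for name, port in SERVICES.items()}


if __name__ == "__main__":
    for name, is_up in report().items():
        print(f"{name:16} {'up' if is_up else 'DOWN'}")
```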

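So everything is up and running. Now it’s time to produce data from Python to the Kafka topics. Here’s a Python script that writes to three topics, one each for users, products, and orders. The script does not create the topics explicitly; it relies on the broker creating them on first write, which is Kafka's default behavior (auto.create.topics.enable).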

from confluent_kafka import Producer
import json
import random
import time

# Kafka broker address
bootstrap_servers = 'localhost:9092'

# Kafka topics
user_topic = 'user_topic'
order_topic = 'order_topic'
product_topic = 'product_topic'

# Create a Kafka producer configuration
conf = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': 'python-producer'
}

# Create a Kafka producer instance
producer = Producer(conf)

user_id_counter = 1  # Initialize the user ID counter
order_id_counter = 1  # Initialize the order ID counter
product_id_counter = 1  # Initialize the product ID counter

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush(). """
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

def generate_random_user():
    global user_id_counter
    user = {
        'user_id': user_id_counter,
        'user_name': f'User_{user_id_counter}',
        'age': random.randint(18, 60),
        'email': f'user_{user_id_counter}@gmail.com'
    }
    user_id_counter += 1
    return user

def generate_random_product():
    global product_id_counter
    product = {
        'product_id': product_id_counter,
        'product_name': f'Product_{product_id_counter}',
        'price': random.randint(100, 500)
    }
    product_id_counter += 1
    return product

def generate_random_order():
    global order_id_counter
    order = {
        'order_id': order_id_counter,
        'user_id': random.choice(users)['user_id'],
        'product_id': random.choice(products)['product_id'],
        'quantity': random.randint(1, 5),
    }
    order_id_counter += 1
    return order

def produce_message(producer, topic, message):
    """ Produce a JSON message to the Kafka topic """
    producer.produce(topic, value=json.dumps(message), callback=delivery_report)
    producer.poll(0)  # Trigger delivery report callbacks

# Initialize lists to store user and product data
users = []
products = []

# Generate 50 users
for _ in range(50):
    users.append(generate_random_user())

# Generate 20 products
for _ in range(20):
    products.append(generate_random_product())

def main():
    try:
        for user_data in users:
            produce_message(producer, user_topic, user_data)
            time.sleep(0.01)

        for product_data in products:
            produce_message(producer, product_topic, product_data)
            time.sleep(0.01)

        while True:
            order_data = generate_random_order()
            produce_message(producer, order_topic, order_data)
            time.sleep(1)

    except KeyboardInterrupt:
        print("Interrupted! Closing producer...")
    finally:
        producer.flush()

if __name__ == '__main__':
    main()

This script writes 50 users to user_topic and 20 products to product_topic, then keeps writing one order per second to order_topic until you stop it with a keyboard interrupt (Ctrl+C).

Run this script, and check topics in the Kafka dashboard. You will see three newly created topics.

Created Kafka topic using Python Script
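The topics can also be checked from Python instead of the dashboard. This is a minimal sketch that assumes the same broker address as the producer script; the helper names are illustrative. It uses AdminClient.list_topics from confluent_kafka, imported inside the function so that the comparison helper works even where the client library is not installed.

```python
EXPECTED_TOPICS = {"user_topic", "product_topic", "order_topic"}


def missing_topics(existing, expected=EXPECTED_TOPICS):
    """Return the expected topic names that are absent from `existing`, sorted."""
    return sorted(set(expected) - set(existing))


def fetch_topic_names(bootstrap_servers="localhost:9092", timeout=5.0):
    """Ask the broker for its topic list and return the names as a set."""
    # Lazy import: only this function needs the Kafka client library.
    from confluent_kafka.admin import AdminClient

    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    metadata = admin.list_topics(timeout=timeout)
    return set(metadata.topics)
```

With the broker running, missing_topics(fetch_topic_names()) returns an empty list once the producer script has written to all three topics.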

Now, let’s consume these topics’ data in Flink using Flink SQL. For that, first start Flink SQL with the following command:

path/to/flink/bin/sql-client.sh

Flink SQL Terminal

We will create one Flink SQL table per topic. Here’s the script that creates them:

-- For user_topic data
CREATE TABLE users (
    `user_id` INT,
    `user_name` STRING,
    `age` INT,
    `email` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- For product_topic data
CREATE TABLE products (
    `product_id` INT,
    `product_name` STRING,
    `price` INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'product_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- For order_topic data
CREATE TABLE orders (
    `order_id` INT,
    `user_id` INT,
    `product_id` INT,
    `quantity` INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
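Because these tables read JSON, every message has to match the declared columns. The sketch below is a producer-side check under that assumption; the type mapping (INT to int, STRING to str) and the helper name are illustrative. How Flink itself reacts to a bad record depends on format options such as json.ignore-parse-errors, which this guide leaves at their defaults.

```python
import json

# Column types mirror the CREATE TABLE statements (INT -> int, STRING -> str).
TABLE_SCHEMAS = {
    "users": {"user_id": int, "user_name": str, "age": int, "email": str},
    "products": {"product_id": int, "product_name": str, "price": int},
    "orders": {"order_id": int, "user_id": int, "product_id": int, "quantity": int},
}


def schema_errors(table: str, raw_message: str) -> list:
    """Return the problems found in one JSON message; an empty list means it fits."""
    try:
        record = json.loads(raw_message)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc.msg}"]
    if not isinstance(record, dict):
        return ["message is not a JSON object"]

    errors = []
    for column, expected in TABLE_SCHEMAS[table].items():
        if column not in record:
            errors.append(f"missing column: {column}")
            continue
        value = record[column]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            errors.append(
                f"{column}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return errors
```

For example, schema_errors("orders", json.dumps(order)) could be called in the producer before produce_message to catch a mistyped field early.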

To check that a table has been created and is consuming data from its Kafka topic, run a query against it. For the users table, the query and its output look like this:

SELECT * FROM users;

Data of Kafka Topic in Flink SQL Table

So we have three tables: users, products, and orders. The orders table references the other two through its user_id and product_id columns. We can therefore join the three tables and write the result to a sink table, invoice, which inserts the data into Elasticsearch.

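Here is the script that creates the invoice table, followed by the insert statement that fills it by joining all three tables. The hosts value 172.17.0.1 is typically the gateway address of Docker's default bridge network on Linux; adjust it if your setup differs.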

CREATE TABLE invoice (
    `order_id` INT,
    `user_id` INT,
    `product_id` INT,
    `quantity` INT,
    `user_name` STRING,
    `age` INT,
    `email` STRING,
    `product_name` STRING,
    `price` INT,
    `total_amount` INT
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://172.17.0.1:9200',
    'index' = 'invoice'
);

INSERT INTO invoice
SELECT
    o.*,
    u.user_name, u.age, u.email,
    p.product_name, p.price,
    o.quantity * p.price AS total_amount
FROM orders AS o
LEFT JOIN users AS u
    ON o.user_id = u.user_id
LEFT JOIN products AS p
    ON o.product_id = p.product_id;
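To make the shape of each invoice row concrete, here is a minimal Python sketch of what the two LEFT JOINs compute for a single order. It is an illustration only: Flink evaluates the join continuously over streams and may update a row when a matching user or product arrives later, whereas this function looks at fixed dictionaries. The function name and the lookup dictionaries are hypothetical.

```python
def build_invoice(order: dict, users_by_id: dict, products_by_id: dict) -> dict:
    """Combine one order with its user and product, LEFT JOIN style.

    A missing user or product leaves the corresponding columns as None,
    and total_amount is None when the price is unknown (NULL in SQL).
    """
    user = users_by_id.get(order["user_id"], {})
    product = products_by_id.get(order["product_id"], {})
    price = product.get("price")
    return {
        **order,  # order_id, user_id, product_id, quantity (o.*)
        "user_name": user.get("user_name"),
        "age": user.get("age"),
        "email": user.get("email"),
        "product_name": product.get("product_name"),
        "price": price,
        "total_amount": None if price is None else order["quantity"] * price,
    }


if __name__ == "__main__":
    users_by_id = {1: {"user_id": 1, "user_name": "User_1", "age": 30,
                       "email": "user_1@gmail.com"}}
    products_by_id = {2: {"product_id": 2, "product_name": "Product_2", "price": 150}}
    order = {"order_id": 10, "user_id": 1, "product_id": 2, "quantity": 3}
    print(build_invoice(order, users_by_id, products_by_id))
```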

Once you execute the insert statement, it submits a job to Flink. You can see it on the Flink dashboard.

Flink Job for inserting data in Elasticsearch

This job inserts data into the invoice table, which means it is inserting data into Elasticsearch. It will create one index in Elasticsearch with the name invoice.

To create an index pattern for this index, open the index pattern page in Kibana (http://localhost:5601/app/kibana#/management/kibana/index_pattern).

Create an Index Pattern for invoice

After creating an Index Pattern, you can see the written data at http://localhost:5601/app/kibana#/discover.

Invoice index with Flink SQL table’s data in Elasticsearch
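The index can also be checked without Kibana by calling Elasticsearch's count API over HTTP. The following is a minimal sketch that assumes Elasticsearch is reachable at http://localhost:9200, as published by the docker-compose file, with security disabled; the function names are illustrative.

```python
import json
from urllib.request import urlopen


def count_url(index: str, host: str = "http://localhost:9200") -> str:
    """Build the URL of the _count endpoint for an index."""
    return f"{host}/{index}/_count"


def parse_count(body: bytes) -> int:
    """Extract the document count from a _count response body."""
    return int(json.loads(body)["count"])


def count_documents(index: str = "invoice",
                    host: str = "http://localhost:9200",
                    timeout: float = 5.0) -> int:
    """Return how many documents the index currently holds."""
    with urlopen(count_url(index, host), timeout=timeout) as response:
        return parse_count(response.read())
```

Calling count_documents() a few seconds apart while the producer and the Flink job are running should show the number growing as new orders arrive.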

Now, let’s clean up the environment.

Run the following command to stop Confluent Kafka:

confluent local services stop

Run the following command to stop Flink:

path/to/flink/bin/stop-cluster.sh

Run the following command to turn off docker containers for Elasticsearch:

docker-compose down

Conclusion

Combining Kafka, Flink, and Elasticsearch gives a compact example of a modern real-time data pipeline. Data is produced in Python, transformed and joined in Apache Flink, and stored in Elasticsearch, where it can be queried and visualized. The way these technologies fit together shows the value of a well-integrated processing architecture and the potential of stream processing for addressing complex data challenges.

Follow Simform Engineering to keep up with the latest technology trends. Follow us: Twitter | LinkedIn
