Real-time Data Processing and Analysis with Kafka, Connect, KSQL, Elasticsearch, and Flask


This project involves working with real-time data from two Kafka topics, airplane-data and city_data. The goal is to join these topics with KSQL to create a third, enriched topic, enriched_data. The enriched data is then ingested into an Elasticsearch index using Kafka Connect. Finally, we build an API that exposes metrics such as average flight speed, average engine performance, number of flights, the airline with the most flights, top departure cities, top arrival cities, average temperature, average humidity, average pressure, and total distance flown.

The project aims to demonstrate how to work with real-time data using Kafka, KSQL, Elasticsearch, and Flask. It shows how to perform joins on Kafka topics, ingest data into Elasticsearch using Kafka Connect, and build a REST API to provide real-time metrics to end-users.

Architecture Overview:

The architecture is designed to handle real-time data processing using Kafka, Connect, KSQL, Elasticsearch, and Flask. The project consists of several services, each running in its own container, orchestrated with Docker Compose. The services include Zookeeper, Broker, Schema Registry, Connect, Control Center, KSQLDB Server, KSQLDB CLI, KSQL Data Generator, Kafka REST Proxy, Elasticsearch, Kibana, and Kafka UI.

Each service plays a specific role:

  • Zookeeper provides a centralized service for maintaining configuration information, naming, distributed synchronization, and group services.
  • Broker is a distributed message broker that receives, stores, and distributes messages.
  • Schema Registry manages the schemas for the messages exchanged between producers and consumers.
  • Connect is a framework for streaming data between Kafka and other data systems.
  • Control Center is a management and monitoring platform for Kafka clusters.
  • KSQLDB Server is a distributed streaming SQL engine for Kafka.
  • KSQLDB CLI is a command-line interface for interacting with KSQLDB Server.
  • KSQL Data Generator generates sample data for KSQLDB Server.
  • Kafka REST Proxy provides a RESTful interface to Kafka.
  • Elasticsearch is a distributed, open-source search and analytics engine.
  • Kibana is an open-source data visualization dashboard for Elasticsearch.
  • Kafka UI is a web interface for Kafka management and monitoring.

The services communicate with each other through Kafka topics: each stage reads data from an upstream topic, processes it, and writes the result to a new topic. The enriched_data topic is created by joining the airplane-data and city_data topics with KSQL. Kafka Connect then writes the enriched data to Elasticsearch, and Flask exposes an API that consumes the data in Elasticsearch and provides insights on it.

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  schema-registry:
    image: confluentinc/cp-schema-registry:6.1.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    hostname: connect
    container_name: kafka-connect
    image: confluentinc/cp-kafka-connect:6.1.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.1.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.1.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.1.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.1.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:6.1.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.1.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: "broker:29092"
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.2
    container_name: elasticsearch
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    cap_add:
      - IPC_LOCK
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
      - 9300:9300

  kibana:
    container_name: kibana
    image: docker.elastic.co/kibana/kibana:7.11.2
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch

  kafka-ui:
    image: "provectuslabs/kafka-ui:master"
    ports:
      - "9080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=streaming-demo
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=broker:29092
    depends_on:
      - broker

volumes:
  elasticsearch-data:
    driver: local

The architecture is highly scalable and can handle a large volume of data in real time. Docker Compose makes it easy to set up and manage the services, while Kafka provides a reliable and efficient way to handle the streaming data. Elasticsearch and Kibana allow for easy querying and visualization of the data, and Flask provides a simple, user-friendly interface to consume it.

Kafka Topics Overview:

The “airplane-data” topic contains information about a flight, including its timestamp, airplane ID, airplane model, airline company, departure city, arrival city, flight ID, flight number, departure time, arrival time, flight speed, altitude, engine performance, temperature, humidity, and pressure.

{
  "timestamp": "2023-02-19 18:48:24.630370",
  "airplane_id": "832ef42b-9050-4963-abde-de0e0f3b3c6b",
  "airplane_model": "Boeing 787",
  "airline_company": "Delta Air Lines",
  "departure_city": "Atlanta",
  "arrival_city": "Miami",
  "flight_id": "5dd88b54-51e7-4580-b3e2-c2575ca142c6",
  "flight_number": 8969,
  "departure_time": "2023-02-19 19:48:24",
  "arrival_time": "2023-02-21 11:48:24",
  "flight_speed": 409.56,
  "altitude": 35901.98,
  "engine_performance": 46.48,
  "temperature": 25.99,
  "humidity": 71.89,
  "pressure": 1075.45
}

The “city_data” topic contains information about a city, including its ID, state code, state name, city name, county name, latitude, and longitude.

{
  "id": 1,
  "state_code": "AK",
  "state_name": "Alaska",
  "city": "Adak",
  "county": "Aleutians West",
  "latitude": 55.999722,
  "longitude": -161.207778
}

To build the enriched topic, we join the two topics on the departure and arrival cities. This adds the latitude and longitude of both cities, taken from the "city_data" topic, to each flight record from the "airplane-data" topic.

The resulting "ENRICHED_DATA" records contain all the original fields from "airplane-data" along with the coordinates of the departure and arrival cities, which allows for more advanced analysis and visualization of the flight data.

{
  "TIMESTAMP": "2023-02-19 22:41:40.349104",
  "AIRPLANE_ID": "ff748449-3b71-478a-8a47-0aff55da0975",
  "AIRPLANE_MODEL": "Boeing 787",
  "AIRLINE_COMPANY": "Southwest Airlines",
  "DEPARTURE_LATITUDE": null,
  "DEPARTURE_LONGITUDE": null,
  "DEPARTURE_CITY": "Phoenix",
  "ARRIVAL_LATITUDE": null,
  "ARRIVAL_LONGITUDE": null,
  "FLIGHT_ID": "341d3f7b-efae-4fd6-993a-0e3e9a842d8b",
  "FLIGHT_NUMBER": 5265,
  "DEPARTURE_TIME": "2023-02-20 16:41:40",
  "ARRIVAL_TIME": "2023-02-21 11:41:40",
  "FLIGHT_SPEED": 405.43,
  "ALTITUDE": 12493.31,
  "ENGINE_PERFORMANCE": 61.65,
  "TEMPERATURE": 35.54,
  "HUMIDITY": 69.74,
  "PRESSURE": 1066.9
}

Enriching data with ksqlDB:

CREATE STREAM airplane_data_stream (
timestamp VARCHAR,
airplane_id VARCHAR,
airplane_model VARCHAR,
airline_company VARCHAR,
departure_city VARCHAR,
arrival_city VARCHAR,
flight_id VARCHAR,
flight_number INT,
departure_time VARCHAR,
arrival_time VARCHAR,
flight_speed DOUBLE,
altitude DOUBLE,
engine_performance DOUBLE,
temperature DOUBLE,
humidity DOUBLE,
pressure DOUBLE
) WITH (KAFKA_TOPIC='airplane-data', VALUE_FORMAT='JSON');

This KSQL statement creates a new stream, airplane_data_stream, whose schema defines the structure of the records it holds. The schema has 16 fields: timestamp, airplane_id, airplane_model, airline_company, departure_city, arrival_city, flight_id, flight_number, departure_time, arrival_time, flight_speed, altitude, engine_performance, temperature, humidity, and pressure.

The WITH clause specifies the Kafka topic that the stream is associated with (airplane-data) and the format of the data in that topic (JSON). This means that the data in the Kafka topic is expected to be in JSON format, and it will be parsed and stored in the stream according to the specified schema.
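
A quick way to check that the stream is reading records is to open the ksqlDB CLI (the ksqldb-cli container from the Compose file above) and run a push query; setting auto.offset.reset to earliest makes it read records that are already in the topic:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

SET 'auto.offset.reset' = 'earliest';
SELECT airline_company, departure_city, arrival_city, flight_speed
FROM airplane_data_stream
EMIT CHANGES
LIMIT 3;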

CREATE STREAM city_data_stream (
id INT,
state_code VARCHAR,
state_name VARCHAR,
city VARCHAR,
county VARCHAR,
latitude DOUBLE,
longitude DOUBLE
) WITH (KAFKA_TOPIC='city_data', VALUE_FORMAT='JSON');

This statement creates a stream named city_data_stream with the fields id, state_code, state_name, city, county, latitude, and longitude. The source of the data is the Kafka topic city_data, and the values are JSON. The statement essentially defines the structure of the data in the Kafka topic and makes it available as a stream in KSQL.
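
To inspect what ksqlDB registered, DESCRIBE shows the inferred columns, and PRINT dumps raw records from the underlying topic (assuming the city data has already been produced, as described in the How to Run section):

DESCRIBE city_data_stream;

PRINT 'city_data' FROM BEGINNING LIMIT 2;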

CREATE STREAM departure_cities AS 
SELECT *
FROM city_data_stream;

CREATE STREAM arrival_cities AS
SELECT *
FROM city_data_stream;

These two statements create two new streams, departure_cities and arrival_cities, by selecting all the data from an existing stream called city_data_stream.

The city_data_stream contains information about different cities, including their names, coordinates (latitude and longitude), and other details.

The departure_cities and arrival_cities streams are essentially copies of city_data_stream under different names. They can be used to enrich other streams by joining on the city name, once for the departure side and once for the arrival side.

CREATE STREAM enriched_data AS 
SELECT ad.timestamp, ad.airplane_id, ad.airplane_model, ad.airline_company,
dc.latitude AS departure_latitude, dc.longitude AS departure_longitude,
ad.departure_city, ac.latitude AS arrival_latitude, ac.longitude AS arrival_longitude,
ad.arrival_city, ad.flight_id, ad.flight_number,
ad.departure_time, ad.arrival_time, ad.flight_speed, ad.altitude,
ad.engine_performance, ad.temperature, ad.humidity, ad.pressure
FROM airplane_data_stream ad
LEFT JOIN departure_cities dc
WITHIN 1 HOUR
ON ad.departure_city = dc.city
LEFT JOIN arrival_cities ac
WITHIN 1 HOUR
ON ad.arrival_city = ac.city;

This code is a KSQL statement that creates a new stream called “enriched_data”. The stream is created by selecting specific fields from the “airplane_data_stream” and joining them with fields from the “departure_cities” and “arrival_cities” streams. The join is performed based on the city names of the departure and arrival cities in the “airplane_data_stream” and the corresponding cities in the “departure_cities” and “arrival_cities” streams. The join is defined to be within a time window of 1 hour.

The resulting “enriched_data” stream includes all the fields from the “airplane_data_stream” as well as the latitude and longitude of the departure and arrival cities obtained from the “departure_cities” and “arrival_cities” streams. The purpose of this statement is to enrich the original “airplane_data_stream” with additional geographic information about the cities.
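
To verify that the join resolves coordinates, a push query against the new stream is enough; flights whose cities are missing from city_data keep NULL coordinates, as in the sample record shown earlier:

SELECT departure_city, departure_latitude, departure_longitude,
       arrival_city, arrival_latitude, arrival_longitude
FROM enriched_data
EMIT CHANGES
LIMIT 3;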

Setting up Elasticsearch and Indexing data:

This is a bash script that creates an Elasticsearch dynamic template. The dynamic template specifies the mapping of field names for the ENRICHED_DATA stream in Elasticsearch. The script does the following:

  • Unsets any existing http_proxy and https_proxy environment variables
  • Sets http_proxy and https_proxy to empty values to override any existing values
  • Sets the Elasticsearch host to localhost:9200
  • Defines the Elasticsearch dynamic template and sends an HTTP PUT request to the Elasticsearch host to create the template
  • The dynamic template specifies the mapping of field names for the ENRICHED_DATA stream in Elasticsearch. The mappings specify the data types of the fields, such as “timestamp” as a date, “airplane_id” as a keyword, and “flight_number” as an integer.
  • Outputs a newline character to separate the curl output from other output.
#!/usr/bin/env bash

# unset any http_proxy and https_proxy environment variables
unset http_proxy
unset https_proxy
# set http_proxy and https_proxy to empty values to override any existing values
export http_proxy=
export https_proxy=

# set the Elasticsearch host
HOST=localhost:9200

# define the Elasticsearch dynamic template
# this template specifies mappings for certain field names in the ENRICHED_DATA stream
curl -XPUT "http://${HOST}/_template/enriched_data/" -H 'Content-Type: application/json' -d'
{
  "template": "enriched_data",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp":           { "type": "date" },
      "airplane_id":         { "type": "keyword" },
      "airplane_model":      { "type": "keyword" },
      "airline_company":     { "type": "keyword" },
      "departure_latitude":  { "type": "float" },
      "departure_longitude": { "type": "float" },
      "departure_city":      { "type": "keyword" },
      "arrival_latitude":    { "type": "float" },
      "arrival_longitude":   { "type": "float" },
      "arrival_city":        { "type": "keyword" },
      "flight_id":           { "type": "keyword" },
      "flight_number":       { "type": "integer" },
      "departure_time":      { "type": "date" },
      "arrival_time":        { "type": "date" },
      "flight_speed":        { "type": "float" },
      "altitude":            { "type": "float" },
      "engine_performance":  { "type": "float" },
      "temperature":         { "type": "float" },
      "humidity":            { "type": "float" },
      "pressure":            { "type": "float" }
    }
  }
}'

# output a newline to separate the curl output from other output
echo
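
Before moving on, it is worth confirming that the template was stored; a quick check against the same legacy template endpoint does the job:

curl -s "http://localhost:9200/_template/enriched_data?pretty"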

This code is creating a Kafka Connect Elasticsearch Sink Connector to write the enriched data to Elasticsearch.

The script starts by unsetting the http_proxy and https_proxy environment variables and re-exporting them empty so that any proxy settings are overridden.

The HOST variable is set to the Kafka Connect REST endpoint, which in this case is http://localhost:8083.

The table_name variable is set to “enriched_data”, and the TABLE_NAME variable is set to the uppercase version of the table_name.

The curl command is then used to send a POST request to the Kafka Connect REST API to create a new Elasticsearch Sink Connector. The request includes a JSON configuration for the connector, which includes the name of the connector, the topics to consume, the key and value converters, the Elasticsearch connection URL, and a schema mapping that defines the mapping of the enriched data to Elasticsearch field types.

The output of the curl command is then displayed in the terminal.

#!/bin/bash

unset http_proxy
unset https_proxy
export http_proxy
export https_proxy

HOST=http://localhost:8083

table_name="enriched_data"
TABLE_NAME=`echo $table_name | tr '[a-z]' '[A-Z]'`

echo "************************************"
echo "*** MAP ${TABLE_NAME} ***"
echo "************************************"

curl -X "POST" "${HOST}/connectors/" \
  -H "Content-Type: application/json" \
  -d $'{
  "name": "es_sink_'$TABLE_NAME'",
  "config": {
    "schema.ignore": "true",
    "topics": "'$TABLE_NAME'",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "type.name": "kafkaconnect",
    "topic.index.map": "'$TABLE_NAME':'$table_name'",
    "connection.url": "http://elasticsearch:9200",
    "transforms": "ExtractTimestamp",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field" : "timestamp",
    "schema.mapping": "{ \"properties\": { \"timestamp\": { \"type\": \"date\" }, \"airplane_id\": { \"type\": \"keyword\" }, \"airplane_model\": { \"type\": \"keyword\" }, \"airline_company\": { \"type\": \"keyword\" }, \"departure_city\": { \"type\": \"keyword\" }, \"arrival_city\": { \"type\": \"keyword\" }, \"flight_id\": { \"type\": \"keyword\" }, \"flight_number\": { \"type\": \"integer\" }, \"departure_time\": { \"type\": \"date\" }, \"arrival_time\": { \"type\": \"date\" }, \"flight_speed\": { \"type\": \"float\" }, \"altitude\": { \"type\": \"float\" }, \"engine_performance\": { \"type\": \"float\" }, \"temperature\": { \"type\": \"float\" }, \"humidity\": { \"type\": \"float\" }, \"pressure\": { \"type\": \"float\" }, \"departure_latitude\": { \"type\": \"geo_point\" }, \"departure_longitude\": { \"type\": \"geo_point\" }, \"arrival_latitude\": { \"type\": \"geo_point\" }, \"arrival_longitude\": { \"type\": \"geo_point\" } } }"
  }
}'

echo
echo
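
After the script runs, the Kafka Connect REST API can confirm that the connector was created and is running (the connector name below follows the es_sink_ plus uppercased topic name convention used in the script above):

curl -s http://localhost:8083/connectors

curl -s http://localhost:8083/connectors/es_sink_ENRICHED_DATA/status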

Building a Flask API:

from elasticsearch import Elasticsearch
from flask import Flask, jsonify, render_template

app = Flask(__name__, template_folder='path/scripts/template')

es = Elasticsearch()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/average_flight_speed', methods=['GET'])
def get_average_flight_speed():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the average flight speed of all flights
    total_flight_speed = 0
    for hit in res['hits']['hits']:
        total_flight_speed += hit['_source']['FLIGHT_SPEED']
    average_flight_speed = total_flight_speed / res['hits']['total']['value']

    # Return the average flight speed as a JSON response
    return jsonify({'message': 'Average flight speed calculated successfully',
                    'average_flight_speed': average_flight_speed,
                    'unit': 'km/h'})


@app.route('/average_engine_performance', methods=['GET'])
def get_average_engine_performance():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the average engine performance of all flights
    total_engine_performance = 0
    for hit in res['hits']['hits']:
        total_engine_performance += hit['_source']['ENGINE_PERFORMANCE']
    average_engine_performance = total_engine_performance / res['hits']['total']['value']

    # Return the average engine performance as a JSON response
    return jsonify({'message': 'Average engine performance calculated successfully',
                    'average_engine_performance': average_engine_performance})


@app.route('/number_of_flights', methods=['GET'])
def get_number_of_flights():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the number of flights
    num_flights = res['hits']['total']['value']

    # Return the number of flights as a JSON response
    return jsonify({'message': 'Number of flights calculated successfully',
                    'number_of_flights': num_flights})


@app.route('/airline_with_most_flights', methods=['GET'])
def get_airline_with_most_flights():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the number of flights for each airline
    airlines = {}
    for hit in res['hits']['hits']:
        airline_company = hit['_source']['AIRLINE_COMPANY']
        if airline_company in airlines:
            airlines[airline_company] += 1
        else:
            airlines[airline_company] = 1

    # Find the airline with the most flights
    max_flights = 0
    max_airline = ''
    for airline, num_flights in airlines.items():
        if num_flights > max_flights:
            max_flights = num_flights
            max_airline = airline

    # Return the airline with the most flights as a JSON response
    return jsonify({'airline_with_most_flights': max_airline, 'number_of_flights': max_flights})


@app.route('/top_departure_cities', methods=['GET'])
def get_top_departure_cities():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the number of flights departing from each city
    departure_cities = {}
    for hit in res['hits']['hits']:
        departure_city = hit['_source']['DEPARTURE_CITY']
        if departure_city in departure_cities:
            departure_cities[departure_city] += 1
        else:
            departure_cities[departure_city] = 1

    # Sort the departure cities by the number of flights and get the top 5
    top_departure_cities = sorted(departure_cities.items(), key=lambda x: x[1], reverse=True)[:5]

    # Return the top departure cities as a JSON response
    return jsonify({'top_departure_cities': top_departure_cities})


@app.route('/top_arrival_cities', methods=['GET'])
def get_top_arrival_cities():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the number of flights arriving in each city
    arrival_city_counts = {}
    for hit in res['hits']['hits']:
        if 'ARRIVAL_CITY' in hit['_source']:
            arrival_city = hit['_source']['ARRIVAL_CITY']
            if arrival_city in arrival_city_counts:
                arrival_city_counts[arrival_city] += 1
            else:
                arrival_city_counts[arrival_city] = 1

    # Get the top 5 arrival cities
    top_arrival_cities = dict(sorted(arrival_city_counts.items(), key=lambda item: item[1], reverse=True)[:5])

    # Return the top arrival cities as a JSON response
    return jsonify({'top_arrival_cities': top_arrival_cities})


@app.route('/average_temperature', methods=['GET'])
def get_average_temperature():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the average temperature of all flights
    total_temperature = 0
    for hit in res['hits']['hits']:
        total_temperature += hit['_source']['TEMPERATURE']
    average_temperature = total_temperature / res['hits']['total']['value']

    # Return the average temperature as a JSON response
    return jsonify({
        'message': 'Average temperature calculated successfully',
        'average_temperature': average_temperature,
        'unit': '°C'
    })


@app.route('/average_humidity', methods=['GET'])
def get_average_humidity():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the average humidity of all flights
    total_humidity = 0
    for hit in res['hits']['hits']:
        total_humidity += hit['_source']['HUMIDITY']
    average_humidity = total_humidity / res['hits']['total']['value']

    # Return the average humidity as a JSON response
    return jsonify({'average_humidity': average_humidity})


@app.route('/average_pressure', methods=['GET'])
def get_average_pressure():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the average pressure of all flights
    total_pressure = 0
    for hit in res['hits']['hits']:
        total_pressure += hit['_source']['PRESSURE']
    average_pressure = total_pressure / res['hits']['total']['value']

    # Return the average pressure as a JSON response
    return jsonify({
        'message': 'Average pressure calculated successfully',
        'average_pressure': average_pressure,
        'unit': 'hPa'
    })


@app.route('/total_distance_flown', methods=['GET'])
def get_total_distance_flown():
    # Query Elasticsearch for all enriched_data documents
    res = es.search(index='enriched_data', body={'query': {'match_all': {}}})

    # Calculate the total distance flown by all flights
    # (converts the altitude value from feet to kilometres)
    total_distance = 0
    for hit in res['hits']['hits']:
        altitude = hit['_source']['ALTITUDE']
        distance = altitude * 0.3048 / 1000
        total_distance += distance

    # Return the total distance flown as a JSON response
    return jsonify({
        'message': 'Total distance flown calculated successfully',
        'total_distance': total_distance,
        'unit': 'km'
    })


if __name__ == '__main__':
    app.run(debug=True)

This is a Python Flask app that connects to an Elasticsearch instance and serves several API endpoints that allow retrieving and processing flight data stored in Elasticsearch.

The app defines several endpoints, each of which sends a query to Elasticsearch, retrieves data, performs calculations on it, and returns the result as a JSON object.

Some of the endpoints are:

  • /average_flight_speed: retrieves all enriched_data documents, calculates the average flight speed of all flights, and returns the result as a JSON response.
  • /average_engine_performance: retrieves all enriched_data documents, calculates the average engine performance of all flights, and returns the result as a JSON response.
  • /number_of_flights: retrieves all enriched_data documents, calculates the number of flights, and returns the result as a JSON response.
  • /airline_with_most_flights: retrieves all enriched_data documents, calculates the number of flights for each airline, and returns the airline with the most flights as a JSON response.
  • /top_departure_cities: retrieves all enriched_data documents, calculates the number of flights departing from each city, sorts the cities by the number of flights, and returns the top 5 as a JSON response.
  • /top_arrival_cities: retrieves all enriched_data documents, calculates the number of flights arriving in each city, sorts the cities by the number of flights, and returns the top 5 as a JSON response.
  • /average_temperature: retrieves all enriched_data documents, calculates the average temperature of all flights, and returns the result as a JSON response.
  • /average_humidity: retrieves all enriched_data documents, calculates the average humidity of all flights, and returns the result as a JSON response.
  • /average_pressure: retrieves all enriched_data documents, calculates the average pressure of all flights, and returns the result as a JSON response.
  • /total_distance_flown: retrieves all enriched_data documents, calculates the total distance flown by all flights, and returns the result as a JSON response.

The app also defines a / endpoint that serves an HTML template.
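
One thing to keep in mind: an Elasticsearch match_all search returns only the first 10 hits by default, so the loops above effectively compute their metrics over a sample of the documents. A more robust alternative, sketched here as a hypothetical /average_flight_speed_agg endpoint added to the same app (same index and field names as above), is to let Elasticsearch do the math with an aggregation and size 0:

@app.route('/average_flight_speed_agg', methods=['GET'])
def get_average_flight_speed_agg():
    # size=0 skips the hits and returns only the aggregation result,
    # which is computed over every document in the index
    res = es.search(index='enriched_data', body={
        'size': 0,
        'aggs': {'avg_speed': {'avg': {'field': 'FLIGHT_SPEED'}}}
    })
    return jsonify({'average_flight_speed': res['aggregations']['avg_speed']['value'],
                    'unit': 'km/h'})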

How to Run:

  1. Clone the GitHub repository:
git clone https://github.com/Stefen-Taime/Real-time-Data-Processing-and-Analysis-with-Kafka-Connect-KSQL-Elasticsearch-and-Flask.git

2. Run Docker Compose to start the required services:

cd Real-time-Data-Processing-and-Analysis-with-Kafka-Connect-KSQL-Elasticsearch-and-Flask
docker-compose up -d

3. In a new terminal, go to the scripts directory and run the producer.py script (shown below), which continuously publishes random flight data to the airplane-data topic:

cd scripts
python producer.py
import random
import datetime
import time
import uuid
import json

from kafka import KafkaProducer

airplane_models = ['Boeing 747', 'Airbus A320', 'Boeing 787', 'Airbus A380']
airline_companies = {
    'Delta Air Lines': [
        {'departure_city': 'Atlanta', 'arrival_cities': ['Los Angeles', 'New York', 'Las Vegas', 'Miami']},
        {'departure_city': 'Los Angeles', 'arrival_cities': ['Atlanta', 'New York', 'San Francisco', 'Seattle']},
        {'departure_city': 'New York', 'arrival_cities': ['Atlanta', 'Los Angeles', 'Chicago', 'Las Vegas', 'Miami']},
        {'departure_city': 'Las Vegas', 'arrival_cities': ['Atlanta', 'Los Angeles', 'New York']},
        {'departure_city': 'Miami', 'arrival_cities': ['Atlanta', 'New York']}
    ],
    'United Airlines': [
        {'departure_city': 'Chicago', 'arrival_cities': ['Houston', 'San Francisco', 'Seattle', 'Phoenix']},
        {'departure_city': 'Houston', 'arrival_cities': ['Chicago', 'San Francisco', 'Seattle', 'Phoenix']},
        {'departure_city': 'San Francisco', 'arrival_cities': ['Chicago', 'Houston', 'Seattle', 'Phoenix']},
        {'departure_city': 'Seattle', 'arrival_cities': ['Chicago', 'Houston', 'San Francisco', 'Phoenix']},
        {'departure_city': 'Phoenix', 'arrival_cities': ['Chicago', 'Houston', 'San Francisco', 'Seattle']}
    ],
    'American Airlines': [
        {'departure_city': 'New York', 'arrival_cities': ['Los Angeles', 'Chicago', 'Las Vegas', 'Miami']},
        {'departure_city': 'Los Angeles', 'arrival_cities': ['New York', 'Chicago', 'Las Vegas']},
        {'departure_city': 'Chicago', 'arrival_cities': ['New York', 'Los Angeles', 'Las Vegas']},
        {'departure_city': 'Las Vegas', 'arrival_cities': ['New York', 'Los Angeles', 'Chicago']},
        {'departure_city': 'Miami', 'arrival_cities': ['New York', 'Chicago']}
    ],
    'Southwest Airlines': [
        {'departure_city': 'Houston', 'arrival_cities': ['Phoenix', 'Atlanta', 'Chicago', 'Seattle']},
        {'departure_city': 'Phoenix', 'arrival_cities': ['Houston', 'Atlanta', 'Chicago', 'Seattle']},
        {'departure_city': 'Atlanta', 'arrival_cities': ['Houston', 'Phoenix', 'Chicago', 'Seattle']},
        {'departure_city': 'Chicago', 'arrival_cities': ['Houston', 'Phoenix', 'Atlanta', 'Seattle']},
        {'departure_city': 'Seattle', 'arrival_cities': ['Houston', 'Phoenix', 'Atlanta', 'Chicago']}
    ]
}

flight_delay = 1  # delay in minutes

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

while True:
    for company, flights in airline_companies.items():
        for flight in flights:
            departure_city = flight['departure_city']
            for arrival_city in flight['arrival_cities']:
                # Generate random sensor data
                flight_speed = round(random.uniform(400, 500), 2)
                altitude = round(random.uniform(10000, 50000), 2)
                engine_performance = round(random.uniform(0, 100), 2)
                temperature = round(random.uniform(0, 40), 2)
                humidity = round(random.uniform(20, 80), 2)
                pressure = round(random.uniform(900, 1100), 2)

                # Generate random flight data
                airplane_id = str(uuid.uuid4())
                flight_id = str(uuid.uuid4())
                airplane_model = random.choice(airplane_models)
                flight_number = random.randint(1000, 9999)
                departure_time = (datetime.datetime.now() + datetime.timedelta(hours=random.randint(1, 24))).strftime("%Y-%m-%d %H:%M:%S")
                arrival_time = (datetime.datetime.now() + datetime.timedelta(hours=random.randint(25, 48))).strftime("%Y-%m-%d %H:%M:%S")

                # Generate a random timestamp for the data
                current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")

                # Format the data as a JSON object
                data = {
                    "timestamp": current_time,
                    "airplane_id": airplane_id,
                    "airplane_model": airplane_model,
                    "airline_company": company,
                    "departure_city": departure_city,
                    "arrival_city": arrival_city,
                    "flight_id": flight_id,
                    "flight_number": flight_number,
                    "departure_time": departure_time,
                    "arrival_time": arrival_time,
                    "flight_speed": flight_speed,
                    "altitude": altitude,
                    "engine_performance": engine_performance,
                    "temperature": temperature,
                    "humidity": humidity,
                    "pressure": pressure
                }

                # Publish the data to the Kafka topic
                producer.send('airplane-data', value=data)

                # Wait for a delay before generating the next flight
                time.sleep(flight_delay * 60)

    # Wait for 1 minute before generating the next data point
    time.sleep(60)
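
If you want to confirm that messages are reaching the topic, one option is the console consumer that ships inside the broker container:

docker exec -it broker kafka-console-consumer --bootstrap-server broker:29092 --topic airplane-data --from-beginning --max-messages 1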

4. In another terminal, run the populate_city.py script (shown below) to populate the city_data topic:

from confluent_kafka import Producer
import json

def delivery_callback(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

def read_json_file(file_path):
    with open(file_path) as f:
        data = json.load(f)
    return data

def produce_to_topic(topic_name, data):
    producer = Producer({"bootstrap.servers": "localhost:9092"})

    for row in data:
        # Convert keys to lowercase
        row = {k.lower(): v for k, v in row.items()}
        try:
            producer.produce(topic_name, json.dumps(row), callback=delivery_callback)
        except BufferError as e:
            print(f"Message queue is full ({len(producer)} messages awaiting delivery): {e}")
        producer.poll(0)

    producer.flush()

if __name__ == "__main__":
    data = read_json_file("path/scripts/us_cities.json")
    produce_to_topic("city_data", data)

python populate_city.py
5. Go to http://localhost:9021 and click on the Topics tab to verify that the airplane-data and city_data topics have been created.
6. Go to the ksqlDB tab and, in the Editor section, run the following statements to create the ksqlDB streams:
CREATE STREAM airplane_data_stream (timestamp VARCHAR, airplane_id VARCHAR, airplane_model VARCHAR, airline_company VARCHAR, departure_city VARCHAR, arrival_city VARCHAR, flight_id VARCHAR, flight_number INT, departure_time VARCHAR, arrival_time VARCHAR, flight_speed DOUBLE, altitude DOUBLE, engine_performance DOUBLE, temperature DOUBLE, humidity DOUBLE, pressure DOUBLE) WITH (KAFKA_TOPIC='airplane-data', VALUE_FORMAT='JSON');

CREATE STREAM city_data_stream (id INT, state_code VARCHAR, state_name VARCHAR, city VARCHAR, county VARCHAR, latitude DOUBLE, longitude DOUBLE) WITH (KAFKA_TOPIC='city_data', VALUE_FORMAT='JSON');

CREATE STREAM departure_cities AS SELECT * FROM city_data_stream;

CREATE STREAM arrival_cities AS SELECT * FROM city_data_stream;

CREATE STREAM enriched_data WITH (KAFKA_TOPIC='ENRICHED_DATA', PARTITIONS=1, REPLICAS=1) AS
SELECT ad.timestamp, ad.airplane_id, ad.airplane_model, ad.airline_company,
       dc.latitude AS departure_latitude, dc.longitude AS departure_longitude, ad.departure_city,
       ac.latitude AS arrival_latitude, ac.longitude AS arrival_longitude, ad.arrival_city,
       ad.flight_id, ad.flight_number, ad.departure_time, ad.arrival_time,
       ad.flight_speed, ad.altitude, ad.engine_performance, ad.temperature, ad.humidity, ad.pressure
FROM airplane_data_stream ad
LEFT JOIN departure_cities dc WITHIN 1 HOUR ON ad.departure_city = dc.city
LEFT JOIN arrival_cities ac WITHIN 1 HOUR ON ad.arrival_city = ac.city;

7. Run the following script to create the Elasticsearch dynamic template:

./04_elastic_dynamic_template

8. Run the following script to create the Kafka-to-Elasticsearch sink connector:

./05_kafka_to_elastic_sink

To install the Elasticsearch Sink Connector in the Connect container, follow these steps:

Open your terminal and run docker ps to get the name or ID of your Connect container.

Run docker exec -it <your_container_connect> bash to access the Connect container.

Once inside the container, run the following command to install the Elasticsearch Sink Connector:

confluent-hub install confluentinc/kafka-connect-elasticsearch:14.0.5

After the installation is complete, stop and restart the Connect container using the following commands:

docker stop <your_container_connect>
docker start <your_container_connect>

Once you've completed these steps, the Elasticsearch Sink Connector should be available for use in your Kafka Connect instance.
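
As an alternative to installing the connector by hand, the install can be baked into the connect service in docker-compose.yml by overriding its command; /etc/confluent/docker/run is the image's normal entrypoint, so the sketch below installs the plugin and then starts Connect as usual:

connect:
  # ...same image, ports, and environment as in the Compose file above...
  command:
    - bash
    - -c
    - |
      confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:14.0.5
      /etc/confluent/docker/run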

9. Run the Flask web application:

python app.py

10. Open a web browser and go to http://localhost:5000 to view the web application. You can now use the application to query and analyze the enriched data in Elasticsearch.
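
For example, from the command line (endpoints as defined in app.py above):

curl http://localhost:5000/number_of_flights

curl http://localhost:5000/average_flight_speed

curl http://localhost:5000/top_departure_cities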
