Real-Time User Profile Data Pipeline from Kafka to Elasticsearch
Goal
Nowadays a large amount of data is generated from numerous sources, such as web services, digital media, and sensor logs, yet only a small portion of it is well managed or used to create value. This article may be a bit old but is still worth a quick read: The Data Made Me Do It.
It has become more challenging than ever to read large amounts of data, process them, and act upon them.
In this article, I demonstrate how to:
- Generate mock user profile data in Python
- Send the mock data to a Kafka topic with a Kafka producer
- Read the data and load it into Elasticsearch with Logstash
- Visualize streaming data with Kibana
Data Streaming Architecture
My Environments
- Ubuntu Desktop 18.04 on Oracle VirtualBox 6.1 (Created virtual hard disk 30 GB)
Everything below was installed in the above VM:
- Python 3.8
- PyCharm 2020.2.1
- Docker 19.03.12
- Docker Compose 1.26.2
- Configuration file of Logstash: /etc/logstash/conf.d/
- Log file: /var/log/logstash
- Install Kafka-Zookeeper-KafkaManager Using Docker Compose
- Kafka is listening on PORT 9092
- Zookeeper is listening on PORT 2181
- Kafka Manager is listening on PORT 9000
- Install Elasticsearch and Kibana 7.4.0 using Docker Compose
- Elasticsearch is running on PORT 9200
- Kibana is running on PORT 5601
- Java Runtime Environment, JRE, OpenJDK 11.0.7
- Logstash
Let’s get started
We can use Docker Compose to run multi-container Docker applications. With Compose, we use a YAML file to configure our services. For details on how to configure the YAML file for each service below, refer to the Docker documentation.
Step 1: Start the services in the background
# Start kafka
docker-compose -f kafka-docker-compose.yml up -d
# Start ELK
docker-compose -f elasticsearch-docker-compose.yml up -d
# Check if these dockers are up
docker ps
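As a rough illustration, a kafka-docker-compose.yml for this stack might look like the sketch below. The image names and environment settings here are assumptions, not the exact file I used, so adjust them to your setup; in particular, KAFKA_ADVERTISED_HOST_NAME should be your VM's IP address:

```yaml
version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"            # Zookeeper client port
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"            # Kafka broker port
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.56.149   # replace with your IP
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
  kafka-manager:
    image: hlebalbau/kafka-manager:stable
    ports:
      - "9000:9000"            # Kafka Manager UI
    environment:
      ZK_HOSTS: zookeeper:2181
    depends_on:
      - zookeeper
```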
Step 2: Generate mock user profile data with Python faker library
Set up a Python 3.8 project by creating a virtual environment:
# Find Python3.8 Interpreter Path in Linux/Mac Operating System
whereis python3.8
>>> output: /usr/local/bin/python3.8

# Install pip
sudo apt install python3-pip

# Install virtualenv
sudo apt install virtualenv
Create a folder for the project. Open PyCharm and go to the directory where you want to create the project. In Settings, set up a virtual environment with Python 3.8 as the interpreter (using the path returned by whereis python3.8):
Once we click OK and open the terminal, the venv prefix is shown:
My entire script to specify what my mock user profile data looks like:
The KafkaProducer connects to the broker on port 9092 of my IP, serializes each record into JSON, and sends the generated mock data registered_data to my Kafka topic every 3 seconds.
Replace my IP with your IP address if you would like to try it out:
Step 3: Create a cluster and topic on KafkaManager
# Go to my KafkaManager
[my ip]:9000
Add a cluster and a topic; the topic name is the same as in the Python script:
No data has arrived yet, so there are no offsets:
Go to /etc/logstash/conf.d and create a conf file for this project. A few things to note: add your IP address (mine here is 192.168.56.149), check that the port is 9092, and make sure the topic name matches the one we created in KafkaManager. Also, if we want our data to be interpreted as JSON, specify that in the filter block:
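A sketch of what that conf file might contain, under the same assumptions as above (topic registered_user, my VM's IP, Elasticsearch on its default port 9200); the index name is my choice here, so adapt it to yours:

```
input {
  kafka {
    bootstrap_servers => "192.168.56.149:9092"   # replace with your IP
    topics => ["registered_user"]
  }
}

filter {
  json {
    source => "message"   # parse the raw message field as JSON
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "registered_user"
  }
}
```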
Step 4: Run the Python script, and Create an index pattern in Kibana
Run the script in PyCharm, and data starts flowing into the Kafka topic:
# Go to my kibana
[my ip]:5601
Create an index pattern so the data can be visualized in Kibana:
At this moment, 42 documents have been uploaded to the index pattern registered_user*.
There’s a field called message, but Logstash also parsed it into separate fields, as I expected:
Step 5: Make Kibana refresh data visualization every 3 seconds
I created 3 charts on a dashboard:
On the dashboard, we can set auto-refresh to whatever interval we wish:
Result
Data will keep flowing from Python to Kafka to Logstash to Elasticsearch to Kibana until I stop executing the code in PyCharm. The flow will look like this:
Even though it’s a mock demo, we can still see the composition of user profiles change over time. For example, when I had only 42 data points, NT$ 85K to NT$ 95K was the only major monthly income range among users; as more data flowed into ELK, another range, NT$ 35K to NT$ 45K, came up almost on par.
This article is inspired by this blog.
That’s it! Thanks for reading, and have a wonderful day ahead!