Real-Time User Profile Data Pipeline from Kafka to Elasticsearch

Li-Ting Liao
Published in Dev Diaries · Sep 13, 2020 · 5 min read

Goal

Nowadays, a large amount of data is generated from numerous sources, such as web services, digital media, and sensor logs, yet only a small portion of it is well managed or used to create value. This article may be a bit old but is still worth a quick read: The Data Made Me Do It.

It has become more challenging than ever to read large amounts of data, process them, and act on them.

In this article, I demonstrate how to:

  1. Generate mock user profile data in Python
  2. Send the mock data to a Kafka topic with a Kafka producer
  3. Read the data and load it into Elasticsearch with Logstash
  4. Visualize the streaming data with Kibana

Data Streaming Architecture

Real-time user profile data streaming architecture by me :)

My Environments

  • Ubuntu Desktop 18.04 on Oracle VirtualBox 6.1 (with a 30 GB virtual hard disk)

Everything below was installed in the above VM:

  • Python 3.8
  • PyCharm 2020.2.1
  • Docker 19.03.12
  • Docker Compose 1.26.2
  • Install Kafka-Zookeeper-KafkaManager using Docker Compose
  1. Kafka is listening on PORT 9092
  2. Zookeeper is listening on PORT 2181
  3. Kafka Manager is listening on PORT 9000
  • Install Elasticsearch and Kibana 7.4.0 using Docker Compose
  1. Elasticsearch is running on PORT 9200
  2. Kibana is running on PORT 5601
  • Java Runtime Environment (JRE), OpenJDK 11.0.7
  • Logstash
  1. Configuration file of Logstash: /etc/logstash/conf.d/
  2. Log file: /var/log/logstash

Let’s get started

We can use Docker Compose to run multi-container Docker applications. With Compose, we use a YAML file to configure our services. For details on how to write the YAML file for each service below, we can refer to the Docker documentation.
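For reference, here is a minimal sketch of what kafka-docker-compose.yml might contain, assuming the commonly used wurstmeister/zookeeper, wurstmeister/kafka, and kafkamanager/kafka-manager images; your file, image tags, and environment variables may well differ:

version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper          # Zookeeper on port 2181
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka              # Kafka broker on port 9092
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.56.149   # replace with your VM's IP
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  kafka-manager:
    image: kafkamanager/kafka-manager      # Kafka Manager UI on port 9000
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181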

Step 1: Start the services in the background

# Start kafka 
docker-compose -f kafka-docker-compose.yml up -d
# Start ELK
docker-compose -f elasticsearch-docker-compose.yml up -d
# Check if these dockers are up
docker ps

Step 2: Generate mock user profile data with Python faker library

Set up a Python 3.8 project by creating a virtual environment:

# Find the Python 3.8 interpreter path on a Linux/Mac operating system
whereis python3.8
>>> output: /usr/local/bin/python3.8
# Install pip for Python 3
sudo apt install python3-pip
# Install virtualenv
sudo apt install virtualenv

Create a folder for this project. Open PyCharm and go to the directory where the project will live. In the settings, set up a virtual environment and choose Python 3.8 as the interpreter (using the path returned by whereis python3.8):

Once we click OK and open the terminal, the venv prefix is shown:
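If you prefer the command line to PyCharm's settings, a rough equivalent (assuming the interpreter path found above) is to create and activate the venv manually and then install the two packages the producer script relies on, faker and kafka-python:

# Create a virtual environment with the Python 3.8 interpreter
virtualenv -p /usr/local/bin/python3.8 venv
# Activate it
source venv/bin/activate
# Install the libraries used by the mock data script
pip install faker kafka-python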

Here is my entire script, which specifies what the mock user profile data looks like:

The KafkaProducer connects to the broker at my IP on port 9092, serializes each record into JSON, and sends the generated mock record registered_data to my Kafka topic every 3 seconds.

Replace my IP with your IP address if you would like to try it out:
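A minimal sketch of such a producer could look like the following; the exact profile fields, the helper get_registered_user(), and the topic name registered_user are assumptions based on the description above and on the registered_user* index pattern used later:

import json
import time

from faker import Faker
from kafka import KafkaProducer

fake = Faker()

def get_registered_user():
    # Build one mock user profile record (these fields are assumptions)
    return {
        "name": fake.name(),
        "address": fake.address(),
        "created_at": fake.year(),
        "monthly_income": fake.random_element(
            elements=("NT$ 35K - NT$ 45K", "NT$ 85K - NT$ 95K")
        ),
    }

def json_serializer(data):
    # Serialize each record to JSON bytes before it is sent to Kafka
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(
    bootstrap_servers=["192.168.56.149:9092"],  # replace with your IP
    value_serializer=json_serializer,
)

if __name__ == "__main__":
    while True:
        registered_data = get_registered_user()
        print(registered_data)
        producer.send("registered_user", registered_data)  # topic name
        time.sleep(3)  # send one mock record every 3 seconds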

Step 3: Create a cluster and topic on KafkaManager

# Go to my KafkaManager
[my ip]:9000

Add a cluster and a topic in the UI; the topic name is the same as the one in the Python script (a command-line alternative is sketched below):

No data has arrived here yet, so there are no offsets:
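If you'd rather not click through the UI, the topic can also be created with the kafka-topics.sh script that ships inside the Kafka container; the container name kafka and the topic name registered_user below are assumptions, so adjust them to match your setup:

# Create the topic from inside the Kafka container
docker exec -it kafka kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 \
  --topic registered_user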

Go to /etc/logstash/conf.d and create a conf file for this project. A few things to note: use our own IP address (mine here is 192.168.56.149), check that the port is 9092, and make sure the topic name matches the one we created in KafkaManager. Also, if we want the data to be parsed as JSON, specify that in the filter block:
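A sketch of what the conf file might look like, assuming the kafka input plugin, the json filter, and an elasticsearch output writing to an index named after the topic; adjust the IP, topic, and index to your own values:

input {
  kafka {
    bootstrap_servers => "192.168.56.149:9092"   # replace with your IP
    topics => ["registered_user"]                # same topic as in KafkaManager
  }
}
filter {
  json {
    source => "message"    # parse the Kafka payload as JSON
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "registered_user"
  }
}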

Step 4: Run the Python script and create an index pattern in Kibana

Run the script in PyCharm, and data starts flowing into the Kafka topic:

The offset was 0, but now it's at 9!

# Go to my Kibana
[my ip]:5601

Create an index pattern so the data can be visualized in Kibana:
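If you'd rather script this step, the index pattern can also be created through Kibana's saved objects API instead of the UI; the pattern title registered_user* below matches the index used in this project, while the rest is an assumption about your setup:

# Create the registered_user* index pattern via the Kibana API
curl -X POST "http://[my ip]:5601/api/saved_objects/index-pattern" \
  -H "kbn-xsrf: true" -H "Content-Type: application/json" \
  -d '{"attributes": {"title": "registered_user*"}}'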

At this point, 42 documents have been uploaded to the index pattern registered_user*.

There's a field called message, but Logstash also parsed it into separate fields, as I expected:

Step 5: Make Kibana refresh data visualization every 3 seconds

I created 3 charts on a dashboard:

On the dashboard, we can set auto-refresh to whatever interval we wish:

Here I set it to refresh every 3 seconds!

Result

Data will keep flowing from Python to Kafka to Logstash to Elasticsearch and Kibana unless I stop the script running in PyCharm. The flow looks like this:

Run Python script
Check Kafka topic on KafkaManager
Kibana dashboard

Even though this is a mock demo, we can still see the composition of user profiles change over time. For example, when I had only 42 data points, NT$ 85K–NT$ 95K was the only major monthly income range among users, but as more data flowed into ELK, another monthly income range, NT$ 35K–NT$ 45K, also came up almost on par.

This article is inspired by this blog.

That's it! Thanks for reading, and have a wonderful day ahead!
