Realtime data streaming with Apache Kafka, Apache Pinot, Apache Druid and Apache Superset

Bruno Cardoso Farias
9 min read · Jan 18, 2022


Index

Introduction
Goal
1. Let's do it, starting with Docker Playground
2. Init things inside Docker Labs
2.1 Installing Zookeeper
2.2 Installing Apache Kafka
3. Code injection -> Python3 fake_users_generator.py
3.1 Execute the fake_users_generator in /root
4. Apache Pinot
4.1 Apache Pinot [POST] /schema and [POST] /table via Swagger
4.1.1 [POST] /schema
4.1.2 [POST] /table
4.2 Query the data in Apache Pinot
5. Apache Druid
5.1 Configuring a new datasource in Apache Druid
5.1.1 Ingest the data from the host (or localhost)
5.1.2 Parse data and send to Apache Druid
5.1.3 Query the data in Apache Druid
6. Visualize with Apache Superset
6.1 Realtime data from Kafka visualized by Apache Superset
6.1.1 Add a database in Apache Superset (Pinot or Druid)
6.1.2 Add a dataset with this database
6.2 Visualize the data with a graph chart.
6.3 Save it and send to Dashboard panel
Conclusion

Introduction

This is my first post on Medium. I hope it helps anyone who needs to put together a fast, flexible proof of concept with these tools!

For all of the tips I post here, I'll use the same infrastructure tool, Docker Playground, so there should be no incompatibilities. (I hope!)

Machine Specs:

Alpine Linux v3.12 - https://alpinelinux.org/
4 GB RAM -> shared Xeon processor
Default package manager: APK

Goal

The goal is to get data from a realtime source in semi-structured JSON format, put it into an Apache Kafka topic, and then consume it with the big players: Apache Druid and Apache Pinot. This shows how things work when we are near realtime. We also get a better way to do #dataviz with Apache Superset, giving a complete solution from ingestion to visualization.

1. Let's do it, starting with Docker Playground:

Access https://labs.play-with-docker.com, create an account, click the “Start” button, and as the final step click “+ ADD NEW INSTANCE”. We are ready.

PWD is a Docker playground which allows users to run Docker commands in a matter of seconds. It gives the experience of having a free Alpine Linux Virtual Machine in browser, where you can build and run Docker containers and even create clusters in Docker Swarm Mode.
It's simple to use…

2. Init things inside Docker Labs:

On the terminal, paste these commands:

# Init
apk update && apk upgrade
apk add nano python3 perl openrc
python3 -m ensurepip
pip install --upgrade pip
pip install faker kafka-python
apk --no-cache add openjdk8 --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community
apk --no-cache add openjdk11 --repository=http://dl-cdn.alpinelinux.org/alpine/edge/community

2.1 Installing Zookeeper:

# Zookeeper
docker run --name zookeeper \
-p 2181:2181 \
-p 2888:2888 \
-p 3888:3888 \
-p 8080:8080 \
--restart always -d zookeeper:3.7.0

2.2 Installing Apache Kafka:

# Apache Kafka
mkdir apache_kafka && cd apache_kafka
wget https://archive.apache.org/dist/kafka/2.7.1/kafka_2.12-2.7.1.tgz
tar -xvzf kafka_2.12-2.7.1.tgz

## Start Kafka
cd kafka_2.12-2.7.1
# Local
./bin/kafka-server-start.sh config/server.properties >> ../kafka.log 2>&1 &
### Create topic
./bin/kafka-topics.sh --create --topic fake-users --bootstrap-server localhost:9092 --partitions 3
#### Describe
./bin/kafka-topics.sh --describe --topic fake-users --bootstrap-server localhost:9092
We are done with the Apache Kafka configuration and initialization.

3. Code injection -> Python3 fake_users_generator.py:

##### Code inject -> users
cd /root
touch fake_users_generator.py
echo '
import random
import time
from time import sleep
import json
from datetime import datetime

from faker import Faker
from kafka import KafkaProducer

print("Starting the fake data generator...", end="\n\n")
print("Initializing the Kafka producer...")
topicName = "fake-users"
producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                         value_serializer=lambda x: json.dumps(x).encode("utf-8"))
faker = Faker(["pt_BR"])
quantidade = 0

# Generate one fake user per second and publish it to the fake-users topic
while True:
    data = {}
    data["id"] = random.getrandbits(32)
    data["nome"] = faker.name()
    data["sexo"] = random.choice("MF")
    data["endereco"] = str((faker.address()).replace("\n", " ").replace("\r", "").strip())
    data["cidade"] = str(faker.city())
    data["cep"] = str(faker.postcode())
    data["uf"] = str(faker.estado_sigla())
    data["pais"] = str(faker.current_country())
    data["telefone"] = faker.phone_number()
    data["email"] = faker.safe_email()
    data["foto"] = faker.image_url()
    data["nascimento"] = str(faker.date_of_birth())
    data["profissao"] = faker.job()
    data["created_at"] = str(datetime.now())
    data["updated_at"] = None
    data["sourceTime"] = round(time.time() * 1000)
    quantidade = quantidade + 1
    print("[Record: " + str(quantidade) + "]")
    try:
        print("---" * 20)
        print("ID          : " + str(data["id"]))
        print("Name        : " + data["nome"])
        print("Gender      : " + data["sexo"])
        print("Address     : " + data["endereco"])
        print("City        : " + data["cidade"])
        print("Postal code : " + data["cep"])
        print("State (UF)  : " + data["uf"])
        print("Country     : " + data["pais"])
        print("Phone       : " + data["telefone"])
        print("Email       : " + data["email"])
        print("Photo       : " + data["foto"])
        print("Birth date  : " + data["nascimento"])
        print("Job         : " + data["profissao"])
        print("Created at  : " + data["created_at"])
        print("Updated at  : " + str(None))
        print("sourceTime  : " + str(data["sourceTime"]))
        print("---" * 20, "\n")
        producer.send(topicName, value=data)
        sleep(1)
    except Exception as error:
        print(error)
' > fake_users_generator.py
cd /root/

3.1 Execute the fake_users_generator in /root:

python3 /root/fake_users_generator.py
# OR run it in the background (the safest way to keep working...)
python3 /root/fake_users_generator.py >> fake-users.log 2>&1 &
fake_users_generator.py running successfully.
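To double-check that events are really landing in the fake-users topic, you can read a few of them back with kafka-python (installed in step 2). This is just a sanity check, not part of the pipeline itself:

# check_topic.py - read back a handful of events from the fake-users topic
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "fake-users",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",              # start from the oldest message
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=10000,                 # give up after 10s without messages
)

for i, message in enumerate(consumer):
    print(message.value["id"], message.value["nome"], message.value["uf"])
    if i >= 4:                                 # only show the first 5 records
        break

consumer.close()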

4. Apache Pinot

cd /root/
mkdir apache_pinot
cd apache_pinot
wget https://archive.apache.org/dist/incubator/pinot/apache-pinot-incubating-0.7.1/apache-pinot-incubating-0.7.1-bin.tar.gz
tar -xvf apache-pinot-incubating-0.7.1-bin.tar.gz
cd apache-pinot-incubating-0.7.1-bin
# Start Controller
export JAVA_OPTS="-Xms32M -Xmx100M -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+IgnoreUnrecognizedVMOptions -Xloggc:gc-pinot-controller.log"
./bin/pinot-admin.sh StartController -zkAddress localhost:2181 -controllerPort 9000 > ./controller-console.log 2>&1 &
sleep 20
# Start Broker
export JAVA_OPTS="-Xms32M -Xmx100M -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+IgnoreUnrecognizedVMOptions -Xloggc:gc-pinot-broker.log"
./bin/pinot-admin.sh StartBroker -zkAddress localhost:2181 > ./broker-console.log 2>&1 &
sleep 20
# Start Server
export JAVA_OPTS="-Xms32M -Xmx200M -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+IgnoreUnrecognizedVMOptions -Xloggc:gc-pinot-server.log"
./bin/pinot-admin.sh StartServer -zkAddress localhost:2181 > ./server-console.log 2>&1 &
cd /root
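Before creating the schema and table, it is worth confirming that the controller and broker actually came up. A minimal sketch, assuming the requests package is available (pip install requests); the controller listens on port 9000 and the broker on 8099:

# pinot_health.py - check that the Pinot controller and broker are answering
import requests

for name, url in [("controller", "http://localhost:9000/health"),
                  ("broker", "http://localhost:8099/health")]:
    try:
        resp = requests.get(url, timeout=5)
        print(name, "->", resp.status_code, resp.text.strip())
    except requests.ConnectionError:
        print(name, "-> not reachable yet, check the *-console.log files")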

4.1 Apache Pinot [POST] /schema and [POST] /table via Swagger (open the controller UI on port 9000):

4.1.1 [POST] /schema -> Try it Out -> Paste code below:

{
  "schemaName": "realtime_users",
  "metricFieldSpecs": [
    {
      "name": "id",
      "dataType": "INT"
    }
  ],
  "dimensionFieldSpecs": [
    { "name": "nome", "dataType": "STRING" },
    { "name": "sexo", "dataType": "STRING" },
    { "name": "endereco", "dataType": "STRING" },
    { "name": "cidade", "dataType": "STRING" },
    { "name": "cep", "dataType": "STRING" },
    { "name": "uf", "dataType": "STRING" },
    { "name": "pais", "dataType": "STRING" },
    { "name": "telefone", "dataType": "STRING" },
    { "name": "email", "dataType": "STRING" },
    { "name": "foto", "dataType": "STRING" },
    { "name": "nascimento", "dataType": "STRING" },
    { "name": "profissao", "dataType": "STRING" },
    { "name": "created_at", "dataType": "STRING" },
    { "name": "updated_at", "dataType": "STRING" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "sourceTime",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Then execute.

Creating a new schema in Apache Pinot via the Swagger API.

4.1.2 [POST] /table -> Try it Out -> Paste the code below:

{
  "tableName": "realtime_users",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "sourceTime",
    "timeType": "MILLISECONDS",
    "schemaName": "realtime_users",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "fake-users",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "localhost:9092",
      "realtime.segment.flush.threshold.time": "3600000",
      "realtime.segment.flush.threshold.size": "50000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}

Then execute.

Creating a new table in Apache Pinot via the Swagger API.
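If you would rather skip the Swagger UI, the same two payloads can be pushed to the controller's REST API (POST /schemas and POST /tables). A minimal sketch, assuming requests is installed and that the schema and table JSON above were saved to the hypothetical files /root/pinot_schema.json and /root/pinot_table.json:

# pinot_create.py - register the realtime_users schema and table via the controller API
import json
import requests

CONTROLLER = "http://localhost:9000"

with open("/root/pinot_schema.json") as f:
    schema = json.load(f)
with open("/root/pinot_table.json") as f:
    table = json.load(f)

# The order matters: the schema must exist before the table that references it
for path, payload in [("/schemas", schema), ("/tables", table)]:
    resp = requests.post(CONTROLLER + path, json=payload, timeout=10)
    print(path, resp.status_code, resp.text)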

4.2 Query the data in Apache Pinot
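Open the Query Console in the Pinot UI (port 9000) and run SQL against the realtime_users table. The same SQL can also be posted to the broker's /query/sql endpoint from the terminal; a small sketch, again assuming requests:

# pinot_query.py - count fake users per UF through the Pinot broker
import requests

sql = ("SELECT uf, COUNT(*) AS total FROM realtime_users "
       "GROUP BY uf ORDER BY total DESC LIMIT 10")
resp = requests.post("http://localhost:8099/query/sql", json={"sql": sql}, timeout=10)
resp.raise_for_status()

result = resp.json()["resultTable"]
print(result["dataSchema"]["columnNames"])
for row in result["rows"]:
    print(row)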

5. Apache Druid:

# Apache Druid...
cd /root
mkdir apache_druid
cd apache_druid
wget https://dlcdn.apache.org/druid/0.22.0/apache-druid-0.22.0-bin.tar.gz
tar -xvf apache-druid-0.22.0-bin.tar.gz
cd apache-druid-0.22.0/conf
sed -i 's/clientPort=2181/clientPort=12181/g' zk/zoo.cfg
echo "admin.serverPort=18080" >> zk/zoo.cfg
sed -i 's/druid.zk.service.host=localhost/druid.zk.service.host=localhost:12181/g' druid/single-server/nano-quickstart/_common/common.runtime.properties
sed -i 's/druid.plaintextPort=8081/druid.plaintextPort=18081/g' druid/single-server/nano-quickstart/coordinator-overlord/runtime.properties
sed -i 's/druid.plaintextPort=8091/druid.plaintextPort=18091/g' druid/single-server/nano-quickstart/middleManager/runtime.properties
sed -i 's/druid.plaintextPort=8082/druid.plaintextPort=18082/g' druid/single-server/nano-quickstart/broker/runtime.properties
sed -i 's/druid.plaintextPort=8083/druid.plaintextPort=18083/g' druid/single-server/nano-quickstart/historical/runtime.properties
sed -i 's/druid.plaintextPort=8888/druid.plaintextPort=18888/g' druid/single-server/nano-quickstart/router/runtime.properties
## Enable basic auth (needed later to connect from Superset):
echo '' >> /root/apache_druid/apache-druid-0.22.0/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
sed -i 's/"druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches"/"druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches", "druid-basic-security"/g' /root/apache_druid/apache-druid-0.22.0/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
echo '#
# Authentication
#
druid.auth.authenticatorChain=["MyBasicMetadataAuthenticator"]
druid.auth.authenticator.MyBasicMetadataAuthenticator.type=basic
druid.auth.authenticator.MyBasicMetadataAuthenticator.initialAdminPassword=superTestDruid
druid.auth.authenticator.MyBasicMetadataAuthenticator.initialInternalClientPassword=superTestDruid
druid.auth.authenticator.MyBasicMetadataAuthenticator.credentialsValidator.type=metadata
druid.auth.authenticator.MyBasicMetadataAuthenticator.skipOnFailure=false
druid.auth.authenticator.MyBasicMetadataAuthenticator.authorizerName=MyBasicMetadataAuthorizer
#Escalator
druid.escalator.type=basic
druid.escalator.internalClientUsername=admin
druid.escalator.internalClientPassword=superTestDruid
druid.escalator.authorizerName=MyBasicMetadataAuthorizer
#Authorizer
druid.auth.authorizers=["MyBasicMetadataAuthorizer"]
druid.auth.authorizer.MyBasicMetadataAuthorizer.type=basic' >> /root/apache_druid/apache-druid-0.22.0/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
sed -i 's/druid.host=localhost/druid.host='"$(hostname -i)"'/g' /root/apache_druid/apache-druid-0.22.0/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
cd ..
export DRUID_SKIP_JAVA_CHECK=1
export DRUID_SKIP_PORT_CHECK=1
./bin/start-nano-quickstart >> /root/apache_druid.log 2>&1 &
cd /root/

Access the Druid console through Open Port -> 18888.
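It can take a minute or two for all of the nano-quickstart services to come up. Here is a small sketch that polls the router's health endpoint until it answers, assuming requests and the admin credentials configured above:

# druid_health.py - wait until the Druid router on port 18888 reports healthy
import time
import requests

while True:
    try:
        resp = requests.get("http://localhost:18888/status/health",
                            auth=("admin", "superTestDruid"), timeout=5)
        if resp.status_code == 200 and resp.json() is True:
            print("Druid router is healthy")
            break
        print("Router answered but is not healthy yet:", resp.status_code)
    except requests.ConnectionError:
        print("Router not reachable yet, retrying...")
    time.sleep(10)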

5.1 Configuring a new datasource in Apache Druid:

In this section we will configure Apache Druid to receive the Apache Kafka messages.

5.1.1 Ingest the data from the host (or localhost)

Getting the data from Kafka into Apache Druid: in the Druid console choose Load data -> Apache Kafka, point the bootstrap servers to localhost:9092 and the topic to fake-users.
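If you prefer to skip the point-and-click wizard, the same ingestion can be configured by submitting a Kafka supervisor spec to the overlord API through the router. This is only a rough sketch, assuming requests, the admin credentials above, and the field names produced by the generator script:

# druid_kafka_supervisor.py - submit a Kafka ingestion supervisor spec via the router
import requests

spec = {
    "type": "kafka",
    "spec": {
        "dataSchema": {
            "dataSource": "fake-users",
            "timestampSpec": {"column": "sourceTime", "format": "millis"},
            "dimensionsSpec": {
                "dimensions": [
                    {"type": "long", "name": "id"},
                    "nome", "sexo", "endereco", "cidade", "cep", "uf", "pais",
                    "telefone", "email", "foto", "nascimento", "profissao",
                    "created_at", "updated_at",
                ]
            },
            "granularitySpec": {"segmentGranularity": "HOUR", "rollup": False},
        },
        "ioConfig": {
            "topic": "fake-users",
            "inputFormat": {"type": "json"},
            "consumerProperties": {"bootstrap.servers": "localhost:9092"},
            "useEarliestOffset": True,
        },
        "tuningConfig": {"type": "kafka"},
    },
}

resp = requests.post("http://localhost:18888/druid/indexer/v1/supervisor",
                     json=spec, auth=("admin", "superTestDruid"), timeout=10)
print(resp.status_code, resp.text)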

5.1.2 Parse data and send to Apache Druid

5.1.3 Query the data in Apache Druid:
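In the console, the Query tab lets you run SQL against the new datasource. The same query can be sent to the Druid SQL endpoint, which is exactly what Superset will do later; a minimal sketch assuming requests and that the datasource was named fake-users:

# druid_query.py - count fake users per UF through the Druid SQL API
import requests

sql = 'SELECT uf, COUNT(*) AS total FROM "fake-users" GROUP BY uf ORDER BY total DESC LIMIT 10'
resp = requests.post("http://localhost:18888/druid/v2/sql",
                     json={"query": sql},
                     auth=("admin", "superTestDruid"), timeout=10)
resp.raise_for_status()
for row in resp.json():
    print(row)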

6. Visualize with Apache Superset:

# Superset
docker network create pinot-demo
docker run \
--network pinot-demo \
--name=superset \
-p 31005:8088 \
-d apachepinot/pinot-superset:24c4b76ed64f-20220114
docker exec -it superset superset fab create-admin \
--username brunow \
--firstname Bruno \
--lastname Cardoso \
--email emergeit@outlook.com.br \
--password superTest
## I REALLY SUGGEST YOU RUN THESE LINE BY LINE
docker exec -it superset superset db upgrade
docker exec -it superset superset init
Successful installation of Apache Superset.
You can now open it via Open Port -> 31005. Login: brunow / Password: superTest

For the next steps, pay attention to the server IP shown in your prompt, root@(SERVER_IP).
(here it is 192.168.0.28)

Apache Druid connection string: druid://admin:superTestDruid@192.168.0.28:18888/druid/v2/sql

Apache Pinot connection string: pinot://192.168.0.28:8099/query/sql?controller=http://192.168.0.28:9000

6.1 Realtime data from Kafka visualized by Apache Superset

Now things are really moving. In this section we will display the data from Apache Druid/Apache Pinot in Apache Superset.

6.1.1 Add a database in Apache Superset (Pinot or Druid)

Data -> Databases -> (top-right [+ Database]) -> Supported Databases -> Apache Pinot OR Apache Druid -> paste the corresponding connection string from above -> Test Connection. If it looks good, hit Connect.

6.1.2 Add a dataset with this database:

Data -> Datasets -> (top-right [+ Dataset]) -> choose the database, schema and table as shown in the gif above…

6.2 Visualize the data with a graph chart.

Create a simple chart by UF with the fake-users data!

6.3 Save it and send to Dashboard panel:

Sending the chart to a dashboard.

Conclusion

Along this road, we learned how to configure a POC with several applications:

Apache Kafka,

Apache ZooKeeper,

Apache Druid,

Apache Pinot and

Apache Superset!

We also combined all of this to build a simple viewer for realtime JSON events with semi-structured data, the kind normally exchanged by APIs that integrate different systems.

All of the configurations, the purpose of each piece of software, and more samples are well described in each application's documentation. The sed tricks used here can be found on Google.

Docs:

Apache Kafka: https://kafka.apache.org
Apache Pinot: https://pinot.apache.org
Apache Druid: https://druid.apache.org
Apache Superset: https://superset.apache.org

If I can answer any question, just reply! Thank you!

In this case we did not have to use one, because the ingested data is already in JSON format, which makes for a faster solution for modern apps. That said, we still have the unstructured data to try to match and fit into a schema/table.

PS: This environment is strictly for testing or proofs of concept. Do not expose it to the internet, because it lacks proper security. You can find more about security in each application's docs.
