Making Sense of Stream Data

Ankush Khanna
Analytics Vidhya
Published in
6 min readNov 27, 2019

Have you ever wondered while working with Kafka, how you could showcase your project to stakeholders, highlight important processes and results in realtime without writing any frontend tools?

In this blog post, I am going to give a step-by-step guide on how can you get Kafka running locally, offload data to a data storage and leverage tools like Metabase to get insights into your data with graphs and dashboards.

In case you have an AWS account you can also leverage services there. But in this post I am going to show you how to run everything on your local machine.

Apache Kafka

Kafka is a distributed, reliable and scalable messaging system designed for high throughput. If you are new to Kafka, I recommend you to go through the introduction to kafka. Kafka enables multiple clients to connect simultaneously to produce or consume data at scale.

Running Kafka locally

You can use the confluent platform for running Kafka locally. Once you download the tar file you can unzip it and follow the following commands:

> cd confluent-5.2.1/bin
> ./confluent start
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Once all the services is running, you will have a broker running at port 9092.

Application

For this demo I will create a dummy pipeline, which reads historical Google timeline data from KML files and output it to a Kafka topic called google_history_trip. You can download your own Google timeline data here. The data I am using is from my personal timeline while travelling to Croatia. I dump this data to SQL by using Kafka connect and use Metabase to query the data stored on the SQL server.

Kafka Application

In this dummy Kafka application, code you can find here, I am reading data from Google KML files which I downloaded from my google timeline. I parse the files and pushed the data to Kafka using the Avro schema below.

{
"type": "record",
"name":"GoogleHistoryValue",
"namespace": "example.googlehistory",
"fields":[
{
"name":"user_id",
"type":"string"
},
{
"name": "start_time",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "end_time",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name":"name",
"type":"string"
},
{
"name":"latitude",
"type":"double"
},
{
"name":"longitude",
"type":"double"
}
]
}

SQL Server

Creating Database

Below I’m starting a MySQL server locally and creating a database kafka_offload, which I will be using to offload data to SQL.

> mysql.server start
Starting MySQL
.. SUCCESS!~ mysql -u root -pEnter password: *********
Welcome to the MySQL monitor. Commands end with ; or \g.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> CREATE DATABASE kafka_offload;
Query OK, 1 row affected (0.00 sec)
mysql> use kafka_offload;
Database changed
mysql> show tables;
Empty set (0.00 sec)
mysql>

Kafka Connect

Kafka connect allows us to pull or push data from or to Kafka. There are plenty of Kafka connectors available, which you can check at https://www.confluent.io/hub/.

I will be using io.confluent.connect.jdbc.JdbcSinkConnector for offloading the data from the Kafka topic to the MySQL server. We can easily use it by posting the following query:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '
{
"name": "google_history_trip_connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "google_history_trip",
"connection.url": "jdbc:mysql://localhost/kafka_offload",
"auto.create": "true",
"connection.user": "root",
"connection.password": "*******",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"key.converter.schema.registry.url": "http://localhost:8081",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"true",
"internal.key.converter":"org.apache.kafka.connect.json.JsonConverte r",
"internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable":"false",
"internal.value.converter.schemas.enable":"false"
}
}'

Don’t forget to change the connection.password field.

This should start a Kafka connect to offload data to the database. You can check the status of your connector using: http://localhost:8083/connectors/google_history_topic/status

Metabase

Setup

  • You can start Metabase via docker or you can download Metabase locally from here.
> docker run -d -p 3000:3000 — name metabase metabase/metabase
  • Once Metabase starts you should see the following screen at http://localhost:3000
  • After completing the registration you can connect to the database
  • You should now see the welcome screen as below
  • You can check your data by navigating to Kafka Data your topic name

Query Data

You can use the notebook feature to answer some questions

  • Let’s check out the total count
  • Let’s check the count of rows where name or type of activity is Walking
  • Let’s group by latitude longitude and plot it on a map(choose no binning)
  • Let’s check the same for activity type Driving, showing that I drove from Split to Dubrovnik

SQL Query

You can also use SQL query in Metabase

  • Let’s calculate how much time I spend at each location
SELECT latitude, longitude, name, TIMESTAMPDIFF(minute, start_time, end_time) as duration FROM google_history_trip
  • Let’s calculate where I spent more than an hour
SELECT latitude, longitude, name, TIMESTAMPDIFF(minute, start_time, end_time)  as duration
FROM google_history_trip
WHERE (start_time >= timestamp('2019-09-21T00:00:00.000Z')
AND start_time < timestamp('2019-09-22T00:00:00.000Z'))
HAVING duration > 60

You can also use the dashboard feature of Metabase with variables.

Using the fast offloading power of Kafka connect, we can interact with data produced to the topic in almost realtime.

Conclusion

We have seen a quick way to analyse data in our Kafka topics by exporting the data to a MySQL running locally and using Metabase to visualise the data. This allows us to verify our approach in a nice, interactive way which can be shared with the business or product owners.

The solution currently uses MySQL which might not be ideal for large datasets. Therefore, this fast approach is not ideal for production. In my next blog post, I would explain how we can offload large data sets from Kafka to AWS S3 and use AWS Glue and QuickSight to make interactive queries with our data.

--

--