Real-time tweet streaming with Kafka and Logstash

Yudhishthir Singh · Published in Analytics Vidhya · Dec 2, 2020 · 7 min read

I will first explain both Kafka and Logstash in brief, and then show a simple example where I stream tweets in real time through Kafka into Elasticsearch using Logstash.

Kafka

I recently started working with Apache Kafka and I can't get enough of it. It's a wonderful tool when you want to stream events in real time, or when you have an application with different components that emit and consume data from one another. Let me explain both of these in a little detail.

  1. Why is data streaming important?
    In today's day and age, where huge amounts of data flow every day, you need technologies to store, read, analyse and stream your data. Data streaming involves processing vast amounts of real-time data from several sources. Some examples are IoT devices emitting data, user interactions within mobile or web applications (when you want to analyse how users interact with your app, every tap is an event), information about financial markets, and so on; you get the idea.
  2. How can an app have different components emitting and consuming data?
    Take this example: you have a mobile app that stores its data in a database and uses that data to populate the app. Sounds pretty simple, one source and one destination. But this does not stay that way once your app starts to grow; different requirements come in about how and where each type of data is to be stored. For example, you might want one source of data for user activity, another for the data that you actually show in the app, another that holds your sales data, and so on. This list can grow very quickly, and when you have many source and target systems, you need many connections among them, which is very hard to manage.

Apache Kafka solves both of these problems. In Kafka you can have any data stream that you can think of: your app data, transactions, user interactions and so on. Once your data is in Kafka, you can fetch it from any system that you want. Kafka acts as a universal data pipeline across multiple applications and services; all the data from different systems resides in a single place, making Kafka a central source of data.

How Kafka works

Kafka works on the publish-subscribe pattern. That means it allows some parts of your application to publish data under certain categories, called topics, and other parts to fetch that data just by using the topic name.

The part of your app that publishes data is called a producer, and the part that consumes it is called a consumer. Kafka offers a Producer API and a Consumer API for these tasks.
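To make the publish-subscribe idea concrete, here is a minimal sketch using the kafka-python package (which we will also use later in this post). It assumes a broker running on localhost:9092 and uses a hypothetical topic called "demo":

from kafka import KafkaProducer, KafkaConsumer

# Producer: publish a message under the topic "demo".
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("demo", b"hello from the producer")
producer.flush()

# Consumer: subscribe to the same topic and read messages as they arrive.
consumer = KafkaConsumer(
    "demo",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",  # start from the oldest available message
)
for message in consumer:
    print(message.value.decode("utf-8"))
    break  # stop after the first message, just for this demo

The same pattern scales to any number of producers and consumers; they only need to agree on the topic name.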

Logstash

Logstash is a lightweight, server-side data processing pipeline that can simultaneously ingest data from a wide variety of sources, then parse, filter and transform it, and finally forward it to a downstream system. This downstream system is usually Elasticsearch (these two, along with Kibana, are famously known as the ELK stack), although it doesn't always have to be Elasticsearch.

There are plenty of input plugins available for Logstash that you can use to ingest data from a variety of sources, Kafka being one of them.

Hands-on example: live tweet streaming

This little project takes data from Twitter using the tweepy package and uploads it to Kafka. Kafka then acts as the input for Logstash, which puts the data directly into Elasticsearch, and finally Kibana is used to visualise it.

Let's start with installation and setting things up:

  1. Kafka

Head to this link, https://kafka.apache.org/downloads and download the binary file.

Extract it, go inside the extracted folder, and start the ZooKeeper server followed by the Kafka server by running the following commands:

# Start ZooKeeper Server
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka Server
bin/kafka-server-start.sh config/server.properties

Next, you will have to create a topic to which your system will be publishing data.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic trump

Here, I am creating a topic with the name "trump", as we will be tracking tweets that contain the keyword "trump" in them.

You can check the list of topics like this:

bin/kafka-topics.sh --list --zookeeper localhost:2181
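If you prefer, you can also run a quick sanity check from Python with the kafka-python package (installed later in this post) to confirm that the broker is reachable and the topic exists; this step is optional:

from kafka import KafkaConsumer

# Connect to the local broker and list the topics it knows about.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print("trump" in consumer.topics())  # should print True once the topic is created
consumer.close()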

  2. Elasticsearch, Logstash, Kibana

Download the files according to your system from https://www.elastic.co/downloads/.

Once downloaded, run elasticsearchfolder/bin/elasticsearch (or elasticsearchfolder\bin\elasticsearch.bat on Windows) and kibanafolder/bin/kibana (or kibanafolder\bin\kibana.bat on Windows) to start Elasticsearch and Kibana respectively.
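To confirm that Elasticsearch came up, you can hit its root endpoint on port 9200. Here is a minimal sketch using the requests package (an extra dependency, only needed for this optional check):

import requests

# Elasticsearch answers on port 9200 with cluster information when it is up.
response = requests.get("http://localhost:9200")
print(response.json())  # cluster name, version info, etc.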

For Logstash, you will need a config file before you run it. In logstashfolder/bin/ create a file with a .conf extension (e.g. yourconfig.conf) and paste the following code inside it.

input {
  kafka {
    topics => "trump" # your Kafka should have this topic at this point
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "practice_index"
  }
  stdout { codec => "rubydebug" }
}

(Remember the value of index, practice_index in this case; you will need it later.) Note that the Kafka input plugin connects to a broker at localhost:9092 by default, which matches our local setup.

Run bin/logstash -f yourconfig.conf.

Setup is done. By this point you have the ELK stack up and running, with one topic ready in Kafka. All you need now is a Python script that fetches tweets and puts them into Kafka.

The Python script

For the script to work, you will need some keys and a bearer token from Twitter so that you are able to fetch data through their API. Head over to https://developer.twitter.com/en/ to generate those.

For the following script to run, you will have to install some packages. Run the following command:

pip install python-dotenv tweepy kafka-python

Next, you will need a .env file that contains your Twitter access token and other keys. This file should be in the same folder as the script, and its contents should look as follows:

TWITTER_API_KEY = your_twitter_api_key
TWITTER_API_SECRET = your_twitter_api_secret
TWITTER_BEARER_TOKEN = your_twitter_bearer_token
TWITTER_ACCESS_TOKEN = your_twitter_access_token
TWITTER_TOKEN_SECRET = your_twitter_token_secret

Coming to the actual script now. The code is easy to follow and self-explanatory; I have divided it into a few sections. Section 1 tells the script where to pick up the .env file, and Section 2 grabs the values from it. Section 3 does the authentication, and Section 4 contains a simple class that inherits from the StreamListener class provided by the tweepy package (to read more, head over to the streaming documentation here; note that StreamListener is part of the tweepy 3.x API). Finally, Section 5 puts everything together: it initialises a Kafka producer, an object of the class we created, and a stream object, and then starts the stream, tracking the keyword "trump".

import os
from dotenv import load_dotenv
from pathlib import Path
import tweepy as tw
import json
import kafka

# Section 1
env_path = Path(".") / ".env"
load_dotenv(dotenv_path=env_path)

# Section 2
consumer_key = os.getenv("TWITTER_API_KEY")
consumer_secret = os.getenv("TWITTER_API_SECRET")
access_token = os.getenv("TWITTER_ACCESS_TOKEN")
access_token_secret = os.getenv("TWITTER_TOKEN_SECRET")

# Section 3
auth = tw.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tw.API(auth, wait_on_rate_limit=True)

# Section 4
class MyStreamListener(tw.StreamListener):
    def on_status(self, data):
        tweet_dict = {
            "text": data.text,
            "user_name": data.user.name,
            "screen_name": data.user.screen_name,
            "id_string": data.user.id_str,
            "location": data.user.location,
        }
        print(data.text)
        # The following line sends the data to Kafka (under the topic "trump").
        producer.send("trump", json.dumps(tweet_dict).encode("utf-8"))
        return True

    def on_error(self, status_code):
        if status_code == 420:
            return False

# Section 5
producer = kafka.KafkaProducer(bootstrap_servers="localhost:9092")
myStreamListener = MyStreamListener()
myStream = tw.Stream(auth=api.auth, listener=myStreamListener)
myStream.filter(track=["trump"], is_async=True)

Once you run the script, you should start seeing logs in the terminal printing tweet text. This means the script has executed successfully and you are able to stream data using tweepy.

The fact that it ran successfully means that the data is indeed going into Kafka. To check that, look at the terminal that is running Logstash: it should also spit out continuous logs showing that Logstash is receiving input and sending it to Elasticsearch successfully.
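As an extra sanity check, you can also query Elasticsearch directly to see what has been indexed so far. Here is a minimal sketch using the requests package (an extra dependency, not part of the script above) against the practice_index index from the Logstash config:

import requests

# Count the documents that Logstash has indexed so far.
count = requests.get("http://localhost:9200/practice_index/_count")
print(count.json())  # e.g. {"count": 1234, ...}

# Peek at one indexed tweet to confirm the fields look right.
search = requests.get("http://localhost:9200/practice_index/_search", params={"size": 1})
print(search.json()["hits"]["hits"][0]["_source"])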

Now it's time to visualise this in Kibana. Head over to localhost:5601 and you should see a beautiful Kibana dashboard. You will have to create an index pattern there (under index management); this is where you need the index name you gave in the Logstash config file. Once created, go to the Discover section in Kibana and you should see stats about your data, like how many hits came in and at what time. This is how it looked when I left it running for some time.

It shows that there were 177,153 tweets made about Trump in one hour.

If you found this article insightful, do hit the clap button, and for any queries or comments, feel free to leave a comment. I'd love to have a chat. :)
