Analyzing Kinesis Data Streams of Tweets Using Kinesis Data Analytics

Amany Abdelhalim
Published in Nerd For Tech · May 21, 2020

In this article, I illustrate how to collect tweets into a Kinesis data stream and then analyze them using Kinesis Data Analytics.

The steps that I followed:

  1. Create a Kinesis data stream.

I created a Kinesis data stream called “twitter” with one shard.
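As a side note, the same stream can be created programmatically rather than through the console. A minimal sketch with boto3 (the region is an assumption; your AWS credentials must already be configured):

import boto3

# Create a Kinesis data stream named "twitter" with a single shard.
# The region below is an assumption -- use the region your resources live in.
kinesis = boto3.client("kinesis", region_name="us-east-1")
kinesis.create_stream(StreamName="twitter", ShardCount=1)

# create_stream is asynchronous; wait until the stream becomes ACTIVE.
kinesis.get_waiter("stream_exists").wait(StreamName="twitter")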

2. Prepare the script that will collect the tweets and write them into the Kinesis data stream.

I prepared the following Python script, which selects 11 attributes from each tweet and writes them into the “twitter” Kinesis data stream that I created in the first step. I ran the script from my local machine, but you can run it on an EC2 instance, even under nohup so that it keeps running in the background after you disconnect the SSH session.

Python Script
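The original embedded script is not reproduced here; the sketch below shows the general shape of such a producer. It assumes Tweepy 3.x for the Twitter streaming API, boto3 for Kinesis, and TextBlob for a per-tweet sentiment polarity score (which Example 3 later relies on). The credential values, tracked keywords, and attribute names are placeholders, and only a few of the 11 attributes are shown:

import json

import boto3
import tweepy
from textblob import TextBlob

# Placeholder credentials -- hard-coded here only for illustration;
# see the note below the script for safer alternatives.
CONSUMER_KEY = "..."
CONSUMER_SECRET = "..."
ACCESS_TOKEN = "..."
ACCESS_TOKEN_SECRET = "..."

kinesis = boto3.client("kinesis", region_name="us-east-1")

class TweetListener(tweepy.StreamListener):
    def on_status(self, status):
        # Select a subset of tweet attributes (the original script selects 11).
        record = {
            "id": status.id_str,
            "tweet": status.text,
            "created_at": str(status.created_at),
            "user": status.user.screen_name,
            "followers_count": status.user.followers_count,
            "lang": status.lang,
            # Sentiment polarity in [-1, 1]; values below 0 mean negative sentiment.
            "polarity": TextBlob(status.text).sentiment.polarity,
        }
        kinesis.put_record(
            StreamName="twitter",
            Data=json.dumps(record),
            PartitionKey=status.user.screen_name,
        )
        return True

    def on_error(self, status_code):
        print("Stream error:", status_code)
        return False  # returning False disconnects the stream

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
stream = tweepy.Stream(auth=auth, listener=TweetListener())
stream.filter(track=["aws"], languages=["en"])  # tracked keywords are placeholders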

In the above script, I hard-coded my Twitter credentials, which is not recommended. There are safer options available, such as using environment variables or passing arguments to your script.
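For instance, with environment variables, the credential lines in the sketch above would become something like this (the variable names are arbitrary):

import os

# Read credentials from the environment instead of hard-coding them.
CONSUMER_KEY = os.environ["TWITTER_CONSUMER_KEY"]
CONSUMER_SECRET = os.environ["TWITTER_CONSUMER_SECRET"]
ACCESS_TOKEN = os.environ["TWITTER_ACCESS_TOKEN"]
ACCESS_TOKEN_SECRET = os.environ["TWITTER_ACCESS_TOKEN_SECRET"]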

3. Create an application in Kinesis Data Analytics that will be used to analyze the data in the Kinesis data stream.

I created an application in Kinesis Data Analytics and called it “twitter_analysis”. I chose to process the data using SQL, which is the default option, then clicked “Create application”.

After the application was successfully created, I clicked on “Connect streaming data” to choose the source of the data. An application can have only one streaming data source.

There are two options: you can choose an existing source that you created before, or you can configure a new stream.

The default is “choose a source”; I selected the “twitter” Kinesis data stream that I created in the first step.

I hit the “Discover schema” button.

The schema was successfully discovered as shown below.

The name under which the “twitter” Kinesis data stream is exposed in the SQL editor is “SOURCE_SQL_STREAM_001”, as shown below.

I clicked on the “Go to SQL editor” button.

I was prompted with a message asking whether to start running the Kinesis Data Analytics application “twitter_analysis” that I had created. I chose “Yes, start application”.

The following shows a sample of the streaming data coming from the source Kinesis data stream “twitter”, which the application refers to as “SOURCE_SQL_STREAM_001”.

Twitter Data Stream

The first tab, “Save and run SQL”, lets you write SQL statements and run them against the streaming source data.

SQL Editor

The following window opens when you select the “Add SQL from templates” tab, which shows ready-made templates for performing some analysis on the stream data, such as anomaly detection.

SQL Templates

Below, I show three examples of SQL statements that I wrote in the SQL editor, hitting “Save and run SQL” each time to display the results. These examples are just to illustrate writing SQL in the editor and viewing the results; much more useful queries can be performed on the streaming data after cleaning it.

Example 1:

In the following example, I am selecting only the tweet column.

As you can see, I first created a stream to hold the output that I want, which I called “TEMP_STREAM”.

Then I prepared a pump with the keywords “CREATE OR REPLACE PUMP” to insert into the output stream “TEMP_STREAM” the values of the tweet column selected from the source stream “SOURCE_SQL_STREAM_001”, as shown below.

TEMP_STREAM
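The statements in the screenshot were along these lines (a sketch; the “tweet” column name and its VARCHAR size are assumptions based on the discovered schema):

-- Output stream that holds the selected column.
CREATE OR REPLACE STREAM "TEMP_STREAM" ("tweet" VARCHAR(5000));

-- Pump that continuously copies the tweet column from the source stream.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "TEMP_STREAM"
    SELECT STREAM "tweet"
    FROM "SOURCE_SQL_STREAM_001";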

The following shows the output of the “TEMP_STREAM”.

Example 2:

In the following example, I am selecting only the tweets that contain the word “trump”.

As you can see, I first created a stream to hold the output that I want, which I called “TEMP_STREAM”.

Then I prepared a pump with the keywords “CREATE OR REPLACE PUMP” to insert into the output stream “TEMP_STREAM” the values of the tweet column selected from the source stream “SOURCE_SQL_STREAM_001” that contain the word “trump”, as shown below.

TEMP_STREAM
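Again a sketch of what the screenshot showed; the case-insensitive LIKE filter is an assumption about how the match was done:

CREATE OR REPLACE STREAM "TEMP_STREAM" ("tweet" VARCHAR(5000));

-- Pump that keeps only tweets mentioning "trump" (case-insensitive).
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "TEMP_STREAM"
    SELECT STREAM "tweet"
    FROM "SOURCE_SQL_STREAM_001"
    WHERE LOWER("tweet") LIKE '%trump%';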

The following shows the output of the “TEMP_STREAM”.

Example 3:

In the following example, I am selecting only the tweets that have a negative sentiment.

As you can see, I first created a stream to hold the output that I want, which I called “TEMP_STREAM”.

Then I prepared a pump with the keywords “CREATE OR REPLACE PUMP” to insert into the output stream “TEMP_STREAM” the values of the tweet column selected from the source stream “SOURCE_SQL_STREAM_001” that have a negative sentiment, as shown below.

TEMP_STREAM
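A sketch of the corresponding statements, assuming the producer script attached a numeric polarity attribute to each record (negative values meaning negative sentiment), as in the Python sketch earlier:

CREATE OR REPLACE STREAM "TEMP_STREAM" ("tweet" VARCHAR(5000), "polarity" DOUBLE);

-- Pump that keeps only tweets whose precomputed polarity is negative.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "TEMP_STREAM"
    SELECT STREAM "tweet", "polarity"
    FROM "SOURCE_SQL_STREAM_001"
    WHERE "polarity" < 0;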

The following shows the output of the “TEMP_STREAM”, which is updated every 2 to 10 seconds when new results are available.

So, as time goes by, you can see new tweets with negative sentiment being added from the source stream.

Note that in-application streams such as the “TEMP_STREAM” above can be connected to a Kinesis data stream, or to a Kinesis Data Firehose delivery stream, to continuously deliver the SQL results to AWS destinations.

The limit is three destinations per application. You can either select an existing destination or create a new one.

You can also choose the output format, either JSON or CSV.

As a note, if you choose Kinesis Data Firehose as your destination, you can write the results to Redshift and display them on a Superset dashboard.
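For reference, attaching a destination can also be done programmatically rather than through the console; a minimal sketch with boto3, where the ARNs and application version ID are placeholders:

import boto3

analytics = boto3.client("kinesisanalytics")

# Attach a Firehose delivery stream as a destination for TEMP_STREAM.
# The ARNs and version ID below are placeholders, not real resources.
analytics.add_application_output(
    ApplicationName="twitter_analysis",
    CurrentApplicationVersionId=1,
    Output={
        "Name": "TEMP_STREAM",  # the in-application stream to deliver
        "KinesisFirehoseOutput": {
            "ResourceARN": "arn:aws:firehose:us-east-1:123456789012:deliverystream/twitter-output",
            "RoleARN": "arn:aws:iam::123456789012:role/kinesis-analytics-output-role",
        },
        "DestinationSchema": {"RecordFormatType": "JSON"},
    },
)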
