Analyzing Twitter in real time with Kinesis, Lambda, Comprehend and ElasticSearch

Fernando Pereiro
Fernando Pereiro
Published in
7 min readNov 5, 2018

I have to say that I’m in love with machine learning, I honestly think that is a very big thing for business and an opportunity to develop all the great apps and features that we can imagine!

Machine learning is not just about making evil robots, there is a big number of simple and useful services that we can use. Today we are going to use AWS Big Data and Machine Learning services… some of my favorite ones!

For example, with sentiment analyze services we can know when the best economists think is the best moment to invest based on the sentiment hidden in their social media comments. Of course that you can read all that information by your own, but with machine learning services you can have it automatically based on thousands of accounts and comments.

This time we are going to set a system to evaluate in real time the sentiment of all Tweets made with a specific Twitter hashtag. You can use it to evaluate your next smartphone based on other people opinion or you can know what people say about your company and products.

Let’s take a look to the architecture and pieces to use:

We are going to use a basic Python script to obtain real time Tweets thanks to the Twitter API, from the script we’ll put the Tweets directly in a Kinesis Firehose delivery stream where we have a transformation Lambda function, in that moment we are going to obtain the sentiment information using Amazon Comprehend and obtain a clean Twitter comment, finally the Tweet and its sentiment data will be stored in an Elasticsearch domain where we can see real time information using custom charts.

Ready?! Lets see and configure each piece:

Twitter API

First we’ll need credentials to access the Twitter API so if you don’t have them this is where you can start: https://apps.twitter.com/

Amazon Elasticsearch

Amazon Elasticsearch Service, is a fully managed service that makes it easy for you to deploy, secure, operate, and scale Elasticsearch to search, analyze, and visualize data in real-time.

Lets create a new domain:

First lets set a domain name and an Elasticsearch version:

This time we’ll choose a small instance with not too much storage, don’t need more for this demo:

Just for accelerating things and make it easier we are setting a public access:

Remember!! don’t try this at home!

And we are done! After a few minutes the domain will be created and we will have an Elasticsearch endpoint with a Kibana URL for access the information stored.

Lambda Function

Next it’s time to create the Lambda function responsible of data transformation in the delivery stream and add the sentiment information using Amazon Comprehend.

Here you can find the function (remember the star :) ): https://github.com/fepereiro/FirehoseWithComprehend

Before continue to the next step lets take a closer look to the Lambda function and whats happening there. First, we get the Tweet’s text from each record decoding the information sent from the Python script (we are just sending the text and the Firehose stream is responsible of encoding it inside the parameter ‘data’):

dict_data = base64.b64decode(record['data']).decode('utf-8').strip()        print(dict_data)

Then we have to invoke the Amazon Comprehend Service using the ‘detect_sentiment’ method. After that we’ll get the sentiment type (POSITIVE, NEGATIVE, NEUTRAL or MIXED) and the score based in the positive and negative confidence:

comprehend = boto3.client(service_name='comprehend', region_name='eu-west-1')
sentiment_all = comprehend.detect_sentiment(Text=dict_data, LanguageCode='en')
sentiment = sentiment_all['Sentiment']
print(sentiment)
positive = sentiment_all['SentimentScore']['Positive']
negative = sentiment_all['SentimentScore']['Negative']
total = positive - negative
print(total)

I wrote that score because most of the Tweets have a NEUTRAL sentiment, however they have an amount of positive and negative sentiment; with this score even if they are mostly a neutral Tweet we can identify that little amount of positive or negative inside them.

Finally we’ll create the data structure for the Tweet and sentiment information encoded inside the required data structure for Kinesis Firehose output:

data_record = {            
'message': dict_data,
'sentiment': sentiment,
'total': total
}
print(data_record)
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(json.dumps(data_record).encode('utf-8')).decode('utf-8')
}
print(output_record)

Note: Remember to give the function an execution role with permissions to access Amazon Comprehend and CloudWatch logs.

Amazon Kinesis Firehose

Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk, enabling near real-time analytics with existing business intelligence tools and dashboards you’re already using today.

The place where the previous steps come together is the Delivery Stream:

Write a name for the new stream:

And set Direct PUT as the source because we’ll use a custom script for it:

Now its time to choose the previously created Lambda function enabling the record transformation inside the stream:

And set Amazon Elasticsearch Service as destination giving the information of the domain previously created:

Just in case, let’s set a S3 bucket where failed records can be stored for future analyze:

And now we are ready to use all this recently created services with real world information coming from Twitter!

Python Script

You can find the script in the same link that the Lambda function: https://github.com/fepereiro/FirehoseWithComprehend

The script will connect with the Twitter stream API and filter the information using the indicated hashtag:

stream = Stream(auth, l)
stream.filter(track=['thewalkingdead'])

And then will put the Tweet’s text in the Kinesis Firehose delivery stream:

client.put_record(DeliveryStreamName=DeliveryStreamName,Record={'Data': json.loads(data)["text"]})

Remember: If you want to put more Tweet’s information (author, date, etc.) inside the delivery stream you’ll need to read it inside the Lambda function.

Just save the Python script as a ‘.py’ file in a local folder and if you execute it from your terminal window with something like this:

python twitter-streaming.py

You’ll see the magic taking place! First the Tweets will appear in that terminal window, then the Lambda function’s Cloudwatch logs will appear with all the information and finally you can visualize all this from the Kibana URL.

Kibana

When you have some data stored in Elasticsearch you can create an Index Pattern inside Kibana, just introduce the index name you already gived in the Kinesis Firehose delivery stream configuration and there it is! Your index pattern matches 1 index!:

Just continue and create it so you can go to ‘Visualize’ to create a new chart or visualization:

I like pies so I chose a pie chart to visualize the number of Tweets of each sentiment type:

And there it is! the number of Tweets for each sentiment type, there you see the real time information about anything you want.

Homework 1

As you can see, is not very useful to know that almost all the Tweets have a neutral sentiment, but as I said before even if is a neutral Tweet there is an amount of positive and negative too. Thats why I created the total score inside the Lambda function.

Your first homework is to set a chart where you can see the SUM of all the Tweets sentiment score, if the result is > 0 the general sentiment will be positive but if the result < 0 the general sentiment will be negative.

Homework 2

Maybe you already know this but Twitter is a global platform, that means that you can find Tweets in every language.

Your second homework it’s identify each Tweet’s language and even translate it as needed for using Amazon Comprehend in a better way. You can make this by using Amazon Translate Service inside de Lambda function.

I hope you liked this article and you find it useful for some project idea that you could have in mind. Don’t forget to follow me here and in Twitter! @Fhercho84

--

--

Fernando Pereiro
Fernando Pereiro

Highly experienced DevOps and Cloud Solutions Architect.