Unraveling Twitter’s Sentiments: Big Data Analysis with Netflix Tweets using AWS & PySpark

Rochita Sundar · Published in Data And Beyond · Jun 12, 2023

Continuing with the Web Scraping Series, this project is a step-by-step guide to extracting sentiment from a massive corpus of Twitter data, with a focus on content related to the popular streaming platform Netflix.

Software tools used in the project.

The key steps involved in the project are:

  1. Launching an AWS EC2 instance to create a scalable computing environment.
  2. Gathering over 100K live tweets and user information with the Twitter Developer API, focused on the keyword ‘Netflix’.
  3. Reliably streaming tweets into AWS S3 storage buckets using AWS Kinesis Firehose (a delivery-stream sketch follows this list).
  4. Launching a Databricks notebook to create a distributed computing environment and develop a sentiment analysis machine learning model using PySpark. The predictions are written back to S3 storage buckets.
  5. Leveraging AWS Athena to execute SQL queries on the stored data to perform ad-hoc analysis and generate insights.
  6. Building an interactive dashboard using AWS QuickSight to provide a comprehensive view of insights and model performance.
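For step 3, the Firehose delivery stream that the ingestion script writes to needs to exist first. Below is a minimal boto3 sketch of creating one that buffers records and delivers them to an S3 bucket; the stream name, bucket, IAM role and prefix are placeholders, not the project's actual configuration.

import boto3

firehose = boto3.client('firehose', region_name='<region-name>')

# Create a DirectPut delivery stream that buffers incoming records and
# flushes them to the S3 bucket as objects under the given prefix.
# (All names/ARNs below are placeholders.)
firehose.create_delivery_stream(
    DeliveryStreamName='<your-delivery-stream-name>',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::<account-id>:role/<firehose-s3-role>',
        'BucketARN': 'arn:aws:s3:::<your-bucket-name>',
        'Prefix': 'twitter-netflix/',
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 60},
        'CompressionFormat': 'UNCOMPRESSED',
    },
)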

Tweepy

Tweepy is a Python library that simplifies interacting with the Twitter Developer API. It can be used to collect tweets based on specific search criteria, retrieve user information, and stream tweets in real time.

The code below uses Tweepy to extract tweets and user information related to the keywords “Netflix” and “netflix”, and forwards each record to the Kinesis Firehose delivery stream.

from tweepy import Stream
from tweepy import OAuthHandler
import json
import boto3
import time

class TweetStreamListener(Stream):
    # on success
    def on_data(self, data):
        tweet = json.loads(data)
        try:
            if 'text' in tweet.keys():
                message_lst = [str(tweet['id']),
                               str(tweet['user']['name']),
                               str(tweet['user']['screen_name']),
                               str(tweet['user']['followers_count']),
                               str(tweet['user']['statuses_count']),
                               str(tweet['user']['location']),
                               tweet['text'].replace('\n', ' ').replace('\r', ' '),
                               str(tweet['entities']['hashtags']),
                               str(tweet['geo']),
                               str(tweet['created_at']),
                               str(tweet['reply_count']),
                               str(tweet['retweet_count']),
                               str(tweet['favorite_count']), '\n']
                message = '\t'.join(message_lst)
                print(message)
                firehose_client.put_record(
                    DeliveryStreamName=delivery_stream_name,
                    Record={'Data': message})
        except (AttributeError, Exception) as e:
            print(e)
        return True

    # on error
    def on_error(self, status):
        print(status)

if __name__ == '__main__':
    # create kinesis firehose client connection
    session = boto3.Session()
    firehose_client = session.client('firehose', region_name='<region-name>')

    # set kinesis firehose delivery stream name
    delivery_stream_name = '<your-delivery-stream-name>'

    # set your twitter developer API credentials
    consumer_key = '<your-consumer-key>'
    consumer_secret = '<your-secret-consumer-key>'
    access_token = '<your-access-token>'
    access_token_secret = '<your-secret-access-token>'

    while True:
        try:
            print('Twitter streaming...')

            # create an instance of the tweet stream listener
            stream = TweetStreamListener(consumer_key, consumer_secret,
                                         access_token, access_token_secret)

            # search twitter for the keyword
            stream.filter(track=['Netflix', 'netflix'], languages=['en'], stall_warnings=True)
        except Exception as e:
            print(e)
            print('Disconnected...')
            time.sleep(5)
            continue

TextBlob

TextBlob is a Python library that offers methods to generate a sentiment polarity score for text data, ranging from -1 (most negative) to 1 (most positive).
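As a quick illustration, the polarity score can be checked on a couple of made-up sentences; the exact values depend on TextBlob's lexicon, but the signs are what the labelling logic below relies on.

from textblob import TextBlob

# Illustrative only: the first sentence scores positive, the second negative
print(TextBlob("I love this series").sentiment.polarity)     # > 0
print(TextBlob("This show is terrible").sentiment.polarity)  # < 0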

The following code uses TextBlob to generate the target variable for training a sentiment analysis model.

from textblob import TextBlob
from pyspark.sql.functions import udf

def get_sentiment(text):
    blob = TextBlob(text)
    sentiment = blob.sentiment.polarity
    if sentiment > 0:
        return 'positive'
    elif sentiment < 0:
        return 'negative'
    else:
        return 'neutral'

# Wrap the labeling function as a Spark UDF (returns a string by default)
sentiment_label_udf = udf(lambda x: get_sentiment(x))

The code below is written in PySpark in a Databricks notebook and constructs a pipeline that incorporates several components from PySpark’s machine learning library, including NGram, Tokenizer, StopWordsRemover, CountVectorizer, IDF, VectorAssembler, StringIndexer and a LogisticRegression model.

  • NGram is used to extract n contiguous words to capture the context and relationship between words.
  • Tokenizer splits input text into individual words.
  • StopWordsRemover removes common words from text such as “is”, “the”, “and” etc.
  • CountVectorizer converts the filtered tokens into term-frequency count vectors.
  • IDF (Inverse Document Frequency) assigns higher weights to words that appear less frequently across the corpus, capturing terms that help distinguish sentiment.
  • VectorAssembler combines the output of multiple feature extraction processes into a single feature vector.
  • StringIndexer encodes string labels into numerical indices. It assigns a unique index to each distinct sentiment label, enabling the model to work with categorical data.
  • LogisticRegression is a classification algorithm that can be used for sentiment analysis. It learns the relationship between the input features and sentiment labels and predicts the sentiment polarity based on the learned model parameters.
# Read dataset (tab-delimited csv files in S3 buckets) with a user-defined schema
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType, StringType, DateType
from pyspark.sql.functions import col

cdrSchema = StructType([
    StructField("tweet_id", LongType(), True),
    StructField("user_name", StringType(), True),
    StructField("user_screen_name", StringType(), True),
    StructField("user_followers_count", IntegerType(), True),
    StructField("user_statuses_count", IntegerType(), True),
    StructField("user_location", StringType(), True),
    StructField("tweet_text", StringType(), True),
    StructField("tweet_hashtags", StringType(), True),
    StructField("tweet_geo", StringType(), True),
    StructField("tweet_created_at", StringType(), True),
    StructField("tweet_reply_count", IntegerType(), True),
    StructField("tweet_retweet_count", IntegerType(), True),
    StructField("tweet_favorite_count", IntegerType(), True)])

twitter = (spark.read
           .option("header", "false")
           .option("delimiter", "\t")
           .schema(cdrSchema)
           .csv('<path-to-dataset>'))

# Target sentiment labeling using TextBlob
twitter = twitter.select(col("*"), sentiment_label_udf("tweet_text").alias("tweet_sentiment_label"))

...
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, CountVectorizer, IDF, Tokenizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Use 90% of cases for training, 10% for testing
train, test = twitter.randomSplit([0.9, 0.1], seed=20200819)

# Create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet_text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5)
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder = StringIndexer(inputCol="tweet_sentiment_label", outputCol="label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, lr])

pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

# Evaluate the model; MulticlassClassificationEvaluator's default metric is the weighted F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test.count())
f1 = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("F1 Score: {0:.4f}".format(f1))
Accuracy Score: 0.9357
F1 Score: 0.9359

The baseline model achieves an accuracy and a weighted F1 score of over 93% on the test data.
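Step 4 of the project writes the predictions back to S3 for downstream querying. A minimal sketch of how that could be done from the Databricks notebook follows; the selected columns and the S3 path are illustrative, not the project's exact layout.

# Write the scored tweets back to S3 as tab-delimited files (illustrative columns and path)
(predictions
 .select("tweet_id", "user_name", "user_followers_count", "tweet_text",
         "tweet_sentiment_label", "prediction")
 .write
 .mode("overwrite")
 .option("delimiter", "\t")
 .csv("s3a://<your-bucket-name>/twitter_netflix_predictions/"))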

Afterwards, one can utilise AWS Athena to construct personalised SQL queries, which can then be integrated with AWS QuickSight to create an engaging and interactive dashboard.
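To query the prediction files with Athena, an external table first needs to point at their S3 location. A possible DDL sketch is shown below, assuming the tab-delimited layout sketched above; the column list and location are illustrative, while the table name matches the twitter_netflix table used in the example query.

CREATE EXTERNAL TABLE IF NOT EXISTS twitter_netflix (
  tweet_id BIGINT,
  user_name STRING,
  user_followers_count INT,
  tweet_text STRING,
  tweet_sentiment_label STRING,
  prediction DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION 's3://<your-bucket-name>/twitter_netflix_predictions/';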

Below is an example query that identifies the Twitter users with the largest follower counts and categorises them by the predicted sentiment of their tweets.

/*
Top 5 twitter user names by follower count & predicted sentiments
*/

select (case when prediction = 0 then 'positive'
             when prediction = 1 then 'neutral'
             when prediction = 2 then 'negative'
        end) as prediction_cat,
       user_name,
       avg(user_followers_count) as avg_followers -- since number of followers may change with time
from twitter_netflix
group by 1, 2 -- sentiment category and user name
order by 3 desc
limit 5; -- replace to get top 'x'
Extract from the QuickSight dashboard showing the top 50 usernames with the largest follower base, with an option to filter further by the predicted tweet sentiment (positive, neutral or negative).

If the provided code snippets were helpful and you would like to explore further, you can find the complete code for the project here.

This project aims to uncover valuable business insights by transforming vast amounts of social media data using data science methods & cloud technologies. By monitoring and analyzing sentiments on social media, companies can gain valuable insights to adapt their strategies, enhance investor relations, and improve customer satisfaction.

Thank you for taking the time to read this post! I hope you found it valuable and insightful.

I would like to thank WeCloudData for assistance with some of the project components as well as help navigating the AWS environment.

Disclaimer: The author acknowledges utilizing ChatGPT’s custom prompts for support in composing the article, but all the code presented within the blog is the author’s original work.
