Twitter sentiment analysis using Zookeeper, Kafka and PySpark live-streaming on Windows 10 in 2022

Hector Rodriguez Dominguez
MCD-UNISON
11 min read · May 8, 2022


Let's sum up all the tutorials out there in the wild and make our lives easier T-T

Introduction

In this entry, I'll guide you (at least I'll try) through the hell of an experience that is installing and running everything on Windows, so call me "Virgil".

Q: Why is it a hell of an experience?

A: Most of the tutorials out there are meant for Linux users, or their information is outdated.

At the end of this tutorial we aim to have a development environment with Zookeeper, Kafka and PySpark with which we can stream live tweets using the Twitter API and classify them as “positive”, “neutral” or “negative”.

Technology used

Twitter API v2: If you already have your credentials (consumer_key, consumer_secret, access_token, access_secret), way to go, champ! If not, you can go through the slow and tedious process of requesting them here.

Python 3.9:

What is it, and what is it used for?

Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.

You can download it here

Java JRE:

What is it, and what is it used for?

Java Runtime Environment (JRE) is a piece of software that is designed to run other software. As the runtime environment for Java, the JRE contains the Java class libraries, the Java class loader, and the Java Virtual Machine.

You can download it here

Spark 3.2.1 with Hadoop 3.2.1:

What is it, and what is it used for?

Apache Spark is basically a computational engine that works with huge sets of data by processing them in parallel and batch systems. Spark is written in Scala, and PySpark was released to support the collaboration of Spark and Python.

You can download Spark 3.2.1 with Hadoop 3.2.1 here

Apache Kafka 3.1.0:

What is it, and what is it used for?

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

You can download Kafka 3.1.0 with Scala 2.13 here

Apache Zookeeper 3.7.0 (stable version):

What is it, and what is it used for?

ZooKeeper is a high-performance coordination service for distributed applications. It exposes common services — such as naming, configuration management, synchronization, and group services — in a simple interface so you don’t have to write them from scratch. You can use it off-the-shelf to implement consensus, group management, leader election, and presence protocols. And you can build on it for your own, specific needs.

You can download Zookeeper 3.7.0 here

Tools configurations on Windows 10

After downloading the files mentioned above, create three folders on C: (ZOOKEEPER, KAFKA, SPARK), each for a different tool.

Extract the contents of each compressed file to its corresponding folder.

  • JRE Installation

1. Start the JRE installer and press Install. By default, the installation folder will be "C:\Program Files\Java\jre1.8.0_321".

2. Now let's open the Windows environment variables menu: Control Panel → System → Advanced system settings → Environment Variables.

3. Hit the New button in the User variables section, then type JAVA_HOME as the "Variable name" and your JRE path as the "Variable value".

4. Search for the Path variable in the "System variables" section of "Environment Variables" and press Edit.

5. Press "New", write the path to the bin folder → "%JAVA_HOME%\bin", and press OK.

6. Now, to verify the correct installation, open a terminal and type "java -version"; the installed Java version should be displayed.

  • Python 3.9 Installation
  1. Start the Python installer and press Install. By default the installation folder will be "C:\Users\USER_NAME\AppData\Local\Programs\Python\Python39" (change USER_NAME according to your personal folder).
  2. Open the environment variables window, select the “Path” variable and hit the Edit button in the System Variables section.

3. Now add the new environment variables

→ C:\Users\USER_NAME\AppData\Local\Programs\Python\Python39

→ C:\Users\USER_NAME\AppData\Local\Programs\Python\Python39\Scripts

4. To verify if the installation is correct, open a terminal and type "python --version"

  • ZooKeeper Installation
  1. First, we need to open the folder containing our Zookeeper files and head to the "conf" folder at "C:\ZOOKEEPER\conf", where we'll find a file named "zoo_sample.cfg". Rename the file to "zoo.cfg" and open it with a text editor.
  2. Find and change “dataDir=/tmp/zookeeper” → “dataDir=C:/ZOOKEEPER/data”

3. Now we have to add an environment variable named "ZOOKEEPER_HOME" pointing to "C:\ZOOKEEPER", as we did before with the JRE.

4. Search for the Path variable in the "System variables" section of "Environment Variables", press Edit, and add "%ZOOKEEPER_HOME%\bin".

5. To verify the correct installation, open a terminal and type "zkserver"; the ZooKeeper startup output should appear.

NOTE: By default Zookeeper will run on port 2181; you can change this in the zoo.cfg file, as shown below.
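For reference, after the edit the relevant lines of zoo.cfg should look roughly like this (the remaining defaults can stay as they are):

```
# C:\ZOOKEEPER\conf\zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=C:/ZOOKEEPER/data
clientPort=2181
```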

  • Kafka Installation
  1. First, we need to open the folder containing our Kafka files and head to the "config" folder at "C:\KAFKA\config", where we'll find a file named "server.properties". Change the line "log.dirs=/tmp/kafka-logs" to "log.dirs=C:/KAFKA/kafka-logs" and save it.

2. To validate Kafka's installation, open a terminal and navigate to the KAFKA folder on C:. Then type ".\bin\windows\kafka-server-start.bat .\config\server.properties"; the terminal should now show a stream of log output from the Kafka processes.

3. The next step is to create a topic for Kafka streaming (in the next segment I'll explain what a topic, a consumer and a producer are). We can do this by opening a new terminal, heading to "C:\KAFKA" and typing ".\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TW_ANALYSIS", where TW_ANALYSIS is the name of the topic we are going to use. You can choose any name, as long as you remember to change it in the kafkaConsumer.py, kafkaProducer.py and main.py files.

NOTE: By default Kafka runs on port 9092; you can change it in the "server.properties" file, where you can also modify the default port Kafka uses to connect to Zookeeper.
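For reference, the lines of server.properties relevant to this setup look roughly like this (in the stock file the listeners line ships commented out; it is shown uncommented here for clarity):

```
# C:\KAFKA\config\server.properties
log.dirs=C:/KAFKA/kafka-logs
listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
```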

  • Spark Installation
  1. Once we have moved our files to the "C:\SPARK" folder, we need to download winutils.exe for Hadoop 3.2.1. You can find the winutils versions here and our specific version here.
  2. Create a folder named "hadoop\bin" inside our "C:\SPARK" folder, so the final path is "C:\SPARK\hadoop\bin", and extract the contents of the winutils compressed file we just downloaded there.

3. Now we should create new SPARK_HOME and HADOOP_HOME environment variables in the User variables section. They should point as follows:

  • HADOOP_HOME → C:\SPARK\hadoop
  • SPARK_HOME → C:\SPARK

4. Edit the Path variable to add two new entries:

  • %SPARK_HOME%\bin
  • %HADOOP_HOME%\bin

5. At this point, I recommend restarting your computer so that the environment variables take effect. Then, to verify your Spark installation, open a new terminal and type "pyspark".

How does it work? The kafkaProducer.py, kafkaConsumer.py and main.py files

Now we got the good stuff mon’!

  1. To start with the streaming we must first run our servers (Zookeeper, Kafka, Spark). As we did before, we open three new terminals and start a different process in each.
  • Terminal 1: Type → “zkserver”
  • Terminal 2: Move to “C:\KAFKA” and type → “.\bin\windows\kafka-server-start.bat .\config\server.properties”
  • Terminal 3: Type → “pyspark”

2. Create four files in your project folder:

  • File 1 → "auth_tokens.py" → Here we will store our Twitter API keys to be granted access to the Twitter hell.
  • File 2 → "kafkaProducer.py" → This will be our Twitter data extractor, which will send the data to our consumer file.
  • File 3 → "kafkaConsumer.py" → Here we will receive the information sent by our producer and prepare it for processing in our main.py file.
  • File 4 → "main.py" → In this file we will put all our processing functions: data cleaning, sentiment analysis and parquet storage.

auth_tokens.py

In this file, we'll declare our Twitter keys (not much to say about this file).
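A sketch of the whole file; the placeholder strings are obviously hypothetical and must be replaced with your own credentials:

```python
# auth_tokens.py -- keep this file out of version control!
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_secret = "YOUR_ACCESS_SECRET"
```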

kafkaProducer.py

  • We begin by importing the libraries
  • We generate the instance of our Kafka producer, our search term, and the topic name, which we established earlier in the tutorial. Since we are using our computer as the host, we will configure the Kafka producer with the default port 9092 → 'localhost:9092'.
  • We define our Twitter authentication function.
  • We override the tweet listener function so that it sends the information to our Kafka consumer.
  • Now we write our main function where we call all the other functions we just wrote
  • Finished file
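A minimal sketch of what the finished kafkaProducer.py can look like, assuming tweepy 4.x and the kafka-python package (the search term is a hypothetical example; pick your own):

```python
# kafkaProducer.py -- minimal sketch, assuming tweepy 4.x and kafka-python
import tweepy
from kafka import KafkaProducer

from auth_tokens import consumer_key, consumer_secret, access_token, access_secret

topic_name = "TW_ANALYSIS"   # the topic we created earlier
search_term = "python"       # hypothetical search term, pick your own

# Kafka producer pointing at the default local broker on port 9092
producer = KafkaProducer(bootstrap_servers="localhost:9092")


class TweetListener(tweepy.Stream):
    """Forwards every raw tweet payload to our Kafka topic."""

    def on_data(self, raw_data):
        # raw_data is the tweet's raw JSON payload; Kafka wants bytes
        if isinstance(raw_data, str):
            raw_data = raw_data.encode("utf-8")
        producer.send(topic_name, value=raw_data)
        return True


if __name__ == "__main__":
    # Authenticate with the four keys and start filtering live tweets
    stream = TweetListener(consumer_key, consumer_secret,
                           access_token, access_secret)
    stream.filter(track=[search_term])
```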

kafkaConsumer.py

  • With this script, we’ll catch the information that the Producer is sending us, and we’ll make it available to our main.py script
  • Import the libraries and define the topic variable we established earlier
  • We create our Kafka consumer instance and configure it so that the raw bytes received are transformed into a readable JSON string.
  • Finished file
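Put together, a sketch of kafkaConsumer.py using kafka-python could look like this (the deserializer turns each raw byte payload into a Python dict):

```python
# kafkaConsumer.py -- minimal sketch using kafka-python
import json

from kafka import KafkaConsumer

topic_name = "TW_ANALYSIS"  # must match the producer's topic

# Consumer on the default local broker; every raw byte payload is
# decoded and parsed into a Python dict via json.loads
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers="localhost:9092",
    auto_offset_reset="latest",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

for message in consumer:
    tweet = message.value
    # Print the tweet text so we can see the stream flowing
    print(tweet.get("text", ""))
```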

main.py (The big one… not really)

In this file, we will start working with the information received from our Kafka consumer. We'll go through a process of Data reading → Data preparation → Data cleaning → Subjectivity and polarity calculation (using TextBlob) → Sentiment classification → Data storage. All in a day's work!

Let's start by importing the libraries needed for our tasks.
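A sketch of the imports, assuming pyspark and textblob are installed (pip install pyspark textblob):

```python
# main.py -- imports for streaming, cleaning and sentiment analysis
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import (DoubleType, StringType, StructField,
                               StructType)
from textblob import TextBlob
```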

Data Reading

  • In our main function, we create our SparkSession instance and Kafka stream reader, as sketched below.
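Something along these lines; the spark-sql-kafka package coordinates assume Spark 3.2.1 built against Scala 2.12, so match them to your download:

```python
def main():
    # SparkSession with the Kafka connector pulled in at startup
    spark = (
        SparkSession.builder
        .appName("TwitterSentiment")
        .config("spark.jars.packages",
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
        .getOrCreate()
    )

    # Stream reader subscribed to our topic on the default local broker
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "TW_ANALYSIS")
        .load()
    )
```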

Data preparation

  • Here, we will convert the stream into a readable string and create the schema for the dataframe that will store it.
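Continuing inside main(), a minimal schema that keeps only the tweet text (extend it if you want more fields):

```python
    # (inside main) Kafka delivers the payload as bytes: cast the value
    # column to a string, then parse the JSON with a minimal schema
    schema = StructType([StructField("text", StringType(), True)])

    tweets = (
        df.selectExpr("CAST(value AS STRING) AS json_value")
        .select(from_json(col("json_value"), schema).alias("tweet"))
        .select(col("tweet.text").alias("text"))
    )
```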

Functions definition

  • Once we have a dataframe we can work with, it's time to create the function that will clean all the links, emojis, usernames, etc. from the tweets we captured.
  • Now that we are defining functions, we should also define our subjectivity, polarity and sentiment classification functions. For this, we will use TextBlob's predefined functions. For more information on TextBlob click here.
  • Once our functions are defined, we should convert them to UDFs or User Defined Functions in our main function. For more information on UDFs click here.
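A sketch of those helpers; the regex patterns are a reasonable baseline rather than the one true cleaning recipe, and the polarity thresholds follow the usual TextBlob convention:

```python
# Module-level helpers for main.py
def clean_tweet(text):
    if text is None:
        return ""
    text = re.sub(r"http\S+|www\.\S+", "", text)  # links
    text = re.sub(r"@\w+", "", text)              # usernames
    text = re.sub(r"#", "", text)                 # hash symbols
    text = re.sub(r"RT\s+", "", text)             # retweet markers
    return text.strip()

def get_subjectivity(text):
    return TextBlob(text).sentiment.subjectivity

def get_polarity(text):
    return TextBlob(text).sentiment.polarity

def get_sentiment(polarity):
    if polarity < 0:
        return "negative"
    elif polarity == 0:
        return "neutral"
    return "positive"
```

And back inside main(), the UDF wrappers:

```python
    # (inside main) wrap the helpers as UDFs so Spark can apply them per row
    clean_udf = udf(clean_tweet, StringType())
    subjectivity_udf = udf(get_subjectivity, DoubleType())
    polarity_udf = udf(get_polarity, DoubleType())
    sentiment_udf = udf(get_sentiment, StringType())
```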

Subjectivity and polarity calculation and Sentiment classification

  • With our functions defined, the next easy step is to calculate the subjectivity and polarity of each tweet, then classify it as positive, negative or neutral. So we add the next code segment to our main function.
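Still inside main(), applying the UDFs column by column:

```python
    # (inside main) derive the analysis columns from the cleaned text
    tweets = tweets.withColumn("clean_text", clean_udf(col("text")))
    tweets = tweets.withColumn("subjectivity",
                               subjectivity_udf(col("clean_text")))
    tweets = tweets.withColumn("polarity", polarity_udf(col("clean_text")))
    tweets = tweets.withColumn("sentiment", sentiment_udf(col("polarity")))
```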

Data storage

  • Finally! Now that we have calculated the polarity and we have a useful dataframe, we need to store it. For this we will use the parquet format, which can save us around 75% of storage space. Really useful if we are paying for storage. For this we can add the segment sketched after this list to our main.py.
  • To open parquet files and see what's inside, I use ParquetViewer, you can download it here.
  • NOTE: As you will see in the following code segments, I've set a wait time of 60 sec; this is for testing purposes. In production you should adjust this time according to your needs.
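A sketch of the parquet sink; the output and checkpoint paths below are hypothetical, so point them wherever suits you (a checkpoint location is mandatory for streaming sinks):

```python
    # (inside main) stream the enriched dataframe out as parquet files
    parquet_query = (
        tweets.writeStream
        .format("parquet")
        .option("path", "C:/SPARK_OUTPUT/tweets")  # hypothetical path
        .option("checkpointLocation", "C:/SPARK_OUTPUT/checkpoint")
        .start()
    )
```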

Quick validation

  • For a quick validation of what is going on through the console, we can add the following segment at the end of our main function.
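For example, something like this at the end of main(), using the 60-second testing window from the note above:

```python
    # (inside main) mirror the stream to the console for a sanity check
    console_query = (
        tweets.writeStream
        .format("console")
        .outputMode("append")
        .start()
    )

    # Let the queries run for 60 seconds (testing value), then return
    spark.streams.awaitAnyTermination(60)


if __name__ == "__main__":
    main()
```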

Final File

Let there be light!!!

To run the project, just follow these simple steps…. DELETE EVERYTHING!… Just joking… Am I???

STEPS:

  1. Run our servers in different terminals (Zookeeper, Kafka, Spark).
  2. Run kafkaProducer.py on an independent terminal and let it run!
  3. Run kafkaConsumer.py on an independent terminal.
  4. Run main.py and watch your parquet files be generated with our dataframe.
  5. Kill every process when you are done… KILL THEM WITH FIRE!!!

Final Notes

Well, it's been quite the trip! If you managed to reach this point, I want to thank you for your time! If you have any questions, you can send me a message through my Medium account.

The GitHub project is here → https://github.com/Silvertongue26/zookeeper_kafka_pyspark_polarity-analysis

So for now… May the Force be with you… you'll need it…


Special Thanks

To Lorena Gongang, whose guide was really helpful in putting this entry together. Please visit her webpage for many great projects → https://lorenagongang.com/
