Real-Time Data Pipeline using Apache Kafka, Glue and Athena on AWS Cloud
Apache Kafka is an open-source software platform designed for handling and managing large amounts of data, particularly data streams or real-time event data. It’s like a powerful data pipeline that helps organizations collect, process, and distribute data between various applications and systems.
Let me explain this better using a real-life example:
Imagine a group of friends who share interesting stories, pictures, and videos with each other. Apache Kafka in this case would be a super organized messenger that helps one manage all these messages.
Instead of the friends sending messages directly to each other, Apache Kafka acts as a hub that receives everything they send. The hub keeps track of all the messages, preserves the order in which they arrive, and keeps them safe. The friends can also assign each message a category (for example, “News” or “Memes”), and the hub keeps track of that as well.
But here’s where it gets really interesting: you and your friends don’t have to be online at the same time. If someone sends a message while you’re offline, you can check it out later because Apache Kafka persists messages until you’re ready to read them. You can also subscribe to a particular category of messages and receive them whenever they are sent.
So, Apache Kafka is like a super reliable, organized messenger hub that helps you and your friends share messages, making sure they’re saved, ordered, and delivered to the right people. It’s especially handy when you have lots of messages to manage and want to make sure nothing gets lost.
Now that we have understood what Apache Kafka is, let us move on to its implementation.
In this project, we are going to set up a Kafka server on an EC2 instance on AWS and simulate real-time stock market data by producing data constantly.
Architecture
Technology Used
- Programming Language — Python
- Amazon Web Service (AWS)
- S3 (Simple Storage Service)
- Athena
- Glue Crawler
- Glue Catalog
- EC2
- Apache Kafka
Dataset used
Here is the dataset used in the project: LINK
As can be seen from the architecture diagram, the project involves a real-time stock market simulation that produces data, which the Kafka server tracks and eventually saves to an AWS S3 bucket. We then run a Glue Crawler over this datastore to create a schema, which is saved in the AWS Glue Catalog. This schema can then be used to query the data with SQL using AWS Athena.
Let's move on to the project:
Step 1: Launching an EC2 Instance.
For the sake of the project, we will keep most of the instance properties at their defaults. The only change required is adding a security group that allows traffic from all IPs (just for the sake of the project), so that our instance can handle requests over the internet.
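If you prefer the AWS CLI over the console, the inbound rules can also be added like this (the security group ID is a placeholder; port 22 is for SSH and port 9092 is Kafka's default listener port):
1. aws ec2 authorize-security-group-ingress --group-id sg-0123456789abcdef0 --protocol tcp --port 22 --cidr 0.0.0.0/0
2. aws ec2 authorize-security-group-ingress --group-id sg-0123456789abcdef0 --protocol tcp --port 9092 --cidr 0.0.0.0/0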
Step 2: Logging in to the instance using ssh
When we create the instance, we get a key pair for connecting to it over SSH. Make sure to download the private key and move it to the project folder. Then, from inside the project folder, SSH into the instance using the command shown under the Connect section of the instance page.
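For reference, the command generally looks like the following, assuming an Amazon Linux AMI whose default user is ec2-user (the key file name and address are placeholders; use the exact command shown on your own Connect page):
1. chmod 400 your-key.pem
2. ssh -i "your-key.pem" ec2-user@<your-ec2-public-dns>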
Step 3: Installing and downloading the required packages, such as Apache Kafka and Java:
Installing Java on the EC2 instance: we run the following commands to set up Java on the instance:
1. wget https://download.oracle.com/java/20/latest/jdk-20_linux-x64_bin.rpm
2. sudo rpm -Uvh jdk-20_linux-x64_bin.rpm
3. export JAVA_HOME=/usr/java/jdk-20.0.1
4. export PATH=$PATH:/usr/java/jdk-20.0.1/bin
Once you run these commands, Java 20.0.1 (or whichever version you chose) is installed on your instance.
Then we download Kafka onto the instance :
1. wget https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz
2. tar -xvf kafka_2.12-3.5.1.tgz
Step 4: Starting the Zookeeper
To start ZooKeeper, we run the following commands:
1. cd kafka_2.12-3.5.1
2. bin/zookeeper-server-start.sh config/zookeeper.properties
Step 5: Starting the Kafka server
To start the Kafka server, we run the following commands:
1. export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
2. cd kafka_2.12-3.5.1
3. bin/kafka-server-start.sh config/server.properties
By default, Kafka advertises the private IP of the instance. Edit server.properties so that it advertises the public IP instead (you can find the public IP on the instance details page in the AWS console).
Run “sudo nano config/server.properties” and change the advertised.listeners property to point to the public IP of the EC2 instance.
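For reference, the edited line generally takes the following form, assuming the default PLAINTEXT listener on port 9092 (the IP is a placeholder for your instance's public IP):
advertised.listeners=PLAINTEXT://<your-ec2-public-ip>:9092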
Step 6: Create the topic
To create a topic, run the following commands:
1. cd kafka_2.12-3.5.1
2. bin/kafka-topics.sh --create --topic demo_testing2 --bootstrap-server {Put the Public IP of your EC2 Instance:9092} --replication-factor 1 --partitions 1
Step 7: Start the Producer
To start the console producer, run the following commands:
1. cd kafka_2.12-3.5.1
2. bin/kafka-console-producer.sh --topic demo_testing2 --bootstrap-server {Put the Public IP of your EC2 Instance:9092}
Step 8: Start the Consumer
To start the console consumer, run the following commands:
1. cd kafka_2.12-3.5.1
2. bin/kafka-console-consumer.sh --topic demo_testing2 --bootstrap-server {Put the Public IP of your EC2 Instance:9092}
After following these steps, we have set up a Kafka server with a producer and a consumer running locally in the terminal of the instance.
Instead of using this manual console producer, we will generate real-time data with Python code and have it act as the producer. This is done in the following way.
Step 9: Writing Python code for the Producer
Open a Jupyter Notebook and install the kafka-python package:
pip install kafka-python
from kafka import KafkaProducer
from json import dumps

producer = KafkaProducer(
    bootstrap_servers=['13.232.87.142:9092'],  # change the IP to your EC2 public IP
    value_serializer=lambda x: dumps(x).encode('utf-8'))
The above statement declares a Kafka producer, with its configuration passed as arguments to the KafkaProducer constructor.
We are going to use the dataset mentioned before to generate and simulate real-time stock market data.
The process followed here is that we randomly pick a row from the dataset and treat it as real-time stock data, in order to simulate the real thing. In the future, actual stock market APIs could be used, but for simplicity's sake we just simulate the behavior here.
from time import sleep

# df is the stock market dataset loaded with pandas, e.g. df = pd.read_csv(...)
while True:
    dict_stock = df.sample(1).to_dict(orient="records")[0]  # random row acting as a live tick
    producer.send('demo_testing2', value=dict_stock)
    sleep(1)
The above snippet acts as a constant data producer, sending stock market data every second to the Kafka server.
Step 10: Writing Python code for the Consumer
In a new notebook, declare the consumer using the following statements:
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'demo_testing2',
    bootstrap_servers=['13.232.87.142:9092'],  # add your EC2 public IP here
    value_deserializer=lambda x: loads(x.decode('utf-8')))
Now, if the cell in the producer notebook that produces data is running, the consumer will start consuming the data. We can display it as follows:
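A minimal sketch of how the incoming messages can be printed in the notebook (each message.value is simply the deserialized stock record sent by the producer):

for message in consumer:
    print(message.value)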
But the final goal is to write this to an AWS S3 bucket.
So we will use the Python package s3fs to write the data onto an S3 bucket.
First of all, we will install the s3fs package using pip (pip install s3fs).
Next, we declare the S3 filesystem object:
from s3fs import S3FileSystem
s3 = S3FileSystem()
We use the S3 object to write the data as a JSON file onto the mentioned S3 bucket.
import json

for count, i in enumerate(consumer):
    with s3.open("s3://kafka-stock-market-ishaan/stock_market_{}.json".format(count), 'w') as file:
        json.dump(i.value, file)  # write each consumed record as its own JSON file
The count is introduced to have unique file names.
We will have to keep in mind the permissions between the EC2 instance and the S3 bucket. This is done by creating a new IAM role (read more in the AWS documentation) that allows the instance to access the bucket.
Once all these steps have been followed, the files will be written to the S3 bucket. This can be seen as follows :
Step 11: Implementing an AWS Glue Crawler
An AWS Glue Crawler is a tool provided by Amazon Web Services (AWS) that automates the process of discovering and cataloging metadata about your data sources, such as databases, data lakes, or flat files. In simpler terms, think of it as a virtual detective that goes through your data to understand its structure and characteristics.
To create a crawler:
- Go to AWS Glue and choose Crawlers from the sidebar menu.
- Click on the Create Crawler button and follow the steps of the crawler creation.
- Mention the data source to parse when prompted. In our case, this is the S3 bucket the consumer has been writing to.
Again, since Glue has to access S3, we need to set permissions between these resources. To do this, we create an IAM role for Glue and give it admin access (for our convenience).
- Next, we create an output database, i.e., the database that stores the tables produced by the crawler run.
Once all these steps have been followed, we are able to create an AWS Glue Crawler, and it will appear in the list of available crawlers.
Once the crawler has been created, we run it to generate the schema of the datastore it crawls over as a table.
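The crawler can be run from the console, but if you prefer to trigger it from Python, a minimal boto3 sketch could look like this (the crawler name and region below are placeholders for your own):

import boto3

glue = boto3.client("glue", region_name="ap-south-1")  # use your own region
glue.start_crawler(Name="stock-market-crawler")        # use the name you gave your crawler
print(glue.get_crawler(Name="stock-market-crawler")["Crawler"]["State"])  # e.g. RUNNING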
Step 12: Using AWS Athena to query the data
AWS Athena is a serverless, interactive query service provided by Amazon Web Services (AWS). In other words, it lets you run SQL queries on large datasets stored in S3, quickly extracting valuable insights without the complexity of traditional database management.
Open the AWS Athena editor, choose the database (created in the step above) and the table name, and write any SQL query you want to run on the data to get results in table form.
There might be an error saying that no target (output) location was found. To solve it, go to the Settings tab in the Athena editor and click Manage. Create an extra S3 bucket to store the temporary query results, as prompted by the page.
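If you prefer to run queries programmatically instead of through the console editor, a minimal boto3 sketch could look like the following (the database, table, and results-bucket names are placeholders for the ones you created above):

import boto3

athena = boto3.client("athena", region_name="ap-south-1")  # use your own region
response = athena.start_query_execution(
    QueryString="SELECT * FROM stock_market_table LIMIT 10;",       # hypothetical table name
    QueryExecutionContext={"Database": "stock_market_db"},          # hypothetical database name
    ResultConfiguration={"OutputLocation": "s3://your-athena-results-bucket/"},
)
print(response["QueryExecutionId"])  # use this ID with get_query_results to fetch the rows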
Once all these steps have been followed, we have created an end-to-end real-time stock market data ingestion pipeline and can query it using SQL statements.
If we turn on the producer and the consumer from the Jupyter Notebooks we created, we can see the data being written to the S3 bucket in real time, and it can be queried in real time as well.
For more insights, you may visit the git repository of the above project: https://github.com/Ishaan-Rawat/stock-market-kafka