How to connect data being distributed via a web-socket to Kafka and then onto an S3 bucket.
If you’ve made it here you’ve probably exhausted Stack Overflow and just want to know how to stream data into an S3 bucket. Fear not, the code is included below.
But first, a couple of helpful resources (other than Stack Overflow) that I came across on my quest for real-time data storage.
- The Confluent Slack group: You may have to put up with being bombarded with sales emails, but the first-hand help you’ll get with specific problems is second to none.
- If you’re completely unfamiliar with Kafka then do check out this Medium post for a good intro.
- Confluent provide a load of test code on their git page which helped me towards forming a base configuration.
As you may have already guessed, this article uses Kafka Connect, a tool created by Confluent. My first mistake was to start with the original, vanilla Kafka released by the Apache Software Foundation. Save yourself a load of trouble and head over to Confluent and use their set of client libraries. Confluent was set up by the same guys who developed Kafka at LinkedIn, and they’ve created a whole ecosystem of frameworks and tools centred around Kafka.
The reason for all of this is to have all the data being produced by the web socket stored in a more “permanent” data store. This may be for data analytics, ML training or, in my case, to have recorded environmental data at hand for any models that may need it.
Now, in this example we’re converting the data received over the web socket into JSON, so Minio (an S3-compatible object store) may not be the best choice and you may be better off with a proper document store like MongoDB. However, Minio remains a solid choice as it can hold just about anything… but that’s a debate for another day.
The goal of this article is to create something like this:
There are basically two parts to getting this up and running:
- A Python app that connects to the web socket and then streams the messages to our Kafka server over a pre-configured topic (test_topic).
- A docker-compose file that pulls all the dependencies we need to run the pipeline. This includes Minio, Kafka and all the Confluent tools.
The docker-compose file also runs some in-line scripts to configure Minio and Kafka Connect with anything they need to run. This is handy as it means the only app we need to deploy is the Python data producer; any extra setup is pumped straight into the tools on startup. More on that below.
The Web-socket Data Producer
Given we’ve got a web socket that’s pumping out data, the first step is to write a small Python script which will:
- Connect to a web socket
- Connect to Kafka
- Convert the data from the web socket into JSON and pipe it into a Kafka topic
This should do the trick:
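As a minimal sketch of those three steps, the producer could look something like the following. It assumes the `websocket-client` and `confluent-kafka` packages, and the socket URL, broker address and the `received_at`/`payload` envelope are hypothetical choices for illustration; only the topic name comes from this article.

```python
import json
import time

TOPIC = "test_topic"                    # topic name used in this article
SOCKET_URL = "ws://localhost:8765"      # hypothetical web socket address
BOOTSTRAP_SERVERS = "localhost:9092"    # hypothetical Kafka broker address


def to_json_bytes(raw, received_at=None):
    """Wrap one web socket message in a JSON envelope encoded as UTF-8."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    try:
        payload = json.loads(raw)       # message was already valid JSON
    except json.JSONDecodeError:
        payload = raw                   # otherwise keep it as a plain string
    if received_at is None:
        received_at = time.time()
    # The envelope layout is an assumption, not part of the original script.
    return json.dumps({"received_at": received_at, "payload": payload}).encode("utf-8")


def main():
    # Third-party imports live here so the helper above works without them.
    import websocket                    # from the websocket-client package
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": BOOTSTRAP_SERVERS})
    ws = websocket.create_connection(SOCKET_URL)
    try:
        while True:
            message = ws.recv()         # blocks until the next message arrives
            producer.produce(TOPIC, value=to_json_bytes(message))
            producer.poll(0)            # serve delivery callbacks, non-blocking
    finally:
        ws.close()
        producer.flush()                # send anything still buffered
```

Calling `main()` starts the stream; it needs a reachable web socket and Kafka broker, so point the two addresses at your own services first.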
To aid deployment we’ll wrap all the above in a Dockerfile:
The script above uses the following requirements.txt:
We can then combine this Dockerfile along with any other services we need (like Kafka) into a docker-compose file to bring everything up together.
I’ll link it here as it’s quite large.
There’s a lot going on in the compose file so I’ll try and break it down.
This service waits for Minio to start before it creates a bucket for the data to go into. This bucket name will be used by Kafka Connect later.
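The compose file handles this with an in-line script. Purely to illustrate the same wait-then-create logic, here is a minimal Python sketch. It assumes the `boto3` package, a Minio server at a hypothetical address, placeholder credentials, and Minio's `/minio/health/live` liveness endpoint; none of these values are taken from the compose file itself.

```python
import time
import urllib.error
import urllib.request

MINIO_URL = "http://localhost:9000"     # hypothetical Minio address
BUCKET = "test-bucket"                  # bucket name used in this article


def wait_until(check, attempts=30, delay=2.0, sleep=time.sleep):
    """Call check() until it returns True, giving up after `attempts` tries."""
    for attempt in range(attempts):
        if check():
            return True
        if attempt < attempts - 1:
            sleep(delay)                # pause before the next try
    return False


def minio_is_live(base_url=MINIO_URL):
    """Return True once Minio answers its liveness endpoint with HTTP 200."""
    try:
        with urllib.request.urlopen(base_url + "/minio/health/live", timeout=2) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False                    # not reachable yet


def create_bucket(bucket=BUCKET, base_url=MINIO_URL):
    """Create the bucket through Minio's S3-compatible API."""
    import boto3                        # imported lazily; third-party package

    s3 = boto3.client(
        "s3",
        endpoint_url=base_url,
        region_name="us-east-1",
        aws_access_key_id="minio-access-key",        # placeholder credential
        aws_secret_access_key="minio-secret-key",    # placeholder credential
    )
    s3.create_bucket(Bucket=bucket)
```

In use, that would be something like `if wait_until(minio_is_live): create_bucket()`.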
This service is the dockerized kafka_websocket_producer.py file which streams the data into a Kafka topic. You’ll probably need to play around with the path to the Dockerfile. I had mine in a folder called “streaming”.
This service is the Confluent tool which listens to a topic and puts the data into an S3 bucket. Amongst other things it:
- Downloads the Confluent S3 sink add-on
- Tells the tool which topic to listen to
- Sets up the Minio connection
There’s a load of settings which define how the data gets stored and partitioned in the Minio bucket, which I’d recommend having a play with, primarily flush.size and partitioner.class. Note that flush.size is counted in records rather than bytes: a value of 100, for example, means a new file is written to the bucket after every 100 records received on a topic partition, not after every 100 MB.
Other tools which form part of the Confluent Kafka ecosystem
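To make those settings a bit more concrete, here is a rough sketch of an S3 sink configuration being registered through the Kafka Connect REST API. The connector name, the Connect address, the Minio address, the region and the chosen format, converter and partitioner classes are all assumptions for illustration rather than values from the compose file.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"   # hypothetical Kafka Connect REST address


def s3_sink_config(topic="test_topic", bucket="test-bucket",
                   store_url="http://minio:9000", flush_size=100):
    """Build an S3 sink connector config pointing at a Minio endpoint."""
    return {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": topic,                    # topic to listen to
        "s3.bucket.name": bucket,
        "s3.region": "us-east-1",           # assumed; Minio still expects a region
        "store.url": store_url,             # hypothetical Minio address
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",   # plain JSON, no schema
        "partitioner.class":
            "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "flush.size": str(flush_size),      # records per file, not bytes
    }


def register_connector(name, config, connect_url=CONNECT_URL):
    """POST the connector definition to Kafka Connect and return its reply."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    request = urllib.request.Request(
        connect_url + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

With Connect running, `register_connector("s3-sink", s3_sink_config())` would create the connector; the name `s3-sink` is just a placeholder. Swapping the partitioner for a time-based one, or lowering `flush_size`, is a quick way to see how the layout in the bucket changes.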
Core Kafka services:
A user interface, viewable at http://localhost:9021/
Running it all up
You should be able to see the data going into the bucket by navigating to the Minio server URL and having a look in test-bucket.
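If you’d rather check from code than from the browser, the sketch below lists what the sink has written. It assumes the `boto3` package, a hypothetical Minio address and placeholder credentials, and the S3 sink's default key layout of `topics/<topic>/partition=<n>/<topic>+<n>+<start offset>.json`; a different partitioner.class or output format will change those keys.

```python
import re

# Default S3 sink key layout (assumes DefaultPartitioner and JSON output).
KEY_PATTERN = re.compile(
    r"^topics/(?P<topic>[^/]+)/partition=(?P<partition>\d+)/"
    r"(?P=topic)\+(?P=partition)\+(?P<offset>\d+)\.json$"
)


def parse_key(key):
    """Split an object key into topic, partition and starting offset."""
    match = KEY_PATTERN.match(key)
    if match is None:
        return None                     # not written in the default layout
    return {
        "topic": match["topic"],
        "partition": int(match["partition"]),
        "start_offset": int(match["offset"]),
    }


def list_sink_objects(bucket="test-bucket", topic="test_topic",
                      endpoint_url="http://localhost:9000"):
    """Return the keys stored under the topic's prefix (first page only)."""
    import boto3                        # imported lazily; third-party package

    s3 = boto3.client(
        "s3",
        endpoint_url=endpoint_url,                   # hypothetical Minio address
        region_name="us-east-1",
        aws_access_key_id="minio-access-key",        # placeholder credential
        aws_secret_access_key="minio-secret-key",    # placeholder credential
    )
    response = s3.list_objects_v2(Bucket=bucket, Prefix=f"topics/{topic}/")
    return [obj["Key"] for obj in response.get("Contents", [])]
```

Running `[parse_key(k) for k in list_sink_objects()]` gives a quick view of which partitions and offsets have landed in the bucket so far.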