Streaming From Kafka to Snowflake: Part 1 — Kafka to S3

Avi Paul
Aug 31, 2018 · 6 min read

Increasingly, organisations are finding that they need to process data as soon as it becomes available. There has also been growing demand for separating storage and compute. Enter Kafka and Snowflake: we can put streaming data into a cloud data warehouse, then take the unstructured data from Snowflake, use an ELT tool like Matillion to convert it into structured data, and conduct advanced analytics with machine learning.

Since the entire end-to-end process involves multiple tools, I have divided it into multiple parts so that it stays modular and you can implement it as you see fit. This part covers how to get streaming data from Kafka to S3. Once you are done with this part, you can go to part 2 to move the data from S3 to Snowflake.

What we will achieve at the end of this post

At the end of this post, we will be able to type some sentences into our Kafka producer console and they will automatically appear in our S3 bucket as JSON files.

Requirements:

  1. Access to an AWS account so that you can get an AWS access key ID and secret access key.
  2. Docker installed.
  3. Some basic understanding of Kafka, including what a topic, a producer and a consumer are.

Step 1: Get Kafka

Installing Kafka can be fairly challenging. Therefore, for this part of the process we will use an existing Docker image. Among the popular Kafka Docker images out there, I found Landoop's fast-data-dev to work better than the others, so that is what we will use for this post.

Hopefully you have Docker installed. If you don't, please install it from the Docker website. Once you have, open your terminal and run the following command:

docker run -p 2181:2181 \
-p 3030:3030 \
-p 8081-8083:8081-8083 \
-p 9581-9585:9581-9585 \
-p 9092:9092 \
-e AWS_ACCESS_KEY_ID=your_aws_access_key_without_quotes \
-e AWS_SECRET_ACCESS_KEY=your_aws_secret_key_without_quotes \
-e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev:latest

Please note a key point here: I have included AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY directly in the docker run command. Only do this for testing and never in production, because it exposes your access key and secret.
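If you still want to pass the credentials through the environment but would rather keep them off the command line, Docker's --env-file flag is one option. A minimal sketch, assuming a file called aws.env (a name I have chosen here) that you keep out of version control:

# aws.env
AWS_ACCESS_KEY_ID=your_aws_access_key
AWS_SECRET_ACCESS_KEY=your_aws_secret_key

docker run --env-file aws.env \
-p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
-p 9581-9585:9581-9585 -p 9092:9092 \
-e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev:latest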

In addition, you should see it load quite a lot of services, including ZooKeeper, the Kafka broker, the Schema Registry, the REST proxy and Kafka Connect. That is why we are opening the various ports above (ZooKeeper on 2181, the broker on 9092, the web UI on 3030, and so on).

After about two minutes you should be able to go to http://127.0.0.1:3030 and see the Kafka Connect UI.
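If you prefer the command line, you can also sanity-check that Kafka Connect itself is up by hitting its REST API (assuming it is on its default port 8083, which the docker run above maps). An empty list just means no connectors have been created yet:

curl -s http://127.0.0.1:8083/connectors
[]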

Step 2: Create an S3 bucket

The next step is to create the S3 bucket that we will be uploading our files to. Log in to your AWS account and create your bucket.
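If you have the AWS CLI installed and configured, you can create the bucket from the terminal instead of the console. The bucket name and region below are placeholders; use your own:

aws s3 mb s3://your_bucket_name --region ap-southeast-2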

Step 3: Connect to the S3 bucket

From the user interface, click Enter on the Kafka Connect UI. Once you are there, click New connector.

After you click New connector, you will see a list of connectors you can choose from. Since we want to connect to S3, click the Amazon S3 icon. You will then be presented with some settings, along with lots of errors.

To get rid of the errors, we need to change the following settings. From the list below, you need to change s3.region (your bucket may not be in Sydney) and s3.bucket.name to the bucket you have created.

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=ap-southeast-2
format.class=io.confluent.connect.s3.format.json.JsonFormat
topics.dir=topics
flush.size=1
topics=Name_of_your_topic
tasks.max=1
value.converter=org.apache.kafka.connect.storage.StringConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=your_bucket_name

Although settings such as key.converter, value.converter and topics.dir are not pre-populated, leaving them out tends to produce error messages. In addition, you also need to provide a topic. I will explain what a topic is soon; for now, you can just name it "first_topic".

Once you fill in all the details and click Create, you should see something similar to what I have.
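If you prefer not to use the UI, the same connector can be created through Kafka Connect's standard REST API on port 8083. This is just an alternative sketch of the configuration above; "s3-sink" is a connector name I have chosen, and you still need to substitute your own bucket and region:

curl -s -X POST http://127.0.0.1:8083/connectors \
-H "Content-Type: application/json" \
-d '{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "s3.region": "ap-southeast-2",
    "s3.bucket.name": "your_bucket_name",
    "topics": "first_topic",
    "topics.dir": "topics",
    "flush.size": "1",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'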

Step 4: Go into the Docker container's bash

To open a bash shell inside the Docker container, we will need the container ID. To get the ID, we can run docker ps:

docker ps

This will give you a list of the Docker containers that are running.

As you can see from the image, I have only one container running, with the ID 'fb218ef3d1aa'. To open a bash shell inside it, we can now type the following command.

docker exec -it fb218ef3d1aa /bin/bash

This will get us into the bash shell, and we can now create our topic, producer and consumer.

root@fast-data-dev / $
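A small convenience, if you would rather not copy the container ID by hand: docker ps can filter by image, so (assuming only one fast-data-dev container is running) the following does the same thing:

docker exec -it $(docker ps -q --filter ancestor=landoop/fast-data-dev:latest) /bin/bash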

Step 5: Creating Kafka topic

Remember the Kafka topic we mentioned? According to the definition on the Apache Kafka documentation page:

The Kafka cluster stores streams of records in categories called topics.

So if we don't have a topic, we can't stream our records, or in our case type a message and send it through. We will create our topic and name it first_topic. We also need to give the ZooKeeper URL and port that we set up before. Kafka also needs to know the number of partitions and the replication factor; we will go into detail about those in another post. For now, we just want it to work from end to end.

root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --replication-factor 1

The command should now say: Created topic "first_topic".
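You can verify the topic exists with the same kafka-topics tool, using either --list or --describe:

root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --list
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --topic first_topic --describe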

Step 6: Start Kafka console producer

To create messages, we need to start a Kafka console producer, which we do with the following command. Once you press enter, you should see a > appear on the screen, waiting for you to type something.

root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
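Each line you type at the > prompt is sent as one Kafka record, for example:

>hello from kafka
>this sentence should end up in s3 as a json file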

Step 7: Start Kafka Console Consumer

To check whether the messages you are typing are actually going through, let's open a consumer console. To do that, we will open a new terminal window and go into the same Docker container:

docker exec -it fb218ef3d1aa /bin/bash

Once we are in the bash shell, we can open the consumer console:

root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
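By default the console consumer only shows new messages. If you want to replay anything that was sent before it started, you can add the --from-beginning flag:

root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning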

Step 8: Type something into the producer console

If we now type anything into the kafka-console-producer, it will appear in the console consumer, and a JSON file will be created in S3. Download the file and you will see what you typed!

Kafka Producer to Consumer and S3
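You can also confirm this from the command line with the AWS CLI (assuming it is installed and configured). With topics.dir=topics and the connector's default partitioner, the objects typically land under topics/first_topic/; the exact key below is only an example, so copy whatever the listing shows:

aws s3 ls s3://your_bucket_name/topics/ --recursive
aws s3 cp s3://your_bucket_name/topics/first_topic/partition=0/first_topic+0+0000000000.json -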

The next step is to send this data from S3 to Snowflake so that it appears in Snowflake automatically. To do that, let's go to the next part.

https://www.servian.com/data-and-analytics/

Written by Avi Paul, Data Engineer and Machine Learning Developer @ Servian.

WeAreServian: The Cloud and Data Professionals
