Streaming From Kafka to Snowflake: Part 1 — Kafka to S3
Increasingly, organisations are finding that they need to process data as soon as it becomes available. In addition, there has been a growing demand for separating storage and compute. Enter Kafka and Snowflake: we can put streaming data into a cloud data warehouse. From there, you can take the semi-structured data in Snowflake, use an ELT tool like Matillion to convert it into structured data, and conduct advanced analytics with machine learning.

Since the entire end-to-end process involves multiple tools, I have divided it into multiple parts so that it stays modular and you can implement it as you see fit. This part covers how to get streaming data from Kafka to S3. Once you are done with this part, you can go to Part 2 to move the data from S3 to Snowflake.
What we will achieve at the end of this post
At the end of this post, we will be able to type some sentences into our Kafka producer console and have them automatically appear in our S3 bucket as a JSON file.
Requirements:
- Access to an AWS account with S3 permissions so that you can get an AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
- Docker installed
- Some basic understanding of Kafka, including what a topic, producer, and consumer are
Step 1: Get Kafka
Installing Kafka can be fairly challenging. Therefore, for this part of the process we will be using an existing Docker image. Among the popular Kafka Docker images out there, I found Landoop's fast-data-dev to work better than the others, so we will be using it for this post.
Hopefully you already have Docker installed. If you don't, please install it from the Docker website. If you do, open your terminal and run the following command:
docker run -p 2181:2181 \
-p 3030:3030 \
-p 8081-8083:8081-8083 \
-p 9581-9585:9581-9585 \
-p 9092:9092 \
-e AWS_ACCESS_KEY_ID=your_aws_access_key_without_quotes \
-e AWS_SECRET_ACCESS_KEY=your_aws_secret_key_without_quotes \
-e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev:latest
Please note a key point here: I have included AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY directly in the docker run command. Please only do this for testing, not in production, because it exposes your access key and secret in plain text.
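If you want to avoid pasting the keys into the command itself, one option is to export them in your shell first and let Docker read them from the environment. This is just a sketch of the same docker run as above; the exported values are placeholders for your own credentials.
export AWS_ACCESS_KEY_ID=your_aws_access_key
export AWS_SECRET_ACCESS_KEY=your_aws_secret_key
docker run -p 2181:2181 \
-p 3030:3030 \
-p 8081-8083:8081-8083 \
-p 9581-9585:9581-9585 \
-p 9092:9092 \
-e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
-e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
-e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev:latest
The keys still end up inside the container, so treat this as a convenience for local testing rather than a security fix.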
Once it starts, you should see it load quite a lot of services, including ZooKeeper, the broker, the schema registry, the REST proxy, and Kafka Connect. That is why we are opening the various ports above.

After about two minutes you should be able to go to http://127.0.0.1:3030 to see the Kafka Connect UI.
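If you prefer the terminal, you can also confirm that the Kafka Connect worker itself is up by querying its REST API on port 8083 (one of the ports we mapped above). Before any connectors are created it should simply return an empty JSON array.
curl http://127.0.0.1:8083/connectors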

Step 2: Create an S3 bucket
The next step is to create the S3 bucket we will be uploading our files to. Log in to your AWS account and create your bucket.
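If you have the AWS CLI configured, the bucket can also be created from the terminal. The bucket name and region below are placeholders, so substitute your own; the region must match the s3.region setting we use in the next step.
aws s3 mb s3://your-bucket-name --region ap-southeast-2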
Step 3: Connect to the S3 bucket
From the user interface, click Enter on the Kafka Connect UI. Once you are there, click New connector.

After you click New connector, you will see a long list of connectors you can use. Since we want to connect to S3, click the Amazon S3 icon. You will then be presented with some settings, along with plenty of errors.

To get rid of the errors, we need to change the following settings. From the list below, change s3.region if your bucket is not in Sydney, and set s3.bucket.name to the bucket you created.
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=ap-southeast-2
format.class=io.confluent.connect.s3.format.json.JsonFormat
topics.dir=topics
flush.size=1
topics=Name_of_your_topic
tasks.max=1
value.converter=org.apache.kafka.connect.storage.StringConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=your_bucket_name
Although a few of these properties, such as key.converter, value.converter, and topics.dir, are not shown by default, leaving them out tends to produce error messages. In addition, you also need to provide a topic. I will explain what a topic is shortly; for now, you can just name it "first_topic".
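If you prefer the command line to the UI form, the same connector can also be created by posting its configuration to the Kafka Connect REST API on port 8083. This is only a sketch: the connector name s3-sink is one I made up, and the bucket name and region are placeholders you need to replace.
curl -X POST http://127.0.0.1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "s3-sink",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "s3.region": "ap-southeast-2",
      "s3.bucket.name": "your_bucket_name",
      "topics": "first_topic",
      "topics.dir": "topics",
      "flush.size": "1",
      "tasks.max": "1",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }'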
Once you fill in all the details and click Create, you should see something similar to what I have.
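You can also check from the terminal that the connector is actually running by asking the Connect REST API for its status. Replace s3-sink with whatever name you gave your connector.
curl http://127.0.0.1:8083/connectors/s3-sink/status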

Step 4: Go into the Docker container's bash
In order to open a bash shell inside the Docker container, we will need the container ID. To get the ID, we can run docker ps:
docker ps
This will give you a list of the Docker containers that are running.

As you can see from the image, I have only one container running, with the ID 'fb218ef3d1aa'. To open a bash shell inside it, we can now type the following command:
docker exec -it fb218ef3d1aa /bin/bash
This will get us into the container's bash shell, where we can create our topic, producer, and consumer:
root@fast-data-dev / $
Step 5: Create a Kafka topic
Remember the Kafka topic we mentioned earlier? According to the Apache Kafka documentation:
The Kafka cluster stores streams of records in categories called topics.
So if we don't have a topic, we can't stream our records, or in our case, type a message and send it through. We will create our topic and name it first_topic. We also need to give the ZooKeeper URL and port from the container we started earlier. Kafka also needs to know the number of partitions and the replication factor. We will go into detail about those in another post; for now, we just want the pipeline to work end to end.
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --replication-factor 1
The command should now say: Created topic "first_topic".
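If you want to double-check the topic before moving on, you can describe it from the same shell; it should show first_topic with three partitions and a replication factor of 1.
root@fast-data-dev / $ kafka-topics --zookeeper 127.0.0.1:2181 --topic first_topic --describe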
Step 6: Start Kafka console producer
To create messages, we need to start the Kafka console producer, using the following command. Once you press Enter, you should see a > appear on the screen, waiting for you to type something.
root@fast-data-dev / $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic=first_topic
Step 7: Start Kafka console consumer
To check that the messages you are typing are actually going through, let's open a consumer console. To do that, open a new terminal window and go into the same Docker container:
docker exec -it fb218ef3d1aa /bin/bash
And once we are in the bash shell, we can open the consumer console:
root@fast-data-dev / $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
Step 8: Type something in the producer console
If we now type anything into the kafka-console-producer, it will appear in the console consumer and create a JSON file in S3. Download the file and you will see what you typed!
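If you have the AWS CLI configured, you can also list and fetch the files without opening the S3 console. The exact object keys depend on the connector's default partitioner, so treat the cp line as an example and copy a key from your own listing; the topics/first_topic prefix comes from the topics.dir setting we used earlier.
aws s3 ls s3://your_bucket_name/topics/first_topic/ --recursive
aws s3 cp s3://your_bucket_name/topics/first_topic/partition=0/first_topic+0+0000000000.json .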

The next step is to send this data from S3 to Snowflake so that it appears in Snowflake automatically. To do that, let's go to the next part.
