Streaming using Kinesis Data Firehose and Redshift

I want to show you how easy it is to set up a streaming application using AWS Kinesis Data Firehose. It is interesting to know that it uses a manifest file to keep track of the files processed and the COPY command to continuously ingest the streaming data.

At the time of writing this blog, AWS offers five Kinesis services for real-time processing:

1. Amazon Kinesis Video Streams

2. Amazon Kinesis Data Streams

3. Amazon Kinesis Data Firehose

4. Amazon Kinesis Data Analytics

5. Amazon Kinesis Agent for Microsoft Windows

This post focuses on Kinesis Data Firehose, since you can connect it directly to Redshift. Setting up the entire process can be broadly divided into the following steps, with IAM credentials set up in between:

1. Setting up the Twitter app

2. Creating the Redshift cluster

3. Creating the database, schema and table in Redshift

4. Creating an S3 bucket to hold the streaming data

5. Creating the Kinesis delivery stream

6. Feeding real-time Twitter data into Kinesis

Setting up the Twitter app

In order to get access to the Twitter API for streaming into your application, you need to set up an app in your Twitter account, and it's pretty simple. To do this:

1. Go to https://developer.twitter.com/en/apps and create an account if you don’t have one.

2. Once logged in, click the Create an app button. Fill in the mandatory fields such as App Name, Description, Website URL (provide a unique URL) and "Tell us how this app will be used". You may leave the non-mandatory fields blank.

3. Navigate to Keys and tokens and, under Access token & access token secret, click the Create button. We will need the Consumer API keys and the Access token & access token secret later to connect to Twitter and download data.

Creating Redshift cluster

When you first launch a Redshift cluster, it is not accessible from the internet. This is because it is locked down by default so that nobody has access to it. To grant inbound access to an Amazon Redshift cluster, you associate the cluster with a security group.

Before we launch the Redshift cluster, let us first set up a security group that allows access from the internet. We will then attach this security group to the Redshift cluster so that you can connect to it.

To create a VPC security group you can go to the Security tab under the Amazon Redshift service. In my case, my account does not support creating one from there because of the region.

If that is the case for you, go to the VPC section under Services → Networking and Content Delivery, or search for VPC in the search box. Once there, click the Security Groups tab and then the Create security group button.

Once the security group is created, edit it and add an inbound rule. In this case I added "All traffic". That is the end of the security group creation process.
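If you prefer to script this step, here is a minimal sketch using boto3; the group name, VPC ID and region are placeholders of my own, and I open only the Redshift port rather than all traffic.

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')  # region is an assumption

# Create the security group in your VPC (replace the VPC ID with your own)
sg = ec2.create_security_group(
    GroupName='redshift-inbound',                  # hypothetical name
    Description='Inbound access to Redshift',
    VpcId='vpc-0123456789abcdef0'                  # placeholder VPC ID
)

# Allow inbound connections on the Redshift port from anywhere
ec2.authorize_security_group_ingress(
    GroupId=sg['GroupId'],
    IpPermissions=[{
        'IpProtocol': 'tcp',
        'FromPort': 5439,
        'ToPort': 5439,
        'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
    }]
)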

Next, let us create and launch the Redshift cluster. Navigate to the Services tab; under Database you will find Amazon Redshift, or alternatively you can use the search box to find Redshift. Once there, click Launch cluster and follow the steps.

Click Launch cluster.

Provide your cluster name, database name, port (5439 by default) and master user credentials.

You can choose the node type here; for our example a single node of type dc2.large will suffice. In our production environment we use 8 nodes of type dc2.8xlarge, which gives us around 18 TB of database storage. We also use the ds2.xlarge node type, but I would not recommend it if your tables are huge and you run analyses every day.

Select the VPC security group you have created above and create the cluster.

Once the cluster is launched, attach the IAM role AWSServiceRoleForRedshift to the cluster by clicking on the link marked below or via the Manage IAM roles button; you will see the role in the drop-down.
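If you would rather launch the cluster from code, a minimal boto3 sketch could look like the following; the identifiers, credentials and region are placeholders, and sg['GroupId'] refers to the security group created above.

import boto3

redshift = boto3.client('redshift', region_name='us-east-1')  # region is an assumption

# Launch a minimal, publicly accessible single-node cluster for the demo
redshift.create_cluster(
    ClusterIdentifier='twitter-stream-demo',       # hypothetical name
    NodeType='dc2.large',
    ClusterType='single-node',
    DBName='twitterdb',                            # placeholder database name
    MasterUsername='admin',
    MasterUserPassword='ChangeMe1234',             # placeholder password
    Port=5439,
    VpcSecurityGroupIds=[sg['GroupId']],           # security group created earlier
    PubliclyAccessible=True
)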

Even without the VPC security group and IAM role, you can still connect to your cluster using the Query cluster option under the Cluster drop-down button.

Connect to the cluster using the endpoint provided and the admin user name and password you set during cluster creation.

Creating database, schema and table in Redshift

Once connected to Redshift, you can create a schema, a table and a view as provided in the code below. I intend to store the whole feed in a single column and use Redshift's JSON functions to extract the data. Alternatively, if you are comfortable with Python, you can structure the data before sending it to Kinesis, and in that case you may use a table structure that suits you. In Redshift you can choose a distribution key and sort key while creating the table. In a production environment it is very important to choose one; since I am just demonstrating here, I haven't used one.
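The original snippet is not reproduced here, but a minimal sketch of the idea, run from Python with psycopg2 (an assumption on my part), could look like this; the endpoint, credentials, schema, table and column names are all placeholders.

import psycopg2

# Connect using the cluster endpoint and the admin credentials from cluster creation
conn = psycopg2.connect(
    host='my-cluster.xxxxxxxx.us-east-1.redshift.amazonaws.com',  # placeholder endpoint
    port=5439,
    dbname='twitterdb',
    user='admin',
    password='ChangeMe1234'
)
cur = conn.cursor()

# One wide VARCHAR column to hold the raw tweet JSON; no dist/sort key for this demo
cur.execute("CREATE SCHEMA IF NOT EXISTS twitter;")
cur.execute("""
    CREATE TABLE IF NOT EXISTS twitter.feed (
        raw_feed VARCHAR(65535)
    );
""")

# A view that extracts a few fields from the JSON with Redshift's JSON functions
cur.execute("""
    CREATE OR REPLACE VIEW twitter.v_feed AS
    SELECT json_extract_path_text(raw_feed, 'created_at')          AS created_at,
           json_extract_path_text(raw_feed, 'text')                AS tweet_text,
           json_extract_path_text(raw_feed, 'user', 'screen_name') AS screen_name
    FROM twitter.feed;
""")

conn.commit()
cur.close()
conn.close()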

Creating an S3 bucket to hold streaming data

If you are creating an S3 bucket for the first time, go to the S3 section under Storage and click the Create bucket button. Leave everything at its default unless you need specific features.

Next, create a user using IAM and remember to save the Access key ID and the Secret access key.

Once the user is created, click on the user and add permissions using Attach existing policies directly. I have selected full access here, which you will need.

You can choose an existing policy via the Attach existing policies directly option.

Now you may create a folder for the Twitter feed under your bucket so as not to clutter it. You can use S3 Browser to create it.
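Alternatively, you can create the folder from code; in S3 a "folder" is just an empty object whose key ends with a slash. A minimal sketch, with the bucket and folder names as placeholders:

import boto3

s3 = boto3.client('s3')

# An empty object whose key ends in '/' shows up as a folder in the S3 console
s3.put_object(Bucket='my-twitter-stream-bucket', Key='twitter/')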

Creating Kinesis delivery stream

Under Services → Analytics you can find Kinesis. Select "Deliver streaming data with Kinesis Firehose delivery streams".

Give the stream a name and select "Direct PUT or other sources" as the Source.

If you intend to transform the feed before loading it into Redshift, you may select the record transformation option on the next screen; otherwise leave the defaults and move on to the next screen.

Select Amazon Redshift as the Destination on the next page and provide the Redshift details. The table name should be in the format schema_name.table_name. It is worth noting that for production you may want to create a separate user in Redshift and use that instead of the admin user.

In the Intermediate S3 destination section, provide the bucket, and in the Prefix field provide the folder name followed by a slash. Since I am loading the feed as it is, I have provided a fixed-width spec as the COPY option. If you don't specify anything here, the default delimiter is a comma. Specify the retry duration; the default is 3600 seconds, and if you want the COPY command to be retried for longer you can specify up to 7200 seconds. The benefit of a longer retry duration is that it gives you some time to fix the file or your COPY command options.

The COPY command will load the data from this S3 folder.

Note from AWS: Kinesis Data Firehose delivers your data to your S3 bucket first and then issues an Amazon Redshift COPY command to load the data into your Amazon Redshift cluster.

Next, on the Configure settings page, specify the Buffer size, Buffer interval and other settings. I provided an aggressive buffer size and interval since Twitter feeds come in fast. Enabling error logging will let you see errors such as COPY command failures in the console. Again, a note from AWS: "If data delivery to the destination falls behind data writing to the delivery stream, Kinesis Data Firehose raises the buffer size dynamically to catch up."

Select firehose_delivery_role in the IAM role section. Finally, review and create the delivery stream.
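The same delivery stream can also be created programmatically. Here is a minimal boto3 sketch mirroring the console settings above; all names, ARNs and credentials are placeholders, and the fixed-width COPY option is my own guess at a spec matching the single-column table created earlier.

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')  # region is an assumption

firehose.create_delivery_stream(
    DeliveryStreamName='twitter-feed-stream',      # hypothetical name
    DeliveryStreamType='DirectPut',
    RedshiftDestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
        'ClusterJDBCURL': 'jdbc:redshift://my-cluster.xxxxxxxx.us-east-1'
                          '.redshift.amazonaws.com:5439/twitterdb',
        'CopyCommand': {
            'DataTableName': 'twitter.feed',
            # Load each record into the single wide column created earlier
            'CopyOptions': "fixedwidth 'raw_feed:65535'"
        },
        'Username': 'admin',
        'Password': 'ChangeMe1234',
        'RetryOptions': {'DurationInSeconds': 3600},
        'S3Configuration': {
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
            'BucketARN': 'arn:aws:s3:::my-twitter-stream-bucket',
            'Prefix': 'twitter/',
            'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
            'CompressionFormat': 'UNCOMPRESSED'
        }
    }
)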

Feeding real time Twitter data into Kinesis

For this, let's create a user in IAM and attach the AmazonKinesisFirehoseFullAccess policy. Review and create the user, and note down the Access key ID and Secret access key. If you forget to note them down, you can create a new access key by going to the user's security credentials.
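If you want to script the user creation as well, a minimal boto3 sketch might look like this; the user name is a placeholder, and the policy ARN is the AWS managed AmazonKinesisFirehoseFullAccess policy mentioned above.

import boto3

iam = boto3.client('iam')

# Create the user and attach the managed Firehose policy
iam.create_user(UserName='twitter-firehose-writer')          # hypothetical name
iam.attach_user_policy(
    UserName='twitter-firehose-writer',
    PolicyArn='arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess'
)

# Generate the access key pair used by the Python producer below
keys = iam.create_access_key(UserName='twitter-firehose-writer')
print(keys['AccessKey']['AccessKeyId'], keys['AccessKey']['SecretAccessKey'])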

For this part I have a small Python file which connects to your Twitter account, searches for a tag (in this example "donald") and uploads the tweets into Firehose. I have used Python 3 for this example.

I have used the tweepy library; if you don't have tweepy, install it using

pip install tweepy

Now in your code import StreamListener, Stream and OAuthHandler:

from tweepy import StreamListener, Stream, OAuthHandler

The whole code looks like the sketch below. Run it from your console and after some time you may exit it. Query the Redshift table you created and you should see the records.
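The original gist is not reproduced here; the following is a minimal sketch of the idea against tweepy 3.x and boto3, with every key, name and region a placeholder you need to replace.

import boto3
from tweepy import StreamListener, Stream, OAuthHandler

# Twitter credentials from the app created earlier (placeholders)
CONSUMER_KEY = 'xxxx'
CONSUMER_SECRET = 'xxxx'
ACCESS_TOKEN = 'xxxx'
ACCESS_TOKEN_SECRET = 'xxxx'

# Firehose client using the IAM user's access keys (placeholders)
firehose = boto3.client(
    'firehose',
    region_name='us-east-1',
    aws_access_key_id='xxxx',
    aws_secret_access_key='xxxx'
)

DELIVERY_STREAM = 'twitter-feed-stream'   # the delivery stream created above


class FirehoseListener(StreamListener):
    """Push every raw tweet (as JSON text) into the Firehose delivery stream."""

    def on_data(self, data):
        # Firehose concatenates records as-is, so terminate each tweet with a newline
        firehose.put_record(
            DeliveryStreamName=DELIVERY_STREAM,
            Record={'Data': (data.strip() + '\n').encode('utf-8')}
        )
        return True

    def on_error(self, status_code):
        print('Stream error:', status_code)
        # Returning False disconnects the stream, e.g. when Twitter rate-limits us
        return False


if __name__ == '__main__':
    auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    stream = Stream(auth, FirehoseListener())
    stream.filter(track=['donald'])   # the tag searched for in this example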

Now go ahead and have some fun streaming.
