Serverless ETL using AWS Lambda - Part 2

Loading President Trump’s Twitter data using NoSQL DynamoDB

This article continues the serverless architecture design that we developed in part 1 of the series. To summarize, we finished part 1 by building a Lambda function that is triggered when a file is uploaded to an S3 bucket. Although the Lambda function did little more than print the bucket name, file name, and content type of the file, it provided a foundation for doing other things on the trigger. We will now extend the Lambda function to load President Trump’s tweet data into AWS DynamoDB, a fully managed serverless NoSQL database service. Before we begin, let’s cover the basics of DynamoDB.

What is DynamoDB?

Amazon DynamoDB is a fully managed NoSQL database service that supports both document and key-value store models. It is fully managed in the sense that you don’t need to spin up servers: you decide how much throughput you want and let AWS manage the internals. You can scale the throughput up programmatically as your data needs increase, or you can set an auto scaling target and let the service handle the rest. All this, along with fine-grained user access control, makes DynamoDB a great choice for large-scale unstructured data applications.
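As a rough illustration of scaling throughput programmatically, the sketch below wraps the update_table call of the boto3 DynamoDB client. The function name scale_throughput and the capacity numbers are assumptions made for illustration, and the client is passed in as an argument so the call shape can be tried without AWS credentials.

```python
def scale_throughput(client, table_name, read_units, write_units):
    """Ask DynamoDB to change a table's provisioned read/write capacity.

    `client` is assumed to be a boto3 DynamoDB client, for example
    boto3.client("dynamodb"); it is injected rather than created here.
    """
    return client.update_table(
        TableName=table_name,
        ProvisionedThroughput={
            "ReadCapacityUnits": read_units,
            "WriteCapacityUnits": write_units,
        },
    )
```

With real credentials this could be called as scale_throughput(boto3.client("dynamodb"), "tweets", 5, 25), where the numbers are placeholders rather than recommendations.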

Create a table in DynamoDB

Let’s get started. Fire up the DynamoDB service from the AWS console and click Create table. Can it get any simpler?

Make sure you are in the same region as your S3 bucket and Lambda function. On the next screen, choose a name for your table and give it a partition key. Think of the partition key as the primary key of the table. You will have to ensure that the JSON data you load has a uniquely identifying field. If a single field is not enough, you can add a sort key so that the combination of partition and sort keys is unique. That’s all the preparation needed in DynamoDB to start loading data.
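The article creates the table through the console. For readers who prefer code, here is a hedged sketch of an equivalent table definition. It assumes the partition key is a numeric attribute named tweetID (the name used for the tweet id later in this article) and picks small placeholder capacity values.

```python
def tweets_table_definition(table_name="tweets", key_name="tweetID"):
    """Build keyword arguments for the DynamoDB create_table call.

    Assumptions: a single numeric partition key and no sort key.
    """
    return {
        "TableName": table_name,
        # HASH marks the partition key; a sort key would use KeyType "RANGE".
        "KeySchema": [{"AttributeName": key_name, "KeyType": "HASH"}],
        # "N" declares the key attribute as a number.
        "AttributeDefinitions": [
            {"AttributeName": key_name, "AttributeType": "N"}
        ],
        # Placeholder capacity values, not a recommendation.
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5,
        },
    }


definition = tweets_table_definition()
# With credentials configured, this could be passed on as:
#   boto3.client("dynamodb").create_table(**definition)
print(definition["KeySchema"])
```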

Getting the Twitter data

My source for this data set was this page, which explains the use of the Twitter data API: https://nocodewebscraping.com/twitter-json-examples/

I had to massage the file a little to ensure that it is in standard JSON format. I have made it available here on GitHub.

Below is a subset of elements from the dataset; the ones that I will pull for this exercise are id, created_at, retweet_count, and text:

{
  "contributors": null,
  "coordinates": null,
  "created_at": "Tue Aug 23 13:53:11 +0000 2016",
  "entities": {
    "hashtags": [],
    "symbols": [],
    "urls": [],
    "user_mentions": []
  },
  "favorite_count": 6682,
  "favorited": false,
  "geo": null,
  "id": 768083669550366720,
  "id_str": "768083669550366720",
  "in_reply_to_screen_name": null,
  "in_reply_to_status_id": null,
  "in_reply_to_status_id_str": null,
  "in_reply_to_user_id": null,
  "in_reply_to_user_id_str": null,
  "is_quote_status": false,
  "lang": "en",
  "place": null,
  "retweet_count": 2301,
  "retweeted": false,
  "source": "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>",
  "text": "It is being reported by virtually everyone, and is a fact, that the media pile on against me is the worst in American political history!",
  "truncated": false,….

Extending the lambda function

This is how the lambda function looked at the end of part 1, and we will now add a few things to it.

Lambda function — before

First, let’s get a handle to the DynamoDB service and create an object for the table we created above (tweets in this case).

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('tweets')

Next, we will parse the JSON data into a Python object that lets us access values by key. To do this, we read the “Body” item of the response object and pass its contents to Python’s json.loads function.

response = s3.get_object(Bucket=bucket, Key=key)
tweets = json.loads(response['Body'].read())
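To make the parsing step concrete without touching S3, the sketch below feeds a stand-in response to the same json.loads call. The helper name parse_tweets and the in-memory io.BytesIO body are assumptions; a real get_object response carries a streaming body that also exposes read().

```python
import io
import json


def parse_tweets(response):
    """Decode the JSON payload of an S3 get_object response."""
    # read() returns bytes, which json.loads accepts directly.
    return json.loads(response["Body"].read())


# Stand-in for a real response: only the "Body" item is modelled here.
fake_response = {"Body": io.BytesIO(b'[{"id": 1, "text": "hello"}]')}
tweets = parse_tweets(fake_response)
print(tweets[0]["text"])  # prints "hello"
```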

From the parsed JSON we can now pull the keys and values that we need:

'tweetID': tweet['id'],
'tweeTime': datetime.strptime(re.sub(r"\+\d+\s", "", tweet['created_at']), "%a %b %d %H:%M:%S %Y").isoformat(),
'tweetRecount': tweet['retweet_count'],
'tweetText': tweet['text']

The re.sub call in the code above removes the +0000 UTC offset from the timestamp, and strptime then converts the resulting string into a datetime object based on the format of the data. Finally, isoformat converts the datetime into a clean ISO 8601 string.
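Here is a small worked example of that conversion, run on the created_at value from the sample tweet. The helper names to_iso and to_item are assumptions, and the tweet text is shortened to a placeholder.

```python
import re
from datetime import datetime


def to_iso(created_at):
    """Convert Twitter's created_at string to an ISO 8601 timestamp."""
    # Remove the "+0000 " offset so the remaining fields match the format.
    cleaned = re.sub(r"\+\d+\s", "", created_at)
    return datetime.strptime(cleaned, "%a %b %d %H:%M:%S %Y").isoformat()


def to_item(tweet):
    """Pick the four fields this article loads into DynamoDB."""
    return {
        "tweetID": tweet["id"],
        "tweeTime": to_iso(tweet["created_at"]),
        "tweetRecount": tweet["retweet_count"],
        "tweetText": tweet["text"],
    }


sample = {
    "id": 768083669550366720,
    "created_at": "Tue Aug 23 13:53:11 +0000 2016",
    "retweet_count": 2301,
    "text": "placeholder text",
}
print(to_item(sample)["tweeTime"])  # prints 2016-08-23T13:53:11
```

Dropping the offset yields a naive timestamp. An alternative that keeps the timezone is to parse the original string with the %z directive, which would produce a value ending in +00:00.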

The last step is to put these values into the DynamoDB table. DynamoDB provides APIs for interacting with the service. The basic API to put an item into the database is put_item.

put_item(**kwargs)
Creates a new item, or replaces an old item with a new item. If an item that has the same primary key as the new item already exists in the specified table, the new item completely replaces the existing item. You can perform a conditional put operation (add a new item if one with the specified primary key doesn’t exist), or replace an existing item if it has certain attribute values. You can return the item’s attribute values in the same operation, using the ReturnValues parameter.
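A minimal sketch of both behaviours described above, assuming the table's partition key is named tweetID. The helper name put_tweet is hypothetical, and table is expected to be a boto3 Table resource such as boto3.resource("dynamodb").Table("tweets").

```python
def put_tweet(table, item, overwrite=True):
    """Write one item, optionally refusing to replace an existing one."""
    kwargs = {"Item": item}
    if not overwrite:
        # Conditional put: succeed only if no item with this key exists yet.
        kwargs["ConditionExpression"] = "attribute_not_exists(tweetID)"
    return table.put_item(**kwargs)
```

When the condition fails, boto3 raises a ClientError whose error code is ConditionalCheckFailedException, so a caller that passes overwrite=False should be ready to catch it.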

While this is fine for a single row, we need a batch operation when loading a large data set. Bulk upload can be achieved using batch_write_item.

batch_write_item(**kwargs)
The BatchWriteItem operation puts or deletes multiple items in one or more tables. A single call to BatchWriteItem can write up to 16 MB of data, which can comprise as many as 25 put or delete requests. Individual items to be written can be as large as 400 KB.
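One convenient way to use this operation from Python is the batch_writer helper on the boto3 Table resource, which buffers items, sends them in batches, and resends unprocessed items. The sketch below assumes table is such a resource; the function name load_tweets is hypothetical.

```python
def load_tweets(table, items):
    """Write many items through boto3's batch writer; return the count."""
    count = 0
    # The context manager flushes any buffered items when the block exits.
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)
            count += 1
    return count
```

This keeps the 25-request limit out of the calling code, although it does not by itself protect against throttling when the table's write capacity is low.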

The only issue I have seen with this approach so far is that, with the small default throughput (Read Capacity Units, RCU, and Write Capacity Units, WCU) assigned to the DynamoDB table, I ran into timeout errors when loading a large dataset. I will explore these options more in the future. For now, I just reduced the data file size to under 10 MB, and that worked. For larger datasets, auto scaling may be a necessity. The Lambda function should look like this now.
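A minimal sketch of such a handler, assembled from the fragments shown earlier, might look as follows. It is not the author's exact code: it assumes the uploaded file holds a JSON list of tweets, that the table is named tweets, and that writes go through the batch writer. The optional s3 and table parameters are an addition so the handler can be exercised with stand-ins.

```python
import json
import re
from datetime import datetime
from urllib.parse import unquote_plus


def to_item(tweet):
    """Map one raw tweet to the attributes stored in DynamoDB."""
    cleaned = re.sub(r"\+\d+\s", "", tweet["created_at"])
    return {
        "tweetID": tweet["id"],
        "tweeTime": datetime.strptime(
            cleaned, "%a %b %d %H:%M:%S %Y"
        ).isoformat(),
        "tweetRecount": tweet["retweet_count"],
        "tweetText": tweet["text"],
    }


def lambda_handler(event, context, s3=None, table=None):
    # Create AWS clients only when none are supplied (hypothetical hook
    # that lets the handler run against stand-ins outside Lambda).
    if s3 is None or table is None:
        import boto3
        s3 = s3 or boto3.client("s3")
        table = table or boto3.resource("dynamodb").Table("tweets")

    # Bucket and key of the uploaded object, taken from the S3 event.
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = unquote_plus(record["object"]["key"])  # keys arrive URL-encoded

    response = s3.get_object(Bucket=bucket, Key=key)
    tweets = json.loads(response["Body"].read())  # assumed: list of tweets

    with table.batch_writer() as batch:
        for tweet in tweets:
            batch.put_item(Item=to_item(tweet))

    return {"loaded": len(tweets), "bucket": bucket, "key": key}
```

In a deployed function the clients would more commonly be created once at module level so that warm invocations reuse them.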

Role for DynamoDB

Before we load the data, we need to attach a policy that grants write privileges to the role used by the Lambda function. I added the AmazonDynamoDBFullAccess policy to the role to give it read, write, and create access.
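The full-access policy is convenient for a demo. As a hedged alternative, the sketch below builds a narrower policy document that allows only item writes on one table. The function name, region, and account id are placeholders, and this is not the policy the article actually used.

```python
import json


def tweets_write_policy(region, account_id, table_name="tweets"):
    """Build an IAM policy document allowing writes to a single table."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Actions needed by put_item and batch writes.
                "Action": ["dynamodb:PutItem", "dynamodb:BatchWriteItem"],
                "Resource": (
                    f"arn:aws:dynamodb:{region}:{account_id}"
                    f":table/{table_name}"
                ),
            }
        ],
    }


# Placeholder region and account id, for illustration only.
print(json.dumps(tweets_write_policy("us-east-1", "123456789012"), indent=2))
```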

Finally, loading the data

Let’s now upload the sample data to the S3 bucket, which will trigger the Lambda function. You may recall that the Lambda function is only triggered when the file type is .json and the file name has the prefix data. I will use CloudBerry Explorer to load the data file into the S3 bucket.
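For readers who would rather upload from a script than a GUI tool, here is a small sketch. It mirrors the trigger's prefix and suffix rule locally before calling upload_file on a boto3 S3 client. The helper names are hypothetical, and the client is passed in rather than created.

```python
import os


def matches_trigger(key, prefix="data", suffix=".json"):
    """Return True if the object key would fire the S3 trigger."""
    return key.startswith(prefix) and key.endswith(suffix)


def upload_data_file(s3_client, local_path, bucket, key=None):
    """Upload a local file, refusing keys the trigger would ignore."""
    key = key or os.path.basename(local_path)
    if not matches_trigger(key):
        raise ValueError(f"{key!r} would not trigger the Lambda function")
    # s3_client is assumed to be boto3.client("s3").
    s3_client.upload_file(local_path, bucket, key)
    return key
```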

Let’s look at the CloudWatch logs to confirm success.

If all went well, you should see the items appear in DynamoDB.

We can now query this data in Python using the DynamoDB APIs. I will come back to this article and add a section on querying this data. For more information, refer to the docs here.
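Until that section is written, here is a minimal sketch of a single-item lookup by key using get_item on a boto3 Table resource. It assumes the partition key is tweetID, and the helper name get_tweet is hypothetical.

```python
def get_tweet(table, tweet_id):
    """Fetch one tweet by its partition key, or None if it is missing."""
    response = table.get_item(Key={"tweetID": tweet_id})
    # "Item" is absent from the response when no matching item exists.
    return response.get("Item")
```

Note that the boto3 resource layer returns DynamoDB numbers as decimal.Decimal values, so numeric attributes may need converting before further use.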

Next, we will see how to pull live Twitter data and build on the current design to load and analyze a live Twitter stream using Kinesis. At some point, it would be nice to build a visualization around this information. Hope this was fun, and till next time.

< Prev: Serverless ETL using AWS Lambda - Part 1