AWS Data Pipeline and DynamoDB Lookup

In my last blog, we looked at a use case for serverless compute using AWS Lambda and real-time streaming to AWS Redshift using AWS Firehose. This time I am going to talk about another interesting requirement I worked on recently.

The requirement was to capture failure events published by an upstream service. These events contained information our team wanted for failure analysis, which would later be fed back as input to a machine-learning algorithm used to build a user scoring mechanism (I will refrain from going too deep into the domain). However, the published event stream was a superset of all failures that happen during a particular process in the Amazon fulfillment center. For our needs, we had to filter the events by an identifier that belongs to our program.

System Architecture

Using Data Pipeline to populate DynamoDB

Architecture Break Down

ProductId is the identifier on which we filter the events. So essentially all I need is a real-time store that contains the ProductIds that are part of our program. The idea is to use this store as a lookup table: for every incoming event, the ProductId in the event is checked against the store, and a decision is made whether to consume the event or not.

I chose DynamoDB as that real-time store because I needed a fast, flexible store with single-digit millisecond latency for this use case. All the ProductIds that are part of our program were already present in a Redshift table, so I needed a way to populate the DynamoDB store from the Redshift table (note: Redshift is a data warehouse solution and is not meant for real-time queries). Let’s see how I got the data from the source first.

AWS Redshift

I already talked about Redshift in my previous blog, but I didn’t cover connecting to a Redshift cluster and querying its tables. The link below has useful information about the connection and setup.

For my use case, I connected to Redshift, fetched the ProductIds that are part of our program, and stored them in a json file, with each ProductId on a separate line.
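As a sketch (the Redshift query and connection details are omitted, and the ids here are hypothetical placeholders), the write step might be as simple as:

```python
# Sketch: persist ProductIds fetched from Redshift, one per line.
# In practice the list comes from the Redshift query result.
product_ids = ["product_id1", "product_id2", "product_id3"]

with open("product_ids.json", "w") as f:
    for pid in product_ids:
        f.write(pid + "\n")
```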




AWS DynamoDB

Amazon DynamoDB is a fast and flexible NoSQL database service for all applications that need consistent, single-digit millisecond latency at any scale. It is a fully managed cloud database and supports both document and key-value store models.

In my use case, the table I created has ProductId as the hash key, on which the lookup is performed. More information about the different components of DynamoDB can be found here.

Let’s see how I managed to populate the table with the information obtained from the redshift query.

Approach 1 — put-item API

DynamoDB exposes a put-item API that can be used to put items into a table. The syntax for putting a single item looks like this:

bin/aws dynamodb put-item --table-name IdentifierStore --item '{"product_id": {"S": "product_id1"}}' --return-consumed-capacity TOTAL

I could write a script that feeds the inputs to the put-item API to populate the table.
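A minimal sketch of what such a script might generate, assuming the IdentifierStore table and placeholder ids from the example above (the script itself would then execute each command in turn):

```python
import json

def put_item_command(table_name, product_id):
    """Build a single aws dynamodb put-item invocation for one id."""
    item = json.dumps({"product_id": {"S": product_id}})
    return (f"aws dynamodb put-item --table-name {table_name} "
            f"--item '{item}' --return-consumed-capacity TOTAL")

# One command per ProductId, executed sequentially -- which is
# exactly why this approach is slow for large inputs.
commands = [put_item_command("IdentifierStore", pid)
            for pid in ["product_id1", "product_id2"]]
```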


Drawbacks of this approach:

  • No parallelization, so it takes a long time.
  • You have to write your own script and monitor it until it completes (something could go wrong mid-execution).
  • On failure, you have to manually inspect the table to find where execution stopped and re-run the script from that point.
  • Manual intervention is needed throughout.

Approach 2 — batch-write-item API

DynamoDB exposes a batch-write-item API that can put up to 25 items at once into the table. The syntax looks like this:

bin/aws dynamodb batch-write-item --request-items file://request-items.json --return-consumed-capacity TOTAL

For example, the contents of the request-items.json file will look like this:

{
  "IdentifierStore": [
    {"PutRequest": {"Item": {"product_id": {"S": "product_id1"}}}},
    {"PutRequest": {"Item": {"product_id": {"S": "product_id2"}}}},
    {"PutRequest": {"Item": {"product_id": {"S": "product_id3"}}}}
  ]
}

With this approach, I could write a script that feeds batches of inputs to the API to populate the table.


Drawbacks of this approach:

  • Even though writes are batched, the run time still grows with the number of inputs provided.
  • You still have to write a script that splits the input into batches of 25, confirms each batch completes, and verifies that every item was written to the table (a good amount of manual work).
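The batching logic itself is straightforward. A minimal sketch, assuming the IdentifierStore table from the example and hypothetical ids, of splitting the input into groups of 25 and building one request-items payload per group:

```python
def build_batches(table_name, product_ids, batch_size=25):
    """Split ids into groups of at most 25 and build one
    batch-write-item request payload per group."""
    batches = []
    for i in range(0, len(product_ids), batch_size):
        chunk = product_ids[i:i + batch_size]
        requests = [{"PutRequest": {"Item": {"product_id": {"S": pid}}}}
                    for pid in chunk]
        batches.append({table_name: requests})
    return batches

# 60 ids -> 3 payloads (25 + 25 + 10); each payload could be written
# to a request-items.json file and passed to batch-write-item.
ids = [f"product_id{i}" for i in range(60)]
payloads = build_batches("IdentifierStore", ids)
```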

Approach 3 — AWS Data Pipeline

The newer AWS component I got exposed to this time, and used to perform my data-processing job, is AWS Data Pipeline.

AWS Data Pipeline is a web service that helps you reliably process and move data between different AWS compute and storage services, as well as on-premises data sources, at specified intervals. With AWS Data Pipeline, you can regularly access your data where it’s stored, transform and process it at scale, and efficiently transfer the results to AWS services such as Amazon S3, Amazon RDS, Amazon DynamoDB, and Amazon EMR.

As you can see, a lot of the manual work involved in this use case (writing a script, maintaining it, handling the scale of the input, checkpointing, and re-running after unknown failures) is taken care of by Data Pipeline. Before going to AWS Data Pipeline, let’s see how I uploaded the object into S3.


AWS S3

AWS Simple Storage Service (Amazon S3) is object storage with a simple web service interface to store and retrieve any amount of data.

In the Redshift section I mentioned how I fetched the results of the query into a json file. I have to upload that json file to S3. Before that, though, I had to make sure the json file was in the right format, complying with the table schema (note: the format of the json file should match the table’s schema). So I reformatted the json file into the format Data Pipeline accepts when importing data from S3 and exporting it to DynamoDB.

I uploaded the formatted json file to S3. The contents of the formatted file will look like this:
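A sketch of that reformatting step, assuming a single-attribute item (the lowercase "s" type tag here is my understanding of the DynamoDB backup format the import template expects; verify it against the template docs for your table’s schema):

```python
import json

def to_backup_line(product_id):
    """Render one ProductId as a backup-format line: a JSON object
    mapping the attribute name to a typed value."""
    return json.dumps({"product_id": {"s": product_id}},
                      separators=(",", ":"))

# One line per item in the output file.
lines = [to_backup_line(pid) for pid in ["product_id1", "product_id2"]]
# each element looks like: {"product_id":{"s":"product_id1"}}
```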


Creating the Data Pipeline

Now I created a pipeline to import the json object stored in S3 into DynamoDB items. A pipeline can be created from one of the predefined templates or built from scratch on the Architect page. For this task, I used the “Import DynamoDB backup data from S3” template under the DynamoDB templates.

Other Configurations:

  • S3 folder — the folder where the json object is stored.
  • DynamoDB table — the table that I am going to populate.
  • Schedule — instead of a one-time job, the pipeline can be scheduled to run with different options, e.g. run every day at 12 AM and end after 100 occurrences.
  • Security access — if you are creating a pipeline for the first time, a default IAM role is created with the access required by the pipeline and its EC2 instances (internally the pipeline sets up an EMR job to complete the task; the sample Architect page below shows the EMR activity).

There is an option to verify the set of tasks that will be performed for the job by clicking the “Edit in Architect” button on the console page. A sample view of the Architect page is below.

Architect page for the Data Pipeline created

On the Architect page, the left side shows the workflow diagram with the steps of the execution, and the right side shows the various pipeline options that can be modified, like table name, S3 folder, etc.


As you can see, AWS Data Pipeline saved me a considerable amount of time. I didn’t have to write a single line of code for this data-processing job; the required configuration is entered through the Data Pipeline console, which validates the existence of the required components by itself, making the user’s life easier.

Feel free to share your own use cases and experiences with AWS components. Also, please share your comments and critiques of the blog.