Serverless ETL using Lambda and SQS
AWS recently introduced a new feature that allows you to automatically pull messages from a SQS queue and have them processed by a Lambda function. I started experimenting with this feature to do ETL (“Extract, Transform, Load”) on a S3 bucket. I was curious to see how fast, and at what cost, I could process the data in my bucket. Let’s see how it went!
Note: all the code necessary to follow along can be found at https://github.com/PokaInc/lambda-sqs-etl
The goal
Our objective here is to load JSON data from a S3 bucket (the “source” bucket), flatten the JSON and store it in another bucket (the “destination” bucket). “Flattening” (sometimes called “relationalizing”) will transform the following JSON object:
{
  "a": 1,
  "b": {
    "c": 2,
    "d": 3,
    "e": {
      "f": 4
    }
  }
}
into
{
  "a": 1,
  "b.c": 2,
  "b.d": 3,
  "b.e.f": 4
}
Flattening JSON objects like this makes it easier, for example, to store the resulting data in Redshift or rewrite the JSON files to CSV format.
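To make the transformation concrete, here is a minimal Python sketch of such a flattening function (my own illustration; the actual implementation lives in the linked repository and may differ):

```python
def flatten(obj, parent_key=""):
    """Recursively flatten a nested dict, dot-joining the keys."""
    flat = {}
    for key, value in obj.items():
        full_key = f"{parent_key}.{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the prefix along
            flat.update(flatten(value, full_key))
        else:
            flat[full_key] = value
    return flat

print(flatten({"a": 1, "b": {"c": 2, "d": 3, "e": {"f": 4}}}))
# {'a': 1, 'b.c': 2, 'b.d': 3, 'b.e.f': 4}
```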
Now, here’s a look at the source bucket and the data we have to flatten.
Getting to know the data
Every file in the source bucket is a collection of un-flattened JSON objects, one per line. Each line represents an event (for example, a user liked a post) that was captured in Poka. We use this data internally to build usage analytics reports.
Finally, here are some more stats about the bucket:
- Number of objects in the bucket: 125 179
- Total bucket size: 13.5 GB
- Object size distribution:
As you can see from the chart above, most of the objects in the bucket are below 200 KB in size.
Keeping eyes on the prize
The core of the task at hand is, for each object in the source S3 bucket, to:
- Read the object, line by line
- Flatten each line
- Store all the flattened lines of the source S3 object in the destination bucket
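The steps above can be sketched as a small helper that processes one object’s body; `flatten_line` is a placeholder name of mine for whatever flattening function you use:

```python
import json

def transform_body(body, flatten_line):
    """Flatten every JSON line of an S3 object's body.

    `flatten_line` is any callable mapping a nested dict to a flat one.
    """
    out = []
    for line in body.splitlines():
        if not line.strip():
            continue  # tolerate blank lines
        out.append(json.dumps(flatten_line(json.loads(line))))
    return "\n".join(out)
```

The returned string is what gets written to the destination bucket, under the same key as the source object.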
In the end, each file in the source bucket will have a sibling file with the same key in the destination bucket.
All in all, pretty simple. Next, we’ll see how to accomplish the ETL using SQS and Lambda.
How it works
Here’s an overview of the ETL pipeline I created leveraging AWS Lambda and SQS:
Let me guide you through all the steps numbered on the diagram.
1. The ListPages Lambda lists all pages in the source S3 bucket. A page is a group of up to 1 000 object keys from the S3 bucket; the 1 000-key limit is imposed by the S3 API. Note that this Lambda is managed by a state machine. The state machine allows re-invocation of the Lambda, so we can exhaustively list all pages in the bucket (which can take more than the 5 minutes allowed by Lambda).
2. Each page is sent as a message into the Pages SQS queue.
3. The SplitPages Lambda function automatically (thanks to the Lambda + SQS integration) receives page messages from the Pages SQS queue. Each invocation of the SplitPages Lambda receives at most 1 page.
4. The SplitPages Lambda inserts the keys contained in the page into the S3Objects SQS queue. The keys are inserted in groups of 10, because a SQS queue will accept at most 10 messages in a single API call. This means that a single invocation of the SplitPages Lambda will execute up to 100 SendMessageBatch calls on the S3Objects queue.
5. The Transform Lambda receives S3 object keys from the S3Objects queue. Again, this is all automatic with the Lambda + SQS integration. Note that heavy horizontal scaling happens at this stage: the Lambda + SQS integration continually increases the number of parallel executions of the Transform Lambda until the queue is depleted.
6. For each S3 object key received, the Transform Lambda loads the S3 object in memory, splits the contents of the file line by line, flattens each line and stores the resulting JSON in the destination bucket. Since only 10 messages can be retrieved from a SQS queue in a single API call, up to 10 S3 objects are processed by a single Transform Lambda invocation.
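The listing step could look roughly like this in Python (a sketch with my own function names; the pure helper is split from the AWS calls so it can be exercised without S3):

```python
def pages_of_keys(s3_pages):
    """Turn raw ListObjectsV2 pages into lists of object keys."""
    for page in s3_pages:
        yield [obj["Key"] for obj in page.get("Contents", [])]

def list_pages(bucket):
    """Yield pages of up to 1 000 keys from a bucket.

    boto3's paginator transparently follows continuation tokens,
    issuing one ListObjectsV2 call per page.
    """
    import boto3  # imported here so the pure helper above stays testable offline
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    return pages_of_keys(paginator.paginate(Bucket=bucket))
```

Each yielded page of keys would then be serialized and sent to the Pages queue.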
Why pages?
The main bottleneck of this ETL pipeline is inserting all the source bucket’s keys into the S3Objects SQS queue as quickly as possible. Once the keys are in the S3Objects queue, you just crack a cold one and let the Lambda + SQS integration scale out to deplete the S3Objects queue automagically.
To populate the S3Objects SQS queue with all keys in a S3 bucket, a few strategies come to mind.
The easiest, but slowest, would be to list keys in the source S3 bucket and insert them 10 at a time in the S3Objects queue. Considering that the source bucket contains over 125 000 objects, inserting 10 objects at a time in a single SQS queue would require at least 12 500 SendMessageBatch API calls executed in sequence. That would be pretty slow.
What we want is to maximize the number of SendMessageBatch calls in a given time period. To that end, you would want to send multiple SendMessageBatch calls in parallel.
The fastest strategy I came up with is to split the source bucket into groups of keys (pages) and let parallel Lambda invocations deal with individual pages. Now, each parallel invocation of the SplitPages Lambda will call SendMessageBatch at most 100 times. This strategy greatly improves the speed at which messages are inserted in the S3Objects queue.
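Here is roughly what the batching inside SplitPages amounts to (a sketch under my own names; the repository’s code may be structured differently):

```python
import json

def chunks(keys, size=10):
    """SQS SendMessageBatch accepts at most 10 entries per call."""
    for i in range(0, len(keys), size):
        yield keys[i:i + size]

def batch_entries(group):
    """Build SendMessageBatch entries; Id must be unique within a batch."""
    return [{"Id": str(i), "MessageBody": json.dumps({"key": key})}
            for i, key in enumerate(group)]

def split_page(page_keys, queue_url, sqs):
    """A page of up to 1 000 keys results in at most 100 SendMessageBatch calls."""
    for group in chunks(page_keys):
        sqs.send_message_batch(QueueUrl=queue_url, Entries=batch_entries(group))
```

With 125 pages fanned out to parallel invocations, those 12 500 batch calls no longer run in sequence.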
How fast did it go?
Let’s have a look at how the run performed:
Looking at the graph above, we can see that the whole ETL took about 20 minutes to complete. At peak, 2.51 million lines from 10 800 S3 objects were processed every minute.
Improving throughput
If you were to populate the S3Objects SQS queue even faster, I’m sure the throughput could be even better. As the queue got longer, the Lambda + SQS integration would continue to pull messages out of the queue faster and invoke more Lambdas in parallel, until the concurrency limit of the Lambda (or the AWS account) is reached.
Another strategy that can be adopted requires S3 buckets that have a file system-like structure. For each root directory in the S3 bucket, you could invoke a Lambda that would in turn list all sub-directories in the root directory, in recursive fashion. If a directory contains keys, the Lambda inserts those keys in the S3Objects queue. Supposing a bucket contains 1 000 leaf directories that each hold 100 S3 objects, you could potentially run 1 000 Lambdas (one per directory) in parallel, inserting 100 000 messages in the S3Objects queue in under 5 minutes.
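A sketch of the directory-based variant, assuming the listing is made with Delimiter="/" so that S3 returns sub-“directories” as CommonPrefixes (the function name is mine):

```python
def split_listing(page):
    """Separate a delimiter-based ListObjectsV2 page into sub-prefixes and keys.

    Prefixes would be fanned out to further recursive Lambda invocations;
    keys would go straight into the S3Objects queue.
    """
    prefixes = [p["Prefix"] for p in page.get("CommonPrefixes", [])]
    keys = [obj["Key"] for obj in page.get("Contents", [])]
    return prefixes, keys
```

The recursion bottoms out at leaf directories, where only keys (no prefixes) come back.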
Cost analysis
To calculate the cost of running this ETL, I voluntarily omit the Lambda and SQS costs related to the ListPages and SplitPages Lambda functions as well as the Pages SQS queue: there are only 125 pages in my bucket, so those costs would be negligible. Also, I omit the costs related to reading and writing data from S3, as they are independent of the pipeline architecture.
Considering that the bucket contains approximately 125 000 objects, we can estimate that the Lambda + SQS integration executed 12 500 SendMessageBatch calls (1 call per 10 S3 objects) and 12 500 ReceiveMessage calls (again, 1 call per 10 S3 objects). Also, during my test run, the Transform Lambda was invoked approximately 13 600 times for a total of 203 200 000 milliseconds of execution. Also important to note is that I configured this Lambda to use the lowest possible memory setting, which is 128 MB.
Let’s break down the costs:
- 12 500 SendMessageBatch requests @ $0.0000004/request -> $0.005
- 12 500 ReceiveMessage requests @ $0.0000004/request -> $0.005
- 13 600 Transform invoke requests @ $0.0000002/request -> $0.00272
- 203 200 000 ms of execution time @ $0.00000000208/ms -> $0.422656
So, the total execution cost of a single ETL run on the whole bucket (excluding the S3 costs) amounts to approximately $0.43.
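As a sanity check, the same arithmetic in Python, using the per-request and per-millisecond prices quoted above:

```python
sqs_cost = (12_500 + 12_500) * 0.0000004     # SendMessageBatch + ReceiveMessage requests
invoke_cost = 13_600 * 0.0000002             # Lambda invocation requests
duration_cost = 203_200_000 * 0.00000000208  # execution time at the 128 MB price point
total = sqs_cost + invoke_cost + duration_cost
print(f"${total:.4f}")  # $0.4354
```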
Wrap up
The new integration between SQS and Lambda allowed me to create a reasonably fast, cheap, serverless ETL pipeline. Even though there are probably faster ways to run ETL on S3, this method proved to be a good match for the task at hand. I would be curious to retry this serverless approach on a much larger bucket.