Imagine having to turn a ton of meat into 125 000 sausages in half an hour on a 43-cent budget. It’s basically impossible (and definitely not the subject of this article). What you can do with that time and money, though, is extract, transform and load 125 000 S3 objects using Lambda and SQS.

Serverless ETL using Lambda and SQS

Simon-Pierre Gingras · poka-techblog · Aug 13, 2018

AWS recently introduced a new feature that allows you to automatically pull messages from an SQS queue and have them processed by a Lambda function. I recently started experimenting with this feature to do ETL (“Extract, Transform, Load”) on an S3 bucket. I was curious to see how fast, and at what cost, I could process the data in my bucket. Let’s see how it went!

Note: all the code necessary to follow along can be found at https://github.com/PokaInc/lambda-sqs-etl

The goal

Our objective here is to load JSON data from an S3 bucket (the “source” bucket), flatten the JSON and store it in another bucket (the “destination” bucket). “Flattening” (sometimes called “relationalizing”) will transform the following JSON object:

{
    "a": 1,
    "b": {
        "c": 2,
        "d": 3,
        "e": {
            "f": 4
        }
    }
}

into

{
    "a": 1,
    "b.c": 2,
    "b.d": 3,
    "b.e.f": 4
}

Flattening JSON objects like this makes it easier, for example, to store the resulting data in Redshift or rewrite the JSON files to CSV format.
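
For reference, here is a minimal sketch of what such a flattening function could look like in Python. The actual implementation in the repository linked above may differ; this is just the idea:

def flatten(obj, parent_key=""):
    """Recursively flatten a nested dict, joining nested keys with dots."""
    flat = {}
    for key, value in obj.items():
        new_key = f"{parent_key}.{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, new_key))
        else:
            flat[new_key] = value
    return flat

# flatten({"a": 1, "b": {"c": 2, "d": 3, "e": {"f": 4}}})
# -> {"a": 1, "b.c": 2, "b.d": 3, "b.e.f": 4}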

Now, here’s a look at the source bucket and the data we have to flatten.

Getting to know the data

Every file in the source bucket is a collection of un-flattened JSON objects:

Sample of a file in the source bucket. Each line of the file is a different JSON object.

Each line represents an event (for example, a user liked a post) that was captured in Poka. We use this data internally to build usage analytics reports.

Finally, here are some more stats about the bucket:

  • Number of objects in the bucket: 125 179
  • Total bucket size: 13.5 GB
  • Object size distribution:

As you can see from the chart above, most of the objects in the bucket are below 200 KB in size.

Keeping eyes on the prize

The core of the task at hand is, for each object in the source S3 bucket, to:

  • Read the object, line by line
  • Flatten each line
  • Store all the flattened lines of the source S3 object in the destination bucket

In the end, each file in the source bucket will have a sibling file with the same key in the destination bucket:

The objects have the same keys in both buckets

All in all, pretty simple. Next, we’ll see how to accomplish the ETL using SQS and Lambda.

How it works

Here’s an overview of the ETL pipeline I created leveraging AWS Lambda and SQS:

Let me guide you through all the steps numbered on the diagram.

  1. The ListPages Lambda lists all pages in the source S3 bucket. A page is a group of up to 1 000 object keys from the S3 bucket; the 1 000-key limit is imposed by the S3 API. Note that this Lambda is managed by a state machine. The state machine allows re-invocation of the Lambda, so we can exhaustively list all pages in the bucket (which can take more than the 5 minutes allowed by Lambda). A sketch of this Lambda follows the list below.
  2. Each page is sent as a message into the Pages SQS queue.
  3. The SplitPages Lambda function automatically receives page messages from the Pages SQS queue, thanks to the Lambda + SQS integration. Each invocation of the SplitPages Lambda receives at most 1 page.
  4. The SplitPages Lambda inserts the keys contained in the page into the S3Objects SQS queue. The keys are inserted in groups of 10, because an SQS queue accepts at most 10 messages in a single SendMessageBatch call. This means that a single invocation of the SplitPages Lambda will execute up to 100 SendMessageBatch calls on the S3Objects queue.
  5. The Transform Lambda receives S3 object keys from the S3Objects queue. Again, this is all automatic with the Lambda + SQS integration. Note that heavy horizontal scaling happens at this stage. The Lambda + SQS integration will continually increase the number of parallel executions of the Transform Lambda until the queue is depleted.
  6. For each S3 object key received, the Transform Lambda loads the S3 object in memory, splits the contents of the file line by line, flattens each line and stores the resulting JSON in the destination bucket. Since only 10 messages can be retrieved from an SQS queue in a single API call, up to 10 S3 objects are processed by a single Transform Lambda invocation. A sketch of this Lambda also follows the list below.
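
To make step 1 a little more concrete, here is a minimal sketch (Python with boto3) of what the ListPages Lambda could look like. The bucket name, queue URL and the handler’s contract with the state machine are assumptions for illustration; the actual implementation lives in the repository linked above:

import json
import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

SOURCE_BUCKET = "my-source-bucket"  # hypothetical name
PAGES_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/Pages"  # hypothetical URL

def handler(event, context):
    # The state machine passes back the last key we reached on a previous
    # invocation, so listing can resume where it left off.
    kwargs = {"Bucket": SOURCE_BUCKET}
    if event.get("start_after"):
        kwargs["StartAfter"] = event["start_after"]

    last_key = None
    paginator = s3.get_paginator("list_objects_v2")  # 1 000 keys per page, max
    for page in paginator.paginate(**kwargs):
        keys = [obj["Key"] for obj in page.get("Contents", [])]
        if not keys:
            break
        # One SQS message per page of up to 1 000 keys (the body must stay
        # under the 256 KB SQS message size limit).
        sqs.send_message(QueueUrl=PAGES_QUEUE_URL, MessageBody=json.dumps(keys))
        last_key = keys[-1]
        # If we are close to the Lambda timeout, hand control back to the
        # state machine so it can re-invoke us from the last key we reached.
        if context.get_remaining_time_in_millis() < 30_000:
            return {"done": False, "start_after": last_key}
    return {"done": True}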
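
And here is a rough sketch of steps 5 and 6, i.e. what the Transform Lambda could do with the SQS records it receives. The bucket names are placeholders and the flatten helper is the one sketched earlier; the real code may differ:

import json
import boto3

s3 = boto3.client("s3")

SOURCE_BUCKET = "my-source-bucket"            # hypothetical name
DESTINATION_BUCKET = "my-destination-bucket"  # hypothetical name

def flatten(obj, parent_key=""):
    # Same dot-notation flattening sketched earlier in the article.
    flat = {}
    for key, value in obj.items():
        new_key = f"{parent_key}.{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, new_key))
        else:
            flat[new_key] = value
    return flat

def handler(event, context):
    # The Lambda + SQS integration delivers up to 10 messages per invocation;
    # each message body is assumed to be a single S3 object key.
    for record in event["Records"]:
        key = record["body"]
        obj = s3.get_object(Bucket=SOURCE_BUCKET, Key=key)
        lines = obj["Body"].read().decode("utf-8").splitlines()
        flattened = [json.dumps(flatten(json.loads(line))) for line in lines if line]
        # Store the flattened lines under the same key in the destination bucket.
        s3.put_object(
            Bucket=DESTINATION_BUCKET,
            Key=key,
            Body="\n".join(flattened).encode("utf-8"),
        )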

Why pages?

The main bottleneck of this ETL pipeline is inserting all the source bucket’s keys into the S3Objects SQS queue as quickly as possible. Once the keys are in the S3Objects queue, you just crack a cold one and let the Lambda + SQS integration scale out to deplete the queue automagically.

To populate the S3Objects SQS queue with all the keys in an S3 bucket, a few strategies come to mind.

The easiest, but slowest, strategy would be to list the keys in the source S3 bucket and insert them 10 at a time into the S3Objects queue. Considering that the source bucket contains over 125 000 objects, inserting 10 objects at a time into a single SQS queue would require at least 12 500 SendMessageBatch API calls, executed in sequence. That would be pretty slow.

What we want is to maximize the number of SendMessageBatch calls in a given time period. To that end, you want to send multiple SendMessageBatch calls in parallel.

The faster strategy I came up with is to split the source bucket’s keys into groups (pages) and let parallel Lambda invocations deal with individual pages. This way, each parallel invocation of the SplitPages Lambda calls SendMessageBatch at most 100 times. This strategy greatly improves the speed at which messages are inserted into the S3Objects queue.
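
As an illustration, a SplitPages Lambda along these lines would do the trick. The queue URL and the exact message format are assumptions:

import json
import boto3

sqs = boto3.client("sqs")

S3_OBJECTS_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/S3Objects"  # hypothetical URL

def handler(event, context):
    # Each invocation receives a single "page" message containing up to 1 000 keys.
    for record in event["Records"]:
        keys = json.loads(record["body"])
        # SendMessageBatch accepts at most 10 entries per call, so a full page
        # of 1 000 keys results in 100 batch calls.
        for i in range(0, len(keys), 10):
            batch = keys[i:i + 10]
            sqs.send_message_batch(
                QueueUrl=S3_OBJECTS_QUEUE_URL,
                Entries=[{"Id": str(n), "MessageBody": key} for n, key in enumerate(batch)],
            )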

How fast did it go?

Let’s have a look at how the run performed:

The number of lines and S3 objects processed during the ETL job

Looking at the graph above, we can see that the whole ETL took about 20 minutes to complete. At peak, 2.51 million lines from 10 800 S3 objects were processed every minute.

Improving throughput

If you populated the S3Objects SQS queue even faster, I’m sure the throughput could be even better. As the queue grew longer, the Lambda + SQS integration would keep pulling messages out of the queue faster and invoking more Lambdas in parallel, until the concurrency limit of the Lambda (or of the AWS account) is reached.

Another possible strategy requires an S3 bucket with a file-system-like key structure. For each root directory in the bucket, you could invoke a Lambda that in turn lists all sub-directories of that root directory, recursively. If a directory contains keys, the Lambda inserts those keys into the S3Objects queue. Supposing a bucket contains 1 000 leaf directories of 100 S3 objects each, you could potentially run 1 000 Lambdas (one per directory) in parallel, inserting 100 000 messages into the S3Objects queue in under 5 minutes.
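
Here is a hypothetical sketch of that recursive strategy, using the S3 Delimiter parameter to discover “sub-directories” and asynchronous self-invocations to fan out. It assumes each leaf directory holds fewer than 1 000 keys, and all names are placeholders:

import json
import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")
lambda_client = boto3.client("lambda")

SOURCE_BUCKET = "my-source-bucket"  # hypothetical name
S3_OBJECTS_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/S3Objects"  # hypothetical URL

def handler(event, context):
    prefix = event.get("prefix", "")
    # With Delimiter="/", CommonPrefixes acts like a directory listing.
    response = s3.list_objects_v2(Bucket=SOURCE_BUCKET, Prefix=prefix, Delimiter="/")

    # Recurse into each "sub-directory" by asynchronously invoking this same
    # Lambda, one invocation per prefix.
    for sub in response.get("CommonPrefixes", []):
        lambda_client.invoke(
            FunctionName=context.function_name,
            InvocationType="Event",  # fire and forget
            Payload=json.dumps({"prefix": sub["Prefix"]}),
        )

    # Keys directly under this prefix go straight into the S3Objects queue.
    keys = [obj["Key"] for obj in response.get("Contents", [])]
    for i in range(0, len(keys), 10):
        batch = keys[i:i + 10]
        sqs.send_message_batch(
            QueueUrl=S3_OBJECTS_QUEUE_URL,
            Entries=[{"Id": str(n), "MessageBody": key} for n, key in enumerate(batch)],
        )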

Cost analysis

To calculate the cost of running this ETL, I deliberately omit the Lambda and SQS costs related to the ListPages and SplitPages Lambda functions as well as the Pages SQS queue: there are only 125 pages in my bucket, so those costs are negligible. I also omit the costs related to reading and writing data from S3, as they are independent of the pipeline architecture.

Considering that the bucket contains approximately 125 000 objects, we can estimate that roughly 12 500 SendMessageBatch calls (1 call per 10 S3 objects) and 12 500 ReceiveMessage calls (again, 1 call per 10 S3 objects) were executed against the S3Objects queue. During my test run, the Transform Lambda was invoked approximately 13 600 times, for a total of 203 200 000 milliseconds of execution. Also important to note: I configured this Lambda to use the lowest possible memory setting, which is 128 MB.

Let’s break down the costs:

  • 12 500 SendMessageBatch requests @ $0.0000004/request -> $0.005
  • 12 500 ReceiveMessage requests @ $0.0000004/request -> $0.005
  • 13 600 Transform Lambda invocation requests @ $0.0000002/request -> $0.00272
  • 203 200 000 ms of execution time @ $0.00000000208/ms -> $0.422656

So, the total execution cost of a single ETL run on the whole bucket (excluding the S3 costs) amounts to approximately $0.43.
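
If you want to check the arithmetic behind the bullet points above, it boils down to this:

sqs_request_price = 0.0000004          # $ per SQS request
lambda_request_price = 0.0000002       # $ per Lambda invocation
lambda_ms_price_128mb = 0.00000000208  # $ per ms of execution at 128 MB

total = (
    12_500 * sqs_request_price              # SendMessageBatch calls
    + 12_500 * sqs_request_price            # ReceiveMessage calls
    + 13_600 * lambda_request_price         # Transform Lambda invocations
    + 203_200_000 * lambda_ms_price_128mb   # execution time
)
print(f"${total:.6f}")  # -> $0.435376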

Wrap up

The new integration between SQS and Lambda allowed me to create a reasonably fast, cheap, serverless ETL pipeline. Even though there are probably faster ways to run ETL on S3, this method proved to be a good match for the task at hand. I would be curious to retry this serverless approach on a much larger bucket.
