Building an ETL process (or ELT really) with AWS Step Functions and Lambda

Lloyd Xie
Dec 3, 2019


Screenshot from Step Functions Console

Problem Statement: Extract and transform all data in a fast and scalable manner.

Beyond throughput and scaling issues, we came up with a list of other issues with our existing ETL process, some major and some minor:

  1. Fault-tolerance. Individual tasks should not cause congestion or the system to stop execution entirely.
  2. Time/memory running out on Lambda.
  3. Unable to dynamically scale up task resources when a large amount of work is needed.
  4. Weak error-handling

But let’s keep the main goal in mind and not lose sight of it as we dive into the nitty-gritty. Hard, yes. Essential, also yes.

Before, we executed a state machine every 2 minutes that was set to extract/transform 0.1% of the data each time. This way we would get to 100% after a few days or so. However, a cron timer doesn’t exactly scale with increased input size. This translates into the first principle of ETL design, Dyjak’s Law:

Do work if there is work to be done — in a continuous session. Poll for work until there is none left.

Concurrency and looping are the main tools at our disposal in Step Functions. The state machine is executed every 12 hours. The initial trigger does work until there is no more work to be done (data to extract). It should process all data in ~5 hours if building from a blank slate, and about 3 hours otherwise. That is 5 hours of thousands of concurrently running lambdas within a Step Function execution, with 5 Step Function executions running concurrently at any given time.

So every 12 hours, the state machine polls for work. This can cause repeated work and grab duplicate/unnecessary data! A quick and dirty solution is to store a column named “last_run” in each layer and only extract data if the layer hasn’t been extracted in the last day, week, or month. The filter logic exists in the queries.
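To make the “last_run” filter concrete, here is a minimal sketch using Python’s built-in sqlite3 instead of the real database. The `account` table, its columns, and the `accounts_due` helper are all assumptions made up for illustration; the actual queries are not shown in this post.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def accounts_due(conn, max_age, now=None):
    """Return accounts never extracted, or last extracted more than max_age ago."""
    now = now or datetime.now(timezone.utc)
    # ISO-8601 strings with the same UTC offset sort chronologically,
    # so a plain string comparison works as the filter.
    cutoff = (now - max_age).isoformat()
    rows = conn.execute(
        "SELECT name FROM account "
        "WHERE last_run IS NULL OR last_run < ? ORDER BY name",
        (cutoff,),
    ).fetchall()
    return [name for (name,) in rows]


# Hypothetical data: one stale account, one fresh one, one never extracted.
now = datetime(2019, 12, 3, tzinfo=timezone.utc)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE account (name TEXT PRIMARY KEY, last_run TEXT)")
conn.executemany(
    "INSERT INTO account VALUES (?, ?)",
    [
        ("stale", (now - timedelta(days=8)).isoformat()),
        ("fresh", (now - timedelta(hours=6)).isoformat()),
        ("never", None),
    ],
)
due = accounts_due(conn, timedelta(days=1), now=now)
```

With a one-day window, only the stale and never-extracted accounts come back, so a second poll 12 hours later does not redo fresh work.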

One implementation detail deals with a hard limit: a single Step Functions execution can record at most 25,000 events in its history, so the work has to be segmented. Source: https://docs.aws.amazon.com/step-functions/latest/dg/bp-history-limit.html. We stop the current execution when we estimate the 25k limit is almost reached and trigger a new Step Function execution to resume the work. Because you cannot obtain the number of events from within an execution, we approximate with branching logic inside the state machine that stops execution once 2000 raw insights have been transformed or 1000 insights have been extracted. Each extract/transform requires anywhere from 5–10 state transition events.
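The arithmetic behind those thresholds can be sketched in a few lines of Python. The constants come from the numbers above; the function names are hypothetical, and the real check lives in the state machine’s branching logic rather than in Python.

```python
EVENT_LIMIT = 25_000          # Step Functions execution history limit
EVENTS_PER_ITEM = (5, 10)     # rough range of events per extract/transform
MAX_TRANSFORMED = 2_000       # stop threshold for transformed raw insights
MAX_EXTRACTED = 1_000         # stop threshold for extracted insights


def estimated_events(items):
    """Return the (low, high) estimate of history events used by `items` units of work."""
    low, high = EVENTS_PER_ITEM
    return items * low, items * high


def should_restart(transformed, extracted):
    """True once either counter reaches its threshold and a fresh execution should take over."""
    return transformed >= MAX_TRANSFORMED or extracted >= MAX_EXTRACTED


transform_budget = estimated_events(MAX_TRANSFORMED)  # (10000, 20000)
extract_budget = estimated_events(MAX_EXTRACTED)      # (5000, 10000)
```

Taken on its own, each threshold stays under 25,000 even at the pessimistic 10 events per item, which is why counting items works as a stand-in for counting events.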

High-Level Overview

GMB data is structured in a parent-child relationship. Account → Location → Insight.

In layman’s terms, the state machine grabs all GMB Accounts as JSON. Stores the raw response first. Transforms those. Moves on to Locations. Repeat. Moves on to Insights. Repeat.

The devil is in the details for each layer due to the wildly varying payload size at each level. But a divide-and-conquer approach with concurrency gave us a more robust system that could do what previously took weeks to do in a few hours. Extract and transform all data in a fast and scalable manner.

The Three Layers

There are 3 sub-layers to the full state machine. Accounts, Locations, and Insights — they are somewhat segregated vertically in the visual diagram. The top 2, Account and Locations, extract their corresponding data fast — because there is minimal data. 134 Accounts and ~15k Locations as of today. It takes 5 minutes to get to the LocationsComplete state. Everything below is part of the Insights layer. This is a more complex flow that I will break down further into sub-parts after a quick introduction to the structure and purpose of each layer.

Account Layer

The simplest layer. 2 states that each call a respective lambda. Extract + Transform. We send 1 request to get 134 accounts and transform all in one shot. Simple and easy, no blood on our hands.

Location Layer

We first ran into a Lambda timeout error (15 min) at this Location layer during the first release of GMB (before this refactor). The cause was trying to pack too much JSON data into the raw table (10 MB per row): all the locations from 10 accounts. Instead, we execute 1 lambda per account to get a raw response of all locations under ONLY that account. The payload ranges from 100 KB to 22 MB for the largest main PP account (the name is “practices 13000+”). Holy zoinks. Despite being the limiting factor, it can still finish in 20 seconds compared to the sub-second runtime of other payloads. Nice!

Sidenote #1: You may notice when we have 134 accounts, we only have 130 raw locations. This is because if the account has 0 locations AKA EMPTY, we do not keep it. Keeping the world greener by tossing our trash out.

Now, if we did each execution sequentially, this could take a while. So we utilize one of Step Functions’ newer features, the “Map” state, which runs the sub-workflow defined in its “Iterator” field concurrently for every item in an input array, all within a single state definition. That is what we have done with the blue-shaded box in the Flow Diagram: 2 lambdas executed 134 times over.
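For readers who have not seen a Map state, here is roughly what such a definition looks like, written as a Python dict and dumped to Amazon States Language JSON. This is a sketch, not our actual definition: the state names, the `$.accounts` path, the `CheckForMoreAccounts` target, and the Lambda ARNs are placeholders.

```python
import json

# Hypothetical Lambda ARNs; replace REGION and ACCOUNT_ID with real values.
EXTRACT_ARN = "arn:aws:lambda:REGION:ACCOUNT_ID:function:extract-locations"
TRANSFORM_ARN = "arn:aws:lambda:REGION:ACCOUNT_ID:function:transform-locations"

locations_map_state = {
    "Type": "Map",
    # One iteration per element of the input array (one per account).
    "ItemsPath": "$.accounts",
    # The sub-workflow each iteration runs: extract, then transform.
    "Iterator": {
        "StartAt": "ExtractLocations",
        "States": {
            "ExtractLocations": {
                "Type": "Task",
                "Resource": EXTRACT_ARN,
                "Next": "TransformLocations",
            },
            "TransformLocations": {
                "Type": "Task",
                "Resource": TRANSFORM_ARN,
                "End": True,
            },
        },
    },
    "Next": "CheckForMoreAccounts",
}

definition_json = json.dumps(locations_map_state, indent=2)
```

The Map state also accepts an optional `MaxConcurrency` field, which is one way to cap how many iterations run at once.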

Sidenote #2: Step Functions is not perfectly concurrent as you can see from the logs. There is a build-up in the initial start delay for each subsequent lambda being executed. This shouldn’t matter but it is something to factor into any math calculations.

Now the question is — how do we know how many concurrent lambdas to start? Well, we know 150 is a good upper bound. But alas, RATE LIMITS. Google will only allow 50 QPS (queries per second). Ok fine.

Let us view our set of concurrent lambdas as an encapsulated black box that can be repeated in a loop AD INFINITUM (or till there are no more accounts left to extract locations from). To play it safe, let’s choose 30 lambdas/QPS at a time, so our “black box” of concurrent badassery should run 5 times, in our specific case.
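The “5 times” is just 134 accounts divided into rounds of 30, rounded up. A quick sketch of that chunking, with made-up account names:

```python
import math


def batches(items, size):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Hypothetical identifiers standing in for the 134 real accounts.
accounts = [f"accounts/{i}" for i in range(134)]
rounds = list(batches(accounts, 30))

round_count = len(rounds)            # 5
last_round_size = len(rounds[-1])    # 14
expected_rounds = math.ceil(len(accounts) / 30)
```

Four full rounds of 30 plus a final round of 14 keeps every round under the 50 QPS limit.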

This is where the Choice state comes in. It is simply if-else logic in the Step Functions language: a mechanism to branch to multiple paths. So in order to loop our black box, we first create a new initial state that queries for accounts that still need their locations extracted. If there are accounts returned by the query (max of 30 accounts) → then there is work to be done → so we branch to the black box. If not, then move on to the next layer → INSIGHTS. Oh but before we jump the gun, let’s cover important ground about this Choice state. Essentially we are looping indefinitely, so it’s good to implement a fail-safe mechanism (thanks Shawn for the tip). Well, that safety mechanism has already been implemented by the stop-restart logic on the overall Step Function execution.
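As a rough illustration, a Choice state for this loop could look like the dict below. The state names and the `$.account_count` variable are assumptions, and `next_state` is a toy evaluator that only understands this one rule shape, written purely to show the branching. It is not how Step Functions itself evaluates choices.

```python
# Hypothetical Choice state: loop back into the black box while work remains.
has_work_state = {
    "Type": "Choice",
    "Choices": [
        {
            "Variable": "$.account_count",
            "NumericGreaterThan": 0,
            "Next": "ExtractLocationsMap",
        }
    ],
    "Default": "InsightsLayer",
}


def next_state(state, data):
    """Toy evaluator: return the name of the state the Choice would branch to."""
    for rule in state["Choices"]:
        key = rule["Variable"][2:]  # strip the leading "$." from the path
        if data[key] > rule["NumericGreaterThan"]:
            return rule["Next"]
    return state["Default"]


when_work_left = next_state(has_work_state, {"account_count": 30})
when_done = next_state(has_work_state, {"account_count": 0})
```

As long as the query state keeps returning accounts, the machine re-enters the black box; once it returns none, the `Default` branch moves on to the Insights layer.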

How long does this process take? About 5 minutes.

Insights Layer

OK, first of all — why the hell is this layer looking much more complicated? Well, it’s not.

Keep the concurrent lambdas black box in mind as an entity — we will be reusing it once in the layer. Actually right at the start. The goal here is that before we even start to extract insights — we want to deplete our store for raw insights that have not been transformed yet. Think of it like fixing bugs before starting new features. A worthy ideal that all software creation should aspire to. Visually in the diagram, this is the middle section.

The bottom third section is where the extract actually happens. Instead of doing concurrent lambda executions, we found a way to batch requests in the Google Client of the Python code. So we batch 10 locations into a single request to grab all insights for those locations. Now, the 10 locations are not guaranteed to be under the same account. There is a possible mix-and-match. We cannot use a domain-unique identifier like the account name to seamlessly transform the batch we just extracted. We are left at the whims of randomness when we decide what to transform at the transform step. But let us remind ourselves of the goal. Extract and transform all data in a fast and scalable manner. So we sacrifice consistency for throughput by doing work at random.

The lower right box contains 3 states: 1 extract and 2 transforms. The 1 extract state actually grabs 10 insights, but we are only able to transform a measly 2 of those each time. Any more would be pointless: transforms that start in parallel grab the same raw row, and DB locking means multiple writes of the same data are ignored, so the same row would be transformed whether there were 2 or 10 transform states. To keep the two from colliding, the second lambda is staggered with a 1-second wait…that is a hacky workaround. The entire parallel block requires all child states to finish before it can finish, so we don’t want too many states that have to wait 2 seconds, 3 seconds, and so on… On average, the extract takes 6–8 seconds and the transform takes 3–5 seconds, which leaves us that 1 second to spare to use as a stagger.

Really though, this doesn’t solve the problem of transforming all raw insights. Honestly, this is quite an unnecessary step and is meant to be a minor optimization. But it has an advantage. Rather than extract ALL AT ONCE and then transform ALL AT ONCE, only to see a potential failure in each step of the chain after one has entirely finished, let’s check earlier on if the entire chain can work correctly. So think of this optimization as a way to test early if transforms work.

But really, the insights transform 1 and 2 states can be removed, simply because we can launch 1000 lambda transforms in the next step to quickly transform everything. That is what happens when we loop back to the concurrent transform lambdas black box.

Again, we use the Choice state to loop this section 100 times with 10 locations each time → a total of 1000 locations. It seems like a good enough point to stop before traveling all the way back up to the leftover raw insights transform state to do work there. So now we cycle back to the middle section of the diagram. Essentially, the second and third sections of the diagram comprise one giant cycle. The cycling also lets us debug when looking at the AWS Console log. Otherwise, you’d suffer like I did, clicking “Load More” 200+ times per debugging session to finally see the error message.

Now finally, let’s zoom out to see if we can make this even faster. If we run 5 Step Function executions simultaneously, this will increase the throughput linearly. The isolation principle of concurrency holds true for the Insights Layer: running the executions in parallel leaves the data in the same state as running them sequentially. This is why the starting point is a lambda that triggers 5 simultaneous Step Function executions (and not the Step Function state machine itself).
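A trigger lambda along those lines might be sketched as follows. The function name, the `etl-` execution name prefix, and the `worker` input field are assumptions for illustration; `start_execution` is the boto3 Step Functions client call, and the client is passed in so the function can be dry-run with a stub.

```python
import json
import uuid


def start_executions(sfn_client, state_machine_arn, count=5):
    """Start `count` executions of one state machine and return their ARNs."""
    execution_arns = []
    for index in range(count):
        response = sfn_client.start_execution(
            stateMachineArn=state_machine_arn,
            # Execution names must be unique per state machine, so add a UUID.
            name=f"etl-{index}-{uuid.uuid4()}",
            # Hypothetical input: lets each execution know which worker it is.
            input=json.dumps({"worker": index}),
        )
        execution_arns.append(response["executionArn"])
    return execution_arns
```

Inside a Lambda handler this would be called with `boto3.client("stepfunctions")` and the ARN of the ETL state machine.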

FAQ

  1. Why don’t we return a response for the transform lambdas in the Python code?

A: The response data size adds up if you have 2000 lambdas in parallel that join their output. Step Functions has a 32K character limit.

  2. What happens when duplicate data is inserted?

A: We extended SQLAlchemy with an upsert implementation. Ignore on conflict. Note: The table must have unique constraints on specific columns for it to know when the conflict happens.
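To show the ignore-on-conflict behavior without pulling in our SQLAlchemy extension, here is a small stand-in using Python’s built-in sqlite3 and plain SQL (this needs SQLite 3.24 or newer for the `ON CONFLICT` clause). The `insight` table and its columns are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The UNIQUE constraint is what lets the database detect a duplicate.
conn.execute(
    "CREATE TABLE insight ("
    "location TEXT NOT NULL, day TEXT NOT NULL, views INTEGER, "
    "UNIQUE (location, day))"
)


def insert_ignoring_duplicates(conn, rows):
    """Insert rows, silently skipping any that collide on (location, day)."""
    conn.executemany(
        "INSERT INTO insight (location, day, views) VALUES (?, ?, ?) "
        "ON CONFLICT (location, day) DO NOTHING",
        rows,
    )
    conn.commit()


rows = [
    ("loc-1", "2019-12-01", 10),
    ("loc-1", "2019-12-01", 99),  # duplicate key, ignored
    ("loc-2", "2019-12-01", 7),
]
insert_ignoring_duplicates(conn, rows)
insert_ignoring_duplicates(conn, rows)  # re-running the batch changes nothing

row_count = conn.execute("SELECT COUNT(*) FROM insight").fetchone()[0]
kept_views = conn.execute(
    "SELECT views FROM insight WHERE location = 'loc-1'"
).fetchone()[0]
```

The first write wins and later duplicates are dropped, which is what makes re-running a transform safe.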

How can this be improved?

The next improvement to make is to isolate each layer. There is no reason each layer cannot be its own state machine.

Loose coupling, single responsibility, etc. One thing that speeds up performance is running multiple executions in parallel. If each layer were isolated, you could target the insights layer specifically to increase throughput.

Misc Resources:

Cost: <$10 a month for AWS-related bills.

https://www.google.com/search?q=step+function+state+transition+cost&oq=step+function+state+transition+cost&aqs=chrome..69i57j0.3965j0j1&sourceid=chrome&ie=UTF-8

Step Function Loops:

https://oliverroick.de/code/2018/looping-step-functions.html

GMB API LIMITS:

https://developers.google.com/my-business/content/limits

PARALLEL LAMBDAS:

https://aws.amazon.com/blogs/aws/new-step-functions-support-for-dynamic-parallelism/

STEP FUNCTION LIMITS (I hit these a few times)

https://docs.aws.amazon.com/step-functions/latest/dg/limits.html
