Taming StepFunction’s Concurrency With SQS

Hey SQS, ‘Un’-Watch This!

Or · Published in Melio’s R&D blog · 9 min read · Sep 4, 2024

As software engineers and architects, we’re tasked with maximizing the potential of the tools and technologies at our disposal. To an outside observer, some of our implementations might seem like we’re bending the rules, pushing these tools beyond their intended use. But perhaps that’s exactly how they should be used — by placing the puzzle pieces in ways that weren’t initially designed, we unlock new possibilities. In this post, we’ll dive into one such example, where we took common AWS services and adapted them to handle parallel processing, showing how we can truly harness their full capabilities in ways that go beyond the conventional.

At Melio we’re building cloud-native and serverless systems in various shapes and sizes. It has served us tremendously in a large number of use cases and workflows. In its simplest form, a true serverless system is a distributed system. Many bits and pieces of code and infrastructure are combined to form larger cohesive processes. And as with any distributed process, there are always interesting challenges to solve.

Concurrency and Step Functions Don’t Mix Well

Some of Melio’s systems perform batch processing at certain times of the day, which usually aligns with bank hours to process payments. In our example, the main process is a StepFunction state machine, which is triggered for every job that is submitted.

A StepFunction that calls an API Gateway, fetches information from DynamoDB, and then calls a third-party API. It finishes by notifying an SNS topic on success or failure.
Real-world workflows are almost guaranteed to be more complex

Our StepFunction performs a lot of tasks, some of them communicating with external services to validate data, pull information, and do other tasks related to the job itself. As one would expect, those external services have their limits. Some of these limits are rate limits, and some are concurrency limits.

A common way to deal with request failures of rate and concurrency limits is to perform retries. For example, if the server you call returns a 429 status code, you should respect the Retry-After header and retry your request. Sometimes you might not even get a proper 429 response, which makes it even harder to know in what way you should address the error response.
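To make the retry logic concrete, here is a minimal sketch of how a single caller might decide how long to wait after a throttled response. The helper name `retryDelayMs`, the lower-cased `headers` object, and the capped exponential fallback are our assumptions for illustration; they are not part of any AWS or third-party API.

```javascript
// Hypothetical helper: how long should we wait before retrying a request?
// `headers` is assumed to be a plain object with lower-cased header names.
function retryDelayMs(status, headers, attempt, now = Date.now()) {
  if (status !== 429) return null; // not throttled, nothing to wait for

  const retryAfter = headers["retry-after"];
  if (retryAfter !== undefined) {
    const value = String(retryAfter).trim();
    // Retry-After is either a number of seconds...
    if (/^\d+$/.test(value)) return Number(value) * 1000;
    // ...or an HTTP date.
    const at = Date.parse(value);
    if (!Number.isNaN(at)) return Math.max(0, at - now);
  }

  // No usable header: fall back to capped exponential backoff (an assumption).
  return Math.min(30000, 1000 * 2 ** attempt);
}

retryDelayMs(429, { "retry-after": "5" }, 0); // 5000
```

Note that this only helps one caller at a time. Every execution computes its own delay, and nothing here tells the other executions to slow down.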

StepFunction workflow calling a third-party API, and then retrying if a 429 status is received until a successful response arrives. Finally, it publishes an SNS message on success.
If we receive a 429 status, we can retry until we get a successful response

The problem with the above, even if we get a proper response, is that we don’t have a way to control these limits on our side of the request. In StepFunction, executions are independent of each other and there is no built-in way to communicate this retry or limit behavior to other running executions. This is similar to running multiple independent processes on EC2 instances, without any coordination of limits.

We can solve this problem by wrapping the third-party APIs we’re accessing with our own gateway that will coordinate these limitations. But this solution is cumbersome and doesn’t solve the issue at large. Our StepFunction executions will still be running and retrying endlessly until getting a proper response, and at times we can reach a very large number of running executions, wasting resources and money.

The ideal solution would be to limit the total number of StepFunction executions, but without creating a large number of failed requests or throttling errors. Additionally, we aim to ensure fair resource utilization so that attempts do not compete with each other. This is when we looked at SQS FIFO as a potential component in this situation.

SQS FIFO To the Rescue

SQS FIFO queues are SQS queues with additional guarantees. The main one, as the name suggests, is that messages are delivered in the order they were sent. Another is exactly-once processing, which the queue supports by deduplicating messages. To preserve the ordering, while a message is being processed the queue blocks and does not hand any further messages to consumers until the in-flight message is done.

This means that we can only process messages one by one (or batch by batch). In real-world use cases, this is not always required, since the queue is handling messages of different processes or entities that are not related to each other.

Understanding Message Grouping

For this reason, SQS FIFO introduced the concept of a MessageGroup. This allows you to specify the relationship between messages and make SQS FIFO apply these limitations per group. So specifying a MessageGroupID basically creates sub-queues inside the general queue to maintain the order within a group.
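The blocking behavior can be illustrated with a toy in-memory model. This is only a sketch of the per-group semantics described above, not how SQS is implemented, and it ignores batching, visibility timeouts, and deduplication. The class name `FifoGroupModel` and its methods are made up for this example.

```javascript
// Toy model of per-group blocking in a FIFO queue; NOT the real SQS implementation.
class FifoGroupModel {
  constructor() {
    this.queue = [];             // messages in arrival order
    this.busyGroups = new Set(); // groups that have a message in flight
  }

  send(groupId, body) {
    this.queue.push({ groupId, body });
  }

  // Return the oldest message whose group has nothing in flight, or null.
  receive() {
    const index = this.queue.findIndex((m) => !this.busyGroups.has(m.groupId));
    if (index === -1) return null;
    const [message] = this.queue.splice(index, 1);
    this.busyGroups.add(message.groupId);
    return message;
  }

  // Deleting the in-flight message unblocks its group.
  delete(message) {
    this.busyGroups.delete(message.groupId);
  }
}

const model = new FifoGroupModel();
model.send("1", "job-a");
model.send("1", "job-b");
model.send("2", "job-c");

const first = model.receive();  // job-a (group 1 is now blocked)
const second = model.receive(); // job-c (job-b must wait behind job-a)
const third = model.receive();  // null, every group is busy
model.delete(first);
const fourth = model.receive(); // job-b
```

With one message in flight per group, the number of groups is also the maximum number of messages being processed at the same time.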

Example of how message ordering and deduplication work in an Amazon SNS FIFO topic scenario involving different AWS services and message group IDs. It shows the flow of messages from Lambda functions through an Amazon SNS FIFO topic to various types of Amazon SQS queues (FIFO and standard), maintaining strict order in FIFO queues while demonstrating the potential disorder in standard queues.
Source: https://docs.aws.amazon.com/sns/latest/dg/fifo-message-grouping.html

Using SQS FIFO Message Groups to Limit Concurrency

We used this grouping capability to apply concurrency limits to our StepFunction executions cleanly, with a small trick to make it work.

An architecture diagram describing a pipeline of SQS FIFO feeding into StepFunction through a Lambda function handler. That handler is returning a “failed to process” to the LambdaTrigger so that it won’t really delete the message, leaving this delete action to the workflow. This way the workflow has full control over the messages.
What makes this workflow possible are steps (3) and (5).
  1. A general queue accepts jobs to be processed.
  2. The handler for the general queue accepts the message and picks a random MessageGroupID, where the range of the group ID is the concurrency number we’re looking for.
  3. The handler then pushes the message to a FIFO queue, along with the MessageGroupID.
  4. Another handler, this time of the FIFO queue, pulls the next message and executes our StepFunction.
  5. As the StepFunction executes, the handler actually responds that the message was not processed successfully, so that the queue waits before handing out another message from that group.
  6. Once the StepFunction has finished, it deletes the message with the provided ReceiptHandle.
  7. Steps 4–6 now repeat as messages become available in the various groups.

This implementation results in a Standard StepFunction executing many jobs in parallel without going beyond our defined number of concurrent executions. The example shows a small limit of five, but the idea is to have hundreds or thousands of groups. One of the bigger benefits over other methods is that we don’t waste state transitions or cycles trying to acquire a lock or retrying many failed requests; we only run the number of StepFunction executions that we want. As you probably noticed, a couple of “tricks” are used together.

So What Are Those Tricks?

Utilizing Partitions for Random Group ID Assignment

The first trick is assigning a message group ID before processing, to limit the concurrency. For example, if we want to limit the concurrency to 200, we pick a random number between 1 and 200. By the nature of SQS FIFO, messages with the same group ID are blocked while one of them (or a batch) is being processed. But if you take the broader, horizontal view across multiple groups, this also means that the queue allows at most as many messages to be processed at once as there are groups. In our case, we process them one by one, so the maximum number of in-flight messages matches the group count.
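A minimal sketch of this first trick might look like the following. The function names, the queue URL, and the use of `job.id` as the deduplication ID are assumptions for illustration. The returned object has the shape of an SQS SendMessage input, which you would pass to your SDK client (for example, `SendMessageCommand` in the AWS SDK for JavaScript v3).

```javascript
// Pick a lane between 1 and `concurrency` (inclusive).
// `random` is injectable so the function can be tested deterministically.
function pickGroupId(concurrency, random = Math.random) {
  return String(Math.floor(random() * concurrency) + 1);
}

// Build the input for an SQS SendMessage call to the FIFO queue.
// Using `job.id` as the deduplication ID is an assumption about the job shape.
function buildFifoMessage(queueUrl, job, concurrency, random = Math.random) {
  return {
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify(job),
    MessageGroupId: pickGroupId(concurrency, random),
    MessageDeduplicationId: String(job.id),
  };
}

// Placeholder queue URL, for illustration only.
const exampleMessage = buildFifoMessage(
  "https://sqs.us-east-1.amazonaws.com/123456789012/jobs.fifo",
  { id: "job-42" },
  200
);
```

Because the group ID is drawn from a fixed range, changing the concurrency limit is just a matter of changing that one number.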

Grouping messages to control concurrency isn’t sufficient on its own. If Lambda processes an item and returns a ‘success,’ the message is deleted from the queue. This creates a significant issue: the next message in the group would be released while the previous execution is still running, StepFunction executions would pile up, and our entire concurrency model would collapse. We needed a way to delete the message only when the StepFunction execution really ends.

Controlling Message Handling with BatchItemFailure

The second trick is signaling back to the “Lambda trigger” that we failed to process the message. Yes, this is counter-intuitive, but it’s important to understand what happens when we do this. In our serverless system, we use a lot of Lambda triggers to consume SQS messages. When one of these invocations fails, the “Lambda trigger” basically drops the message from handling and lets the visibility timeout mechanism take over. Taking this into account, we can “take control” of the message handling and hand it to the StepFunction execution.

In the default Lambda configuration, throwing an error from the handler is enough to signal a failure. However, doing it this way has a few side effects. First, you get invocation errors in your monitoring system. Second, AWS automatically scales down your Lambda execution attempts so that your system will not be overwhelmed with failed processes or retries. Instead, we enable batch item failure reporting (the ReportBatchItemFailures function response type) on our Lambda trigger. Although the name implies batch processing, you can still use it for single-message processing.

// Lambda handler example that returns a batch item failure.
// For simplicity, this code assumes that only one job "Record" has been received.

exports.handler = async (event, context) => {
  try {
    // .. Do something with the job
  } catch (e) {
    // Return the batch item failures structure to signal the item has "failed"
    return {
      batchItemFailures: [
        {
          itemIdentifier: event.Records[0].messageId
        }
      ]
    };
  }
};

By returning the batchItemFailures structure, we can signal a failure without throwing an error, so the invocation itself shows up as successful. In addition, Lambda explicitly will not scale down our Lambda concurrency, allowing it to keep up with a high volume of messages.
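Putting the two tricks together, the FIFO queue handler could be sketched roughly as follows. This is an illustration under assumptions, not our production code: `startExecution` is a hypothetical injected function that wraps the Step Functions StartExecution call for our state machine, and the shape of the execution input is made up for the example.

```javascript
// Sketch of the FIFO-queue handler. `startExecution` is a hypothetical injected
// function that wraps the Step Functions StartExecution call.
function createFifoHandler(startExecution) {
  return async (event) => {
    const batchItemFailures = [];
    for (const record of event.Records) {
      // Pass the receipt handle along so the workflow can delete the message later.
      await startExecution({
        input: JSON.stringify({
          job: JSON.parse(record.body),
          receiptHandle: record.receiptHandle,
        }),
      });
      // Report the record as "failed" so Lambda leaves it in the queue.
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
    return { batchItemFailures };
  };
}

// In a Lambda deployment this would be exported with a real client wrapper, e.g.:
// exports.handler = createFifoHandler(startMyStateMachine); // hypothetical wrapper
```

One design point worth keeping in mind with a handler like this: the message stays invisible only for the queue's visibility timeout, so that timeout should be longer than the longest expected execution. Otherwise the message becomes visible again and a second execution starts for the same job.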

And with this we can achieve a very simple, robust, fully serverless, and accurate self-limiting system for concurrency. However, there are still some limitations to this method. Let’s discuss them and consider some alternatives.

We Still Need to Consider Limits

FIFO queues can only have a maximum of 20K in-flight messages at any given time. So, using one queue you can only reach concurrency of 20K jobs. For us that was more than enough, but if you need more it’s also possible to have more queues and alternate between them.

Besides an upper limit, there’s also a practical lower limit. Since we rely on message grouping in the FIFO queue, we effectively predetermine a lane for each message. FIFO will not move a message to another group when that group has finished all of its messages, so a waiting message stays in its own lane even if other lanes are idle. Because of that, this solution does not work well for small concurrency numbers.

If you need a low concurrency number, it’s probably better to use a lock-acquiring mechanism or a similar pattern. Low concurrency numbers usually also mean that the number of jobs is low, so there’s less concern about a large number of state transitions spent waiting for a lock.

It’s important to take into account that we have essentially taken control of the message processing. Since jobs can actually fail for various reasons, we will want to do one of the following:

  1. Call “ChangeMessageVisibility” on the message with a visibility timeout of 0 so that it returns immediately to the queue.
  2. Delete the message and manually re-submit it or move it to our own dead-letter queue.

If we fail to do one of these, we will end up blocking all of the messages in a certain group for no real reason. Remember, we use these groups for concurrency and not because the jobs actually depend on one another.
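As a rough sketch of that last step, the workflow's final state could choose between the two SQS operations like this. The helper name `completionAction` and the `outcome` values are hypothetical. The `params` objects follow the input shapes of the SQS DeleteMessage and ChangeMessageVisibility APIs.

```javascript
// Hypothetical helper for the last state of the workflow: decide what to do
// with the SQS message once the job has finished.
function completionAction(queueUrl, receiptHandle, outcome) {
  if (outcome === "retry") {
    // Make the message visible again right away so the job is retried.
    return {
      operation: "ChangeMessageVisibility",
      params: { QueueUrl: queueUrl, ReceiptHandle: receiptHandle, VisibilityTimeout: 0 },
    };
  }
  // Success, or a failure we have already handed off elsewhere: remove the message.
  return {
    operation: "DeleteMessage",
    params: { QueueUrl: queueUrl, ReceiptHandle: receiptHandle },
  };
}

// Placeholder values, for illustration only.
const action = completionAction(
  "https://sqs.us-east-1.amazonaws.com/123456789012/jobs.fifo",
  "example-receipt-handle",
  "retry"
);
```

Either way, the group is released: the message is gone, or it is visible again and the next handler invocation picks it up.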

Finishing With One Small Improvement

One way to expand this solution is to keep in mind that we are not really bound to a fixed concurrency number. Some of our jobs might need to be limited to 50 concurrent runs, others may have 1500 concurrent runs. This can easily be achieved with the same code by changing the message group ID to a {type}-{number}. For example, we can have these two ranges a-[1–50] and b-[1–1500], which will each apply a different concurrency limit using the same pipeline.
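A small sketch of this variation, using the two ranges from the example above. The `LIMITS` table and the `typedGroupId` function are names we made up for illustration.

```javascript
// Concurrency limit per job type; the values mirror the example in the text.
const LIMITS = { a: 50, b: 1500 };

// Build a group ID of the form {type}-{number}, where the number is a random
// lane between 1 and the limit configured for that type.
function typedGroupId(type, limits = LIMITS, random = Math.random) {
  const limit = limits[type];
  if (!Number.isInteger(limit) || limit < 1) {
    throw new Error(`No concurrency limit configured for job type "${type}"`);
  }
  const lane = Math.floor(random() * limit) + 1;
  return `${type}-${lane}`;
}

typedGroupId("a"); // something between "a-1" and "a-50"
typedGroupId("b"); // something between "b-1" and "b-1500"
```

Since the type is part of the group ID, the two ranges never collide, and each job type gets its own set of lanes inside the same FIFO queue.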

At the center of this pattern are two distinct uses of different mechanisms around Lambda. One is grouping for concurrency, and the other is signaling a failed processing to take control of the message. While grouping does not really break any paradigms, returning the failed processing of a message is actually using an abstraction that perhaps we’re not meant to use.

Usually I’m not in favor of breaking abstractions, but this solution is so simple, and “basically works”, that the pros really outweigh the cons. In a more philosophical sense, the key takeaway is that we should be exploiting our resources and tooling and squeezing as many useful use cases and solutions out of them as we can, especially with proven and stable services like SQS.

If you want to experiment with this workflow, we have published a ready-to-use SAM template. Happy hacking.
