Step Functions is one of my favourite AWS services. I enjoy experimenting with it, especially when new features are released. In my opinion, two of the most important changes since launch have been the additional Service Integrations and Task Tokens.
Prior to the new Service Integrations, if you wanted to interact with other AWS services from your Step Function, you needed to invoke a Lambda function that would call the other service. On Nov 29, 2018, AWS added eight new Service Integrations that allow you to directly call more core services like DynamoDB, SQS, and SNS.
Service Integrations give Step Functions a nice way to access other services. On the other hand, until recently, the only way the outside world could interact with a running Step Function was to abort it. That changed on May 23, 2019 when Task Tokens were released as an extension to the Task state.
Now, when you append `.waitForTaskToken` to a Task state's resource, an opaque token is generated for you to pass to another service. For example, you could include the token in the message you're sending to an SQS queue. The Step Function will then pause until a task success or failure API call is made containing the token. The callback also contains a JSON string that becomes the output of the Task state.
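In Amazon States Language, such a Task state might look roughly like this (the state name, queue URL, and message field names are illustrative, not taken from any particular service):

```json
{
  "Type": "Task",
  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
  "Parameters": {
    "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue",
    "MessageBody": {
      "input.$": "$",
      "token.$": "$$.Task.Token"
    }
  },
  "End": true
}
```

The `$$.Task.Token` path reads the generated token from the context object, so it gets embedded in the message sent to the queue.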
Service Integrations combined with Task Tokens enable native asynchronous communication. Previously, you'd achieve this by polling a database or having multiple Step Functions, both of which are more expensive and complex. The poster child for this functionality is implementing manual approval steps. However, it can also be used in automated flows to deal with temporal coupling.
Temporal coupling occurs when one service can’t continue without a response from another service. For example, a Step Function is temporally coupled to another service if it synchronously calls its REST API. Imagine an order processing Step Function that needs to call a Payment API to authorize a payment. In these cases, you must pay for a Lambda function to wait and you might pay for retries.
To remove temporal coupling, the other service should accept requests while it's down and be able to respond while the requester is down. Also, following serverless principles, nothing should happen or cost anything until the response is ready. That means no polling and no synchronous waiting.
In this post, I’ll show how to implement asynchronous communication from a Step Function to another service via an SQS queue. I assume you have some Step Functions knowledge already, but will provide links where possible.
👩‍🏫 Basic example
Let’s take a look at the most basic example. To make things easier, I’m using the Serverless Framework with the Serverless Step Functions plugin. For simplicity, there will only be one service talking to itself. In the real world, this would usually be two separate services communicating.
The full code for this example can be found here on GitHub 📥.
The example service is written in Node.js and uses two plugins:
- serverless-step-functions — Removes a lot of the pain of defining a Step Function, its states and triggers, etc.
- serverless-iam-roles-per-function — (Optional) Lets you add IAM statements to individual functions without writing CloudFormation.
The SQS queue defined in the resources section will buffer requests from the Step Function to the “worker” service. Using a queue means the Step Function can send requests while the worker service is down, and the worker can process them when it’s healthy again.
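In serverless.yml, the queue can be declared as a plain CloudFormation resource. The sketch below is an assumption about the shape of that section (the logical name `WorkerQueue` is mine, not necessarily the repo's):

```yaml
resources:
  Resources:
    WorkerQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: ${self:service}-worker-queue
```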
The `stepFunctions` section comes from the first plugin. Here I define a very simple Step Function with one Task state named `PublishAndWait`.
Take a look at the `Resource` property of the `PublishAndWait` state. On the end is `.waitForTaskToken`, which converts this Task from a simple API call into one that waits for a callback.
In the `Parameters` property, I provide the URL of the queue defined earlier. I also configure the `MessageBody` so that it contains the input of the Task state and the Task Token.
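Put together with the serverless-step-functions plugin, that section could look something like this sketch (state machine name, queue logical name, and message field names are my assumptions):

```yaml
stepFunctions:
  stateMachines:
    publishAndWait:
      definition:
        StartAt: PublishAndWait
        States:
          PublishAndWait:
            Type: Task
            Resource: arn:aws:states:::sqs:sendMessage.waitForTaskToken
            Parameters:
              QueueUrl:
                Ref: WorkerQueue
              MessageBody:
                input.$: $
                token.$: $$.Task.Token
            End: true
```

`Ref` on an `AWS::SQS::Queue` resource returns the queue URL, which is exactly what `QueueUrl` needs.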
Below is an example of what the messages sent to SQS will look like. Since the Task is the first state, its input comes from the Step Function execution itself.
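Assuming the `MessageBody` maps the state input to `input` and the token to `token`, a message could look roughly like this (the token is truncated and illustrative):

```json
{
  "input": {
    "number": 12
  },
  "token": "AQCEAAAAKgAAAAMAAAAA..."
}
```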
Lastly, I define the `worker` function. It's configured with an SQS event trigger linked to the same queue the Step Function will send to.
The `iamRoleStatements` property comes from the second plugin. I'm allowing the `states:SendTaskSuccess` and `states:SendTaskFailure` actions so the worker can notify the Step Function when the response is ready. These actions don't support resource-level permissions, so `Resource` is all (`*`). This is nice because the worker doesn't need to know about all of the Step Functions that may send it requests. The task tokens themselves are around 620 characters long, which seems secure enough.
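A sketch of that function definition, assuming the `WorkerQueue` logical name from the resources section:

```yaml
functions:
  worker:
    handler: handler.worker
    events:
      - sqs:
          arn:
            Fn::GetAtt: [WorkerQueue, Arn]
    iamRoleStatements:
      - Effect: Allow
        Action:
          - states:SendTaskSuccess
          - states:SendTaskFailure
        Resource: '*'
```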
As I said earlier, the worker would normally be a different service, but it’s defined here for the sake of simplicity.
I'm importing the `uuid` package to do some fake work, along with the AWS SDK, from which I create a Step Functions API client. The `sleep` function is just a little helper that I'll use to make it look like the worker is doing something difficult.
Let’s dive into the handler logic.
It starts by sleeping for 2 seconds to pretend to be working. Then we check whether the input `number` is even, which will affect the result.

If the `number` is even, we call `SendTaskSuccess`. We must `JSON.stringify` the output JSON before assigning it to the `output` parameter. `uuidv4()` is called to give the output some spice.
On the other hand, if the `number` is odd, we call `SendTaskFailure` to report an `error` and its `cause`. You could use these for error handling with `Retry` and `Catch` properties on the Task state, which match on the `Error` name.
👀 Trying it out
After deploying the service, we can try it out by starting a new execution of the Step Function using the AWS console. Here I pass the number `12`, which is even, so I expect it to succeed.
The Execution event history can be used to keep track of the execution.
- ExecutionStarted — The Step Function execution was started.
- TaskStateEntered — It transitioned to the `PublishAndWait` state.
- TaskScheduled — The `sqs:sendMessage.waitForTaskToken` task was scheduled to run behind the scenes.
- TaskStarted — The behind-the-scenes task runner picked up our task.
- TaskSubmitted — The message was sent to the SQS queue. You can see the SQS API `RequestId` at the bottom.
❄️ The execution is paused at this point.
- TaskSucceeded — Over five seconds have elapsed. The worker handled the message and called `SendTaskSuccess`. See the spicy UUID in the output.
- TaskStateExited — The Step Function is awake again and has exited the `PublishAndWait` state. The output from the worker became the state's output.
- ExecutionSucceeded — There was only one state, so the execution is done.
Scrolling back up to the diagram, we can click on the `PublishAndWait` state to see the same input and output.
Let's quickly see what failures look like. This time I'll pass a `7` in the execution input.
As you probably guessed, the Execution event history now shows a `TaskFailed` record. I didn't configure any error handling, so the error bubbled up and caused the whole execution to fail.

Scrolling back up again, we can see the same `Error` and `Cause` that the worker returned in the `SendTaskFailure` API call.
I’ve shown that Step Functions can communicate with other services via SQS queues. Not only that, but the Step Function can go to sleep while waiting for a response.
The Step Function is no longer temporally coupled to the worker service, but this came at the cost of latency. The worker sleeps for 2 seconds but execution pauses for around 5 seconds.
Another trade-off is that the worker service needs to know it should call the Step Functions API and needs permission to do so. I know some people will see this as a problem, so I’ll provide alternatives in a follow-up post.
Service Integrations and Task Tokens open up a lot of interesting patterns. For example, call two services and await both responses with a Parallel state. Something I didn't mention is configuring the Task to time out while waiting for the callback, which is useful for optional requests or error handling. What other uses have you found?
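For the timeout case, a callback Task accepts the standard `TimeoutSeconds` and `HeartbeatSeconds` fields; a sketch, reusing the hypothetical `WorkerQueue` name:

```yaml
PublishAndWait:
  Type: Task
  Resource: arn:aws:states:::sqs:sendMessage.waitForTaskToken
  TimeoutSeconds: 300    # fail with States.Timeout if no callback arrives in 5 minutes
  HeartbeatSeconds: 60   # optional: worker must call SendTaskHeartbeat to stay alive
  Parameters:
    QueueUrl:
      Ref: WorkerQueue
    MessageBody:
      input.$: $
      token.$: $$.Task.Token
  End: true
```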
Please let me know if you have any questions or would like to hear more about a particular topic.