Test Automation Integrated into AWS Step Function Workflow


AWS Step Function is an AWS service that offers serverless function orchestration, making it easy to sequence multiple AWS resources into one business workflow. It allows us to create and run checkpointed, event-driven workflows that maintain the application state, which makes it ideal for projects such as data pipelines. In this blog, we will explain how to develop and apply a Step Function with test automation integrated into its workflow.

Pricing of AWS Step Function

AWS Step Function pricing is based on a 'pay for what you use' model. There are two types of workflows: Standard Workflows and Express Workflows. By default, a Step Function uses a Standard Workflow, priced at $0.000025 per state transition. Express Workflows are priced at $0.000001 per request. In this blog, we will use the default option.
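As a rough illustration of those numbers: a Standard Workflow execution that passes through 10 state transitions costs 10 × $0.000025 = $0.00025, so running such a workflow every hour for a month (about 720 executions) adds up to roughly $0.18.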

Step Function states

AWS Step Function provides a set of states that you can use to build workflows. Each state represents a specific action or behavior within the workflow. Here are some of the commonly used Step Function states:

  1. Task State: Represents a single unit of work in a Step Function workflow. It can be used to invoke AWS Lambda functions, run ECS tasks, or perform other activities.
  2. Choice State: Allows you to define conditional logic within your workflow. It evaluates a condition and transitions to different states based on the result.
  3. Wait State: Delays the execution of the workflow for a specific period.
  4. Pass State: Passes its input to its output without performing work. It is useful when constructing and debugging state machines.
  5. Parallel State: Allows us to run multiple branches of execution in parallel. Each branch within the parallel state runs its own sequence of states.
  6. Succeed State: Ends the execution as successfully processed.
  7. Fail State: Ends the execution as a failure. It lets us attach an error name and cause that help identify the root cause, and failures can also be handled with a 'Catch' block if necessary.
  8. Map State: You can think of it like a for-each loop for Step Function. It allows us to iterate over an array/list of items and perform the same workflow steps for every item in the array.

Let us learn by example:

Demo project description:

In this example, we will create a simple demo application with Step Functions, Lambda, DynamoDB, GitHub Actions, and the Serverless Framework for easier Step Function creation. In DynamoDB, we will create one table called ‘messages’. This table is supposed to receive data from other (imaginary) services. A message record should contain the following:

  • ID,
  • First name (fName),
  • Last name (lName),
  • Region (US/EU),
  • Points,
  • Column representing flag for processed messages (is_processed)

Our Step Function should contain a workflow that takes all of the unprocessed messages, separates them by region into US and EU users, and puts those messages into the appropriate tables: US_users_events and EU_users_events.

Step function implementation:

We will use the Serverless Framework to easily implement the Step Function, its Lambdas, and the required IAM roles. The Serverless Framework offers infrastructure-as-code implementation and deployment for the AWS Step Function, together with the Lambdas and the additional services we need in the state machine. We will not go in-depth into the Serverless Framework, but if you would like to know more, you can find it on their official website.

Follow these steps to start with the Serverless framework:

1. Create an empty folder

2. Initialize NPM with the ‘npm init’ command

3. Install the Serverless Framework via NPM: ‘npm install -g serverless’

4. Add the following dependencies, which we will use later, to package.json:

{
  "name": "sfn-blog",
  "dependencies": {
    "@aws-sdk/client-dynamodb": "^3.405.0",
    "@aws-sdk/client-s3": "^3.405.0",
    "axios": "^1.4.0",
    "jest": "^29.4.1",
    "jest-junit": "^15.0.0"
  },
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "directories": {
    "test": "test"
  },
  "scripts": {
    "test": "jest"
  },
  "author": "",
  "license": "ISC",
  "devDependencies": {
    "@types/jest": "^29.2.5",
    "serverless-step-functions": "^3.13.1"
  }
}

5. Run ‘npm install’

6. Create serverless.yml, which will be used for infrastructure as code

7. Create a handler.js file for the Lambda functions written in Node.js

Note: Do not forget to add your AWS credentials (access key and secret key). If you are on a Mac, you can do this by running the following command:

sls config credentials --provider aws --key <YOUR_ACCESS_KEY> --secret <YOUR_SECRET_KEY>

8. Create a DynamoDB table ‘messages’ with ID (type String) as the partition key.

9. Choose Customize settings and the On-demand capacity mode, then click ‘Create table’ at the bottom of the page.

10. In the same way, create two more tables: US_users_events and EU_users_events.
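If you prefer to create these tables from code rather than through the console, a minimal Node.js sketch using the AWS SDK v3 could look like the following (table and key names taken from the steps above; PAY_PER_REQUEST corresponds to the on-demand capacity mode):

const { DynamoDBClient, CreateTableCommand } = require('@aws-sdk/client-dynamodb');

const client = new DynamoDBClient({ region: 'eu-central-1' });

// Create a table with 'ID' (String) as the partition key, using on-demand capacity
const createTable = async (tableName) => {
  const command = new CreateTableCommand({
    TableName: tableName,
    AttributeDefinitions: [{ AttributeName: 'ID', AttributeType: 'S' }],
    KeySchema: [{ AttributeName: 'ID', KeyType: 'HASH' }],
    BillingMode: 'PAY_PER_REQUEST' // on-demand capacity mode
  });
  await client.send(command);
};

// Create the three tables used in this example
(async () => {
  for (const table of ['messages', 'US_users_events', 'EU_users_events']) {
    await createTable(table);
  }
})();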

11. In the serverless.yml file, we will define the Step Function workflow we need:

service: sfn-blog
frameworkVersion: "3"
# Define cloud provider settings and IAM roles needed for our SF to work
provider:
  name: aws
  runtime: nodejs18.x
  region: eu-central-1
  environment: ${file(env.json)}
  iamRoleStatements:
    - Effect: Allow
      Action: dynamodb:*
      Resource: arn:aws:dynamodb:eu-central-1:178190218027:table/messages
    - Effect: Allow
      Action: dynamodb:*
      Resource: arn:aws:dynamodb:eu-central-1:178190218027:table/US_users_events
    - Effect: Allow
      Action: dynamodb:*
      Resource: arn:aws:dynamodb:eu-central-1:178190218027:table/EU_users_events
    - Effect: Allow
      Action: s3:*
      Resource: arn:aws:s3:::sfn-blog
plugins:
  - serverless-step-functions
# Define a path to the AWS Lambda Functions
functions:
  FetchFromDynamoDBState:
    handler: handler.FetchFromDynamoDBState
  ProcessUSUsersEvents:
    handler: handler.ProcessUSUsersEvents
  ProcessEUUsersEvents:
    handler: handler.ProcessEUUsersEvents
  InsertMessageDynamoDBTest:
    handler: handler.InsertMessageDynamoDBTest
  VerifyStepFunctionOutcomeTest:
    handler: handler.VerifyStepFunctionOutcomeTest
# Define Step Function and its state machine
stepFunctions:
  stateMachines:
    proceedRewards:
      name: proceedRewards
      definition:
        StartAt: InsertMessageDynamoDBTest
        States:
          InsertMessageDynamoDBTest:
            Type: Task
            Resource:
              Fn::GetAtt: [InsertMessageDynamoDBTest, Arn]
            ResultPath: "$.response"
            Next: WaitState
          WaitState:
            Type: Wait
            Seconds: 200
            Next: FetchFromDynamoDBState
          FetchFromDynamoDBState:
            Type: Task
            Resource:
              Fn::GetAtt: [FetchFromDynamoDBState, Arn]
            ResultPath: "$.items"
            Next: ProcessDataState
          ProcessDataState:
            Type: Map
            ItemsPath: "$.items.items"
            ResultPath: "$.mappedData"
            MaxConcurrency: 2
            Iterator:
              StartAt: ProceedChoiceState
              States:
                ProceedChoiceState:
                  Type: Choice
                  Choices:
                    - Variable: $.region.S
                      StringEquals: "US"
                      Next: ProcessUSUsersEventsState
                    - Variable: $.region.S
                      StringEquals: "EU"
                      Next: ProcessEUUsersEventsState
                  Default: DefaultState
                ProcessUSUsersEventsState:
                  Type: Task
                  Resource:
                    Fn::GetAtt: [ProcessUSUsersEvents, Arn]
                  End: true
                ProcessEUUsersEventsState:
                  Type: Task
                  Resource:
                    Fn::GetAtt: [ProcessEUUsersEvents, Arn]
                  End: true
                DefaultState:
                  Type: Fail
                  Cause: 'Invalid region value.'
                  Error: 'InvalidRegionError'
            Next: VerifyStepFunctionOutcomeTest
          VerifyStepFunctionOutcomeTest:
            Type: Task
            Resource:
              Fn::GetAtt: [VerifyStepFunctionOutcomeTest, Arn]
            End: true

12. Include the following Lambda functions in the handler.js file. (If you are wondering why we use the keys ‘S’, ‘N’, and ‘BOOL’ in the params for the DynamoDB client: the library requires an explicit data type for each attribute, ‘S’ for string, ‘N’ for number, ‘BOOL’ for boolean, and so on.)

const { DynamoDBClient, PutItemCommand, ScanCommand, UpdateItemCommand } = require('@aws-sdk/client-dynamodb');

const dynamoDbClient = new DynamoDBClient({ region: 'eu-central-1' });

// Fetch all unprocessed messages from the 'messages' table
module.exports.FetchFromDynamoDBState = async () => {
  const params = {
    TableName: 'messages',
    FilterExpression: 'is_processed = :processed',
    ExpressionAttributeValues: {
      ':processed': { BOOL: false },
    }
  };
  const scanCommand = new ScanCommand(params);
  const result = await dynamoDbClient.send(scanCommand);
  const items = result.Items;
  return {
    items
  };
}

module.exports.ProcessUSUsersEvents = async (item) => {
  return await processUsersEvents(item, 'US_users_events');
}

module.exports.ProcessEUUsersEvents = async (item) => {
  return await processUsersEvents(item, 'EU_users_events');
}

// Copy the message into the regional table and mark it as processed
const processUsersEvents = async (item, tableName) => {
  let dataProcessed = false;
  try {
    await processItem(item, tableName);
    await updateIsProcessedColumn(item.ID.S);
    dataProcessed = true;
  } catch (error) {
    return dataProcessed;
  }
  return dataProcessed;
}

const processItem = async (item, tableName) => {
  const eventItem = {
    ID: { S: item.ID.S },
    fName: { S: item.fName.S },
    lName: { S: item.lName.S },
    points: { N: item.points.N },
  };
  const params = {
    TableName: tableName,
    Item: eventItem
  };
  const command = new PutItemCommand(params);
  await dynamoDbClient.send(command);
}

// Set is_processed to true on the original message
const updateIsProcessedColumn = async (itemId) => {
  const paramsForUpdate = {
    TableName: 'messages',
    Key: {
      ID: { S: itemId }
    },
    UpdateExpression: 'SET is_processed = :value',
    ExpressionAttributeValues: { ':value': { BOOL: true } },
    ReturnValues: 'UPDATED_NEW'
  };
  const updateItemCommand = new UpdateItemCommand(paramsForUpdate);
  await dynamoDbClient.send(updateItemCommand);
}

13. Run the command ‘sls deploy’ to deploy the Step Function.

Test Automation

In real-world scenarios, this Step Function can be scheduled to run a workflow at regular intervals, such as every hour, so we have to split the test automation into two parts: data insertion and verification.

Firstly, we will create a test automation script that inserts a message with test data into the DynamoDB table ‘messages’ and saves that message as a test job artifact to an S3 bucket.

AWS S3 is a cloud-based object storage service provided by Amazon Web Services (AWS). In this case, we need it to store the test artifacts that the second script will later use for the verification part. You can think of the S3 bucket as a virtual container where our test data is kept, but of course, you can use other services for this purpose. Good alternatives are saving the data as GitHub artifacts or, if you choose to do this with Jenkins, applying the same concept and saving the data on your own servers. It is also important to explain that we cannot save the artifacts locally on AWS Lambdas, because Lambdas are serverless computing services packaged into isolated environments (containers) that run on a multi-tenant cluster of machines managed by AWS.

We will use Node.js with Jest for this example.

const { DynamoDBClient, PutItemCommand } = require('@aws-sdk/client-dynamodb');
const S3Client = require('./S3Client');

const dynamoDbClient = new DynamoDBClient({ region: 'eu-central-1' });

describe('Adding message to the DynamoDB messages table', () => {
  let item;

  beforeAll(async () => {
    // Test message with a unique ID based on the current timestamp
    item = {
      ID: { S: new Date().toISOString() },
      fName: { S: 'Test_First_Name' },
      lName: { S: 'Test_Last_Name' },
      points: { N: '500' },
      is_processed: { BOOL: false },
      region: { S: 'US' }
    };
  });

  it('Adding message', async () => {
    const params = {
      TableName: 'messages',
      Item: item
    };
    try {
      const command = new PutItemCommand(params);
      await dynamoDbClient.send(command);
      console.log('Data inserted successfully.');
    } catch (error) {
      console.log('Error inserting data:', error);
      // Rethrow so the test fails if the insert did not succeed
      throw error;
    }
  }, 10000);

  it('Save message to S3 as a job artifact', async () => {
    const s3Client = new S3Client();
    const bucket = 'sfn-blog';
    const key = 'test-message.json';
    // Remove the previous artifact (if any) and upload the new test message
    await s3Client.removeObject(bucket, key);
    await s3Client.uploadFileToS3(
      bucket,
      key,
      JSON.stringify(item),
      'application/json'
    );
  }, 20000);
});

Secondly, we will create a script that runs after the Step Function finishes to verify that the message has been processed. In this script, we will pull the artifact JSON file from the S3 bucket and verify that the record can be found in the ‘US_users_events’ table in DynamoDB.

const { DynamoDBClient, ScanCommand } = require('@aws-sdk/client-dynamodb');
const S3Client = require('./S3Client');

const dynamoDbClient = new DynamoDBClient({ region: 'eu-central-1' });

describe('Verify that message has been processed', () => {
  let item;
  const s3Client = new S3Client();

  beforeAll(async () => {
    item = await s3Client.getObject('sfn-blog', 'test-message.json');
  }, 200000);

  it('Verify DynamoDB US_users_events', async () => {
    const params = {
      TableName: 'US_users_events'
    };
    const scanCommand = new ScanCommand(params);
    const response = await dynamoDbClient.send(scanCommand);
    const records = response.Items;
    const testRecord = records.filter(it => it.ID.S === item.ID.S);
    expect(testRecord.length).toBe(1);
    expect(testRecord[0].fName.S).toBe(item.fName.S);
    expect(testRecord[0].lName.S).toBe(item.lName.S);
    expect(testRecord[0].points.N).toBe(item.points.N);
  }, 20000);
});

We also need a helper file for S3-related methods. Create an S3Client.js file:

const { S3, DeleteObjectCommand, GetObjectCommand, PutObjectCommand } = require('@aws-sdk/client-s3');

class S3Client {
  s3Client;

  constructor() {
    this.s3Client = new S3({ region: 'eu-central-1' });
  }

  async getObject(bucketName, key) {
    try {
      const params = {
        Bucket: bucketName,
        Key: key
      };
      const getObjectCommand = new GetObjectCommand(params);
      const response = await this.s3Client.send(getObjectCommand);
      const bodyStream = response.Body;
      let data = "";
      for await (const chunk of bodyStream) {
        data += chunk;
      }
      const object = JSON.parse(data);
      return object;
    } catch (error) {
      console.log(error);
      return null;
    }
  }

  async removeObject(bucketName, key) {
    try {
      const params = {
        Bucket: bucketName,
        Key: key
      };
      const removeObjectCommand = new DeleteObjectCommand(params);
      await this.s3Client.send(removeObjectCommand);
    } catch (error) {
      console.log(error);
      return null;
    }
  }

  async uploadFileToS3(bucketName, key, body, contentType) {
    try {
      const params = {
        Bucket: bucketName,
        Key: key,
        Body: body,
        ContentType: contentType
      }
      const putObjectCommand = new PutObjectCommand(params);
      await this.s3Client.send(putObjectCommand);
    } catch (error) {
      console.log(error);
      return null;
    }
  }
}

module.exports = S3Client;

We will now create workflows (think of them as Jenkins jobs) for these two scripts with GitHub Actions, but you can use any tool you want, for instance, Jenkins, TeamCity, etc. If you want to know more about GitHub Actions, you can find it on their official website.

To define workflows in GitHub Actions, follow these steps:

  1. Create a repository on GitHub,
  2. Upload test files, including package.json,
  3. Click on the ‘Actions’ button
  4. Click on the ‘New workflow’ button,
  5. Choose ‘Node.js’ workflow,
  6. Define your workflow for the first test script as a yml file inside folder ‘.github/workflows’:
name: insert-message-DynamoDB
on:
  workflow_dispatch:
env:
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  AWS_REGION: eu-central-1
jobs:
  insert_message_in_DynamoDB:
    name: Insert message in DynamoDB
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Set up Node.js
        uses: actions/setup-node@v2
        with:
          node-version: '18'
      - name: Configure AWS credentials
        run: aws configure set aws_access_key_id ${{ env.AWS_ACCESS_KEY_ID }} && aws configure set aws_secret_access_key ${{ env.AWS_SECRET_ACCESS_KEY }} && aws configure set region ${{ env.AWS_REGION }}
      - name: Install dependencies
        run: npm install
      - name: Run InsertMessageDynamoDB tests
        run: npm run test insertMessageDynamoDB.test.js

7. Do the same for the second script:

name: verify-step-function-outcome
on:
  workflow_dispatch:
env:
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  AWS_REGION: eu-central-1
jobs:
  verify_outcome:
    name: Verify Step Function Outcome
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Set up Node.js
        uses: actions/setup-node@v2
        with:
          node-version: '18'
      - name: Configure AWS credentials
        run: aws configure set aws_access_key_id ${{ env.AWS_ACCESS_KEY_ID }} && aws configure set aws_secret_access_key ${{ env.AWS_SECRET_ACCESS_KEY }} && aws configure set region ${{ env.AWS_REGION }}
      - name: Install dependencies
        run: npm install
      - name: Run VerifyStepFunctionOutcome tests
        run: npm run test verifyStepFunctionOutcome.test.js

8. Add InsertMessageDynamoDBTest as the first step in the Step Function States array. It should invoke the Lambda function that triggers the GitHub Actions workflow which runs the first test script and inserts a message into the DynamoDB ‘messages’ table,

9. Add a wait state after the state that triggers the first test script, so we can be sure the script execution on the GitHub side has finished,

10. Add a state at the end of the Step Function that invokes the Lambda function that triggers the GitHub Actions workflow which runs the second test script and verifies that the message has been processed successfully,

11. Add an env.json file with the GitHub Actions credentials and workflow trigger IDs (a sketch for looking up these IDs follows after this list):

{
  "INSERT_ACTION_TRIGGER_ID": <GITHUB_WORKFLOW_ID_FOR_DYNAMODB_TEST_DATA_INSERTION>,
  "VERIFY_ACTION_TRIGGER_ID": <GITHUB_WORKFLOW_ID_FOR_DYNAMODB_TEST_DATA_VERIFICATION>,
  "GITHUB_OWNER": <YOUR_GITHUB_USERNAME>,
  "GITHUB_REPO": <YOUR_GITHUB_REPO>,
  "GITHUB_TOKEN": <YOUR_GITHUB_TOKEN>
}

12. Specify the environment file inside serverless.yml file under the provider configuration
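The numeric workflow IDs needed in env.json can be looked up through the GitHub REST API (the dispatch endpoint also accepts the workflow file name instead of the numeric ID). A minimal sketch, reusing the owner, repo, and token values from env.json:

const axios = require('axios');

// List the repository's workflows and print their names and numeric IDs
const listWorkflowIds = async (owner, repo, token) => {
  const response = await axios.get(
    `https://api.github.com/repos/${owner}/${repo}/actions/workflows`,
    { headers: { Authorization: `Bearer ${token}` } }
  );
  response.data.workflows.forEach((workflow) => {
    console.log(`${workflow.name}: ${workflow.id}`);
  });
};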

The final version of the serverless.yml file for the Step Function definition should look like the following:

service: sfn-blog
frameworkVersion: "3"
# Define cloud provider settings and IAM roles needed for our SF to work
provider:
  name: aws
  runtime: nodejs18.x
  region: eu-central-1
  environment: ${file(env.json)}
  iamRoleStatements:
    - Effect: Allow
      Action: dynamodb:*
      Resource: arn:aws:dynamodb:eu-central-1:178190218027:table/messages
    - Effect: Allow
      Action: dynamodb:*
      Resource: arn:aws:dynamodb:eu-central-1:178190218027:table/US_users_events
    - Effect: Allow
      Action: dynamodb:*
      Resource: arn:aws:dynamodb:eu-central-1:178190218027:table/EU_users_events
    - Effect: Allow
      Action: s3:*
      Resource: arn:aws:s3:::sfn-blog
plugins:
  - serverless-step-functions
# Define a path to the AWS Lambda Functions
functions:
  FetchFromDynamoDBState:
    handler: handler.FetchFromDynamoDBState
  ProcessUSUsersEvents:
    handler: handler.ProcessUSUsersEvents
  ProcessEUUsersEvents:
    handler: handler.ProcessEUUsersEvents
  InsertMessageDynamoDBTest:
    handler: handler.InsertMessageDynamoDBTest
  VerifyStepFunctionOutcomeTest:
    handler: handler.VerifyStepFunctionOutcomeTest
# Define Step Function and its state machine
stepFunctions:
  stateMachines:
    proceedRewards:
      name: proceedRewards
      definition:
        StartAt: InsertMessageDynamoDBTest
        States:
          InsertMessageDynamoDBTest:
            Type: Task
            Resource:
              Fn::GetAtt: [InsertMessageDynamoDBTest, Arn]
            ResultPath: "$.response"
            Next: WaitState
          WaitState:
            Type: Wait
            Seconds: 200
            Next: FetchFromDynamoDBState
          FetchFromDynamoDBState:
            Type: Task
            Resource:
              Fn::GetAtt: [FetchFromDynamoDBState, Arn]
            ResultPath: "$.items"
            Next: ProcessDataState
          ProcessDataState:
            Type: Map
            ItemsPath: "$.items.items"
            ResultPath: "$.mappedData"
            MaxConcurrency: 2
            Iterator:
              StartAt: ProceedChoiceState
              States:
                ProceedChoiceState:
                  Type: Choice
                  Choices:
                    - Variable: $.region.S
                      StringEquals: "US"
                      Next: ProcessUSUsersEventsState
                    - Variable: $.region.S
                      StringEquals: "EU"
                      Next: ProcessEUUsersEventsState
                  Default: DefaultState
                ProcessUSUsersEventsState:
                  Type: Task
                  Resource:
                    Fn::GetAtt: [ProcessUSUsersEvents, Arn]
                  End: true
                ProcessEUUsersEventsState:
                  Type: Task
                  Resource:
                    Fn::GetAtt: [ProcessEUUsersEvents, Arn]
                  End: true
                DefaultState:
                  Type: Fail
                  Cause: 'Invalid region value.'
                  Error: 'InvalidRegionError'
            Next: VerifyStepFunctionOutcomeTest
          VerifyStepFunctionOutcomeTest:
            Type: Task
            Resource:
              Fn::GetAtt: [VerifyStepFunctionOutcomeTest, Arn]
            End: true

The additional Lambdas for triggering GitHub workflows should look like this:

const axios = require('axios');

module.exports.InsertMessageDynamoDBTest = async () => {
  await triggerWorkflow(process.env.INSERT_ACTION_TRIGGER_ID);
}

module.exports.VerifyStepFunctionOutcomeTest = async () => {
  await triggerWorkflow(process.env.VERIFY_ACTION_TRIGGER_ID);
}

const triggerWorkflow = async (workFlowId) => {
  const owner = process.env.GITHUB_OWNER;
  const repo = process.env.GITHUB_REPO;
  const token = process.env.GITHUB_TOKEN;
  let response;
  try {
    response = await axios.post(
      `https://api.github.com/repos/${owner}/${repo}/actions/workflows/${workFlowId}/dispatches`,
      {
        ref: 'main'
      },
      {
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${token}`
        }
      }
    );
    console.log('Job triggered successfully:', response.data);
  } catch (error) {
    console.error('Failed to trigger job:', error.response.data);
  }
  return response;
}
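The fixed 200-second WaitState simply assumes that the GitHub job finishes in time. As an alternative sketch (not part of the original project), the same handler.js could poll the workflow runs endpoint until the latest run completes, provided the Lambda's timeout is raised accordingly:

// Hypothetical helper for handler.js (axios is already required above):
// poll the latest run of a workflow until it reports 'completed'.
const waitForWorkflowRun = async (workFlowId) => {
  const owner = process.env.GITHUB_OWNER;
  const repo = process.env.GITHUB_REPO;
  const token = process.env.GITHUB_TOKEN;
  for (let attempt = 0; attempt < 30; attempt++) {
    const { data } = await axios.get(
      `https://api.github.com/repos/${owner}/${repo}/actions/workflows/${workFlowId}/runs?per_page=1`,
      { headers: { Authorization: `Bearer ${token}` } }
    );
    const latestRun = data.workflow_runs[0];
    if (latestRun && latestRun.status === 'completed') {
      return latestRun.conclusion; // e.g. 'success' or 'failure'
    }
    await new Promise((resolve) => setTimeout(resolve, 10000)); // wait 10 seconds between polls
  }
  throw new Error('Timed out waiting for the GitHub Actions run to finish');
};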

The final version of the Step Function now covers the whole flow: insert the test message, wait, fetch the unprocessed messages, route each one to the US or EU table inside the Map state, and verify the outcome at the end.

This way, we will ensure that our Step Function continuously processes the messages successfully.

Note: The testing states should be present only in the testing environments.

Alternatives

We could implement test automation for this kind of system using other approaches. For example, we could create scheduled triggers for the GitHub Actions workflows. The first one, for data insertion into DynamoDB, could be scheduled at the beginning of the day, and the second one, for verification of the processed data, could be scheduled at a time when we expect at least one Step Function workflow execution to have finished. The advantage of this approach is that we could avoid the first and last Lambda functions for triggering the GitHub Actions workflows that we specified in the Test Automation part. The advantage of the first approach is that if someone makes changes to the Step Function, they will be verified on the next execution. We could also implement both approaches with Jenkins or any other CI/CD tool.

The main challenges

Implementing test automation on systems based on Step Functions (systems that run periodically or per event) comes with its own unique challenges. Here are some of the main ones:

  1. Time-dependent testing: Systems triggered periodically often involve time-based operations. We may need to control the passage of time during tests to validate time-dependent behaviors.
  2. Data Security: We must ensure that test data are handled securely and that the test environments are adequately isolated, especially if we are working with sensitive data.
  3. End-to-End Testing: We need to ensure that all components work together, which often requires two or more separate scripts that orchestrate with each other by exchanging data as artifacts, just to test a single system behavior.
  4. Continuous Integration/Continuous Deployment (CI/CD): We must ensure that tests run automatically with each deployment.
  5. Handling Large Data Sets: Systems with Step Functions may process large data sets. Testing with large data can take time and resources, so it needs a particular approach.
  6. Environment Consistency: To catch environment-specific issues, test environments should closely resemble production environments, including Cloud configurations.
  7. Resource Setup and Teardown: Setting up and tearing down AWS resources, such as databases, queues, or S3 buckets, for each test can be time-consuming and costly.
  8. Data Generation: Creating test data to simulate real-world scenarios and edge cases can be challenging. We must ensure that the data is consistent with the state transitions in our Step Function workflows.
  9. Connection with External Services: Establishing connections to different services, such as databases, can sometimes be very challenging depending on the situation, including security configurations.

Conclusion

We have gone through the overall concept of developing and applying a Step Function and integrating test automation within its workflow. Along the way, we were introduced to additional tools from the domains of cloud computing, CI/CD, and test automation.

You can find the full project on this GitHub repository.

Sources:

https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html

If you found this useful, check out other Atlantbh blogs!

Originally published at https://www.atlantbh.com on November 20, 2023.
