Engineering@ZenOfAI
May 23 · 9 min read

Introduction to AWS Step Functions

AWS Step Functions makes it easy to coordinate the components of distributed applications and microservices using visual workflows. Building applications from individual components that each perform a discrete function lets you scale and change applications quickly. Step Functions is a reliable way to coordinate those components and step through the functions of your application, and it provides a graphical console to arrange and visualize them as a series of steps.

For a deep dive into Step Functions, please go through the official docs.

Activities In Step Functions

Activities are an AWS Step Functions feature that enables you to have a task in your state machine where the work is performed by a worker. That worker can be hosted on Amazon Elastic Compute Cloud (Amazon EC2), Amazon Elastic Container Service (Amazon ECS), mobile devices, or basically anywhere.

The execution pauses at the activity task state and waits for your activity worker to poll for a task. Once a taskToken has been handed to your activity worker, the workflow waits until the worker reports success or failure for that token.
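To make the polling cycle concrete, here is a minimal sketch of a single worker iteration. It assumes a boto3 Step Functions client is passed in as sfn_client; run_worker_once, do_work and the worker name are hypothetical names used only for illustration.

```python
import json


def run_worker_once(sfn_client, activity_arn, do_work, worker_name="example-worker"):
    """Poll once for an activity task and report the outcome to Step Functions.

    Returns "idle", "succeeded" or "failed".
    """
    # get_activity_task long-polls; the token is missing or empty when no task is waiting.
    task = sfn_client.get_activity_task(activityArn=activity_arn, workerName=worker_name)
    token = task.get("taskToken")
    if not token:
        return "idle"
    try:
        # The task input arrives as a JSON string.
        result = do_work(json.loads(task["input"]))
    except Exception as exc:
        # Report the failure so the workflow can continue down its Catch path.
        sfn_client.send_task_failure(taskToken=token, error=type(exc).__name__, cause=str(exc))
        return "failed"
    sfn_client.send_task_success(taskToken=token, output=json.dumps(result))
    return "succeeded"
```

A real worker would call this in a loop, for example from an EC2 instance or a container.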


Creating an activity state machine with lambda

Step 1: Create a New Activity

  • Ensure that the activity task is under the same AWS account as your state machine.
  • In the Step Functions console, choose Activities in the left navigation panel.
  • Choose Create activity. Enter an Activity Name, for example, file-transfer, and then choose Create Activity.
  • Once the activity is created, its ARN looks like the following: arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:file-transfer
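The same activity can also be created from code instead of the console. The sketch below assumes a boto3 Step Functions client is passed in; ensure_activity is a hypothetical helper name, and to my knowledge create_activity returns the existing ARN when an activity with that name already exists.

```python
def ensure_activity(sfn_client, name="file-transfer"):
    """Create the activity if needed and return its ARN."""
    response = sfn_client.create_activity(name=name)
    # The ARN is what the state machine's Task state refers to in "Resource".
    return response["activityArn"]
```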

Step 2: Create Lambda Functions

AWS Lambda lets you run code without provisioning or managing servers. You pay only for the compute time you consume — there is no charge when your code is not running. — AWS Lambda

The Lambda functions need an IAM role that can access Step Functions, S3 and CloudWatch Logs.

Create the IAM role for your Lambdas:

  • Sign in to the AWS Console and go to IAM -> Roles -> Create Role.
  • Select AWS service, choose Lambda as the service, and click Next.
  • Search for and select AWSStepFunctionsFullAccess, CloudWatchLogsFullAccess, AmazonS3FullAccess and AWSLambdaFullAccess, then click Next to continue.
  • Choose Create role.
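For readers who prefer scripting the role, the console steps above can be sketched roughly as follows. This assumes a boto3 IAM client is passed in and that the policy names listed above are AWS managed policies with ARNs of the form arn:aws:iam::aws:policy/NAME; the role name and helper name are hypothetical, and some of these managed policies may have been renamed or retired since this was written.

```python
import json

# Trust policy that lets the Lambda service assume the role.
LAMBDA_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Policy names taken from the console steps (assumed to be AWS managed policies).
POLICY_NAMES = [
    "AWSStepFunctionsFullAccess",
    "CloudWatchLogsFullAccess",
    "AmazonS3FullAccess",
    "AWSLambdaFullAccess",
]


def create_lambda_role(iam_client, role_name="file-transfer-lambda-role"):
    """Create the role, attach the managed policies, and return the role ARN."""
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(LAMBDA_TRUST_POLICY),
    )
    for name in POLICY_NAMES:
        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/" + name,
        )
    return role["Role"]["Arn"]
```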

Create File-transfer Lambda Function:

  • Go back to the AWS console and then to Lambda -> Create function.
  • Make sure Author from scratch is selected.
  • Name your function, for example file-transfer-master, and use Python 3.6 as your runtime.
  • Under the existing role option, choose the role we just created and click Create function.
  • Create the file-transfer-slave Lambda in the same way.

The Lambda functions now have access to Step Functions and S3. Next, add code to them that transfers files within S3, for example from s3://test-bucket/raw/ to s3://test-bucket/refined/.

A practical way to handle the file transfer is a master-slave combination of Lambdas. The master Lambda lists the objects under the specified prefix (for example test-bucket/raw) and passes each file name to the slave Lambda, which moves that file from raw to the specified zone (refined).

Add the following code to the file-transfer-master Lambda and click Save.

The master Lambda lists the objects under the given prefix (src_prefix), records a file transfer status of started in DynamoDB for each file, and sends the file path to the slave Lambda.

import json
import boto3

# connection to dynamodb
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
status_table_name = 'file-transfer-status'

# file transfer configuration
src_bucket_name = 'file-transfer-integration'
dest_bucket_name = 'file-transfer-integration'
src_prefix = 'raw'
dest_prefix = 'refined'
s3 = boto3.resource('s3')
src_bucket = s3.Bucket(src_bucket_name)
dest_bucket = s3.Bucket(dest_bucket_name)
client_lambda = boto3.client('lambda')


def lambda_handler(event, context):
    print("Starting raw to refined file transfer Lambda")
    try:
        # list every object under the source prefix
        for obj in src_bucket.objects.filter(Prefix=src_prefix):
            path = obj.key
            if ".csv" in path:
                payload = json.dumps({'file_path': path})
                print(payload)
                # record the transfer as started before handing the file off
                add_item_to_table(status_table_name, {'file_name': path, 'file_transfer_status': 'started'})
                # 'Event' is an asynchronous invocation: the master does not wait for the slave
                client_lambda.invoke(
                    FunctionName='file-transfer-slave',
                    InvocationType='Event',
                    LogType='None',
                    Payload=payload
                )
    except Exception as e:
        print(e)
        raise


# inserts file transfer status to dynamodb
def add_item_to_table(table_name, item):
    table = dynamodb.Table(table_name)
    table.put_item(Item=item)

Add the following code to the file-transfer-slave Lambda and click Save.

The slave Lambda receives the file path from the event, copies the file from the source to the destination, deletes it from the source, and then updates the file transfer status to completed.

import boto3

# dynamo connection
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
status_table_name = 'file-transfer-status'

# file transfer configuration
src_bucket_name = 'file-transfer-integration'
dest_bucket_name = 'file-transfer-integration'
src_prefix = 'raw'
dest_prefix = 'refined'
s3 = boto3.resource('s3')
src_bucket = s3.Bucket(src_bucket_name)
dest_bucket = s3.Bucket(dest_bucket_name)


def lambda_handler(event, context):
    try:
        path = event['file_path']
        new_source = {'Bucket': src_bucket_name, 'Key': path}
        # replace only the first occurrence, so 'raw' inside a file name is left alone
        new_key = path.replace(src_prefix, dest_prefix, 1)
        # copy to the destination, then delete exactly this object from the source
        dest_bucket.Object(new_key).copy(new_source)
        src_bucket.Object(path).delete()
        updated_item_in_table(status_table_name, path)
    except Exception as e:
        print(e)
        print(event.get('file_path'))
        raise


# updates the file transfer status to completed
def updated_item_in_table(table_name, file_name):
    table = dynamodb.Table(table_name)
    table.update_item(
        Key={
            'file_name': file_name
        },
        UpdateExpression='SET file_transfer_status = :val1',
        ExpressionAttributeValues={
            ':val1': 'completed'
        }
    )
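One detail worth isolating is how the destination key is derived from the source key. The helper below is a sketch of a stricter mapping that swaps only the leading prefix and rejects keys outside it; destination_key is a hypothetical name, and the default prefixes are the ones used in the configuration above.

```python
def destination_key(key, src_prefix="raw", dest_prefix="refined"):
    """Map a source key to its destination key by swapping only the leading prefix."""
    if not key.startswith(src_prefix + "/"):
        raise ValueError("key is not under the source prefix: " + key)
    # Keep everything after the prefix untouched, including any later 'raw' substrings.
    return dest_prefix + key[len(src_prefix):]
```

With this mapping, raw/drawing.csv becomes refined/drawing.csv, whereas replacing every occurrence of the prefix would also rewrite the file name.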

Test the lambda setup:

  • Go to the file-transfer-master Lambda console and click Test.
  • Enter an event name such as test (or any suitable name), then click Create.
  • After configuring the event, click Test again.
  • The files are now transferred from file-transfer-integration/raw to file-transfer-integration/refined.
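The console test above can also be approximated from a script. This sketch assumes a boto3 Lambda client is passed in; invoke_and_check is a hypothetical helper name. It invokes the function synchronously and treats the presence of FunctionError in the response as a failure.

```python
import json


def invoke_and_check(lambda_client, function_name="file-transfer-master", event=None):
    """Invoke a Lambda synchronously and return True when it ran without raising."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps(event or {}),
    )
    # "FunctionError" is only present when the function itself raised an error.
    return response["StatusCode"] == 200 and "FunctionError" not in response
```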

Create a file-transfer-status Lambda with the same configuration as before. It sends the success token to the activity once the transfer is complete, and raises an exception otherwise.

Add the code below to the file-transfer-status Lambda and save.

import json
import boto3
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
status_table_name = 'file-transfer-status'

# file transfer config
src_bucket_name = 'file-transfer-integration'
src_prefix = 'raw'
s3 = boto3.resource('s3')

# step-fn-activity
client = boto3.client('stepfunctions')
# replace activity arn with respective activity arn
activity = "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:file-transfer"


class NewToProcessedFileTransferIncompleteException(Exception):
    pass


def lambda_handler(event, context):
    count_status = get_file_transfer_status(src_bucket_name, src_prefix)
    count_status_table = get_count_match_status_in_table(status_table_name)

    if count_status and count_status_table:
        # send activity success token
        task = client.get_activity_task(activityArn=activity, workerName="file-transfer-worker")
        print(task)
        # taskToken is empty when no activity task was waiting to be picked up
        if not task.get('taskToken'):
            raise NewToProcessedFileTransferIncompleteException('No activity task available!')
        client.send_task_success(taskToken=task['taskToken'], output=json.dumps({'message': 'File Transfer Completed'}))
    else:
        raise NewToProcessedFileTransferIncompleteException('File Transfer Incomplete!')


def get_count_match_status_in_table(table_name):
    # True when every item in the status table is marked completed
    # (a single scan call returns one page only; large tables need pagination)
    table = dynamodb.Table(table_name)
    all_items = table.scan()['Items']
    filtered_items = table.scan(
        FilterExpression=Attr('file_transfer_status').eq('completed')
    )['Items']
    return len(all_items) == len(filtered_items)


def get_file_transfer_status(bucket_name, prefix):
    # True when no .csv or .txt files remain under the source prefix
    # (the master Lambda only moves .csv files, so a leftover .txt keeps this False)
    s3_bucket = s3.Bucket(bucket_name)
    file_count = 0
    for obj in s3_bucket.objects.filter(Prefix=prefix):
        path = obj.key
        if ".csv" in path or ".txt" in path:
            file_count = file_count + 1
    print(file_count)
    return file_count == 0
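A DynamoDB scan returns its results in pages, so for a status table with many items the completed check would need to follow LastEvaluatedKey. The sketch below shows one way to do that; all_transfers_completed is a hypothetical helper, and table is assumed to be a boto3 DynamoDB Table resource (or anything with a compatible scan method).

```python
def all_transfers_completed(table):
    """Return True when every item in the table has status 'completed'."""
    kwargs = {}
    while True:
        page = table.scan(**kwargs)
        for item in page.get("Items", []):
            if item.get("file_transfer_status") != "completed":
                return False
        # LastEvaluatedKey is present only when more pages remain.
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return True
        kwargs["ExclusiveStartKey"] = last_key
```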

The Lambdas are now ready.

Step 3: Create an activity step function with lambda

  • From the AWS console, open Step Functions.
  • Click Create state machine.
  • Make sure Author with code snippets is selected and name your state machine file-transfer-state-machine.
  • Use the code below to set it up. Make sure you replace each ARN with the ARN of your own activity and Lambdas; if you click in the Resource field, it will give you a choice of your ARNs.

{
  "Comment": "ETL Orchestration",
  "StartAt": "PrepareFileTransfer",
  "States": {
    "PrepareFileTransfer": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "InvokeFileTransferActivity",
          "States": {
            "InvokeFileTransferActivity": {
              "Type": "Task",
              "Resource": "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:file-transfer",
              "TimeoutSeconds": 3600,
              "Catch": [
                {
                  "ErrorEquals": ["States.TaskFailed"],
                  "Next": "FileTransferFailed"
                },
                {
                  "ErrorEquals": ["States.ALL"],
                  "Next": "FileTransferFailed"
                }
              ],
              "Next": "FileTransferPassed"
            },
            "FileTransferFailed": {
              "Type": "Fail",
              "Cause": "File Transfer Failed"
            },
            "FileTransferPassed": {
              "Type": "Pass",
              "Result": "File Transfer Passed. Send SES emails etc to admin.",
              "End": true
            }
          }
        },
        {
          "StartAt": "WaitForFileTransferWorker",
          "States": {
            "WaitForFileTransferWorker": {
              "Type": "Wait",
              "Seconds": 5,
              "Next": "InvokeFileTransferWorker"
            },
            "InvokeFileTransferWorker": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:file-transfer-master",
              "TimeoutSeconds": 300,
              "Catch": [
                {
                  "ErrorEquals": ["States.ALL"],
                  "Next": "FileTransferFileTransferWait"
                }
              ],
              "Next": "FileTransferFileTransferWait"
            },
            "FileTransferFileTransferWait": {
              "Type": "Wait",
              "Seconds": 30,
              "Next": "FileTransferWorkerCheckFileStatus"
            },
            "FileTransferWorkerCheckFileStatus": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:file-transfer-status",
              "TimeoutSeconds": 300,
              "Catch": [
                {
                  "ErrorEquals": ["NewToProcessedFileTransferIncompleteException"],
                  "Next": "FileTransferWorkerCheckFileStatusFailed"
                },
                {
                  "ErrorEquals": ["States.ALL"],
                  "Next": "FileTransferWorkerCheckFileStatusFailed"
                }
              ],
              "Next": "FileTransferWorkerCheckFileStatusPassed"
            },
            "FileTransferWorkerCheckFileStatusFailed": {
              "Type": "Fail",
              "Cause": "File Transfer Failed"
            },
            "FileTransferWorkerCheckFileStatusPassed": {
              "Type": "Pass",
              "Result": "FileTransferWorkerCheckFileStatusPassed.",
              "End": true
            }
          }
        }
      ]
    }
  }
}
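Because every Next and Catch target in a definition like this must name a state in the same States map, a quick local check can catch typos before the definition is pasted into the console. The function below is a rough sketch of such a check, not an official validator; find_dangling_transitions is a hypothetical name, and it only looks at StartAt, Next, Catch and Parallel branches.

```python
def find_dangling_transitions(definition):
    """Return (state_name, target) pairs whose target is not defined in the same scope."""
    problems = []
    states = definition.get("States", {})
    if definition.get("StartAt") not in states:
        problems.append(("StartAt", definition.get("StartAt")))
    for name, state in states.items():
        targets = []
        if "Next" in state:
            targets.append(state["Next"])
        # Each catcher may redirect the execution to another state.
        targets.extend(c["Next"] for c in state.get("Catch", []) if "Next" in c)
        for target in targets:
            if target not in states:
                problems.append((name, target))
        # Parallel branches are self-contained state machines, so check them recursively.
        for branch in state.get("Branches", []):
            problems.extend(find_dangling_transitions(branch))
    return problems
```

To use it, load the definition with json.loads and call the function on the resulting dict; an empty list means no dangling transitions were found.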

Several state types are available in state machines. The most commonly used are Pass, Task, Wait, Fail and Parallel.

  1. A Task state represents a single unit of work performed by a state machine.
  2. A Pass state passes its input to its output without performing work. Pass states are useful when constructing and debugging state machines.
  3. A Wait state delays the state machine from continuing for a specified time. You can choose either a relative time, specified in seconds from when the state begins, or an absolute end time, specified as a timestamp.
  4. A Parallel state is used to create parallel branches of execution in a state machine.
  5. A Fail state stops the execution of the state machine and marks that execution as failed.
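As a small illustration of the two forms of a Wait state, the helper below builds the state as a Python dict that can be serialized with json.dumps. wait_state is a hypothetical helper name; Seconds and Timestamp are the Amazon States Language fields for the relative and absolute variants.

```python
def wait_state(next_state, seconds=None, timestamp=None):
    """Build a Wait state using either a relative delay or an absolute timestamp."""
    if (seconds is None) == (timestamp is None):
        raise ValueError("provide exactly one of seconds or timestamp")
    state = {"Type": "Wait", "Next": next_state}
    if seconds is not None:
        state["Seconds"] = seconds  # relative delay from when the state begins
    else:
        state["Timestamp"] = timestamp  # absolute end time, ISO 8601 format
    return state
```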

Steps in the state machine

  1. Invoke the activity that was created in Step 1. This branch waits until a worker reports success or failure.
  2. In the parallel branch, wait 5 seconds so that the activity task has started.
  3. Invoke the file-transfer-master Lambda from the state machine.
  4. Wait 30 seconds for the master and slave Lambdas to finish.
  5. Invoke the file-transfer-status Lambda, which sends the success token to the activity, or raises an exception if the transfer is incomplete.

  • After you update the code snippet, the console renders the visual workflow.
  • Configure the IAM role, then click Create state machine.

The role created for the state machine needs the AWSLambdaFullAccess policy, and any deny policies on that role must be removed.

Start Execution of State Machine:

  • Go to the Step Functions console and choose State machines.
  • Select the state machine you created (file-transfer-state-machine).
  • Click Start execution and enter an execution name.

Click Start execution again to confirm. The state machine starts executing.
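An execution can also be started from code rather than the console. This sketch assumes a boto3 Step Functions client and the ARN of your state machine are passed in; start_file_transfer is a hypothetical helper name. Note that execution names must be unique per state machine, which is why a new name is needed for every run.

```python
import json


def start_file_transfer(sfn_client, state_machine_arn, execution_name, payload=None):
    """Start an execution and return its ARN."""
    response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        name=execution_name,
        input=json.dumps(payload or {}),
    )
    return response["executionArn"]
```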

While the state machine is executing, the console shows its progress in the visual workflow.

A failed execution is shown in the same visual workflow.

If the state machine's role is not allowed to invoke the Lambdas, the execution fails with an AccessDeniedException.

To resolve this error, edit the IAM role, add the AWSLambdaFullAccess policy, and delete any deny policies in the role.
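AWSLambdaFullAccess grants far more than the state machine needs here. As a narrower alternative (an assumption on my part, not what this walkthrough uses), the role could be given a policy that only allows invoking the two Lambdas referenced in the definition. The sketch below builds such a policy document; invoke_policy is a hypothetical helper name.

```python
def invoke_policy(function_arns):
    """Build an IAM policy document that only allows invoking the given Lambdas."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "lambda:InvokeFunction",
                "Resource": sorted(function_arns),
            }
        ],
    }
```

The resulting dict can be serialized with json.dumps and attached to the state machine's role as an inline policy.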

Start the state machine execution again with a different execution name. This time the execution succeeds.

This is how we can integrate Lambda functions with Step Functions. Please reach out via comments in case you have any queries.

This story is authored by P V Subbareddy. He is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark Ecosystem.

ZenOf.AI

AI | Machine Learning | Big Data
