Create an ETL solution using AWS Step Functions, Lambda and Glue

Engineering@ZenOfAI · Published in ZenOf.AI · May 30, 2019

AWS Glue

AWS Glue is a fully managed Extract, Transform and Load (ETL) service that makes it easy for customers to prepare and load their data for analytics. You can look up further details on AWS Glue here.

AWS Glue Crawlers

A crawler can crawl multiple data stores in a single run. After completion, the crawler creates or updates one or more tables in your Data Catalog. Extract, Transform and Load (ETL) jobs that you define in AWS Glue use these Data Catalog tables as sources and targets.

For a deep dive into AWS Glue crawlers, please go through the official docs.

Creating Activity based Step Function with Lambda, Crawler and Glue

Create an activity for the Step Function

To create an activity, please follow Step 1 in my previous article.

Create Role for Glue and Step Function

In my previous article, I created a role that can access Step Functions, Lambda, S3, and CloudWatch Logs. Edit the role so it can also access Glue. To do that:

Go to IAM console -> Roles and select the created role

Click on Attach policies and add AWSGlueConsoleFullAccess

The role now has access to Lambda, S3, Step Functions, Glue and CloudWatch Logs.
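If you prefer to attach the policy programmatically, here is a minimal boto3 sketch; the role name step-function-lambda-role is an assumption, so replace it with the role you created earlier.

import boto3

iam = boto3.client('iam')

# attach the AWS-managed Glue policy to the existing role
# (role name below is hypothetical; use the role created in the previous article)
iam.attach_role_policy(
    RoleName='step-function-lambda-role',
    PolicyArn='arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess'
)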

We shall build an ETL processor that converts data from CSV to Parquet and stores it in S3. For high-volume data storage, Parquet is preferable to CSV. In most cases, the incoming data is in csv/txt format. For this use case, incoming data is dumped into a particular location in S3 (s3://file-transfer-integration/raw/data/) in csv/txt format.

Now run the crawler to create a table in the AWS Glue Data Catalog.

Create the crawler

Go to AWS Glue console -> Crawlers

Click on Add crawler and give a name to the crawler

Specify the crawler source type as Data stores, which is the default

Specify the S3 path to be crawled into the Glue Data Catalog, i.e., s3://file-transfer-integration/raw/data/

Select the option Choose an existing IAM role and select AWSGlueServiceRoleDefault

Leave Schedule as Run on demand, which is the default

In the Output options, configure the database where the crawler creates/updates the table

Review and finish
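The same crawler can also be created with boto3 if you prefer code over the console. A minimal sketch, assuming the output database is named raw and the crawler name matches the one used in the Lambda functions below:

import boto3

glue = boto3.client('glue', region_name='us-west-2')

# create an on-demand crawler over the raw data location
glue.create_crawler(
    Name='raw-refined-crawler',
    Role='AWSGlueServiceRoleDefault',
    DatabaseName='raw',  # database where the crawler creates/updates the table
    Targets={'S3Targets': [{'Path': 's3://file-transfer-integration/raw/data/'}]}
)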

Run the crawler

Go to AWS Glue console -> Crawlers

Select the created crawler and choose the Run crawler option

Go to the Athena console and select the database provided in the crawler configuration; you will see that the table has been created.
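The table can also be verified programmatically. A minimal boto3 sketch, assuming the database is raw and the table is data, as used by the Glue job later:

import boto3

glue = boto3.client('glue', region_name='us-west-2')

# confirm the crawler created the table and inspect its inferred schema
table = glue.get_table(DatabaseName='raw', Name='data')
print(table['Table']['Name'])
print([col['Name'] for col in table['Table']['StorageDescriptor']['Columns']])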

Create Lambda functions to run and check the status of the crawler

In case you are just starting out on Lambda functions, I have explained how to create one from scratch in my previous article.

Create a Lambda function named invoke-crawler-name, i.e., invoke-raw-refined-crawler, with the role that we created earlier, and increase the Lambda execution timeout to 5 minutes.
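The timeout can be set from the console, or with a small boto3 call once the function exists. A minimal sketch:

import boto3

lambda_client = boto3.client('lambda', region_name='us-west-2')

# raise the execution timeout of the existing function to 5 minutes
lambda_client.update_function_configuration(
    FunctionName='invoke-raw-refined-crawler',
    Timeout=300  # seconds
)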

Place the following code in the invoke-raw-refined-crawler Lambda; it starts the crawler from the Lambda function.

import json
import boto3
from botocore.exceptions import ClientError

# Glue client used to start the crawler
client = boto3.client('glue')
glue = boto3.client(service_name='glue', region_name='us-west-2',
                    endpoint_url='https://glue.us-west-2.amazonaws.com')

# Step Functions client used to report failures back to the activity
client_sf = boto3.client('stepfunctions')
# replace activity arn with the respective activity arn
activity = "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:file-transfer"


class CrawlerException(Exception):
    pass


def lambda_handler(event, context):
    print("Starting Glue Crawler")
    try:
        response = client.start_crawler(Name='raw-refined-crawler')
    except client.exceptions.CrawlerRunningException:
        # the crawler is already running; surface it as a retriable error
        print('Crawler in progress')
        raise CrawlerException('Crawler In Progress!')
    except Exception as e:
        # send the failure token to the Step Function activity
        task = client_sf.get_activity_task(activityArn=activity,
                                           workerName='raw-refined-crawler')
        response = client_sf.send_task_failure(taskToken=task['taskToken'])
        print('Problem while invoking crawler')

Create another Lambda function named check-raw-refined-crawler-status that checks the status of the crawler that was invoked by the previous Lambda function.

Place the following code in check-raw-refined-crawler-status Lambda.

import json
import boto3
from botocore.exceptions import ClientError

# Glue client used to read the crawler metrics
client = boto3.client('glue')
glue = boto3.client(service_name='glue', region_name='us-west-2',
                    endpoint_url='https://glue.us-west-2.amazonaws.com')

# Step Functions client used to report success back to the activity
client_sf = boto3.client('stepfunctions')
# replace activity arn with the respective activity arn
activity = "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:file-transfer"


class CrawlerException(Exception):
    pass


def lambda_handler(event, context):
    response = client.get_crawler_metrics(CrawlerNameList=['raw-refined-crawler'])
    print(response)
    print(response['CrawlerMetricsList'][0]['CrawlerName'])
    print(response['CrawlerMetricsList'][0]['TimeLeftSeconds'])
    print(response['CrawlerMetricsList'][0]['StillEstimating'])

    if response['CrawlerMetricsList'][0]['StillEstimating']:
        raise CrawlerException('Crawler In Progress!')
    elif response['CrawlerMetricsList'][0]['TimeLeftSeconds'] > 0:
        raise CrawlerException('Crawler In Progress!')
    else:
        # send the success token to the Step Function activity
        task = client_sf.get_activity_task(activityArn=activity,
                                           workerName="raw-refined-crawler-activity")
        print(task)
        response = client_sf.send_task_success(
            taskToken=task['taskToken'],
            output=json.dumps({'message': 'Raw To Refined Crawler Completed'}))

Create another Lambda function named crawler-activity-fail which sends a failure token to the Step Function.

Place the following code in this Lambda.

import boto3

# Step Functions client used to report the failure
client_sf = boto3.client('stepfunctions')
# replace activity arn with the respective activity arn
activity = "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:file-transfer"


def lambda_handler(event, context):
    # fetch the pending activity task and send a failure token for it
    task = client_sf.get_activity_task(activityArn=activity,
                                       workerName="raw-refined-crawler-activity")
    print(task)
    response = client_sf.send_task_failure(taskToken=task['taskToken'])

Create a Glue job

The purpose of the Glue job is to handle the ETL process and convert the incoming csv/txt data to Parquet. If required, transformations such as filter conditions, data typecasting and more can also be applied in this ETL job (see the sketch below).
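For illustration, here is a minimal sketch of such transformations inside a Glue script, assuming hypothetical columns status (string) and amount (string to be cast to double); the actual job shown later does a plain csv-to-Parquet copy.

from awsglue.transforms import Filter, ApplyMapping

# datasource0 is the DynamicFrame read from the catalog,
# as in the file-type-conversion script further below

# keep only rows matching a filter condition
filtered = Filter.apply(frame=datasource0, f=lambda row: row["status"] == "active")

# typecast columns while copying (string -> double for "amount")
mapped = ApplyMapping.apply(
    frame=filtered,
    mappings=[("status", "string", "status", "string"),
              ("amount", "string", "amount", "double")])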

Go to AWS Glue console -> select Jobs under ETL and click on Add job

Enter the name of the job, i.e., file-type-conversion, and choose the IAM Role AWSGlueServiceRoleDefault.

Make sure AWSGlueServiceRoleDefault has access to Step Functions so that it can send success/failure tokens from AWS Glue.

In the Security configuration, script libraries, and job parameters section, Maximum Capacity is the number of DPUs allocated to run the job.
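Maximum Capacity can also be overridden per run when the job is started programmatically. A minimal boto3 sketch, with 10 DPUs as an arbitrary example value:

import boto3

glue = boto3.client('glue', region_name='us-west-2')

# start a run of the job with an explicit DPU allocation
glue.start_job_run(
    JobName='file-type-conversion',
    MaxCapacity=10.0  # number of DPUs for this run
)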

Choose a Data Source

The “data” table was created in the “raw” database when the crawler ran; choose it as the source.

In Data Target, select the Create tables in your data target option and choose the data store Amazon S3.

Configure the file format as Parquet and the target path, i.e., s3://file-transfer-integration/refined/

Next, select the Save job and edit script option, and edit the script based on the requirement.

Now create a Lambda function named invoke-file-type-conversion-glue-job which invokes the Glue job and passes the activity task token as an argument.

Place the following code in the invoke-file-type-conversion-glue-job Lambda.

import json
import boto3
from botocore.exceptions import ClientError

# Glue client used to start the ETL job
client = boto3.client('glue')
glue = boto3.client(service_name='glue', region_name='us-west-2',
                    endpoint_url='https://glue.us-west-2.amazonaws.com')

# Step Functions client used to fetch the activity task token
client_sf = boto3.client('stepfunctions')
# replace activity arn with the respective activity arn
activity = "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:file-transfer"


def lambda_handler(event, context):
    # fetch the pending activity task; its token is handed to the Glue job
    task = client_sf.get_activity_task(activityArn=activity,
                                       workerName="Raw-To-Refined-ETL")
    print(task)

    # start the Glue job and pass the task token as a job argument
    response = client.start_job_run(JobName='file-type-conversion',
                                    Arguments={'--task_token': task['taskToken']})

Edit the file-type-conversion Glue job and place the following code, which converts the txt/csv files to Parquet format.

import sys
import json
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# step-fn-activity: Step Functions client used to send the task token back
client = boto3.client(service_name='stepfunctions', region_name='us-west-2')
# replace activity arn with the respective activity arn
activity = "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:file-transfer"

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'task_token'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

try:
    ## @type: DataSource
    ## @args: [database = "raw", table_name = "data", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    datasource0 = glueContext.create_dynamic_frame.from_catalog(
        database="raw", table_name="data", transformation_ctx="datasource0")

    ## @type: DataSink
    ## @args: [connection_type = "s3", connection_options = {"path": "s3://file-transfer-integration/refined"}, format = "parquet", transformation_ctx = "datasink4"]
    ## @return: datasink4
    ## @inputs: [frame = datasource0]
    datasink4 = glueContext.write_dynamic_frame.from_options(
        frame=datasource0, connection_type="s3",
        connection_options={"path": "s3://file-transfer-integration/refined"},
        format="parquet", transformation_ctx="datasink4")

    # send success token to step function activity
    response = client.send_task_success(
        taskToken=args['task_token'],
        output=json.dumps({'message': 'ETL Completed'}))
except Exception as e:
    # send failure token to step function activity
    response = client.send_task_failure(taskToken=args['task_token'])
    print(e)
    raise e

job.commit()

Please replace the database and table names as needed; in this case, we are reading from the data table and writing to S3 in Parquet format. A sketch of passing these names as job arguments instead follows.
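A minimal sketch inside the same Glue script, assuming hypothetical --database and --table arguments are supplied when the job run is started:

# resolve the extra arguments alongside JOB_NAME and task_token
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'task_token', 'database', 'table'])

# read the source table from the arguments instead of hard-coding it
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=args['database'],
    table_name=args['table'],
    transformation_ctx="datasource0")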

Creating a Step Function using Lambda and Glue

The resources required to create a Step Function with Lambda and Glue have been created. Now wire them up in the Step Function.

I explained how to create a Step Function in my previous article.

  1. Create a Step Function named file-conversion-state-machine.
  2. Edit the Step Function definition and replace it with the following code.
{
"Comment": "ETL Orchestration",
"StartAt": "PrepareRawToRefinedCrawler",
"States": {
"PrepareRawToRefinedCrawler": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "InvokeRawToRefinedCrawlerActivity",
"States": {
"InvokeRawToRefinedCrawlerActivity": {
"Type": "Task",
"Resource": "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:glue-lambda-step-function",
"TimeoutSeconds": 3600,
"Catch": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"Next": "RawToRefinedCrawlerFailed"
},
{
"ErrorEquals": [
"States.ALL"
],
"Next": "RawToRefinedCrawlerFailed"
}
],
"Next": "RawToRefinedCrawlerPassed"
},
"RawToRefinedCrawlerFailed": {
"Type": "Fail",
"Cause": "File Transfer Failed"
},
"RawToRefinedCrawlerPassed": {
"Type": "Pass",
"Result": "File Transfer Passed. Send SES emails etc to admin.",
"Next": "PrepareFileConversionETL"
},
"PrepareFileConversionETL": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "InvokeFileConversionETLActivity",
"States": {
"InvokeFileConversionETLActivity": {
"Type": "Task",
"Resource": "arn:aws:states:us-west-2:XXXXXXXXXXXX:activity:glue-lambda-step-function",
"TimeoutSeconds": 3600,
"Catch": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"Next": "FileConversionETLFailed"
},
{
"ErrorEquals": [
"States.ALL"
],
"Next": "FileConversionETLFailed"
}
],
"Next": "FileConversionETLPassed"
},
"FileConversionETLFailed": {
"Type": "Fail",
"Cause": "File Transfer Failed"
},
"FileConversionETLPassed": {
"Type": "Pass",
"Result": "File Transfer Passed. Send SES emails etc to admin.",
"End": true
}
}
},
{
"StartAt": "WaitForFileConversionETLWorker",
"States": {
"WaitForFileConversionETLWorker": {
"Type": "Wait",
"Seconds": 10,
"Next": "InvokeFileConversionETLWorker"
},
"InvokeFileConversionETLWorker": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:invoke-file-type-conversion-glue-job",
"Catch": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"Next": "FileConversionETLWorkerFailed"
},
{
"ErrorEquals": [
"States.ALL"
],
"Next": "FileConversionETLWorkerFailed"
}
],
"Next": "FileConversionETLWorkerPassed"
},
"FileConversionETLWorkerFailed": {
"Type": "Fail",
"Cause": "File Conversion ETL Failed"},
"FileConversionETLWorkerPassed": {
"Type": "Pass",
"Result": "RefinedToCuratedETLWorker Passed.",
"End": true
}
}
}
]
}
}
},
{
"StartAt": "WaitForRawToRefinedCrawlerWorkerAvailability",
"States": {
"WaitForRawToRefinedCrawlerWorkerAvailability": {
"Type": "Wait",
"Seconds": 5,
"Next": "InvokeRawToRefinedCrawlerWorker"
},
"InvokeRawToRefinedCrawlerWorker": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:invoke-raw-refined-crawler",
"TimeoutSeconds": 300,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "WaitForRawToRefinedCrawlerToComplete"
}
],
"Next": "WaitForRawToRefinedCrawlerToComplete"
},
"WaitForRawToRefinedCrawlerToComplete": {
"Type": "Wait",
"Seconds": 10,
"Next": "RawToRefinedCrawlerCheckStatus"
},
"RawToRefinedCrawlerCheckStatus": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:check-raw-refined-crawler-status",
"Retry": [
{
"ErrorEquals": [
"CrawlerException"
],
"IntervalSeconds": 30,
"BackoffRate": 2,
"MaxAttempts": 10
},
{
"ErrorEquals": [
"States.All"
],
"IntervalSeconds": 30,
"BackoffRate": 2,
"MaxAttempts": 10
}
],
"Catch": [
{
"ErrorEquals": [
"CrawlerException"
],
"Next": "RawToRefinedCrawlerCheckFailed"
},
{
"ErrorEquals": [
"States.ALL"
],
"Next": "RawToRefinedCrawlerCheckFailed"
}
],
"Next": "RawToRefinedCrawlerCheckPassed"
},
"RawToRefinedCrawlerCheckFailed": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:crawler-activity-fail",
"End": true
},
"RawToRefinedCrawlerCheckPassed": {
"Type": "Pass",
"Result": "RawToRefinedCrawlerCheckPassed.",
"End": true
}
}
}
]
}
}
}
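If you prefer to create the state machine programmatically, here is a minimal boto3 sketch; the local file name and the Step Functions execution role ARN are assumptions, so replace them with your own.

import boto3

sfn = boto3.client('stepfunctions', region_name='us-west-2')

# read the state machine definition shown above from a local file
with open('file-conversion-state-machine.json') as f:
    definition = f.read()

# create the state machine (role ARN below is a placeholder)
sfn.create_state_machine(
    name='file-conversion-state-machine',
    definition=definition,
    roleArn='arn:aws:iam::XXXXXXXXXXXX:role/StepFunctionsExecutionRole'
)

# executions can also be started from code instead of the console
sfn.start_execution(
    stateMachineArn='arn:aws:states:us-west-2:XXXXXXXXXXXX:stateMachine:file-conversion-state-machine')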

After updating the state machine, the visual workflow looks like the following:

Start Execution of the Step Function. While the execution is in progress, the flow looks like so:

If the execution is successful, then flow looks like so:

Hope this blog on setting up a basic ETL workflow using AWS Step Functions, Lambda Functions and Glue has been helpful for you.

This story is authored by P V Subbareddy. He is a Big Data Engineer specializing in AWS Big Data Services and the Apache Spark ecosystem.
