Event-Driven Data Pipeline with AWS EventBridge, Step Functions, Lambda and Glue

Muhammad Rivaldi Idris
5 min read · Oct 3, 2023


In this article, I will explain the step-by-step process for building an event-driven data pipeline using AWS services such as EventBridge, Step Functions, Lambda, and Glue. In a previous article, I set up a batch pipeline using Airflow that outputs its data to AWS S3.

Now I’ll set up AWS EventBridge to continually monitor an S3 bucket for incoming data. When new data is detected, an AWS Step Functions state machine is triggered to orchestrate the workflow. Here are the steps to set up AWS EventBridge:

Create an AWS EventBridge Event Rule for Data Detection on AWS S3.

Start by creating an event rule within AWS EventBridge. This rule acts as the foundation for detecting new data arrivals in your AWS S3 bucket.

Define the event source as the AWS S3 bucket to monitor for new object creations. Select “AWS Service” as the event source and choose “S3” as the service name. In the “Event matching pattern” section, configure the rule to match events indicating new object creations in the specified S3 bucket by setting the event pattern to match “Object Created” events. Note that the bucket itself must have Amazon EventBridge notifications enabled in its event notification settings, otherwise these events are never delivered to EventBridge.

Define the target for the event rule. In this case, I want to trigger an AWS Step Function when a new object is created in the S3 bucket. Select “Step Functions state machine” as the target.
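
For reference, the same rule and target can also be created with boto3. This is only a minimal sketch under a few assumptions: the monitored bucket is rawdata-119727 (the raw-data bucket used later in the Glue script), and the rule name, region, account ID, and IAM role ARN are placeholders you would replace with your own.

import json
import boto3

# Minimal sketch: create the EventBridge rule and attach the Step Functions target.
# Bucket name, rule name, region, account ID, and role ARN are placeholders.
events = boto3.client('events')

# Match "Object Created" events coming from the monitored S3 bucket
event_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {"name": ["rawdata-119727"]}
    }
}

events.put_rule(
    Name='rawdata-object-created',
    EventPattern=json.dumps(event_pattern),
    State='ENABLED',
)

# Point the rule at the Step Functions state machine that orchestrates the pipeline
events.put_targets(
    Rule='rawdata-object-created',
    Targets=[{
        'Id': 'pipeline-state-machine',
        'Arn': 'arn:aws:states:REGION:ACCOUNT_ID:stateMachine:STATE_MACHINE_NAME',
        'RoleArn': 'arn:aws:iam::ACCOUNT_ID:role/EVENTBRIDGE_INVOKE_SFN_ROLE',
    }],
)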

That’s all for setting up the AWS EventBridge rule: it detects new object creations in the specified AWS S3 bucket and triggers the specified AWS Step Functions state machine when these events occur.

Configure AWS Step Functions.

Moving on to AWS Step Functions, I create a new state machine to define the workflow for processing newly created S3 objects. This workflow consists of two execution states: one for logging and another for data transformation. Each state is configured to execute an AWS Lambda function.

The first state executes a Lambda function responsible for creating log payloads, while the second state invokes another Lambda function to trigger AWS Glue for data transformation.
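
As a rough sketch (not necessarily the exact definition used here), the two states can be expressed in Amazon States Language and created with boto3 as follows. The Lambda and IAM role ARNs are placeholders, and ResultPath is set to null so the original EventBridge event is passed unchanged to the second state.

import json
import boto3

# Sketch of a two-state workflow: log the new file, then trigger the Glue job
definition = {
    "Comment": "Log the new S3 object, then trigger the Glue transformation job",
    "StartAt": "LogFileIn",
    "States": {
        "LogFileIn": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:s3-filein-function",
            "ResultPath": None,  # discard the logging output, keep the event as state input
            "Next": "TriggerGlueJob"
        },
        "TriggerGlueJob": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:gluejob_trigger",
            "End": True
        }
    }
}

sfn = boto3.client('stepfunctions')
sfn.create_state_machine(
    name='s3-event-pipeline',
    definition=json.dumps(definition),
    roleArn='arn:aws:iam::ACCOUNT_ID:role/STEP_FUNCTIONS_EXECUTION_ROLE',
)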

Create AWS Lambda Script.

  1. s3-filein-function

I’m creating an AWS Lambda function named s3-filein-function to generate and store a log payload for each newly uploaded S3 file.

import json
import boto3

def lambda_handler(event, context):

    # Extract information about the newly created S3 object
    object_key = event['detail']['object']['key']
    object_size = event['detail']['object']['size']
    bucket_name = event['detail']['bucket']['name']

    # Create a dictionary containing file information
    file_info = {
        'filename': object_key,
        'size': object_size,
        'bucket_name': bucket_name
    }

    # Convert file information to JSON format
    json_data = json.dumps(file_info)

    # Specify the destination S3 bucket for logging
    destination_bucket = 'log-filein-rawdata-119727'
    json_key = object_key

    # Initialize an S3 client
    s3_client = boto3.client('s3')

    # Upload the JSON log payload to the destination S3 bucket
    s3_client.put_object(Bucket=destination_bucket, Key=json_key, Body=json_data)

This Lambda function captures key information about newly arrived S3 objects, formats it into a JSON log payload, and stores it in a dedicated S3 bucket.
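
To see what the function works with, here is a trimmed example of the EventBridge “Object Created” event it receives. Only the fields the handler reads are shown, and the key, size, and bucket values are made up for illustration.

# Hypothetical test event mirroring the fields the handler reads
test_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "rawdata-119727"},
        "object": {"key": "orders/2023-10-03.json", "size": 10485}
    }
}

# Invoking the handler with this event writes a JSON log payload named
# orders/2023-10-03.json to the log-filein-rawdata-119727 bucket.
lambda_handler(test_event, None)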

2. gluejob_trigger

Next, I create a Lambda function named gluejob_trigger that starts the AWS Glue data transformation job for each new file uploaded to AWS S3.

import boto3

def lambda_handler(event, context):

    # Extract the S3 object key from the event
    obj_key = event['detail']['object']['key']

    # Define the AWS Glue job name and its arguments
    job_name = 'transform_data'
    job_args = {
        '--obj_key': obj_key  # Pass the object key as an argument to the Glue job
    }

    # Initialize the AWS Glue client
    glue = boto3.client('glue')

    # Start the AWS Glue job run with the specified job name and arguments
    response = glue.start_job_run(JobName=job_name, Arguments=job_args)

This Lambda function serves as the bridge between the S3 event-driven trigger and the AWS Glue job, ensuring that data transformation is automatically invoked whenever new data arrives in the S3 bucket. Note that Glue job arguments are passed with a leading “--” (here --obj_key), and the Glue script retrieves them without the prefix via getResolvedOptions.
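
If you want to confirm that the job actually started, the start_job_run response includes a JobRunId that can be polled with get_job_run. A small standalone sketch (the object key below is just an example value):

import boto3

glue = boto3.client('glue')

# Start the job the same way the Lambda function does
response = glue.start_job_run(
    JobName='transform_data',
    Arguments={'--obj_key': 'orders/2023-10-03.json'},
)

# Check the state of that run (e.g. RUNNING, SUCCEEDED, FAILED)
run = glue.get_job_run(JobName='transform_data', RunId=response['JobRunId'])
print(run['JobRun']['JobRunState'])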

Create Transformation Job on AWS Glue.

You can use the simple visual interface in AWS Glue Studio to create your ETL jobs, but in this case I’m using the script editor to work directly with code in the AWS Glue Studio ETL job script. Here’s the Glue script that transforms the data from JSON to Parquet and saves it in the AWS S3 bucket named cleandata-119727:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Get command-line arguments, including 'JOB_NAME' and 'obj_key'
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'obj_key'])
obj_key = args['obj_key']

# Initialize Spark context, Glue context, and the Glue job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Define source and destination S3 buckets and object paths
source_bucket = 'rawdata-119727'
destination_bucket = 'cleandata-119727'
source_path = f's3://{source_bucket}/{obj_key}'

# Read data from the source S3 object as a dynamic frame
read_data = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [source_path]},
    format="json",
    format_options={"multiline": False},
)

# Apply data mappings to transform the structure of the data
transform_data = ApplyMapping.apply(
    frame=read_data,
    mappings=[
        # Define mappings from source fields to destination fields
        # Example: ("source_field_name", "source_field_type", "destination_field_name", "destination_field_type"),
        # ... (Mappings for each field you want to transform)
    ]
)

# Define the output path for the transformed data in Parquet format
output_path = 's3://' + destination_bucket + '/' + obj_key.rsplit(".", 1)[0]

# Write the transformed data to the destination S3 path in Parquet format
glueContext.write_dynamic_frame.from_options(
    frame=transform_data,
    connection_type="s3",
    connection_options={"path": output_path},
    format="parquet",
)

# Commit the Glue job
job.commit()
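
The mappings list in the script above is intentionally left empty as a placeholder. As a purely hypothetical illustration (these field names are not from the actual dataset), each entry follows the (source_field, source_type, target_field, target_type) form:

# Hypothetical mappings for ApplyMapping.apply: rename fields, change data types,
# and cast string timestamps/dates to proper types
mappings = [
    ("order_id", "string", "order_id", "long"),               # change data type
    ("cust_name", "string", "customer_name", "string"),       # rename field
    ("order_ts", "string", "order_timestamp", "timestamp"),   # cast to timestamp
    ("order_date", "string", "order_date", "date"),           # cast to date
]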

The data transformation phase involves applying mapping rules to the source data. Each field from the source JSON is mapped to a corresponding field in the destination schema. The transformation logic includes renaming fields, changing data types, and formatting timestamps and dates. After transformation, the script writes the data to the specified output path in Parquet format.

The transformed data is now available in the “cleandata-119727” S3 bucket in Parquet format and is ready for the next step, which is loading it into AWS Redshift. I gave a detailed explanation of how to load data from AWS S3 to AWS Redshift here.
