Build a Data Pipeline Using AWS Glue

Naseer Ahmad
Contino Engineering
Mar 13, 2023 · 10 min read


Organizations generate and collect colossal volumes of raw data in today’s data-driven world. The real challenge is handling data that is unstructured, incomplete, and inconsistent: to derive insight and value from it, the raw data must be processed, transformed, and analyzed through a data pipeline. This blog post walks through step-by-step instructions for building one. Whether you’re a data engineer, data scientist, or business analyst, it will help you understand the fundamentals of data pipelines and equip you with the skills to build one from scratch.

Use Case

Your organization wants to process data from CSV files stored locally, run analytical queries, and generate reports. Let’s build a data pipeline (ETL pipeline) that ingests the CSV files using AWS Glue, runs analytical queries with Amazon Athena, and visualizes the data in Amazon QuickSight. We will use a CloudFormation template (IaC) to build the required infrastructure (the AWS Glue job, IAM role, Crawler, and trigger), a custom Python script for the Glue job, and a small script to upload data files from the local directory to the S3 bucket. Below is the reference architecture for our use case:

Data Pipeline — Reference Architecture

ETL, which stands for extract, transform, and load, is the process that enables data integration from a source to its destination (a minimal local illustration follows Figure 2):

  • Extract: pull the raw data from its sources, such as flat files, databases, and IoT devices.
  • Transform: enrich, normalize, standardize, verify, and deduplicate the data.
  • Load: save the transformed data into the target destination.
Figure 2 — ETL Process
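
To make the three steps concrete, here is a minimal, local-only sketch of the same flow using pandas (the file names are hypothetical; the AWS Glue job later in this post performs the equivalent work at scale):

import pandas as pd

# Extract: pull raw data from a flat file (hypothetical local path)
raw = pd.read_csv("raw/world-billionaires-dataset.csv")

# Transform: light standardization and deduplication
raw.columns = [c.strip().lower().replace(" ", "_") for c in raw.columns]
curated = raw.drop_duplicates()

# Load: save the curated data to the target destination as Parquet
curated.to_parquet("curated/world-billionaires.parquet", index=False)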

What is a Data Pipeline?

A data pipeline is a process that involves collecting, transforming, and processing data from various sources to make it usable for analysis and decision-making. It is a crucial component of any data-driven organization that must efficiently manage large volumes of data.

A data pipeline aims to ensure that data is accurate, reliable, and easily accessible for analysis. It typically involves a series of steps, including data ingestion, storage, processing, and visualization.

Why is a Data Pipeline needed?

A well-designed data pipeline can help organizations gain valuable insights from their data, which can be used to make informed decisions and drive business growth. It also enables businesses to automate data processing and analysis, reducing the manual effort required and freeing up time for more strategic tasks. A data pipeline is essential for any organization that wants to derive value from its data and gain a competitive advantage in today’s data-driven world.

Process Overview

  1. Upload the CSV data file(s) to S3 (Landing Zone) using the Python Boto3 library.

2. Create the following AWS artifacts using the CloudFormation template:

  • IAM Role: Grant permissions to the AWS Glue and S3 services and attach this role to the AWS Glue job.
  • Glue Job: Convert the CSV file(s) to Parquet format and save the curated file(s) into S3.
  • Crawler: Crawl and catalog the curated data using an AWS Glue Crawler.
  • Catalog: Catalog the metadata of the processed file(s).
  • Trigger: Schedule the AWS Glue job to run every morning at 7 AM. You can change this schedule in the CloudFormation template under AWSGlueJobScheduleRule.

3. Analyze the data using Athena.

4. Visualize the data using Amazon QuickSight.

Implementation Steps

Without further ado, let’s jump into the implementation steps:

  1. Create an S3 bucket using the AWS console (a scripted alternative is sketched after Figure 3)
  2. Create the following four folders under this bucket:
  • athena-queries-output: This folder contains Athena query results and metadata; it’s required to run Athena queries
  • curated-data: This folder contains the curated (processed) data
  • scripts: This folder contains the AWS Glue job script
  • raw: This folder contains the raw data files
Figure 3 — Overview of the S3 Directory Structure
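
If you’d rather script this step than use the console, a boto3 sketch along the following lines creates the bucket and the four prefixes (the bucket name and region are assumptions; S3 bucket names must be globally unique):

import boto3

bucket_name = "data-ingestion-pipeline-demo"  # assumed bucket name; replace with your own
region = "us-east-1"

s3 = boto3.client("s3", region_name=region)

# In us-east-1, create_bucket takes no LocationConstraint
s3.create_bucket(Bucket=bucket_name)

# S3 has no real folders; zero-byte keys ending in "/" appear as folders in the console
for prefix in ("athena-queries-output/", "curated-data/", "scripts/", "raw/"):
    s3.put_object(Bucket=bucket_name, Key=prefix)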

3. Copy the following AWS Glue job script and save it locally as glue-etl-process.py.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import logging
import boto3

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Set param values
region = "us-east-1"
# Replace the following param values with your S3 bucket path
source_file_path = "s3://data-ingestion-pipeline-demo/raw/billionairesData/"
target_file_path = "s3://data-ingestion-pipeline-demo/curated-data/"
# Crawler name should match the name defined in the CloudFormation template
glueCrawler_name = "data-crawler"


def process_csv_files(source_file_path: str, target_file_path: str):
    """Process CSV data files and convert them to Parquet format.

    Args:
        source_file_path (str): S3 source file path
        target_file_path (str): Target S3 path for processed files
    """
    # Read the raw CSV files from the source S3 path
    S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
        format_options={
            "quoteChar": '"',
            "withHeader": True,
            "separator": ",",
            "optimizePerformance": False,
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_file_path],
            "recurse": True,
        },
        transformation_ctx="S3bucket_node1",
    )

    # Write the curated data back to S3 in Parquet format
    S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
        frame=S3bucket_node1,
        connection_type="s3",
        format="glueparquet",
        connection_options={
            "path": target_file_path,
            "partitionKeys": [],
        },
        format_options={"compression": "gzip"},
        transformation_ctx="S3bucket_node3",
    )


def start_glue_crawler(glueCrawler_name: str, region: str):
    """Crawl and catalog the curated data files.

    Args:
        glueCrawler_name (str): Glue Crawler name
        region (str): AWS region name

    Returns:
        dict: start_crawler API response

    Raises:
        Exception: if the crawler fails to start
    """
    glueClient = boto3.client("glue", region_name=region)
    try:
        results = glueClient.start_crawler(Name=glueCrawler_name)
        return results
    except Exception as startCrawlerException:
        logging.error("Error occurred while starting the Glue Crawler: {}".format(glueCrawler_name))
        raise startCrawlerException


def main():
    """Run the following steps sequentially:
    1. Ingest CSV data files
    2. Start the AWS Glue Crawler to catalog the data
    """
    logging.info("Data Pipeline: STARTED")

    # 1- Ingest CSV data file(s) to process
    logging.info("Glue ETL Process: STARTED")
    process_csv_files(source_file_path=source_file_path, target_file_path=target_file_path)

    # 2- Start the AWS Glue Crawler to catalog the data
    logging.info("Crawler: STARTED")
    start_glue_crawler(glueCrawler_name=glueCrawler_name, region=region)

    logging.info("Data Pipeline: FINISHED")
    job.commit()


if __name__ == "__main__":
    main()

4. Upload the AWS Glue job script to the scripts folder using the following command. Alternatively, the AWS console can be used to upload the script to the S3 bucket.

aws s3 cp glue-etl-process.py s3://data-ingestion-pipeline-demo/scripts/glue-etl-process.py
Figure 4 — Upload Glue Job

5. Upload the “The Worlds Billionaires Dataset.csv” data file using one of the following methods:

NOTE: “The Worlds Billionaires Dataset.csv” can be downloaded from the GitHub repository.

  • Python script
import boto3
import logging
from botocore.exceptions import ClientError

# Set param values
local_file_path = "/Users/Documents/The Worlds Billionaires Dataset.csv"
bucket_name = "data-ingestion-pipeline-demo"
s3_file = "raw/billionairesData/world-billionaires-dataset.csv"

logging.info("Uploading file: STARTED")

# Upload the file
s3 = boto3.client("s3")
try:
    s3.upload_file(local_file_path, bucket_name, s3_file)
    logging.info("File upload: SUCCESSFUL")
except ClientError as e:
    logging.error(e)
except FileNotFoundError:
    logging.error("File not found - ERROR")
  • AWS GUI
Figure 5 — Upload Sample Data File

6. Create a stack using the CloudFormation template below:

# This CloudFormation template creates the following AWS artifacts:
# 1- AWS IAM Role for the AWS Glue job
# 2- AWS Glue job to process the raw data files
# 3- AWS Glue Crawler to crawl and catalog the curated data
# 4- AWS Glue Trigger to schedule the AWS Glue job

AWSTemplateFormatVersion: '2010-09-09'
Description: 'AWS CloudFormation template to create the IAM Role, Glue Job, Crawler, Catalog database, and job Trigger.'

# Set param values
Parameters:
  AWSIAMRoleName:
    Description: Role name for the AWS Glue job
    Type: String
    Default: data-pipeline-demo
  AWSGlueJobName:
    Description: AWS Glue job name
    Type: String
    Default: data-pipeline-etl-demo
  AWSGlueCrawlerName:
    Description: AWS Glue Crawler name
    Type: String
    Default: data-crawler
  AWSGlueCatalogName:
    Description: Name of the Data Catalog database
    Type: String
    Default: worlds-billionaires
  AWSGlueJobTriggerName:
    Description: AWS Glue job schedule trigger name
    Type: String
    Default: data-pipeline-glue-trigger
  # Replace the following values with your S3 bucket
  GlueJobScriptLocation:
    Description: AWS Glue job script location
    Type: String
    Default: 's3://data-ingestion-pipeline-demo/scripts/glue-etl-process.py'
  CrawlerS3Path:
    Description: Target S3 path for the AWS Glue Crawler
    Type: String
    Default: 's3://data-ingestion-pipeline-demo/curated-data/'

Resources:
  # Create the IAM Role and attach built-in policies
  AWSIAMRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Ref AWSIAMRoleName
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
                - s3.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: "/"

  # Create the Glue Data Catalog database
  AWSGlueCatalog:
    Type: 'AWS::Glue::Database'
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: !Ref AWSGlueCatalogName
        Description: Database for the curated data

  # Create the AWS Glue job
  AWSGlueJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: glueetl
        PythonVersion: '3'
        ScriptLocation: !Ref GlueJobScriptLocation
      ExecutionProperty:
        MaxConcurrentRuns: 2
      MaxRetries: 0
      Role: !Ref AWSIAMRole
      Name: !Ref AWSGlueJobName
      GlueVersion: '3.0'
      WorkerType: 'G.1X'
      NumberOfWorkers: 10

  # Update the trigger based on your use case.
  # Currently, it's scheduled to run every morning at 7 AM.
  AWSGlueJobScheduleRule:
    Type: AWS::Glue::Trigger
    Properties:
      Type: SCHEDULED
      Description: Schedule the Glue job to run every morning at 7 AM.
      Schedule: cron(0 7 ? * * *)
      Actions:
        - JobName: !Ref AWSGlueJobName
      Name: !Ref AWSGlueJobTriggerName
      StartOnCreation: True

  # Create the AWS Glue Crawler
  AWSGlueCrawler:
    Type: 'AWS::Glue::Crawler'
    Properties:
      Name: !Ref AWSGlueCrawlerName
      Role: !GetAtt "AWSIAMRole.Arn"
      DatabaseName: !Ref AWSGlueCatalogName
      RecrawlPolicy:
        RecrawlBehavior: CRAWL_EVERYTHING
      Targets:
        S3Targets:
          - Path: !Ref CrawlerS3Path

Outputs:
  AWSIAMRole:
    Description: IAM Role created using the CloudFormation template.
    Value: !Ref AWSIAMRole
  AWSGlueCatalog:
    Description: AWS Glue Catalog database created using the CloudFormation template.
    Value: !Ref AWSGlueCatalog
  AWSGlueJob:
    Description: AWS Glue job created using the CloudFormation template.
    Value: !Ref AWSGlueJob
  AWSGlueJobScheduleRule:
    Description: AWS Glue job schedule trigger created using the CloudFormation template.
    Value: !Ref AWSGlueJobScheduleRule
  AWSGlueCrawler:
    Description: AWS Glue Crawler created using the CloudFormation template.
    Value: !Ref AWSGlueCrawler

  • Copy and save the CloudFormation template code locally
  • Log in to the AWS Console ➞ Search for CloudFormation
  • Click Create Stack ➞ Select Upload a template file under Specify template ➞ Choose file ➞ Next
Figure 6 — Create a Stack
  • Under Stack name ➞ Enter a stack name ➞ Next
Figure 7 — Enter Stack Name
  • Click Next
  • Under Capabilities ➞ select the acknowledgement checkbox ➞ Submit
Figure 8 — Acknowledge Terms
  • Under the Outputs tab, you can see that all the artifacts were created successfully (a scripted alternative to these console steps is sketched below)
Figure 9 — Stack Outputs
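
As an alternative to the console walkthrough above, here is a minimal boto3 sketch that deploys the same template from a local file (the file path and stack name are assumptions; CAPABILITY_NAMED_IAM is required because the template creates a named IAM role):

import boto3

cf = boto3.client("cloudformation", region_name="us-east-1")

# Template saved locally from the listing above (hypothetical file name)
with open("data-pipeline-template.yaml") as f:
    template_body = f.read()

cf.create_stack(
    StackName="data-pipeline-demo-stack",   # hypothetical stack name
    TemplateBody=template_body,
    Capabilities=["CAPABILITY_NAMED_IAM"],  # acknowledges creation of the named IAM role
)

# Block until the stack finishes creating, then report success
cf.get_waiter("stack_create_complete").wait(StackName="data-pipeline-demo-stack")
print("Stack created")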

Thus far, we have developed data pipeline artifacts such as AWS Glue Job, Crawler, Trigger (Schedule), and a Database. Next, we will look at data processing, validation, and visualization steps.

Data Processing Steps

  1. Go to AWS Glue ➞ Data Integration and ETL ➞ Jobs
  2. Select the created job ➞ Run job
Figure 10 — AWS Glue Job

3. Check the job run status under the Runs tab (or poll it programmatically, as sketched below)

Figure 11 — AWS Glue Job Runs Status
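
If you prefer to start the job and watch its status without the console, a boto3 sketch like the following starts a run and polls it (the job name matches the CloudFormation default used in this post; adjust it if you changed the parameter):

import time
import boto3

glue = boto3.client("glue", region_name="us-east-1")
job_name = "data-pipeline-etl-demo"  # default job name from the CloudFormation template

# Start a run and poll its state until it reaches a terminal status
run_id = glue.start_job_run(JobName=job_name)["JobRunId"]
while True:
    state = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["JobRunState"]
    print("Job run state:", state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)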

4. Once the AWS Glue job has run successfully, let’s check the curated-data folder in the S3 bucket to validate the processed file(s) written by the Glue job (these checks can also be scripted; see the sketch after Figure 13)

  • Navigate to S3 bucket ➞ curated-data
Figure 12 — AWS Glue Job Output
  • To check the Crawler ➞ Navigate to AWS Glue ➞ Data Catalog ➞ Crawlers
Figure 13 — Crawler Run Status
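
The same checks can be scripted. The sketch below lists the curated Parquet objects and reports the crawler’s state with boto3 (the bucket, prefix, and crawler names follow the defaults used in this walkthrough):

import boto3

bucket = "data-ingestion-pipeline-demo"  # assumed bucket name from this walkthrough

s3 = boto3.client("s3")
glue = boto3.client("glue", region_name="us-east-1")

# Curated Parquet files written by the Glue job
resp = s3.list_objects_v2(Bucket=bucket, Prefix="curated-data/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])

# Crawler state and the outcome of its last run
crawler = glue.get_crawler(Name="data-crawler")["Crawler"]
print("Crawler state:", crawler["State"])
print("Last crawl status:", crawler.get("LastCrawl", {}).get("Status"))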

5. To query the data catalog, navigate to Amazon Athena ➞ Query your data ➞ Launch query editor

Figure 14- AWS Athena
  • To query the data: Data ➞ Select the database (created using the CloudFormation template) ➞ Tables ➞ Click the three dots ➞ Preview Table

NOTE: You may need to set the Athena query results location to the S3 folder (athena-queries-output) created in Implementation Step 2 before running Athena queries.

Figure 15 — Athena Table Preview
  • Query results from the auto-generated query (queries can also be submitted programmatically; see the sketch after Figure 16)
Figure 16 — Athena Query Results
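
Athena queries can also be submitted with boto3. The sketch below runs a preview query against the catalog; the database and output location follow the defaults used in this post, while the table name is an assumption (the crawler typically names the table after the S3 folder, e.g. curated_data), so check the actual name in the Athena console:

import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Submit the query; the result files land in the athena-queries-output folder
query_id = athena.start_query_execution(
    QueryString='SELECT * FROM "curated_data" LIMIT 10;',  # assumed table name created by the crawler
    QueryExecutionContext={"Database": "worlds-billionaires"},
    ResultConfiguration={"OutputLocation": "s3://data-ingestion-pipeline-demo/athena-queries-output/"},
)["QueryExecutionId"]

# Wait for the query to reach a terminal state
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

# Print the first page of results (header row included)
if state == "SUCCEEDED":
    for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])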

Data Visualization Steps

  1. Navigate to Amazon QuickSight
  2. Datasets ➞ New dataset
Figure 17 — QuickSight Datasets

3. Select Athena

Figure 18 — QuickSight Data Source

4. New Athena data source ➞ Enter a name for the data source ➞ Leave the default value for Athena workgroup ➞ Create data source

Figure 19 — QuickSight Data Source Name

5. Choose your table ➞ Select

Figure 20— Athena Table

6. Finish dataset creation ➞ Visualize

Figure 21 — Finish Dataset Creation

7. New sheet ➞ CREATE

Figure 22 — Create New Sheet
Figure 23 — New Sheet with Fields List

8. To change the names in the Fields list, click the pencil icon to edit the dataset

9. Click the three dots ➞ Edit

Figure 24 — Edit Dataset Fields

10. Click each field to edit ➞ Rename the field ➞ Apply ➞ PUBLISH & VISUALIZE

Figure 25 — Add New Field Name

11. Drag and drop the column you want to visualize; in this example, I selected the “Net worth” field and grouped it by the “Age” field

Figure 26 — Sample Data Analysis

Clean-up

Remember to clean up the AWS data pipeline artifacts created by the CloudFormation template to avoid ongoing AWS charges. Here are the steps (a scripted alternative is sketched after the list):

  1. Go to the CloudFormation service
  2. Select the stack name created to build AWS data pipeline objects
  3. In the stack details pane, choose Delete
  4. Select Delete stack when prompted
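
If you prefer to script the clean-up, a boto3 sketch along these lines deletes the stack and then removes the S3 bucket, which was created outside the stack and therefore isn’t deleted with it (the stack and bucket names are the ones assumed earlier in this post):

import boto3

region = "us-east-1"
stack_name = "data-pipeline-demo-stack"       # hypothetical stack name used earlier
bucket_name = "data-ingestion-pipeline-demo"  # assumed bucket name used earlier

# Delete the stack (Glue job, crawler, trigger, catalog database, IAM role)
cf = boto3.client("cloudformation", region_name=region)
cf.delete_stack(StackName=stack_name)
cf.get_waiter("stack_delete_complete").wait(StackName=stack_name)

# Empty and delete the S3 bucket created manually in the implementation steps
s3 = boto3.resource("s3", region_name=region)
bucket = s3.Bucket(bucket_name)
bucket.objects.all().delete()
bucket.delete()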

In conclusion, this blog post walked you through creating an AWS data pipeline that ingests raw data files, stores the processed data, analyzes it with Athena queries, and visualizes it with Amazon QuickSight. We also jotted down the steps to clean up the data pipeline artifacts to avoid AWS billing charges.

NOTE: The code used in this blog post can be downloaded from the Data Pipeline Git repository.

“Learning never exhausts the mind.” — Leonardo da Vinci
