From API to Dashboard: Building an End-to-End ETL Pipeline with AWS
Today I will share practical instructions on how to build a complete end-to-end ETL pipeline using AWS cloud services.
In this step-by-step guide, we’ll extract job market data specifically for Data Engineering roles in Canada from the Adzuna API, store it in Amazon S3, transform it for analysis, load it into Amazon Redshift, and finally, create a simple dashboard in Looker Studio to visualize the insights.
My name is Maksim, and I like learning and experimenting with new tools and technologies.
Link to the project repository — https://github.com/kazarmax/adzuna_etl_aws/
Project Architecture and Execution Flow
The following diagram represents the solution architecture implemented in this project.
Project flow:
- An EventBridge schedule triggers the pipeline daily
- The Step Function orchestrates the three ETL Lambda functions
- Extract: run the data extraction code against the Adzuna API
- Store the raw data in S3
- Transform the raw data
- Store the transformed data in S3
- Load the transformed data into Redshift
- Visualize the transformed data in Looker Studio
Services Used
In this project, we will be using the following services:
- Adzuna API — provides access to job market data, allowing users to query job listings and other employment-related information in real-time. In this project, it serves as the public data source for the ETL pipeline.
- AWS Cloud (Amazon Web Services) is a comprehensive cloud computing platform offering a wide range of compute, storage, networking, and database services. AWS provides the backbone for all services in this project, enabling the deployment and management of cloud-based resources.
- AWS IAM (Identity and Access Management) allows you to manage access to AWS resources securely. It controls who has access to which services. In this project, it defines the roles and permissions for Lambda functions, S3, Redshift, and other services.
- AWS Secrets Manager securely stores credentials such as database passwords, API keys, and other secrets used by your applications. In this project, it stores the credentials for Redshift, ensuring secure connections without hardcoding sensitive information.
- AWS CloudWatch is a monitoring service for AWS cloud resources and applications. It helps track performance metrics and monitor logs. In this project, it’s used to monitor the Lambda function execution logs.
- EventBridge allows the scheduling and automation of AWS services. In this project, it’s used to trigger the Step Function that orchestrates and executes the ETL Lambda Functions on a daily schedule.
- AWS Step Functions orchestrates complex workflows by coordinating multiple AWS services into a seamless flow. In this project, Step Functions manages the execution of the Lambda functions that extract, transform, and load (ETL) job data from Adzuna to Redshift.
- AWS Lambda is a serverless compute service that runs your code in response to events. In this project, it automates the data extraction, transformation, and loading (ETL) processes by running custom Python code to handle Adzuna API data and move it through the pipeline.
- Amazon S3 (Simple Storage Service) is a scalable object storage service. In this project, S3 stores the extracted and transformed files from the Adzuna API, acting as a staging area for the data before it is processed and moved to Redshift.
- AWS Redshift Serverless is a fully managed, serverless data warehouse service that automatically scales based on demand. It is used in this project as the data warehouse to store and analyze the transformed data from the Adzuna API.
- Looker Studio is a free, cloud-based business intelligence tool that allows for the creation of interactive dashboards and visualizations. It connects to Amazon Redshift to visualize the processed data from the ETL pipeline.
Project Prerequisites
To follow along with this guide and complete the project, you will need:
- Python
- Adzuna API account (with application id and application key)
- AWS Account
- Google Account (required to access Looker Studio)
Data Source
In this project, we query the Adzuna API to extract job listings specifically related to Data Engineer roles in Canada. The API is configured to get job posts in Canada that mention data engineer in the job title or description. Additionally, we limit the results to job listings posted within the last two days.
Project Implementation Steps
The project implementation can be broken down into the following major steps:
- Researching Adzuna API
- Creating Amazon S3 bucket and folders
- Developing Lambda function to extract data
- Developing Lambda function to transform data
- Creating and configuring Redshift Serverless storage
- Developing Lambda function to load data
- Creating and Configuring Step Function
- Creating EventBridge schedule to automate ETL pipeline
- Connecting Looker Studio to Redshift
- Creating dashboard in Looker Studio
Step 1. Researching Adzuna API
We will use the Adzuna API to extract Data Engineering job data in Canada. The first step is to explore the API, understand its data structure, and identify the fields we need (e.g., job title, location, company). Tools like Jupyter Notebook or VS Code are useful for testing API calls and transforming data.
The goal is to connect to the API, pull sample data, and prepare it for the ETL pipeline. You can refer to this script from the project repo for guidance: adzuna_research.py
import json
import os
import requests
from datetime import datetime
import math
from loguru import logger
import pandas as pd

# Adzuna API credentials are read from environment variables
ADZUNA_APP_ID = os.getenv('ADZUNA_APP_ID')
ADZUNA_APP_KEY = os.getenv('ADZUNA_APP_KEY')

# Define the API endpoint and base parameters
url = "https://api.adzuna.com/v1/api/jobs/ca/search/"
base_params = {
    'app_id': ADZUNA_APP_ID,
    'app_key': ADZUNA_APP_KEY,
    'results_per_page': 50,  # Maximum allowed results per page
    'what_phrase': "data engineer",  # an entire phrase that must be found in the description or title
    'max_days_old': 2,
    'sort_by': "date"
}

# Initialize a list to store all job postings
all_job_postings = []

# Make the first request to determine the total number of pages
logger.info("Making the first request to determine the total number of pages")
response = requests.get(f"{url}1", params=base_params)

# Check if the request was successful
if response.status_code == 200:
    data = response.json()  # Parse the JSON response
    total_results = data['count']  # Get the total number of results
    results_per_page = base_params['results_per_page']

    # Calculate the total number of pages
    total_pages = math.ceil(total_results / results_per_page)
    logger.info(f"Total number of pages = {total_pages}")

    # Store the results from the first page
    all_job_postings.extend(data['results'])

    # Loop through the remaining pages and request data from each
    logger.info("Looping through the remaining pages and requesting data from each")
    for page in range(2, total_pages + 1):  # Start from page 2 to total_pages
        response = requests.get(f"{url}{page}", params=base_params)
        # Check if the request was successful
        if response.status_code == 200:
            page_data = response.json()
            # Append job postings from this page to the list
            all_job_postings.extend(page_data['results'])
        else:
            logger.error(f"Error fetching page {page}: {response.status_code}, {response.text}")
else:
    logger.error(f"Error: {response.status_code}, {response.text}")

# Now all_job_postings contains data from all pages
logger.info(f"Total jobs retrieved: {len(all_job_postings)}")

# Transformation: picking up only the necessary fields
parsed_jobs = []
for job in all_job_postings:
    parsed_jobs.append(
        dict(
            job_id=job['id'],
            job_title=job['title'],
            job_location=job['location']['display_name'],
            job_company=job['company']['display_name'],
            job_category=job['category']['label'],
            job_description=job['description'],
            job_url=job['redirect_url'],
            job_created=job['created']
        )
    )

jobs_df = pd.DataFrame.from_dict(parsed_jobs)
jobs_df['job_created'] = pd.to_datetime(jobs_df['job_created'])
logger.info("Done")
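Before wiring this into AWS, it helps to sanity-check the resulting DataFrame in the same notebook or script session. A minimal sketch, assuming jobs_df from the script above:

# Quick sanity checks on the parsed data
print(jobs_df.shape)  # (number of jobs, number of columns)
print(jobs_df.dtypes)  # job_created should be a datetime type
print(jobs_df['job_location'].value_counts().head(10))  # most frequent locations
print(jobs_df.head(3).to_string())  # preview a few records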
Step 2. Creating Amazon S3 bucket and folders
We will need the following structure in Amazon S3 for the project:
adzuna-etl-project:
- raw_data
  - to_process
  - processed
- transformed_data
  - to_migrate
  - migrated
adzuna-etl-project is the bucket; everything else is folders (prefixes) inside it.
To create the bucket, open https://console.aws.amazon.com/s3/ and click the Create bucket button. Use the default settings.
Create the necessary folders inside the bucket using the Create folder button (the default settings are fine). If you prefer to script this, see the sketch below.
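Here is a minimal boto3 sketch that creates the bucket and folder structure programmatically, assuming your AWS credentials are configured locally; S3 bucket names are globally unique, so you may need to choose a different name:

import boto3

s3 = boto3.client("s3", region_name="us-east-1")

bucket = "adzuna-etl-project"  # pick a globally unique name of your own

# In us-east-1 no LocationConstraint is needed; in other regions pass
# CreateBucketConfiguration={"LocationConstraint": "<your-region>"}
s3.create_bucket(Bucket=bucket)

# S3 has no real folders: creating zero-byte objects with trailing slashes
# makes the "folders" show up in the console
for prefix in [
    "raw_data/to_process/",
    "raw_data/processed/",
    "transformed_data/to_migrate/",
    "transformed_data/migrated/",
]:
    s3.put_object(Bucket=bucket, Key=prefix)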
Step 3. Developing Lambda function to extract data
AWS Lambda lets you run your Python scripts (and code in other languages) in the AWS Cloud in response to events.
In this step, we will create a Lambda function that executes a Python script to extract data from the Adzuna API and save it to the S3 bucket in .json format.
The process of creating and configuring the lambda function includes the following activities:
- Creating IAM role for Lambda function
- Creating a Lambda function
- Configuring the Lambda function
- Creating a layer to import the requests Python library into the Lambda function
- Testing the Lambda function
3.1 Creating IAM role for Lambda function
For our Lambda function to access the required AWS resources, such as S3, and write execution logs to CloudWatch service, we must create an IAM role with all the required policies and assign it to the function.
Follow these steps to do that:
- Go to https://console.aws.amazon.com/iam/
- Under Access management, click Roles and then Create Role
- Select AWS Service as Trusted entity type
- Under Use case, select Lambda
- Click Next to skip Step 2 for now
- At Step 3, fill in the Role name, for example, adzuna_lambda_function_role
- Provide some meaningful description
- Click Create role.
- In the list of roles, click the newly created role.
- In the Permission policies block, click Add permission, then Create inline policy
- In the Policy editor, select JSON and insert the following contents into the text area (replace the bucket name, region, and account ID in the Resource ARNs with your own values). It will allow the Lambda function to:
- Access and manage objects in the s3 bucket
- Write log files into CloudWatch Logs
- Insert data into Redshift data warehouse
- Get secrets from AWS Secrets Manager
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket",
                "s3:GetObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::adzuna-etl-project",
                "arn:aws:s3:::adzuna-etl-project/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "redshift-data:ExecuteStatement"
            ],
            "Resource": "arn:aws:redshift-serverless:us-east-1:343218212410:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": "arn:aws:secretsmanager:us-east-1:343218212410:secret:*"
        }
    ]
}
12. Click Next.
13. Fill in the Policy name, for example adzuna_lambda_role_policy, and click Create policy.
3.2. Creating Lambda function to extract raw data from the Adzuna API
- Go to https://console.aws.amazon.com/lambda/
- Click Create function
- Select Author from scratch
- Give it a Function name, for example, lambda_adzuna_extract_data
- As a Runtime, select Python (I used Python 3.11 for the project)
- Select x86_64 as Architecture
- In the Execution role, select Use an existing role and select the role you created in the previous step, adzuna_lambda_function_role
- Click Create function
3.3 Configuring the Lambda function
- In the lambda_function code text area, insert the following code:
import json
import os
import requests
from datetime import datetime
import math
import boto3
import logging

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Getting Adzuna API creds from environment variables
ADZUNA_APP_ID = os.getenv('ADZUNA_APP_ID')
ADZUNA_APP_KEY = os.getenv('ADZUNA_APP_KEY')

BUCKET = "adzuna-etl-project"

# Define the API endpoint and base parameters
url = "https://api.adzuna.com/v1/api/jobs/ca/search/"
base_params = {
    'app_id': ADZUNA_APP_ID,
    'app_key': ADZUNA_APP_KEY,
    'results_per_page': 50,  # Maximum allowed results per page
    'what_phrase': "data engineer",
    'max_days_old': 2,
    'sort_by': "date"
}


def lambda_handler(event, context):
    # Initialize a list to store all job postings
    all_job_postings = []

    # Make the first request to determine the total number of pages
    logger.info("Making the first request to determine the total number of pages")
    response = requests.get(f"{url}1", params=base_params)

    # Check if the request was successful
    if response.status_code == 200:
        data = response.json()  # Parse the JSON response
        total_results = data['count']  # Get the total number of results
        results_per_page = base_params['results_per_page']

        # Calculate the total number of pages
        total_pages = math.ceil(total_results / results_per_page)
        logger.info(f"Total number of pages = {total_pages}")

        # Store the results from the first page
        all_job_postings.extend(data['results'])

        # Loop through the remaining pages and request data from each
        logger.info("Looping through the remaining pages and requesting data from each")
        for page in range(2, total_pages + 1):  # Start from page 2 to total_pages
            response = requests.get(f"{url}{page}", params=base_params)
            # Check if the request was successful
            if response.status_code == 200:
                page_data = response.json()
                # Append job postings from this page to the list
                all_job_postings.extend(page_data['results'])
            else:
                logger.error(f"Error fetching page {page}: {response.status_code}, {response.text}")
    else:
        logger.error(f"Error: {response.status_code}, {response.text}")

    # Now all_job_postings contains data from all pages
    logger.info(f"Total jobs retrieved: {len(all_job_postings)}")

    # Generate a filename with the current timestamp
    current_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"adzuna_raw_data_{current_timestamp}.json"
    logger.info(f"File name to store raw data: {file_name}")

    # Storing Adzuna json raw data to AWS S3
    logger.info("Storing Adzuna json raw data to AWS S3")
    client = boto3.client('s3')
    file_key = f"raw_data/to_process/{file_name}"
    try:
        client.put_object(
            Bucket=BUCKET,
            Key=file_key,
            Body=json.dumps(all_job_postings)
        )
        logger.info(f"File {file_key} successfully created in bucket {BUCKET}.")
        return file_key
    except Exception as e:
        logger.error(f"Error occurred while creating file: {str(e)}")
        return None
2. Click Deploy to save changes.
3. Click the Configuration tab and in the General configuration menu, click Edit
4. Change the Timeout to 30 seconds.
5. Click the Environment variables menu under the Configuration tab and add ADZUNA_APP_ID and ADZUNA_APP_KEY using your Adzuna API account information.
3.4 Creating a layer to import the requests Python library into the Lambda function
A simple import doesn't work in Lambda for external libraries (like pandas or requests) because the Lambda runtime comes with only a minimal set of libraries. So, you need to package them as Lambda layers and attach the layers to a Lambda function to make the libraries available during execution.
- Open a terminal and execute the following commands to install the requests package into a folder structure suitable for a Lambda layer
mkdir requests_layer
cd requests_layer
mkdir python
cd python
pip install requests -t .
2. Create a .zip archive of the contents of the requests_layer folder, so that the python folder sits at the root of the archive.
3. Go to https://console.aws.amazon.com/lambda/ and click Layers.
4. Click Create layer.
5. Enter Name, for example, lambda_requests_layer.
6. Upload the previously prepared .zip archive with the requests package.
7. Choose x86_64 as architecture
8. Choose Python as a runtime (it should be of the same version as your Lambda function runtime)
9. Click Create
10. Go back to the Lambda function page
11. In the Function overview block, under the Lambda function name, click Layers
12. Click Add a layer
13. In the Choose a layer, select Custom layers, and in the dropdown select the previously created lambda layer.
14. Select Version -> 1
15. Click Add
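As an alternative to the console upload in steps 3 to 9, you can publish the layer with boto3. A minimal sketch, assuming you named the archive requests_layer.zip and your runtime is Python 3.11:

import boto3

lambda_client = boto3.client("lambda", region_name="us-east-1")

# Publish the zipped 'python/' folder as a new layer version
with open("requests_layer.zip", "rb") as f:
    response = lambda_client.publish_layer_version(
        LayerName="lambda_requests_layer",
        Description="requests library for the Adzuna ETL Lambda functions",
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.11"],
        CompatibleArchitectures=["x86_64"],
    )

print(response["LayerVersionArn"])  # use this ARN if you attach the layer via script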
3.5 Testing the Lambda function
- In the Lambda function page, the Code source block, click the Test button
- In the Configure test event popup, enter any Event name, for example Test, and click Save
- Click the Test button again
If everything was done properly in the previous steps, the function logs should show no errors, and a .json file with the extracted raw data should appear in the raw_data/to_process folder of the S3 bucket.
Step 4. Developing Lambda function to transform data
In this step, we will create another Lambda function that:
- Takes the raw data .json file from the S3 bucket raw_data/to_process folder
- Transforms the raw data and saves it to transformed_data/to_migrate folder
- Moves the original raw data .json file from the raw_data/to_process folder to the raw_data/processed folder
The process of creating and configuring the new lambda function includes the following activities:
- Creating a Lambda function
- Configuring the Lambda function
- Adding a layer to import the Pandas Python library into the Lambda function
- Testing the Lambda function
4.1. Creating Lambda function to transform raw data
- Go to https://console.aws.amazon.com/lambda/
- Click Create function
- Select Author from scratch
- Give it a Function name, for example, lambda_adzuna_transform_data
- As a Runtime, select Python (I used Python 3.11 for the project)
- Select x86_64 as Architecture
- In the Execution role, select Use an existing role and select the role you created in the previous step, adzuna_lambda_function_role
- Click Create function
4.2 Configuring the Lambda function
- In the lambda_function code text area, insert the following code:
import boto3
import json
from datetime import datetime
from io import StringIO
import pandas as pd
import csv
import logging

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client('s3')


def put_object_to_s3(bucket, key, data_df):
    logger.info("Started uploading transformed file in s3 ...")
    try:
        # Convert DataFrame to CSV in-memory
        buffer = StringIO()
        data_df.to_csv(buffer, index=False, encoding='utf-8', quoting=csv.QUOTE_ALL)
        content = buffer.getvalue()
        # Upload CSV content to S3
        s3.put_object(Bucket=bucket, Key=key, Body=content)
        logger.info(f"File successfully uploaded to {bucket}/{key}")
        return key
    except Exception as e:
        logger.error(f"Error occurred while uploading file to {bucket}/{key}: {str(e)}")
        return None


def delete_s3_object(bucket_name, object_key):
    try:
        # Delete the object
        s3.delete_object(Bucket=bucket_name, Key=object_key)
        logger.info(f"File {object_key} deleted successfully from bucket {bucket_name}.")
    except Exception as e:
        logger.error(f"Error occurred while deleting file: {str(e)}")


def move_s3_object(bucket, source_key, destination_key):
    logger.info(f"Started moving {source_key} to 'processed' folder in s3 ...")
    try:
        # Copy the object
        s3.copy_object(
            Bucket=bucket,
            CopySource={'Bucket': bucket, 'Key': source_key},
            Key=destination_key
        )
        logger.info(f"File copied from {source_key} to {destination_key} successfully.")
        delete_s3_object(bucket, source_key)
    except Exception as e:
        logger.error(f"Error occurred while copying file: {str(e)}")


def get_parsed_raw_jobs_data(json_raw_data):
    logger.info("# Started parsing json data")
    all_job_postings = json_raw_data
    parsed_jobs = []
    for job in all_job_postings:
        parsed_jobs.append(
            dict(
                job_id=job['id'],
                job_title=job['title'],
                job_location=job['location']['display_name'],
                job_company=job['company']['display_name'],
                job_category=job['category']['label'],
                job_description=job['description'],
                job_url=job['redirect_url'],
                job_created=job['created']
            )
        )
    jobs_df = pd.DataFrame.from_dict(parsed_jobs)
    jobs_df['job_created'] = pd.to_datetime(jobs_df['job_created'])
    logger.info("Successfully extracted job postings data")
    return jobs_df


def lambda_handler(event, context):
    # Processing raw data
    logger.info("# Started processing raw data files")

    # Process the raw data file provided by the extract lambda function
    logger.info("Processing raw data file from previous Step Function output")

    # Get the S3 object key from Step Function's output
    s3_bucket = "adzuna-etl-project"
    s3_object = event['s3ObjectKey']  # Get object key from Step Function output
    logger.info(f"Processing S3 object {s3_object} from bucket {s3_bucket}")

    # Retrieve the raw data from S3
    s3_object_data = s3.get_object(Bucket=s3_bucket, Key=s3_object)
    content = s3_object_data['Body']
    json_raw_data = json.loads(content.read())

    # Transforming retrieved json data and storing it back to another s3 folder "transformed_data"
    logger.info(f"Started processing {s3_object} file ...")
    current_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    jobs_transformed_data = get_parsed_raw_jobs_data(json_raw_data)
    s3_destination_key = f"transformed_data/to_migrate/adzuna_transformed_data_{current_timestamp}.csv"
    created_object_key = put_object_to_s3(s3_bucket, s3_destination_key, jobs_transformed_data)

    # Moving raw files from unprocessed to processed folder inside s3
    source_key = s3_object
    source_file_name = source_key.split('/')[2]
    destination_key = "raw_data/processed/" + source_file_name
    move_s3_object(s3_bucket, source_key, destination_key)

    return created_object_key
2. Click Deploy to save changes.
3. Click the Configuration tab and in the General configuration menu, click Edit
4. Change the Timeout to 30 seconds.
4.3 Adding a layer to import Pandas Python library into the Lambda function
- In the Function overview block, under the Lambda function name, click Layers
2. Click Add a layer
3. In the Choose a layer, keep AWS layers.
4. In the AWS layers dropdown menu, select AWSSDKPandas-Python311 (your Python version may differ if you chose a distinct version of Python in the previous steps)
5. Select the available Version and click Add
4.4 Testing the Lambda function
We will perform functional testing of the whole workflow later after configuring the Step Function.
At this stage, it makes sense to simply execute the function once to catch syntax errors.
For that, use the same approach as we did for testing the extraction function in Step 3.5, but with a test event that mimics the Step Function input, as sketched below.
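A minimal test event for this function needs the s3ObjectKey field that the Step Function will normally provide (see Step 7). The file name below is just an example; use the key of a raw file that actually exists in your bucket, for instance one produced by the extract function in Step 3.5:

{
  "s3ObjectKey": "raw_data/to_process/adzuna_raw_data_20240914_101500.json"
}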
Step 5. Creating and configuring Redshift Serverless storage
We will use Redshift Serverless data warehouse to store and analyze the transformed data from the Adzuna API.
The process of creating and configuring Redshift Serverless includes the following steps:
- Creating a Redshift Workgroup
- Creating connection to Redshift database
- Creating a new database, schemas and tables to ingest the Adzuna transformed data
Creating Redshift Workgroup and Namespace
Before moving on to the procedural steps, it’s important to familiarize yourself with two terms we will create soon:
- Workgroup: A workgroup is the compute layer that manages queries and connects to the Redshift database. It provides the compute resources (like scaling and query execution) needed to run your SQL queries.
- Namespace: A namespace is the data storage layer where your Redshift data resides. It holds databases, schemas, tables, and other objects, similar to a traditional database storage system.
Follow these steps:
- In the AWS Console, in the services search box, type Redshift, then click Serverless under Top features
2. In the Amazon Redshift Serverless page, click Create Workgroup
3. Type in adzuna-etl-project-redshift-wg as a Workgroup name
4. You can leave the default Base capacity or select 32
5. Keep default settings for Network and security
6. Click Next
7. Type in adzuna-etl-project-redshift-ns as a Namespace name
8. Under Database name and password, select the Customize admin user credentials checkbox and then select the Manage admin credentials in AWS Secrets Manager option. AWS will automatically generate a user name and password for Redshift and securely store them in AWS Secrets Manager.
9. In the Associated IAM roles, click Manage IAM roles and select Create IAM role
10. Select Specific S3 buckets and then select adzuna-etl-project
11. Click Create IAM role as default
12. Under Encryption and security, select the checkboxes for User log, Connection log, and User activity log to export these logs to Amazon CloudWatch Logs for monitoring
13. Click Next
14. In the Review and create screen, under Permissions, note down the IAM role as you will need it later to configure a Lambda function. It looks like:
arn:aws:iam::827312043843:role/service-role/AmazonRedshift-CommandsAccessRole-20240914T132839
15. Click Create
Creating a connection to Redshift database
- Open the created Redshift namespace (adzuna-etl-project-redshift-ns) and click the Query Data button at the top-right of the page. The Redshift query editor will open.
2. Click the three dots to the right of the workgroup name (adzuna-etl-project-redshift-wg) and select Create connection
3. In the popup window, select AWS Secrets Manager, and in the Secret list, select the secret (redshift!adzuna-etl-project-redshift-ns-admin)
You should be connected now and see the list of available databases, including dev
Creating a new database, schemas and tables to ingest the Adzuna transformed data
To ingest the Adzuna data, we will create a new database, two schemas and two tables:
- database: adzuna
- schemas: stg (for staging table) and dw (for data warehouse table)
- tables: stg.staging_jobs, which temporarily stores each day's extracted jobs data and is used to deduplicate it before inserting into the main table dw.jobs, which keeps all historical data ingested from the Adzuna API
Follow these steps:
- In the Redshift query editor, create a new database by executing the following query
CREATE DATABASE adzuna;
2. Refresh the browser page with the Redshift query editor and select the newly created adzuna database in the dropdown right above the query editor area
3. Create the stg schema:
CREATE SCHEMA stg;
4. Create the staging_jobs table:
CREATE TABLE stg.staging_jobs (
    job_id BIGINT,
    job_title VARCHAR(150),
    job_location VARCHAR(150),
    job_company VARCHAR(150),
    job_category VARCHAR(150),
    job_description VARCHAR(550),
    job_url VARCHAR(150),
    job_created TIMESTAMP
);
5. Create the dw schema:
CREATE SCHEMA dw;
6. Create the jobs table:
CREATE TABLE dw.jobs (
    job_id BIGINT,
    job_title VARCHAR(150),
    job_location VARCHAR(150),
    job_company VARCHAR(150),
    job_category VARCHAR(150),
    job_description VARCHAR(550),
    job_url VARCHAR(150),
    job_created TIMESTAMP
);
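To double-check that the objects were created where you expect, you can run an optional sanity-check query against Redshift's SVV_ALL_TABLES system view:

SELECT schema_name, table_name
FROM svv_all_tables
WHERE database_name = 'adzuna';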
The adzuna database should now contain the stg and dw schemas, each with the table created above.
Step 6. Developing Lambda function to load data into Redshift
In this step, we will create a Lambda function that:
- Takes a .csv file from the transformed_data/to_migrate/ s3 folder as an input
- Loads the .csv file to the adzuna.stg.staging_jobs table, deduplicates the data and inserts it into the adzuna.dw.jobs table in Redshift
- Moves the .csv file from transformed_data/to_migrate/ to transformed_data/migrated/ folder in s3
The process of creating and configuring the load lambda function includes the following activities:
- Creating a Lambda function
- Configuring the Lambda function
- Testing the Lambda function
6.1. Creating Lambda function to load data into Redshift
- Go to https://console.aws.amazon.com/lambda/
- Click Create function
- Select Author from scratch
- Give it a Function name, for example, lambda_adzuna_load_data
- As a Runtime, select Python (I used Python 3.11 for the project)
- Select x86_64 as Architecture
- In the Execution role, select Use an existing role and select the role you created in the previous step, adzuna_lambda_function_role
- Click Create function
6.2 Configuring the Lambda function
- In the lambda_function code text area, insert the following code (from this file in the repo):
import boto3
import os
import logging
from botocore.exceptions import ClientError

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Redshift creds
REDSHIFT_REGION = 'us-east-1'  # Change to the region your Redshift Serverless is in
REDSHIFT_WORKGROUP = 'adzuna-etl-project-redshift-wg'
REDSHIFT_SECRET_ARN = os.getenv('REDSHIFT_SECRET_ARN')
REDSHIFT_IAM_ROLE = os.getenv('REDSHIFT_IAM_ROLE')

# Initialize a boto3 session
session = boto3.session.Session()

# Initialize the Redshift Data API client
client_redshift = session.client('redshift-data', region_name=REDSHIFT_REGION)

# SQL query template to import csv files from s3 and merge data from staging to dw table
QUERY_TEMPLATE = """
BEGIN;
TRUNCATE TABLE adzuna.stg.staging_jobs;
COPY adzuna.stg.staging_jobs
FROM '{s3_uri}'
IAM_ROLE '{iam_role}'
FORMAT AS CSV DELIMITER ',' QUOTE '"' IGNOREHEADER 1
REGION AS '{region}';
INSERT INTO adzuna.dw.jobs
SELECT stj.*
FROM adzuna.stg.staging_jobs stj
LEFT JOIN adzuna.dw.jobs dwj ON stj.job_id = dwj.job_id
WHERE dwj.job_id IS NULL;
COMMIT;
"""


# Function to execute query in Redshift
def execute_redshift_query(query_str, database='adzuna'):
    try:
        logger.info(f"Executing SQL query: {query_str}")
        response = client_redshift.execute_statement(
            Database=database,  # Redshift database name
            SecretArn=REDSHIFT_SECRET_ARN,
            Sql=query_str,
            WorkgroupName=REDSHIFT_WORKGROUP  # Redshift Serverless workgroup name
        )
        logger.info("Query executed successfully")
        logger.info(f"Response: {response}")
    except ClientError as e:
        logger.error(f"Error executing query: {e}")
        raise e
    except Exception as e:
        logger.error(f"Unexpected error occurred: {str(e)}")
        raise e


# Function to delete object from s3 bucket
def delete_s3_object(bucket_name, object_key):
    s3_client = boto3.client('s3')
    try:
        # Delete the object
        s3_client.delete_object(Bucket=bucket_name, Key=object_key)
        logger.info(f"File {object_key} deleted successfully from bucket {bucket_name}.")
    except Exception as e:
        logger.error(f"Error occurred while deleting file: {str(e)}")


# Function to move object to another folder in s3 bucket
def move_s3_object(bucket, source_key, destination_key):
    logger.info(f"Started moving {source_key} to 'migrated' folder in s3 ...")
    s3_client = boto3.client('s3')
    try:
        # Copy the object
        s3_client.copy_object(
            Bucket=bucket,
            CopySource={'Bucket': bucket, 'Key': source_key},
            Key=destination_key
        )
        logger.info(f"File copied from {source_key} to {destination_key} successfully.")
        delete_s3_object(bucket, source_key)
    except Exception as e:
        logger.error(f"Error occurred while copying file: {str(e)}")


def lambda_handler(event, context):
    # Get the S3 object key from the Step Function input
    s3_object = event['s3ObjectKeyTransformed']  # Input from previous function (Step Function)
    s3_bucket = "adzuna-etl-project"
    s3_uri = f's3://{s3_bucket}/{s3_object}'
    logger.info(f"S3 URI of transformed file: {s3_uri}")

    query = QUERY_TEMPLATE.format(s3_uri=s3_uri, iam_role=REDSHIFT_IAM_ROLE, region=REDSHIFT_REGION)
    logger.info(f"Redshift query to execute: {query}")

    logger.info("Started importing data from csv file in s3 and merging to dw table")
    execute_redshift_query(query_str=query)

    # Moving migrated file to another folder in s3
    source_key = s3_object
    source_file_name = source_key.split('/')[2]
    destination_key = "transformed_data/migrated/" + source_file_name
    move_s3_object(s3_bucket, source_key, destination_key)
2. Click Deploy to save changes.
3. Click the Configuration tab and in the General configuration menu, click Edit
4. Change the Timeout to 30 seconds.
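One thing to be aware of: the Redshift Data API's execute_statement call is asynchronous, so the function above returns as soon as the statement is submitted rather than when it finishes. If you want the Lambda to wait for the result and fail loudly on SQL errors, you could extend execute_redshift_query with a polling helper along these lines (a sketch, not part of the repo code):

import time

def wait_for_statement(statement_id, timeout_seconds=120):
    """Poll the Redshift Data API until the statement finishes or times out."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        desc = client_redshift.describe_statement(Id=statement_id)
        status = desc["Status"]  # SUBMITTED, PICKED, STARTED, FINISHED, FAILED, ABORTED
        if status == "FINISHED":
            return desc
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(f"Redshift statement {statement_id} {status}: {desc.get('Error')}")
        time.sleep(2)
    raise TimeoutError(f"Redshift statement {statement_id} did not finish in {timeout_seconds}s")

# Usage inside execute_redshift_query, right after execute_statement:
# wait_for_statement(response["Id"])

If you add this, keep in mind that the 30-second Lambda timeout configured above may need to be increased.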
6.3 Adding environment variables
We will create two environment variables:
- REDSHIFT_SECRET_ARN — it will point to the secret in AWS Secrets Manager with the Redshift connection credentials
- REDSHIFT_IAM_ROLE — it will point to the Redshift IAM role
- Click the Configuration tab, then click Environment variables
- Click Edit, then click Add environment variable
- For the first env variable:
- type in REDSHIFT_SECRET_ARN in the Key field
- copy and paste the Redshift Secret ARN value to the Value field.
To get the Redshift Secret ARN, open AWS Secrets Manager, find the secret named redshift!adzuna-etl-project-redshift-ns-admin, click on its name, and in the Secret details page, copy the Secret ARN value for the secret. It should start with arn:aws:secretsmanager:
4. Click Add environment variable
5. For the second env variable:
- type in REDSHIFT_IAM_ROLE in the Key field
- copy and paste the Redshift IAM role ARN value to the Value field. You should have it noted from the Creating Redshift Workgroup and Namespace step
6. Click Save
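If you prefer to look up the secret ARN programmatically rather than copying it from the console, a small sketch, assuming the secret name AWS generated matches the one shown above and the region is us-east-1:

import boto3

sm = boto3.client("secretsmanager", region_name="us-east-1")
secret = sm.describe_secret(SecretId="redshift!adzuna-etl-project-redshift-ns-admin")
print(secret["ARN"])  # value for the REDSHIFT_SECRET_ARN environment variable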
6.4 Testing the Lambda function
We will perform functional testing of the whole workflow later after configuring the Step Function.
At this stage, it makes sense to simply execute the function once to catch syntax errors.
For that, use the same approach as we did for testing the extraction function, with a test event like the sketch below.
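As with the transform function, the test event has to carry the key that the Step Function would normally pass, in this case s3ObjectKeyTransformed. The file name below is illustrative; point it at a .csv file that actually exists in transformed_data/to_migrate/:

{
  "s3ObjectKeyTransformed": "transformed_data/to_migrate/adzuna_transformed_data_20240914_101500.csv"
}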
Step 7. Creating and Configuring Step Function
AWS Step Functions orchestrate complex workflows by coordinating multiple AWS services into a seamless flow. In this project, AWS Step Functions manage the execution of the Lambda functions that extract, transform, and load (ETL) job data from Adzuna to Redshift.
Step Functions ensures reliable workflow execution by handling errors, retries, and task dependencies automatically. By visualizing the entire pipeline, it also simplifies debugging and monitoring, improving overall process management and resilience.
The process of creating and configuring Step Function includes the following steps:
- Creating IAM role for Step Function
- Creating and configuring Step Function
- Testing Step Function
7.1 Creating IAM role for Step Function
For our Step Function to execute the created ETL Lambda Functions and write execution logs to CloudWatch service, we must create an IAM role with all the required policies.
Follow these steps to do that:
- Go to https://console.aws.amazon.com/iam/
- Under Access management, click Roles and then Create Role
- Select AWS Service as Trusted entity type
- Under Use case, select Step Functions
- Click Next to skip Step 2 for now
- At Step 3, fill in the Role name, for example, adzuna_step_function_role
- Provide some meaningful description
- Click Create role.
- In the list of roles, click the newly created role.
- In the Permission policies block, click Add permission, then Create inline policy
- In the Policy editor, select JSON and insert the following contents into the text area (replace the account ID and region in the Lambda function ARNs with your own values)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_extract_data:*",
                "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_transform_data:*",
                "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_load_data:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_extract_data",
                "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_transform_data",
                "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_load_data"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogDelivery",
                "logs:CreateLogStream",
                "logs:GetLogDelivery",
                "logs:UpdateLogDelivery",
                "logs:DeleteLogDelivery",
                "logs:ListLogDeliveries",
                "logs:PutLogEvents",
                "logs:PutResourcePolicy",
                "logs:DescribeResourcePolicies",
                "logs:DescribeLogGroups"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutRetentionPolicy",
                "logs:DescribeLogGroups"
            ],
            "Resource": "*"
        }
    ]
}
12. Click Next
13. Fill in the Policy name, for example adzuna_step_function_policy, and click Create policy to save the changes
7.2 Creating and configuring Step Function
In this step, we will create the workflow that orchestrates an end-to-end ETL (Extract, Transform, Load) process using the three Lambda functions. Here’s a breakdown of what happens:
- ExtractDataFromAPI: The workflow starts by executing a Lambda function that extracts data from the Adzuna API and stores the raw data in S3. The key of the S3 object is passed to the next step.
- TransformData: The workflow moves to the next step, where a second Lambda function is invoked to transform the raw data. The S3 object key from the extraction step is passed as input. The transformed data is saved, and its S3 object key is passed to the final step.
- LoadDataToRedshift: In the final step, another Lambda function is triggered to load the transformed data from S3 into an Amazon Redshift data warehouse for analysis. It uses the S3 object key generated by the previous transformation step.
- Error Handling: Each step includes error handling, so if any function fails, the workflow moves to a FailureState, indicating that an error occurred in the ETL process.
Follow these steps:
- Open this page https://console.aws.amazon.com/states/
- Click Create state machine
- In the Choose a template window, select Blank
- Click the {} Code button, and insert the following json code (adjust resource ARNs with your values):
{
    "StartAt": "ExtractDataFromAPI",
    "States": {
        "ExtractDataFromAPI": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_extract_data:$LATEST",
            "Next": "TransformData",
            "ResultPath": "$.s3ObjectKeyExtract",
            "Catch": [
                {
                    "ErrorEquals": ["States.ALL"],
                    "Next": "FailureState"
                }
            ]
        },
        "TransformData": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_transform_data:$LATEST",
            "Parameters": {
                "s3ObjectKey.$": "$.s3ObjectKeyExtract"
            },
            "Next": "LoadDataToRedshift",
            "ResultPath": "$.s3ObjectKeyTransformed",
            "Catch": [
                {
                    "ErrorEquals": ["States.ALL"],
                    "Next": "FailureState"
                }
            ]
        },
        "LoadDataToRedshift": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789123:function:lambda_adzuna_load_data:$LATEST",
            "Parameters": {
                "s3ObjectKeyTransformed.$": "$.s3ObjectKeyTransformed"
            },
            "End": true,
            "Catch": [
                {
                    "ErrorEquals": ["States.ALL"],
                    "Next": "FailureState"
                }
            ]
        },
        "FailureState": {
            "Type": "Fail",
            "Cause": "An error occurred in the ETL process"
        }
    }
}
5. Click the Config button
6. Update the State machine name to AdzunaStateMachine
7. In the Execution role, select the recently created adzuna_step_function_role
8. In the Logging, set Log level to All and select the Include execution data checkbox to enable logging your state machine’s execution history to CloudWatch Logs
9. Click Create
7.3 Testing Step Function
To test the created Step Function, click Start Execution on the page with the newly created State machine
If the execution succeeds, you should see the execution graph with all steps green (Succeeded).
Apart from that, your S3 folders should look as follows:
- raw_data/processed/ should contain a new .json file
- raw_data/to_process/ should be empty
- transformed_data/migrated/ should contain a new .csv file
- transformed_data/to_migrate/ should be empty
In the Redshift adzuna database, you should see that both the staging and data warehouse tables now contain data.
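If you want to kick off and check a run without the console, for example from a notebook, here is a minimal boto3 sketch; the state machine ARN is a placeholder, so replace it with the one shown on your AdzunaStateMachine page:

import json
import time
import boto3

sfn = boto3.client("stepfunctions", region_name="us-east-1")

# Placeholder ARN: copy the real one from the AdzunaStateMachine details page
state_machine_arn = "arn:aws:states:us-east-1:123456789123:stateMachine:AdzunaStateMachine"

execution = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps({}),  # the extract step ignores its input
)

# Poll until the execution leaves the RUNNING state
while True:
    result = sfn.describe_execution(executionArn=execution["executionArn"])
    if result["status"] != "RUNNING":
        break
    time.sleep(5)

print(result["status"])  # SUCCEEDED if the whole ETL run worked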
Step 8. Creating EventBridge schedule for automating ETL pipeline
Now that the ETL pipeline is implemented, tested, and working as expected, it's time to schedule it for automatic execution.
For this purpose, we will create an EventBridge schedule.
Follow these steps:
- Open AdzunaStateMachine page
- Click Actions, then Create EventBridge rule
- Fill in the rule Name, for example, adzuna_etl_run_daily
- Fill in the Rule description, for example, Schedule to run Adzuna ETL daily
- In the Rule type, select Schedule and then click Continue in EventBridge Scheduler
- Under Schedule pattern, select Recurring schedule
- Select the right Time zone for you
- Under Schedule type, select Cron-based schedule and type in the following cron expression: cron(0 10 * * ? 2024)
- In the Flexible time window, select Off
- Click Next
- Under Target detail, select Templated targets, and then select AWS Step Functions (StartExecution)
- Under StartExecution, in the State machine dropdown, select AdzunaStateMachine and click Next
- In the Action after schedule completion, select NONE
- Optionally, in the Retry policy, set the Maximum age of event to 5 minutes and Retry attempts to 3 times
- Click Next
- Click Create schedule
Now the ETL pipeline will be triggered every day at 10:00 (in the time zone you selected) until the end of 2024, because the cron expression restricts the year field to 2024. If you want the schedule to keep running after that, use * as the year field instead: cron(0 10 * * ? *)
Step 9. Connecting Looker Studio to Redshift
The process of connecting Looker Studio to Redshift includes the following steps:
- Creating a database user in Redshift for Looker Studio
- Configuring Redshift network settings
- Creating and configuring Looker Studio connection to Redshift
9.1. Creating a database user in Redshift for Looker Studio
According to security best practices, we don’t want to use the Redshift admin user for Looker Studio. We will create a new user with the minimum required privileges.
To do that, open the Redshift query editor and execute the following queries (replace the example password with a strong one of your own):
-- Create user
CREATE USER looker_user WITH PASSWORD 'ab1c#de@sdf';
-- Grant usage on the schema
GRANT USAGE ON SCHEMA dw TO looker_user;
-- Grant select privileges on specific table
GRANT SELECT ON TABLE adzuna.dw.jobs TO looker_user;
9.2. Configuring Redshift network settings
By default, a newly created Redshift workgroup is not publicly accessible. Since Looker Studio is an external service that lives outside of AWS, we need to:
- Enable public accessibility for Redshift
- Configure Redshift VPC security group to accept connections from Looker Studio IP addresses only
Follow these steps:
- Go to your Redshift Workgroup page (adzuna-etl-project-redshift-wg)
- Under Data access, click Edit
- Under Publicly accessible, select the Turn on Publicly accessible checkbox
- Click Save changes
- On the Redshift Workgroup page, under Network and security, click the link of the VPC security group
- Click the link in the Security group ID field
- Click Edit inbound rules
- Click Add rule
- In the Type dropdown, select Redshift
- In the Source, select Custom and copy/paste the following CIDR for Looker Studio service: 142.251.74.0/23
- In the Description, type in Looker Studio connection
- Click Save rules
9.3. Creating and configuring Looker Studio connection to Redshift
To proceed further, you need to log in to your Google Account. If you don't have one, sign up.
Follow these steps:
- Open https://lookerstudio.google.com/
- In the top-left corner, click Create, then select Data source
- Under Google Connectors, select Amazon Redshift
4. On the connector configuration page, click JDBC URL
5. Copy the JDBC URL field value from the Redshift Workgroup page (under General information)
It will look like: jdbc:redshift://adzuna-etl-project-redshift-wg.123456781234.us-east-2.redshift-serverless.amazonaws.com:5439/dev
Change the database name at the end from /dev to /adzuna:
jdbc:redshift://adzuna-etl-project-redshift-wg.123456781234.us-east-2.redshift-serverless.amazonaws.com:5439/adzuna
6. Copy and paste the corrected JDBC URL into the URL field on the Amazon Redshift connector configuration page
7. In the Username and Password fields, type in the username and password for the looker_user you created in the previous step in the Redshift database
8. Download the Amazon Redshift SSL certificate using this link — https://s3.amazonaws.com/redshift-downloads/amazon-trust-ca-bundle.crt (article — https://docs.aws.amazon.com/redshift/latest/mgmt/connecting-ssl-support.html)
9. In the connector configuration settings, select the Enable SSL checkbox and in the Redshift SSL Configuration Files, select the downloaded certificate
10. Click Authenticate
If everything was set up correctly, you should see the Redshift tables
11. Select CUSTOM QUERY, enter the following query in the Custom Query text area and click Connect
select * from adzuna.dw.jobs
12. In the next screen, update the Type of the job_url field from Text to URL-URL
13. Click Create Report
Step 10. Creating dashboard in Looker Studio
In this step, we will create a simple dashboard in Looker Studio that shares insights on the Data Engineering job market in Canada.
The dashboard will display key metrics, such as the total number of available jobs, job distribution by location, popular job titles, and a timeline of job postings per day.
Follow these steps:
- Use the Add a chart option from the top menu to add the necessary charts to the dashboard, using the screenshot of the finished dashboard as a reference.
2. Give it a name, for example DE Canada Jobs Report by Adzuna and click View
3. If you want, make the dashboard publicly accessible using the Share button
By default, the dashboard data is refreshed every 12 hours, but you can force an update at any time using the Refresh data option to the right of the Edit button.
Conclusion
In this project, we explored how to build a fully automated, end-to-end ETL pipeline using AWS cloud services.
We started by extracting Data Engineering job listings in Canada from the Adzuna API, storing raw data in Amazon S3, transforming it for analysis, and loading the processed data into Amazon Redshift Serverless. Finally, we visualized our insights by creating an interactive dashboard in Looker Studio.
By following these steps, you now have a scalable, fully automated ETL pipeline that can be adapted to many other use cases. This project demonstrates how AWS services can work together to automate data flows and deliver up-to-date insights, from data collection to visualization.
If you found this guide helpful or have any questions, feel free to reach out! You can also explore the full project repository and code on GitHub.
Follow me on LinkedIn — https://www.linkedin.com/in/kazarmax/