Streamlined Data Processing: From API to S3 with AWS and Airflow

Rahul dhanawade
Towards Data Engineering
6 min read · Jan 30, 2024

Github Code Link

Buckle up as we guide you through a hands-on, step-by-step process of building a data pipeline on AWS: Python pulls dataset metadata from the ONS API, Airflow on an EC2 instance schedules the job, and S3 stores the result.

Step 1: Grabbing Data from ONS API

Let’s kick things off by diving into the ONS API. This powerhouse lets us snag datasets with just a bit of HTTP magic. Learn how to wield Python like a wizard, extracting JSON data and effortlessly transforming it into a data frame. Each JSON field becomes a sleek column, making our data crystal clear.

import requests
import json
import pandas as pd
import s3fs

def extract_field(item, field_name, default='N/A'):
    if item is not None and isinstance(item, dict):
        field_value = item.get(field_name, default)
        return field_value
    else:
        return default

In this initial section, we set the stage for our data journey. We import the essential libraries: requests for API interaction, json for handling JSON data, pandas for data manipulation, and s3fs for interaction with AWS S3 (pandas relies on it behind the scenes when it is given an s3:// path, so we never call it directly). The extract_field function is our helper for safely reading a value from a dictionary: it returns the default when the key is missing or when the item is not a dictionary at all.
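To make that behaviour concrete, here is a small sketch that calls extract_field on a few hand-made inputs. The sample dictionary is invented for illustration and is not real ONS data; the helper is repeated so the snippet runs on its own.

```python
def extract_field(item, field_name, default='N/A'):
    # Same helper as in the article, repeated so this snippet is self-contained
    if item is not None and isinstance(item, dict):
        return item.get(field_name, default)
    return default

# Invented sample record, not real ONS data
sample = {
    "id": "example-id",
    "links": {"editions": {"href": "https://example.com/editions"}},
}

present = extract_field(sample, "id")                             # key exists
missing = extract_field(sample, "title", "No title available")    # key absent
not_a_dict = extract_field(None, "href", "No link available")     # item is None
nested = extract_field(sample["links"].get("editions"), "href")   # one level down
explicit_none = extract_field({"email": None}, "email")           # key exists, value is None

print(present, missing, not_a_dict, nested, explicit_none, sep=" | ")
```

Note the last case: a key that exists with a null value comes back as None rather than the default, because dict.get only falls back when the key is absent.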

URL = "https://api.beta.ons.gov.uk/v1/datasets"
response = requests.get(URL)
json_data = json.loads(response.text)

items = json_data["items"]

Here, we make a connection to the Office for National Statistics (ONS) API by sending a request to the specified URL. The API responds with a JSON payload containing valuable information. We extract the relevant data by loading the JSON content and isolating the ‘items’ key, laying the foundation for subsequent processing steps.
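The request above returns a single page of results. Below is a possible sketch for collecting every page. It assumes the API accepts limit and offset query parameters and reports a total_count field in each response, so check the ONS API documentation before relying on it. The names fetch_all_items, page_size, and get are made up for this example.

```python
def fetch_all_items(url, page_size=20, get=None):
    """Collect the 'items' lists from every page of a paginated endpoint."""
    if get is None:
        # Imported lazily so the function can be exercised with a fake `get`
        import requests
        get = requests.get

    items = []
    offset = 0
    while True:
        # Assumed pagination parameters: limit and offset
        response = get(url, params={"limit": page_size, "offset": offset}, timeout=30)
        response.raise_for_status()  # fail on HTTP errors instead of parsing an error body
        payload = response.json()

        page = payload.get("items", [])
        items.extend(page)
        offset += len(page)

        # Stop when a page comes back empty or the assumed total_count is reached
        if not page or offset >= payload.get("total_count", 0):
            break
    return items
```

Under those assumptions, items = fetch_all_items("https://api.beta.ons.gov.uk/v1/datasets") would take the place of the single request, and the rest of the script stays the same.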

data_list = []

for item in items:
    data = {
        "ID": extract_field(item, 'id'),
        "Description": extract_field(item, 'description')
    }

    # Extract and append contacts
    for i, contact in enumerate(item.get('contacts', []), start=1):
        data[f"Contact {i} Name"] = extract_field(contact, 'name', 'Contact')
        data[f"Contact {i} Email"] = extract_field(contact, 'email', 'N/A')
        data[f"Contact {i} Telephone"] = extract_field(contact, 'telephone', 'N/A')

    # Extract and append keywords
    data["Keywords"] = extract_field(item, 'keywords', 'No keywords available')

    # Extract and append links (a missing link falls back to its default instead of raising KeyError)
    links = item.get('links', {})
    data["Editions Link"] = extract_field(links.get('editions'), 'href', 'No editions link available')
    data["Latest Version Link"] = extract_field(links.get('latest_version'), 'href', 'No latest version link available')
    data["Taxonomy Link"] = extract_field(links.get('taxonomy'), 'href', 'No taxonomy link available')

    # Extract and append methodologies link if present
    data["Methodologies Link"] = extract_field(links.get('methodologies'), 'href', 'No methodologies link available')

    # Extract and append methodology details
    for i, methodology in enumerate(item.get('methodologies', []), start=1):
        data[f"Methodology {i} Title"] = extract_field(methodology, 'title', 'N/A')
        data[f"Methodology {i} National Statistic"] = extract_field(methodology, 'national_statistic', 'N/A')
        data[f"Methodology {i} Next Release"] = extract_field(methodology, 'next_release', 'N/A')
        data[f"Methodology {i} QMI"] = extract_field(methodology, 'qmi', 'N/A')

    # Extract and append additional fields
    data["Release Frequency"] = extract_field(item, 'release_frequency', 'No release frequency available')
    data["State"] = extract_field(item, 'state', 'No state field available')
    data["Title"] = extract_field(item, 'title', 'No title available')

    data_list.append(data)

Then we iterate through the extracted data items, structuring the information into a more organized format. Each item is transformed into a dictionary, capturing essential fields such as ID, description, contacts, keywords, links, methodologies, and additional details. This structured data is stored in the data_list for further processing.
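As a worked example of the enumerate pattern used for contacts, the sketch below flattens one invented item with two contacts into numbered columns. flatten_contacts is a hypothetical helper name and the sample values are made up.

```python
def flatten_contacts(item):
    """Turn a list of contact dicts into 'Contact N ...' columns."""
    row = {}
    # start=1 makes the column names begin at "Contact 1" rather than "Contact 0"
    for i, contact in enumerate(item.get("contacts", []), start=1):
        row[f"Contact {i} Name"] = contact.get("name", "Contact")
        row[f"Contact {i} Email"] = contact.get("email", "N/A")
        row[f"Contact {i} Telephone"] = contact.get("telephone", "N/A")
    return row

# Invented sample item, not real ONS data
item = {
    "id": "example-id",
    "contacts": [
        {"name": "A. Person", "email": "a@example.com"},
        {"name": "B. Person", "telephone": "+44 0000 000000"},
    ],
}

row = flatten_contacts(item)
for column, value in row.items():
    print(f"{column}: {value}")
```

An item with two contacts yields six contact columns, while an item with none yields zero, so different rows can end up with different sets of keys.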

# Create DataFrame
df = pd.DataFrame(data_list)

# Save to CSV in S3 (replace the bucket name and key with your own)
df.to_csv("s3://your-s3-bucket/your/data/path/data.csv")

In this final section, we leverage the Pandas library to convert our structured data into a DataFrame. The DataFrame provides a tabular representation of the information, making it easy to work with and analyze. Subsequently, the data is stored in CSV format and uploaded to an S3 bucket.
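Because each row dictionary can carry a different set of contact and methodology columns, it helps to see what pandas does with uneven rows. Here is a small sketch with invented rows, writing to an in-memory buffer instead of S3 so it runs anywhere. Passing index=False is a choice made here to keep the row index out of the file; the original code does not do this.

```python
import io

import pandas as pd

# Invented rows with different sets of keys, mimicking items with one and two contacts
rows = [
    {"ID": "a", "Title": "First", "Contact 1 Name": "A. Person"},
    {"ID": "b", "Title": "Second", "Contact 1 Name": "B. Person", "Contact 2 Name": "C. Person"},
]

# pandas takes the union of all keys as columns and fills the gaps with NaN
df = pd.DataFrame(rows)

buffer = io.StringIO()
df.to_csv(buffer, index=False)  # index=False leaves out the 0, 1, 2... row index column
csv_text = buffer.getvalue()
print(csv_text)
```

The missing "Contact 2 Name" value in the first row is written as an empty field in the CSV.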

Step 2: Setting Up AWS EC2 and Airflow

Time to set up camp in the cloud with AWS EC2. We’ll show you how to spin up an EC2 instance and install Airflow using some command-line kung fu. Once that’s done, upload your code to Airflow, where the real magic happens. Create Directed Acyclic Graphs (DAGs) for a smooth data dance.

Steps:

Access AWS Console: Log in and go to EC2 service.

Launch Instance: Click "Launch Instance."

Choose AMI: Select an Amazon Machine Image.

Select Instance Type: Choose instance specifications.

Review and Launch: Check configurations and click "Launch."

Create Key Pair: If needed, create or select a key pair.

Launch Instance: Confirm and launch the instance.

Connect to Instance: Use SSH to connect to your instance.

Open Command Prompt (Windows) or Terminal (Linux/Mac):

cd path/to/your/key

ssh -i "your-key.pem" ubuntu@your-instance-ip

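This command establishes a secure SSH connection to your EC2 instance, allowing you to interact with it via the command line. Make sure to replace the placeholder values with your actual key file name and EC2 instance IP. The ubuntu username assumes you picked an Ubuntu AMI. If SSH refuses the key because its permissions are too open, run chmod 400 your-key.pem first.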

Step 3: Installing Dependencies and Apache Airflow on EC2

# Update package list
sudo apt-get update

# Install Python3 pip
sudo apt install python3-pip

# Install Apache Airflow using pip
sudo pip install apache-airflow

# Install additional Python libraries
sudo pip install pandas
sudo pip install s3fs
sudo pip install requests
# The json module ships with Python's standard library, so it needs no separate install

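Make sure to execute these commands in sequence on your EC2 instance to install the necessary dependencies and Apache Airflow.

# Start Airflow

airflow standalone

This command initializes the metadata database, creates an admin user, and starts the webserver and scheduler. Airflow keeps its files in the Airflow home directory (~/airflow unless AIRFLOW_HOME is set). The credentials for signing in to the web UI are printed in the terminal output.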

You can access the Airflow web UI at http://your-instance-ip:8080 in your web browser, provided the instance's security group allows inbound traffic on port 8080.

You will be prompted to log in. Use the credentials printed by airflow standalone:

Username: admin
Password: the generated password shown in the terminal output

You should now have access to the Airflow web UI and can explore and manage your DAGs (Directed Acyclic Graphs) and tasks. If you have any DAGs already defined, you’ll be able to see and trigger them from the UI.

Step 4: Creating an Airflow DAG

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumption: api_extraction.py wraps the Step 1 script in a zero-argument
# function, given the hypothetical name run_extraction here. extract_field
# itself needs (item, field_name) and cannot serve as the task callable.
from api_extraction import run_extraction

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag1 = DAG(
    'ONS_API_Extraction',
    default_args=default_args,
    description="First Airflow project",
    schedule_interval=timedelta(days=1),  # Adjust the schedule interval as needed
)

task_1 = PythonOperator(
    task_id='data_extraction_from_api',
    python_callable=run_extraction,
    dag=dag1,
)

task_1

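Make sure to adjust the start_date and schedule_interval parameters according to your requirements. Because start_date lies in the past, Airflow 2 will by default create a run for every missed interval once the DAG is unpaused; pass catchup=False to the DAG if you only want runs going forward. This DAG has a single task, so there are no dependencies to declare; if you add more tasks, define their order based on your workflow. Keep in mind that python_callable is invoked without positional arguments unless you supply op_args or op_kwargs, so it should point at a function that runs the whole extraction rather than at a helper like extract_field.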
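PythonOperator needs a function it can call on its own. One way to get there is to wrap the Step 1 logic in a single entry point. This is a minimal sketch, assuming hypothetical names (run_extraction, build_row) and injectable fetch and save callables so the function can be exercised without touching the network or S3. Only two of the fields are shown.

```python
def build_row(item):
    """Flatten one API item into a row (only two fields shown for brevity)."""
    return {
        "ID": item.get("id", "N/A"),
        "Title": item.get("title", "No title available"),
    }


def run_extraction(
    fetch=None,
    save=None,
    url="https://api.beta.ons.gov.uk/v1/datasets",
    output_path="s3://your-s3-bucket/your/data/path/data.csv",  # placeholder path
):
    """Fetch, flatten, and store the data; callable with no arguments."""
    if fetch is None:
        import requests  # lazy import: only needed for the real run

        def fetch(target_url):
            response = requests.get(target_url, timeout=30)
            response.raise_for_status()
            return response.json()

    if save is None:
        import pandas as pd  # lazy import: only needed for the real run

        def save(rows, path):
            pd.DataFrame(rows).to_csv(path)

    rows = [build_row(item) for item in fetch(url).get("items", [])]
    save(rows, output_path)
    return len(rows)
```

With a function like this in api_extraction.py, the task can use python_callable=run_extraction, and importing the module no longer fires an API request every time Airflow parses the DAG file.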

Step 5: Storing Data in AWS S3

Get ready to be blown away by Amazon S3, the data storage juggernaut. We’ll guide you through creating an S3 bucket and seamlessly integrating it into your data pipeline. Your code becomes the maestro, conducting a symphony that stores your DataFrame as a CSV in S3. Secure, scalable, and oh-so-cool.

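Assuming you have already created an S3 bucket, provide the bucket path in your Python function code. The EC2 instance also needs permission to write to that bucket, for example through an IAM role attached to the instance.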

# Replace 'your-s3-bucket' with the actual name of your S3 bucket
s3_bucket = 'your-s3-bucket'

# Example path in S3 where you want to store the CSV file
s3_path = f's3://{s3_bucket}/your/data/path/data.csv'

# Update your data storage code with the S3 path
# For example, in your Pandas DataFrame to CSV conversion
df.to_csv(s3_path)
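
Since the DAG runs on a schedule, writing to one fixed key means each run overwrites the previous file. If you would rather keep one file per run, a date-stamped key is one option. Here is a sketch with a hypothetical helper, build_s3_path, and placeholder bucket and prefix names.

```python
from datetime import date


def build_s3_path(bucket, prefix, run_date, filename="data.csv"):
    """Build an s3:// URL with the run date as a folder, e.g. .../2024-01-30/data.csv."""
    clean_prefix = prefix.strip("/")  # tolerate leading or trailing slashes
    return f"s3://{bucket}/{clean_prefix}/{run_date.isoformat()}/{filename}"


s3_path = build_s3_path("your-s3-bucket", "your/data/path", date(2024, 1, 30))
print(s3_path)
```

In a real run you would pass date.today(), or the run date Airflow provides, instead of a fixed date.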

Now move the DAG script and the Python function code into your Airflow directory.

# Step 1: Save the DAG Script and Python Function Code
# Save your DAG script (e.g., my_ons_dag.py) and Python function code (e.g., api_extraction.py) in your preferred directory.

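# Step 2: Locate the Airflow DAGs Folder
AIRFLOW_HOME="/path/to/your/airflow"
DAGS_FOLDER="${AIRFLOW_HOME}/dags"
mkdir -p "${DAGS_FOLDER}"

# Step 3: Copy the DAG Script and Python Function Code (you can also open the files and copy-paste the code)
# Airflow puts the DAGs folder on sys.path, so the DAG's "from api_extraction import ..." resolves
# only if the module sits there (or somewhere else on PYTHONPATH).
cp my_ons_dag.py "${DAGS_FOLDER}"
cp api_extraction.py "${DAGS_FOLDER}"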

# Step 4: Verify in Airflow Web UI
# Access the Airflow web UI to check if the new DAG appears.

# Step 5: Trigger the DAG
# Manually trigger the DAG in the Airflow web UI or wait for the scheduled run.

Step 6: Testing, Testing

Before we wrap up this tech fiesta, let’s make sure everything’s rock-solid. Test your pipeline multiple times, validating each step to ensure a smooth ride. This final check ensures your creation is battle-ready and can handle anything the real world throws at it.
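One lightweight way to test the extraction step is to check the rows before they are written. Below is a sketch of such a check. The name validate_rows and the choice of required columns are assumptions for illustration, not part of the original code.

```python
REQUIRED_COLUMNS = ("ID", "Title")


def validate_rows(rows, required=REQUIRED_COLUMNS):
    """Return a list of human-readable problems; an empty list means no problems found."""
    problems = []
    if not rows:
        problems.append("no rows extracted")
    for index, row in enumerate(rows):
        for column in required:
            value = row.get(column)
            # Treat absent keys, empty strings, and the 'N/A' placeholder as missing
            if value is None or value == "" or value == "N/A":
                problems.append(f"row {index}: missing {column}")
    return problems


# Invented rows for illustration
good = [{"ID": "a", "Title": "First"}]
bad = [{"ID": "N/A", "Title": "Second"}, {"ID": "c"}]

print(validate_rows(good))
print(validate_rows(bad))
```

A check like this could run just before the CSV is written, raising an exception when problems are found so the Airflow task is marked as failed.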

and Testing…

And there you have it! From ONS API to AWS S3, a simplified data pathway. No tech overwhelm, just a smooth, streamlined process. Ready to give it a try?

Making a living out of writing; if this helped you, just clap.

Happy Learning :)


Self-taught in Data and Analytics | I am not an educator, Just documenting my Learnings