AWS Data Wrangler — Simplifying Pandas integration with AWS data related services

Subhash Burramsetty
4 min read · Sep 11, 2020


Enterprise organisations are using cloud services to build data lakes, warehouses and automated ETL pipelines. On AWS, data lakes are typically built on top of Amazon S3 because of its durability, availability, scalability and low cost, and Amazon Athena is one of the best tools to query that data in place. When it comes to programmatic interaction with AWS services, Boto3 is the first Python package that comes to everyone's mind. But programmatically querying S3 data through Athena into Pandas dataframes for ETL isn't that easy with the Boto3 package alone.

Recently, I came across an amazing Python package called “AWS Data Wrangler”. It was developed by AWS as part of the AWS Professional Service Open Source initiative. It makes the lives of data engineers a lot simpler with the amazing integration it provides with big data services and tools in AWS.

Let me illustrate the difference with an example:

Use case: Perform some ETL operations using Pandas on data present in a data lake, extracting the data with Amazon Athena queries

Old way of reading Athena query output into a Pandas dataframe using Boto3:

The process involves the following steps:

  1. Submit the query to Athena using the StartQueryExecution API via the Boto3 Athena client
  2. Retrieve the QueryExecutionId from the response
  3. Poll the query status by passing the QueryExecutionId to the GetQueryExecution API
  4. Once the query has succeeded, read the output file from the Athena output S3 location into a Pandas dataframe (you might also need to deal with S3's eventual consistency behaviour, because the output file might not be immediately available for reading)

The code would look something like the following:
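Here is a minimal sketch of that Boto3 flow; the database, table and output bucket names are placeholders, and the simple polling loop is only illustrative:

import time

import boto3
import pandas as pd

athena_client = boto3.client("athena")

# 1. Submit the query to Athena
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM sample_table",
    QueryExecutionContext={"Database": "sample_db"},
    ResultConfiguration={"OutputLocation": "s3://sample-bucket/athena-results/"},
)

# 2. Retrieve the QueryExecutionId from the response
query_execution_id = response["QueryExecutionId"]

# 3. Poll the query status until it reaches a terminal state
while True:
    status = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
    state = status["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state != "SUCCEEDED":
    raise RuntimeError(f"Athena query finished with state: {state}")

# 4. Read the CSV output file from the Athena output location into a Pandas dataframe
#    (reading s3:// paths with pandas requires s3fs; you may also need retries because
#    of S3 eventual consistency)
output_location = status["QueryExecution"]["ResultConfiguration"]["OutputLocation"]
athena_results_df = pd.read_csv(output_location)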

New way of reading Athena query output into a Pandas dataframe using AWS Data Wrangler:

AWS Data Wrangler takes care of all the complexity we handled manually in the old code snippet: query submission, polling, reading the results into a Pandas dataframe, S3 eventual consistency and so on.

We can achieve the equivalent output of the above code with the following simple snippet 🎉:

import awswrangler as wr
import pandas as pd

# Retrieving the data from Amazon Athena
athena_results_df = wr.athena.read_sql_query(
    "SELECT * FROM table_name WHERE column_1 = 'value1' AND column_2 = 'value2'",
    database="sample_db",
)

AWS Data Wrangler

AWS Data Wrangler is built on top of open-source projects such as Pandas, Boto3, SQLAlchemy and Apache Arrow. It provides simpler Pandas integration with many other AWS services through abstracted functions. Some of the services and their basic features are as follows:

  • Amazon S3
# Reading CSV files
wr.s3.read_csv("s3://sample-bucket/sample.csv")
# Writing CSV files
csv_file_path = "s3://sample-bucket/sample_2.csv"
wr.s3.to_csv(df, csv_file_path, index=False)
# Reading JSON files
wr.s3.read_json("s3://sample-bucket/sample.json")
# Writing JSON files
json_file_path = "s3://sample-bucket/sample_2.json"
wr.s3.to_json(df, json_file_path)
# Reading Parquet files
wr.s3.read_parquet("s3://sample-bucket/sample.parquet")
# Writing Parquet files
parquet_file_path = "s3://sample-bucket/sample_2.parquet"
wr.s3.to_parquet(df, parquet_file_path)
  • AWS Glue Catalog
# Retrieve databases in Glue catalog
wr.catalog.databases()
# Retrieve tables in a Glue database
wr.catalog.tables(database="sample_db")
# Search a table in Glue database
wr.catalog.tables(name_contains="sample")
wr.catalog.tables(name_prefix="sample_")
wr.catalog.tables(name_suffix="_table")
wr.catalog.tables(search_text="table description")
# Retrieve table details
wr.catalog.table(database="sample_db", table="sample_table")
# Delete a table
wr.catalog.delete_table_if_exists(database="sample_db", table="sample_table")
# Delete a database
wr.catalog.delete_database('sample_db')
  • Databases (MySQL, PostgreSQL, Redshift)
# Get SQLAlchemy Engine from a Glue Catalog Connection
wr.catalog.get_engine(name='sample_connection')
# Get SQLAlchemy Engine using the given db parameters
postgres_engine = wr.db.get_engine(
    db_type="postgresql",
    host="127.0.0.1",
    port=5432,
    database="sample_db",
    user="sample_user",
    password="sample_password",
)
# Reading data from a table in the database
wr.db.read_sql_query("SELECT * FROM public.sample_table", con=postgres_engine)
# Writing data into a table in the database
wr.db.to_sql(df, postgres_engine, schema="public", name="sample_table", if_exists="replace", index=False)

# Load data into Redshift using the COPY command
wr.db.copy_to_redshift(
    df=df,
    path=path,
    con=redshift_engine,
    schema="public",
    table="sample_table",
    mode="overwrite",
    iam_role=iam_role,
)
# Unload data from Redshift using the UNLOAD command
wr.db.unload_redshift(
    sql="SELECT * FROM public.sample_table",
    con=redshift_engine,
    iam_role=iam_role,
    path=path,
    keep_files=True,
)
  • Amazon Athena
# Read data from Athena using a SQL query
wr.athena.read_sql_query("SELECT * FROM sample_table", database="sample_db")
# Read data from Athena using a SQL query with a specific chunksize
df_chunks = wr.athena.read_sql_query(
    "SELECT * FROM sample_table",
    database="sample_db",
    chunksize=10000,
)
for df in df_chunks:
    print(len(df.index))
  • Amazon EMR
# Create an EMR cluster
wr.emr.create_cluster(params)
# Submit an EMR step
wr.emr.submit_step(cluster_id, command="spark-submit s3://sample-bucket/sample-test-script.py")
# Terminate an EMR cluster
wr.emr.terminate_cluster(cluster_id)
  • Amazon CloudWatch Logs and Logs Insights (see the short sketch just after this list)
  • Amazon QuickSight
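As an example for CloudWatch Logs Insights, query results can be read directly into a dataframe. The snippet below is a minimal sketch; the log group name and the query itself are placeholders.

import awswrangler as wr

# Run a CloudWatch Logs Insights query and load the results into a Pandas dataframe
# (the log group name below is just a placeholder)
logs_df = wr.cloudwatch.read_logs(
    query="fields @timestamp, @message | sort @timestamp desc | limit 20",
    log_group_names=["/aws/lambda/sample-function"],
)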

For the detailed list of all available APIs, go through the AWS Data Wrangler API reference documentation.

Go through the tutorials section of the documentation for additional detailed examples.

The AWS Data Wrangler package makes it easier to do ETL tasks involving Pandas dataframes and AWS data services. Go give it a try and experience its awesomeness.
