Automating AWS Athena batch jobs with Python 3

Introduction

AWS Athena is a powerful tool for anyone who needs to analyze vast amounts of data in S3. It’s as simple as dumping your data into S3, defining the database and data format, and voilà: you can now analyze your data using standard SQL queries. Since AWS service logs from CloudWatch, RDS, ELB, and IAM are JSON documents dumped into S3, they all become queryable, making security and operational audits less tedious for all the AWS systems engineers and administrators out there.

It is easy, amazingly fast, and cost effective at $5 per TB scanned, compared to running custom EMR jobs, which require large, costly, short-lived clusters that take forever to run and are a big headache if they fail mid-process. Just don’t forget to put object lifecycle rules on your output S3 bucket to avoid extra storage costs, since all query results get written to S3. I’m not going to dive into the inner workings of Athena, but if you want to learn more, here’s the link to the documentation page.

Without further ado, here’s a short how-to for automating Athena batch jobs with a simple Python 3 script to get you started.

Installing and configuring the Boto3 SDK

Install the SDK to make API calls to AWS.

pip3 install boto3

To configure credentials, refer to the link below and set up the authentication method best suited to your situation. http://boto3.readthedocs.io/en/latest/guide/quickstart.html#configuration
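For example, one common method is environment variables. This is just one of the options from the guide above, and the values below are placeholders, not real credentials:

```shell
# Placeholder credentials -- substitute your own from the IAM console
export AWS_ACCESS_KEY_ID=your-access-key-id
export AWS_SECRET_ACCESS_KEY=your-secret-access-key
export AWS_DEFAULT_REGION=us-east-1
```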

Function to query Athena

Here we define a function that we can reuse for all our queries. It accepts three basic parameters: the query, the database, and a custom S3 output path.

#!/usr/bin/env python3
import boto3

# Function for starting an Athena query
def run_query(query, database, s3_output):
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output,
        }
    )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response

Sample S3 data

Athena supports a variety of data and compression formats. For this tutorial, we will just use a plain old JSON file uploaded to S3.

{"name":"Alice","sex":"F","city":"Seattle","country":"USA","age":25,"job":"Professional Zombie Killer"}
{"name":"Bob","sex":"M","city":"Los Angeles","country":"USA","age":40,"job":"Actor Extraordinaire"}
{"name":"Joe","sex":"M","city":"New York","country":"USA","age":35,"job":"Policeman"}
{"name":"Amanda","sex":"F","city":"Los Angeles","country":"USA","age":29,"job":"Ex Child Star"}
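Note that Athena’s JSON SerDe expects newline-delimited JSON: one object per line, not a JSON array. As a sketch, the snippet below serializes the sample records in that shape; the commented-out upload call and the `data/persons.json` key name are assumptions based on this tutorial’s paths, not requirements:

```python
import json

# The sample records from above
records = [
    {"name": "Alice", "sex": "F", "city": "Seattle", "country": "USA",
     "age": 25, "job": "Professional Zombie Killer"},
    {"name": "Bob", "sex": "M", "city": "Los Angeles", "country": "USA",
     "age": 40, "job": "Actor Extraordinaire"},
    {"name": "Joe", "sex": "M", "city": "New York", "country": "USA",
     "age": 35, "job": "Policeman"},
    {"name": "Amanda", "sex": "F", "city": "Los Angeles", "country": "USA",
     "age": 29, "job": "Ex Child Star"},
]

def to_ndjson(rows):
    # One compact JSON object per line -- the layout the JSON SerDe expects
    return "\n".join(json.dumps(row, separators=(",", ":")) for row in rows) + "\n"

# Upload with boto3 (requires credentials; bucket/key follow this tutorial's paths):
# import boto3
# boto3.client("s3").put_object(Bucket="athena-how-to",
#                               Key="data/persons.json",
#                               Body=to_ndjson(records))
```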

Creating the database and table

Much like on a normal SQL server, a database must be created first to house the tables. Each table points to the S3 data via the LOCATION attribute defined during table creation, as seen below.

s3_input = 's3://athena-how-to/data'
database = 'test_database'
table = 'persons'

create_database = "CREATE DATABASE IF NOT EXISTS %s;" % (database)

create_table = \
    """CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (
      `name` string,
      `sex` string,
      `city` string,
      `country` string,
      `age` int,
      `job` string
    )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
      'serialization.format' = '1'
    ) LOCATION '%s'
    TBLPROPERTIES ('has_encrypted_data'='false');""" % (database, table, s3_input)

Select queries

Standard SQL queries that we’re all familiar with.

#Query definitions
query_1 = "SELECT * FROM %s.%s where sex = 'F';" % (database, table)
query_2 = "SELECT * FROM %s.%s where age > 30;" % (database, table)

Putting it all together

#!/usr/bin/env python3
import boto3

# Function for executing Athena queries
def run_query(query, database, s3_output):
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output,
        }
    )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response

# Athena configuration
s3_input = 's3://athena-how-to/data'
s3_output = 's3://athena-how-to/results/'
database = 'test_database'
table = 'persons'

# Athena database and table definition
create_database = "CREATE DATABASE IF NOT EXISTS %s;" % (database)

create_table = \
    """CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (
      `name` string,
      `sex` string,
      `city` string,
      `country` string,
      `age` int,
      `job` string
    )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
      'serialization.format' = '1'
    ) LOCATION '%s'
    TBLPROPERTIES ('has_encrypted_data'='false');""" % (database, table, s3_input)

# Query definitions
query_1 = "SELECT * FROM %s.%s where sex = 'F';" % (database, table)
query_2 = "SELECT * FROM %s.%s where age > 30;" % (database, table)

# Execute all queries
queries = [create_database, create_table, query_1, query_2]
for q in queries:
    print("Executing query: %s" % (q))
    res = run_query(q, database, s3_output)

Athena API calls are asynchronous, so the script exits immediately after submitting the last query. As of this writing, boto3 still doesn’t provide a waiter for Athena; you will have to write your own polling loop based on the returned execution ID. Now it’s time to execute the script.
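A minimal hand-rolled waiter can poll `get_query_execution` until the query reaches a terminal state. The function below is a sketch; the one-second poll interval and five-minute timeout are arbitrary choices, not Athena requirements:

```python
import time

def wait_for_query(client, execution_id, poll_seconds=1, timeout_seconds=300):
    """Poll Athena until the query reaches a terminal state.

    Returns 'SUCCEEDED', 'FAILED', or 'CANCELLED'; raises TimeoutError otherwise.
    """
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = client.get_query_execution(QueryExecutionId=execution_id)
        state = status['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return state
        time.sleep(poll_seconds)
    raise TimeoutError('Query %s did not finish within %s seconds'
                       % (execution_id, timeout_seconds))
```

In the script above, you would call it as `wait_for_query(boto3.client('athena'), res['QueryExecutionId'])` after each `run_query` if you need to block until results are ready.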

me@linuxbox$ ./athena-script.py
Executing query: CREATE DATABASE IF NOT EXISTS test_database;
Execution ID: a7754952-****-****-****-65970b60e580
Executing query: CREATE EXTERNAL TABLE IF NOT EXISTS test_database.persons (
`name` string,
`sex` string,
`city` string,
`country` string,
`age` int,
`job` string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://athena-how-to/data'
TBLPROPERTIES ('has_encrypted_data'='false');
Execution ID: ed98c98f-****-****-****-56b0ae69cca9
Executing query: SELECT * FROM test_database.persons where sex = 'F';
Execution ID: f68e622a-****-****-****-fd5aea939b9a
Executing query: SELECT * FROM test_database.persons where age > 30;
Execution ID: aa047728-****-****-****-a414963f1c98

Since this queries just one small JSON file, it finishes almost instantaneously and the results immediately show up in S3. It’s up to your next script or job to process them further.
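Athena writes each result set to the output location as a CSV file named after the execution ID. As a sketch of that next step, the snippet below parses such a file with the standard csv module; the sample string only mimics what query_1 would produce, and the commented-out S3 fetch assumes this tutorial’s bucket and results prefix:

```python
import csv
import io

# Sample contents mirroring query_1's result file (illustrative, not real output)
sample_csv = (
    '"name","sex","city","country","age","job"\n'
    '"Alice","F","Seattle","USA","25","Professional Zombie Killer"\n'
    '"Amanda","F","Los Angeles","USA","29","Ex Child Star"\n'
)

rows = list(csv.DictReader(io.StringIO(sample_csv)))

# Fetching the real file (requires boto3 and credentials):
# import boto3
# obj = boto3.client('s3').get_object(Bucket='athena-how-to',
#                                     Key='results/' + execution_id + '.csv')
# rows = list(csv.DictReader(io.StringIO(obj['Body'].read().decode('utf-8'))))
```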

From my production experience, our 800 GB (about $4) Athena batch job finishes in around 15 minutes, down from 1–2 hours with our previous EMR-based solution that cost around $20–30 per run. Moving to Athena was a win in all aspects: less cost, less time, less complexity, and it is easy to learn, manage, and automate. That’s it, hope this helps! Happy automating!