Add Newly Created Partitions Programmatically to the AWS Athena Schema
Problem Statement
Amazon Athena uses a managed Data Catalog to store information and schemas about the databases and tables that you create for your data stored in Amazon S3. For tables partitioned on one or more columns, the metadata store does not get updated with new partitions when new data is loaded into S3; you need to add the partitions to the metadata store explicitly.
Refer to "AWS Partitions" for detailed information.
In this article, we will look at the approaches that help us add partitions programmatically.
To begin with, the basic commands to add a partition to the catalog are MSCK REPAIR TABLE or ALTER TABLE ADD PARTITION.
To keep the Athena table metadata updated without having to run these commands manually, we can use either of the following:
- A programmatic approach: running a simple Python script as a Glue job and scheduling it at the desired frequency
- Glue Crawlers
What are Partitions?
In AWS S3, partitions play an important role when querying data in Amazon Athena/Presto or Redshift Spectrum, since they limit the volume of data scanned, dramatically accelerating queries and reducing costs.
Partitions are logical entities in a metadata store, such as the Glue Data Catalog or a Hive Metastore, that map to folders, the physical locations where data is stored on S3.
Partitions deserve a separate article of their own. For now, let's just understand that a partition is a physical division of data in S3, similar to a folder structure in a file system: the column on which the partition is created becomes the object (folder) under which the data set is stored.
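For illustration, assume a hypothetical bucket holding a table partitioned on year and month; the data would be laid out in S3 like this, with each year=.../month=... prefix corresponding to one partition in the catalog:
s3://my-data-bucket/data/year=2020/month=03/part-00000.parquet
s3://my-data-bucket/data/year=2020/month=04/part-00000.parquet
s3://my-data-bucket/data/year=2020/month=05/part-00000.parquet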
Dealing with Dynamic Partitions
While creating a table in Athena we specify the partition columns; however, the partitions are not reflected until they are added explicitly, so querying the table returns no records.
Where partitions are not added frequently, it is simplest to run MSCK REPAIR TABLE to keep the schema in sync with the complete dataset.
For use cases where streaming or continuously arriving data is partitioned on a date column, or where new partitions are created on a daily/weekly/monthly basis, we could use a Glue Crawler (which can be expensive for very large data sets with many files).
Another way, which I find more cost effective, is to have a Python script compare the Athena metadata with the S3 folder structure and add only the new partitions, without the overhead of scanning all the files in the bucket. This is the method I focus on in this article.
Pseudo code for a high-level overview of the process:
1. Pass parameters to the ETL job to set:
- 'region' = AWS Region
- 'database' = Athena database
- 'tableName' = Athena table whose partitions are to be updated
- 'athenaResultBucket' = temporary bucket to hold the Athena query result
- 'athenaResultFolder' = temporary folder to store the Athena query result
- 's3Bucket' = S3 bucket from which the table is created
- 's3Folder' = S3 folder from which the table is created
- 'timeout' = Athena query timeout
2. Scan the AWS Athena schema to identify partitions already stored in the metadata.
3. Parse the S3 folder structure to fetch the complete partition list.
4. Build the list of new partitions by subtracting the Athena list from the S3 list.
5. Create an ALTER TABLE query to update the partitions in Athena.
Understanding the Python Script Part-By-Part
import boto3
import re
import time
import botocore
import sys
from func_timeout import func_timeout, FunctionTimedOut
from awsglue.utils import getResolvedOptions
boto3 is the most widely used Python library to connect to and access AWS components.
getResolvedOptions is used to read and parse Glue job parameters.
args = getResolvedOptions(sys.argv, ['region', 'database', 'tableName', 'athenaResultBucket', 'athenaResultFolder', 's3Bucket', 's3Folder', 'timeout'])
params = {
    'region': args['region'],
    'database': args['database'],
    'tableName': args['tableName'],
    'athenaResultBucket': args['athenaResultBucket'],
    'athenaResultFolder': args['athenaResultFolder'],
    's3Bucket': args['s3Bucket'],
    's3Folder': args['s3Folder'],
    'timeout': int(args['timeout'])  # in sec
}
print("Parameters : ")
print(params)
print("----------------------------------")
print()
Parameters can be hard-coded inside params or passed in while running the Glue job.
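For example, with the Glue job in place, a run could be triggered with the required parameters from another script. Below is a minimal sketch assuming a hypothetical job name; Glue job arguments are passed with a leading -- so that getResolvedOptions can pick them up.
import boto3

glue = boto3.client('glue', region_name='eu-west-1')

# Start the partition-sync job with its parameters (names and values are illustrative)
glue.start_job_run(
    JobName='update-athena-partitions',  # hypothetical Glue job name
    Arguments={
        '--region': 'eu-west-1',
        '--database': 'mydb',
        '--tableName': 'foobar',
        '--athenaResultBucket': 'my-athena-results',
        '--athenaResultFolder': 'partition-sync',
        '--s3Bucket': 'my-data-bucket',
        '--s3Folder': 'data/',
        '--timeout': '300'
    }
)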
s3Client = boto3.client('s3', region_name=params['region'])
s3Resource = boto3.resource('s3')
athenaClient = boto3.client('athena', region_name=params['region'])
Clients for connecting to AWS Athena and AWS S3.
# Check if Bucket Exists
s3CheckIfBucketExists(s3Resource, params["athenaResultBucket"])
The main execution of the Python script starts from this line.
def s3CheckIfBucketExists(s3Resource, bucketName):
    try:
        s3Resource.meta.client.head_bucket(Bucket=bucketName)
        print("Athena Bucket exists")
        print("----------------------------------")
        print()
    except botocore.exceptions.ClientError as e:
        print("Athena Bucket does not exist.")
        print(e)
        print("----------------------------------")
        # Create the missing temporary bucket for Athena results
        location = {'LocationConstraint': params['region']}
        s3Client.create_bucket(Bucket=bucketName, CreateBucketConfiguration=location)
        print()
        print("Athena Bucket Created Successfully.")
        print()
This function checks whether the bucket used to store the temporary Athena result set exists in S3; if it does not, we can create it using the S3 client or raise an error, depending on the requirement.
def athena_query(athenaClient, queryString):
    response = athenaClient.start_query_execution(
        QueryString=queryString,
        QueryExecutionContext={
            'Database': params['database']
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + params['athenaResultBucket'] + '/' + params['athenaResultFolder'] + '/'
        }
    )
    return response
The above function is used to run queries on Athena via the athenaClient, e.g. "SHOW PARTITIONS foobar" and "ALTER TABLE foobar ADD IF NOT EXISTS PARTITION(year='2020', month=03) PARTITION(year='2020', month=04)".
The athenaClient runs the query and stores the output in the S3 location passed while calling the API; the output path is set by the ResultConfiguration :: OutputLocation key.
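For a utility query such as SHOW PARTITIONS, Athena writes the result to a text file named after the query execution id under that output location, which is what the later code fetches and parses (the bucket and folder below are illustrative values of the job parameters):
# OutputLocation = 's3://' + athenaResultBucket + '/' + athenaResultFolder + '/'
# Result file    = OutputLocation + '<QueryExecutionId>.txt'
# e.g. s3://my-athena-results/partition-sync/5d61a654-1b2c-4d5e-8f90-abcdef123456.txt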
def athena_to_s3(athenaClient, params):
    queryString = "SHOW PARTITIONS " + params["tableName"]
    print("Show Partition Query : ")
    print(queryString)
    print("----------------------------------")
    print()
    execution = athena_query(athenaClient, queryString)
    execution_id = execution['QueryExecutionId']
    state = 'RUNNING'
    while (state in ['RUNNING', 'QUEUED']):
        response = athenaClient.get_query_execution(QueryExecutionId=execution_id)
        if 'QueryExecution' in response and 'Status' in response['QueryExecution'] and 'State' in response['QueryExecution']['Status']:
            state = response['QueryExecution']['Status']['State']
            if state == 'FAILED':
                print(response)
                print("state == FAILED")
                return False
            elif state == 'SUCCEEDED':
                s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
                filename = re.findall(r'.*\/(.*)', s3_path)[0]
                return filename
        time.sleep(1)
    return False
This function calls the athena_query method and waits until the query finishes executing on Athena. The result set is a text file stored in the temporary S3 {bucket}/{folder}. The function returns the temporary file name for further parsing, or the boolean False if something goes wrong during execution.
# Fetch Athena result file from S3
s3_filename = athena_to_s3(athenaClient, params)
A simple direct call to the above function fetches the result file name for parsing.
# Fetch Athena result file from S3
try:
    s3_filename = func_timeout(params['timeout'], athena_to_s3, args=(athenaClient, params))
except FunctionTimedOut:
    print("Athena Show Partition query timed out.")
    print()
    print("#~ FAILURE ~#")
    print()
    print()
    raise

# s3_filename = athena_to_s3(athenaClient, params)
print("Athena Result File At :")
print(params['athenaResultBucket'] + '/' + params["athenaResultFolder"] + '/' + s3_filename)
print("----------------------------------")
print()
For my use case, I had to wrap the Athena query function with a timeout. For this I have used a very cool, neat and easy Python library, func_timeout (author: Tim Savannah).
NOTE : Adding an external library to a Glue job is itself a neat trick to cover in a separate blog (will update once it's ready).
You can use either of the above two approaches (direct calling or with function timeout).
# Read Athena Query Result file and create a list of partitions present in athena meta
fileObj = s3Client.get_object(
    Bucket=params['athenaResultBucket'],
    Key=params['athenaResultFolder'] + '/' + s3_filename
)
fileData = fileObj['Body'].read()
contents = fileData.decode('utf-8')
athenaList = contents.splitlines()
print("Athena Partition List : ")
print(athenaList)
print("----------------------------------")
print()
The Athena query result is a .txt file, so it has to be parsed into a list for comparison in order to identify the newly created partitions. I have used the splitlines() method to split the result set into a list.
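For reference, the result file of SHOW PARTITIONS contains one partition per line, so the parsed list looks roughly like this (values are illustrative):
contents = "year=2020/month=01\nyear=2020/month=02\nyear=2020/month=03"
athenaList = contents.splitlines()
# ['year=2020/month=01', 'year=2020/month=02', 'year=2020/month=03']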
def s3ListObject(s3, prefix):
    resultList = []
    result = s3.list_objects_v2(
        Bucket=params['s3Bucket'],
        Delimiter='/',
        Prefix=prefix
    )
    if result['KeyCount'] == 0:
        return False
    try:
        resultList.extend(result.get('CommonPrefixes'))
        while (result['IsTruncated']):
            result = s3.list_objects_v2(
                Bucket=params['s3Bucket'],
                Delimiter='/',
                Prefix=prefix,
                ContinuationToken=result['NextContinuationToken']
            )
            resultList.extend(result.get('CommonPrefixes'))
    except Exception as e:
        print("#~ FAILURE ~#")
        print("Error with :")
        print(result)
        raise
    return resultList
The above function parses the S3 object structure to gather the partition list, using the AWS SDK list_objects_v2 method. Pagination of S3 objects is handled using the NextContinuationToken, since the AWS API returns at most 1,000 objects in a single call.
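To make the CommonPrefixes idea concrete: with Delimiter='/', list_objects_v2 returns the immediate "sub-folders" under the given prefix instead of every object beneath it. A sketch of the relevant part of the response, using an illustrative bucket and prefix:
response = s3Client.list_objects_v2(
    Bucket='my-data-bucket',  # hypothetical bucket
    Delimiter='/',
    Prefix='data/'
)
# response['CommonPrefixes'] == [
#     {'Prefix': 'data/year=2019/'},
#     {'Prefix': 'data/year=2020/'}
# ]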
# Parse S3 folder structure and create partition list
prefix = params['s3Folder']
s3List = []  # stays empty if no partition folders are found under the prefix
yearFolders = s3ListObject(s3Client, prefix)
if yearFolders:
    monthList = []
    for year in yearFolders:
        result = s3Client.list_objects_v2(
            Bucket=params['s3Bucket'],
            Delimiter='/',
            Prefix=year.get('Prefix')
        )
        try:
            monthList.extend(result.get('CommonPrefixes'))
        except Exception as e:
            print("#~ FAILURE ~#")
            print("Error with :")
            print(result)
            raise

    for thingType in monthList:
        string = thingType.get('Prefix').replace(params['s3Folder'], "")
        s3List.append(string.rstrip('/'))

    # To filter out default spark null partitions and folders like _SUCCESS, _temporary, __HIVE_DEFAULT_PARTITION__
    s3List = [i for i in s3List if (('month' in i) and (i.startswith('year')) and not ('__HIVE_DEFAULT_PARTITION__' in i))]

print("S3 Folder Structure At :")
print(params['s3Bucket'] + '/' + params['s3Folder'])
print("----------------------------------")
print()
print("S3 Partition List : ")
print(s3List)
print("----------------------------------")
print()
This is the part that should be tweaked depending on your partition levels. This example has two levels of partitions, i.e. year and month.
We loop over the S3 structure by fetching the CommonPrefixes at the top level and then iterate over them again to fetch the inner partitions, giving us the final partition list.
Fetching the CommonPrefixes avoids walking the entire S3 file structure, making the job faster and leaner.
There is specific handling to filter out the default Spark null partitions and folders such as _SUCCESS, _temporary and __HIVE_DEFAULT_PARTITION__, which can exist alongside partition folders. The existence of such folders would break the query formation, so it is better to ignore them.
# Compare Athena Partition List with S3 Partition List
resultSet = set(s3List) - set(athenaList)
print("Result Set : ")
print(resultSet)
print("----------------------------------")
print()
Set subtraction of the Athena partition list from the S3 partition list gives us the list of newly created / missing partitions.
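For example (values illustrative):
s3List = ['year=2020/month=03', 'year=2020/month=04', 'year=2020/month=05']
athenaList = ['year=2020/month=03', 'year=2020/month=04']
resultSet = set(s3List) - set(athenaList)
# {'year=2020/month=05'} : only the partition missing from Athena remains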
# Create Alter Query for Athena
try:
    if len(resultSet) != 0:
        print("Partition Count : " + str(len(resultSet)))
        # Batch the partitions (1000 per query) to stay under the Athena query length limit
        result = split(list(resultSet), 1000)
        for resultSet in result:
            # Build the ALTER TABLE statement from the repr of the partition list,
            # e.g. ['year=2020/month=03'] -> PARTITION(year='2020', month=03)
            queryString = "ALTER TABLE " + params['tableName'] + " ADD IF NOT EXISTS PARTITION(" + repr(resultSet) + ")"
            queryString = queryString.replace("[", "")
            queryString = queryString.replace("]", "")
            queryString = queryString.replace("{", "")
            queryString = queryString.replace("}", "")
            queryString = queryString.replace(",", ") PARTITION(")
            queryString = queryString.replace("'", "")
            queryString = queryString.replace("year=", "year='")  # year is a string column, so quote its value
            queryString = queryString.replace("/", "', ")
            print("Alter Query String : ")
            print(queryString)
            print("----------------------------------")
            print()

            # Run Alter Partition Query
            execution = athena_query(athenaClient, queryString)
            if execution['ResponseMetadata']['HTTPStatusCode'] == 200:
                # Temp Folder Cleanup
                cleanup(s3Resource, params)
                print("*~ SUCCESS ~*")
                print()
            else:
                print("#~ FAILURE ~#")
                print()
    else:
        # Nothing new to add; just clean up the temp folder
        cleanup(s3Resource, params)
        print()
        print("*~ SUCCESS ~*")
except Exception as e:
    # Temp Folder Cleanup
    cleanup(s3Resource, params)
    print("#~ FAILURE ~#")
    print("Error with :")
    print(resultSet)
    print(e)
    raise
Finally comes the part where the ALTER TABLE partition query is formed by building a complete query string and massaging it into the required syntax.
NOTE : To add some variation to the schema, I have set the year column to be a string and the month column to be an int, so the query string has to be formed accordingly.
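To make the string manipulation concrete, one batch goes roughly through the following transformation (partition values are illustrative):
resultSet = ['year=2020/month=03', 'year=2020/month=04']
# repr(resultSet) -> "['year=2020/month=03', 'year=2020/month=04']"
# After the chain of replace() calls, the final query becomes:
# ALTER TABLE foobar ADD IF NOT EXISTS PARTITION(year='2020', month=03) PARTITION( year='2020', month=04)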
UPDATE : Recently I came across a scenario where my table schema got updated and all partitions had to be reloaded (approximately 6,000 partitions). A humongous query was formed, and when it was executed it hit Athena's query length limit, i.e. 'queryString' failed to satisfy constraint: Member must have length less than or equal to 262144.
To overcome this, it is better to add the partitions in batches, so I have updated the code to execute batches of 1,000 partitions.
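The split helper used above is part of the complete script on GitHub; a minimal sketch of such a chunking helper could look like this:
def split(fullList, chunkSize):
    # Break fullList into consecutive chunks of at most chunkSize items
    return [fullList[i:i + chunkSize] for i in range(0, len(fullList), chunkSize)]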
The last piece of code is wrapped in an if..else block that checks whether the result set is empty, to avoid triggering an empty query.
def cleanup(s3Resource, params):
    print('Cleaning Temp Folder Created: ')
    print(params['athenaResultBucket'] + '/' + params["athenaResultFolder"] + '/')
    print()
    s3Resource.Bucket(params['athenaResultBucket']).objects.filter(Prefix=params["athenaResultFolder"]).delete()
    print('Cleaning Completed')
    print("----------------------------------")
    print()
    # s3Resource.Bucket(params['athenaResultBucket']).delete()
And lastly, the cleanup() method is used to delete the temporary result file and Athena folder (and the bucket too, if required; uncomment the last line).
Complete Python Script
It can be found in the open repository on GitHub.
Script OUTPUT :
After all the geeky coding comes the easiest part: click, click, run, done. Create a Glue job using the given script file and use a Glue trigger to schedule the job with a cron expression or an event trigger.
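As a sketch (the job and trigger names below are hypothetical), a scheduled trigger could be created with boto3 like this:
import boto3

glue = boto3.client('glue', region_name='eu-west-1')

# Run the partition-sync job every day at 02:00 UTC
glue.create_trigger(
    Name='update-athena-partitions-nightly',             # hypothetical trigger name
    Type='SCHEDULED',
    Schedule='cron(0 2 * * ? *)',
    Actions=[{'JobName': 'update-athena-partitions'}],   # hypothetical Glue job name
    StartOnCreation=True
)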
Thank you for reading till the end. Hope you found it worthy. I am not an AWS expert but pursuing to be one. Just shared my personal experience working on a POC which I thought would help others like me. Do like the blog, comment your feedback and improvements and also let me know if you need any help understanding it. Please follow for more such easy and interesting write ups. This would motivate me to keep writing and sharing my experiences.
Till then Keep Smiling and Keep Coding ✌️😊 !!
About ME :
Data Engineer/Software Developer/Lead with Masters in Data Analytics from Dublin City University having 6+ years of work experience in Data Pipelines, Core Development, Reporting, Visualizations, DB & Data Technologies with progressive growth. I believe in taking ownership of projects and proactively contributing towards the enhancement of the overall business solution.
Currently working with Verizon Connect as a Big Data Engineer. Thanks Verizon Connect for giving me this awesome opportunity to work for some really cool projects of migrating a legacy on-premise data server to AWS thereby, getting my hands dirty while working on neat POCs on AWS.
Updated On : 02/March/2021 || Published On : 11/May/2020 || Version : 3
version 3 :
Updated python script to ignore temp folders in s3 and batch execution on query due to athena query string limit
version 2 :
Null check handling for athena and s3 partition list.
Added About Me Section
version 1 :
Initial commit