Automate executing AWS Athena queries and moving the results around S3 with Airflow: a walk-through

Marek Šuppa
Slido developers blog
11 min readJun 19, 2019
Photo by Marten Bjork on Unsplash

If you happen to store structured data on AWS S3, chances are you already use AWS Athena. It is a hosted version of Facebook’s PrestoDB and provides a way of querying structured data (stored in say .csv or .json files) using standard ANSI SQL. Given its competitive pricing structure (5 USD for 1 TB of scanned data), it currently seems to be the best tool for digging through data saved in “cold storage” in S3 (as opposed to “hot storage” in for instance Amazon Redshift or some other analytical or transactional database).

Athena has some very nice features. It not only provides the query results in the AWS Console or as a response to an API request, but it also automatically saves them as a .csv file in S3. There is only one small caveat: although you can specify the path on S3, the actual filename will be a random Universally Unique IDentifier (UUID). This is a double edged sword as on the one hand it prevents overwriting query results by mistake, but on the other it means that saving query results to specific locations on S3 with specific file names cannot be done by Athena alone. Thankfully, Apache Airflow can help us with that.

In this quick post we will see how to:

  1. Execute an Athena query
  2. Wait until it successfully completes
  3. Move the query results to a specific location on S3

All of that can be easily done using community-supplied Airflow operators,
very little code and some knowledge of Airflow — most of which you can find in the following sections.

Note: The following sections assume you already have Airflow installed and set up. If you do not, it may be a good idea to do that first, for instance by following the steps outlined in this blog post.

Executing Athena queries in Airflow

One of the great things about Apache Airflow is that just as in any other framework, the most common use cases are already covered, tested and ready to be used. Executing AWS Athena queries is no exception, as the newer versions of Airflow (at least 1.10.3 and onward) come pre-installed with a specific operator that covers this use case. You can easily spot it in the big list of Airflow operators by its “uninspiring” name: AWSAthenaOperator.

The AWSAthenaOperator only requires us to specify three parameters:

  • query: the Presto query to run
  • database: the database to run the query against
  • output_location: the S3 path to write the query results into

Armed with that information, we can put together a small Directed Acyclic Graph (DAG), using a few Airflow modules. With a bit of imagination it may look as follows:

A very simple DAG with a single task in it: an execution of AWS Athena query.

So what exactly is happening in this DAG? As we can see on lines 5 to 7, the DAG we defined is really minimal: all it has specified are the start date and the schedule interval (set to None, meaning this DAG will not be scheduled). Line 11 is a bit more interesting: it specifies the Athena/Presto query we would like to run. In our case that would be

SELECT * FROM UNNEST(SEQUENCE(0, 100))

It may look quite scary at the first sight but it really is not — it just lists numbers from 0 to 100 using Presto’s sequence function.

There really is not much else happening here, so provided we specified the database and output_location parameters correctly, we can just take this DAG and specifically the run_query task for a test run. In order to do so, however, Airflow needs to have a connection to AWS set up properly.

Intermezzo: Setting up connection to AWS with Airflow

To connect to AWS services from within Airflow, we need to obtain two values from AWS:

  • AWS_CLIENT_ID
  • AWS_CLIENT_SECRET

You can get both by creating your own IAM user access keys — a process nicely described in the AWS Docs.

Once we have both of these values, they need to be somehow passed to Airflow so that it can use them to authenticate requests to the AWS API. Although one can just fill them in a form inside Airflow’s Admin interface (Menu -> Admin -> Connections -> aws_default), it may be easier to pass these values to Airflow by exporting the AIRFLOW_CONN_AWS_DEFAULT environment variable. In our case, this may amount to executing the following command inside the terminal, from which we will execute all the other Airflow commands later on:

Note that in the command above, $AWS_CLIENT_ID and $AWS_CLIENT_SECRET should be replaced with the actual values obtained from the AWS Console. The $AWS_REGION should be changed to the region where the S3 bucket and the Athena instance you are trying to query reside.

Sadly, this sometimes may not be enough, in which case it seems that the safest approach is to just export all the relevant environment variables separately:

export AWS_DEFAULT_REGION=$AWS_REGION
export AWS_ACCESS_KEY_ID=$AWS_CLIENT_ID
export AWS_SECRET_ACCESS_KEY=$AWS_CLIENT_SECRET

Just as in the other option, the variables (the right hand side) in these commands should be replaced with actual values obtained from the AWS Console.

Once that has been taken care of, we can run the test by executing:

airflow test simple_athena_query run_query 2019–05–21

The test command will start the specified task (in our case run_query) from a given DAG (simple_athena_query in our example). The output will provide some useful details, such as what task is being executed, which connection details are being used and how is the task progressing.

Looking at the command above, we can see that we not only specified the DAG name and the ID of the task to run, but also the execution date (2019–05–21). Our run_query task definition does not make use of it, but we can very easily change that: the query parameter is templated and the execution date is exposed via the ds variable. Our updated DAG may then look as follows:

The DAG from above, featuring a ‘templated query’.

Not much has changed, only the line 11 now looks really scary:

SELECT * FROM UNNEST(SEQUENCE(DATE(‘2019–05–01’), DATE_TRUNC(‘day’, DATE(‘{{ ds }}’)), INTERVAL ‘1’ DAY))

This seems to be mostly due to the fact that the whole query is a complex oneliner — split into multiple lines the query looks at least a bit more readable:

SELECT 
*
FROM UNNEST(SEQUENCE(DATE(‘2019–05–01’),
DATE_TRUNC(‘day’, DATE(‘{{ ds }}’)),
INTERVAL ‘1’ DAY))

Upon closer examination, we can see that what the query does is actually quite straightforward: it lists all dates starting from 1st of May, 2019 until the date when the task got executed (that is what {{ ds }} will be replaced with). In practical terms, this means that once we execute

airflow test simple_athena_query run_query 2019–05–21

and then look at the S3 location we specified in the task’s parameters (s3://my-bucket/my-path/ in our case), we should find a .csv file with 21 lines in it: one for each day between the 1st and 21st of May, 2019.

The ability to condition task execution on external parameters (such as the
execution date for instance) is quite powerful. Considering just our
small example, we are now able to create a list of days until the end of
the month without changing a single line of code in our DAG. All we need to do is to run

airflow test simple_athena_query run_query 2019–05–31

and then head to the same S3 path as before. We should be able to find a .csv file with 31 lines there.

This really only scratches the surface of the capabilities of Airflow’s “templated strings” — you can read much more about their more advanced features in the docs.

While the Athena’s ability to automatically store results on S3 is certainly a great feature, the way it does so comes with a small caveat we already mentioned in the beginning: we can specify the prefix of the S3 path the file will be saved in, but the actual file name will be an UUID string followed by the .csv suffix.

This basically means that if we specify s3://my-bucket/my-path/ as our output_path , the results will actually be located in s3://my-bucket/my-path/5eaeeec8-d488–4b07-b4fc-51929cb91954.csv. That is not very pleasant and so to keep things clean and ordered, it would be great if we could rename it. Or better yet, move the file into a new location. Just as before, Airflow has us covered here with its built-in S3FileTransformOperator .

Moving files around S3 with Airflow

One of the first things you find out very quickly when looking up “how to rename a file on S3” is that it’s actually not possible. Since S3 is an object-based storage solution, there are no files and folders like on an ordinary file system — just keys (i.e. S3 paths) and values (content associated with them). So to actually rename a file we need to send two requests to the AWS API: one that copies the already existing file to the new path and another one that removes the original file. Put differently, this means that renaming a file in S3 essentially amounts to moving it to a new location.

Airflow’s S3FileTransformOperator can handle all of this in a pretty straightforward manner. It actually goes one step further by going through the following steps:

  1. Download a file from a specific S3 location ( source_s3_key )
  2. Process it with a local script ( transform_script )
  3. Upload it to a specified S3 location ( dest_s3_key )

Taken at face value, it may seem a little overboard but as we will soon see, it provides a very robust framework for applying all sorts of transformations on data stored on S3 and uploading the results back.

Since we only want to move the file to a new location, we are going to use a very simple transformation script — probably the simplest of them all: a plain copy command.

Combining all of this together, a sample DAG that uses the S3FileTransformOperator may look as follows:

A simple DAG that makes use of S3FileTransformOperator

Looking at the code above, it would be quite reasonable to ask what happens in the second (transformation) step of the process mentioned above? In other words, how exactly is the transform_script used?

It is actually quite straightforward. When the file gets downloaded from S3, it is saved in a temporary file on the local filesystem (say /tmp/tmpekzxuzb4). The S3FileTransformOperator then crates another temporary file (for instance /tmp/tmpib45wk81) and executes a command in which both of these are passed as arguments to the transform_script.

In our case the executed command would look as follows:

/bin/cp /tmp/tmpekzxuzb4 /tmp/tmpib45wk81

It may not seem like much, but this is actually the core of the robust framework we mentioned above — since transform_script can be literally any program that can be executed in the same environment as Airflow, S3FileTransformOperator can cover all sorts of transformation needs of data on S3.

Returning to our previous examples with AWSAthenaOperator, what we would really like to do is a bit more involved than the sample script above. We would like to pick up the UUID of the query we executed, construct the path on S3 where we should be able to find the results once the query finishes, and then use this path as source_s3_key in S3FileTransformOperator . To do so, we will need a way of passing information from one Airflow task to another. Luckily, Airflow has us covered once again with a feature called XComs.

Passing information between Airflow tasks

As an abbreviation of “cross-communication”, XComs can be viewed as messages Airflow tasks are passing down the DAG. Each task can xcom_push a message which can then be xcom_pulled by another task — the only necessary parameter is the task_id of the task that xcom_pushed the message in the first place.

It is also possible to use XComs in a much more advanced way. That is out of scope of this article — please consult the documentation for more info on that.

When it comes to XComs and Airflow Operators, they are very neatly integrated — the return value of Operator’s execute() function is automatically xcom_pushed, so that subsequent tasks can xcom_pull the return value by just specifying the task_id they are interested in.

On top of all this, xcom_pull can be used directly in templated strings we already mentioned. This means that with very small changes, execution of Airflow’s tasks can be conditioned on previous tasks return values — we just need to specify these parameters in templated strings. Here is a quick example of what such a string may look like:

SELECT * FROM some_table
WHERE
id={{ task_instance.xcom_pull(task_ids='generate_id') }}

Sadly, not all Airflow Operators return values that may be useful to subsequent tasks in the DAG. Although it may change in the future, the AWSAthenaOperator is one such example. It does not return anything, even though we could really use the UUID of the executed query: that is how we locate the file with results.

Thankfully, this problem has a pretty straightforward solution. Since AWSAthenaOperator is just a Python class, we can inherit it and ensure that this new class of ours works just the same, except for returning the query ID in its execute function. Here is what it may look like:

With an operator like this, we should have all pieces of the puzzle ready:

  1. A way of executing Athena queries
  2. A way of finding the results on S3
  3. A way of copying a file on S3 to a different location

Let’s finally put it all together.

The grand finale: putting it all together

After all this preparation, we can finally do what the title promised: execute an Athena query and move the results to a specific S3 location. We will do so by using a slightly altered AWSAthenaOperator described above which will allow us to piece together the S3 path with the results of the Athena query. We’ll then download this file from S3 and reupload it to a different location using the S3FileTransformOperator .

As Linus Torvalds says, “Talk is cheap. Show me the code”. We agree, so here it is:

As we can see, it took only about 30 lines to do something like this: a tribute to Airflow, its operators and a community of contributors who build and maintain them.

Testing this DAG out should also be fairly straightforward — if you still have all the environment variables set up correctly, the following command can be used to execute the first task:

airflow test athena_query_and_move run_query 2019–06–07

A quick note on XCom messages: since they are stored in the Airflow’s DB, they persist between task runs. This is of great advantage to us in this case, as we can just test the move_results task knowing full well that xcom_pull will be able to retrieve the query UUID from the previous run:

airflow test athena_query_and_move move_results 2019–06–07

If all went well, at this point you should be able to locate the results of your query at s3://mybucket/otherpath/myresults.csv!

BONUS: Transforming query results to different data formats

As we mentioned above, the transformation script did not do anything particularly interesting: just an exact 1:1 copy. With a little bit of code, however, it can be updated to convert the downloaded data to a different format: say from CSV to Apache Parquet, which can then be read back by AWS Athena (and the circle will be complete).

To do so, we will need to install pandas along with the pyarrow module Pandas requires for working with the Parquet data format.

Our transformation script will basically be a oneliner: we will just use Pandas to read the CSV file (the first argument of the script) in and save it to a Parquet file (the second argument). The updated DAG, along with the small transformation script, may look something like this:

Just as before, we can test this updated DAG via Airflow’s test command:

airflow test athena_query_and_move run_query 2019–06–07
airflow test athena_query_and_move move_results 2019–06–07

and if all went well, we should be able to find the query results stored in the Apache Parquet format at s3://mybucket/otherpath/myresults.parquet!

Conclusion

Although it took over 2000 words to describe, executing Athena queries and moving the results around S3 is not that difficult in the end — it could even be argued that it is actually quite easy!

As we saw, this is mostly due to the well thought-out architecture of Airflow’s tasks, Operators and other concepts that allow us to neatly tie them together. With all of these pieces of the puzzle at our disposal, executing Athena queries, moving the results around S3, and even applying transformations on top of it is pretty much a solved problem.

--

--