Decentralising the Ownership of ETLs using Amazon Athena and Apache Airflow

hipages Data Platform Journey From a Data Mess to the Data Mesh — Part 1

Vidura Mudalige
hipages Engineering
6 min readNov 18, 2021

--

Managing the data assets of a fast-growing organisation is a big challenge in the data world. Usually, the data assets grow much faster compared to the organisation itself. In such an environment, delivering the data needs of different product teams by building and maintaining different data pipelines is a big challenge for Data Engineers. In this article, I’ll explain how we overcame this challenge at hipages by making self-serve ETLs using Amazon Athena as an ETL solution with Apache Airflow.

Where did we start?

A data platform can be defined as a scalable system for the collection, curation, transformation and integration of datasets. A feature-rich comprehensive data platform is not the first thing that a startup company wants to have at the initial phase of its growth; hipages was no different. The main goal of the first version of our data platform was mainly focused on data collection because the business didn’t have many data requirements at the time. The first version of our data platform consisted of an AWS S3 based data lake, an Amazon Athena based SQL interface to query the data in S3, Pandas python library based ETLs, an Apache Airflow based workflow manager and Looker based BI solution. Looker was integrated with the data lake through Amazon Athena. There have been many data sources that bring data to the data lake. Business users and data analysts create various looker dashboards using the data available in the data lake to provide data-driven solutions to the business. To make this process efficient, commonly used datasets have been being extracted from the raw data in S3, transformed and stored in a different S3 location alongside an Athena table. Pandas library and Apache Airflow were used to implement these scheduled ETLs.

The internal structure of hipages consists of cross-functional teams, which are driven by a clear business scope and a vision. Each cross-functional team includes a data analyst, who is an expert on the business domain of his cross-functional team. When a cross-functional team needs to make a data-driven decision, the data analyst provides the data insights by analysing data through Looker. In contrast to the cross-functional teams, the data platform at hipages was fully centralised, monolithic and traditional. If data analysts couldn’t find the required dataset in the data lake for data analysis, they had to reach out to the Data engineers to request the required ETLs. All the ETLs were owned by the data platform (ultimately Data Engineers) irrespective of the nature of the business domain of data. The Data Engineers used to build various ETLs using Pandas to provide the requested Athena tables to data analysts for looker integrations.

The Challenge

This centralised data platform had a few drawbacks. Most of the time, the Data engineers who implement these ETLs have little to no domain knowledge of the data they work with. That makes it hard for Data engineers to identify some data quality issues and some subtle issues with the data transformation logic. As the business grows, implementing and maintaining ETLs had become full-time work for Data engineers. There was no time left for Data engineers to add new features and functionalities to the data platform.

The Solution

The trend towards decentralised architecture was started a long time ago with the arrival of service-oriented architecture and micro-services. The self-service architecture wasn’t a new concept for the hipages data platform. The self-service BI using Looker had already been a part of it. Applying this concept to our data pipelines by decentralising the ownership of the ETLs was the solution we came up with for our challenges. It was our first step towards the decentralised futuristic data platform or the data mesh. To decentralise the ownership of the ETLs, first, we had to make them self serviceable for data analysts. SQL has always been the preferred language for data analysts. Therefore, we decided to implement a SQL based self-serviceable ETL solution for our Amazon S3 based data lake.

Amazon Athena comes with some ETL support that we can easily adapt for our use case. Athena’s Create Table As Select (CTAS) and INSERT INTO statements can be used to extract, transform, and load (ETL) data in Amazon S3. The INSERT INO feature was released sometime after the CTAS feature. Therefore, we had to use the CTAS feature creatively for our ETL solution.

A CREATE TABLE AS SELECT (CTAS) query creates a new table in Athena from the results of theSELECT statement. Athena stores data files created by the CTAS statement in a specified location in Amazon S3. Here is a sample CTAS query,

By executing the query on Amazon Athena, we get following directories in the given S3 location,

The SELECT part of the CTAS query extracts the data from “source_table” and loads it to the given location as parquet files. Here we can include all the transformation logics in the SELECT query.

Here we have used the CTAS partitioned_by feature. In this case, we get 5 partitions in the given S3 location. However, if we execute this query again for a different time range, all the partitions in this S3 location get overwritten. We had two key use cases for this ETL solution.

  1. It should be able to run on a schedule
  2. It should generate time-based partitions

Therefore, it was problematic when using this CTAS feature as it’s for an ETL solution.

To address this, we decided to use CTAS in a bit different way for our ETL solution. For example, let’s say we need to execute the above query to create yearly partitions. Therefore, it should run once a year and create a partition for that year. First, we need to find the number of yearly partitions that need to be created in the given location. For that, we need to execute a CTAS query like this;

In this temp-data/ directory we get all the yearly partitions this table has. Then, we can execute a SHOW PARTITIONS temp_new_parquet query to get the list of partitions. This list includes all the partition s3 keys. Using this result, we can execute CTAS queries for each partition with a “WHERE” condition to extract the data for that particular partition and the s3 key to create the partition. For example, if this source_table has 5 yearly partitions, we execute 5 CTAS queries like this for each partition. The following shows the execution for the year=2015 partition.

Airflow

To use this ETL solution for real business use cases, we needed a tool for orchestration and SQL templating. Apache Airflow was the obvious solution for us because it was already a part of our data platform. We designed an Airflow plugin using the AWS boto3 library to programmatically execute CTAS queries on Amazon Athena. We can pass SQL queries with the target s3 location and the partition column to this plugin to generate a transformed, partitioned, parquet based Athena table. When passing the SQL we can easily parameterise the WHERE condition using templates. For example, we can pass the following query.

This plugin is compatible with any complex SQL query. In this example, we pass the partition column (run_date ) and the target s3 location to the Airflow plugin. In Airflow we can easily inject the DAG execution date into this SQL and execute this SQL as a CTAS query for the DAG execution date using our plugin.

Conclusion

This ETL solution has become one of the widely used features in the hipages data platform. The data analysts and the engineers in the wider engineering team now use this solution a lot to build their lake to lake ETLs. All they need to do is write a SQL query for the data transformation logic. They can do all the orchestrations and monitoring of their data pipelines via Airflow. The people who write these ETLs are domain experts and they have a good understanding of their data. Therefore, this solution leverages a domain-oriented, self-serve design for ETLs. In other term, a step towards the “Data Mesh”.

--

--