ETL Data Pipeline and Data Reporting using Airflow, Spark, Livy and Athena for OneApp

Writing ETL Batch job to load data from raw storage, clean, transform, and store as processed data.

Parteek Singhal
Deutsche Telekom Digital Labs
9 min readJul 13, 2020

--

Photo by JJ Ying on Unsplash

Overview

In my previous blog, I talked majorly about the high-level building blocks of our Big Data architecture. In this, we would discuss in-depth how the core of the system works at DT.

This article focuses on creating a Data Pipeline to

  • Extract raw data from S3
  • Transform it using spark batch jobs
  • And finally, load the transformed records back to S3.
  • Also, we will talk about how to make the data queryable using Athena.

Before moving forward let me give you a quick estimate about the scale our data

  • We have around 70+ GB of daily data and growing, from our Selfcare App- OneApp
  • More than 80 million documents a day

At the end of this pipeline, we are able to reduce that data to around 10GB per day of parquet files, which is around 86% compression.

Seems a lot, Right? Yes!! this is the beauty of columnar storage formats and a few compression techniques.

Major Requirements

  • Convert raw JSON files to parquet ( both are stored on S3).
  • Monitoring of the complete data pipeline.
  • Write erroneous records to S3, found while the transformation of raw files.
  • Automate handling of new columns.
  • Run SQL Queries over transformed data
  • Add new partitions to Athena table to query new data.
  • Cost and performance optimizations

Apache Livy

Why we used Livy — just because it makes the interaction between your code and spark cluster super easy. I believe an API call provides a much more structured way to talk to a system rather than writing the bash commands.

Source: apache.org
  • Apache Livy is a service that enables easy interaction with a Spark cluster over a REST interface.
  • It enables easy submission of Spark jobs or snippets of Spark code, synchronous or asynchronous result retrieval, as well as Spark Context management, all via a simple REST interface or an RPC client library.
  • It will help us to interact between airflow and the spark cluster on EMR using simple rest calls.

High-Level Architecture

First, I’ll discuss the core processing part of the architecture, which runs over an on-demand EMR cluster with Spark and Hadoop deployed. Then we will dive deep into how the pipeline is scheduled and monitored using airflow. I will also be mentioning the optimizations as and when we come across them.

Spark Batch Job — Running on EMR

This is the core of our data pipeline where all the heavy computation process takes place.

Step-by-step functionalities performed by it:

  1. Fetches the list of file paths from mongo. We have used mongo as our data persistence layer for airflow and spark.
  2. Reads the raw JSON files from S3
  3. Transform and Clean Data, with the help of Schema fetched from our central schema registry service — Frame
  4. Write any poisonous data/error records to a separate path on S3.
  5. Check if any new column is present in the data and add that column to the schema registry. Also, save the information about the new columns to mongo.
  6. Partition the data and infer the partitions that will be written. Save these partitions to mongo.
  7. Write data to local HDFS storage in the EMR cluster, in the form of partitioned parquet files.

Step 5 has been done due to our requirement of handling any new column that flows in the upstream without manual addition to the schema registry and the Athena tables.

Compress and Splittable Files — Optimization #1

We decided on our storage file format to be Apache’s famous Parquet Format. These files are splittable and as it a columnar format it achieves greater compression. I have talked about Why Parquet in more detail in my previous blog.

Key Performance Factors:

  • Compressing your data can speed up your queries significantly, as long as the files are either of an optimal size (see the Optimization #3), or the files are splittable.
  • The smaller data sizes reduce network traffic from Amazon S3 to Athena.
  • Splittable files allow the execution engine in Athena to split the reading of a file by multiple readers to increase parallelism.
  • If you have a single unsplittable file, then only a single reader can read the file while all other readers sit idle.

Partition Columns — Optimization #2

Partitioning an External Table should be a must, you have to do it. You will be amazed at the reduction in query time as the size of data scanned reduces, which in return reduces the cost.

  • Partitioning divides your table into parts and keeps the related data together based on column values such as date, country, region, etc. Partitions act as virtual columns.
  • You define them at table creation. You can restrict the amount of data scanned by a query by specifying filters based on the partition.

Let’s go through how we have partitioned our data while storing

The directory structure of partitioned data
  1. nc: This specifies the natco/country code.
  2. year: The year when the event was generated.
  3. date: And finally the date of the event.

Hourly and Nightly ETL DAGs

We have chosen Airflow as our workflow management framework. It lets you create, schedule, and monitor your workflows using Python code.

You define your tasks in the form of a Directed Acyclic Graph (DAG). We have created two DAGs one for the transformation and the second one to optimize file sizes.

Hourly DAG

  • This DAG extracts new files every hour that are written into S3, transforms to parquet and loads it back into S3
Dependency Graph — Hourly ETL DAG

Nightly DAG

  • This DAG is scheduled every night to combine all the smaller parquet files written during the day into a single larger file (see the Optimization #3).
  • As I have mentioned earlier that the data is partitioned on the basis of country code, year, and date, so ultimately one new partition is created for each country every day.
  • A separate task is scheduled for each new partition to combine and overwrite files in S3
Dependency graph — Nightly DAG

Brief Explanation of Tasks:

Catalog Files :

  • Find the new files written in S3 based on the LastModified time of the file
  • Stores the above list in mongo

Create Cluster:

  • Spawns an EMR Cluster ( with Hadoop, Spark, and Livy) using boto3 library

Wait for Cluster Creation:

  • Polls the status of the cluster using cluster-id until it reaches waiting for ( IDLE ) state

Transform Events:

  • Submits a spark job using Livy Rest Interface to run on EMR cluster
  • Waits until the spark job either completes successfully or returns an error

Transfer Data to S3:

  • Transfers processed data written by spark job from HDFS to S3 using S3DistCp functionality provided by AWS

Terminate Cluster:

  • Terminates the cluster using cluster-id

Add Partitions:

  • Fetches the partitions written by spark job in Mongo
  • Runs a DDL Add Partition query on Athena to load the new partitions if any

Check New Columns:

  • Checks if any new columns were inferred by the spark-job ( saved in mongo by the spark job )
  • Triggers a mail for the same with the column name and its datatype
  • Runs a DDL Add Columns query on Athena to load the new columns if any

Combine files:

  • Submits a spark job using Livy Rest Interface to run on the EMR cluster.
  • All the combine-file tasks are scheduled in parallel for each country.
  • Spark job reads the files written on the partition and combines them to form a single file and write it to a temporary location on S3. A temporary location is used to prevent data corruption if the spark job fails mid execution.

Overwrite files:

  • Deletes the original smaller files on S3.
  • Renames the larger file on the temporary location, to move to the appropriate partition location.

Note: Boto3 is used to interact with AWS services

Optimize File Sizes — Optimization #3

  • Queries run more efficiently when reading data can be parallelized and when blocks of data can be read sequentially. Ensuring that your file formats are splittable helps with parallelism regardless of how large your files may be.
  • However, if your files are too small (generally less than 128 MB), the execution engine might be spending additional time with the overhead of opening Amazon S3 files, listing directories, getting object metadata, setting up data transfer, reading file headers, reading compression dictionaries, and so on.

Some benefits of having larger files:

  • Faster listing
  • Fewer Amazon S3 requests
  • Less metadata to manage

ADD Partition Query — Optimization #4

  • MSCK REPAIR TABLE can be a costly operation because it needs to scan the table’s sub-tree in the file system (the S3 bucket). Multiple levels of partitioning can make it more costly, as it needs to traverse additional sub-directories.
  • If you are adding new partitions to an existing table, then you may find that it’s more efficient to run ALTER TABLE ADD PARTITION commands for the individual new partitions.
  • This avoids the need to scan the table’s entire subtree in the file system. It is less convenient than simply running the MSCK REPAIR TABLE, but sometimes the optimization is worth it.
  • A viable strategy is often to use MSCK REPAIR TABLE for an initial import, and then use ALTER TABLE ADD PARTITION for ongoing maintenance as new data gets added into the table.

Till now we have done all the heavy loading, the data is transformed and comfortably sitting in our data-lake ready to be explored.

Access Data using Athena

Athena is your digging tool. It is essentially a distributed SQL Engine that will make your life easier, enabling you to run pretty fast queries over that huge dataset.

It is a serverless managed service by AWS, which only costs for the amount of data scanned by your query ( 5$ per TB ).

Why Athena? — The USP of Athena is that you don’t have to worry about setting up your own Presto Cluster, taking care of upscaling/downscaling your cluster. And also the serverless model saves your pocket if you are not running a lot of queries throughout the day.

Let’s get it over with!!

Just define the schema of your external table stored in the S3 bucket and you are ready to go.

Sample CREATE TABLE query

Yes, just that, and it’s done!!

Integrate Athena with a BI Tool

I know you might be thinking I just said that it’s done. Yes for data engineers like us it’s done.

But for end-users like data analysts or your business stakeholders you need a simple and capable BI Tool which enables them to visualize the results of analysis and create pretty dashboards to present.

Some features of BI Tools include

  • Management, security, and architecture of the BI platform
  • Metadata management
  • Analytics dashboards
  • Interactive visual exploration
  • Support mobile display
  • Embed analysis content
  • Embedded advanced analytics
  • Self-service data preparation
  • Publish and share analysis content
  • Ease of use and visualization

In future blogs, I will explain the integration with BI tools and how we are using them.

--

--

Parteek Singhal
Deutsche Telekom Digital Labs

Software Engineer at Deutsche Telekom | Budding Data Engineer