In January 2020, one of the highest-priority projects for the Data Engineering team was to migrate Groupon's on-prem systems to the cloud (AWS). Groupon has a large on-prem environment that spans four different Hadoop environments and a Teradata cluster.
In the first phase of the cloud migration, we determined we would be migrating approximately 71 Teradata pipelines to the cloud. By this point, it had already been decided that we would build our data lake on AWS S3 and that end-users would query the data through Hive tables. In addition, we had a requirement to replicate the same data in Snowflake for the end-users who were going to use Snowflake.
While we were evaluating all of the components/frameworks needed for a successful migration to the cloud, one of the missing pieces of the puzzle was how to capture SCD type 1, 2 & 3 transactions in our cloud data lake. A group of us in technical lead roles brainstormed and agreed that all the pipelines in the cloud would generate incremental data that needs to be merged into the final target table via SCD type 1, type 2, or type 3 in both S3 and Snowflake. This meant we needed a framework that supports the following features:
- Support for ACID transactions
- Schema Evolution
- Time Travel
- Concurrent Read and Writes
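To make the SCD requirement concrete before diving into the tooling, here is a minimal pure-Python sketch of what SCD type 1 versus type 2 merge semantics mean for incoming incremental data. This uses plain dicts purely for illustration; the real pipelines perform these merges at scale with Spark, and the field names are hypothetical:

```python
# Illustration only: SCD type 1 vs type 2 merge semantics on plain dicts.
# Field names ("id", "city", "is_current") are hypothetical examples.

def scd_type1_merge(target, incoming, key="id"):
    """SCD type 1: overwrite matching rows in place; no history is kept."""
    merged = {row[key]: dict(row) for row in target}
    for row in incoming:
        merged.setdefault(row[key], {}).update(row)  # insert or overwrite
    return sorted(merged.values(), key=lambda r: r[key])

def scd_type2_merge(target, incoming, key="id"):
    """SCD type 2: expire the current row and append a new current version."""
    result = [dict(row) for row in target]
    for row in incoming:
        for existing in result:
            if existing[key] == row[key] and existing["is_current"]:
                existing["is_current"] = False  # expire the old version
        result.append({**row, "is_current": True})
    return result

target = [{"id": 1, "city": "Chicago"}]
incoming = [{"id": 1, "city": "Seattle"}]
scd_type1_merge(target, incoming)  # row for id 1 now has city "Seattle"
```

A framework that performs these operations against tables holding billions of rows is exactly why the ACID, time-travel, and concurrency features above matter.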
Possible Solutions —
We started evaluating all the open-source frameworks that could provide the above capabilities and were able to narrow the list down to Apache Hudi and Delta Lake. Fortunately, at this point one of my ex-colleagues had already completed a POC (proof of concept) on how we could leverage Delta Lake to capture changed data sets. So, instead of reinventing the wheel, we decided to go ahead and use Delta Lake for SCD operations on Hive tables in S3 and the spark-snowflake connector for Snowflake tables. Hence, Pinion was born.
Evolution of Pinion —
In the initial days of our cloud migration, when we were brainstorming this idea, it was clear that we didn't want the logic for merging incremental data into the target table to be coded into every pipeline; rather, we wanted to build a framework that all the pipelines could use to capture changed data sets into their target tables. This framework would be an abstraction over the Delta Lake APIs for S3 and the spark-snowflake connector for Snowflake to perform SCD type 1, 2 & 3 operations in the respective target system.
Before we proceed further, let's define a few terms that we will be using in this blog post.
LRF — An LRF, aka Load Ready File, is an immutable incremental file in Parquet format that will be loaded into the target table via any of the APIs exposed by the Pinion framework. An LRF is created once per pipeline run and can be consumed multiple times.
Target Table — A target table can be any table present in S3 (Hive or Delta table) or Snowflake. Incremental data generated by a pipeline in the form of LRF(s) is merged into the target table.
What is the Pinion Framework —
Pinion is a plug-and-play, template-based load framework that can be used to load incremental data into target tables on S3 or Snowflake. It is written in Python. Any pipeline that generates an LRF after its ETL process can use this framework to load the LRF into the target table. As of today, Pinion supports interacting with Delta tables on S3 and with Snowflake tables. Internally, Pinion uses the Delta Lake APIs for Delta tables and the spark-snowflake connector for Snowflake tables. If the need arises, Pinion can be extended to interact with other sink systems (e.g. Teradata Vantage, Redshift, etc.).
Since the inception of this framework, it was clear to us that we didn't want users to write any code to use it; everything should be done by providing the details via a configuration file, with Pinion doing all the heavy lifting. The Pinion framework parses all the params provided in the configuration file and calls the appropriate service and API to merge the incremental data into the target table.
As outlined above, users of this framework create a configuration file (.yaml) containing all the required information (e.g. service_name, api_name, target table name, LRF path on S3, etc.) and then run the Pinion framework as a Spark job, providing the configuration file as a parameter. We will see an example later in this post.
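As a rough picture of what such a file contains, a heavily trimmed, hypothetical configuration might look like the following. The field names mirror the ones mentioned above (service_name, api_name, target table, LRF path), but the exact keys and layout here are an illustration, not the real template:

```yaml
# Hypothetical, heavily trimmed Pinion configuration -- for illustration
# only; the real template contains many more fields.
service_name: S3DeltaLakeService        # or SnowFlakeLoadService
api_name: upsert                        # e.g. insert / merge / upsert
target_table: my_db.deal_window_map
lrf_path: s3://my-bucket/lrf/deal_window_map/   # hypothetical S3 path
```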
The diagram shows the integration of the Pinion Framework in a conventional cloud pipeline.
Pinion Architecture —
The below diagram depicts the current architecture of the Pinion Framework.
The Driver Program takes the configuration file as an input parameter, does the bootstrapping, and passes control to the validation layer, which validates the configuration file against a predefined template file configured in the validation layer. The validation layer raises an exception and aborts the job if any mandatory information is missing from the configuration file.
Once validation succeeds, control passes to the service layer, which reads the LRFs from S3, invokes the requested service, and merges the LRFs into the target system using the requested API.
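The validation step can be pictured as a simple check of the parsed configuration against a template of mandatory keys. The sketch below is a simplified stand-in for the real validation layer, and the key names are illustrative:

```python
# Simplified stand-in for Pinion's validation layer: the parsed
# configuration (a dict) is checked against a predefined template of
# mandatory keys, and the job is aborted if anything is missing.
# Key names are illustrative, not the real template's.

MANDATORY_KEYS = {"service_name", "api_name", "target_table", "lrf_path"}

class ConfigValidationError(Exception):
    """Raised when mandatory configuration fields are missing."""

def validate_config(config: dict) -> dict:
    missing = MANDATORY_KEYS - config.keys()
    if missing:
        # Abort the job with a clear list of what the user forgot.
        raise ConfigValidationError(f"Missing mandatory fields: {sorted(missing)}")
    return config
```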
As of this writing, Pinion has two service implementations, each targeting a dedicated target system.
- S3DeltaLakeService: Leverages the Spark Delta Lake APIs to perform all DML operations on the target system, which in this case is S3. The service reads the LRFs from S3, creates a Spark DataFrame on top of them, and merges it into the target system via DeltaTable.
- SnowFlakeLoadService: Leverages the spark-snowflake connector to perform all DML operations on the target table in Snowflake. The service reads the LRFs from S3, creates a temp table in Snowflake, and then merges the temp table into the Snowflake target table.
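The temp-table-then-merge pattern used by SnowFlakeLoadService ultimately boils down to a MERGE statement against the target table. The helper below sketches, in plain Python, the kind of SQL such a service might generate; the table and column names are hypothetical, and the real service derives them from the configuration file:

```python
# Sketch of the MERGE statement the Snowflake service conceptually issues
# after staging the LRF into a temp table. Table/column names are
# hypothetical; the real service builds them from the configuration file.

def build_merge_sql(target, temp, keys, columns):
    on = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    sets = ", ".join(f"t.{c} = s.{c}" for c in columns)
    cols = ", ".join(keys + columns)
    vals = ", ".join(f"s.{c}" for c in keys + columns)
    return (
        f"MERGE INTO {target} t USING {temp} s ON {on} "
        f"WHEN MATCHED THEN UPDATE SET {sets} "
        f"WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({vals})"
    )

sql = build_merge_sql("deal_window_map", "deal_window_map_tmp",
                      keys=["deal_id"],                      # hypothetical key
                      columns=["window_start", "window_end"])  # hypothetical columns
```

This matches the upsert semantics described in the next section: matching rows are updated, non-matching rows are inserted.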
Pinion Interface & Available APIs —
Pinion exposes multiple APIs for end-users to choose from. Each API has its own designated function and should be chosen according to the type of operation you want to perform on the target table.
E.g. the insert() API appends the incremental data from the LRF to the target table, while the merge() or upsert() API updates the matching records in the target table and inserts the non-matching records from the LRF.
The interface below represents all the APIs available in the Pinion Framework.
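As a rough Python sketch of what such an interface could look like, here is a hypothetical version using only the API names mentioned in this post (insert, merge, upsert). The actual signatures, and any additional APIs, may differ:

```python
# Hypothetical sketch of a Pinion-style service interface. Only the API
# names mentioned in the post (insert / merge / upsert) are used here;
# the real interface's signatures and full API list may differ.
from abc import ABC, abstractmethod

class PinionService(ABC):
    """Base class each target-system service would implement."""

    @abstractmethod
    def insert(self, lrf_path: str, target_table: str) -> None:
        """Append the LRF records to the target table."""

    @abstractmethod
    def merge(self, lrf_path: str, target_table: str, keys: list) -> None:
        """Update matching rows and insert non-matching rows (upsert)."""

    def upsert(self, lrf_path: str, target_table: str, keys: list) -> None:
        # The post describes upsert() as equivalent to merge().
        return self.merge(lrf_path, target_table, keys)
```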
Configuration File —
The code snippet below represents a small subset of the actual configuration file. It highlights only the most important fields, which users of the Pinion framework fill in before providing the configuration as a parameter to the Pinion job. To view the complete configuration YAML file, please refer to the following link — Complete Pinion Config YML File
At a high level, the configuration file is divided into a Default Section and a User Input Section.
The Default Section contains values that are environment-specific and remain the same across pipelines; they can be overridden on an as-needed basis, but this is not recommended.
The User Input Section is where the user provides values for all the mandatory fields specific to their pipeline. In the YAML file, you will also observe that the value of some keys is defined as DEFAULT or DERIVED.
A DEFAULT value remains the same across multiple pipelines (like environment properties) and is read from AWS Systems Manager. A DERIVED value is constructed at run time from the DEFAULT values and user-provided input.
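A simplified sketch of that resolution logic: DEFAULT placeholders are replaced with environment-level values (read from AWS Systems Manager in practice; a plain dict stands in for it here), and DERIVED values are built at run time from the resolved defaults plus user input. The key names below are illustrative:

```python
# Simplified sketch of DEFAULT/DERIVED resolution. In the real framework,
# DEFAULT values come from AWS Systems Manager; a plain dict stands in for
# that parameter store here, and all key names are illustrative.

PARAM_STORE = {"lrf_bucket": "my-lrf-bucket", "env": "prod"}  # hypothetical

def resolve_config(config: dict) -> dict:
    resolved = {}
    for key, value in config.items():
        if value == "DEFAULT":
            resolved[key] = PARAM_STORE[key]  # environment-level value
        else:
            resolved[key] = value
    # DERIVED values are constructed at run time from defaults + user input.
    if config.get("lrf_path") == "DERIVED":
        resolved["lrf_path"] = (
            f"s3://{resolved['lrf_bucket']}/{resolved['env']}/"
            f"{resolved['target_table']}/"
        )
    return resolved
```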
If you are interested in the complete configuration file, please refer to the link provided below. All the fields in the configuration file are self-explanatory, and a description is also provided for each field for better understanding.
Build & Deployment —
Pinion is written in Python and is built and distributed as a zip file. The build process generates the zip file and uploads it to a predefined standard location on S3. All the pipelines reference the Pinion zip file from this predefined location.
The image below shows the location of the Pinion zip file in the AWS S3 production region.
How to execute Pinion —
As explained above, the Pinion framework is built on top of Apache Spark, which gives us the flexibility to submit it as a Spark application.
For local development and testing, we can submit a Pinion job as a Spark application in standalone mode.
To include a Pinion task in a pipeline, we use an Airflow operator called DPPSparkSubmitOperator, built by Groupon's Data Platform team.
Pipeline Integration —
The image below shows one of the production pipelines, represented as an Airflow DAG, and highlights the integration of the ETL and Pinion tasks. In this example, the pipeline interacts with two target tables (deal_window_map & deal_window_map_history), which is why we have two S3 Pinion tasks and two Snowflake Pinion tasks.
Each of these Pinion tasks (tasks with the prefix pn_) runs as an individual Spark application and references the same Pinion zip file on S3; the only difference among the four Pinion tasks is the details provided in the configuration YAML file.
For reference, please go through the following YAML files used in the example above. To save some space, we are showing only the fields for which user input was needed and have removed the docs from the YAML files; for the complete YAML, please refer to the provided links. Both YAML files target the deal_window_map table; the first one is for S3 and the second one is for Snowflake.
S3 YAML File (deal_window_map)-
Complete YAML File — Pinion S3 Deal_Window_Map YAML File
Snowflake YAML File (deal_window_map) -
Complete Snowflake YAML File — Pinion Snowflake Deal_Window_Map YAML File
Advantages of using the framework:
- Completely configuration-based, with no need to write code to use the framework.
- Plug-and-play framework without any hard dependency on other components of the pipeline.
- A shallow learning curve; all it requires is an understanding of the structure and components of the configuration YAML file.
- Can be used by anyone with SQL knowledge.
- Can be extended to support any other sink system.
- Powered by Apache Spark, providing best-in-class performance when dealing with big data.
- Language agnostic, e.g. it can be used in a pipeline written in Scala or Java, even though Pinion is written in Python.
Success Metrics —
After the first phase of migration was completed, the Pinion framework was being used to load data into 157 tables across 71 different pipelines.
Below are some interesting stats on Pinion Framework.
This blog post is Part 1 of a multi-part series on the Pinion framework. In subsequent articles, we will take a deep dive into how we leveraged the Delta Lake API to execute transactional operations on billions of rows at a time, and how we overcame the challenge of the small Parquet files that are created after each Delta operation.
Last but not least, we'd like to give a shout-out and say thank you to the Groupon Data Engineering team for their feedback and support in testing the framework; without their support, Pinion would not be in the shape it is in today.
Special thanks to Suresh Appavu, Amardeep Kumar Gupta, and Prabhu Thiruvambalam for introducing the idea of the Pinion framework, and to the DE leads for their continuous guidance during all phases of Pinion's development.