A Simple Data Lake ETL Pipeline

Nick Nathan
unified-engineering
6 min readFeb 5, 2020
Photo by Jeremy Bishop on Unsplash

For the third and final installment of our data lake mini series we’ll go through a simplified example of an ETL pipeline our engineering team built using our data lake. Check out the previous installment A Deep Dive Into Unified’s Data Lake for a more in depth architectural overview. In this post I wanted to show more concretely how the concepts previously discussed could be applied towards solving an actual business engineering problem. This particular example is useful in illustrating the power of this distributed computational methodology and why without a data lake Unified wouldn’t be able to generate the kinds of sophisticated analytics it offers to our customers.

Setting the Scene

This particular project involved ingesting and transforming a large dataset from an external vendor into our data platform. The dataset is now used by our engineering and data science teams to enrich our existing datasets and increase the number of metrics available in our analytics products. Our data vendor delivers about 620 files every month, each of which are about 725 MB compressed, into a shared s3 bucket. In total that’s about 450 GB of compressed data and more than a terabyte uncompressed. In addition the vendor delivers a single mapping file which we use to make sense of the massive data set.

On each line, in each of the approximately 620 data files there is a single id value, followed by a pipe, and then followed by an unknown number of subsequent values delimited by a comma. Each file contains about a half million lines.

<id-A> | <value-1>, <value-2>, <value-3>, … <value-n>
<id-B> | <value-1>, <value-3>, <value-6>, … <value-n>
<id-C> | <value-2>, <value-10>, <value-12>, … <value-n>

On each line of the mapping file there is the set of possible values and their corresponding descriptions.

<value-1>,<value-1-description>
<value-2>,<value-3-description>
<value-3>,<value-4-description>

<value-n>,<value-n-description>

The problem was that in order for our application to utilize the data we had to re-group it by value and then join it back to the mapping table. Furthermore, we needed to transform the data to be able to easily look up how many distinct ids existed for each value. Therefore the following query structure informed the ETL pipeline design:

SELECT v.id
FROM value_table AS v
INNER JOIN mapping_table AS m
ON v.value_id = m.value_id
WHERE value = <some value>

The Main Act

This would have been a challenging problem to solve prior to the construction of our data lake given the scale of the data and likely impossible without some kind of distributed computational framework. With the help of our data lake however building this ETL pipeline became relatively simple. You might recall from the previous post in the series that there are 4 zones in our data lake each representing a different stage in the ETL pipeline: the raw zone, the structured zone, the curated zone, and the consumer zone. We’ll walk through each phase of the pipeline to show how the data is gradually transformed in the different zones until it reaches a state easily accessible by our applications.

Phase 1: The Raw Zone

The first phase of the pipeline involved copying the raw data from an s3 bucket shared with the vendor into our data lake bucket and then updating the hive metastore so that the team could manipulate the raw data as a hive table. When copying we define the new s3 keys with a table name and a partition corresponding to the date so that we could easily distinguish between data deliveries month to month. Therefore we issued a bunch of commands that look something like:

$ aws s3 cp s3://vendor-bucket/file-1.csv.gz s3://unified-datalake/vendor/raw_table/20190801/file-1.csv.gz

Notice how the new s3 key is structured like s3://<datalake>/<namespace>/<table-name>/<partition>/<file-name>. This enables us to then define a hive table like:

After the table is defined we run the following hive command and then the team can begin to query and manipulate the data in hive.

MSCK REPAIR TABLE `vendor`.`raw_table`

Phase 2: The Structured Zone

Once in hive we could then begin to transform the data so that it could be grouped by value. The key operation was a transpose so that instead of each row containing an id and string of values each row contained only a single id and then a single value. Using the example above the transformation looked something like

Raw Table

<id-A> | <value-1>, <value-2>, <value-3>, …. <value-n>

Structured Table

<id-A>, <value-1>
<id-A>, <value-2>
<id-A>, <value-3>

<id-A>, <value-n>

Not only was this a fairly computationally heavy operation but it increased the size of the dataset because the id values had be duplicated across each distinct value. However, once we had a one to one mapping between ids and values we could begin to group the data by value. By partitioning the transformed table by value the team could more easily calculate the distinct set of ids per value. See the table definition and load script below:

Table Definition

Load Script

Notice how the transformed table has two layers of partitions. While the first layer of partitions collection_dt is useful for isolating only the most recent data for transformation the second layer of partitioning applies the business logic.

Phase 3: The Curated Zone

After the data was transformed and grouped to meet the business use case the final stage of the pipeline involved clustering and sorting the data for fast and efficient querying by the end application. Because our application’s query logic involved a WHERE clause filter on the value field we wanted to be sure that the data was stored in such a way make those queries as fast as possible. The final table definition therefore looked as follows:

Once complete the data could be efficiently queried by value_id and then joined with the mapping table to find all the distinct ids for that value. The whole pipeline could be completed in under 12 hours using a relatively small EMR, something that would have taken days to complete if we were to use a more conventional relational system or scripting.

Behind the Curtain

While the entire pipeline was very simple from a code logic perspective the real challenge was figuring out how to configure the EMR cluster to handle the workload in a fast and efficient way. After testing several different instance types we found that one r4.8xlarge core node and two r4.4xlarge task nodes gave us enough compute to perform the complex transpose operations. While the memory requirements for each node were relatively modest the disk space requirements were enormous. In order to execute Hive queries the EMR writes out intermediate results to disk on the core nodes. In the end, the pipeline required a terabyte volume in order to support the size of the workload.

Because of the data scale the team ran into EMR configuration issues when querying the data as well. Initially, when running our application against the new tables our client kept dropping connections inexplicably. We quickly realized that the Hive Server 2 (HS2) process running on the cluster couldn’t handle more than a handful if incoming connections because the duration of the queries was fairly long. When we checked our HiveServer2 logs we found the following:

INFO [HiveServer2-Handler-Pool: Thread-46([])]: thrift.ThriftCLIService (ThriftBinaryCLIService.java:deleteContext(137)) — Session disconnected without closing properly.

We subsequently observed that the HiveServer2 service restarting itself at the same time as we saw a spike in the number of incoming network connections. Eventually we discovered that the high number long running queries was using up all the available heap space allocated to the HiveServer2 process. By increasing the amount of memory available to the server we were able to dramatically increase the number of connections and thus total throughput.

The Finale

Even though this is an example of a fairly simple use case I think it does a good job of showcasing the power of the tools and the value that the data lake creates for the business. What could have been a very time consuming and painful project was accomplished using relatively simple Hive logic and some minor troubleshooting and tuning of an EMR cluster. Any business that needs to build ETL pipelines at scale to feed analytics tools whether for customer or internal use can benefit from these types of distributed technologies.

If you’re a developer interested in working with big data and distributed systems then be sure to check out Unified at https://unified.com/about/careers-and-culture.

--

--

Nick Nathan
unified-engineering

Building apps and technical infrastructure for startups and growing businesses.