A Deep Dive into Unified’s Data Lake

Nick Nathan
unified-engineering
11 min read · Jan 6, 2020
Photo by Jakob Owens on Unsplash

This post is the second in a mini-series about Unified’s data lake. See my last post, The Unified Data Lake, for more background on what a data lake is and the types of problems it solves for Unified. In this post I’ll go into more detail about how our data lake works, elaborate on the different logical layers, and discuss some of the technologies that compose the system.

What is a data lake actually?

At the highest level, a data lake is a set of technologies that allows massively parallel computation across large datasets. So what does that mean? I find it helpful to think of data lakes as consisting of two logical layers: a compute layer and a storage layer. The storage layer is a persistent data store composed of files that are organized to be easily accessible by the compute layer. The compute layer is best understood as a cluster of servers, each with software installed that allows the group to perform highly coordinated transformations and calculations on those files.

More concretely, at Unified we use Amazon’s S3 object storage as our storage layer, meaning all our data is stored in specially configured files on S3, while our compute layer is Amazon’s Elastic MapReduce (EMR) service.

  • Large Datasets — Storage Layer — S3
  • Massive Parallel Computation — Compute Layer — EMR

It’s worth exploring each layer in a bit more detail to give context to some of the decisions we made when configuring each layer and to better understand how the data lake is used by our engineering teams.

Storage Layer

We’ll begin with the storage layer. This is probably the most intuitive layer to grasp because it is most like a file system. Put simply, it stores all the data in the data lake in a variety of file formats, compression types and locations. The data can live across a number of different servers or data centers but is all accessible to the compute layer. At Unified we use the EMR File System (EMRFS) on S3 to store all our data.

Compute Layer

Next let’s discuss the compute layer, using Amazon’s EMR service as a point of reference. An EMR cluster is a group of EC2 servers with common software installed on each that enables them to coordinate to complete computational tasks on some underlying data storage system. This common software is mostly derived from the open source Apache Hadoop project, which originally pioneered the technology. The table below describes each software component of the stack installed on the EMR cluster. I’ve included the traditional Apache Hadoop stack for reference because it remains the core technology and has been built upon by other open source projects at Apache and by commercial vendors like Amazon.

So what does each of these components in the compute layer do exactly?

Query Interface

The query interface is the way that a developer or analyst interacts with the system and issues commands to the EMR cluster. It is the language used by client applications to instruct the cluster on what data to fetch and which calculations to perform. Unified uses Hive as the query interface, which provides a SQL-like language called HiveQL. Hive itself is fairly complex and we won’t go into too much detail here, but there are two relevant components:

  • Client Logic: this includes features like a command line interface (CLI), a web interface, and a Thrift server (i.e. HiveServer). These are the mechanisms by which a client issues HiveQL commands to the system.
  • Metastore: the metastore is a relational database specific to Hive but deployed independently of the EMR cluster. It is primarily responsible for preserving the logical structures used to represent data in the storage layer and thus enables schematized datasets. In other words, it stores mappings between S3 files and Hive tables, which contain columns and rows like a more traditional relational database system. The logical representation of a table is therefore largely independent of the underlying data. While this decoupling enables powerful abstractions like HiveQL, it also requires that developers manage the underlying S3 data themselves. Finally, the metastore contains … you guessed it … metadata about each of the tables to help improve query performance.

Execution Framework

The execution framework sits below Hive and is responsible for taking a parsed HiveQL query and turning it into a query plan. These query plans are represented by a data structure called a directed acyclic graph, or DAG. Nodes in the DAG represent map and reduce tasks. Tez, the execution framework in our stack, is responsible for building out the logical instructions for how to efficiently execute a query or calculation across all the nodes in the cluster and then running those instructions. Tez does a job very similar to that of Hadoop MapReduce, but it includes a number of additional optimizations. While I won’t go into an explanation of MapReduce here, it’s worth looking into to better understand why these distributed systems are so powerful when working with large datasets.
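To make the idea of a query plan as a DAG a little more concrete, here is a minimal Python sketch. The task names and their dependencies are invented for illustration and are not taken from Tez; the only point is that a task can run once everything it depends on has finished.

```python
from graphlib import TopologicalSorter

# Hypothetical plan for a query that joins two tables and aggregates the result.
# Each key is a task; its value is the set of tasks that must finish first.
query_plan = {
    "map_scan_campaigns": set(),
    "map_scan_accounts": set(),
    "reduce_join": {"map_scan_campaigns", "map_scan_accounts"},
    "reduce_aggregate": {"reduce_join"},
}


def execution_order(plan):
    """Return one valid order in which the tasks of the DAG could run."""
    return list(TopologicalSorter(plan).static_order())


order = execution_order(query_plan)
```

The two scan tasks have no dependencies on each other, which is what would let a real engine run them in parallel on different nodes.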

Resource Manager

Because all these operations are distributed across multiple servers, some logic is needed to coordinate a single task across all the different nodes in the cluster. YARN, which stands for “Yet Another Resource Negotiator”, sits beneath Hadoop MapReduce or Tez. It is a scheduling system used to allocate work, including map and reduce workloads, across nodes in the cluster while taking into account the fact that some nodes may fail and become unavailable. YARN consists of two main parts:

  • The ResourceManager — this is the logic running on the “master” node and is responsible for issuing commands across all other nodes in the cluster. It monitors for new nodes joining the cluster and for existing nodes failing and assigns work accordingly.
  • The NodeManager — the node manager is the logic running on each of the “task” nodes. The NodeManager does the actual work of executing tasks on each node in the cluster and communicating back to the master node.
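The division of labour described above can be illustrated with a toy scheduler. This is only a sketch of the idea of assigning work and reassigning it when a node fails; it does not use or mirror YARN’s actual API, and the node and task names are hypothetical.

```python
from itertools import cycle


def assign_tasks(tasks, healthy_nodes):
    """Spread tasks across the healthy nodes in round-robin order."""
    if not healthy_nodes:
        raise ValueError("no healthy nodes available")
    nodes = cycle(sorted(healthy_nodes))
    return {task: next(nodes) for task in tasks}


def handle_node_failure(assignments, failed_node, healthy_nodes):
    """Reassign only the tasks that were placed on the failed node."""
    survivors = [n for n in healthy_nodes if n != failed_node]
    orphaned = [t for t, n in assignments.items() if n == failed_node]
    updated = dict(assignments)
    updated.update(assign_tasks(orphaned, survivors))
    return updated


tasks = ["map-0", "map-1", "map-2", "reduce-0"]
nodes = ["node-a", "node-b", "node-c"]
plan = assign_tasks(tasks, nodes)
recovered = handle_node_failure(plan, "node-b", nodes)
```

A real resource manager also tracks memory and CPU on each node, which this sketch leaves out entirely.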

These components (the query interface, the execution framework, and the resource manager) are all installed on the EMR cluster and together make up a compute layer that can execute against the data store. Now that we have established a baseline for understanding how data is stored and manipulated, we’ll discuss how Unified has set up our data lake to convert raw data into performant datasets useful to consuming applications and end users.

Data Transformation Zones

Not only must Unified collect massive amounts of raw data from different social publishers and client partners, but we must also transform that raw data into useful schematized datasets that can be analyzed or exported to high performance databases. Our data lake gives us the ability to create highly flexible data pipelines which do the hard work of transforming and shaping these large raw datasets. To accomplish this we segregate data in the data lake into four major zones. Each zone is represented by a different method of storing the data after some transformation.

  1. Raw Zone
  2. Structured Zone
  3. Curated Zone
  4. Consumer Zone

Raw Zone

The raw zone represents all data stored in the data lake in its original source format. At Unified, most of the raw zone consists of files containing JSON HTTP responses from the various social publishers we use to collect data. One raw input file might contain thousands of JSON strings, one on each line. It’s worth noting that once ingested and stored in the raw zone the data never changes and can serve as the source dataset for a wide variety of different data transformations. While Unified primarily stores JSON data, our data lake can accommodate data in any format, from gzipped CSV files to raw text files to image files.
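As a rough illustration of what reading such a file involves, the sketch below parses text that holds one JSON document per line. The field names in the sample are hypothetical, and skipping malformed lines is an assumption made for the example rather than a description of Unified’s pipeline.

```python
import json


def parse_raw_file(text):
    """Parse a raw file that holds one JSON document per line."""
    records = []
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # skip blank lines
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            # A broken line is reported but does not stop the whole file.
            print(f"skipping malformed JSON on line {line_number}")
    return records


sample = (
    '{"campaign_id": 1, "name": "spring"}\n'
    '{"campaign_id": 2, "name": "summer"}\n'
)
records = parse_raw_file(sample)
```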

Structured Zone

The structured zone contains all files after they’ve completed an initial transformation from their raw format. This first transformation takes the raw data and converts it into Parquet format. Parquet is an open source file format, managed by Apache, that uses a columnar layout instead of the row-based layout more commonly used by traditional relational databases. While we won’t go into depth on columnar stores, the basic idea is that instead of storing all the column values for a single record together, all the values for a single column across many records are stored together. Data is therefore grouped on disk by column. This file format enables high performance queries across large datasets because it can greatly reduce the amount of data that must be read for a given query. Joins can become less costly, and queries often need only one or two columns anyway.

Row Store (Simplified Example)

Database File 1
  col_a  col_b  col_c  col_n
  1      b1     c1     n1
  2      b2     c2     n2
  3      b3     c3     n3

Database File 2
  col_a  col_b  col_c  col_n
  4      b4     c4     n4
  5      b5     c5     n5
  6      b6     c6     n6

Column Store (Simplified Example)

Database File 1
  metadata: col_b, col_c
  col_b:  b1 b2 b3 b4 b5 b6
  col_c:  c1 c2 c3

Database File 2
  metadata: col_c, col_n
  col_c:  c4 c5 c6
  col_n:  n1 n2 n3 n4 n5 n6
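The difference between the two layouts can also be sketched in a few lines of Python. This is a simplified in-memory model and not how Parquet is implemented; it only counts how many values each layout has to read in order to return a single column.

```python
# Row-oriented layout: each record keeps all of its column values together.
rows = [
    {"col_a": 1, "col_b": "b1", "col_c": "c1"},
    {"col_a": 2, "col_b": "b2", "col_c": "c2"},
    {"col_a": 3, "col_b": "b3", "col_c": "c3"},
]


def to_columns(records):
    """Pivot row-oriented records into one list of values per column."""
    columns = {}
    for record in records:
        for name, value in record.items():
            columns.setdefault(name, []).append(value)
    return columns


def values_read_row_store(records):
    """A row store reads every value of every record, whichever column is wanted."""
    return sum(len(record) for record in records)


def values_read_column_store(cols, wanted):
    """A column store reads only the values of the requested column."""
    return len(cols[wanted])


columns = to_columns(rows)
row_cost = values_read_row_store(rows)
column_cost = values_read_column_store(columns, "col_b")
```

With three records of three columns each, the row layout touches nine values to answer a query on col_b while the column layout touches three.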

At Unified, we use Spark jobs (another distributed computing framework from Apache) to convert our raw JSON datasets into Parquet formatted files. Once formatted as Parquet the data can be easily queried and manipulated in Hive. This allows the data to be represented as tables with rows and columns. Below is an example of a Hive table used to represent the now structured data files. This particular table contains information about advertising campaigns being run on Facebook.

Let’s walk through the table definition to get a better understanding of each component. The first thing you’ll notice is the statement CREATE EXTERNAL TABLE. The key idea here is that by marking this as an “external” table we tell Hive that the data making up this table lives in the data store and is not managed by Hive itself, so dropping the table removes only the definition and leaves the files in place. Remember, this table definition is stored in the Hive metastore and is only a logical representation of the underlying data. The LOCATION clause tells Hive where to look for the files that contain the underlying data. As you can see, the location is simply an S3 key prefix. The clause right above it, STORED AS PARQUET, tells Hive that these files are stored in that particular file format. Similarly, the TBLPROPERTIES clause indicates that the Parquet formatted files are compressed using the Snappy codec. Snappy is a compression and decompression library written by Google. It optimizes for speed rather than maximum compression. Gzip, by comparison, compresses files down to smaller sizes but is a bit slower.

Finally, there is the PARTITIONED BY clause. Partitioning is a way of grouping data files into directories to make processing easier. Instead of storing all of our Parquet data files in a single directory, we group the files by a particular field, in this case collection_dt, the date on which the data was collected. When querying the data using HiveQL, a client or user can specify which partition(s) they are interested in and Hive will only scan the files in those directories, thus improving performance. Once partitioned, the table directory structure would look something like below.
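As a sketch of how partitioning maps onto storage, the Python below builds partition directories using Hive’s column=value naming convention and picks out the ones a date-filtered query would need to scan. The bucket name, table prefix, and date format are hypothetical, since the real location is not given in this post.

```python
from datetime import date

# Hypothetical S3 key prefix for the table; the real location is not known.
TABLE_PREFIX = "s3://example-bucket/structured/facebook_campaigns"


def partition_path(collection_dt):
    """Build the directory for one partition using Hive's column=value naming."""
    return f"{TABLE_PREFIX}/collection_dt={collection_dt.isoformat()}/"


def prune_partitions(all_dates, wanted_dates):
    """Return only the directories a query filtered on collection_dt would scan."""
    wanted = set(wanted_dates)
    return [partition_path(d) for d in all_dates if d in wanted]


available = [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)]
to_scan = prune_partitions(available, [date(2020, 1, 2)])
```

Only one of the three directories ends up in to_scan, which is the saving that partition pruning provides.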

Curated Zone

Now that the data is in a structured format we can begin to run Hive queries to improve the data quality and reformat the data further to improve query performance. Broken or duplicate records can be removed at this stage. The new datasets created by this second transformation are represented by what are called bucketed Hive tables. When a table is bucketed, all the data is distributed among a fixed number of files of roughly equal size. If bucketing is done on a partitioned table, then each partition will contain the number of buckets specified. How does Hive know which records go in which bucket? When creating the bucketed Hive table, a column or columns must be specified, which are then hashed to produce the bucket number. Below is an example of the same Facebook campaigns table transformed into a bucketed table.

This table is broken into 8 buckets as indicated by the CLUSTERED BY clause. The column campaign_id is used as the input to a hash function which determines which file stores that record’s data, i.e.

hash(campaign_id) mod 8 = bucket number

It’s worth noting here that while the data is being partitioned and bucketed on a record level basis the data remains stored in columnar format according to the Parquet standard. Finally, the SORTED BY statement indicates how the data should be arranged within any particular bucket. In this case the data would be sorted by campaign_id and account_id.
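The bucketing formula and the sort order within each bucket can be sketched as follows. Note that zlib.crc32 is used here purely as a deterministic stand-in; it is not the hash function Hive uses, so the bucket numbers it produces would not match Hive’s.

```python
import zlib

NUM_BUCKETS = 8


def bucket_for(campaign_id):
    """Map a campaign_id to a bucket number in the range 0..7.

    zlib.crc32 stands in for Hive's own hash function and is used only
    because it gives the same result on every run.
    """
    return zlib.crc32(str(campaign_id).encode("utf-8")) % NUM_BUCKETS


def bucket_and_sort(records):
    """Group records into buckets, then sort each by campaign_id and account_id."""
    buckets = {n: [] for n in range(NUM_BUCKETS)}
    for record in records:
        buckets[bucket_for(record["campaign_id"])].append(record)
    for bucket in buckets.values():
        bucket.sort(key=lambda r: (r["campaign_id"], r["account_id"]))
    return buckets


records = [
    {"campaign_id": 42, "account_id": 7},
    {"campaign_id": 42, "account_id": 3},
    {"campaign_id": 17, "account_id": 9},
]
buckets = bucket_and_sort(records)
```

Because the bucket depends only on campaign_id, every record for a given campaign lands in the same file, which is what lets a query on one campaign skip the other buckets.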

Ultimately, the goal of all this bucketing and sorting is to improve query performance. The choice of columns to be used for partitions, buckets and sorting is all heavily dependent on the types of queries that are going to be executed against the table. Therefore when deciding which columns to use it is very important to first consider how the table is intended to be used.

Consumer Zone

The consumer zone represents Hive tables after the final stage of transformation. It is at this stage that any business logic can be applied to the formatted data to create tables specific to a particular application or client. Hive tables at the consumer stage can be much smaller because they contain only the most recent data, for example, or because they contain only a subset of records or columns. Similarly, the consumer tables can combine datasets or records from multiple tables in the curated zone. These are the most performant tables: they are either tuned specifically for reporting use cases or simply exported into high performance relational databases like MySQL or Postgres.
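A consumer-stage transformation of the kind described above might look something like this sketch, which keeps only recent records and only the columns a particular consumer needs. The column names and the cutoff date are hypothetical, and in practice this logic would be expressed as a Hive query rather than in Python.

```python
from datetime import date


def build_consumer_rows(curated_rows, since, columns):
    """Keep only records collected on or after `since`, with the chosen columns."""
    return [
        {name: row[name] for name in columns}
        for row in curated_rows
        if row["collection_dt"] >= since
    ]


curated = [
    {"campaign_id": 1, "account_id": 7, "spend": 10.0, "collection_dt": date(2019, 12, 30)},
    {"campaign_id": 1, "account_id": 7, "spend": 12.5, "collection_dt": date(2020, 1, 5)},
    {"campaign_id": 2, "account_id": 9, "spend": 3.0, "collection_dt": date(2020, 1, 6)},
]
consumer = build_consumer_rows(curated, date(2020, 1, 1), ["campaign_id", "spend"])
```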

Summing It All Up

Unified’s data lake is a powerful combination of technologies which we use to efficiently store and process the massive amounts of data collected by the business. The structure of the data lake allows us to cheaply store raw data from a number of different sources in a single place and then selectively transform that data into new datasets as dictated by the needs of our analysts and applications. Unlike more traditional databases, the data lake requires a more in-depth understanding of the underlying storage architecture and the software used to query and manipulate that data. It also means that our data engineers must work closely with end consumers to understand their use cases and query requirements so that datasets can be structured for fast query times.

While Unified’s Hadoop stack sits mostly on Amazon infrastructure the concepts discussed in this post can be broadly applied to any big data project. Unified’s data lake is still fairly young in its development and we continue to improve and optimize it. If you’re a developer interested in working with big data and distributed systems then be sure to check out Unified at https://unified.com/about/careers-and-culture.
