Building a Data warehouse with Hive at Helpshift — Part 1

Abhishek Gupta
helpshift-engineering
11 min readMar 23, 2020

This will be a series of posts describing how we built Helpshift’s data warehouse and how we run it on production. It will cover our journey from having no existing data warehouse to designing and building one from scratch. How we use Apache Hive with ORC on Hadoop, how we do ETL, leverage Kafka for data portability. In the subsequent posts, we talk about how we handle data security using Apache Ranger, data quality checks and reporting using Hue.

What we had in the beginning

Helpshift is a digital customer service platform where users can engage in conversations with brands and the brands can efficiently manage the conversations with the help of AI-powered automation. We run a MongoDB cluster in production as our primary database. A subset of the data is stored in ScyllaDB. The production databases store all the data related to these conversations (issue tickets, messages, faqs, chatbot interactions, user profiles, etc).

Figure 1: Analysts running queries on Production replicas

Initially, we had no specific data store for accessing these data in ad-hoc “analytics-like queries” (described below). Although we did have a data pipeline on production catering to the analytics needs of our customers in the form of realtime Analytics dashboard and Analytics APIs that provided insights on the support performance, there was nothing available for internal analytics. As a result, data analysts and product managers had to connect to replicas of the production databases for running ad-hoc analytics-like queries, internal BI tools, and financial reporting.

What were the limitations

OLTP databases like MongoDB and ScyllaDB are not good at handling large analytical queries (OLAP workloads) that involve joins across tables, leveraging complex SQL functions. In fact, databases like ScyllaDB are modeled strictly to work efficiently only for a small set of query patterns, like

SELECT * FROM user_table WHERE id = <user_id>

compared to an OLAP query like

--users who have sent more than 100 messages since 1st Jan, 2020SELECT ut.id, count(mt.id) as count_user_messages 
FROM user_table ut, message_table mt
WHERE ut.id = mt.user_id AND mt.created > '2020-01-01'
GROUP BY ut.id
HAVING count_user_messages > 100
LIMIT 1000;

On top of that, our data model on MongoDB was such that it didn’t allow multi-tenant queries across multiple customer’s data as they were segregated into separate databases, one per customer.

There were performance problems like queries taking a long time, frequent timeouts, manual tasks for data aggregation and post-processing which lead to unhappy readers of the data and a growing need for a real data warehouse.

Coming up with the requirements

A data warehouse (from here on referred to as “DWH”) is a repository of all the data from various data sources organized in a manner that is easy to run analytics-like queries for data-driven decision making. It is different from the primary OLTP databases that are tuned to serve live traffic for business transactions and not OLAP queries.

At the time, we were running secondary non-production use-cases from our production pipeline. Our machine learning models were querying primary databases for training data, our reporting systems were querying primary databases for daily and hourly reports. This impacted our ability to tune our databases for our end-users.

There are two ways we can go about building a DWH

  1. Build it in a bottom-up iterative way based on immediate consumer requirements and build upon them as new requirements come
  2. Dump all of the available data onto the DWH in one go

Eventually, we decided to go with the first approach because our initial target was to enable offline training of machine learning models and faster feedback. Based on this, we gathered requirements from the ML team, data scientists, analysts, and product managers. We identified that we need to onboard only a subset of the data from our transactional data stores onto the DWH.

Approach #1 has several benefits described in this article that talks about

Taking due care early in the process to ensure that the data contained in the data lake is used, or else there’s a real risk that the data lake becomes a data swamp, basically a dumping ground for data of varying quality. These cost a lot to maintain and deliver little value to the organization

The first version of the DWH was supposed to have

  • A subset of our production MongoDB and ScyllaDB tables (Issues, Messages, FAQs, Users, Agents, Bots interactions)
  • Some of our event stores (Kafka logs on S3) with a structured relational view
  • A relational schema that would make it easy to run fairly complex analytics using powerful SQL queries
  • Data sync SLA of 3 hours

Picking up the tech-stack

While choosing the stack, the goal was to have a solution that is

  • Easy to use. We agreed that SQL was the best way to query the data for all our planned requirements.
  • Scalable
  • Maintainable and observable i.e easy to inspect and debug
  • Cost-efficient vis-à-vis storage and compute cost
  • Leverages open source for open standards

Cloud or Self-Managed?

Existing cloud DWH solutions like AWS Redshift and Google BigQuery were suitable for our requirements. Both Redshift and BigQuery are great choices if you need SQL queries and mutable tables without the hassle of managing a distributed database at the cost of paying a premium. However, our initial experience with running Redshift on production and its high-cost implications and the high likelihood of running into vendor-lockins lead us to explore open-source self-managed options.

Since our existing data pipelines were already on a self-managed Hadoop cluster (HDP 2.6) and we had some expertise running HDFS for storage and Tez/Spark on Yarn for batch compute, we evaluated and decided to use Apache Hive and a combination of Batch and streaming ETL (Extract, Transform, Load). Following was the stack

  • Apache Hive as the distributed data warehousing database and also as a relational table metadata store
  • Column-oriented file formats on HDFS using ORC files for super high compression and high performance reads
  • Jenkins for distributed job scheduling (Tez, MR/Spark)
  • Kafka as a data bus
  • Apache Ranger for security and fine-grained access control

Architecture and components

Figure 2: Architectural components of Helpshift’s DWH

Batch ETL & Streaming approaches

We use Jenkins as a distributed Cron scheduler for our ETL jobs as we are heavily invested in it. You can alternatively choose to use Apache Airflow which is pretty cool. These jobs run every 3 hours to capture the changed datasets from Mongo, Scylla and Kafka event logs on S3. Following are the approaches used for change capture

  1. Range Query This is a common ETL approach where data is extracted via range queries at regular intervals. We use this for Mongo tables for all the customer databases as Mongo has good support for running range queries if a column that captures update timestamps is indexed.
  2. Double Writing This is a streaming approach used for databases like Scylla and Cassandra where range query may not be feasible if the table is modeled for lookups. The idea here is to publish the change to a Kafka topic every time a write is performed on Scylla. The topic is connected to an S3 sink which is later read in the Batch ETL job.
    This approach has some performance problems due to the need for writing to two different systems instead of one. Also, there is the problem of consistency in the absence of distributed transactions. However, a distributed transaction defeats the purpose of a high write-throughput database. For non-mission critical systems like data warehousing, it was wise to go ahead with this approach for capturing Scylla changes as the data gets consistent eventually.

Although both the above approaches work for us at the moment, they have the limitation of being difficult to scale in the future. This is why we are planning to move to a Change-Data-Capture (CDC) architecture using Debezium, which provides a scalable way to stream data changes from different databases to the DWH.

The captured data changes from the various data sources are run through the necessary extractions and transformations before getting loaded into a staging data table on Hive. Transformations mostly involve modeling the data points into a relational schema and standardizing field names.

Hive ACID tables

Hive is a distributed data warehouse software built on top of Hadoop for reading, writing, and managing large datasets residing in distributed storages like HDFS and S3 using SQL. Files on HDFS and S3 are immutable for the purpose high performance reads and writes and Hive uses HDFS/S3 files in its storage engine. Hence, by default, the only modes of writing on a Hive table are to either overwrite the whole table or to append to the table, both of which create new immutable files. There is no way to update the rows on the table.

However, most of our data entities like issue conversation, user profile, agent profile are updateable. Hence, we need a table that supports data mutations and updates. For facilitating this we use Hive ACID tables. These tables provide ACID transaction semantics with Snapshot Isolation*.

Each DML(Data manipulation language) mutation on the table creates a new Delta file on HDFS and reads on the table refer to all the Delta files to serve the updated result, similar to a storage engine based on LSM trees. A Compactor process runs periodically to merge/compact the delta files and thus manage garbage cleanups. These tables must be marked as transactional and additionally use an ACID-compliant file format like ORC as shown below.

CREATE TABLE dwh.issues
...
PARTITIONED BY(year STRING, month STRING)
CLUSTERED BY (id)
INTO 64 BUCKETS STORED AS ORC
TBLPROPERTIES("orc.compress"= "SNAPPY" , "transactional"="true");

As described before, the ETL jobs write the changed datasets to a staging Hive table which is overwritten for every job run. This change-set needs to get merged into the main table which is a Hive ACID table. This merge operation will lead to some rows getting updated and some inserted. Next, we look at the problem of executing such merge operations reliably.

*Snapshot isolation for a data warehouse may be important when you want users to have a consistent view of the data in a complex SQL query that involves multiple sub-queries and joins on the table, while other transactions may be concurrently mutating the underlying data.

Problem of atomically merging changes in bulk

As a part of executing the merge stage of the job, the rows in the staging Hive table need to be transformed into a set of insert and update queries on the main ACID table. This is challenging as Hive SQL grammar doesn’t support UPSERTs. This makes the process of generating the insert and update mutations require a complex join between the staging data set and the main table.

Let’s understand why running insert and update queries in bulk is not a good idea. Running hundreds of thousands of such UPSERTs from the change-set would not have been ideal from a performance perspective as each such transaction would have created a new delta file per table bucket which in turn would have put significant compaction pressures on Hive.

On top of that, as these mutations are not happening atomically there is this difficult problem of handling partial failures. Following are the two approaches for solving this problem

  1. Merge Join Rewrite table The idea here is to load all the rows from the main table and join it with the rows of the staging table, generate the merged rows from the join of the two and overwrite the main table. This approach has the disadvantage of re-writing the main table for every ETL job run which leads to wasteful reads/writes. But this was the only approach feasible before there were Hive ACID tables.
  2. Hive Merge DML This is the preferred approach for merging change-sets atomically in bulk on the ACID tables. The MERGE DML allows actions to be performed on a target table based on the results of a join with a source/staging table. The merge operation is executed in a single atomic transaction with no possibility of partial failures. It also results in all the insert and update mutations going to a single delta file per bucket. This leads to minuscule pressure on the compactions.
MERGE INTO dwh.issues
USING staging_issues ON issues.id = staging_issues.id
WHEN MATCHED THEN UPDATE SET
title=staging_issues.title,
...
WHEN NOT MATCHED THEN INSERT
VALUES(staging_issues.id
staging_issues.title
...);

This is also the reason why we preferred a batch ETL approach to registering these mutations instead of a real-time streaming approach which would have put a lot of load on both the Hive Transaction manager as well as the Compactor.

Column-oriented file format with ORC

We use ORC as the file-format in Hive. Being a Column-oriented file format, ORC offers excellent compression, delivered through a number of techniques including Run-length encoding, Dictionary encoding, and Bitmap encoding. Unlike traditional RDBS stores, a column-oriented storage engine will lay out the data column-wise instead of row-wise, such that the values of a single column will be stored together in disk blocks.

In a typical column-oriented DB, the set of data in a column tends to compress better (as shown in Figure 3) as they are similar and also repetitive. In columns of low cardinality eg a table column storing platform values like android, ios , web, etc, will have very few unique values and will compress efficiently if they were to be stored using a column-oriented disk layout. This is not possible in a traditional RDBMS storage engine where disk blocks store all the columns of a row together.

Figure 3: Data compression offered by ORC file format (Source: https://blog.cloudera.com/orcfile-in-hdp-2-better-compression-better-performance/)

In addition to a significant reduction in the storage cost, ORC files make use of intelligent indexing to support Predicate Pushdowns (PPD) for your Hive queries. Here, the queries that use some filters can skip disk blocks intelligently using the index information encoded in the ORC files. This achieves significant query speedups as shown in the following TPC-DS benchmark (Figure 4)

Figure 4: Query speedups offered by ORC file format (Source: https://blog.cloudera.com/orcfile-in-hdp-2-better-compression-better-performance/)

Columnar file formats are also useful if you have extremely wide tables, which most DWH fact tables have, but typical queries read only a subset of columns. The query engine in such cases needs to load only a few files containing the column’s data instead of all which is the case in row-oriented databases where the query engine needs to read blocks containing entire rows data even if only a subset of columns are queried.

Summary

Just to summarise, in this post we

  • Described our process of understanding the need for setting up a data warehouse
  • Coming up with the requirements for building one and picking up the technology stack.
  • Talked about Batch ETL strategies for capturing change-sets from different types of data sources and loading them into mutable ACID tables in a reliable and efficient manner.
  • We also talked about column-oriented storage engines and their benefits from the perspective of a DWH system.

In the next post

In the next post, we will look into

  • How we handle the monitoring of such infrastructure on production and how we deal with failures
  • Secure access to the data using Apache Ranger with fine-grained access control at the column level (for securing PII data accesses)
  • Expose the query interface to different types of consumers like Data analysts doing ad-hoc SQL queries, Data scientists and ML engineers needing to run some Spark pipeline for experimentation, internal BI tools/reports, etc
  • How to run data quality checks on the DWH
  • Limitations of Batch ETL and the need to move to real-time streaming writes on DWH (The next frontier in Data engineering)

--

--