Building scalable near-real time indexing on HBase

Pinterest Engineering
Pinterest Engineering Blog
9 min readJul 20, 2021

--

Ankita Wagh | Software Engineer, Storage and Caching

HBase is one of the most critical storage backends at Pinterest, powering many of our online traffic storage services like Zen (graph database) and UMS (wide column data store). Although HBase has many advantages like strong consistency at row level in high volume requests, flexible schema, low latency access to data, and Hadoop integration, it doesn’t natively support advanced indexing and querying. Secondary indexing is one of the most demanded features by our clients, but supporting that directly in HBase is quite challenging. Maintaining separate index tables as the number of indexes grows is not a scalable solution in terms of query efficiency and code complexity. This motivated us to build a storage solution called Ixia, which provides near real-time secondary indexing on HBase. The design is largely inspired by Lily HBase Indexer.

Ixia provides a generic search interface on top of HBase, which plays the role of the source-of-truth database. A search engine is natively optimized for inverted index lookup, stores indexes. Search Engines also provide a rich set of search and aggregation queries and support the majority of indexing and retrieval use cases at Pinterest.

In this post, we will first provide a brief introduction on the architecture of the system and the design choices. We will then cover how we maintain Ixia’s data consistency SLA, how we address disaster recovery, and what measures we had to take to improve the overall system performance. Finally, we will provide some guidelines regarding future work and opportunities.

Architecture

This diagram describes how an insert request travels through different components of Ixia.
Figure 1: System architecture showing data flow

In the subsections, we will briefly describe each component to finally explain the whole flow.

HBase

HBase is a column-oriented NoSQL database management system that runs on top of the Hadoop Distributed File System (HDFS). It is modelled after Google’s Big Table and written in Java. It is well suited for real-time data processing or random read/write access to large volumes of data. We use HBase as the source-of-truth DB for Ixia.

Replication Mechanism & Change Data Capture (CDC) System

HBase replication is a mechanism to copy data from one HBase cluster (primary) to another cluster (secondary). This is done asynchronously by replaying the Write Ahead Log entries (WALEdits) in Write Ahead Log (WAL) from the active cluster to standby cluster region servers. A WALEdit is an object that represents one transaction and can have more than one mutation operation. Since HBase supports single row-level transactions, one WALEdit can only have entries for one row.

We introduced custom HBase replication sink servers implementing the replication sink APIs of standby clusters. The replication sink service provides the business logic to translate WAL events into messages and asynchronously publish them to Kafka with no changes to HBase code. The replication sink server sends an acknowledgement (ACK) to the replication source (primary) cluster if it is able to publish this event to Kafka.

public class ReplicationSinkService implements HBaseRPCErrorHandler, AdminService.BlockingInterface, PriorityFunction { ….. ….. ….. /** * Replicate WAL entries on the region server. * * @param controller the RPC controller * @param request the request * @throws ServiceException */ @Override public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { …… } }
Figure 2: Code snippet showing API of Replication Sink Service

The in-house notification framework called Argus receives Kafka events and triggers business logic based on the events.

These two systems, together with Kafka, form the backbone of our HBase-based CDC framework, which is used widely for several use cases in addition to Ixia. The HBase replication sink mechanism guarantees all WALs are published to Kafka, whose consumers execute customer defined business logic for each event.

Search Engine

We have an in-house search engine called Muse, which is a generic information retrieval platform. Muse is heavily optimized for online serving and provides a rich set of search and aggregation queries. Muse is used at Pinterest for different critical use cases like home feed, ads, shopping, etc. Ixia uses Muse as its search engine to provide rich search functionalities in the non-row-key columns.

We evaluated Solr or ElasticSearch to use as search engines. Both have wide industry adoption and good query and index performance. Muse has equivalent functionality to Solr and Elasticsearch, and it is fully integrated in the Pinterest stack. We decided to opt for Muse in order to keep the tech stack in accordance with the rest of Pinterest. Nonetheless, our pluggable query engine interface in Ixia makes it easier to switch to a different search engine if needed in the future.

Cache

Ixia uses Pinterest’s distributed caching infrastructure backed by Memcached and Mcrouter. It uses a look-aside caching strategy to optimize performance of reads and reduce the load on HBase. Ixia’s request pattern is characterized by a very high read-write ratio (~15:1) and adding a cache has led to huge infrastructure cost savings. Each cache entry corresponds to an ixia document which in turn corresponds to an HBase Row. Read requests are first synchronously checked in cache and returned to the client if available, and missing entries are asynchronously backfilled from source-of-truth storage. All write requests are first deleted in cache to maintain Ixia’s data consistency contract. The query API in Ixia gives the flexibility to request a subset of fields from a document. To reduce the implementation complexity of assembling a document from cache and HBase, we decided to deliberately reject such cached entries which didn’t contain all the fields requested and asynchronously backfill them in the cache.

We experimented with a write-through cache strategy and found out that the hit rate was very poor if Ixia wrote to the cache during write time. Due to high read/write ratio, the write traffic was so low that documents ended up being read from HBase and cache was not very useful. After changing the strategy to look-aside as described above, the cache hit rate significantly improved and has been around 90%.

End-to-End Request Flow

  1. When an insert/upsert request comes in, Ixia deletes this key from the cache and writes synchronously to HBase (as shown in Figure 1). The WAL gets published to Kafka after getting replicated to replication sink servers. The notification framework (Argus) consumes the update event and sends the request to Ixia with a flag that instructs the service to only update Muse via Kafka. The flow is similar for a delete/remove request.
  2. When a query request comes in, Ixia sends it to Muse for inverted index lookup. Ixia then performs forward data lookup of all the doc keys returned by Muse in HBase to ensure that the docs exist in the source of the truth database.
  3. When a Get request comes in, Ixia gets the result directly from the database and returns it to clients. This RPC is a relatively thin wrapper for direct KV access to HBase.

Data Model

Ixia is a document store which is made up of different logical components as follows-

Namespace — Ixia’s namespace has a 1:1 mapping with namespace in HBase and namespace in Muse.

Document Key — This key uniquely identifies an Ixia Document and has 1:1 mapping with an HBase row key.

Table — An Ixia table maps to an HBase table.

Fields — An Ixia field is structured as <namespace>:<table>:<hbase_column_family>:<hbase_column_name> which uniquely identifies an HBase cell and a Muse Index field.

Schema Management & Time To Live (TTL) Support

Ixia supports online schema change. HBase is schemaless, but Muse uses typed schemas that are declared beforehand. Currently, Ixia supports the addition of new indexes with no effect on online traffic. These indexes need to be deployed in Muse configuration and Ixia thrift layer. The new indexes can later be backfilled by clients for older documents.

Ixia supports TTL with guarantee of Hbase TTL +/- cache TTL.There can be cases of inconsistency if a document is cached and expires in HBase. Therefore we set shorter cache TTL for use cases which are more sensitive to expiry accuracy.

Consistency Model

Ixia, similar to HBase, supports strongly consistent Get requests. The index is updated only after a write request to the database is successful. This is a natural consequence of CDC framework since the WAL events are guaranteed to be published to replication sink service and hence to kafka. The notification framework consumes these messages and sends indexing request to Muse via Ixia. This guarantee was one of the core design motivations of using CDC layer for indexing. The Query API is used for searching documents from the search engine. It provides rich features like filtering based on match, membership, range, etc., and aggregation results based on sum, average, maximum, minimum, etc as shown below in Figure 3. Due to architecture’s asynchronous indexing, the Query API is eventually consistent. The P99 latency between the time a document is written and the time it is searchable is ~1 second.

This example shows how an Ixia query will look in practice. Logical statement: a = “a” AND (b = “b” OR c = “c”) Equivalent Filter struct: Filter{ logicalOperator: LogicalOperator.CONJUNCTION, criteria: [ FilterCriterion{ match: Match{field: “a”, value: “a”} }, FilterCriterion{ filter: Filter{ logicalOperator: LogicalOperator.DISJUNCTION, criteria: [ FilterCriterion{ match: Match{field: “b”, value: “b”} }, FilterCriterion{ match: Match{field: “c”, value: “c”} } ] } } ] }
Figure 3: An example of Ixia’s query request

Performance

Ixia has been in production for more than a year and serves several critical use cases like Shopping, Ads, Trust and Safety, etc. Due to the distributed nature of the components, the system is horizontally scalable. The API thrift layer and Muse have CPU alarm-based auto scaling. We have monitoring and alerting in place to ensure that scaling needs are met before they start to affect system availability and reliability for other components. We are able to tune the thrift layer, cache layer, muse layer, and HBase layer configurations based on our clients’ needs. These parameters are typically determined by factors like QPS, latency, throughput, query pattern requirements etc. We have deployed dedicated and shared clusters based on criticality of use case, data volume etc. One of our production clusters is serving ~40k qps at a p99 request latency of ~5ms for a response size of ~12KB with 99.99% availability SLA. The maximum throughput of the entire system peaks at ~250k qps.

Disaster Recovery

For fault tolerance, Ixia is backed by two HBase clusters. The active cluster serves online traffic and continuously replicates to the standby cluster. If there is a problem with the active cluster, we have mechanisms in place to do zero-downtime failover and make standby active without affecting availability.

Additionally, there are systems in place to take periodic HBase snapshots and continuous WAL backups. These two mechanisms working together provide us point-in-time recovery of the source-of-truth data.

We have Map reduce jobs that can deterministically derive an index based on HBase snapshot. This offline Muse index can be used to start a new Muse cluster at any point as long as the source of truth data is intact.

Hence, we have the ability to recover source-of-truth data as well as indexes if ever there is data corruption.

Learnings & Future Work

System Complexity

It has been a great experience to learn, contribute, and onboard clients on such a complex distributed system. Due to native lack of support for secondary indexing in HBase, the async indexing pipeline has multiple components increasing the operation load.

The API surface area of the Ixia thrift layer is large allowing clients a lot of flexibility to write expensive queries. This makes it imperative for us to put system limits at various layers to protect from cascading failures due to query pattern changes. Diagnosing and debugging such complex errors increases the maintenance cost.

Eventual Consistency

The current architecture cannot support strong index consistency in the current model. Since the API layer is generic with pluggable storage and search backends, we are exploring other NewSQL databases to be used with Ixia that can support strong consistency and reduce the complexity of async indexing pipeline.

We currently support eventual index consistency with p99 latency of a few milliseconds and SLA of 99.5%. The small number of failed index updates are logged on disks and attempted at a later time using in-house pipelines which move data from disks to a data lake using pub-sub systems . We run periodic offline jobs to read these failed indexed requests from the data lake and try again using the online path.

Acknowledgement

We would like to thank the entire Storage and Caching team, especially Kevin Lin and Lianghong Xu. We would also like to thank our Search Infra team, especially Haibin Xie. Finally, we would like to thank all of our clients for their support and feedback.

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog, and visit our Pinterest Labs site. To view and apply to open opportunities, visit our Careers page.

--

--