Nebula as a Storage Platform to Build Airbnb’s Search Backends

by Charles He, Soumyadip Banerjee, Tao Tao, & Krishna Puttaswamy

Image Credit: NASA/JPL (Raghvendra Sahai). This image of the Red Rectangle was produced from data in the Hubble Space Telescope (HST) archive, taken with the Wide Field Planetary Camera 2 onboard the HST as part of Guest Observer program 7297.

Last year Airbnb grew to a point that a scalable and distributed storage system was required to store data for some applications. For example, personalization data for search grew larger than what a single machine can hold. While we could rebuild just the personalization service to scale up, we foresaw other services to have similar requirements and decided to build a common platform to simplify such tasks for other service owners.

Besides the normal request/response pattern, many services have different needs such as performing periodic bulk operations to synchronize with the “source-of-truth” (e.g., an MySQL DB), bootstrapping a newly derived data source (e.g., a new search feature), consuming incremental updates from data stream (in our case Kafka), providing analytical interactions on the data, and serving the data with very low latency for site-facing traffic. As the company continues to grow, we will have more applications accumulating more and more data. Such data can provide huge value to our products if we can dig out useful signals and provide feedback to the application.

Requirements

We started with personalized search. It needs to keep rapidly growing history of user behavior. It requires real-time user actions to be recorded and available immediately to help personalize search results (and improve other products). A data snapshot needs to be provided so that other applications can use it (e.g. for analytics or validation). It needs periodic compactions to aggregate and potentially truncate the older history, plus bulk load a new batch of features (computed offline) back to the system.

These requirements are common across many applications in the company. Therefore we decided to build a general storage platform to support such requirements, and to help service owners focus on their specific business logic. We aim to satisfy the following concrete requirements with this system:

  1. Low latency operations (on the order of milliseconds) to serve site-facing traffic
  2. Fresh, incremental updates for real-time data stream ingestion
  3. Bulk operations on data in a convenient and efficient way
  4. Scalable to the company’s growth in data size and throughput
  5. Minimum operational cost

Note that we define “bulk operations” as the operations to snapshot and compact the full repository, to replace the existing snapshot with a new snapshot for serving, to merge a complete new signal into the repository, to replace an existing signals with a new set of data, and any other operations on the full data set. Nebula is the platform we built to fulfill these requirements at the storage level.

What is Nebula?

Nebula is a schema-less, versioned data store service with both real-time random data access and offline batch data management. It is an independent service with a dynamic data store for delta data (updates in a recent period of time), and a static data store for snapshot data which supports bulk operations. We chose to use DynamoDB for the dynamic data store (mainly because it has very low latency reads, and the operational cost is minimal since it is hosted by AWS), and HFileService (a scalable static storage built in-house at Airbnb which can partition and serve pre-computed HFile-formatted data from local disks) to serve static snapshots.

Storage abstraction for random data access

Nebula provides an unified API for the underlying physical storages. The API is provided to the application as a generic key-value store API with the delta and static data being merged internally so each application does not need to develop their own mechanism to handle real-time and batch data separately. Therefore, it has the flexibility to migrate to a different physical storage without changing the API and applications built on top of it.

Nebula uses a versioned tabular schema for storage abstraction, similarly to BigTable and HBase. A versioned tabular storage makes it more convenient for service owners to define their data model in comparison to raw key-value stores. Versions can also be used to do conflict resolution and track the time series data when needed. There is no limitation on how many versions an application can store for each row and column.

Nebula supports atomic operations at <row, column, version> level. Concurrent writes to the same <row, column> have different versions so they can be sorted appropriately. Each column have their own versioning and all writes are appended directly to individual columns (sorted by version). The user gets random access to retrieve one or more versions for a given <row, column>. Requests against multiple columns and/or rows can also be combined into a single multi-request.

Using personalization data as an example, the data model looks like this:

Each row represents data associated with a user, each column represents a type of user interaction (a.k.a. user events) like previous searches, and each version is the value of the user event at the event timestamp. In production we have many user events, and each event column might accumulate a huge amount of events with different timestamps.

To serve a search request, the search backend looks up the required events data for the given user, and uses the personalized data in ranking models to decide ordered listings shown to the user. Since this is on the search request path, we have very strict requirements on latency and availability.

The data can be updated in per-cell basis by incremental data stream (containing individual user events) and in bulk mode by offline pipeline (compacting the history and/or bootstrapping/merging/replacing a new column).

Built-in support for bulk data processing

Nebula runs an offline pipeline for each repository to snapshot the delta store, merge with previous snapshots, and load new snapshots back for serving. Since such jobs are executed separately from the online service, it has minimal impact on site-facing traffic.

The pipeline is highly configurable based on requirements. For example, each application can define their own policy on how to merge old and new data (e.g., new data overwrites old data, aggregates them by versions, throws away old data, etc.), how to compact history (e.g., keeps N versions, remove data older than certain period of time etc.), how to schedule the pipeline execution, and so on.

Nebula also provides well-defined interfaces for users to define their customized data and load into the system automatically. The user can put their data at a shared location, update some Nebula configurations, and the pipeline will pick up and merge the data into the system for serving.

An application owner can also hook their customized logic into the pipeline to fulfil their special requirements on the data snapshot. Their logic will be executed on the latest snapshot data, therefore, it can be done in a very efficient way.

In the personalized search case, we utilize the offline pipeline for the following:

  1. Regular snapshots and delta/static data merges.
  2. Compact and filter stale events based on per column requirements to keep the data size under control.
  3. Offline features extraction to create new features, and bulk load new features into the storage.
  4. Customized logic to validate user events, and sanity check statistics.

All personalization data are versioned, so whenever there is a data problem, Nebula can rollback to previous good snapshot and discard any bad data before a version (timestamp). The logic of rollback belongs to the application, but the bulk interface in Nebula make it very easy to implement.

Architecture

Here is the overall architecture of the system (shown below) and a few highlighted design choices.

A nebula read query looks up two data stores. While the delta data store holds only the latest data, the snapshot store holds full snapshots. Both stores serve read-queries, but only the dynamic store accepts write requests. The snapshot store is updated by switching the underlying snapshots. Co-ordination of data stores is handled by Zookeeper.

DynamoDB as dynamic data store

We choose DynamoDB because of the low-latency requirement, but it is possible to switch to a different physical storage (such as HBase) for different requirements. To be an underlying storage for Nebula, a physical storage only has to support a primary key and a sorted secondary key. Since they are different implementations of the same interfaces, switching the physical storage is transparent to any applications built on top of the system (and service users).

We do not plan to build yet another physical storage, so relying on DynamoDB as the low-level storage layer has allowed us to very quickly build a robust system.

The streaming input data are written to the dynamic store; it allows random updates and supports very high QPS for updating. The read latency from DynamoDB is low so we can satisfy the overall latency requirements with a median of 10 milliseconds. One optimization we did to keep DynamoDB table sizes manageable is to partition data into new tables per day. Therefore, each of our tables occupies only a few DynamoDB shards and provides good QPS guarantees.

HFileService as batch data store

The most recent snapshot is served in a dedicated HFileService cluster, from which Nebula composes a live view of the data together with dynamic updates.

HFileService serves the static HFiles (format of the snapshot) from local disks directly, which can provide very low-latency and high throughput. Furthermore, the data loading process has minimal impact on read traffic, so the offline data merge operation does not influence real-time access of the data.

HFileService partitions the data through a dynamic sharding mechanism, so it is horizontally scalable along with the total data size. Since it is static data, the replication policy is very easy and can be adjusted based on traffic growth.

Offline pipeline for snapshot, compaction, and custom logic

Nebula supports both random online data access and bulk operations. The bulk operations do not affect online access. The figure below illustrates Nebula’s offline architecture.

The delta store periodically dumps data as batch updates to a distributed file system (Amazon S3). Upon completion of the dumping, an offline Spark job starts to merge all historical data together with the batch updates. We often have other offline generated data, such as machine learning features, that we want to bulk upload into the system. This is also done at the merge stage. A new snapshot is created by combining batch updates, historical data, and the customized offline data. We add sanity checks in the process to prevent bad data from being imported into our system.

The newly created snapshot stays on S3 for the next round of merging. It also gets loaded into our historical data store. Throughout the whole dump-merge-load process, real time store continues to serve the delta data dumped into S3, and it drops them only after the new snapshot is successfully loaded into the historical data store. This guarantees the read queries always get a complete data by combining real-time and historical store.

The full snapshot on S3 is used for other offline data analytics.

Streaming Updates Output

Beyond random access and batch processing through snapshots, Nebula also provides an update stream to provide instant access to data changes for the application. This is made possible by DynamoDB supporting update streams through the streams API. A separate component consumes the streams in a Kinesis consumer and publish it into specific Kafka streams, so any service interested can subscribe to it.

Another Case: Search Indexing Infrastructure

After launching Nebula, we also rebuilt Airbnb’s Search Indexing using Nebula. Let’s first talk about why it needed rebuilding.

Since Airbnb largely uses Rails/MySQL for the front-end, Search Indexing listened (and still does) to changes on Database tables, maintained a cache of the current search index documents, and updated the search instances with the new document if anything changed. There was uncertain performance due to features being loaded by polling loaders and periodic syncs with source-of-truth for data consistency. New Search machines could bootstrap their index by slowly streaming from the cache.

The following requirements were decided for improving the system:

  1. Low latency operations end to end (less than 1s median)
  2. Able to consume features built offline by bulk jobs and merge into the search index
  3. Able to also consume real-time features
  4. Generating the index offline (It should also be able to shard the index into shards offline)
  5. Quick rollback of indices in case of bugs
  6. Quickly bring up new search instances for scaling
  7. Auditability of the search index document changes
  8. Scalable to index data growth

The Nebula system was a perfect fit as we could do all of the above with its features. A versioned tabular storage meant that we could have auditable index documents, and support for bulk jobs meant we could generate the index offline (and merge listing features in) and directly deploy to Search. Because the index is built and deployed based on snapshots, we could quickly roll back in case of bad index data. The generated index could be used by new search instances to startup quickly (by just downloading the index).

The figure above shows the search indexing architecture with Nebula. The data snapshot is generated daily as a part of the offline data merge. The index builder job operates on this snapshot to build the sharded index which is then deployed to search periodically like an ordinary binary deploy. The system uses common features provided by Nebula, and only needs to implement customized logic related to search indexing.

Future Plans

We have built a couple of services on top of Nebula including the search indexing pipeline we just talked about, the personalization infrastructure, and Airbnb’s pricing service data repository. It is serving multiple TBs of data for each application, with median latency ~10ms. We want to encourage other teams to build more applications served by Nebula.

We also plan to integrate the system deeply with our data warehouse, i.e., storing historical snapshots into Hive, sharing more data stream consuming logic, etc. The goal is to make the data available and consistent for analytics and making the system easier to manage and interact, such that it becomes much easier for developers to build their applications.

Acknowledgements

There are many people that made this work possible. We’d like to thank Alex Guziel for his significant contributions to this project, also thank Jun He, Liyin Tang, Jingwei Lu for their generous help, and many people across Search, Application Infrastructure, Data Infrastructure, Product Infrastructure and other teams who helped in many ways.

Check out all of our open source projects over at airbnb.io and follow us on Twitter: @AirbnbEng + @AirbnbData