Box Tech Blog
Published in

Box Tech Blog

Scaling Box Search Using Lumos

Illustrated by Jeremy Nguyen / Art directed by Sarah Kislak

Search is one of the fundamental ways to discover content in Box. In order to power this core functionality all user content and related metadata needs to be indexed. This results in an inverted index which is hundreds of terabytes in size distributed across multiple clusters. Every action performed by a user on any document triggers a search index update. As a result, the search infrastructure at Box needs to support thousands of requests per second while ensuring high availability and fast response times for queries. The search index grows every day by millions of documents and with it the scaling challenges of our search infrastructure. In this article we will discuss some of these challenges and how we, at Box, are addressing them.

Box Search Architecture

Search infrastructure at Box consists of two main pipelines:

  • Indexing Pipeline: Responsible for processing user update actions on documents in order to ensure that the search index is up-to-date for all content and metadata.
  • Query Pipeline: Responsible for serving user requests by looking up the search index to find relevant content for the query.
Figure 1: Search Architecture

When you upload a new file to Box or perform any action on existing ones, it generates an event which is queued into our centralized event queuing system. Our indexers (Search Workers) subscribe to these event streams to receive file update events. The workers extract the indexable text for the uploaded documents along with any associated metadata. The document is then indexed to a specific shard in a cluster based on the hash of the file_id. We run multiple Solr clusters across different availability zones for disaster recovery and high availability. Each cluster consists of multiple nodes, each hosting multiple shards. The clusters themselves are replicas of each other as shown below.

Figure 2: Solr Cluster Deployments

The Search Workers are also responsible for writing the file content and metadata for the file to HBase which is our offline store. This store is used to build new clusters or rebuild existing ones without requiring us to go back to the live application database.

When you search for something on Box, the request is served by our query engine, Cerebro. It comprises of a federation layer which is responsible for query routing and load balancing across the Solr clusters. It is also responsible for enforcing query timeouts and retrying failed or timed-out requests. Finally, it collects the responses from all the shards and re-ranks them based on various external insights such as recency, Box Graph as well as machine learning based ranking models. These reranked responses are then returned back to the user after hydration and permissions filtering.

Finally, we run a cluster manager service called Sops (Search Ops) which is responsible for ensuring all serving clusters are healthy. When hosts go down or shards become unhealthy, Sops notifies the Cerebro to reroute traffic away from these shard(s). This ensures users do not experience any degradation in search experience due to intermittent shard/host failures. We also use Sops to re-index clusters and perform other operational and maintenance tasks.

Scaling challenges

As Box’s customer base continues to grow, so does our index size and request throughput. In the last year alone we have doubled our cluster capacity. But even with increased capacity, users started complaining about elevated latencies for their search queries and the team was constantly firefighting outages. There was simply too much load on the clusters. What worked well before was starting to fail and at the core of all this was our sharding strategy.

Since the clusters were sharded by file_id, we needed to fanout every query to all shards and wait for them to respond back with hits before returning the top relevant results to the user. This is because, at query time, we do not know apriori which files will match the user’s request.

Figure 3: Query Fanout Issue

This lead to some major drawbacks:

  • The user queries were bottlenecked by the slowest machine/shard in the cluster.
  • Each shard has to process all queries even though most shards may not have relevant information in their indexes. This led to unnecessary load and wasteful consumption of resources on the shards.
  • Every time we had to scale the cluster by adding new machines, the query fanout increased. This impacted p95 query latency and really crippled the horizontal scalability of our infrastructure.
  • Expensive queries from a particular user/enterprise impacted all shards in the cluster and hence every other Box user.
  • The number of shard replicas for any given enterprise could only be expanded by adding an entire new cluster, which is prohibitively expensive.
  • Relevance for any given enterprise was impacted due to Solr ranking needing to work on many small data sets spread out over a large number of shards, which were then combined up the stack.

In order to operate at scale and meet the availability and latency requirements of search at Box, we needed to rethink our sharding strategy i.e. how we distribute the user documents across shards.

Enterprise based Sharding

As mentioned in the previous section, the main issue with sharding by file_id was that all queries had to be fanned out to all shards in a cluster. Using filter queries in Solr, we could reduce some of the index lookup overhead, especially on shards which barely have any content for that query. But still, a lot of compute is wasted on each shard to simply parse, evaluate and expand the query.

The only way to reduce this wasteful consumption of resources was to eliminate shards which do not have any relevant content. Since search results can only include those documents that the user has access to, the permissions model can be helpful in reducing the query fanout. As users mostly have access to documents within their enterprise, grouping all such documents to a single shard would potentially reduce the query fanout to just that one shard. Unfortunately that is not always true due to the following reasons:

  • Users can have external collaborations across enterprises.
  • Some enterprises may be so large that they wont fit on a single shard.

The first concern can be handled easily by fetching all collaborated enterprises for the user and then fanning out the query to the shards where they exist. The second one is a bit tricky as we also need to account for enterprise growth over time. As the content for a given enterprise exceeds the capacity of a shard it should be spilled to another shard and so forth. Hence the allocation of enterprises to shard(s) needs to be dynamic rather than predictive.

Key Range Partitioning

Figure 4: Key Range Partitioning

Range Partitioning is a commonly used sharding approach for various databases and key-values stores. The idea is to use some pre-determined unique key per object to create a range which can then be partitioned and assigned to physical instances or nodes. The most lucrative benefit of this sharding scheme is its ability to scale by splitting large regions as they grow into smaller ones and reassigning them to different nodes. This is exactly how HBase does region splitting. We took this idea and applied it to our search index to solve our large enterprise problem. Using a sharding key based on hash of enterprise_id, folder_id and file_id as shown below we are able to create a key range by sorting the sharding keys in byte order.

Figure 5. Key Design

Since the higher order bits of the sharding key are formed from the hash of enterprise_id, all files belonging to the same enterprise would be grouped together in the sorted order. Then we can split this key space into equal sized chunks or logical shards based on available disk capacity and desired disk utilization. Finally we can assign these logical shards to physical nodes in a cluster.

Shard Routing

The routing information can be easily encapsulated by just storing the max sharding keys for each shard in their sorted byte order. Along with that, we also store the physical shard_id that this logical shard maps to. This sharding scheme can now easily accommodate newly indexed files. All we need to do is find the position of the new file’s sharding key in the byte ordered max sharding keys using binary search. For example if the new sharding key lies between the 3rd and the 4th max sharding keys then the file should be indexed to the 4th shard. Then using the physical shard id for the 4th shard we can resolve the destination shards physical address and index the document.

Figure 6: Shard Routing

At query time we know the enterprise id of the user and all the enterprises that user is collaborated with. Using this information we construct the minimum sharding key starting with the hash of the given enterprise id. Then using binary search and linear probing we can narrow down the shards where the relevant index for the user resides. The query is then routed to only these shards rather than to all shards in the cluster.

Shard Assignment

A crucial part of this sharding scheme is the assignment of shards to nodes in the cluster. If a node contains multiple shards of a given enterprise, then it is susceptible to hot-spotting from that enterprise. Moreover if that node goes down, that enterprise would be significantly impacted as compared to others. Hence we need to assign shards in such a way that:

  • No two logical shards of the same enterprise end up on the same node in the cluster
  • Overall query throughput across all nodes in the cluster is balanced

We solved this by applying a modified bin-packing algorithm which allocates shard to nodes based on their estimated historical QPS(queries-per-second) such that the overall QPS across all nodes in the cluster is uniformly distributed. For example if an enterprise has high QPS historically then all shards on which the enterprise resides will also have higher cumulative QPS compared to other shards. Then if we apply bin-packing based on cumulative shard QPS, no two shards of that enterprise will end up on the same host. In this way we are able to avoid hot-spotting by optimally distribute shards across hosts. Note that this does not completely prevent hot-spotting, just reduces the risk of it happening. For time when it does happen we have rate-limiting.

Introducing Lumos !

Lumos is our core routing engine that implements the sharding strategy we discussed above. It consists of two kinds of components:

  • Lumos Controller: Every service that needs to interact with the search clusters creates an instance of the controller. The controller in-turn routes the requests (index / query) to the correct shard(s).
  • Lumos Coordinator: The coordinator runs in our cluster manager and is responsible for communicating updates to cluster configurations and shard boundaries to all live controllers via zookeeper. It is also responsible for collecting relevant metrics and ensuring all controllers up and using the correct configurations.
Figure 7: Lumos Architecture

The sharding-keys for files are persisted to a metastore at index time. Scanning the sharding keys in sorted byte order gives us the key-range which is used to generate the shard boundaries for a cluster. This information is then persisted to disk. Since the shard boundaries are only a few kilobytes in size, it can be read at service start-up by the Lumos Controllers and cached in-memory. When shard boundaries are updated for some cluster, the coordinator notifies the controller instances, via zookeeper, to refresh the cached information. Thus we can avoid reading expensive disk reads during shard resolution.

The Lumos Coordinator is also responsible for performing following actions:

Shard Boundary Generation

As discussed earlier, the key-range needs to be sliced into logical shards and then assigned to physical nodes using bin-packing. When a new cluster needs to be built or an existing one needs to be scaled, we use the lumos co-ordinator to kick off a MapReduce job to scan the key range and split it into shards of approximately equal size.

Initially we started with splits based on document counts. But quickly we realized that this can lead to very skewed shad sizes due to large variation in document size across enterprises. We needed something that could more closely estimate the on-disk inverted-index footprint of documents.

For this purpose we created a linear regression model, trained on the content and metadata size of files on a given shard and its actual shard size. The model learnt the weights associated with the content and metadata components so as to predict the actual shard size.

Figure 8: Regression Model for shard size impact

Now we can generate shard boundary splits so that the estimated size of all shards is approximately equal. This approach resulted in only 10% variation in actual shard size across the cluster.

Shard Spilling

One of the major issues with enterprise based partitioning is non-uniform shard growth. Once a cluster is taking live index traffic, each shard grows at a different rate depending on the enterprise(s) on that shard. As a result some shards will quickly exceed disk capacity whilst others will barely grow leading to suboptimal utilization of resources across the cluster. In order to address this issue we implemented Shard Spilling. When any node on a cluster reaches a specified disk utilization threshold, the Lumos Coordinator spills the shards on that host to other hosts on the same cluster which have the most available disk capacity. This information is persisted to HBase and also communicated to the Lumos controllers. The controllers in-turn spill any index request bound for the original shard to the new shard(s). At query time the controller fans out the requests to both the original and the spilled shards. In this way we are able to get much better disk utilization across the cluster without requiring a full cluster rebuild which is expensive and requires manual intervention.

Key Outcomes

Deploying Lumos addressed some of the core issues with our existing infrastructure as well as opened up opportunities for further improving our overall operational posture.

Figure 9: p95 Query latency improvement

Below are some of the key wins from Lumos:

  • Significantly improved availability of search by reducing load on shards and impact of a shard or node going down.
  • p95 latencies for search queries reduced by more than 50% due to significantly smaller query fanout
  • Over 20% reduction in disk usage due to index compression from d-gaps
  • Significant improvement in query recall due to better idfs from indexing documents from an enterprise on a single shard.
  • Enabled us to serve results from partial clusters (if required) by rerouting queries away from bad shards to other clusters while continuing to serve queries from the good shards. This was not possible before since every query was fanned out to all shards in a cluster.
  • Drastically reduced load on all shards in the cluster as well as impact of high throughput queries. This enables us to increase out rate-limits as the clusters can take more load.
  • Greater ease of scaling and rebalancing as we can now reindex specific shards without reindexing the whole cluster. This is also helpful in failure scenarios where a particular node goes down.


One of the major challenges that we faced when releasing this to production was index hot-spotting. As mentioned earlier we are indexing thousands of documents per second. This load was earlier being distributed uniformly across all shards due to the file_id based sharding scheme. But now the shards are more susceptible to large uploads from a single enterprise. This is often the case when a new enterprise is on-boarded. We addressed this problem with a two pronged approach:

Rate Limiting

Whenever we detected very high index rate in terms of bytes/sec for a given shard or high failure rates while indexing to that shard, we marked the incoming objects for retry with some exponential backoff. This allowed the shards to recover from bursts in indexing throughput. But unfortunately this also caused a build-up of indexing backlog in our queuing system, especially when the bursts were sustained.

Super Shards

In the key range partitioning based approach, most enterprises end up fitting on a single shard except the largest enterprises. With super shards we try to ensure that every enterprise is spread across at least k physical shards. Super shards are disjoint subsets of physical shards across different nodes in the cluster. The super shard size ‘k’ is chosen to be much smaller than the number of shards in the cluster but more than the number of shards on a single node. Now, during shard resolution, the Lumos controller expands the target shard identified from the key-range lookup to its entire super-shard. So instead of requests going to a single shard they are fanned out to all shards in that super-shard. This obviously increases query fanout a little but helps eliminate most of our index hot-spotting issues.

In practice, we observe some index rate limiting from time to time but there is negligible backlog in most cases. The impact to query latency or overall CPU on the cluster was also negligible due to the increase in fanout.

Future Work

Lumos unlocks a lot of potential for further improving availability of our search clusters as well as reducing operational overhead. Below are some of the enhancements that our Search team is planning to work on in the future:

  • Move Lumos to a virtual partitioning approach wherein multiple small logical shards are generated and then assigned to physical nodes in a cluster. The assignment to physical nodes can be done using consistent hashing approach along with multi-parameter bin-packing. This will ensure that we can easily scale our clusters without having to regenerate shard boundaries. The multi-parameter bin-pack will look at some additional aspects such as index growth rate and enterprise spread to ensure we can handle index hot-spotting more gracefully and dynamically.
  • Implement automated cluster recovery mechanisms by cloning index files from replica nodes in other clusters which have the same Lumos configurations. This should enable us to drastically reduce mean time to recover from node failures.
  • One drawback of shard spilling is that it eventually requires a full cluster rebuild when there is no more capacity left on the existing nodes. Since full cluster rebuilds can be very expensive and time consuming we intend to implement cluster scaling by splitting largest shards in the cluster and moving them to newly assigned nodes. This is similar to HBase region splitting and will potentially help reduce overhead of cluster scaling significantly as well as remove any manual intervention.

Lumos is a prime example of the kind of large scale infrastructure projects the Search team at Box is working on. If you are interested in similar problems we would love to have you on our team. To learn more please checkout our careers page.



Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store