To support the Walmart Search, a Full Index is generated periodically, and incremental updates are applied via real-time stream processing. Together they keep the Walmart search index current. The Full Index is implemented as a Spark based batch job, that does a full table scan on the underlying Item Store (Apache Cassandra). The requirement for Full Index generation was to capture the current state of the entire Walmart Item Catalog and that it needs to conform to strict SLAs. However, it was observed that the full table scan using the Apache Spark Cassandra connector had very unpredictable SLAs, primarily due to the following factors:
- Imbalanced partitions and size of the partition scanned
- Number of SSTables
The following shows the unpredictable times for Full Index Generation when we started this effort:
We set out with the following goals:
- Make the index generation conform to predictable SLAs
- Bring down the SLA of Full Index job, preferably to under two hours
The Full Index generation involves a full table scan on the Cassandra based Item Store and for each item in the result set, issue a set of point queries on different datastore[s] to build the item for indexing.
Before we look at the solution, let us understand why this initial design exhibited this behavior. Due to sudden spurts in data, the number of SSTables increased accordingly. Read latencies were impacted as both memtables and SSTables had to be read. Compaction took more time as well as the data increased. All these factors contributed to the degraded performance of the full table scan.
The bottleneck for full index generation was due to the full table scan stage and the unpredictability in latencies was isolated to a few partitions in Cassandra that are enormously imbalanced. This resulted in a few tasks (reading from those imbalanced partitions) that took forever and resulted in unpredictable SLAs. Point queries (based on specific item) was working fine and didn’t have much impact on the variability in execution times. The item store was managed by a different team at Walmart and they were supporting various applications including search and tuning Item Store specifically for the search use case was not an option.
The solution that we came up with leverages a secondary cache that is optimized for Full Index use case and will store only the product IDs. The requirements for this cache are:
- Provide alternate solution for full table scan with predictable SLAs
- Support persistence in addition to caching
- Support high throughput
- Provisioning and management of secondary cache is completely under our control
- Can be optimized for search use case
- Need to keep in sync with source of truth
Redis for secondary cache met all our requirements. Redis is a proven industry solution that provides high throughput and reliability and adds minimal operational overhead.
Advantages of Redis:
- High throughput
- Support rich data types
- Operations are atomic
- In memory datastore with persistence
It is a very common pattern for applications to leverage multiple data sources optimized for specific need[s]. The solution was to leverage a secondary cache (Redis) that will be kept up to date via a combination of bootstrap job and a streaming job. Full index will leverage this cache for full table scan and will use Item Store for point queries only. The challenge now is to keep multiple datastores in sync. This is achieved via a change data capture mechanism, which captures committed changes from Item Store in near real time and publishes them to a Kafka stream. The new flow is as shown in the diagram below:
A bootstrap job could do a one-time migration of Product IDs from Item Store to this cache. For incremental updates of newer items, a change data capture mechanism that captures committed changes from Item Store in near real time and publishes an event to a Kafka topic. Streaming job could ingest the events and populate the cache with newer Product IDs and handle deletion of existing items.
We could leverage the full scan capability of a secondary cache like Redis. Redis full scan will be inexpensive especially since the data we plan to store would fit in memory.
The following results capture how long it took for the Item Store Cassandra based full table scan versus the new Redis based full table scan. To maintain apples to apples comparison, both these runs were started around the same time. This would ensure external factors like load on Cassandra is same for both these runs. This also allows us to compare the output product set is the same between these runs.
The results above confirmed we met both our goals — the runs had predictable SLAs and were under 2 hours. It has been a few months since the launch of this optimization. Here is how it is performing as of this post:
In the next part of this series, we will explore ongoing improvements to the Walmart Search Indexing pipelines. If you are interested in solving similar problems and contributing to Walmart Search, please reach out to us.