Using Google Cloud to Serve 10,000s of Personalized Recs Per Second
This article is co-authored by my great tech lead Mike Hurwitz.
From the beginning, Bluecore has run on the Google Cloud Platform (GCP). When it came time to build a high-performance service, it was only natural that we looked to leverage the GCP infrastructure rather than building it ourselves. By using Kubernetes (GKE), Memorystore (Redis), Bigtable, and other GCP technologies, we were able to build a service with the bandwidth and latency needed to send 10,000s of personalized recommendations per second.
What is a product recommendation?
You’ve most likely received an email tempting you to buy a product you left in your online shopping cart, and it probably also came with a few other recommended products. How did we come up with these products to recommend to the consumer? At Bluecore, we create product recommendations in two ways: via a product or via the consumer’s behavior. For example, a product may be recommended based on one that was recently viewed or previously purchased. Bluecore has developed powerful data models to calculate a consumer’s next best purchase or the types of products they may prefer.
Mapping products to users is not as simple a process as it seems. At runtime, we evaluate rules about which products we can insert into the email, for example, excluding products that are out of stock or on discount. Since it is important that we only show valid products, we must have an up-to-date product catalog. Products change constantly for any given retail store, so keeping them up to date requires a service of its own.
Though email is our primary product, Bluecore also provides real-time product recommendations on retailers’ websites, which introduces much tighter latency bounds than email would otherwise require. Onsite recommendations forced us to rethink our recommendations infrastructure. The recommendations service was built with one of Bluecore’s engineering values in mind: build things “as simple as possible, as powerful as necessary.” Throughout the evolution of the recommendations service, we have learned valuable lessons about how to leverage GCP tools to reach our latency and bandwidth goals while also saving money.
Where we started
Three components go into making an email: the template (the design outline of the email), the audience (the list of email addresses that will receive it), and the recommendations (the values derived from our data science models for each recipient we have information on). The templates live in Cloud Datastore, our audiences are generated in BigQuery, and our recommendations are in Datastore. Emails are generated in Google App Engine Standard (GAE), and we query Cloud Datastore as each email is formatted (an audience can run to several million rows). Since we started out only serving emails, latency was not an issue, but bandwidth was.
So, is this good enough?
Well, no. As we mentioned before, we now support recommendations shown while the consumer is on the website. This means latency does matter now. The shift from email-only to both on-site and email services forced us to re-evaluate our architecture. Onsite recommendations require that we have a much tighter SLO of 300–400 ms. This is about all you get before a website starts to feel slow. Recommendation fetching is only part of the load time given that other services, network, and transit to the browser also take time.
Audience (list of email addresses) generation and personalization (mapping an address to its recommendations) were separate, which meant that templates were limited to data appended to the rows of the audience query. If we wanted to personalize even the type of recommendation within a single template, the audience query would become unmanageable.
Goals
In the new version of the recommendation service we want to do five basic things:
- Support onsite recommendations (p95 < 100 ms)
- Serve >50MM personalized emails/hour (20 kHz)
- Remove product filtering load from email pipeline
- Support experiments for pre-calculated recommendations
- Avoid burning a hole in our wallets
Naive implementation
With our new goals in mind, and in the spirit of being “as simple as possible, as powerful as necessary,” let’s build a naive implementation of the service. We know we want a gRPC service, our recommendations data comes from BigQuery/Datastore, and our product data lives in Datastore. Our recommendations service, written in Go, enables runtime personalization: the product and recommendations data are fed into the service, which hands the recommended products to the personalization service.
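To make the naive version concrete, here is a minimal sketch of that serving path in Go, assuming the recommendations have already been exported to Datastore. The entity kinds, field names, and the plain-Go method signature are hypothetical; the real service sits behind a gRPC handler with generated types.

```go
package recserver

import (
	"context"

	"cloud.google.com/go/datastore"
)

// Recommendation and Product are simplified stand-ins for our Datastore entities.
type Recommendation struct {
	ProductIDs []string `datastore:"product_ids"`
}

type Product struct {
	ID      string `datastore:"id"`
	InStock bool   `datastore:"in_stock"`
}

type server struct {
	ds *datastore.Client
}

// Recommend is the naive serving path: every request goes straight to Datastore
// for both the recipient's recommendations and the referenced products.
func (s *server) Recommend(ctx context.Context, namespace, email string) ([]Product, error) {
	var rec Recommendation
	recKey := datastore.NameKey("Recommendation", email, nil)
	recKey.Namespace = namespace
	if err := s.ds.Get(ctx, recKey, &rec); err != nil {
		return nil, err
	}

	keys := make([]*datastore.Key, len(rec.ProductIDs))
	for i, id := range rec.ProductIDs {
		k := datastore.NameKey("Product", id, nil)
		k.Namespace = namespace
		keys[i] = k
	}
	products := make([]Product, len(keys))
	if err := s.ds.GetMulti(ctx, keys, products); err != nil {
		return nil, err
	}

	// Runtime filtering: drop products that shouldn't be shown (e.g. out of stock).
	valid := products[:0]
	for _, p := range products {
		if p.InStock {
			valid = append(valid, p)
		}
	}
	return valid, nil
}
```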
As powerful as necessary?
Datastore is expensive and BigQuery can’t meet our latency requirements. So again, no. Even if we exported all of our recommendations to Datastore, we still wouldn’t hit our latency goals. That’s okay though; we can’t blame a hammer for being a bad screwdriver, we just need the right tool!
Liberating recommendations
Let’s focus on how we can improve the “recs” portion of the pipeline. Recommendations were written to BigQuery or Google Cloud Storage (GCS), and then some were loaded into Datastore. Instead, we could load all of our recommendations from BigQuery and GCS into Bigtable. The release of the BigQuery Storage API made pulling massive volumes of data from BigQuery fast, and our fixed schema for recommendations kept the scope manageable.
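Here is a rough sketch of what a bigquery2bigtable-style load can look like in Go. For brevity it uses the standard BigQuery client and single-row writes; the real job reads via the BigQuery Storage API and batches its writes, and all project, instance, table, and column names here are made up.

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/bigtable"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	const project = "my-project" // hypothetical project, instance, and table names throughout

	bq, err := bigquery.NewClient(ctx, project)
	if err != nil {
		log.Fatal(err)
	}
	bt, err := bigtable.NewClient(ctx, project, "recs-instance")
	if err != nil {
		log.Fatal(err)
	}
	tbl := bt.Open("recommendations")

	// One row per recipient: an email address and a serialized recommendations payload.
	q := bq.Query(`SELECT email, payload FROM dataset.recommendations`)
	it, err := q.Read(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for {
		var row struct {
			Email   string
			Payload []byte
		}
		if err := it.Next(&row); err == iterator.Done {
			break
		} else if err != nil {
			log.Fatal(err)
		}

		// Row key per recipient; the production job batches mutations
		// (e.g. Table.ApplyBulk) to reach its 150-200K rows/s.
		mut := bigtable.NewMutation()
		mut.Set("rec", "payload", bigtable.Now(), row.Payload)
		if err := tbl.Apply(ctx, "recipient#"+row.Email, mut); err != nil {
			log.Fatal(err)
		}
	}
}
```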
Why Bigtable?
When thinking about improving performance, the answer is often to add caching. In our case, this isn’t a good solution because a recipient’s recommendations data is most likely read only once before it is invalidated. A cache miss means that we would read from the source, costing us latency, money, or both, as in our previous system. Bigtable, however, performs well when hit hard. Even with only three instances handling 5,000 requests per second, we saw latencies of 15–20 ms (p95). Our bigquery2bigtable task, run in GKE and orchestrated with Airflow, is able to load (read and write) 150–200K rows per second.
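On the serving side, reading a recipient’s pre-computed recommendations back out of Bigtable is a single-row lookup. A minimal sketch, assuming a hypothetical row-key scheme and column family:

```go
package recserver

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigtable"
)

// fetchRecs reads one recipient's pre-computed recommendations payload from Bigtable.
// Only the latest cell in the (hypothetical) "rec" family is needed.
func fetchRecs(ctx context.Context, tbl *bigtable.Table, email string) ([]byte, error) {
	row, err := tbl.ReadRow(ctx, "recipient#"+email,
		bigtable.RowFilter(bigtable.LatestNFilter(1)))
	if err != nil {
		return nil, err
	}
	items := row["rec"]
	if len(items) == 0 {
		return nil, fmt.Errorf("no recommendations for %s", email)
	}
	return items[0].Value, nil
}
```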
As powerful as necessary?
Although we improved the recommendations part of our pipeline, products are still lagging behind. Datastore is still burning a hole in our wallet, and we are still failing our onsite latency requirements.
Accelerating products
Let’s talk about latency. Our products are ~2KB of data (they contain information about various attributes like green, shirt, short-sleeved, etc.). Datastore latencies are 46.5 ms + 4.2 ms/key. This isn’t great, but it’s not disqualifying. Memorystore latencies are 0.09 ms + 0.033 ms/key. This is outstanding! For example, fetching 30 products works out to roughly 172 ms from Datastore versus about 1 ms from Memorystore. Memorystore takes home the belt.
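As an illustration of the Memorystore read path, a whole batch of products can be fetched in a single round trip. The post doesn’t name a Redis client; this sketch assumes go-redis, a hypothetical Memorystore endpoint, and a made-up key scheme.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	// Memorystore exposes a plain Redis endpoint; this address is hypothetical.
	rdb := redis.NewClient(&redis.Options{Addr: "10.0.0.3:6379"})

	// Fetch an email's worth of products in one MGET round trip.
	keys := []string{"product:sku-123", "product:sku-456", "product:sku-789"}
	vals, err := rdb.MGet(ctx, keys...).Result()
	if err != nil {
		panic(err)
	}
	for i, v := range vals {
		if v == nil {
			fmt.Printf("%s: cache miss, fall back to Datastore\n", keys[i])
			continue
		}
		blob := []byte(v.(string)) // compressed product protobuf
		fmt.Printf("%s: %d bytes cached\n", keys[i], len(blob))
	}
}
```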
Product update listener
Unlike recommendations, the same products are used thousands of times in a single batch of emails, so caching is a good fit — we will read the same product many times before it is invalidated. To handle this, we created a GKE-based service, written in Go, called the product update listener.
Product catalogs are maintained by our GAE application. Two updates for the same product may come in simultaneously, with the last write to Datastore winning. That makes cache invalidation a challenge. We could have had a short time-to-live (TTL) on cached items, but that would have cost a lot of runtime efficiency unless we had some kind of gate to keep all the recommendation service nodes from trying to fetch from Datastore at the same time. Instead, the product update listener receives notifications from AppEngine via Pub/Sub when a product is updated. This happens after the update has been committed to Datastore. The listener then pulls product data from Datastore and updates the cached value. Values not present in the cache are ignored. This is more complicated than short expirations but makes the cache far more effective. Though we could theoretically have no TTL on the cache at all, we set it to two weeks. The expectation is that each active product will likely receive at least one update in that time and stay hot, while inactive products will age out of the cache eventually.
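Here is a stripped-down sketch of that listener, with hypothetical project, subscription, kind, and key names; go-redis stands in for the Memorystore client and JSON stands in for the real protobuf-plus-compression encoding. The XX flag on the Redis SET is what implements “values not present in the cache are ignored.”

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"cloud.google.com/go/datastore"
	"cloud.google.com/go/pubsub"
	"github.com/redis/go-redis/v9"
)

// Product is a simplified stand-in for our Datastore product entity.
type Product struct {
	Title   string `datastore:"title"`
	InStock bool   `datastore:"in_stock"`
}

func main() {
	ctx := context.Background()

	ps, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	ds, err := datastore.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	rdb := redis.NewClient(&redis.Options{Addr: "10.0.0.3:6379"}) // Memorystore endpoint

	sub := ps.Subscription("product-updates")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// Assume the message carries the Datastore key name of the updated product.
		id := string(m.Data)

		var p Product
		if err := ds.Get(ctx, datastore.NameKey("Product", id, nil), &p); err != nil {
			log.Printf("datastore get %s: %v", id, err)
			m.Nack()
			return
		}

		blob, _ := json.Marshal(p) // stand-in for protobuf marshal + compression

		// XX: only overwrite keys that are already cached, so cold products stay out.
		// The two-week expiration matches the long cache TTL described above.
		if err := rdb.SetXX(ctx, "product:"+id, blob, 14*24*time.Hour).Err(); err != nil {
			log.Printf("redis set %s: %v", id, err)
		}
		m.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}
```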
Zstandard compression of product protobufs
Our cache holds the same objects stored in Datastore, transmitted as protobufs. While products across partners have different schemas, within a single retailer there is a lot of consistency and repetition. That made products, which average 2KB, a prime candidate for compression. However, most compression schemes do not do well on small files. The authors of Zstandard recognized this problem and added features specifically to improve performance on small files. In our tests, popular compression schemes were only able to achieve approximately 1.8:1 compression on a set of a few thousand product protobufs. Using tar, we can get a sense of how good it could possibly get, even though a single archive for all products can’t be used in production: 5.44:1 using gzip, 8.67:1 using xz (LZMA). By training dictionaries per retailer, compressing individual objects with Zstandard nearly matches our idealized and unrealistic test with tar+gzip. This means we only need a fifth of the cache storage and a fifth of the bandwidth. Zstandard is also cheap to decompress, so the additional CPU required was negligible.
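The post doesn’t name a specific library, but as an illustration, per-object dictionary compression might look like this in Go using github.com/klauspost/compress/zstd, with a per-retailer dictionary trained offline (for example with the `zstd --train` CLI):

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// A per-retailer dictionary trained offline on sample product protobufs,
	// e.g. `zstd --train samples/* -o retailer.dict`.
	dict, err := os.ReadFile("retailer.dict")
	if err != nil {
		log.Fatal(err)
	}

	enc, err := zstd.NewWriter(nil, zstd.WithEncoderDict(dict))
	if err != nil {
		log.Fatal(err)
	}
	dec, err := zstd.NewReader(nil, zstd.WithDecoderDicts(dict))
	if err != nil {
		log.Fatal(err)
	}

	productProto := []byte("...serialized product protobuf...") // stand-in payload

	// EncodeAll/DecodeAll work on whole small objects, which fits per-product caching.
	compressed := enc.EncodeAll(productProto, nil)
	restored, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("raw=%dB compressed=%dB roundtrip_ok=%v\n",
		len(productProto), len(compressed), string(restored) == string(productProto))
}
```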
As powerful as necessary?
Utilizing GCP tools has improved our performance tremendously. Recommendations now come from Bigtable, giving us back literally seconds of latency. Our product cache hit rates are usually above 99.9% because of the long TTL enabled by the proactive refresh from the product update listener. We also improved performance by adding a small in-process LRU cache with a short TTL in front of Memorystore, keeping the Memorystore traffic down to a dull roar. So, finally, YES! It is as powerful as necessary.
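The post doesn’t say how that small LRU cache is built; one way to sketch it in Go is with an expirable LRU layered in front of the Memorystore client. The library choice, cache size, and TTL below are all assumptions.

```go
package recserver

import (
	"context"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
	"github.com/redis/go-redis/v9"
)

// productCache layers a tiny in-process LRU with a short TTL in front of Memorystore,
// so repeated lookups for the same hot product in one batch never leave the pod.
type productCache struct {
	hot *expirable.LRU[string, []byte]
	rdb *redis.Client
}

func newProductCache(rdb *redis.Client) *productCache {
	// Size and TTL are illustrative; the post only says "small LRU cache with a short TTL".
	return &productCache{
		hot: expirable.NewLRU[string, []byte](10000, nil, 30*time.Second),
		rdb: rdb,
	}
}

func (c *productCache) Get(ctx context.Context, key string) ([]byte, error) {
	if blob, ok := c.hot.Get(key); ok {
		return blob, nil
	}
	blob, err := c.rdb.Get(ctx, "product:"+key).Bytes()
	if err != nil {
		return nil, err // includes redis.Nil on a true cache miss
	}
	c.hot.Add(key, blob)
	return blob, nil
}
```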
Proving it
Let’s prove that we meet our goals. We deployed our service to a GKE cluster, cloned and anonymized data, created attacker clients in Go, and collected metrics for the results. Thanks to the power of GKE, we were able to scale both the service and the attacking clients with a single command. The entire run, including rescaling the components, was scripted and hands-off.
With 32 cores (2 cores per instance) and 256 attacker threads, we measured 94 ms (p95). The main takeaway from these results is that we were able to handle well above our bandwidth goal of 20 kHz. Woo hoo!
Summary
The process of creating a recommendation service that meets our goals shows the importance of knowing the strengths of the tools your service or application is using. GCP’s breadth of offerings enabled us to build an incredible service with just a handful of engineers. Here is an overview of how we use GCP tools in the recommendation service:
- Rec source: BigQuery, Google Cloud Storage
- Rec storage: Cloud Bigtable
- Product storage: Cloud Datastore
- Product caching: Memorystore
- Async transport: Google Cloud Pub/Sub
- Sync transport: protobuf+HTTP/1.1, gRPC+HTTP/2
- Runtime environment: Google Kubernetes Engine