Dynamic Cache Replication Using gRPC Streaming

Benoit Daviaud
Teads Engineering
Dec 14, 2022

Context

As part of Teads Global Media Platform, the Buying Engine is the back-end service responsible for selecting the Ads to display. For each request, we have to select the best Ad among thousands, at a rate reaching several hundred thousand requests per second.

To handle such a load, the Buying Engine needs to scale horizontally. In this article, we share how we recently removed a scaling limit within the Buying Engine and were able to reduce the size of our Cassandra cluster by a factor of 4.

The problem we needed to solve

Each advertising campaign has a budget to spend and, before serving an Ad, we need to check the remaining amount. Each time we display an Ad, we increment its consumed budget. When the total budget is finally exhausted, the Ad should not be displayed anymore.

As depicted in the below diagram, the Buying Engine queries a Cassandra cluster in which the consumed Ad budgets are stored.

[Diagram: Buying Engine instances querying consumed Ad budgets from the Cassandra cluster (drawn with Excalidraw)]

Buying Engine instances recompute the remaining budget frequently and we easily reach a million requests per second at peak time. To handle this load while keeping a response time below a dozen milliseconds, the cluster has around a hundred nodes.

The Ad business is seasonal and depends on user activity, which means that the Buying Engine load varies a lot throughout the day. There is typically a factor of 5 to 10 between the lowest and the highest load levels.

While the number of Buying Engine instances automatically scales up and down to handle this varying load, the Cassandra cluster has a fixed size. Scaling it up or down is a manual operation that takes several hours, so these sensitive operations are performed to adapt to yearly variations rather than daily ones.

Checking the budget is a vital task, so our cluster must be sized to handle peak load. Since the Buying Engine inbound traffic has grown year after year, it has become very inefficient to use a large Cassandra cluster that is idle most of the time.

Switching To A Push Architecture

A first solution that may come to mind to improve efficiency is to put a distributed (auto-scalable) read-through cache between Cassandra and the Buying Engine instances. It would certainly reduce the load on Cassandra, but it would not make it flat: the cache would also need to scale up to follow the Buying Engine, and in the end the load on Cassandra would still increase. It would only be a mitigation.

[Diagram: a distributed read-through cache between the Buying Engine instances and Cassandra (drawn with Excalidraw)]

Let’s circle back to the business need. In fact, the Buying Engine only needs to know whether some budget remains to be spent or not. It compares the remaining budget to the total budget to get a budget status, and this status actually changes very little: only about twice an hour in practice!

To take advantage of this very slow change rate, let’s switch from a poll to a push architecture. Instead of polling one Double every second, we only send one Boolean every 30 minutes: with a Double polled 1,800 times over those 30 minutes, that is at least 1,000 times less data going through the network. A huge saving in terms of network and hardware resources!

So we extracted this budget logic into a microservice, the Budget Service. It is solely in charge of:

  • querying Cassandra,
  • computing the budget statuses,
  • and pushing them to the Buying Engine instances only when needed.

[Diagram: the Budget Service querying Cassandra and pushing budget statuses to the Buying Engine instances (drawn with Excalidraw)]

Implementing The Dynamic Cache Replication

We will now focus on how we implemented the communication between the Budget Service and the Buying Engine instances. The problem boils down to replicating a cache onto a large number of distributed processes, with the particularities that:

  • The budget status cache replicas in each Buying Engine instance do not contain exactly the same keys (although, in practice, there is a large overlap)
  • The budget status values change in real time
  • The replication must occur with the smallest possible delay

Why we didn’t use a traditional pub/sub solution

To implement the push, we could have used a traditional message broker, but we didn’t, for several reasons:

  • Adding a third-party broker means having to maintain it, manage its failures, etc.
  • In our use case, the topics are Ads. That means thousands of topics that come and go without notice, and we did not want to have to create and delete topics explicitly.
  • To create and maintain its budget status cache, each Buying Engine instance will need to receive a snapshot of the current value of the budget status right after subscribing, even if it was published minutes ago.
  • Last but not least, we want to compute only the budget statuses that are actually needed. The list of Ads whose budget status is needed is only known by the Buying Engine instance itself, because it depends on the requests it receives, so it changes continuously. The only way to get this list is to let Buying Engine instances subscribe, in real time, to the Ads whose budget they need. Thus, we need a latency-efficient, two-way communication medium.

That’s why we decided to implement a lightweight custom Pub/Sub solution. Its purpose is to replicate, on many hosts, a host-specific subset of a central cache whose values change dynamically.

Transport Layer

gRPC was a natural choice for our transport layer because:

  • It is widely used and known at Teads
  • It has well-supported open-source Java/Scala client implementations
  • It is well supported by our cloud provider’s load balancer
  • And most importantly, it treats streaming as a first-class citizen.

Service Definition

Our service is pretty simple to define using Protobuf:

service AdBudgetService {
  rpc SubscribeToBudgetStatus (stream BudgetStatusSubscriptionRequest)
      returns (stream BudgetStatusMessage);
}

message BudgetStatusSubscriptionRequest {
  int64 ad_id = 1;
  bool subscription_toggle = 2;
}

enum BudgetStatus {
  BudgetAvailable = 0;
  BudgetExhausted = 1;
  BudgetError = 2;
}

message BudgetStatusMessage {
  int64 ad_id = 1;
  BudgetStatus budget_status = 2;
}

When connecting to the Budget Service, a Buying Engine instance calls the SubscribeToBudgetStatus method. It takes as argument a stream into which the needed Ad subscriptions/un-subscriptions are pushed, and it returns a stream that delivers the budget status snapshots and updates for the subscribed Ads.

Client implementation

We chose the Akka-gRPC client because it exposes the gRPC request and response streams as Akka Streams, which fits naturally with the Akka-based implementation described below.
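
To make this concrete, here is a minimal sketch of what the client side could look like with the Akka-gRPC generated client. AdBudgetServiceClient and the message classes are the ones generated from the Protobuf definition above; the host, port, and Ad ids are placeholder values, not our production configuration.

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.grpc.GrpcClientSettings
import akka.stream.scaladsl.{Sink, Source}

object BuyingEngineSubscriptionSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "buying-engine")

    // Client generated by Akka-gRPC from the service definition above.
    val client = AdBudgetServiceClient(
      GrpcClientSettings.connectToServiceAt("budget-service", 8080).withTls(false)
    )

    // Subscription stream: subscribe to two Ads and keep the stream open
    // so that further (un)subscriptions can be pushed later.
    val subscriptions: Source[BudgetStatusSubscriptionRequest, NotUsed] =
      Source(List(
        BudgetStatusSubscriptionRequest(adId = 42L, subscriptionToggle = true),
        BudgetStatusSubscriptionRequest(adId = 43L, subscriptionToggle = true)
      )).concat(Source.maybe[BudgetStatusSubscriptionRequest])

    // Each received message is either the snapshot sent on subscription
    // or a subsequent update of the budget status.
    client
      .subscribeToBudgetStatus(subscriptions)
      .runWith(Sink.foreach(msg => println(s"Ad ${msg.adId} -> ${msg.budgetStatus}")))
  }
}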

Managing thread races

As Akka-gRPC relies on Akka Streams to model the input and output streams, we considered using Akka Streams to implement the service itself. Let’s consider this design:

A set of custom budget status sources would produce the budget statuses, recomputing them periodically and pushing an update when they change. The BroadcastHub provided by Akka Streams would dispatch the budget statuses to all the connected Buying Engine instances that subscribed to them. Those consumers would be sinks sending the statuses through gRPC.
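
For illustration, here is a rough sketch of this considered design. computeBudgetStatus, the tick period, and the buffer size are placeholders, and in reality broadcastFor(adId) would be created once per Ad and shared between subscribers.

import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{BroadcastHub, Keep, Source}

object BroadcastHubSketch {
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "budget-service")

  // Placeholder for the actual budget computation against Cassandra.
  def computeBudgetStatus(adId: Long): BudgetStatus = BudgetStatus.BudgetAvailable

  // One hub per Ad: recompute every 10 seconds, emit only when the status changes,
  // and broadcast every change to all attached consumers.
  def broadcastFor(adId: Long): Source[BudgetStatusMessage, NotUsed] =
    Source
      .tick(0.seconds, 10.seconds, ())
      .map(_ => computeBudgetStatus(adId))
      .statefulMapConcat { () =>
        var last: Option[BudgetStatus] = None
        status =>
          if (last.contains(status)) Nil
          else { last = Some(status); List(BudgetStatusMessage(adId, status)) }
      }
      .toMat(BroadcastHub.sink[BudgetStatusMessage](bufferSize = 16))(Keep.right)
      .run()

  // Each subscribed gRPC connection would simply attach to broadcastFor(adId) as one
  // more consumer. However, the snapshot sent on subscription has to be handled
  // outside the hub, which is exactly where the race condition described below appears.
}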

At first sight, the BroadcastHub seems to fit the use case. However, it misses a critical feature: sending the budget status snapshot on subscription. This would have to be handled by an external piece of code, most likely the one that adds the new consumer to the relevant BroadcastHub when a subscription occurs.

The issue with this implementation is that it does not allow synchronizing the addition of a budget status consumer with the push of the snapshot. To illustrate the problem, let’s look at this scenario:

  1. A new consumer is added to the BroadcastHub
  2. The current budget status is read from the budget status source
  3. The budget status source publishes an update and it is sent to the consumers
  4. The previously read budget status is sent to the new consumer

Steps 1, 2, and 4 will take place in the thread handling the subscription request while step 3 will be in another thread, handling the new budget status dispatch.

If we change the implementation to read the budget status before adding the consumer, we still end up in a situation where a consumer may never receive a snapshot, or receive the wrong one (an update published between the read and the addition of the consumer would be missed).

Encapsulate the Pub/Sub state in an Actor

To solve the above race conditions, we used actors. A single actor, called the dispatcher, manages the complete Pub/Sub state:

  • The Buying Engine connections
  • The connections subscribed to each Ad
  • The latest known budget status of each Ad, if any

Other actors, one for each subscribed Ad, are in charge of computing the budget statuses. They are spawned and killed by the dispatcher.

The dispatcher actor will handle sequentially any of the following events and perform the related actions:

  • A connection is created: it is added to the list
  • A connection subscribes to an Ad: a computer actor is created for the Ad if it does not exist yet, and the latest known budget status, if any, is sent to the connection
  • A connection unsubscribes from an Ad: It is removed from this Ad’s subscribers and the computer actor is killed if there are no more subscribers
  • A budget status changes for an Ad: It is sent to all subscribed connections and the latest known budget status cache is updated
  • An existing connection is closed: all its subscriptions are canceled

Each time, the Pub/Sub state is mutated atomically. This guarantees that no event is lost or reordered as in the scenario above and, as a consequence, that our replicated caches always remain in sync.
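
To make this concrete, here is a minimal Akka Typed sketch of such a dispatcher. It is not our production implementation: the message types are illustrative, a connection is represented as a plain ActorRef, and the spawning/killing of the per-Ad computer actors is only hinted at in comments. BudgetStatus and BudgetStatusMessage are the classes generated from the Protobuf definition above.

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object BudgetStatusDispatcher {

  sealed trait Command
  final case class Subscribe(adId: Long, connection: ActorRef[BudgetStatusMessage]) extends Command
  final case class Unsubscribe(adId: Long, connection: ActorRef[BudgetStatusMessage]) extends Command
  final case class StatusChanged(adId: Long, status: BudgetStatus) extends Command
  final case class ConnectionClosed(connection: ActorRef[BudgetStatusMessage]) extends Command

  def apply(): Behavior[Command] = running(Map.empty, Map.empty)

  // subscribers: which connections are subscribed to each Ad
  // lastKnown:   the latest known budget status of each Ad, if any
  private def running(
      subscribers: Map[Long, Set[ActorRef[BudgetStatusMessage]]],
      lastKnown: Map[Long, BudgetStatus]
  ): Behavior[Command] =
    Behaviors.receiveMessage {
      case Subscribe(adId, connection) =>
        // Send the snapshot right away if a status is already known for this Ad.
        lastKnown.get(adId).foreach(status => connection ! BudgetStatusMessage(adId, status))
        // (Spawning the per-Ad computer actor on the first subscription is omitted here.)
        running(subscribers.updated(adId, subscribers.getOrElse(adId, Set.empty) + connection), lastKnown)

      case Unsubscribe(adId, connection) =>
        val remaining = subscribers.getOrElse(adId, Set.empty) - connection
        // (Killing the per-Ad computer actor when `remaining` is empty is omitted here.)
        val updated = if (remaining.isEmpty) subscribers - adId else subscribers.updated(adId, remaining)
        running(updated, lastKnown)

      case StatusChanged(adId, status) =>
        // Push the update to every subscribed connection and remember it as the latest snapshot.
        subscribers.getOrElse(adId, Set.empty).foreach(_ ! BudgetStatusMessage(adId, status))
        running(subscribers, lastKnown.updated(adId, status))

      case ConnectionClosed(connection) =>
        // Cancel all the subscriptions of this connection.
        val cleaned = subscribers.map { case (adId, subs) => adId -> (subs - connection) }.filter(_._2.nonEmpty)
        running(cleaned, lastKnown)
    }
}

Because the actor processes one message at a time, the snapshot sent on subscription and the subsequent updates can never be interleaved incorrectly.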

Working with permanent HTTP/2 connections

Managing disconnections

One of the challenges with this solution is that, unlike in a traditional poll architecture, our application layer needs to react to transport disconnections. gRPC is based on HTTP/2, which itself relies on TCP to guarantee that no data is lost in transport. This holds as long as the connection is alive: if for any reason the gRPC connection is closed, the budget status cache built by the Buying Engine becomes stale. In that case, the Buying Engine instance immediately clears the cache and triggers a reconnection.
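
On the client side, this can be sketched as follows, reusing the client and subscriptions values from the earlier snippet. budgetStatusCache is a stand-in for the local replica and the backoff values are illustrative.

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSource, Sink}

// Local replica of the budget statuses, cleared whenever the stream restarts.
val budgetStatusCache = TrieMap.empty[Long, BudgetStatus]

val restartSettings = RestartSettings(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.2)

// RestartSource re-opens the subscription stream whenever it fails or completes.
RestartSource
  .withBackoff(restartSettings) { () =>
    budgetStatusCache.clear()                     // the replica is stale after a disconnection
    client.subscribeToBudgetStatus(subscriptions) // open a fresh subscription stream
  }
  .runWith(Sink.foreach(msg => budgetStatusCache.update(msg.adId, msg.budgetStatus)))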

Load balancing

In practice, a single instance of the Budget Service is able to serve our many Buying Engine instances: most of the resources are consumed computing the budget statuses, not handling client requests, and only a small quantity of data is sent over the network. However, for reliability reasons, we need several instances available in production. A load balancer is used between the Budget Service instances and the Buying Engine ones to ensure connections are evenly dispatched.

Because our setup implies long-lived connections through a load balancer, we added a periodic reconnection to our application. This ensures that:

  • Canary deployments receive connections rapidly and can be tested in a timely manner.
  • The connections stay evenly distributed

Even if the load balancer dispatches connections evenly between the available instances, it can happen, for example during a rollout, that only a single instance is ready for a moment. If many Buying Engine instances are disconnected from the previous version at that same time, they will all connect to this single instance, creating an imbalance. Automatic reconnections smooth this out quickly.
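
One simple way to implement such a periodic reconnection (a sketch of one possible approach, not necessarily what our production code does) is to let the subscription stream complete after a fixed period so that the restart logic shown earlier opens a fresh one:

// Inside the restart factory from the previous sketch: completing the stream after
// roughly 30 minutes (an illustrative value) triggers a new subscription. Depending on
// the load balancer setup, the gRPC connection itself may also need to be recreated
// for the new connection to be re-balanced.
client.subscribeToBudgetStatus(subscriptions).takeWithin(30.minutes)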

Conclusion

Results

This project was rolled out last summer and has been working smoothly since then. The objective of decoupling the Buying Engine load, in terms of the number of inbound requests, from the Cassandra load has been fully achieved.

This was demonstrated during a Buying Engine rollout: during the few minutes when both versions were running, the Budget Service served twice as many client connections without any visible impact on CPU usage.

Besides, the load on the Cassandra cluster was divided by more than 4, which is a nice saving.

Key Takeaways

  • Understanding the final usage of the queried data allowed us to carve a microservice out of our monolith in an optimal way, reducing by several orders of magnitude the quantity of data that has to flow to the highly scaled process. This type of opportunity might be rare, but it illustrates that the best optimizations are often found through a good understanding of the business rather than expertise in the tech stack.
  • If you need to replicate a dynamic cache, letting each client subscribe to the keys it actually needs, then gRPC streaming is a solution you should consider. In our experience, the actor model was a great help in solving the race conditions inherent to this type of service. The final implementation is simple and rock solid.

What’s next?

Martin Kleppmann explains in the last chapter of Designing Data-Intensive Applications that the huge popularity of web applications in recent years has sometimes made developers forget that other models exist besides a stateless client polling data from a server and ultimately a database. This project is an illustration that reversing the data flow can be a huge improvement.

For now, we have only worked on the last part of the data journey. The next step will be to break the read/write boundary currently embodied by our still costly Cassandra cluster and replace the polling done on it with events pushed from our tracking chain.

Thanks to all who reviewed this article, especially Benjamin Davy, Louis Fruleux, and Remy Saissy.

Bibliography

  • Martin Kleppmann, Designing Data-Intensive Applications, O’Reilly Media, 2017.
