Using Kafka to throttle QPS on MySQL shards in bulk write APIs

Nov 25, 2019

Qi Li | Software Engineer, Real-time Analytics

At Pinterest, backend core services are in charge of various operations on Pins, boards, and users, serving both Pinners and internal services. Pinners’ operations are classified as online traffic because they require real-time responses, while internal traffic is classified as offline because it is processed asynchronously and no real-time response is required.

The services’ read and write APIs are shared between these two kinds of traffic. The majority of Pinners’ operations on a single object (such as creating a board, saving a Pin, or editing user settings through web or mobile) are routed to one of these APIs to fetch and update data in the datastores. Meanwhile, internal services use the same APIs to act on large numbers of objects on behalf of users (such as deactivating spam accounts or removing spam Pins).

To offload internal offline traffic from these APIs so that online traffic can be handled exclusively, with better reliability and performance, the write APIs should support batches of objects. We proposed and implemented a bulk write platform on top of Kafka, which also serves high-QPS internal use cases efficiently and guarantees high throughput without rate-limit restrictions. In this post, we’ll cover the characteristics of internal offline traffic, the challenges we faced, and how we addressed them by building a bulk write platform in backend core services.

Datastores and write APIs

At Pinterest, MySQL is one of the major datastores for content created by users. To store billions of Pins, boards, and other data for hundreds of millions of Pinners, many MySQL database instances form a MySQL cluster, which is split into logical shards so the data can be managed and served more efficiently. All data is split across these shards.

To read and write one user’s data efficiently, all of that data is stored in the same shard, so APIs only need to query a single shard without fanning out queries to many shards. To prevent any single request from occupying MySQL database resources for too long, every query is configured with a timeout.
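As an illustration, here is a minimal Java sketch of that routing idea. The class name and hash-based mapping are assumptions for the example; the actual mapping scheme is internal to Pinterest.

```java
// Hypothetical sketch of shard-local routing: all of a user's data lives on
// one logical shard, so a single (timeout-bounded) query can serve a request
// without fan-out. The hash-based mapping here is illustrative only.
public final class ShardRouter {
  private final int numShards;

  public ShardRouter(int numShards) {
    this.numShards = numShards;
  }

  // All objects owned by the same user land on the same shard.
  public int shardFor(long userId) {
    return Math.floorMod(Long.hashCode(userId), numShards);
  }
}
```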

All write APIs of core services were originally built for online traffic from Pinners, and they work well there: each call accepts only a single object (a Pinner operates on a single object most of the time), and each operation is lightweight. Even when a Pinner takes a bulk action, e.g. moving a number of Pins to a section of one board, performance is still good because the number of objects isn’t very big and the write APIs can handle them one by one.

Challenges

The situation changes as more and more internal services use the existing write APIs for various bulk operations (such as removing many Pins of a spam user within a short period of time, or backfilling a new field for a huge number of existing Pins). Since the write APIs can handle only one object at a time, these APIs see much higher traffic with large spikes.

To handle more traffic, the services can be autoscaled, but that does not completely solve the problem because the capacity of the system is restricted by the MySQL cluster, and with the existing architecture it’s hard to autoscale the MySQL cluster itself.

To protect the services and the MySQL cluster, rate limiting is applied to the write APIs.

Although throttling can help to some extent, it has several drawbacks that prevent backend core services from being more reliable and scalable.

  1. Online and offline traffic to an API affect each other. If internal offline traffic spikes, online traffic to the same API suffers higher latency and degraded performance, which hurts the user experience.
  2. As more and more internal traffic is sent to an API, its rate limits need to be bumped up carefully so the API can serve the additional traffic without affecting existing traffic.
  3. Rate limiting does not stop hot shards. When internal services write data for a specific user, e.g. ingesting a large number of feed Pins for a partner, all requests target the same shard. A hot shard is expected because of the spike of requests within a short period of time, and the situation gets worse when the update operations in MySQL are expensive.

Because internal services need to handle a large number of objects within a short period of time and do not need a real-time response, requests targeting the same shard can be combined and handled asynchronously with one shared query to MySQL, improving efficiency and saving MySQL connection bandwidth. All combined batch requests should then be processed at a controlled rate to avoid hot shards.

Bulk write architecture

The bulk write platform was architected to support high QPS from internal services with high throughput and zero hot shards. Migrating to the platform should also be straightforward: clients simply call the new APIs.

Bulk write APIs and Proxy

To support write (update, delete, and create) operations on a batch of objects, a set of bulk write APIs is provided for internal services; each accepts a list of objects instead of a single object. This dramatically reduces QPS to the APIs compared with the regular write APIs.
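As a rough illustration, a bulk API might look like the following Java sketch. All names here are hypothetical, not Pinterest’s actual interface.

```java
import java.util.List;

// Hypothetical shape of a bulk write API (illustrative names only):
// each call carries a list of objects, so one RPC replaces many
// single-object calls.
public interface BulkPinWriteService {
  final class BulkWriteResponse {
    public List<Long> succeededIds; // objects written successfully
    public List<Long> failedIds;    // objects to retry or report
  }

  // Delete many Pins in one request instead of one call per Pin.
  BulkWriteResponse bulkDeletePins(long userId, List<Long> pinIds);
}
```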

The proxy is a Finagle service that maps incoming requests, according to their object type, to different batching modules, which combine requests targeting the same shard.

Batching Module

The batching module splits a batch request into smaller batches based on the operation type and object type, so each batch of objects can be processed efficiently in MySQL, where every query has a configured timeout.

This was designed for two major considerations:

  • Firstly, the write rate to every shard should be configurable to avoid hot shards, since shards may contain different numbers of records and perform differently. One batch request from the proxy can contain objects on different shards, so to control QPS accurately per shard, the batch request is split into batches based on the target shards: the ‘Shard Batching’ module splits requests by affected MySQL shard.
  • Secondly, each write operation has its own batch size. Operations on different object types perform differently because they update different numbers of tables. For instance, creating a new Pin may change four to five tables, while updating an existing Pin may change only two. An update query may also take a varying length of time, so a batch update for one object type may see different latencies at different batch sizes. To make batch updates efficient, the batch size is configured per write operation: ‘Operation Batching’ further splits the requests by operation type (see the sketch after this list).
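Here is a minimal Java sketch of the two-level batching described above, reusing the hypothetical ShardRouter from earlier. The batch sizes and names are assumptions, not the production implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the two-level batching: group objects by target shard, then
// chunk each group by an operation-specific batch size so every MySQL
// query stays within its timeout.
public final class Batcher {
  // Assumption: batch sizes are tuned per operation type (e.g. smaller for
  // Pin creation, which touches more tables, than for Pin updates).
  private final Map<String, Integer> batchSizeByOperation;
  private final ShardRouter router; // maps an object to its MySQL shard

  public Batcher(Map<String, Integer> batchSizeByOperation, ShardRouter router) {
    this.batchSizeByOperation = batchSizeByOperation;
    this.router = router;
  }

  public List<List<Long>> split(String operation, List<Long> objectIds) {
    // Shard batching: one bucket per affected shard.
    Map<Integer, List<Long>> byShard = new HashMap<>();
    for (long id : objectIds) {
      byShard.computeIfAbsent(router.shardFor(id), s -> new ArrayList<>()).add(id);
    }
    // Operation batching: chunk each shard bucket by the operation's batch size.
    int size = batchSizeByOperation.getOrDefault(operation, 50);
    List<List<Long>> batches = new ArrayList<>();
    for (List<Long> bucket : byShard.values()) {
      for (int i = 0; i < bucket.size(); i += size) {
        batches.add(bucket.subList(i, Math.min(i + size, bucket.size())));
      }
    }
    return batches; // every batch targets exactly one shard
  }
}
```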

Rate Limiter with Kafka

All objects in a batch request from the batching module are on the same shard, and a hot shard is expected if too many requests hit one specific shard. A hot shard affects all other queries to the same shard and degrades the performance of the whole system. To avoid this, all requests to one shard should be sent at a controlled rate so the shard is not overwhelmed and can handle requests efficiently. To achieve this, one rate limiter is needed per shard, controlling all requests to that shard.

To support high QPS from internal clients at the same time, all of their requests should be stored temporarily in the platform and processed at a controlled speed. Kafka is a great fit for these purposes:

  1. Kafka can handle very high-QPS writes and reads.
  2. Kafka is a reliable distributed message storage system that buffers batch requests so they can be processed at a controlled rate.
  3. Kafka rebalances load and manages consumers automatically.
  4. Each partition is assigned exclusively to one consumer (in the same consumer group), so that consumer can rate-limit the partition’s requests precisely.
  5. Requests in all partitions are processed by different consumer processors simultaneously, so throughput is very high.
(Figure: P = partition, C = consumer processor)

Kafka Configuration

Firstly, each shard in the MySQL cluster has a matching partition in Kafka, so all requests to that shard are published to the corresponding partition and processed by one dedicated consumer processor at an accurate QPS. Secondly, a large number of consumer processors run so that at most one or two partitions are assigned to each consumer processor, achieving maximum throughput.
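A producer-side sketch of that mapping might look like this, using the Kafka Java client. The topic name and serialization choices are assumptions; the key idea is that a batch request for shard N is published to partition N.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of the shard-to-partition mapping: a batch request for shard N is
// published to partition N, so one dedicated consumer controls the write
// rate for that shard.
public final class BatchRequestPublisher {
  private static final String TOPIC = "bulk-write-requests"; // hypothetical name

  private final KafkaProducer<Long, byte[]> producer;

  public BatchRequestPublisher(Properties kafkaConfig) {
    // Key/value serializers are assumed to be set in kafkaConfig.
    this.producer = new KafkaProducer<>(kafkaConfig);
  }

  public void publish(int shardId, byte[] serializedBatch) {
    // Explicit partition = shard id.
    producer.send(new ProducerRecord<>(TOPIC, shardId, (long) shardId, serializedBatch));
  }
}
```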

Consumer Processor

The consumer processor rate-limits QPS on a shard in two steps:

  • Firstly, the number of requests a consumer can pull from its partition at a time is configured.
  • Secondly, the consumer consults the per-shard configuration for the precise number of batch requests one shard can handle, and uses Guava RateLimiter for rate control. For instance, some shards may be configured for lower traffic because hot users are stored on them (a sketch follows this list).
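Here is a minimal Java sketch of this consumer loop, using the Kafka Java client and Guava’s RateLimiter. The per-shard QPS values, defaults, and names are illustrative; in the real client, the number of records pulled per poll can be capped with the standard max.poll.records setting.

```java
import com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch of per-shard rate control: a Guava RateLimiter paces processing at
// the QPS configured for this shard, so the shard is never overwhelmed.
public final class ConsumerProcessor {
  private final KafkaConsumer<Long, byte[]> consumer;
  private final Map<Integer, Double> qpsByShard; // per-shard QPS config

  public ConsumerProcessor(KafkaConsumer<Long, byte[]> consumer,
                           Map<Integer, Double> qpsByShard) {
    this.consumer = consumer;
    this.qpsByShard = qpsByShard;
  }

  public void run(int shardId) {
    // Shards holding "hot" users can be configured with a lower QPS.
    RateLimiter limiter = RateLimiter.create(qpsByShard.getOrDefault(shardId, 10.0));
    while (true) {
      ConsumerRecords<Long, byte[]> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<Long, byte[]> record : records) {
        limiter.acquire();            // blocks until the shard's rate allows it
        processBatch(record.value()); // issue the combined MySQL write
      }
    }
  }

  private void processBatch(byte[] serializedBatch) { /* write to MySQL */ }
}
```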

Consumer processors handle different failures with appropriate actions. To handle congestion in the thread pool, the consumer processor retries a task with a configured backoff time if the thread pool is full and busy with existing tasks. To handle failures in MySQL shards, it checks the response from the MySQL cluster to catch errors and exceptions and takes the appropriate action for each kind of failure. For instance, when it sees two consecutive timeout failures, it alerts the system admins and stops pulling and processing requests for a configured wait time. With these mechanisms, the success rate of request processing is high.
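For instance, the consecutive-timeout guard could look like this sketch. The threshold of two and the pause behavior come from the description above; the wait time and alerting hook are hypothetical.

```java
// Sketch of the consecutive-timeout guard: after two timeout failures in a
// row, alert and pause consumption for a configured wait before resuming.
public final class TimeoutGuard {
  private static final long PAUSE_MILLIS = 60_000; // hypothetical wait time
  private int consecutiveTimeouts = 0;

  public void onBatchResult(boolean timedOut) throws InterruptedException {
    if (!timedOut) {
      consecutiveTimeouts = 0; // a healthy response resets the counter
      return;
    }
    if (++consecutiveTimeouts >= 2) {
      alertAdmins();              // hypothetical alerting hook
      Thread.sleep(PAUSE_MILLIS); // stop pulling requests for a while
      consecutiveTimeouts = 0;
    }
  }

  private void alertAdmins() { /* page the on-call / system admins */ }
}
```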

Results

Several use cases from internal teams have been launched on the bulk write platform with good performance. For instance, feed ingestion for partners uses the platform, and improvements were observed in both the time spent and the success rate of the process when ingesting around 4.3 million Pins.

Also, hot shards, which previously caused a lot of issues, are no longer seen during feed ingestion.

What’s next

As more internal traffic moves from the existing write APIs to the new bulk write APIs, the performance of the APIs serving online traffic improves, with less downtime and lower latency. This helps make the systems more reliable and efficient.

The next step for the new platform is to support more use cases by extending the existing operations to more object types.

Acknowledgments

Thanks to Kapil Bajaj, Carlo De Guzman, Zhihuang Chen and the rest of the Core Services team at Pinterest! Also special thanks to Brian Pin, Sam Meder from the Shopping Infra team for providing support.
