Pacer: Pinterest’s New Generation of Asynchronous Computing Platform

Pinterest Engineering
Pinterest Engineering Blog
8 min read · May 23, 2023

Qi Li | Software Engineer, Core Services; Zhihuang Chen | Software Engineer, Core Services; Ping Jin | Engineering Manager, Core Services

At Pinterest, a wide range of functionalities and features for various business needs and products are supported by an asynchronous job execution platform called Pinlater, which was open-sourced a few years ago. Use cases on the platform span from saving Pins by Pinners, to notifying Pinners about various updates, to processing images/videos etc. Pinlater handles billions of job executions on a daily basis. The platform supports many desirable features, like at-least-once semantics, job scheduling for future execution, and dequeuing/processing speed control on individual job queues.

With the growth of Pinterest over the past few years and increased traffic to Pinlater, we discovered numerous limitations of Pinlater, including scalability bottlenecks, poor hardware efficiency, lack of isolation, and limited usability. We have also encountered new challenges with the platform, including ones that have impacted the throughput and reliability of our data storage.

Analyzing these issues, we realized that some of them, such as lock contention and the lack of queue-level isolation, could not be addressed in the existing platform. Thus, we decided to revamp the architecture of the platform in its entirety, addressing identified limitations and optimizing existing functionalities. In this post, we will walk through this new architecture and the new opportunities it has yielded (like a FIFO queue).

Recap of Pinlater

Pinlater has three major components:

  1. A stateless Thrift service to manage job submission and scheduling, with three core APIs: enqueue, dequeue, and ACK
  2. A backend datastore to save jobs, including payloads and metadata
  3. Job workers in worker pools to pull jobs continuously, execute them, and send a positive or negative ACK for each job depending on whether the execution succeeded or failed
Client to Enqueue to Pinlater Thrift Service, Worker Pool to Dequeue/Ack to Pinlater Thrift Service, and Pinlater Thrift Service to Backend DataStore
Pinlater High Level Architecture
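
The enqueue, dequeue, and ACK cycle described above can be illustrated with a small in-memory sketch. This is not Pinlater's actual code or API: `ToyJobQueue`, `Job`, and the state names are hypothetical, and a real at-least-once system would also need a timeout that returns claimed-but-never-ACKed jobs to the queue, which is omitted here.

```python
import itertools
from dataclasses import dataclass


@dataclass
class Job:
    job_id: int
    payload: str
    state: str = "PENDING"  # PENDING -> RUNNING -> SUCCEEDED, or back to PENDING
    attempts: int = 0


class ToyJobQueue:
    """In-memory stand-in for the Thrift service plus datastore (hypothetical)."""

    def __init__(self):
        self._jobs = {}
        self._ids = itertools.count(1)

    def enqueue(self, payload):
        """Store a new job and return its id."""
        job = Job(next(self._ids), payload)
        self._jobs[job.job_id] = job
        return job.job_id

    def dequeue(self, limit):
        """Claim up to `limit` pending jobs, in insertion order."""
        claimed = []
        for job in self._jobs.values():
            if len(claimed) >= limit:
                break
            if job.state == "PENDING":
                job.state = "RUNNING"
                job.attempts += 1
                claimed.append(job)
        return claimed

    def ack(self, job_id, success):
        """Positive ACK finishes the job; negative ACK makes it eligible again."""
        job = self._jobs[job_id]
        job.state = "SUCCEEDED" if success else "PENDING"
```

A worker that fails a job sends a negative ACK, and the same job is handed out again on a later dequeue, which is why a job may run more than once.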

As Pinlater took on more use cases and traffic, the platform stopped working as well. The exposed issues include, but are not limited to:

  1. Because every queue has one table in each datastore shard and each dequeue request scans all shards to find available jobs, lock contention occurs in the datastore when multiple Thrift server threads try to grab data from the same table. It becomes more severe as traffic increases and the Thrift service scales up. This degrades the performance of Pinlater, reduces the throughput of the platform, and limits its scalability.
  2. Job executions affect each other because jobs from multiple job queues with different characteristics run on the same worker hosts. One bad job queue could bring the whole worker cluster down, impacting other job queues as well. Additionally, mixing these jobs together makes performance tuning nearly impossible, as job queues may require different instance types.
  3. Various functionalities share the same Thrift service and affect each other, even though they have very different reliability requirements. For example, an enqueue failure could affect site-wide SR because enqueuing jobs is one step of some critical flows, while a dequeue failure only delays job execution, which we can afford for a short period of time.

Pacer — The New Architecture

To achieve better performance and resolve the issues mentioned above, we revamped the architecture in Pacer by introducing new components and new mechanisms for storing, accessing, and isolating job data and queues.

Client to Enqueue to Pinlater Thrift Service to Backend DataStore to Dequeue Broker Service. Helix to zookeeper to Dequeue Broker Service. Workpool to Dequeue to Dequeue Broker Service.
Pacer High Level Architecture

Pacer is composed of the following major components:

  1. A stateless Thrift service to manage job submission and scheduling
  2. A backend datastore to save jobs and their metadata
  3. A stateful dequeue broker service to pull jobs from the datastore
  4. Helix with Zookeeper to dynamically assign partitions of job queues to dequeue broker service
  5. Dedicated worker pools for each queue on K8s to execute the jobs

As you can see, Pacer introduces new components: a dedicated dequeue broker service, Helix, and K8s. Each of them is there to solve a specific issue in Pinlater.

  1. Helix with Zookeeper manages the assignment of job queue partitions to dequeue brokers. Every partition of a job queue in the datastore is assigned to a dedicated dequeue broker host, and only this broker host can dequeue from this partition, so there is no competition over the same job data.
  2. The dequeue broker service fetches job queue data from the datastore and caches it in local memory buffers. This prefetching reduces latency when a worker pool pulls jobs from a job queue, because the memory buffer is much faster than the datastore. Also, decoupling dequeue from enqueue, which stays in the Thrift service, prevents the two from affecting each other.
  3. Dedicated worker pods are allocated on K8s for each job queue, instead of sharing worker hosts with other job queues as in Pinlater. This completely eliminates interference between job executions from different job queues. The independent runtime environment also makes it possible to customize resource allocation and planning per job queue, which improves hardware efficiency.

By migrating existing job queues in Pinlater to Pacer, a few improvements have been achieved so far:

  1. Lock contention is completely gone in the datastore due to the new mechanism of pulling data.
  2. Overall efficiency of hardware utilization has significantly improved, for both the datastore and the worker hosts.
  3. Jobs are executed independently in their queue’s own environment, with customized configuration, which has improved performance compared to Pinlater.

Key Improvements

As shown above, new components are introduced in Pacer to address various issues in Pinlater. A few points are worth describing in more detail.

Job Data Sharding

In Pinlater, every job queue has a partition in each shard of the datastore cluster, no matter how much data and traffic the job queue has. There are a few problems with this design.

Three separate cylinders representing shard 1, shard 2 and shard n. All shards have 3 job queues.
  1. Resources are wasted. Even for job queues with small volumes of data, a partition is created in each shard of the datastore and may hold very little data or no data at all. As the thrift service needs to scan every partition to get enough jobs, this results in extra calls to the datastore. Based on the metrics, more than 50% of calls get empty results before getting data.
  2. Lock contention becomes worse in some scenarios, like when multiple thrift service threads compete for little data of a small job queue in one shard. The datastore has to use its resources to mitigate lock contention during data querying.
  3. Some functionalities can’t be supported, e.g. job executions of a job queue in chronological order of enqueueing time (FIFO), as workers pull jobs from multiple shards simultaneously, and no global order can be guaranteed but only local order.
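
The cost of scanning every shard can be seen in a toy model. The sketch below is illustrative only: shards are plain dictionaries mapping a queue name to a list of jobs, and `scan_all_shards` is a hypothetical helper, not Pinlater code. It counts how many datastore calls come back empty before enough jobs are found.

```python
def scan_all_shards(shards, queue_name, limit):
    """Probe shards one by one until `limit` jobs are collected.

    Returns (jobs, calls, empty_calls), where each probed shard counts as one
    datastore call and `empty_calls` is the number of probes that found nothing.
    """
    jobs, calls, empty_calls = [], 0, 0
    for shard in shards:
        if len(jobs) >= limit:
            break
        calls += 1
        available = shard.get(queue_name, [])
        take = available[: limit - len(jobs)]
        if not take:
            empty_calls += 1
        del available[: len(take)]  # the taken jobs leave the shard
        jobs.extend(take)
    return jobs, calls, empty_calls
```

For a small queue whose only pending job sits in the third of four shards, a request for one job makes three calls, two of which return nothing.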

In Pacer, the following improvements are made.

Graphic displays improvements made by Pacer
  1. A job queue is partitioned across only a subset of the datastore shards, depending on its data volume and traffic. A mapping records which shards hold data of each job queue.
  2. Lock contention in the datastore is addressed by a dedicated dequeue broker service layer. The dequeue brokers also don’t need to query every datastore shard for a queue, because they know which datastore shards store its partitions.
  3. Some functionalities become possible, e.g. execution in chronological order, as long as only one partition is created for the job queue.
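
A minimal sketch of the queue-to-shard mapping idea follows. The data layout (a list of dictionaries for shards, a dictionary from queue name to shard indexes) and the name `dequeue_with_mapping` are assumptions made for illustration; the post does not describe how Pacer stores this mapping.

```python
def dequeue_with_mapping(shards, queue_to_shards, queue_name, limit):
    """Probe only the shards recorded as holding partitions of `queue_name`.

    Returns (jobs, calls), where `calls` counts the shards actually queried.
    """
    jobs, calls = [], 0
    for shard_id in queue_to_shards.get(queue_name, []):
        if len(jobs) >= limit:
            break
        calls += 1
        available = shards[shard_id].get(queue_name, [])
        take = available[: limit - len(jobs)]
        del available[: len(take)]  # the taken jobs leave the shard
        jobs.extend(take)
    return jobs, calls
```

With a single partition, jobs come back in the order they were stored, which is the property a FIFO queue relies on.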

Dequeue broker service with Helix & Zookeeper

The dequeue broker in Pacer addresses multiple critical limitations in Pinlater by eliminating lock contention in the datastore.

The dequeue broker runs as a stateful service, and each partition of a job queue is assigned to one specific broker in the cluster. This broker alone is responsible for pulling job data from the corresponding table in a datastore shard, so there is no competition between brokers. With this deterministic, contention-free way of fetching jobs, Pacer uses resources on the MySQL hosts more efficiently, spending them on actual job fetching (instead of handling lock issues).

Queue Buffer in a Broker

When a dequeue broker pulls job data from target storage, it inserts the data into an appropriate in-memory buffer to let workers get jobs with optimal latency. One dedicated buffer will be created for each queue partition and its maximum capacity will be set to avoid heavy memory usage in the broker host.

A thread-safe queue is used as the buffer because multiple workers get jobs from the same broker concurrently; dequeue requests for the same partition of a job queue are processed sequentially by the dequeue broker. Dispatching jobs from the in-memory buffer is a simple operation with minimal latency. Our stats show that the dequeue request latency is less than 1 ms.
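
As a rough illustration of such a buffer, the sketch below wraps Python's standard `queue.Queue` with a maximum size. The class name `PartitionBuffer` and the `fetch_from_datastore` callable are hypothetical, and the sketch assumes a single prefetching thread per partition; it is not the broker's actual implementation.

```python
import queue


class PartitionBuffer:
    """Bounded, thread-safe buffer for one queue partition (illustrative only)."""

    def __init__(self, max_jobs):
        self._buffer = queue.Queue(maxsize=max_jobs)

    def prefetch(self, fetch_from_datastore):
        """Fill the free slots of the buffer and return how many jobs were added.

        `fetch_from_datastore(n)` is a hypothetical callable returning up to n jobs.
        Assumes only one thread calls prefetch for this partition.
        """
        free = self._buffer.maxsize - self._buffer.qsize()
        if free <= 0:
            return 0
        added = 0
        for job in list(fetch_from_datastore(free))[:free]:
            self._buffer.put_nowait(job)
            added += 1
        return added

    def dequeue(self, limit):
        """Hand out up to `limit` buffered jobs without blocking."""
        jobs = []
        while len(jobs) < limit:
            try:
                jobs.append(self._buffer.get_nowait())
            except queue.Empty:
                break
        return jobs
```

Capping the buffer bounds the memory used per partition, and a dequeue that finds the buffer empty returns immediately instead of waiting on the datastore.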

Dequeue Broker Resource Management

As mentioned above, a queue is divided into partitions, and each broker is assigned one or more partitions of a job queue. Managing a large number of partitions and assigning them to appropriate brokers optimally is a major challenge. Helix, a generic cluster management framework for the automatic management of partitioned, replicated, and distributed resources hosted on a cluster of nodes, is used to shard and manage the queue partitions.

Queue configuration manager to ZooKeeper/Helix Controller. Helix agent and Dequeue Broker to ZooKeeper/Helix Controller.

The above figure depicts the overall architecture of how Helix interacts with dequeue brokers.

  1. Zookeeper is used to communicate resource configurations and other relevant information between the Helix controller and the dequeue brokers.
  2. The Helix controller constantly monitors events occurring in the dequeue broker cluster, e.g., configuration changes and dequeue broker hosts joining or leaving. With the latest state of the dequeue broker cluster, the Helix controller computes an ideal state of resources and sends messages to the dequeue broker cluster through Zookeeper to gradually bring the cluster to the ideal state.
  3. Every dequeue broker host keeps reporting its liveness to Zookeeper and is notified when the tasks assigned to it change. Based on the notification message, the dequeue broker host changes its local state.

Once the partition information of a queue is created/updated, Helix will be notified so that it can assign these partitions to dequeue brokers.
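
The idea of computing an ideal state and moving the cluster toward it can be sketched in a few lines. This is a deliberately naive round-robin assignment written for illustration; it does not use the Helix API, and the function names are hypothetical. A real rebalancer would typically also try to limit how many partitions move when membership changes.

```python
def compute_ideal_state(partitions, live_brokers):
    """Map each partition to exactly one live broker, round-robin over sorted names."""
    brokers = sorted(live_brokers)
    if not brokers:
        return {}
    return {
        partition: brokers[i % len(brokers)]
        for i, partition in enumerate(sorted(partitions))
    }


def diff_states(current, ideal):
    """List the (partition, from_broker, to_broker) moves needed to reach `ideal`."""
    return [
        (partition, current.get(partition), broker)
        for partition, broker in sorted(ideal.items())
        if current.get(partition) != broker
    ]
```

When a broker leaves, recomputing the ideal state and diffing it against the current one yields the partitions that need a new owner, while every partition still has exactly one broker.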

Acknowledgements

This work is a result of collaboration across multiple teams at Pinterest. Many thanks to the following people who contributed to this project:

  • Core Services: Mauricio Rivera, Yan Li, Harekam Singh, Sidharth Eric, Carlo De Guzman
  • Data Org: Ambud Sharma
  • Storage and Caching: Oleksandr Kuzminskyi, Ernie Souhrada, Lianghong Xu
  • Cloud Runtime: Jiajun Wang, Harry Zhang, David Westbrook
  • Notifications: Eric Tam, Lin Zhu, Xing Wei

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog and visit our Pinterest Labs site. To explore life at Pinterest, visit our Careers page.
