MemQ: An efficient, scalable cloud native PubSub system
Ambud Sharma | Tech Lead and Engineering Manager, Logging Platform
The Logging Platform powers all data ingestion and transportation at Pinterest. At the heart of the Pinterest Logging Platform are distributed PubSub systems that help our customers transport and buffer data and consume it asynchronously.
In this blog we introduce MemQ (pronounced “mem-queue”), an efficient, scalable PubSub system developed for the cloud at Pinterest. It has powered near-real-time data transportation use cases for us since mid-2020 and complements Kafka while being up to 90% more cost efficient.
For nearly a decade, Pinterest relied on Apache Kafka as its sole PubSub system. As Pinterest grew, so did the amount of data and the challenges of operating a very large-scale distributed PubSub platform. Operating Apache Kafka at scale gave us a great deal of insight into how to build a scalable PubSub system. After a deep investigation of the operational and scalability challenges of our PubSub environment, we arrived at the following key takeaways:
- Not every dataset needs sub-second latency; latency and cost should be inversely proportional (lower latency should cost more)
- The storage and serving components of a PubSub system need to be separated so that each can scale independently based on its resource needs
- Ordering on read instead of on write provides the flexibility that specific consumer use cases require (different applications can have different ordering requirements for the same dataset)
- Strict partition ordering is not necessary at Pinterest in most cases and often leads to scalability challenges
- Rebalancing in Kafka is expensive, often results in performance degradation, and has a negative impact on customers of a saturated cluster
- Running custom replication in a cloud environment is expensive
In 2018, we experimented with a new type of PubSub system that would natively leverage the cloud. In 2019, we started formally exploring how to solve our PubSub scalability challenges, evaluating multiple PubSub technologies based on their cost of operations as well as the reengineering cost required for existing technologies to meet Pinterest’s demands. We ultimately concluded that we needed a PubSub technology that built on the learnings of Apache Kafka, Apache Pulsar, and Facebook LogDevice and was built for the cloud.
MemQ is a new PubSub system that augments Kafka at Pinterest. It uses a decoupled storage and serving architecture similar to Apache Pulsar and Facebook LogDevice; however, it relies on a pluggable replicated storage layer (e.g. an Object Store, DFS, or NFS) for storing data. The net result is a PubSub system that:
- Handles GB/s traffic
- Scales writes and reads independently
- Doesn’t require expensive rebalancing to handle traffic growth
- Is up to 90% more cost effective than our Kafka footprint
The secret of MemQ is that it leverages micro-batching and immutable writes to create an architecture where the number of Input/Output Operations Per Second (IOPS) needed on the storage layer is dramatically reduced, allowing cost-effective use of a cloud-native Object Store like Amazon S3. This approach is analogous to packet switching in networks, as opposed to circuit switching (i.e. a single large, continuous store of data such as a Kafka partition).
MemQ breaks the continuous stream of logs into blocks (objects), similar to ledgers in Pulsar but different in that they are written as objects and are immutable. The size of these “packets” / “objects,” known internally in MemQ as Batches, plays a role in determining End-to-End (E2E) latency. The smaller the packets, the sooner they can be written, at the cost of more IOPS. MemQ therefore allows E2E latency to be tuned at the cost of higher IOPS. A key performance benefit of this architecture is that, depending on the underlying storage layer, read and write hardware can be separated, allowing writes and reads to scale independently because packets can be spread across the storage layer.
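To make the Batch-size trade-off concrete, here is a back-of-the-envelope model. It is a sketch under simplifying assumptions, not MemQ code: one storage PUT per Batch, a constant ingest rate, and no time-based flush. The 100 MB/s rate and the batch sizes are hypothetical.

```python
# Back-of-the-envelope model of the Batch-size trade-off.
# Assumptions: one storage PUT per Batch, constant ingest rate,
# and no time-based flush (real MemQ also flushes on a timer).

def puts_per_second(throughput_bytes_per_s: float, batch_bytes: float) -> float:
    """Storage write requests needed per second when each Batch is one object."""
    return throughput_bytes_per_s / batch_bytes


def fill_time_seconds(throughput_bytes_per_s: float, batch_bytes: float) -> float:
    """Time to fill one Batch: a lower bound on the batching part of E2E latency."""
    return batch_bytes / throughput_bytes_per_s


MB = 1024 * 1024
rate = 100 * MB  # hypothetical ingest rate for a single topic

for batch in (1 * MB, 10 * MB, 100 * MB):
    print(f"{batch // MB:>4} MB batch: "
          f"{puts_per_second(rate, batch):6.1f} PUT/s, "
          f"{fill_time_seconds(rate, batch):5.2f} s to fill")
```

At this rate, growing the Batch from 1 MB to 100 MB cuts the request rate from 100 PUT/s to 1 PUT/s but raises the fill time from 0.01 s to 1 s. This is the IOPS-versus-latency knob described above.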
This also eliminates a constraint of Kafka: to recover a replica, a partition must be re-replicated from the beginning. In MemQ, the underlying replicated storage only needs to recover the specific Batches whose replica counts were reduced by a storage failure. Moreover, since MemQ at Pinterest runs on Amazon S3, AWS handles the recovery, sharding, and scaling of storage without any manual intervention from Pinterest.
Components of MemQ
A MemQ client discovers the cluster through a seed node, which it then queries for metadata: the Brokers hosting the TopicProcessors for a given Topic or, in the case of a consumer, the address of the notification queue.
Similar to other PubSub systems, MemQ has the concept of a Broker. A MemQ Broker is a part of the cluster and is primarily responsible for handling metadata and write requests.
Note: read requests in MemQ can be served directly by the Storage layer unless read Brokers are used.
The Governor is the leader of the MemQ cluster and is responsible for automated rebalancing and TopicProcessor assignments. Any Broker in the cluster can be elected Governor. The Governor communicates with Brokers through ZooKeeper, which is also used for Governor election.
The Governor makes assignment decisions using a pluggable assignment algorithm. The default algorithm evaluates the available capacity on each Broker to make allocation decisions. The Governor also uses this capability to handle Broker failures and restore capacity for topics.
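A capacity-based assignment policy could look roughly like the sketch below. This is a hypothetical greedy placement written for illustration, not the Governor’s actual default algorithm. The `Broker` fields, the MB/s units, and the `assign` / `handle_failure` helpers are all assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Broker:
    name: str
    capacity_mbps: float                  # hypothetical write capacity of the Broker
    assigned_mbps: float = 0.0
    topics: List[str] = field(default_factory=list)

    @property
    def available(self) -> float:
        return self.capacity_mbps - self.assigned_mbps


def assign(topic: str, required_mbps: float, brokers: List[Broker]) -> Broker:
    """Greedy placement: put the TopicProcessor on the Broker with the most free capacity."""
    candidates = [b for b in brokers if b.available >= required_mbps]
    if not candidates:
        raise RuntimeError(f"no Broker has {required_mbps} MB/s free for {topic}")
    best = max(candidates, key=lambda b: b.available)
    best.assigned_mbps += required_mbps
    best.topics.append(topic)
    return best


def handle_failure(failed: Broker, brokers: List[Broker], demand: Dict[str, float]) -> None:
    """Drop a failed Broker and re-place its TopicProcessors on the survivors."""
    brokers[:] = [b for b in brokers if b is not failed]
    for topic in failed.topics:
        assign(topic, demand[topic], brokers)
    failed.topics.clear()
    failed.assigned_mbps = 0.0
```

Because the policy is pluggable, a different strategy (for example, bin-packing to minimize the number of Brokers) could replace `assign` without changing how failures are handled.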
Topic & TopicProcessor
MemQ, like other PubSub systems, uses the logical concept of a Topic. MemQ topics on a Broker are handled by a module called the TopicProcessor. A Broker can host one or more TopicProcessors, and each TopicProcessor instance handles one topic. Topics have write partitions and read partitions. Write partitions are used to create TopicProcessors (a 1:1 relation), and read partitions determine the level of parallelism available to consumers processing the data. The read partition count equals the number of partitions of the notification queue.
MemQ storage is made of two parts:
- Replicated Storage (Object Store / DFS)
- Notification Queue (Kafka, Pulsar, etc.)
1. Replicated Storage
MemQ allows for pluggable storage handlers. At present, we have implemented a storage handler for Amazon S3, which offers a cost-effective solution for fault-tolerant, on-demand storage. MemQ uses the following prefix format on S3 to create a high-throughput, scalable storage layer:
s3://<bucketname>/<(a) 2 byte hash of first client request id in batch>/<(b) cluster>/topics/<topicname>
(a) = used for partitioning inside S3 to handle higher request rates if needed
(b) = name of MemQ cluster
Availability & Fault Tolerance
Since S3 is a highly available, web-scale object store, MemQ relies on its availability as the first line of defense. To accommodate future S3 repartitioning, MemQ adds a two-digit hex hash at the first level of the prefix, creating 256 base prefixes that can, in theory, be handled by independent S3 partitions.
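Building a Batch prefix in the layout above could be sketched as follows. The hash function is an assumption: the text only says the hash is taken over the first client request id in the Batch and rendered as two hex digits (256 values). This sketch reduces a CRC32 to one byte, and the bucket, cluster, and topic names are hypothetical.

```python
import zlib


def batch_prefix(bucket: str, cluster: str, topic: str, first_request_id: int) -> str:
    """Build the S3 prefix for a Batch: s3://<bucket>/<hash>/<cluster>/topics/<topic>.

    Assumption: CRC32 of the request id, masked to one byte (0..255) and
    rendered as two hex digits; MemQ's actual hash function is not specified.
    """
    h = zlib.crc32(str(first_request_id).encode()) & 0xFF
    return f"s3://{bucket}/{h:02x}/{cluster}/topics/{topic}"


# The hash spreads Batches across at most 256 first-level prefixes.
hashes = {batch_prefix("my-bucket", "memq-prod", "events", i).split("/")[3]
          for i in range(10_000)}
print(len(hashes))  # at most 256
```

Putting the hash first, before the cluster and topic names, means even a single hot topic spreads its writes across many base prefixes.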
The consistency of the underlying storage layer determines the consistency characteristics of MemQ. In the case of S3, every write (PUT) to S3 Standard is guaranteed to be replicated to at least three Availability Zones (AZs) before being acknowledged.
2. Notification Queue
MemQ uses the notification system to deliver pointers to consumers that tell them where data is located. Currently, we use Kafka as an external notification queue. Once data is written to the storage layer, the StorageHandler generates a notification message recording the attributes of the write, including its location, size, topic, etc. The consumer uses this information to retrieve data (Batches) from the Storage layer. It’s also possible to enable MemQ Brokers to proxy Batches for consumers, at the expense of efficiency. The notification queue also provides clustering / load balancing for the consumers.
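A notification carries only a pointer to the data, not the data itself. The sketch below models one as a small record. The field names (including `message_count`, which the text does not mention) and the JSON encoding are hypothetical choices for illustration, not MemQ’s actual notification format.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class BatchNotification:
    """Pointer to one uploaded Batch, as published to the notification queue.

    Hypothetical schema: the source lists location, size, and topic among
    the recorded attributes; message_count is an illustrative extra.
    """
    topic: str
    location: str        # where the Batch lives in the storage layer
    size_bytes: int
    message_count: int

    def to_json(self) -> bytes:
        return json.dumps(asdict(self)).encode()

    @staticmethod
    def from_json(raw: bytes) -> "BatchNotification":
        return BatchNotification(**json.loads(raw))


n = BatchNotification("events", "s3://my-bucket/3f/memq-prod/topics/events/batch-0001",
                      8_388_608, 1200)
print(len(n.to_json()), "bytes of notification for", n.size_bytes, "bytes of data")
```

Because each notification is tiny compared with the Batch it points to, the notification queue only has to carry metadata, while the bulk bytes go straight to and from the object store.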
MemQ Data Format
MemQ uses a custom storage / network transmission format for Messages and Batches.
The lowest unit of transmission in MemQ is called a LogMessage. This is similar to a Pulsar Message or Kafka ProducerRecord.
The wrappers around the LogMessage allow for the different levels of batching that MemQ does. Hierarchy of units:
- Batch (unit of persistence)
- Message (unit of producer upload)
- LogMessage (unit the application interacts with)
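The three levels nest inside each other. The dataclasses below show that nesting only. They are not MemQ’s custom binary storage / network format, and the field names are hypothetical, except that each Message carries a client request id, which the S3 prefix scheme refers to.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional


@dataclass
class LogMessage:
    """Unit the application interacts with (similar to a Kafka ProducerRecord)."""
    value: bytes
    key: Optional[bytes] = None


@dataclass
class Message:
    """Unit of producer upload: several LogMessages sent in one request."""
    client_request_id: int
    log_messages: List[LogMessage] = field(default_factory=list)


@dataclass
class Batch:
    """Unit of persistence: many Messages written together as one immutable object."""
    messages: List[Message] = field(default_factory=list)

    def iter_log_messages(self) -> Iterator[LogMessage]:
        """Flatten the hierarchy back into the LogMessages an application reads."""
        for message in self.messages:
            yield from message.log_messages

    def payload_bytes(self) -> int:
        return sum(len(lm.value) for lm in self.iter_log_messages())
```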
A MemQ producer is responsible for sending data to Brokers. It uses an async dispatch model that allows non-blocking sends without waiting on acknowledgements.
This model was critical for hiding the upload latencies of the underlying storage layers while maintaining storage-level acknowledgements. It led us to implement a custom MemQ protocol and client, since existing PubSub protocols relied on synchronous acknowledgements. MemQ supports three types of acks: ack=0 (producer fire & forget), ack=1 (Broker received), and ack=all (storage received). With ack=all, the replication factor (RF) is determined by the underlying storage layer (e.g. in S3 Standard, RF=3 [across three AZs]). If an acknowledgement fails, MemQ producers can explicitly or implicitly trigger retries.
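Here is a minimal sketch of an async-dispatch producer with the three ack modes. It is a hypothetical stand-in, not the MemQ client API. `transport` is an assumed callable that returns True when the requested ack level is reached, and the retry loop models implicit retries.

```python
import concurrent.futures as cf
from enum import Enum
from typing import Callable, Optional


class Acks(Enum):
    NONE = "0"      # ack=0: producer fire & forget
    BROKER = "1"    # ack=1: Broker received the Message
    ALL = "all"     # ack=all: storage layer persisted the Batch


class SketchProducer:
    """Hypothetical async producer: send() returns immediately; acks resolve later."""

    def __init__(self, transport: Callable[[bytes, Acks], bool],
                 acks: Acks = Acks.ALL, retries: int = 2, workers: int = 4):
        self._transport = transport      # stand-in for the network call; True means acked
        self._acks = acks
        self._retries = retries
        self._pool = cf.ThreadPoolExecutor(max_workers=workers)

    def send(self, payload: bytes) -> Optional[cf.Future]:
        if self._acks is Acks.NONE:
            self._pool.submit(self._transport, payload, self._acks)
            return None                  # fire & forget: nothing to wait for
        return self._pool.submit(self._send_with_retry, payload)

    def _send_with_retry(self, payload: bytes) -> bool:
        # Implicit retries: resend until acked or the retry budget runs out.
        for _ in range(1 + self._retries):
            if self._transport(payload, self._acks):
                return True
        return False

    def close(self) -> None:
        self._pool.shutdown(wait=True)
```

The caller can keep sending while earlier futures are still waiting on slow storage uploads. This is the property that lets ack=all coexist with high producer throughput.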
The MemQ TopicProcessor is conceptually a RingBuffer. This virtual ring is subdivided into Batches, which simplifies writes. Messages are enqueued into the currently available Batch as they arrive over the network until either the Batch is full or a time-based trigger fires. Once a Batch is finalized, it is handed to the StorageHandler for upload to the Storage layer (such as S3). If the upload succeeds, a notification is sent via the Notification Queue, and the AckHandler sends acknowledgements (acks) for the individual Messages in the Batch to their respective Producers, if the producers requested acks.
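The size-or-time flush loop could be sketched as below. For simplicity the sketch uses a plain list rather than a ring buffer. `upload`, `notify`, and `ack` are hypothetical callables standing in for the StorageHandler, the Notification Queue, and the AckHandler, and the injectable `clock` exists only to make the time trigger testable. Sending a negative ack when an upload fails is an assumption that lets producers retry.

```python
import time
from typing import Callable, List, Optional


class SketchTopicProcessor:
    """Accumulates Messages into a Batch and flushes when it is full or old enough."""

    def __init__(self, max_batch_bytes: int, max_batch_age_s: float,
                 upload: Callable[[List[bytes]], Optional[str]],
                 notify: Callable[[str, int], None],
                 ack: Callable[[int, bool], None],
                 clock: Callable[[], float] = time.monotonic):
        self.max_bytes = max_batch_bytes
        self.max_age = max_batch_age_s
        self.upload, self.notify, self.ack, self.clock = upload, notify, ack, clock
        self._reset()

    def _reset(self) -> None:
        self.batch: List[bytes] = []
        self.request_ids: List[int] = []
        self.size = 0
        self.opened_at: Optional[float] = None

    def enqueue(self, request_id: int, payload: bytes) -> None:
        if self.opened_at is None:
            self.opened_at = self.clock()   # Batch age starts with its first Message
        self.batch.append(payload)
        self.request_ids.append(request_id)
        self.size += len(payload)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        """Called on every enqueue and, in a real system, periodically by a timer."""
        if not self.batch:
            return
        too_big = self.size >= self.max_bytes
        too_old = self.clock() - self.opened_at >= self.max_age
        if too_big or too_old:
            self._flush()

    def _flush(self) -> None:
        location = self.upload(self.batch)  # None signals a failed upload
        if location is not None:
            self.notify(location, self.size)
        for rid in self.request_ids:        # positive or negative ack per Message
            self.ack(rid, location is not None)
        self._reset()
```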
The MemQ consumer allows applications to read data from MemQ. The consumer uses the Broker metadata APIs to discover pointers to the Notification Queue. We expose a poll-based interface to the application, where each poll request returns an Iterator of LogMessages so that all LogMessages in a Batch can be read. These Batches are discovered using the Notification Queue and retrieved directly from the Storage layer.
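The poll path can be reduced to two steps: read pointers from the notification queue, then fetch each Batch from storage and flatten it. In the sketch below, `poll_notifications` and `fetch_batch` are hypothetical callables (for example, a Kafka consumer and an S3 GET), and a Batch is simplified to a list of raw LogMessage payloads.

```python
from typing import Callable, Iterator, List


class SketchConsumer:
    """Hypothetical consumer: notifications point at Batches, which are read
    directly from storage and flattened into LogMessages."""

    def __init__(self, poll_notifications: Callable[[], List[str]],
                 fetch_batch: Callable[[str], List[bytes]]):
        self._poll_notifications = poll_notifications   # e.g. a Kafka consumer poll
        self._fetch_batch = fetch_batch                 # e.g. an object-store GET

    def poll(self) -> Iterator[bytes]:
        locations = self._poll_notifications()          # fetch pointers eagerly
        # Batches are fetched lazily, one at a time, as the caller iterates.
        return (msg for loc in locations for msg in self._fetch_batch(loc))
```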
Data Loss Detection: Migrating workloads from Kafka to MemQ required strict validation of data loss. As a result, MemQ has a built-in auditing system that efficiently tracks E2E delivery of each Message and publishes metrics in near real-time.
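The core idea of E2E loss detection is to compare what was sent with what was read. The sketch below is a deliberately simple, set-based illustration, not a description of MemQ’s auditing system. It assumes each producer tags Messages with a sequence number, and all class and method names are hypothetical. A production auditor would aggregate counts per time window rather than keep every id in memory.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


class DeliveryAuditor:
    """Hypothetical E2E audit: producers report what they sent, consumers
    report what they read; anything sent but never read is flagged."""

    def __init__(self) -> None:
        self.sent: Dict[str, Set[int]] = defaultdict(set)
        self.received: Dict[str, Set[int]] = defaultdict(set)

    def record_sent(self, producer_id: str, seq: int) -> None:
        self.sent[producer_id].add(seq)

    def record_received(self, producer_id: str, seq: int) -> None:
        self.received[producer_id].add(seq)      # duplicates are harmless

    def missing(self) -> List[Tuple[str, int]]:
        return sorted((p, s) for p, seqs in self.sent.items()
                      for s in seqs - self.received[p])
```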
Batch & Streaming Unification: Since MemQ uses an externalized storage system, it can support batch processing directly on raw MemQ data without first transforming it into other formats. This allows users to perform ad hoc inspection of MemQ data without major concerns about seek performance, as long as the storage layer can scale reads and writes separately. Depending on the storage engine, MemQ consumers can also perform concurrent fetches to enable much faster backfills for certain streaming cases.
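Batches are immutable, so a backfill can fetch many of them in parallel without any coordination. The sketch below does this with a thread pool. `fetch_batch` is a hypothetical callable (for example, an object-store GET), and the worker count is an arbitrary default.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def backfill(locations: List[str], fetch_batch: Callable[[str], bytes],
             max_workers: int = 8) -> List[bytes]:
    """Fetch many immutable Batches concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order of `locations`,
        # even though the fetches themselves run in parallel.
        return list(pool.map(fetch_batch, locations))
```

Keeping the results in input order lets the caller process a backfilled range in notification order, even when some fetches finish faster than others.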
MemQ supports both size-based and time-based flushes to the storage layer, which put a hard limit on maximum tail latencies, in addition to several optimizations to curb jitter. So far we have achieved a p99 E2E latency of 30 s with AWS S3 storage. We are actively working on improving MemQ latencies, which will increase the number of use cases that can be migrated from Kafka to MemQ.
MemQ on S3 Standard has proven to be up to 90% cheaper (avg ~80%) than an equivalent Kafka deployment with three replicas across three AZs using i3 instances. These savings come from several factors, including:
- reduction in IOPS
- removal of ordering constraints
- decoupling of compute and storage
- reduced replication cost due to elimination of compute hardware
- relaxation of latency constraints
MemQ with S3 scales on demand based on write and read throughput requirements. The MemQ Governor performs real-time rebalancing to ensure that sufficient write capacity is available as long as compute can be provisioned. Capacity scales linearly as Brokers are added and traffic capacity requirements are updated. Read partitions are updated manually if consumers require additional parallelism to process the data.
At Pinterest, we run MemQ directly on EC2 and scale clusters depending on traffic and new use case requirements.
We are actively working on the following areas:
- Reducing E2E latencies (<5s) for MemQ to power more use cases
- Enabling native integrations with Streaming & Batch systems
- Key ordering on read
MemQ provides a flexible, low-cost, cloud-native approach to PubSub. MemQ today powers the collection and transport of all ML training data at Pinterest. We are actively researching expanding it to other datasets and further optimizing latencies. Beyond solving PubSub, MemQ storage can expose PubSub data for batch processing without major performance impacts, enabling low-latency batch processing.
Stay tuned for additional blogs about how we optimized MemQ internals to handle scalability challenges and the open source release of MemQ.
Building MemQ would not have been possible without the unwavering support of Dave Burgess and Chunyan Wang. Also a huge thanks to Ping-Min Lin who has been a key driver of bug fixes and performance optimizations in MemQ that enabled large scale production rollout.
Lastly thanks to Saurabh Joshi, Se Won Jang, Chen Chen, Divye Kapoor, Yiran Zhao, Shu Zhang and the Logging team for enabling MemQ rollouts.