Wormhole: A pub-sub system at Facebook


Introduction

Publish-subscribe systems solve the problem of communicating changes in a system efficiently. Whenever some data changes, a publisher informs subscribers about the change. In a classical application that uses a database for storing state, producers of data write to the database. To pick up these updates, consumers then face a dilemma: poll infrequently and operate on stale data for a while, or poll often and pay a performance overhead on the database itself. Pub-sub systems generally solve this by introducing an intermediate broker. Publishers tell the broker about changes, and subscribers register with the broker to receive change notifications.
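
As a concrete illustration of that broker pattern, here is a minimal sketch in Python. The `Broker` class and its methods are purely illustrative and not part of any system discussed in this post.

```python
# A minimal sketch of the classic broker pattern described above.
# All names here are illustrative; they are not Wormhole APIs.
from collections import defaultdict
from typing import Callable, Dict, List


class Broker:
    """Routes published updates to the subscribers of each topic."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callable[[dict], None]) -> None:
        # Subscribers register with the broker instead of polling the database.
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, update: dict) -> None:
        # Publishers hand changes to the broker, which pushes them out immediately.
        for callback in self._subscribers[topic]:
            callback(update)


broker = Broker()
broker.subscribe("user_profile", lambda u: print("cache invalidation for", u))
broker.publish("user_profile", {"user_id": 42, "field": "name"})
```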

At Facebook, many applications depend on such change notifications. News Feed may want to receive the latest updates, and friends may want to see the latest notification content. Internal systems such as index maintenance and cache invalidation also benefit from knowing about changes in the underlying data. This generates a ton of data internally, which is sharded and stored across many clusters using systems such as RocksDB, HDFS and MySQL. For Facebook, a broker-based system would mean the broker has to support all of these internal systems, scale to a very high volume of updates, and handle replication at a large scale. To address these limitations, Facebook proposed a new system called Wormhole.

In Wormhole, subscribers register with the data systems directly. All of these data systems maintain transaction logs; Wormhole uses those logs to identify updates and then propagates them to the subscribers. This is quite unique and completely obviates the need for an intermediate broker, though it means publishers need to be specialized for the different data systems. In this post, we will cover the high-level architecture of Wormhole.
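
To make the broker-less idea concrete, here is a rough sketch of a publisher that tails a transaction log and pushes each update straight to subscribers. The JSON-lines log format and the function names are assumptions for illustration, not Wormhole's actual implementation.

```python
# A rough sketch of the broker-less idea: the publisher tails the datastore's
# own transaction log and delivers each update directly to subscribers.
import json
from typing import Callable, Iterator, List, Tuple


def tail_transaction_log(path: str, start_offset: int = 0) -> Iterator[Tuple[int, dict]]:
    """Yield (offset, update) pairs from a JSON-lines transaction log."""
    with open(path, "r") as log:
        for offset, line in enumerate(log):
            if offset >= start_offset:
                yield offset, json.loads(line)


def publish_from_log(path: str, subscribers: List[Callable[[dict], None]]) -> None:
    # No intermediate broker: updates flow straight from the log to subscribers.
    for _offset, update in tail_transaction_log(path):
        for deliver in subscribers:
            deliver(update)
```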

Challenges at Facebook for a pub-sub system

As you can imagine, a lot of data gets generated at Facebook. For reliability, this data is sharded and geo-replicated across data centers. In addition, there are many data systems at Facebook, such as TAO, which is optimized for read-heavy graph workloads, and memcache, which serves read-heavy traffic. Such systems use aggressive caching to offload database access. On a write, the cached copies need to be invalidated, and if there is an index on the database, it needs to be updated. On top of this, a pub-sub system at Facebook needs to support the following:

  1. In order delivery of updates from the same shard
  2. At least once delivery — No updates are ever lost, but some duplicates might be delivered
  3. Different consumers have different storage and speed-of-consumption needs.
  4. Like any big distributed system, it needs to be fault tolerant; software and hardware failures are the norm.

Architecture

Wormhole architecture with many-to-many mapping between publishers and subscriber shards.

Each datastore at Facebook, whether MySQL or TAO, runs a Wormhole publisher. The publisher knows how to consume the content written to the transaction log by the producers of data. It converts updates from the datastore's native format into a standardized key-value format and delivers them to subscribers. A publisher finds its subscribers by reading the ZooKeeper (a distributed coordination service, similar to Chubby) configuration for that datastore. An application links a client-side Wormhole library, which registers it as a subscriber with ZooKeeper. One interesting aspect of subscribers is that, for a given application, multiple subscribers exist, and the publisher assigns each of them a subset of the shards. Publishers assign ownership of specific shards to specific subscribers and send each shard's updates to its corresponding subscriber. As long as a subscriber is available, all updates for a given shard land on that same subscriber, and the publisher delivers them in order.
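
The sketch below illustrates the two publisher responsibilities from this paragraph: converting a datastore-specific log record into a standardized key-value update, and routing every update of a shard to the subscriber that owns that shard. The field names, the MySQL-style event dictionary, and the hashing scheme are assumptions for illustration only.

```python
# A sketch of the publisher's conversion and routing steps described above.
# Field names and the sharding function are illustrative assumptions.
from dataclasses import dataclass
from typing import Dict


@dataclass
class WormholeUpdate:
    shard: int          # every update carries its shard number
    key: str
    value: bytes
    log_position: int   # position in the datastore's transaction log


class Subscriber:
    """Placeholder subscriber that just records the update."""

    def deliver(self, update: WormholeUpdate) -> None:
        print("received", update.key, "for shard", update.shard)


def to_wormhole_update(mysql_row_event: dict, num_shards: int) -> WormholeUpdate:
    # Convert a hypothetical MySQL binlog event into the standardized format.
    key = f"{mysql_row_event['table']}:{mysql_row_event['primary_key']}"
    return WormholeUpdate(
        shard=hash(key) % num_shards,
        key=key,
        value=mysql_row_event["row_image"],
        log_position=mysql_row_event["binlog_offset"],
    )


def route(update: WormholeUpdate, shard_owner: Dict[int, Subscriber]) -> None:
    # All updates for a shard go to the same subscriber while it is available,
    # which preserves per-shard ordering.
    shard_owner[update.shard].deliver(update)
```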

It is worth introducing some terminology at this point; it will be useful in the subsequent sections.

Datastore: Stores a collection of shards for the dataset.

Flow: The stream of updates for one shard that the publisher sends to a subscriber. Every update in a flow carries its shard number.

Datamarker: Indicates the last update in a flow that the subscriber has successfully acknowledged. It is a pointer into the transaction log of the given datastore. The publisher stores datamarkers to ensure at-least-once delivery of data to subscribers.

Reader: The component that funnels reads from the transaction log to the publisher. Explained in more detail in the next section.

Caravan and leader: A reader and its associated flows are called a caravan. Since multiple readers can be reading from the transaction log at the same time, the caravan that has read the farthest is called the lead caravan.
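
To make these relationships concrete, here is a rough data model in Python; the classes and fields are illustrative, not Wormhole's actual types.

```python
# A rough data model for the terms above, just to make the relationships
# concrete; these classes are illustrative assumptions.
from dataclasses import dataclass, field
from typing import List


@dataclass
class Flow:
    """Stream of updates for one (shard, application) pair."""
    shard: int
    application: str


@dataclass
class Datamarker:
    """Last acknowledged position of a flow in the datastore's transaction log."""
    flow: Flow
    log_position: int


@dataclass
class Caravan:
    """A reader plus the flows it is currently feeding."""
    log_position: int
    flows: List[Flow] = field(default_factory=list)


def lead_caravan(caravans: List[Caravan]) -> Caravan:
    # The caravan that has read the farthest into the transaction log.
    return max(caravans, key=lambda c: c.log_position)
```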

Delivering updates to the subscribers

A publisher reads the ZooKeeper configuration to discover which applications are interested in updates. For each shard and application pair, it creates a stream of updates, i.e. a flow. In steady state, a single reader reads from the most current log position and sends out updates. The reader exists to manage transaction log access efficiently: during recovery, reading from multiple datamarkers, i.e. different positions in the log, may be necessary, and funneling those reads through readers avoids thrashing the disk. Hopefully this explains the concept of the caravan and the lead caravan.

In addition, the publisher tries not to overwhelm subscribers. It can assign a given flow to a lightly loaded subscriber using weighted random selection, and subscribers can update the ZooKeeper configuration to divert load to other subscribers. Once the publisher establishes a flow between a caravan and a subscriber, all updates are sent over TCP for reliable, in-order delivery, and Wormhole multiplexes many updates over the same connection to reduce connection overhead.
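
Here is a small sketch of picking a lightly loaded subscriber with weighted random selection. The weighting scheme is an assumption; the post only says the selection is weighted and random.

```python
# A sketch of weighted random subscriber selection, as described above.
# Assumption: the weight is simply the inverse of the subscriber's current load.
import random
from typing import Dict


def pick_subscriber(load_by_subscriber: Dict[str, int]) -> str:
    subscribers = list(load_by_subscriber)
    # Lower load => higher weight => more likely to receive the new flow.
    weights = [1.0 / (load_by_subscriber[s] + 1) for s in subscribers]
    return random.choices(subscribers, weights=weights, k=1)[0]


print(pick_subscriber({"sub-a": 10, "sub-b": 2, "sub-c": 5}))  # usually sub-b
```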

Multiple caravans and slow consumers

Generally the lead caravan serves most updates, reading from the most current position in the log. But during recovery, or when there are slow consumers, multiple caravans are needed to send older updates. Allocating caravans to flows is a trade-off between I/O load and latency. With many caravans, disk access becomes fairly random and the I/O load is heavy, but later updates in the same flow are unblocked more quickly. With fewer caravans, disk access can be batched and the I/O load optimized, but latency increases due to longer queues, especially for the most recent updates. Wormhole therefore allows caravans to split and merge depending on these two parameters. Typically, non-lead caravans are asked to read at a faster rate so that they can catch up with the lead caravan, and configuration limits cap the read speed, the number of caravans, and so on to avoid excessive disk usage.
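
The sketch below shows one way the split/merge trade-off could be expressed. Caravans are represented only by how far they have read into the log; the thresholds and the decision rule are made up for illustration and are not Wormhole's actual policy.

```python
# A simplified sketch of the split/merge trade-off described above.
# Thresholds and the decision rule are illustrative assumptions.
from typing import List


def rebalance(caravan_positions: List[int], max_lag: int, max_caravans: int) -> str:
    lead = max(caravan_positions)
    # A caravan that has fallen far behind delays every flow it carries;
    # splitting unblocks the faster flows at the cost of extra random disk I/O.
    if any(lead - pos > max_lag for pos in caravan_positions) \
            and len(caravan_positions) < max_caravans:
        return "split"
    # Caravans that have caught up with the lead can be merged back into it,
    # so reads can be batched and disk I/O reduced.
    if caravan_positions.count(lead) > 1:
        return "merge"
    return "keep"


print(rebalance([1000, 400], max_lag=200, max_caravans=4))  # "split"
```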

Filtering

A nice feature is that an application (subscriber) can tell the publisher what it would like filtered out. This is done via configuration using simple AND/OR filtering rules, and the publisher then does not send the filtered updates to the subscriber. Popular filter types are: is the key in a specific range? Does the key have a specific value? Or the negation of such membership queries.
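
Here is a sketch of those filter types, key-range membership, exact value match and negation, combined with AND/OR. The rule representation is an assumption; the post only describes the filters at a high level.

```python
# A sketch of AND/OR filtering over the filter types mentioned above.
# The representation (an OR of AND-groups) is an illustrative assumption.
from typing import Callable, List

Predicate = Callable[[str, bytes], bool]


def key_in_range(low: str, high: str) -> Predicate:
    return lambda key, value: low <= key <= high


def key_equals(expected: str) -> Predicate:
    return lambda key, value: key == expected


def negate(pred: Predicate) -> Predicate:
    return lambda key, value: not pred(key, value)


def matches(update_key: str, update_value: bytes,
            or_of_ands: List[List[Predicate]]) -> bool:
    # The filter is an OR over groups, each group an AND of predicates.
    return any(all(p(update_key, update_value) for p in group)
               for group in or_of_ands)


# Send only updates whose key is in [user:0, user:9999] and is not user:42.
rules = [[key_in_range("user:0", "user:9999"), negate(key_equals("user:42"))]]
print(matches("user:17", b"...", rules))  # True
```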

Reliable delivery with a single publisher

Reliable delivery is important for Wormhole because critical processes such as index generation depend on it; missing an update would obviously leave the index in an inconsistent state. Hence the Wormhole publisher sends datamarkers along with the updates. When subscribers acknowledge a marker, the publisher writes the datamarker (an offset into the transaction log) to a persistent log. If a publisher process dies and comes back up, it starts reading from the last acknowledged datamarker and resends updates from there on. This can obviously produce duplicates, but subscribers own the responsibility of dealing with them. If a subscriber fails, the publisher uses the same mechanism to resend updates from the last acknowledged marker. TCP ensures in-order delivery.
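
The sketch below shows how persisted datamarkers give at-least-once delivery. The persistence format and names are illustrative assumptions, not Wormhole's actual mechanism.

```python
# A sketch of persisting and replaying datamarkers, as described above.
# The JSON file format and class/method names are illustrative assumptions.
import json
from typing import Dict


class DatamarkerStore:
    """Persists the last acknowledged log position for each flow."""

    def __init__(self, path: str) -> None:
        self._path = path

    def save(self, flow_id: str, log_position: int) -> None:
        markers = self.load_all()
        markers[flow_id] = log_position
        with open(self._path, "w") as f:
            json.dump(markers, f)

    def load_all(self) -> Dict[str, int]:
        try:
            with open(self._path) as f:
                return json.load(f)
        except FileNotFoundError:
            return {}


# On acknowledgement from the subscriber, persist the marker.
store = DatamarkerStore("/tmp/datamarkers.json")
store.save("flow-shard-7-appA", 1523)

# After a publisher restart, resume from the last acknowledged position.
# Anything sent after that marker but not acknowledged is resent, so the
# subscriber may see duplicates and must handle them itself.
resume_from = store.load_all().get("flow-shard-7-appA", 0)
print("resume reading the transaction log at offset", resume_from)
```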

Reliable delivery with publisher failover

Most of the data at Facebook is geo-replicated for reliability, and subscribers can register with multiple copies of a dataset. When a publisher experiences a hard failure, such as its machine not coming back up, another publisher needs to take over the recovery. In this mode, Wormhole guarantees at-least-once delivery of all updates from some copy of the dataset. This poses several challenges.

When a publisher experiences a permanent failure, all the datamarkers stored on that machine generally become unavailable as well. Publishers also do not communicate with each other, to avoid overhead. To overcome these limitations, Wormhole stores datamarkers in ZooKeeper.

Consider the following diagram of a geo-replicated database. There are two publishers, P1 and P2. P1 initially owns the flows for subscriber A1, and as it delivers updates to A1 it stores datamarkers in ZooKeeper. In addition, each publisher gets a corresponding ephemeral node, which other publishers watch. An ephemeral node in distributed coordination is a node that disappears once its owner goes away. So once P1 dies, its ephemeral node disappears and P2 gets a notification via the ZooKeeper API. P2 can then take over ownership of the flows and start publishing to A1 from the stored datamarkers. One more implementation detail: datamarkers are pointers into transaction logs and cannot be used directly by P2, so Wormhole adds logical offsets to help with that.
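
Here is a small sketch of that ephemeral-node mechanism using the kazoo ZooKeeper client for Python. The paths, the flow name and the takeover logic are illustrative assumptions; only the ephemeral-node creation and the watch reflect what is described above.

```python
# A sketch of publisher failover via ephemeral ZooKeeper nodes, using kazoo.
# Paths, node names, and takeover details are illustrative assumptions.
from kazoo.client import KazooClient

zk = KazooClient(hosts="zk1:2181")
zk.start()

# P1 announces itself with an ephemeral node; it vanishes if P1's session dies.
zk.create("/wormhole/publishers/P1", b"", ephemeral=True, makepath=True)

# P1 persists datamarkers in ZooKeeper instead of on its own disk, using
# logical offsets so another publisher can resume from them.
zk.ensure_path("/wormhole/datamarkers/flow-shard-7-appA")
zk.set("/wormhole/datamarkers/flow-shard-7-appA", b"1523")


# P2 (the backup publisher) watches P1's ephemeral node.
def on_p1_change(event):
    if event.type == "DELETED":
        # P1 is gone: read the stored datamarkers and take over publishing
        # the flows for A1 from the last acknowledged position.
        marker, _stat = zk.get("/wormhole/datamarkers/flow-shard-7-appA")
        print("taking over flow at logical offset", int(marker))


zk.exists("/wormhole/publishers/P1", watch=on_p1_change)
```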

Publisher failover is monitored by backup publishers watching ephemeral ZooKeeper nodes.

Some of the operational improvements include a distributed deployment model, with a monitor running on the same machine as the publisher, instead of a centralized deployment model. In addition, all configuration is dynamic, so a process restart is not always needed for new configuration to take effect.

Comparable pub-sub systems

Most comparable pub-sub systems, like Kafka and RabbitMQ, use intermediate brokers. These brokers can store and forward messages depending on the speed of the consumers. This is not needed at Facebook because its datastores already keep a reliable log. In addition, at this data scale, developing an intermediate broker would be just as challenging and could add extra latency.

Google’s Thialfi only works with version numbers and is used for data invalidation, while Wormhole also sends the actual data updates and can therefore be used for data refills. Kafka can lose data if a broker fails and runs an order of magnitude slower than Wormhole. Apache Hedwig doesn’t scale to very large numbers of publishers and subscribers. Various message queue implementations either do not scale well for Facebook, have limitations around geo-replication, or are simply not generic enough.

Conclusions

Although more complex, Wormhole provides a fairly unique and efficient way to implement pub-sub in data-intensive, high-throughput environments by building directly on transaction logs, which I found quite different from how traditional pub-sub systems are implemented.
