Google Cloud Pub/Sub Ordered Delivery

Kamal Aboul-Hosn
Oct 19, 2020 · 17 min read

The Google Cloud Pub/Sub team is happy to announce that ordered delivery is now generally available. This new feature allows subscribers to receive messages in the order they were published without sacrificing scale. This article discusses the details of how the feature works and talks about some common gotchas when trying to process messages in order in distributed systems.

Ordering Basics

Ordering in Cloud Pub/Sub consists of two properties. The first is the ordering key set on a message when publishing. This string — which can be up to 1KB — represents the entity for which messages should be ordered. For example, it could be a user ID or the primary key of a row in a database. The second property is the enable_message_ordering property on a subscription. When this property is true, subscribers receive messages for an ordering key in the order in which they were received by the service.

These two properties allow publishers and subscribers to decide independently if messages are ordered. If the publisher does not specify ordering keys with messages or the subscriber does not enable ordered delivery, then message delivery is not in order and behaves just like Cloud Pub/Sub without the ordered delivery feature. Not all subscriptions on a topic need to have the same setting for enable_message_ordering. Therefore, different use cases that receive the same messages can determine if they need ordered delivery without impacting each other.

The number of ordering keys is limited only by what can be represented by the 1KB string. The publish throughput on each ordering key is limited to 1MB/s. The throughput across all ordering keys on a topic is limited to the quota available in a publish region. This limit can be increased to many GB/s.

All the Cloud Pub/Sub client libraries have rich support for ordered delivery. They are the best way to take advantage of this feature, as they take care of a lot of the details necessary to ensure that messages are processed in order. Ordered delivery works with all three types of subscribers: streaming pull, pull, and push.

Ordering Properties

Ordered delivery has three main properties:

  1. Order: When a subscription has message ordering enabled, subscribers receive messages published in the same region with the same ordering key in the order in which they were received by the service.
  2. Consistent redelivery: If a message is redelivered, then all messages received after that message for the same ordering key will also be redelivered, whether or not they were already acknowledged.
  3. Affinity: If there are messages with an ordering key outstanding to a streaming pull subscriber, then additional messages that are delivered are sent to that same subscriber. If no messages are currently outstanding for an ordering key, the service delivers messages to the last subscriber to receive messages for that key on a best-effort basis.

Let’s examine what these properties mean with an example. Imagine we have two ordering keys, A and B. For key A, we publish the messages 1, 2, and 3, in that order. For key B, we publish the messages 4, 5, and 6, in that order. With the ordering property, we guarantee that 1 is delivered before 2 and 2 is delivered before 3. We also guarantee that 4 is delivered before 5, which is delivered before 6. Note that there are no guarantees about the order of messages across different ordering keys. For example, message 1 could arrive before or after message 4.
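To make the per-key guarantee concrete, here is a minimal sketch in plain Java (no Pub/Sub dependency) that checks whether a delivery sequence respects the publish order within each ordering key. The class and method names are hypothetical, and the check deliberately ignores redeliveries, which are covered by the second property.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PerKeyOrderCheck {
    /**
     * Returns true if, for every ordering key, messages appear in `delivered`
     * in the same relative order as in that key's publish sequence.
     * Assumption: each value is published once per key and nothing is redelivered.
     */
    static boolean respectsPerKeyOrder(
            Map<String, List<Integer>> publishedByKey,
            List<Map.Entry<String, Integer>> delivered) {
        // Index (within the key's publish sequence) of the last message seen per key.
        Map<String, Integer> lastIndex = new HashMap<>();
        for (Map.Entry<String, Integer> m : delivered) {
            List<Integer> sequence = publishedByKey.getOrDefault(m.getKey(), List.of());
            int idx = sequence.indexOf(m.getValue());
            // Unknown message, or a message that arrives after a later one for the same key.
            if (idx < 0 || idx <= lastIndex.getOrDefault(m.getKey(), -1)) {
                return false;
            }
            lastIndex.put(m.getKey(), idx);
        }
        return true;
    }
}
```

With keys A (1, 2, 3) and B (4, 5, 6), the interleaving 4, 1, 2, 5, 3, 6 passes the check, while any sequence that delivers 2 before 1 fails it. The relative position of 1 and 4 never matters.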

The second property explains what happens when messages are redelivered. In general, Cloud Pub/Sub offers at-least-once delivery. That means messages may be sent to subscribers multiple times, even if those messages have been acknowledged. With the consistent redelivery guarantee, when a message is redelivered, the entire sequence of subsequent messages for the same ordering key that were received after the redelivered message will also be redelivered. In the above example, imagine a subscriber receives messages 1, 2, and 3. If message 2 is redelivered (because the ack deadline expired or because the best-effort ack was not persisted in Cloud Pub/Sub), then message 3 is guaranteed to be redelivered as well.
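The redelivery rule can be expressed as a small function: given the messages for one ordering key in the order the service received them, the redelivered sequence is the redelivered message plus everything after it. This is an illustrative sketch with hypothetical names, not service code.

```java
import java.util.List;

final class ConsistentRedelivery {
    /**
     * `keyMessages` holds the messages for ONE ordering key, in the order the
     * service received them. Returns the messages that are sent again when
     * `redelivered` is redelivered: that message and every later one.
     */
    static <T> List<T> redeliverySequence(List<T> keyMessages, T redelivered) {
        int start = keyMessages.indexOf(redelivered);
        if (start < 0) {
            throw new IllegalArgumentException("Unknown message: " + redelivered);
        }
        return List.copyOf(keyMessages.subList(start, keyMessages.size()));
    }
}
```

For the example above, redelivering message 2 for key A yields the sequence 2, 3, regardless of whether 3 had already been acknowledged.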

The last property defines where messages for the same ordering key are delivered. It applies only to streaming pull subscribers, since they are the only ones that have a long-standing connection that can be used for affinity. This property has two parts. First, when messages are outstanding to a streaming pull subscriber — meaning the ack deadline has not yet passed and the messages have not been acknowledged — then if there are more messages to deliver for the ordering key, they go to that same subscriber.

The second part pertains to what happens when no messages are outstanding. Ideally, one wants the same subscribers to handle all of the messages for an ordering key. Cloud Pub/Sub tries to do this, but there are cases where it cannot guarantee that it will continue to deliver messages to the same subscriber. In other words, the affinity of a key could change over time. Usually this is done for load-balancing purposes. For example, if there is only one subscriber, all messages must be delivered to it. If another subscriber starts, one would generally want it to start to receive half of the load. Therefore, the affinity of some of the ordering keys must move from the first subscriber to this new subscriber. Cloud Pub/Sub waits until there are no more messages outstanding on an ordering key before changing the affinity of the key.

Ordered Delivery at Scale

One of the most difficult problems with ordered delivery is doing it at scale. It usually requires an understanding of the scaling characteristics of the topic in advance. When a topic extends beyond that scale, maintaining order becomes extremely difficult. Cloud Pub/Sub’s ordered delivery is designed to scale with usage without the user having to think about it.

The most common way to do ordering at scale is with partitions. A topic can be made up of many partitions, where each stores a subset of the messages published to the topic. When a message gets published, a partition is chosen for that message, either explicitly or by hashing the message’s key or value to a partition. The “key” in this case is what Cloud Pub/Sub calls the ordering key.

Subscribers connect to one or more partitions and receive messages from those partitions. Much like the publish side, subscribers can choose partitions explicitly or rely on the messaging service to assign subscribers to partitions. Partition-based messaging services guarantee that messages within the same partition are delivered in order.

A typical partition setup would look like this:

The green boxes represent the partitions that store messages. They would be owned by the messaging servers (often called “brokers”), but we have omitted those servers for simplicity. The circles represent messages, with the color indicating the message key and the number indicating the relative order for the messages of that color.

One usually has far fewer partitions than there are keys. In the example above, there are four message colors but only three partitions, so the second partition contains both blue and red messages. There are two subscribers, one that consumes from the first partition and one that consumes from the second and third partitions.

There are three major issues a user may have to deal with when using partitions: subscriber scaling limitations, hot shards, and head-of-line blocking. Let’s look at each in detail.

Subscriber Scaling Limitations

Within a set of subscribers across which delivery of messages is load balanced (often called a “consumer group”), only one subscriber can be assigned to a partition at any time. Therefore, the maximum amount of parallel processing that can occur is min(# of partitions, # of subscribers). In the example above, we could load balance across no more than three subscribers:
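The parallelism cap can be seen in a short sketch of partition assignment within one consumer group. The round-robin policy and all names here are assumptions made for illustration; real partition-based systems use their own assignment strategies.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignment {
    /** Assigns partition p to subscriber p % subscribers; returns the partitions per subscriber. */
    static List<List<Integer>> assign(int partitions, int subscribers) {
        if (subscribers <= 0) {
            throw new IllegalArgumentException("Need at least one subscriber");
        }
        List<List<Integer>> bySubscriber = new ArrayList<>();
        for (int s = 0; s < subscribers; s++) {
            bySubscriber.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            bySubscriber.get(p % subscribers).add(p);
        }
        return bySubscriber;
    }

    /** Subscribers that own at least one partition: min(partitions, subscribers). */
    static long activeSubscribers(int partitions, int subscribers) {
        return assign(partitions, subscribers).stream()
                .filter(owned -> !owned.isEmpty())
                .count();
    }
}
```

With three partitions, adding a fourth or fifth subscriber changes nothing: the extra subscribers own no partition and sit idle.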

If processing messages suddenly became more expensive or — more likely — a new consumer group was added to receive messages in a new pipeline that requires longer processing of the messages, it may not be possible to gain enough parallelism to process all the published messages. One solution would be to have a subscriber whose job is to republish the messages on a topic with more shards, which the original subscribers could consume instead:

The downside is that now both of these topics must be maintained or a careful migration must be done to change the original publisher to publish to the new topic. If both topics are maintained, then messages are stored twice. It might be possible to delete the messages from the first topic once they are published to the second topic, but this would require the migration of any subscribers receiving messages from the original topic to the new topic.

Hot Shards

The next issue is a hot shard — the overloading of a single partition. Ideally, traffic patterns across partitions are relatively similar. However, it is possible that there are a lot more messages or much larger messages hashing to one partition in comparison to messages hashing to other partitions. As a result, a single partition can become overloaded:

What can be done to deal with this hot shard? Typically, the solution is to add partitions. However, maintaining order during a repartitioning can be very difficult. For example, if we add a new partition in the case above, it could result in related messages going to completely different partitions:

With this new set of partitions, purple messages now publish to the first partition, blue messages to the third partition, and yellow and red messages to the fourth partition. This repartitioning causes several problems. First of all, the fourth partition now contains messages for keys that were previously split among the two subscribers. That means the affinity of keys to subscribers must change.
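A small sketch shows why adding a partition reshuffles keys when partitions are chosen by hashing. It assumes the common "hash modulo partition count" scheme and uses integer key hashes for simplicity; the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class HashPartitioner {
    /** Maps a key hash to a partition index in [0, numPartitions). */
    static int partitionFor(int keyHash, int numPartitions) {
        return Math.floorMod(keyHash, numPartitions);
    }

    /** Returns the key hashes whose partition changes when the partition count changes. */
    static List<Integer> movedKeys(List<Integer> keyHashes, int oldCount, int newCount) {
        List<Integer> moved = new ArrayList<>();
        for (int hash : keyHashes) {
            if (partitionFor(hash, oldCount) != partitionFor(hash, newCount)) {
                moved.add(hash);
            }
        }
        return moved;
    }
}
```

Going from three to four partitions with key hashes 0 through 11, only the keys 0, 1, and 2 stay where they were. Every other key lands on a different partition, so its earlier messages and its later messages live in different places.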

Even more difficult, if subscribers want all messages in order, they must carefully coordinate which partitions they read from and when. The subscribers would have to know the last offset in each partition that was written before the partitions were added. They then need to consume messages up to those offsets. Only after they have processed messages up to those offsets in all the partitions can the subscribers start to consume messages beyond them.

Head-of-Line Blocking

The last difficult issue is head-of-line blocking, or the inability to process messages due to the slow processing of messages that must be consumed first. Let’s go back to the original scenario:

Imagine that the red messages require a lot more time to process than the blue ones. When reading messages from the second partition, the processing of the blue message 2 could be needlessly delayed due to the slow processing of red message 1. Since the unit of ordering is a partition, there is no way to process the blue messages without processing the red messages. One could try to solve this by repartitioning in the hopes that the red and blue messages end up in different partitions. However, the processing of the red messages will block the processing of others in whichever partition they end up. The repartitioning also results in the same issues discussed in the Hot Shards section.

Alternatively, the publisher could explicitly assign the red messages to their own partition, but it breaks the decoupling of publishers and subscribers if the publisher has to make decisions based on the way subscribers process messages. It may also be that the extra processing time for the red messages is temporary and doesn’t warrant large-scale changes to the system. The user has to decide if the delayed processing of some messages or the arduous process of changing the partitions is better.

Automatic Scaling With Ordering

Cloud Pub/Sub’s ordered delivery implementation is designed so users do not need to be subject to such limitations. It can scale to billions of keys without subscriber scaling limitations, hot shards or head-of-line blocking. As one may expect with a high-throughput pub/sub system, messages are split up into underlying partitions in Cloud Pub/Sub. However, there are two main properties of the service that allow it to overcome the issues commonly associated with ordered delivery:

  1. Partitions are not exposed to users.
  2. Subscribers acknowledge messages individually instead of advancing a partition cursor.

By taking advantage of these properties, Cloud Pub/Sub brokers have three useful behaviors:

  1. They assign subscribers to groups of ordering keys that are more fine-grained than a partition.
  2. They track publishing rates per key and scale to the appropriate number of partitions as needed, maintaining proper ordered delivery across repartitioning.
  3. They store the order of messages on a per-ordering-key basis so delivery is not blocked by messages for other keys in the same partition that have not yet been processed.

These behaviors allow Cloud Pub/Sub to avoid all three major issues with ordered delivery at scale!
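The third behavior, tracking order per key rather than per partition, can be illustrated with a toy dispatcher. This is not how Cloud Pub/Sub is implemented; it is a simplified sketch with hypothetical names that allows one outstanding message per key, just to show why a slow key does not hold up other keys.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class PerKeyDispatcher {
    // One FIFO queue per ordering key, instead of one queue per partition.
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();
    // Keys that currently have a delivered but unacknowledged message.
    private final Set<String> outstanding = new HashSet<>();

    void enqueue(String key, String message) {
        queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(message);
    }

    /** Returns the next message for any key with nothing outstanding, if there is one. */
    Optional<String> poll() {
        for (Map.Entry<String, Deque<String>> e : queues.entrySet()) {
            if (!outstanding.contains(e.getKey()) && !e.getValue().isEmpty()) {
                outstanding.add(e.getKey());
                return Optional.of(e.getValue().poll());
            }
        }
        return Optional.empty();
    }

    /** Acknowledges the outstanding message for a key, unblocking its next message. */
    void ack(String key) {
        outstanding.remove(key);
    }
}
```

If a red message is slow to process, blue messages keep flowing: after acknowledging the blue message, the next `poll()` returns the following blue message even though the red one is still outstanding.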

Ordered delivery doesn’t come for free, of course. Compared with unordered delivery, the ordered delivery of messages may slightly decrease publish availability and increase end-to-end message delivery latency. Unlike the unordered case, where delivery can fail over to any broker without any delay, failover in the ordered case requires coordination across brokers to ensure the messages are written to and read from the correct partitions.

Using Ordered Delivery Effectively

Even with Cloud Pub/Sub’s ability to deliver messages in order at scale, there are still subtleties that exist when relying on ordered delivery. This section details the things to keep in mind when building an ordered pipeline. Some of these things apply when using other messaging systems with ordered delivery, too. In order to provide a good example of how to use ordering keys effectively, the Cloud Pub/Sub team has released an open-source version of its ordering keys prober. This prober is almost identical to the one run by the team continuously to verify the correct behavior of this new feature.

Publishing in Order

On the surface, publishing in order seems like it should be very easy: Just call publish for each message. If we could guarantee that publishes never fail, then it would be that simple. However, transient or permanent failures can happen with publish at any time, and a publisher must understand the implications of those failures.

Let’s take the simple example of trying to publish three messages for the same ordering key A: 1, 2, and 3. The Java code to publish these messages could be the following:

// `publisher` and `executor` are assumed to have been created elsewhere.
String[] messages = {"1", "2", "3"};
for (String msg : messages) {
    PubsubMessage message = PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8(msg))
        .setOrderingKey("A")
        .build();
    // publish() is asynchronous: it returns a future that resolves to the message ID.
    ApiFuture<String> publishFuture = publisher.publish(message);
    publishFuture.addListener(() -> {
        try {
            String messageId = publishFuture.get();
            System.out.println("Successfully published " + messageId);
        } catch (Exception e) {
            System.err.println("Could not publish message " + msg);
        }
    }, executor);
}

If there were no failures, then each publish call would succeed and the message ID would be returned in the future. We’d expect the subscriber to receive messages 1, 2, and 3 in that order. However, there are a lot of things that could happen. If a publish fails, it likely needs to be attempted again. The Cloud Pub/Sub client library internally retries requests on retriable errors. Errors such as deadline exceeded do not indicate whether or not the publish actually succeeded. It is possible that the publish did succeed, but the publish response wasn’t received by the client in time for the deadline, in which case the client may have attempted the publish again. In such cases, the sequence of messages could have repeats, e.g., 1, 1, 2, 3. Each published message would have its own message ID, so from the subscriber’s perspective, it would look like four messages were published, with the first two having identical content.

Retrying publish requests is complicated even more by batching. The client library may batch messages together when it sends them to the server for more efficient publishing. This is particularly important for high-throughput topics. In the case above, it could be that messages 1 and 2 are batched together and sent to the server as a single request. If the server fails to return a response in time, the client will retry this batch of two messages. Therefore, it is possible the subscriber could see the sequence of messages 1, 2, 1, 2, 3. If one wants to avoid these batched republishes, it is best to set the batch settings to allow only a single message in each batch.
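The batched-retry scenario can be reproduced with a toy in-memory topic. Everything here is hypothetical and stands in for the real client and server: the "lost response" flag simulates a deadline-exceeded error where the server stored the batch but the client never heard back.

```java
import java.util.ArrayList;
import java.util.List;

final class FakeTopic {
    private final List<String> storedData = new ArrayList<>();
    private final List<String> storedIds = new ArrayList<>();
    private int nextId = 1;

    /** Stores every message in the batch and assigns each one a fresh message ID. */
    void publishBatch(List<String> batch) {
        for (String data : batch) {
            storedData.add(data);
            storedIds.add("id-" + nextId++);
        }
    }

    List<String> data() { return List.copyOf(storedData); }
    List<String> ids() { return List.copyOf(storedIds); }
}

final class RetryingClient {
    /**
     * Sends a batch. If the first response is lost, the client cannot tell
     * success from failure, so it sends the same batch again.
     */
    static void publishWithRetry(FakeTopic topic, List<String> batch, boolean firstResponseLost) {
        topic.publishBatch(batch);
        if (firstResponseLost) {
            topic.publishBatch(batch);
        }
    }
}
```

Publishing the batch [1, 2] with a lost response and then the batch [3] leaves the topic holding 1, 2, 1, 2, 3, with five distinct message IDs. A batch size of one would narrow the possible duplicates to 1, 1, 2, 3.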

There is one additional case with publishing that could cause issues. Imagine that in running the above code, the following sequence of events happens:

  1. Publish is called with message 1.
  2. Publish is called with message 2.
  3. Publish for message 1 transiently fails.
  4. Publish is called with message 3.

The result could be that messages 2 and/or 3 are successfully published and sent to subscribers without 1 having been sent, which would result in out-of-order delivery. A simple solution may be to make the calls to publish synchronous:

String[] messages = {"1", "2", "3"};
for (String msg : messages) {
    PubsubMessage message = PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8(msg))
        .setOrderingKey("A")
        .build();
    boolean successfulPublish = false;
    // Retry until this message is published before moving on to the next one.
    while (!successfulPublish) {
        ApiFuture<String> publishFuture = publisher.publish(message);
        try {
            // Blocks the calling thread until the publish completes or fails.
            String messageId = publishFuture.get();
            System.out.println("Successfully published " + messageId);
            successfulPublish = true;
        } catch (Exception e) {
            System.err.println("Could not publish message " + msg);
        }
    }
}

While this change would guarantee that messages are published in order, it would make it much more difficult to publish at scale, as every publish operation would block a thread. The Cloud Pub/Sub client libraries overcome this problem in two ways. First, if a publish fails and there are other messages for the same ordering key queued up in the library’s message buffer, it fails the publishes for all those messages as well. Second, the library immediately fails any subsequent publish calls made for messages with the same ordering key.

How does one get back to a state of being able to publish on an ordering key when this happens? The client library exposes a method, resumePublish(String orderingKey). A publisher should call resumePublish when it has handled the failed publishes, determined what it wants to do, and is ready to publish messages for the ordering key again. The publisher may decide to republish all the failed messages in order, publish a subset of the messages, or publish an entirely new set of messages. No matter how the publisher wants to handle this edge case, the client library provides resumePublish as a means to do so without losing the scaling advantages of asynchronous publishing. Take a look at the ordering key prober’s publish error logic for an example of how to use resumePublish.
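The pause-and-resume behavior described above can be modeled in a few lines. This is a toy model, not the client library's code: the class name is hypothetical, the `sendSucceeds` flag stands in for the outcome of the network call, and a boolean return replaces the failed future the real library would hand back. It also does not model the failing of messages already queued in the buffer.

```java
import java.util.HashSet;
import java.util.Set;

final class OrderedPublishGate {
    // Ordering keys that have seen a failed publish and have not been resumed yet.
    private final Set<String> pausedKeys = new HashSet<>();

    /** Returns true if the publish was accepted, false if it failed or the key is paused. */
    synchronized boolean publish(String orderingKey, String data, boolean sendSucceeds) {
        if (pausedKeys.contains(orderingKey)) {
            return false; // Fail fast: an earlier message for this key failed.
        }
        if (!sendSucceeds) {
            pausedKeys.add(orderingKey); // Pause the key so later messages cannot overtake.
            return false;
        }
        return true;
    }

    /** Called once the publisher has decided how to handle the failed messages. */
    synchronized void resumePublish(String orderingKey) {
        pausedKeys.remove(orderingKey);
    }
}
```

After a failed publish of message 1 on key A, a publish of message 2 on key A is rejected even though the network is healthy, while publishes on key B are unaffected. Once the publisher calls `resumePublish("A")`, it can republish 1 and continue in order.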

All of the above issues deal with publishing from a single publisher. However, there is also the question of how to publish messages for the same ordering key from different publishers. Cloud Pub/Sub allows this and guarantees that for publishes in the same region, the order of messages that subscribers see is consistent with the order in which the publishes were received by the broker. As an example, let’s say that both publishers X and Y publish a message for ordering key A. If X’s message is received by Cloud Pub/Sub before Y’s, then all subscribers will see the messages in that order. However, publishers do not have a way to know in which order the messages were received by the service. If the order of messages across different publishers must be maintained, then the publishers need to use some other mechanism to coordinate their publishes, e.g., some kind of locking service to maintain ownership of an ordering key while publishing.

It is important to remember that ordering guarantees are only for messages published in the same region. Therefore, it is highly recommended that all publishers use regional service endpoints to ensure they publish messages to the same region for the same ordering key. This is particularly important for publishers hosted outside of GCP; if requests are routed to GCP from another place, it is always possible that the routing could change if using the global endpoint, which could disrupt the order of messages.

Receiving Messages in Order

Subscribers receive messages in the order they were published. What it means to “receive messages in order” varies based on the type of subscriber. Cloud Pub/Sub supports three ways of receiving messages: streaming pull, pull, and push. The client libraries use streaming pull (with the exception of PHP), and we talk about receiving messages via streaming pull in terms of using the client library. No matter what method is used for receiving messages, it is important to remember that Cloud Pub/Sub offers at-least-once delivery. That means subscribers must be resilient to receiving sequences of messages again, as discussed in the Ordering Properties section. Let’s look at what receiving messages in order means for each type of subscriber.

Streaming Pull (Via the Client Libraries)

When using the client libraries, one specifies a user callback that should be run whenever a message is received. The client libraries guarantee that for any given ordering key, the callback is run to completion on messages in the correct order. If the messages are acked within that callback, then it means all computation on a message occurs in order. However, if the user callback schedules other asynchronous work on messages, the subscriber must ensure that the asynchronous work is done in order. One option is to add messages to a local work queue that is processed in order.
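One way to build such a local work queue is to chain each task onto the previous task for the same ordering key. The sketch below uses only `java.util.concurrent`; the class name is hypothetical, and it is one possible design rather than something the client library provides. Note that it keeps an entry per key forever, so a production version would need to evict idle keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class KeyedSerialExecutor {
    // The most recently submitted task for each ordering key.
    private final Map<String, CompletableFuture<Void>> tails = new HashMap<>();
    private final Executor executor;

    KeyedSerialExecutor(Executor executor) {
        this.executor = executor;
    }

    /**
     * Runs `task` on the executor after all earlier tasks for the same key have
     * finished. Tasks for different keys may run in parallel.
     */
    synchronized CompletableFuture<Void> submit(String orderingKey, Runnable task) {
        CompletableFuture<Void> tail =
                tails.getOrDefault(orderingKey, CompletableFuture.completedFuture(null));
        CompletableFuture<Void> next = tail
                .exceptionally(error -> null) // A failed task should not block later ones.
                .thenRunAsync(task, executor);
        tails.put(orderingKey, next);
        return next;
    }
}
```

A subscriber callback could call `submit(message.getOrderingKey(), work)` and acknowledge the message when the returned future completes, so that the asynchronous work still happens in order for each key.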

It is worth noting that because of asynchronous processing in a subscriber like this, ordered delivery in Cloud Pub/Sub does not work with Cloud Dataflow at this time. The nature of Dataflow’s parallelized execution means it does not maintain the order of messages after they are received. Therefore, a user’s pipeline would not be able to rely on messages being delivered in order. To ensure that one does not use Pub/Sub in Dataflow and expect ordered delivery, Dataflow pipelines that use a subscription with ordering keys enabled fail on startup.

Pull

For subscribers that use the pull method directly, Cloud Pub/Sub makes two guarantees:

  1. All messages for an ordering key in the PullResponse’s received_messages list are in the proper order in that list.
  2. There is one outstanding list of messages per ordering key at a time.

The requirement that only one batch of messages can be outstanding at a time is necessary to maintain ordered delivery. The Cloud Pub/Sub service can’t guarantee the success or latency of the response it sends for a subscriber’s pull request. If a response fails and a subsequent pull request is fulfilled with a response containing subsequent messages for the same ordering key, it is possible those subsequent messages could arrive at the subscriber before the messages in the failed response. The service also can’t guarantee that the subsequent pull request comes from the same subscriber.

Push

The restrictions on push are even tighter than those on pull. For a push subscription, Cloud Pub/Sub allows only one message to be outstanding per ordering key at a time. Since each message is sent to a push subscriber via its own request, sending such requests out in parallel would have the same issue as delivering multiple batches of messages for the same ordering key to pull subscribers simultaneously. Therefore, push subscribers may not be a good choice for topics where messages are frequently published with the same ordering key or latency is extremely important, as the restrictions could prevent the subscriber from keeping up with the published messages.
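A back-of-the-envelope bound follows from the one-outstanding-message rule: if each push request takes a given round-trip time to be delivered, processed, and acknowledged, a single ordering key cannot be drained faster than one message per round trip. The sketch below encodes that arithmetic; the names are hypothetical and the model ignores retries and service-side overhead.

```java
import java.time.Duration;

final class PushThroughputBound {
    /** Upper bound on messages per second for ONE ordering key with one message outstanding. */
    static double maxMessagesPerSecondPerKey(Duration roundTrip) {
        if (roundTrip.isZero() || roundTrip.isNegative()) {
            throw new IllegalArgumentException("Round trip must be positive");
        }
        return 1_000_000_000.0 / roundTrip.toNanos();
    }

    /** True if a push subscriber could keep up with the given per-key publish rate. */
    static boolean canKeepUp(double publishRatePerKey, Duration roundTrip) {
        return publishRatePerKey <= maxMessagesPerSecondPerKey(roundTrip);
    }
}
```

For example, with a 50 ms round trip the bound is 20 messages per second per key, so a key that receives 50 messages per second would fall steadily behind on a push subscription.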

In summary, ordered delivery at scale usually requires one to be very careful with the capacity and setup of their messaging system. When that capacity is exceeded or message processing characteristics change, adding capacity while maintaining order is a time-consuming and difficult process. With the introduction of ordered delivery into Cloud Pub/Sub, users can rely on order in ways they are accustomed to in a system that still automatically scales with their usage.

Google Cloud - Community

Google Cloud community articles and blogs

A collection of technical articles and blogs published or curated by Google Cloud Developer Advocates. The views expressed are those of the authors and don't necessarily reflect those of Google.

Written by Kamal Aboul-Hosn, Technical Lead on Google Cloud Pub/Sub
