Adding Functionality to Google Pub/Sub: Queue Meta Processing
Zendesk Connect is an event-driven messaging service that uses Google’s Pub/Sub queues with autoscaling workers to do most of its processing. One example thread of processing is sending emails: the total number of emails sent per minute is rate limited, and each user is sent only the last email generated for them within each hour. Each of these steps can be abstracted as a ‘Queue Meta Processor’ so they can be chained together generically.
We’ve had some issues implementing workers on top of Pub/Sub, specifically related to our initial assumptions about how queues work in general.
When working with message-driven systems based on Google Cloud Pub/Sub, some basic axioms need to be kept in mind:
- Processing must be idempotent as delivery is at-least-once
- Messages are generally ordered, but it’s not guaranteed
- The maximum message timeout (acknowledgement deadline) is 10 minutes
- Message retention is 7 days
- Nack’d messages are generally redelivered immediately
This is specifically for Pub/Sub, but some of these approaches will apply to other queues too.
We’ve come up with a few general approaches for dealing with these specific aspects of Pub/Sub.
While message ordering isn’t guaranteed, messages are generally processed FIFO. To create different priorities for messages, we have separate topics, subscriptions and workers for each priority.
Retry with blocking wait
When a message fails with an error that can be retried, we nack it. In certain failure modes, for instance when a datastore is temporarily unavailable, this can result in workers continually failing and retrying the message at the head of the queue.
We added a short sleep before the nack request. This applies some back-pressure and adds a small wait before the message is retried.
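A minimal sketch of the sleep-then-nack idea, assuming a message object that exposes `ack()` and `nack()`. `FakeMessage`, `RetryableError`, `handle` and the 2 second default are hypothetical names and values chosen for illustration, not the service's actual code.

```python
import time
from typing import Callable, Optional


class RetryableError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


class FakeMessage:
    """In-memory stand-in for a received Pub/Sub message (ack/nack only)."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.outcome: Optional[str] = None

    def ack(self) -> None:
        self.outcome = "ack"

    def nack(self) -> None:
        self.outcome = "nack"


def handle(message, process: Callable[[bytes], None],
           nack_delay_seconds: float = 2.0,
           sleep: Callable[[float], None] = time.sleep) -> None:
    """Process one message; on a retryable failure, block briefly, then nack."""
    try:
        process(message.data)
    except RetryableError:
        # Holding the message (and this worker) for a moment slows the
        # fail/redeliver loop, because a nack'd message tends to come straight back.
        sleep(nack_delay_seconds)
        message.nack()
        return
    message.ack()
```

The delay has to stay well below the message timeout, otherwise the message would be redelivered before the nack is sent.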
At most once processing
Some processing cannot be made idempotent, for example requests to some external systems. Because Pub/Sub delivery is at-least-once, we built a mechanism that guarantees that processing will be executed at most once per unique identifier. This mechanism is backed by Bigtable and uses ReadModifyWrite.AppendValue to guarantee atomicity.
It also allows for the processing to indicate that a retry is possible.
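One way such a guard could work is sketched below, with an in-memory `AppendStore` standing in for Bigtable's atomic append. The claim protocol (the first non-empty token appended wins), the `RetryPossible` exception and all function names are assumptions made for illustration.

```python
import threading
from typing import Callable, Dict


class RetryPossible(Exception):
    """Hypothetical signal: the work did not take effect, so a retry is safe."""


class AppendStore:
    """In-memory stand-in for a store with an atomic append that returns the new value."""

    def __init__(self) -> None:
        self._cells: Dict[str, bytes] = {}
        self._mutex = threading.Lock()

    def append(self, key: str, value: bytes) -> bytes:
        with self._mutex:
            self._cells[key] = self._cells.get(key, b"") + value
            return self._cells[key]

    def clear(self, key: str) -> None:
        with self._mutex:
            self._cells.pop(key, None)


def process_at_most_once(store: AppendStore, unique_id: str,
                         attempt_token: bytes, work: Callable[[], None]) -> bool:
    """Run work() only if this attempt was the first to append to the cell.

    attempt_token must be non-empty and unique per delivery attempt.
    """
    if store.append(unique_id, attempt_token) != attempt_token:
        return False  # another delivery already claimed this identifier
    try:
        work()
    except RetryPossible:
        store.clear(unique_id)  # release the claim so a redelivery can try again
        raise
    return True
```

Any failure other than `RetryPossible` leaves the claim in place, which is what keeps the guarantee at most once rather than at least once.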
Without transactions between our various data stores, we’ve found that some processing requires coordination between simultaneous messages. We created durable pessimistic locks, again backed by Bigtable using NewCondMutation to guarantee atomicity.
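The locking idea can be sketched as follows, with an in-memory `ConditionalStore` standing in for an atomic conditional mutation on a single row. The `lock#` key prefix, the owner strings and the function names are hypothetical.

```python
import threading
from typing import Dict


class ConditionalStore:
    """In-memory stand-in for a store offering an atomic check-and-mutate on one row."""

    def __init__(self) -> None:
        self._rows: Dict[str, str] = {}
        self._mutex = threading.Lock()

    def put_if_absent(self, key: str, value: str) -> bool:
        with self._mutex:
            if key in self._rows:
                return False
            self._rows[key] = value
            return True

    def delete_if_equals(self, key: str, value: str) -> bool:
        with self._mutex:
            if self._rows.get(key) != value:
                return False
            del self._rows[key]
            return True


def try_lock(store: ConditionalStore, resource: str, owner: str) -> bool:
    """Take the lock only if nobody holds it; the check and the write are one atomic step."""
    return store.put_if_absent("lock#" + resource, owner)


def unlock(store: ConditionalStore, resource: str, owner: str) -> bool:
    """Release the lock only if this owner still holds it."""
    return store.delete_if_equals("lock#" + resource, owner)
```

A worker that fails to take the lock can nack the message and let a retry mechanism bring it back later. A production version would also need some form of expiry so that a crashed owner does not hold the lock forever.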
Pub/Sub doesn’t provide a way to schedule delivery of messages, and the maximum message timeout is 10 minutes. Our service needs a way to schedule processing further ahead than the 7-day retention time. We also need this feature at a much higher scale than job schedulers typically support. Google Cloud Tasks wasn’t available at the time, so it wasn’t considered as an option.
We built a queue worker service which forwards messages after a delay, backed by Bigtable. Essentially it accepts messages from a Pub/Sub queue, and stores them in Bigtable in the scheduled order. Another process polls for messages which should be delivered, and publishes them to the target queue. The Bigtable table is sharded to utilise Bigtable’s performance better.
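A rough sketch of the storage layout described above, with a dictionary standing in for the Bigtable table. The row key format (shard, zero-padded delivery time, message id), the shard count of 4 and all names are assumptions. The point is that keys sort in scheduled order within each shard, so the poller only has to scan a prefix range per shard.

```python
import zlib
from typing import Callable, Dict, Tuple

SHARDS = 4  # assumed shard count, purely illustrative


def row_key(message_id: str, deliver_at: int) -> str:
    """Shard prefix first, then a zero-padded timestamp so keys sort by delivery time."""
    shard = zlib.crc32(message_id.encode()) % SHARDS
    return f"{shard:02d}#{deliver_at:012d}#{message_id}"


class DelayStore:
    """In-memory stand-in for the table of scheduled messages."""

    def __init__(self) -> None:
        self._rows: Dict[str, Tuple[str, bytes]] = {}

    def schedule(self, message_id: str, deliver_at: int,
                 target_queue: str, data: bytes) -> None:
        self._rows[row_key(message_id, deliver_at)] = (target_queue, data)

    def poll(self, now: int, publish: Callable[[str, bytes], None]) -> int:
        """Publish every message due at or before `now`; return how many were sent."""
        delivered = 0
        for shard in range(SHARDS):
            start = f"{shard:02d}#"
            end = f"{shard:02d}#{now:012d}#\uffff"
            for key in sorted(k for k in self._rows if start <= k <= end):
                target_queue, data = self._rows[key]
                publish(target_queue, data)
                del self._rows[key]  # delete only after a successful publish
                delivered += 1
        return delivered
```

Deleting after publishing means a crash between the two steps causes a duplicate rather than a lost message, which matches the at-least-once behaviour consumers already have to tolerate.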
Our service has a feature which requires only the last message within a time frame to be delivered, per unique identifier. This is sometimes called debounce.
We built another queue worker service around our delay service to achieve this. It takes messages from a queue, stores metadata for each message, and then sends the message back to itself via the delay service. Once the message comes back, we forward it to the target queue if no other message for that identifier was seen within the time frame.
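The round trip through the delay service can be sketched like this. The `schedule` callback stands in for the delay service, the stored metadata is reduced to a per-identifier version counter, and every name here is hypothetical.

```python
from typing import Callable, Dict, Tuple

Payload = Tuple[str, int, bytes]  # (identifier, version, data)


class Debouncer:
    """Forward only the last message seen per identifier within a time window."""

    def __init__(self, window_seconds: int,
                 schedule: Callable[[int, Payload], None],
                 forward: Callable[[bytes], None]) -> None:
        self._window = window_seconds
        self._schedule = schedule  # stand-in for the delay service
        self._forward = forward    # stand-in for publishing to the target queue
        self._latest: Dict[str, int] = {}
        self._counter = 0

    def on_message(self, identifier: str, data: bytes, now: int) -> None:
        """Record this message as the latest for its identifier and delay it."""
        self._counter += 1
        self._latest[identifier] = self._counter
        self._schedule(now + self._window, (identifier, self._counter, data))

    def on_delayed(self, identifier: str, version: int, data: bytes) -> bool:
        """Called when a delayed message comes back; forward it only if still the latest."""
        if self._latest.get(identifier) != version:
            return False  # a newer message arrived within the window
        del self._latest[identifier]
        self._forward(data)
        return True
```

With a one hour window, two messages for the same user ten seconds apart produce two delayed deliveries, and only the second one is forwarded.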
Retry with exponential backoff
Exponential backoff is useful for retrying requests to external systems, so we built a mechanism that reuses our delay service to add a delay to retries of message processing. To keep track of the number of attempts, we use message attributes, so that the message’s data itself is left alone. It also makes it easier to see how many times each message has been retried.
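As an illustration, the attempt counter and the delay calculation might look like the sketch below. The attribute name `attempts`, the base delay, the cap and the attempt limit are all assumed values. Attribute values are kept as strings, since Pub/Sub message attributes are string key/value pairs.

```python
from typing import Dict, Optional, Tuple

ATTEMPTS_ATTRIBUTE = "attempts"  # hypothetical attribute name


def next_retry(attributes: Dict[str, str],
               base_seconds: float = 1.0,
               max_seconds: float = 3600.0,
               max_attempts: int = 8) -> Optional[Tuple[Dict[str, str], float]]:
    """Return (updated attributes, delay in seconds) for the next retry.

    Returns None once the message has used up its attempts.
    """
    attempts = int(attributes.get(ATTEMPTS_ATTRIBUTE, "0"))
    if attempts >= max_attempts:
        return None
    # The delay doubles with every attempt and is capped at max_seconds.
    delay = min(max_seconds, base_seconds * 2 ** attempts)
    updated = dict(attributes)  # the message data itself is never touched
    updated[ATTEMPTS_ATTRIBUTE] = str(attempts + 1)
    return updated, delay
```

The caller would republish the unchanged message data with the updated attributes through the delay service, using the returned delay.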
Some smaller scale jobs needed to be run on a set schedule, so we built a simple cron style scheduler to publish certain messages at set times to trigger processing.
Another feature we need is to rate limit certain processing, in particular requests to external systems.
We’re planning to build another queue worker service to provide this generic throttling at the level of messages. It will likely need the exponential backoff mechanism to retry messages later, as well as the option to drop messages once the limit is reached.
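Since this component is only planned, the following is purely a sketch of one possible decision function: a fixed one minute window with a counter, returning whether to forward, retry later or drop. The class name, the return values and the fixed-window approach are all assumptions. A real service would keep the counter in a shared store rather than in process memory.

```python
from typing import Optional


class MinuteThrottle:
    """Fixed-window limiter: at most `limit_per_minute` forwards per calendar minute."""

    def __init__(self, limit_per_minute: int, drop_when_limited: bool = False) -> None:
        self._limit = limit_per_minute
        self._drop = drop_when_limited
        self._window: Optional[int] = None
        self._used = 0

    def decide(self, now_seconds: float) -> str:
        """Return 'forward', 'retry_later' or 'drop' for a message arriving now."""
        window = int(now_seconds // 60)
        if window != self._window:
            # A new minute has started, so reset the counter.
            self._window, self._used = window, 0
        if self._used < self._limit:
            self._used += 1
            return "forward"
        return "drop" if self._drop else "retry_later"
```

A `retry_later` decision would hand the message to the backoff mechanism, while `drop` would simply ack it.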
By building the above components, we’ve come to an approach which allows for generalised queue meta processing. It matches the scale of Pub/Sub with Bigtable as a store of messages.
The ‘Meta Message’ wraps the ‘Target Message’: it needs only the bytes that will be sent and the name of the queue to send them to, the ‘Target Queue’. This way it doesn’t need to know anything about the content of the ‘Target Message’. The ‘Queue Meta Processor’ uses Bigtable as its data store, and can optionally forward the ‘Target Message’ to the ‘Target Queue’.
By making these message processors generic, they can be composed together, allowing us to easily build even higher level components. The initial example of Emailer into Debounce into Throttle into Email Sending Queue can be abstracted into a single logical service called ‘Email’.
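The wrapping and chaining can be sketched as below. The JSON and base64 envelope, the field names and the queue names are assumptions made for illustration. Any encoding would do, as long as each processor can recover the target queue and the opaque target bytes.

```python
import base64
import json
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class MetaMessage:
    """Envelope holding opaque target bytes and the queue they should go to."""
    target_queue: str
    target_data: bytes

    def encode(self) -> bytes:
        body = {
            "target_queue": self.target_queue,
            "target_data": base64.b64encode(self.target_data).decode("ascii"),
        }
        return json.dumps(body).encode("utf-8")

    @staticmethod
    def decode(raw: bytes) -> "MetaMessage":
        body = json.loads(raw.decode("utf-8"))
        return MetaMessage(body["target_queue"],
                           base64.b64decode(body["target_data"]))


def chain(data: bytes, queues: List[str]) -> Tuple[str, bytes]:
    """Wrap `data` so it visits `queues` in order.

    Returns the first queue to publish to and the bytes to publish there.
    """
    *processors, final_queue = queues
    target_queue, payload = final_queue, data
    # Wrap from the inside out: the last processor's envelope is created first.
    for queue in reversed(processors):
        payload = MetaMessage(target_queue, payload).encode()
        target_queue = queue
    return target_queue, payload
```

For example, `chain(email_bytes, ["debounce", "throttle", "email-sending"])` yields a message for the debounce queue whose target is the throttle queue, whose target in turn is the email sending queue.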