Ron Pantofaro
Google Cloud - Community
8 min read · Jun 27, 2018


Note: This was authored when Pub/Sub did not support dead letter queues. Please use the product's native dead letter topics instead: https://cloud.google.com/pubsub/docs/dead-letter-topics

Do not read further, instead go to https://cloud.google.com/pubsub/docs/dead-letter-topics

Error handling strategy for Cloud Pub/Sub and Dead letter queues

Sometimes customers ask me about dead letter queues for Cloud Pub/Sub (and they mention that some other cloud providers offer that feature).

What are dead letter queues? In essence, they are where you send the messages that you fail to process normally. Let's say you have a message queue. You might want to define a policy that, upon failure to process a certain message N times, the queue stops redelivering the message to the subscribers and instead publishes it to a different queue. This other queue holds the faulty messages (hence "dead letter"). What is that queue good for? Well, you might want to track the faulty messages, log and monitor them, or perhaps fix some code, consume them again, and deal with them. That is the classical usage of a dead letter queue.

Implementing the dead letter mailman

In this post we will discuss a strategy that can be implemented in your consumers for dead letter queues.

What you will need

  1. Your Cloud Pub/Sub topic and subscription.
  2. A key-value store. It can be Redis (have you checked out our new Cloud Memorystore that's in beta now?), Memcached, Bigtable, or SQL; it depends on your expected throughput and what you are comfortable with.
  3. A Cloud Pub/Sub topic and subscription for dead letters. Why is a subscription necessary? Let me remind you that messages sent to a topic with no subscriptions are essentially discarded, so it's important to have one in place.
  4. The ability and willingness to make some changes to your application.

The following diagram describes the data flow:

So what is happening?

Cloud Pub/Sub subscribers can come in many shapes and forms. Perhaps you have containers polling in Python. Perhaps you have Compute Engine instances polling in Java. Perhaps you have a Cloud Dataflow pipeline implemented in Java or Python. Perhaps you are using Cloud Pub/Sub's push feature and receive the messages with your web servers. You catch my drift.

My intention is not to provide an implementation, but rather a pattern that you can adapt to your needs.

Assuming you have your Cloud Pub/Sub topic + subscriptions in place, we have to create a new topic and (at least) one subscription for the dead letter queue.

In cloud shell (or any other gcloud enabled environment):

gcloud pubsub topics create dead-letters
gcloud pubsub subscriptions create --topic dead-letters error-monitoring-sub

This will create a Cloud Pub/Sub topic named dead-letters, and then a subscription called error-monitoring-sub attached to that topic.

Key-Value store schema

Let's start with the key. I think that in the common case you would need to express the uniqueness of topic + subscription + message id. A message id is unique within a topic, but it could be that one type of subscriber handles a message with no problems while a different subscriber fails, which is why we include the subscription in the key. A simple concatenation of the three can serve as the key here. The value can be whatever you think is worth storing: app-specific data, the message payload, and so on.
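A minimal sketch of what such a key/value pair could look like (make_key and make_value are illustrative helpers, not part of the code later in this post):

import json

def make_key(subscription_name, message_id):
    # A message id is unique within its topic, so adding the subscription
    # makes the key unique per subscriber type as well.
    return "%s_%s" % (subscription_name, message_id)

def make_value(message):
    # Store whatever is useful to you, e.g. the payload and attributes.
    return json.dumps({
        'data': message.data.decode('utf-8'),
        'attributes': dict(message.attributes),
    })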

Like I said, there are quite a few options for a key-value store; I'll be using Redis with the new Memorystore on Google Cloud Platform. Please note that at the time these lines are being typed, Memorystore is still in beta.

The next commands create a Redis instance named "error-tracker" on Memorystore in the us-central1 region (attached to the default network) and then list the instances:

gcloud beta redis instances create error-tracker --region=us-central1
gcloud beta redis instances list --region=us-central1
INSTANCE_NAME  REGION       TIER   SIZE_GB  HOST      PORT  NETWORK  RESERVED_IP  STATUS  CREATE_TIME
error-tracker  us-central1  BASIC  1        10.0.0.3  6379  default  10.0.0.0/29  READY   2018-06-19T17:40:12

Now that we have some Redis goodness going on, we can note the IP address (in this case 10.0.0.3) and continue to our code.

Make sure your VM/instance has at the very least the "Pub/Sub Subscriber" and "Pub/Sub Publisher" roles.
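If your instances run under a dedicated service account, granting those roles could look something like this (my-project and the service account email are placeholders, not values from this post):

gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:pubsub-worker@my-project.iam.gserviceaccount.com" \
    --role="roles/pubsub.subscriber"
gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:pubsub-worker@my-project.iam.gserviceaccount.com" \
    --role="roles/pubsub.publisher"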

Let's write some sample code in Python (officially, the Python client is still in beta), but you can more or less adapt it to any language.

Setting up:

from functools import wraps
import os
import time

import redis
from google.cloud import pubsub

os.environ['GOOGLE_CLOUD_PROJECT'] = 'myproject'

FAIL_LIMIT = 5  # How many times a message may fail before it is dead-lettered.

# Our subscription to the work topic
subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    sub='work-subscription',  # Set this to something appropriate.
)

# Our publisher for the dead letter queue
dead_letter_queue_topic = 'projects/{project_id}/topics/{topic}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    topic='dead-letters',  # Set this to something appropriate.
)

Here we just perform the basic imports and build the fully qualified name of the subscription we are going to pull from and of the dead-letter topic we are going to publish to.

Notice FAIL_LIMIT = 5, which is our fault tolerance per message (i.e., once a message has failed FAIL_LIMIT times, it is published to the dead letter queue instead of being retried).

Message handler

def pubsub_message_handler(f):
    """
    A decorator for handling a Pub/Sub message. It takes care of error handling and acking.
    """
    @wraps(f)
    def wrapper(message):
        try:
            result = f(message)
            message.ack()
            return result
        except Exception as ex:
            handle_error(ex, message)
    return wrapper

This is a decorator for handling Pub/Sub messages. The flow here is relatively simple: run the decorated function, and if no exception was raised, acknowledge the message. If an exception was raised, let the handle_error function take care of it.

Handling errors

def handle_error(ex, message):
    """
    Handle the failure to process a message.
    """
    key = create_key(message.message_id)
    counter = get_fail_count(key)
    if counter >= FAIL_LIMIT:
        print("message %s failed" % message.message_id)
        error_notifier = pubsub.PublisherClient()
        error_notifier.publish(dead_letter_queue_topic, message.data, **message.attributes)
        message.ack()
    else:
        wait_time = calc_wait_time(counter)
        time.sleep(wait_time)
        raise ex

Let's talk about the error handler. The flow here gets a bit more complicated.

We call a helper named create_key that takes the message id and returns a unique identifier, which is passed to get_fail_count to read (and increment) the failure count in the data store.

If we have reached our threshold, we publish the message to the dead letter queue and then acknowledge it so we won't process it again. If we haven't reached the threshold, we optionally sleep for some period of time (which could be useful in some cases) and then re-raise the exception so the message will be redelivered.

Helpers

def calc_wait_time(fail_count):
    # In case you want to wait some arbitrary time before your message "fails".
    return fail_count


def create_key(message_id):
    """
    This helper function creates a unique key for a message.
    """
    return "%s_%s" % (subscription_name, message_id)

The create_key function here just concatenates the subscription name with the message id. That is because the message id is unique only within the topic, and we want a key that is unique per subscription.

calc_wait_time can implement any logic you would like (some backoff perhaps?), but it really depends on the acknowledgment deadline you have configured for the pull subscription. Here I just return the number of failures, and that's the number of seconds we wait.
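For example, a capped exponential backoff could be a drop-in replacement (the 30-second cap here is an arbitrary choice; keep it well below your subscription's acknowledgment deadline):

def calc_wait_time(fail_count):
    # 1, 2, 4, 8... seconds, capped at 30.
    return min(2 ** max(fail_count - 1, 0), 30)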

Accessing our key-value store

def get_fail_count(key):
    """
    This function wraps the data store logic. In this case we access Redis, but this can be
    implemented with Bigtable, Spanner, SQL, Cassandra, etc.
    Here, I create the client in the function but it can also be created outside.
    """
    redis_client = redis.StrictRedis(host='10.0.0.3', port=6379, db=0)
    # INCR is atomic and returns the value after incrementing, so a single call is enough.
    counter = redis_client.incr(key)
    return counter

The get_fail_count method atomically accesses the key-value store, increments the fail count, and returns the current count. In this case I use Redis, but you can choose whatever you like.

Starting the subscriber

@pubsub_message_handler
def callback(message):
    print("Received Message %s " % message.data)
    # Uncomment to cause some havoc
    # raise Exception("Unknown Error")


if __name__ == '__main__':
    subscriber = pubsub.SubscriberClient()
    future = subscriber.subscribe(subscription_name, callback)
    while True:
        time.sleep(1)

Notice that the callback method here is decorated by the “pubsub_message_handler” decorator that I described above.

When the app starts, it creates a subscriber and starts listening for messages, which are handled by the callback. The infinite loop is required to keep the process alive (the subscription handler is asynchronous and does not block the main thread).
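If you prefer to block on the streaming pull itself instead of sleeping in a loop, the future returned by subscribe can be waited on directly (a sketch, assuming the same client version as above):

if __name__ == '__main__':
    subscriber = pubsub.SubscriberClient()
    future = subscriber.subscribe(subscription_name, callback)
    try:
        future.result()  # Blocks until the streaming pull is cancelled or fails.
    except KeyboardInterrupt:
        future.cancel()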

Putting it all together

In action

I am going to run the process

python main.py

And then I'll publish a new message to the work topic from the GCP console:

And since I have not set the code to raise any errors, I will just see that the message was processed:

python main.py 
Received Message This is a test message

Now let’s uncomment the “raise” line to raise an error within the message handler callback and publish the same message.

In this case we’ll see on the shell:

python main.py 
Received Message This is a test message
Received Message This is a test message
Received Message This is a test message
Received Message This is a test message
Received Message This is a test message
message 130021349625804 failed

We can see we tried to process the message 5 times before we stopped. Now let’s check the dead-letter queue:

gcloud pubsub subscriptions pull error-monitoring-sub --limit=100 --auto-ack
┌────────────────────────┬─────────────────┬───────────────┐
│ DATA                   │ MESSAGE_ID      │ ATTRIBUTES    │
├────────────────────────┼─────────────────┼───────────────┤
│ This is a test message │ 126573022336505 │ publisher=ron │
└────────────────────────┴─────────────────┴───────────────┘

And voila, the message is now in the dead-letter queue and we can process it any way we want.
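From here you could, for example, attach a small consumer to error-monitoring-sub that logs the failed messages. A sketch along the lines of the code above (the logging is just one example of what you might do with a dead letter):

import os
import time

from google.cloud import pubsub


def dead_letter_callback(message):
    # Inspect, log, alert on, or replay the failed message here.
    print("Dead letter: %s (attributes: %s)" % (message.data, dict(message.attributes)))
    message.ack()


dead_letter_subscription = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    sub='error-monitoring-sub',
)

if __name__ == '__main__':
    subscriber = pubsub.SubscriberClient()
    subscriber.subscribe(dead_letter_subscription, dead_letter_callback)
    while True:
        time.sleep(1)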

Things to note:

  • I have not touched on this, but you will probably want to TTL/delete the message keys from your key-value store (see the sketch after this list).
  • The code here is still pretty naive. In some cases you might handle messages in bulk and will want different access patterns to the data store.
  • Error handling here can be improved (data store errors, for example).
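For the first point, with Redis you could set an expiry when a counter is first created, so stale keys clean themselves up. A sketch (the 24-hour TTL is an arbitrary choice):

def get_fail_count(key, ttl_seconds=24 * 60 * 60):
    redis_client = redis.StrictRedis(host='10.0.0.3', port=6379, db=0)
    counter = redis_client.incr(key)  # Atomic increment; returns the new value.
    if counter == 1:
        # First failure for this message: set a TTL so the key eventually expires.
        redis_client.expire(key, ttl_seconds)
    return counter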

Conclusion

Pub/Sub does not support dead-letter queues (at the time of writing), but with some changes to our code and architecture we can add that functionality to our application.
