Problems with Deferred

A rare appengine logo from back at the beginning…

I’ve been using the deferred library since I started building Python apps on Google App Engine, seven years ago. And even then, when I started, it was already old.

The library was originally written by Nick Johnson, who hasn’t worked on App Engine (or for Google at all I think) for many years now. The current docs still contain this article written by Nick in 2009:

I met Nick Johnson many years ago at a Google Developer Day in Sydney. He’s a brilliant guy, and a great Python developer. Whatever you’re up to now Nick, thanks for all the great work.

And in the source I see this at the top:

# Copyright 2007 Google Inc.

But there is one more thing: did you hear about that new deferred library for App Engine?

So it’s not the latest thing. In fact they started writing this code before there were iPhones.

Anyone who’s been following my writing on Python function serialisation and distributed computing on App Engine will have seen that everything I do with tasks ultimately falls back to calls to deferred. Most notably, my previous post showed how you can use yccloudpickle to run any Python function at all in another task; it uses deferred to do the lowest level work.

In that article I presented this code:

def AddFreeCredit(creditamount):
    @task
    def ProcessOnePage(cursor):
        accounts, cursor, kontinue = Account.query().fetch_page(
            100, start_cursor=cursor
        )
        for account in accounts:
            account.balance += creditamount
        ndb.put_multi(accounts)
        if kontinue:
            ProcessOnePage(cursor)
    ProcessOnePage(None)

AddFreeCredit(10)

and showed how to implement the decorator function task using yccloudpickle and deferred. At the end of the article I said this:

Next I’ll throw task and related functions into a python package that you can use, then start looking at the kinds of things we can do with it.

So, I’ve been building that package. In the course of doing that, I’ve dug into deferred and discovered that it’s not black magic, and it’s got some serious shortcomings. That second thing is bad, but the first thing is good news; we can fix it.

Note: Here’s the next article, presenting my replacement for deferred:

Recapping deferred

The deferred library provides one method, defer(), which you can use to run a function in another task.

from google.appengine.ext.deferred import defer

def myfunction(arg1, arg2):
    ... do something ...

defer(myfunction, value1, value2, _queue="someotherqueue")

Here’s the doco:

and here’s the source for deferred.py:

On my linux box, I find this file here:

/usr/lib/google-cloud-sdk/platform/google_appengine/google/appengine/ext/deferred/deferred.py

defer() takes the function it is passed, and the arguments, pickles them, then enqueues a call to /_ah/queue/deferred, passing these pickles. The handler for that route (which runs in another task) then unpickles these pickles and calls the resulting unpickled function, passing the unpickled arguments to it.
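That round trip is easy to sketch without App Engine at all. Here the task queue and the /_ah/queue/deferred handler are replaced by stand-ins; serialize, run and greet are illustrative names, not the library’s API:

```python
import pickle

def serialize(func, *args, **kwargs):
    # deferred pickles the callable together with its arguments
    return pickle.dumps((func, args, kwargs))

def run(payload):
    # the handler behind /_ah/queue/deferred unpickles and invokes
    func, args, kwargs = pickle.loads(payload)
    return func(*args, **kwargs)

def greet(name):
    return "hello %s" % name

payload = serialize(greet, "world")  # what defer() would enqueue
result = run(payload)                # what the task handler would do
```

In the real library the payload travels through the push queue and the call happens in a different task, but the pickle/unpickle shape is the same.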

As an added wrinkle, the payload passed to the task queue system can be at most 100K. If our pickles are larger than this, the task queue system will throw a fit (well, an exception). In this case, deferred stores the pickles in a datastore object, then enqueues a call to a special function for dealing with this called run_from_datastore, passing it the key of the datastore object. This function in turn takes the key, loads the datastore object, unpickles the pickles, and runs the function as above. It also cleans up the datastore object. Datastore objects can be up to 1MB in size, which is roughly the same amount of text as appears in a paperback novel, so this gets us a really long way.

You’ll notice another argument in the example above, _queue. There are a set of reserved keyword arguments to defer() which are passed to the task queue system instead of through to the caller’s function, identified by their leading underscores.
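The convention is easy to illustrate. split_task_args below is a made-up helper, but the pop-by-prefix logic mirrors what defer() itself does (you’ll see the real version later):

```python
# Hypothetical helper: peel off reserved _-prefixed task parameters,
# leaving the rest for the user's function.
def split_task_args(**kwargs):
    taskargs = {k[1:]: kwargs.pop(k) for k in list(kwargs) if k.startswith("_")}
    return taskargs, kwargs

taskargs, funcargs = split_task_args(_queue="someotherqueue", _countdown=30,
                                     retries=3)
# taskargs -> {"queue": "someotherqueue", "countdown": 30}
# funcargs -> {"retries": 3}
```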

The deferred module provides a default handler (actually a whole mini WSGI app) to deal with /_ah/queue/deferred, which is configured using a builtin; ie: we add these lines to app.yaml:

builtins:
- deferred: on

So to use defer() we don’t even have to be aware of how it works behind the scenes. Put the magic incantation into app.yaml (or equivalent <servicename>.yaml), and defer just works.

Problems with deferred

Weak pickling

The major problem with defer(), which I’ve spent a lot of effort trying to fix, is that it only works on global functions (sorry, callables). This is because it uses the standard Python library pickle, which is very limited in what it can pickle. From deferred.py:

Tasks consist of a callable and arguments to pass to it. The callable and its
arguments are serialized and put on the task queue, which deserializes and
executes them. The following callables can be used as tasks:
1) Functions defined in the top level of a module
2) Classes defined in the top level of a module
3) Instances of classes in (2) that implement __call__
4) Instance methods of objects of classes in (2)
5) Class methods of classes in (2)
6) Built-in functions
7) Built-in methods
The following callables can NOT be used as tasks:
1) Nested functions or closures
2) Nested classes or objects of them
3) Lambda functions
4) Static methods
The arguments to the callable, and the object (in the case of method or object
calls) must all be pickleable.

The two cases that are real limitations in practice are lambda functions and nested functions (elsewhere I call these inner functions). When I find myself wanting to use defer() in a natural way (ie: as I’d code if I weren’t throwing some code to other tasks), I hit these limitations constantly, particularly nested functions that use lexical closures.

This could be fixed if deferred used a better pickling library. My article introduces yccloudpickle, a library which can pickle anything, including nested functions. This is what we should be using in deferred.
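You can see the limitation with the standard library alone; the nested function below works fine when called directly, but can’t be pickled at all, so it can never be deferred:

```python
import pickle

def make_adder(n):
    def add(x):          # nested function, closing over n
        return x + n
    return add

add5 = make_adder(5)
add5(3)                  # works fine locally

try:
    pickle.dumps(add5)   # what defer() would attempt
    picklable = True
except Exception:        # pickle refuses nested functions and lambdas
    picklable = False
```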

No decorator style

Surely it’d be more pythonic to use defer() like this:

from google.appengine.ext.deferred import defer

@defer
def myfunction(arg1, arg2):
    ... do something ...

myfunction(value1, value2)

or

@defer(queue="someotherqueue")
def myfunction(arg1, arg2):
    ... do something ...

ie: you could statically decorate a function to be run as a task, along with all task related parameters, then just call it naturally.

Sometimes you want to make this choice at run time; then you can use the decorator as a factory, and fall back to a style like this:

from google.appengine.ext.deferred import defer

def myfunction(arg1, arg2):
    ... do something ...

myfunctiondefered = defer(myfunction, queue="someotherqueue")
myfunctiondefered(value1, value2)

or simply

defer(myfunction, queue="someotherqueue")(value1, value2)

This would remove the clunky use of underscores in task-related arguments, and separate setting up the function for deferral from actually calling the function.

Also, a couple of nasty transactional bugs in defer()

In digging through deferred, I discovered this nasty little bug in the function defer():

def defer(obj, *args, **kwargs):
    taskargs = dict((x, kwargs.pop(("_%s" % x), None))
                    for x in ("countdown", "eta", "name", "target",
                              "retry_options"))
    taskargs["url"] = kwargs.pop("_url", _DEFAULT_URL)
    transactional = kwargs.pop("_transactional", False)
    taskargs["headers"] = dict(_TASKQUEUE_HEADERS)
    taskargs["headers"].update(kwargs.pop("_headers", {}))
    queue = kwargs.pop("_queue", _DEFAULT_QUEUE)
    pickled = serialize(obj, *args, **kwargs)
    try:
        task = taskqueue.Task(payload=pickled, **taskargs)
        return task.add(queue, transactional=transactional)  # 1
    except taskqueue.TaskTooLargeError:
        key = _DeferredTaskEntity(data=pickled).put()
        pickled = serialize(run_from_datastore, str(key))
        task = taskqueue.Task(payload=pickled, **taskargs)
        return task.add(queue)  # 2

Push queue tasks support an argument transactional, which allows them to participate in datastore transactions. If they are enqueued inside a datastore transaction, with transactional=True, tasks will only be launched at commit time, and only if the transaction succeeds. This is a really useful feature; when you’re writing code making heavy use of transactions, it often comes up that you want to mutate the datastore, then launch a task that should run after that mutation has taken place. If tasks aren’t transactional but launched inside a transaction, they can easily be launched many times, once every time the transaction fails & retries as well as the time it finally succeeds, which is rarely what you want, and can sometimes be disastrous.

In the code above, you can see that defer() pickles its arguments, and tries adding them as a task, correctly setting the transactional flag if it was set by the caller (#1). But if the pickles were too large and we have to fall back to using an intermediate datastore object, the enqueue step doesn’t set the transactional flag (#2).

This means that callers relying on the transactional flag will see the correct behaviour unless their pickled function & arguments are larger than 100K, in which case their task will be non-transactional, with possibly disastrous results.

This nasty bug is hard to notice if you’re not specifically testing for it; most of the time the 100K limit isn’t a problem, and most of the time transactions complete without a problem. So it’ll show up rarely, probably when your system is under load and you’re getting transactional contention, and even then in seemingly random fashion; the size of arguments to a function can vary, so the size of your pickles can vary, so this might only occur sometimes.
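The failure mode itself is easy to simulate without App Engine at all. Here’s a toy model of a transaction that hits contention twice before committing: non-transactional tasks fire on every attempt, while transactional tasks are held pending and fire only at commit:

```python
enqueued = []  # tasks that actually launched

def simulate(transactional, failures=2):
    enqueued.clear()
    for attempt in range(failures + 1):
        pending = []
        # transaction body: mutate the datastore, enqueue one task
        (pending if transactional else enqueued).append("task")
        if attempt == failures:       # earlier attempts fail and retry
            enqueued.extend(pending)  # commit: transactional tasks launch

simulate(transactional=False)
launched_without_flag = len(enqueued)  # once per attempt: 3 launches

simulate(transactional=True)
launched_with_flag = len(enqueued)     # only at commit: 1 launch
```

With the flag dropped on the datastore fallback path, large payloads silently behave like the first case.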

There’s another problem with this code in a transactional setting, which is that a datastore object is created. If this function is called inside a standard (non cross-group) datastore transaction, it’s going to cause a failure. That’s because it creates a datastore object which isn’t part of the same entity group as the rest of the transaction — you can tell that because there is no attempt to create the key, therefore the generated key will be root level and in its own entity group. So, your non-cross-group transactions will fail sometimes, based on the size of your pickled function & arguments. And when it does fail, it’ll likely fail repeatedly until your task fails permanently.

Leaking _DeferredTaskEntity entities

For pickles larger than 100K, a _DeferredTaskEntity object is created. This is then cleaned up in run_from_datastore(). However, this relies on the enqueued task running, and running to completion (this cleanup happens last).

Tasks in App Engine push queues aren’t guaranteed to run. If you enqueue them successfully, you can be pretty certain they’ll run, but there’s no absolute guarantee; a task queue could get flushed for a variety of reasons. An admin can flush the queue. Google might lose the queue contents for some reason. Or something else? In any case, it is improbable, but possible, that your task might never run. When you’re enqueuing vast numbers of tasks, improbable but possible means this will almost certainly happen eventually.

If it does run, then here’s the code that it runs:

def run_from_datastore(key):
    entity = _DeferredTaskEntity.get(key)  # 1
    if not entity:
        raise PermanentTaskFailure()
    try:
        ret = run(entity.data)  # 2
        entity.delete()  # 3
    except PermanentTaskFailure:
        entity.delete()  # 3
        raise

To avoid leaking, one of the two entity.delete() calls must execute.

#1: Firstly, we have to load the _DeferredTaskEntity. This might fail, say because the datastore service is temporarily unavailable. It might stay unavailable long enough for your task to get to maximum retries and fail permanently.

#2: Then, your function has to run, either to completion or until it throws an exception. But your function might not return at all, or at least not within the finite window required of a push task (10 minutes). In this case, the task will be killed and retried. The same thing could happen repeatedly, until your task fails permanently, leaking the _DeferredTaskEntity object.

#3: And of course, the entity.delete() might also fail, due to transient infrastructural problems.

In normal code, we could use a try/finally construct to ensure the entity is cleaned up. In this code though, we mustn’t clean up until we are absolutely sure of success, because the task could fail and retry from any point in the code, and the datastore object needs to still exist for each retry.

So, we end up with the possibility of leaking _DeferredTaskEntity. If you’ve used deferred in your own real projects, you’ll be familiar with seeing these in the datastore viewer; this leakage absolutely happens in practice.

Can we fix this?

Absolutely. We can use yccloudpickle as the pickler. We can implement a decorator. We can fix the transactional bugs.

In my next article, I’ll present an alternative library which you can use instead of deferred which does all these things.

As to leaking datastore entities, this is an unavoidable consequence of Tasks running zero, one, or more than one times; there’s just no way to absolutely guarantee that leaks don’t occur. This is a problem with any algorithm which uses transient datastore objects in App Engine, and is best addressed separately through a garbage collection paradigm. Stay tuned for a separate article just on that topic, but for now, know that if you name your transient datastore classes with a leading underscore, that’s enough to clean them up later.
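The garbage collection idea can be as simple as a periodic sweep that deletes transient entities older than some safe threshold. The code below is a stand-alone sketch: the list of dicts stands in for a datastore query over kinds with a leading underscore, and the 6-hour threshold is an assumption (it just needs to exceed the longest possible retry window):

```python
import time

MAX_AGE = 6 * 3600  # assumption: after 6 hours no task can still be retrying

def collect_garbage(entities, now=None):
    """Keep only entities young enough that a task might still need them."""
    now = time.time() if now is None else now
    return [e for e in entities if now - e["created"] <= MAX_AGE]

now = time.time()
entities = [
    {"kind": "_DeferredTaskEntity", "created": now - 8 * 3600},  # leaked
    {"kind": "_DeferredTaskEntity", "created": now - 60},        # in flight
]
entities = collect_garbage(entities, now)  # the leaked entity is swept
```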

Update 1

  • In the original version of this article, I wrote that the size limit for a task is 1K, but that’s not right, it’s roughly 100K. Thanks to Shay Erlichmen for pointing this out. I went looking for any reference to this in the App Engine documentation and couldn’t find anything, but I’ve validated it experimentally (update: this is documented in “quotas”). This means of course that all the machinery for dealing with tasks via a datastore object is far less relevant, as the limit is high enough that we’ll hit it very rarely.
  • Shay Erlichmen also pointed out that there is another issue with deferred, ie: that log entries all have the url /_ah/queue/deferred. That’s a hugely irritating feature of deferred when you try to debug code that uses tasks, because you can’t tell which log entries refer to which functions.
  • There’s another problem with deferred that I forgot to document, which is the way it throws away information about headers. Here’s the code:
class TaskHandler(webapp.RequestHandler):
    """A webapp handler class that processes deferred invocations."""

    def run_from_request(self):
        """Default behavior for POST requests to deferred handler."""

        <...elided...>

        headers = ["%s:%s" % (k, v) for k, v in self.request.headers.items()
                   if k.lower().startswith("x-appengine-")]
        logging.log(_DEFAULT_LOG_LEVEL, ", ".join(headers))
        run(self.request.body)

It reads the headers from the request, prints them to the log, and then throws them away.

That’s a real shame, because there’s great information in the headers, including how many times the task has run / retried.

This information is thrown away because, well, how would you pass it to an arbitrary function? There’s no clean way, and most of the time you won’t care. But sometimes you will, so it’s worth addressing.
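One way it could be addressed (an assumption on my part, not anything deferred actually does): only pass the headers to functions that opt in by declaring a parameter for them. X-AppEngine-TaskRetryCount is a real task header; run_with_headers and the taskheaders parameter name are made up:

```python
import inspect

def run_with_headers(func, args, kwargs, headers):
    # Hypothetical: inject headers only if the function declares `taskheaders`
    if "taskheaders" in inspect.signature(func).parameters:
        kwargs = dict(kwargs, taskheaders=headers)
    return func(*args, **kwargs)

def myfunction(arg1, taskheaders=None):
    retries = int(taskheaders.get("X-AppEngine-TaskRetryCount", "0"))
    return arg1, retries

result = run_with_headers(myfunction, (42,), {},
                          {"X-AppEngine-TaskRetryCount": "2"})
```

Functions that don’t declare the parameter are called exactly as before, so nothing breaks for existing code.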

Update 2

Mark Cummins on Google+ pointed me to this library, another deferred replacement:

It’s not sufficient for my needs (it doesn’t use yccloudpickle), but it’s got some interesting tricks that I’ll steal. Great stuff.