Greyboi
The Infinite Machine
Feb 12, 2017

Note: Since this article was written, CloudPickle has been updated to do everything that YCCloudPickle was created to do. I now use CloudPickle in all circumstances, and recommend you use it too. This article remains for historical interest only.

Python Function Serialisation with YCCloudPickle

Say I’ve got an App Engine app managing user accounts. Each account has a store credit “balance”. As a promotion, I’d like to add $10 of free credit to everyone’s account.

A naive way I could try to do this is as follows:

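from google.appengine.ext import ndb

# Account is an ndb.Model with a numeric balance property, defined elsewhere.

def AddFreeCredit(creditamount):
    def ProcessOnePage(cursor):
        # Fetch the next page of up to 100 accounts, starting at cursor
        accounts, cursor, kontinue = Account.query().fetch_page(
            100, start_cursor = cursor
        )
        for account in accounts:
            account.balance += creditamount
        ndb.put_multi(accounts)
        # Recurse to process the next page, if there is one
        if kontinue:
            ProcessOnePage(cursor)
    ProcessOnePage(None)

AddFreeCredit(10)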

But that’s a bad way to do it. It’s bad because we don’t know how many Account objects there are. There could be billions! How long will this take?

A task on a push queue in App Engine can run for at most 10 minutes. So a tight loop like this puts an upper limit on the number of objects we can visit before App Engine times out (kills) the task.

This is also a strictly sequential algorithm: it visits one page after another, so its running time grows linearly with the number of accounts (i.e. O(n)). Let’s ignore that for the moment. Say we don’t care how long it takes to run, but we don’t want to use a lot of processing resources at any one moment; then just looping through the accounts from beginning to end is acceptable.

It’d be really cool to be able to run this whole thing in the background, in a task. You’d work in that task for a finite amount of time, then kick off a new task to continue the work, and so on until it’s done.
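One way to bound each task's work is to process whole pages until a time budget is spent, then hand the cursor on to the next task. Below is a minimal sketch of that loop in plain Python 3, assuming a list of dicts in place of the datastore; `fetch_page` and `process_for_a_while` are hypothetical names, and in a real task the budget would need to sit comfortably below the 10-minute deadline.

```python
import time


def fetch_page(accounts, page_size, cursor):
    """Hypothetical stand-in for ndb's Query.fetch_page.

    Returns (page, next_cursor, more), using a list index as the cursor.
    """
    start = cursor or 0
    page = accounts[start:start + page_size]
    next_cursor = start + len(page)
    return page, next_cursor, next_cursor < len(accounts)


def process_for_a_while(accounts, cursor, creditamount,
                        page_size=100, budget_seconds=60.0):
    """Process whole pages until the time budget is spent.

    Returns the cursor the next task should resume from, or None when done.
    """
    deadline = time.monotonic() + budget_seconds
    while True:
        page, cursor, more = fetch_page(accounts, page_size, cursor)
        for account in page:
            account["balance"] += creditamount
        if not more:
            return None
        if time.monotonic() >= deadline:
            return cursor


# Simulate a chain of tasks: each run hands its cursor to the next one.
accounts = [{"balance": 0} for _ in range(1000)]
cursor, runs = None, 0
while True:
    runs += 1
    cursor = process_for_a_while(accounts, cursor, 10, budget_seconds=0.0)
    if cursor is None:
        break

print(runs)                                 # 10: a zero budget means one page per "task"
print(sum(a["balance"] for a in accounts))  # 10000
```

Each run only ever stops between pages, so no page is half-processed when the next task picks up the cursor.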

It’d be really super cool if we could just mark ProcessOnePage as a background task, like this:

def AddFreeCredit(creditamount):
    @task
    def ProcessOnePage(cursor):
        accounts, cursor, kontinue = Account.query().fetch_page(
            100, start_cursor = cursor
        )
        for account in accounts:
            account.balance += creditamount
        ndb.put_multi(accounts)
        if kontinue:
            ProcessOnePage(cursor)
    ProcessOnePage(None)

AddFreeCredit(10)

This seems like something reasonable. Can we accomplish this? What would it take?

Let’s first discuss why it’s impossible, and then discuss why it’s possible.

Why it’s impossible

If we want to run ProcessOnePage in a task, we run into some immediate problems.

Firstly, the most basic way of kicking off a background task is to enqueue a call to a web handler. But here, we’re looking at an inner function that we want to run.

We’d be building a decorator something like this:

def task(f):
    def runtaskinbackground(*args, **kwargs):
        # Somehow set up a web handler, serialise the function and
        # its arguments, enqueue a task to do that, then have the
        # web handler reverse that process. Messy!
        raise NotImplementedError
    return runtaskinbackground

But much of this is already handled by the deferred library. You pass defer() a function and its arguments, and it serialises them, enqueues a task, and then deserialises and calls them in its own web handler. So can’t we write something like this?

from google.appengine.ext import deferred

def task(f):
    def runtaskinbackground(*args, **kwargs):
        deferred.defer(f, *args, **kwargs)
    return runtaskinbackground

Wow, easy. Let’s run it on App Engine:

ERROR    2017-02-12 04:37:39,795 main.py:16] An error occurred during a request.
Traceback (most recent call last):
... elided ...
File "/home/xapiapps/dev/evcws/emlyn-experiments/experiments/incrementaccountswithtask.py", line 24, in Go
AddFreeCredit(10)
File "/home/xapiapps/dev/evcws/emlyn-experiments/experiments/incrementaccountswithtask.py", line 23, in AddFreeCredit
ProcessOnePage(None)
File "/home/xapiapps/dev/evcws/emlyn-experiments/experiments/incrementaccountswithtask.py", line 7, in runtaskinbackground
deferred.defer(f, *args, **kwargs)
File "/usr/lib/google-cloud-sdk/platform/google_appengine/google/appengine/ext/deferred/deferred.py", line 274, in defer
pickled = serialize(obj, *args, **kwargs)
File "/usr/lib/google-cloud-sdk/platform/google_appengine/google/appengine/ext/deferred/deferred.py", line 245, in serialize
return pickle.dumps(curried, protocol=pickle.HIGHEST_PROTOCOL)
File "/usr/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 748, in save_global
(obj, module, name))
PicklingError: Can't pickle <function ProcessOnePage at 0x7fdc9bef3500>: it's not found as experiments.incrementaccountswithtask.ProcessOnePage

Oh dear.

This exception is telling us something profound, and potentially show-stopping. It’s complaining that the function passed to the task decorator, i.e. ProcessOnePage, can’t be found under the global name experiments.incrementaccountswithtask.ProcessOnePage. I ran this code in a package called experiments, in a file called incrementaccountswithtask, and defer() is looking for a global function of that name in that module.

What’s actually happening is that defer uses Python’s pickle library, and pickle can’t really serialise a function object. Instead, it pickles the function by reference: it records the function’s module and name, and when unpickling it imports that module and looks the name up again.

So, you can pickle a reference to a global function in the codebase. But that’s not going to work for a lambda function, and it’s not going to work for an inner function (nested inside something else).

You can read a lot more about this here: https://medium.com/@emlynoregan/serialising-all-the-functions-in-python-cd880a63b591#.vtrt3sf2f
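The by-reference behaviour is easy to observe with the standard pickle module. A minimal sketch in Python 3 (not the Python 2.7 runtime used above); `can_pickle` and `make_adder` are hypothetical helpers for illustration:

```python
import os.path
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialise obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError, ImportError):
        # Depending on the Python version and how the code is run, a function
        # with no importable global name is reported as one of these errors.
        return False


def make_adder(n):
    def adder(x):  # an inner function that closes over n
        return x + n
    return adder


# A module-level function is pickled by reference: the bytes hold only its
# module and name, and unpickling imports the module and looks the name up.
data = pickle.dumps(os.path.join)
print(b"join" in data)                     # True
print(pickle.loads(data) is os.path.join)  # True: the same object, not a copy

# Inner functions and lambdas have no global name to look up.
print(can_pickle(make_adder(10)))  # False
print(can_pickle(lambda x: x))     # False
```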

There *are* libraries that can do better. Both Dill and CloudPickle have made an effort to work around these problems. Both of them actually serialise the function object (not just a reference to it).

Defer uses pickle and I don’t think you can change that. But what you can do is the following:

1 — Serialize the function properly with a better library than pickle

2 — Enqueue a task to run a wrapper function, with this serialised function as a string argument to that function

3 — In the wrapper function, deserialise the original function and call it
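The three steps can be simulated without App Engine. This is a minimal sketch in Python 3.8+ under several loudly stated assumptions: `fake_defer` and `run_queued_tasks` are hypothetical stand-ins for `deferred.defer` and the task handler, and the serialiser is a toy (bytecode via `marshal` plus pickled closure values), not CloudPickle. It only handles functions that use builtins and picklable closure values, so a function whose closure refers back to itself would still break it.

```python
import builtins
import marshal
import pickle
import types
from collections import deque

# Hypothetical stand-in for deferred.defer plus the task queue. The real defer
# also pickles the function by reference, which is why defer_wrapper must be a
# module-level function; this sketch keeps the function object and pickles
# only the arguments.
_queue = deque()


def fake_defer(func, *args, **kwargs):
    _queue.append((func, pickle.dumps((args, kwargs))))


def run_queued_tasks():
    """Play the part of the task handler: unpickle the arguments and call."""
    results = []
    while _queue:
        func, payload = _queue.popleft()
        args, kwargs = pickle.loads(payload)
        results.append(func(*args, **kwargs))
    return results


# Step 1 (toy serialiser): bytecode plus the values held in the closure.
def serialise_function(f):
    cells = tuple(c.cell_contents for c in (f.__closure__ or ()))
    return pickle.dumps((marshal.dumps(f.__code__), f.__name__, cells))


def deserialise_function(fser):
    code_bytes, name, cells = pickle.loads(fser)
    closure = tuple(types.CellType(v) for v in cells) or None
    # Assumption: the function uses builtins only, no module globals.
    return types.FunctionType(marshal.loads(code_bytes),
                              {"__builtins__": builtins}, name, None, closure)


# Step 3: the module-level wrapper rebuilds the function and calls it.
def defer_wrapper(fser, *args, **kwargs):
    return deserialise_function(fser)(*args, **kwargs)


# Step 2: enqueue the wrapper, with the serialised function as an argument.
def task(f):
    def runtaskinbackground(*args, **kwargs):
        fser = serialise_function(f)
        fake_defer(defer_wrapper, fser, *args, **kwargs)
    return runtaskinbackground


def make_greeter(greeting):
    @task
    def greet(name):  # an inner function closing over greeting
        return "%s, %s!" % (greeting, name)
    return greet


make_greeter("Hello")("world")
print(run_queued_tasks())  # ['Hello, world!']
```

Note that `greeting` reaches the "task" only because its value was serialised along with the function's bytecode.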

Let’s give that a shot, using Cloud Pickle. Here’s the code:

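import cloudpickle
import pickle
from google.appengine.ext import deferred

def defer_wrapper(fser, *args, **kwargs):
    # Runs inside the task: rebuild the function and call it
    f = pickle.loads(fser)
    f(*args, **kwargs)

def task(f):
    def runtaskinbackground(*args, **kwargs):
        # Serialise the function itself, not just a reference to it...
        fser = cloudpickle.dumps(f)
        # ...and defer the module-level wrapper, which pickle can find by name
        deferred.defer(defer_wrapper, fser, *args, **kwargs)
    return runtaskinbackground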

That looks good. And let’s run it on App Engine… oh no:

ERROR    2017-02-12 05:23:28,480 main.py:16] An error occurred during a request.
Traceback (most recent call last):
File "/home/xapiapps/dev/evcws/emlyn-experiments/lib/flask/app.py", line 1982, in wsgi_app
response = self.full_dispatch_request()
File "/home/xapiapps/dev/evcws/emlyn-experiments/lib/flask/app.py", line 1614, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/xapiapps/dev/evcws/emlyn-experiments/lib/flask/app.py", line 1517, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/xapiapps/dev/evcws/emlyn-experiments/lib/flask/app.py", line 1612, in full_dispatch_request
rv = self.dispatch_request()
File "/home/xapiapps/dev/evcws/emlyn-experiments/lib/flask/app.py", line 1598, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/xapiapps/dev/evcws/emlyn-experiments/handlers/switchboard.py", line 24, in switchboard
resultobj = experiment[1]()
File "/home/xapiapps/dev/evcws/emlyn-experiments/experiments/incrementaccountswithtask.py", line 33, in Go
AddFreeCredit(10)
File "/home/xapiapps/dev/evcws/emlyn-experiments/experiments/incrementaccountswithtask.py", line 32, in AddFreeCredit
ProcessOnePage(None)
File "/home/xapiapps/dev/evcws/emlyn-experiments/experiments/incrementaccountswithtask.py", line 14, in runtaskinbackground
fser = cloudpickle.dumps(f)
File "/home/xapiapps/dev/evcws/emlyn-experiments/lib/cloudpickle/cloudpickle.py", line 706, in dumps
cp.dump(obj)
File "/home/xapiapps/dev/evcws/emlyn-experiments/lib/cloudpickle/cloudpickle.py", line 150, in dump
raise pickle.PicklingError(msg)
PicklingError: Could not pickle object as excessively deep recursion required.
INFO 2017-02-12 05:23:28,486 module.py:806] default: "POST / HTTP/1.1" 500 27

Note: I tried to do this with Dill as well, but couldn’t get it to work on App Engine at all. Has anyone had any luck with that? In any case, Dill has exactly the same issue: https://medium.com/@emlynoregan/digging-deeper-into-recursive-inner-functions-in-python-8ec6c5b1cbb#.6rh93m5mp

If you dig into Cloud Pickle, you’ll find it can’t serialise an inner function that involves recursion. The short explanation is that the function is in its own closure: the closure cell for the name ProcessOnePage refers back (here via the task wrapper) to the function itself, creating a loop in what would otherwise be a tree. This causes Cloud Pickle and Dill both to break during serialisation, because they use naive tree-walking algorithms. There’s also a more fundamental issue: in the Python 2.7 runtime that App Engine used, you can’t reconstruct a function with cycles in its closure graph in pure Python, because the objects involved (functions’ closures and their cells) can’t be modified once created. There’s no way to create object A, create object B referencing A, then make A reference B. (Python 3.7 later made closure cells writable.)

You can read more about this here:
https://medium.com/@emlynoregan/digging-deeper-into-recursive-inner-functions-in-python-8ec6c5b1cbb#.6rh93m5mp
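The cycle is visible from Python itself. A minimal sketch, assuming Python 3.8 or later rather than the Python 2.7 runtime discussed above, with a hypothetical `make_factorial` in place of ProcessOnePage. Because modern Python lets you create an empty cell (`types.CellType`, 3.8+) and assign `cell_contents` afterwards (3.7+), it also shows the create-cell, build-function, fill-cell sequence that 2.7 did not allow:

```python
import types


def make_factorial():
    def fact(n):
        # Recursive inner function: "fact" is a free variable of itself.
        return 1 if n <= 1 else n * fact(n - 1)
    return fact


fact = make_factorial()
cell = fact.__closure__[0]
print(fact.__code__.co_freevars)   # ('fact',)
print(cell.cell_contents is fact)  # True: function -> closure -> cell -> function

# Python 3.8+ only: create an empty cell, build a function around it, then
# point the cell back at the new function to recreate the cycle.
empty = types.CellType()
rebuilt = types.FunctionType(fact.__code__, fact.__globals__, "fact", None, (empty,))
empty.cell_contents = rebuilt
print(rebuilt(5))  # 120
```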

So, our original code is unserialisable, because it uses recursive inner functions. Impossible.

But it’s not impossible! It just requires some gymnastics.

Making it possible

A while back I wrote a series of posts culminating in this one:

I presented a method for modifying Python function objects with closure cycles, to remove the cycles. It’s quite unintuitive, and uses the Y Combinator at the heart of the operation. I won’t explain it in any more detail here; you can read the articles if you’d like to understand it.
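The general principle can be illustrated without the details of those posts. This is not the yccloudpickle code, only a sketch of the idea in Python 3: with the applicative-order Y combinator (often called the Z combinator), the recursive call is passed in as an argument instead of being looked up in the function's own closure, so the closure graph becomes a tree. `has_closure_cycle`, `fact_step` and `make_fact_recursive` are hypothetical names.

```python
import types


def has_closure_cycle(f, path=()):
    """True if following closure cells from f leads back to a function on the path."""
    if any(f is g for g in path):
        return True
    for cell in f.__closure__ or ():
        value = cell.cell_contents
        if isinstance(value, types.FunctionType) and has_closure_cycle(value, path + (f,)):
            return True
    return False


def Y(step):
    """Z combinator: build a recursive function from a non-recursive step
    that receives "the recursive call" as its argument."""
    def g(x):
        return step(lambda *args: x(x)(*args))
    return g(g)


def make_fact_recursive():
    def fact(n):
        return 1 if n <= 1 else n * fact(n - 1)  # refers to itself
    return fact


def fact_step(recurse):
    def fact(n):
        return 1 if n <= 1 else n * recurse(n - 1)  # refers to an argument
    return fact


naive = make_fact_recursive()
via_y = Y(fact_step)
print(naive(5), via_y(5))        # 120 120
print(has_closure_cycle(naive))  # True
print(has_closure_cycle(via_y))  # False
```

Both functions compute the same thing, but only the first one's closure contains a loop for a tree-walking serialiser to trip over.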

I’ve taken this technique and added it to Cloud Pickle in a fork called YC Cloud Pickle.

I’d like to have the technique included in Cloud Pickle itself, so let’s hope YC Cloud Pickle is a temporary measure.

Grab YC Cloud Pickle with

pip install yccloudpickle

or if you are vendoring in an App Engine app,

pip install yccloudpickle --target lib

YC Cloud Pickle works exactly the same way as Cloud Pickle. So the code for using it looks like this:

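import yccloudpickle
import pickle
from google.appengine.ext import deferred

def defer_wrapper(fser, *args, **kwargs):
    # Runs inside the task: rebuild the function and call it
    f = pickle.loads(fser)
    f(*args, **kwargs)

def task(f):
    def runtaskinbackground(*args, **kwargs):
        # yccloudpickle removes closure cycles before serialising
        fser = yccloudpickle.dumps(f)
        deferred.defer(defer_wrapper, fser, *args, **kwargs)
    return runtaskinbackground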

And believe it or not, our code for updating the accounts works perfectly using this definition of task. Here it is again for reference.

def AddFreeCredit(creditamount):
    @task
    def ProcessOnePage(cursor):
        accounts, cursor, kontinue = Account.query().fetch_page(
            100, start_cursor = cursor
        )
        for account in accounts:
            account.balance += creditamount
        ndb.put_multi(accounts)
        if kontinue:
            ProcessOnePage(cursor)
    ProcessOnePage(None)

AddFreeCredit(10)

So what’s been achieved here?

You can see that ProcessOnePage is a recursive inner function. It also refers to another variable in lexical scope outside itself, creditamount. This is serialised in the closure of the function and moves along with it to the task that it will ultimately be run in.
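Python's standard `inspect.getclosurevars` shows exactly what travels in the closure. A small sketch in Python 3, with a hypothetical body for ProcessOnePage so that it runs without App Engine; only its two references (to itself and to `creditamount`) matter here:

```python
import inspect


def AddFreeCredit(creditamount):
    def ProcessOnePage(cursor):
        # Hypothetical body: recurse until the "cursor" reaches zero.
        if cursor:
            return ProcessOnePage(cursor - 1) + creditamount
        return 0
    return ProcessOnePage


f = AddFreeCredit(10)
nonlocals = inspect.getclosurevars(f).nonlocals
print(sorted(nonlocals))                 # ['ProcessOnePage', 'creditamount']
print(nonlocals["creditamount"])         # 10
print(nonlocals["ProcessOnePage"] is f)  # True: the recursive self-reference
```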

This means that you can write very natural pythonic code, accessing lexical scope freely (although do remember these values are being serialised across machine boundaries).
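Because closure values are serialised rather than shared, a task gets its own copy of anything mutable it closes over. A minimal sketch of that consequence using only the standard pickle module, with a pickle round trip standing in for the trip to another machine (`make_recorder` is a hypothetical name):

```python
import pickle


def make_recorder(log):
    def record(item):
        log.append(item)
    return record


log = []
record = make_recorder(log)
record("local call")  # runs in this process: mutates the caller's list

# Simulate shipping the captured value to another machine: it is pickled and
# unpickled, so the task works on an independent copy.
shipped_log = pickle.loads(pickle.dumps(record.__closure__[0].cell_contents))
shipped_log.append("remote call")

print(log)          # ['local call']
print(shipped_log)  # ['local call', 'remote call']
```

So a task should communicate results through the datastore (or another shared service), not by mutating captured variables.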

It makes writing sophisticated algorithms for concurrent programming much simpler and more elegant, which is great because these kinds of algorithms tend to be so complex that you need all the help you can get!

In terms of the Squinty Local Machine Model, this method of serialising any function, complete with lexical scope, and running that in a task, will become the analogue of a thread.

What’s next?

Next I’ll throw task and related functions into a python package that you can use, then start looking at the kinds of things we can do with it.
