Practical Functional Distributed Programming, using Python on App Engine

Modern cloud-based systems are a true wonder of the world.

Only a very short time ago, if you wanted to build computer systems at scale, you had to own servers and networks and maybe buildings, and employ armies of clever people to plan and coordinate and operate all that infrastructure.

Now, if you’ve got your credit card handy, you can sign up for a cloud service and in a couple of minutes be looking at a console like this:

That’s a graph with time along the X-Axis, and number of “instances” (roughly, virtual computers) along the Y-Axis.

What you’re looking at is an unshackling from computational limits, the dream of potentially unbounded scaling. That’s what this publication, The Infinite Machine, is all about, focusing on what I consider to be the world’s best and most underrated distributed computing platform, Google App Engine.

An Infinite Machine is amazing. However, it leaves us with a big, still poorly answered question: how on earth are we to program this thing?

Most of modern pop culture computing focuses on front-end development, the world’s bike shed. People agonise endlessly about how they might start some asynchronous task in a single-threaded environment and then later respond to the result of that, retaining context and tractability as the size of their software grows. And that’s not an entirely trivial problem. Mostly, but not entirely.

What I notice is that almost all the good answers to this problem are some variant of functional programming.

What would be really brilliant would be if we could abstract away the difficult details of a horizontally scaling distributed computing platform far enough that it would be amenable to similar techniques.

That is, can we do Functional Distributed Programming on the Infinite Machine?

What is Functional Programming?

Functional programming in a nutshell is the ability to program with First Class Functions. First-class functions boil down to the following:

  • Higher-order functions: passing functions as arguments
  • Anonymous and nested functions
  • Non-local variables and closures
  • Higher-order functions: returning functions as results
  • Assigning functions to variables
  • Equality of functions

I’ll address each of these factors shortly. But first let’s reduce the possible solution space.

Why Python?

I’ve already decided on App Engine as the platform. I’ve further restricted this to the App Engine Standard Environment, simply because one of the practical goals I have in building distributed systems is to get other people to do almost all of the work. In the case of the App Engine Standard Environment, we get language environments managed by Google, App Engine libraries managed by Google, security managed by Google, etc. Everything we could gain from going with the Flexible environment (like arbitrary choice of language) brings larger drawbacks than I really want to incur (eg: worse instance startup times, possible security issues, worse integration with the App Engine environment).

So sticking with the standard environment, we need to choose from the supported languages, which are:

  • Java
  • Python
  • PHP
  • Go

Of these languages, we need something which supports real functional programming. That rules out Go as far as I can tell (I don’t know much about it, maybe I’m wrong?).

Java can support functional programming, but you’re really going against the grain. Java really, really wants to be an Object Oriented language, and that’s not a paradigm consistent with a functional approach.

So the best candidates are Python and PHP.

Having worked commercially with PHP, I’m inclined to rule it out immediately; like anyone who’s ever used it, I still get the night sweats. I’ll say no more about it in this article, except to note that everything I will talk about may in fact be possible in PHP (eg: see this library), albeit more revoltingly.

So this leaves us with Python. And Python is such a lovely language.

Distributed First Class Functions

Python supports first class functions excellently. That gets us started.
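For instance, all of the first class behaviours listed earlier come naturally in plain Python (a tiny illustrative sketch, nothing App Engine specific):

```python
def compose(f, g):
  # functions passed as arguments, and a function returned as a result
  def composed(x):
    return f(g(x))
  return composed

double = lambda x: x * 2  # an anonymous function
increment = lambda x: x + 1

# a composed function assigned to an ordinary variable
double_then_increment = compose(increment, double)
double_then_increment(5)  # → 11
```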

But we want to get first class functions working on the Infinite Machine. How is this different to standard Python based functional programming?

In standard functional programming, we’re working in a single process environment. Python represents functions and lexical closures using application memory, and we either stick to one thread of execution, or else wrangle multiple threads to squeeze a bit more concurrency out of I/O bound code or a bit more grunt out of multiple cores (GIL notwithstanding).

But in App Engine’s horizontally scaling environment, we’ll need to make functions work across separate processes, usually running on separate machines.

App Engine calls its units of computation (push) Tasks. An Instance (a regular, probably virtual machine) can potentially run multiple Tasks concurrently, but this is a constrained, fixed small number. We scale by adding Tasks to Task Queues, and letting App Engine allocate these Tasks to Instances, allocating more Instances, or shutting down Instances, as the rate of task enqueuing rises and falls.

What we’re looking to do is create Distributed First Class Functions. These are functions which run in their own Tasks, yet somehow retain all their first class qualities.

@task, the fundamental building block

The basic tool I’ve created to help us do this in Python on App Engine is the @task decorator function. I’ve posted about this before:

@task lets us run a function in a separate App Engine task using a strongly Pythonic approach, ie: a decorator. So in this simple example,

from taskutils import task

def OuterFunction(a, b):
  @task
  def InnerFunction():
    # do something in another Task
    x = a + b  # we can reference a and b
  InnerFunction()

you can see OuterFunction() calling InnerFunction() to do some work. InnerFunction is decorated with @task, which causes App Engine to run it in a separate Task, ie: in a separate process, probably on a separate machine.

How does @task do this? I recommend reading the linked article, but the short story is that Python exposes enough metadata at runtime to allow us to serialise a function, with its arguments and lexical closure intact, and deserialise it elsewhere. This trick is used to get our function from one App Engine Task to another.
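To see the shape of the trick without any App Engine machinery, here is a minimal local sketch (Python 3 syntax for brevity). serialise_call() and run_deferred() are hypothetical stand-ins for @task’s serialiser and its receiving task handler, and the real code uses a cloudpickle variant rather than raw marshal; but the idea is the same: capture code, closure and arguments, ship them as a string, and rebuild the function on the other side.

```python
import marshal
import pickle
import types

def serialise_call(f, args):
  # capture the function's bytecode, its closure cell contents,
  # and its arguments, as one picklable payload
  cells = tuple(c.cell_contents for c in (f.__closure__ or ()))
  return pickle.dumps((marshal.dumps(f.__code__), cells, args))

def run_deferred(payload):
  # what the receiving side does: rebuild code + closure, then call
  code_bytes, cells, args = pickle.loads(payload)
  code = marshal.loads(code_bytes)
  closure = tuple(types.CellType(v) for v in cells)
  rebuilt = types.FunctionType(code, globals(), code.co_name, None, closure)
  return rebuilt(*args)

def outer(a, b):
  def inner(x):
    return a + b + x  # a and b travel in the closure
  return serialise_call(inner, (10,))

payload = outer(4, 5)
# ...payload crosses the wire; then, in another process:
run_deferred(payload)  # → 19
```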

We have a strong restriction here though. You’ll notice that no result is returned from InnerFunction(). That’s because it’s running in another task, and there’s no way for the result to come back to the outer scope. If the inner function were to return a result, it’d just be lost. eg:

@task
def DoSum(a, b):
  return a + b  # this result is thrown away

x = DoSum(4, 5)
# here x is None, because @task decorated
# functions always return None.

That’s a bit sad, but we can cope.

There’s a technique in functional programming called Continuation Passing Style that we can take advantage of. Continuation Passing Style is where you pass an extra argument to a function, and that extra argument is another function. Instead of returning the result when your first function is complete, you call this “continuation” function, passing it the result as an argument.

This might sound familiar. It’s more familiar to most programmers as a result handler, and is a common technique in modern practical languages like Python and Javascript when doing something asynchronous.

So we could rewrite our Sum function as follows:

@task
def DoSum(a, b, OnSuccess):
  OnSuccess(a + b)

def SuccessHandler(result):
  x = result
  # x == 9 here

DoSum(4, 5, SuccessHandler)

That’s a perfectly acceptable style for functional programming in an asynchronous environment (and a distributed computing platform is as asynchronous as it gets).

So using @task in a continuation passing style, you can get a tantalising glimpse of Distributed Functional Programming in Python on App Engine.

Let’s look at the basic features of First Class Functions listed earlier in the article, and see how we can satisfy them using this approach.

Higher-order functions: passing functions as arguments

We’ve already seen functions passed as arguments, in the DoSum() example above. When we do this, @task makes sure to serialise the passed function, with arguments and closure intact, and deserialise it transparently inside the new Task. This is how OnSuccess() can be called in DoSum() above, and you should note that OnSuccess() runs in the same task as DoSum().

Anonymous and nested functions

The @task approach can support anonymous and nested functions as easily as first level (global) functions in Python, because it serialises any Python function object. eg:

import logging

def DoSomething():
  def HandleResult(result):
    # result == 9
    logging.debug(result)  # App Engine's equivalent of "print"

  @task
  def ApplyFunctionToArgs(f, a, b):
    HandleResult(f(a, b))

  ApplyFunctionToArgs(lambda x, y: x + y, 4, 5)

You’ll notice here that an anonymous function (the lambda) is used, and that a nested function is also included (HandleResult).

One slight problem, which is a Python problem, is that Python doesn’t have true anonymous functions. A lambda, while being a first class function, can only have an expression as a function body. To do more than that, you need to declare a named function.

But this isn’t a big deal. There’s nothing that you can do with anonymous functions that you can’t do with Python’s named nested functions. Sometimes it feels a little clunkier compared to what you might do in Javascript, but it’s just a syntactic quibble at the end of the day.
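To make the quibble concrete: the lambda below is limited to a single expression, while the named nested function can hold statements yet remains just as first class (the names here are purely illustrative):

```python
# a lambda body can only be a single expression...
add = lambda x, y: x + y

# ...so multi-statement logic needs a named nested function,
# which is still a perfectly good first class value
def make_adder_with_log(log):
  def add_and_log(x, y):
    total = x + y
    log.append("added %s and %s" % (x, y))
    return total
  return add_and_log

events = []
f = make_adder_with_log(events)
f(4, 5)  # returns 9; events now holds ["added 4 and 5"]
```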

Non-local variables and closures

We’ve seen some use of closures. In the following example, you’ll see something a bit more obvious:

@task
def DoSum(a, b, OnSuccess):
  OnSuccess(a + b)

def GenerateSuccessHandler(c):
  def SuccessHandler(result):
    x = result
    # x == 9 here
    y = x + c
    # y == 16 here
  return SuccessHandler

DoSum(4, 5, GenerateSuccessHandler(7))

In this example, GenerateSuccessHandler() provides a value for c, and SuccessHandler references it. This means the value is bound in SuccessHandler’s closure, and so is moved into the new Task by @task, ready to be added to x to produce y when SuccessHandler is finally called.

Higher-order functions: returning functions as results

This is an odd one, because we can’t return any results, never mind a function. But sticking to continuation passing style, can we pass a function as a result to a continuation? Of course:

import logging

@task
def GenerateApplyFunc(f, a, b, handler):
  def ApplyF():
    return f(a, b)
  handler(ApplyF)

def HandleApply(applyf):
  logging.debug(applyf())  # logs 9

GenerateApplyFunc(lambda x, y: x + y, 4, 5, HandleApply)

The call to GenerateApplyFunc() passes in

  • a lambda which sums two numbers,
  • some values (4 and 5) and
  • a function which expects to be passed another function that takes no arguments and returns a result.

Inside GenerateApplyFunc(), you can see that ApplyF() is a function which takes no arguments and returns the result of applying f (the lambda) to the values. GenerateApplyFunc() passes ApplyF() to the passed-in handler (ie: HandleApply()).

The important bit for this section is handler(), our continuation, being passed the function ApplyF().

Assigning functions to variables

We can treat a @task function just as we’d treat any other.

@task
def DoesSomething():
  # stuff
  pass

x = DoesSomething
x()

Functions in Python can be assigned to variables, and @task decorated functions are just like any other Python functions.

You don’t have to use @task in a decorator style, either.

def DoesSomething():
  # stuff
  pass

DoesSomething = task(DoesSomething)
x = DoesSomething
x()

y = task(lambda: 9)
y()

The last bit of that example, running a lambda in a task, is ridiculous, but it’s worth making the point that it’s possible.

Equality of functions

Python supports equality testing for functions, but this is simple referential equality. It doesn’t work in the way we’d like to intuitively use it in a distributed setting. In particular, if you serialise and deserialise a function, the reconstituted version won’t test equal to the original.
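Concretely, building the “reconstituted version” by hand with types.FunctionType, standing in for what a deserialiser would produce:

```python
import types

def f():
  return 42

g = f
f == g  # True: g is literally the same object

# a copy rebuilt from the same code object, as a deserialiser
# would produce, is a different object and so compares unequal
h = types.FunctionType(f.__code__, globals(), f.__name__)
f == h  # False, even though h() == f() == 42
```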

But because we have full function serialisation, we can do something odd looking which is a little bit magical. And that magic is computing a hash value.

Firstly, let’s look at serialisation:

import yccloudpickle

c = 3
def myfunc(a, b):
  d = a + b + c
  # d is 6 when called as myfunc(1, 2)

pickled = yccloudpickle.dumps((myfunc, 1, 2))
# pickled is a string including function, arguments and closure

Here, pickled is a string, the serialised form of the function with arguments and closure.

If we’ve got a string, we can make a hash value:

import hashlib

def GenerateStableId(instring):
  return hashlib.md5(instring).hexdigest()

hash = GenerateStableId(pickled)

Now what’s fascinating about that hash value, is that we can use it to implement intentional equality of functions. Here, two functions are intentionally equal if they use the same code, take the same arguments, and refer to the same values in their closures. This isn’t something that we’d use generally, but can be hugely useful in specific situations.
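Here’s a quick sanity check of the idea, using raw marshal and pickle in place of yccloudpickle (stable_id() and make_updater() are illustrative, not part of any library): two closures built from the same code over the same captured value hash identically, while a different captured value gives a different hash.

```python
import hashlib
import marshal
import pickle

def stable_id(f, args):
  # hash the bytecode, closure values and arguments: same code +
  # same captured values + same args => same identity
  cells = tuple(c.cell_contents for c in (f.__closure__ or ()))
  blob = pickle.dumps((marshal.dumps(f.__code__), cells, args))
  return hashlib.md5(blob).hexdigest()

def make_updater(personid):
  def update():
    return "updating %s" % personid
  return update

a = stable_id(make_updater("1234"), ())
b = stable_id(make_updater("1234"), ())
c = stable_id(make_updater("5432"), ())
# a == b: intentionally equal, despite being distinct function objects
# a != c: a different closure value gives a different identity
```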

For example, in implementing the @debouncedtask here,

I had to be able to distinguish between separate calls to the same function, where they were called with different arguments.

For example, if I want to debounce a function which operates on person records, I want to debounce calls to it separately based on the person they are operating on. So,

@debouncedtask(initsec=60, repeatsec=60)
def UpdatePerson(personid):
  # do something to the person with this personid
  pass

UpdatePerson("1234")  # will run in 60 seconds
UpdatePerson("1234")  # skipped, we're already scheduled to run
UpdatePerson("5432")  # unaffected by previous calls, will
                      # also run in 60 seconds

This works because @debouncedtask uses hash values to separate calls (ultimately to compute cache keys). And it means that I can debounce the function and it just works in the way you’d expect it to. Magic.
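The shape of that logic can be sketched as follows. This is not the real @debouncedtask: it keys on function name and arguments rather than the full serialised hash, uses an in-process dict where the real version uses a shared cache, and records calls instead of enqueuing Tasks.

```python
import hashlib
import time

_scheduled = {}  # stands in for a shared cache (eg: memcache)

def debounced(initsec):
  def decorator(f):
    def wrapper(*args):
      # key on function name + arguments, so calls with different
      # arguments are debounced independently of each other
      key = hashlib.md5(repr((f.__name__, args)).encode()).hexdigest()
      now = time.time()
      if key in _scheduled and _scheduled[key] > now:
        return  # already scheduled; skip this call
      _scheduled[key] = now + initsec
      # the real version would enqueue a Task with countdown=initsec;
      # here we just record that a run was scheduled
      wrapper.scheduled.append(args)
    wrapper.scheduled = []
    return wrapper
  return decorator

@debounced(initsec=60)
def UpdatePerson(personid):
  pass

UpdatePerson("1234")  # scheduled
UpdatePerson("1234")  # skipped
UpdatePerson("5432")  # scheduled independently
```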

In a future article I’ll present an honest-to-goodness critical section that works the same way.

So what can we do with Distributed Functional Programming?

I’ll continue to explore this in subsequent articles. I’ve already published some, such as the series on sharding:

In these articles I used @task to implement an adaptively sharded map over ndb datastore objects. This gives you a potential O(log n) map over n objects, where n is an arbitrarily large number.
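The divide-and-conquer shape of that sharded map can be sketched locally. The real version enqueues each sub-range as its own @task and walks datastore query cursors; this plain recursion (with invented names and parameters) just shows where the logarithmic depth comes from.

```python
def sharded_map(process, start, end, fanout=2, threshold=10):
  # small enough: process the range directly in this "task"
  if end - start <= threshold:
    return [process(i) for i in range(start, end)]
  # otherwise split into fanout sub-ranges; in the real version each
  # recursive call is a separate Task running concurrently, so the
  # recursion depth, O(log n), bounds the number of elapsed rounds
  results = []
  step = (end - start + fanout - 1) // fanout
  for s in range(start, end, step):
    results.extend(sharded_map(process, s, min(s + step, end),
                               fanout, threshold))
  return results

sharded_map(lambda i: i * i, 0, 100)  # squares of 0..99, in shards
```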

The kids might have looked at continuation passing style and thought yuck, I don’t want to deal with horrible handlers, can’t we have something proper like promises, or async/await?

The answer to this is yes, but don’t expect these things to map to a distributed environment very usefully. When we do distributed work in a system like App Engine, we’re usually doing something very different to what, say, Promises are used for in Javascript: we’re working on arbitrarily large data. Constructs like Promise.all() just won’t cut the mustard; we have to deal seriously with scale.

In subsequent articles I’ll introduce Distributed Hierarchical Futures. These are Futures, superficially like those presented in the ndb library, but built on top of @task. They’re useful for constructing arbitrarily complex distributed jobs where we really care about

  • operating on arbitrarily large data
  • knowing the realtime status of the job
  • computing progress of the job
  • producing summaries of results

It’ll get a little hairy. But stick with me, it’ll be fun! Like programming is supposed to be :-)

Update: Here it is: