Welcome to the Future!

Distributed Hierarchical Futures, part 1

Greyboi
The Infinite Machine
11 min read · Jan 21, 2018


This article explains the basics of Distributed Hierarchical Futures for App Engine, an abstraction for creating complex distributed systems.

A while back now (4 months ago!) I wrote this article about distributed functional programming on App Engine:

Near the end of that article I promised to reveal Distributed Hierarchical Futures, something I’ve been working on for coordinating sophisticated distributed programming work on App Engine.

And then I forgot about it. Look, I got busy. I have a busy day job, which is why I make this crazy stuff in the first place.

But I was reminded by a lovely fellow that I’d said I’d write about this, and he’d actually squirrelled around in the appenginetaskutils library and read the code, and it kind of blew my mind.

So, here comes the Future!

Terminological Note: These are properly called Distributed Hierarchical Futures, or DHFs, but I’ll just call them Futures in this article because, you know, dude. Also, it makes finding lazy memes easier. Exhibit A:

So what’s the point of a Future?

Let’s go back to the @task library, and look at some basic usage.

The idea of @task is to take a function, closure and arguments and code and all, and execute it in a separate App Engine task; ie: in a separate process on another machine. eg:

from taskutils import task

def OuterFunction(a, b):
    @task
    def InnerFunction():
        # do something in another Task
        x = a + b  # we can reference a and b
    InnerFunction()

Here OuterFunction throws processing off to another task using InnerFunction, which is wrapped in the @task decorator.

That’s nice, but can we get the value of x back from InnerFunction()?

no dice

So this obviously limits the utility of @task. You can do something in another process, but you can’t find out what happened. Can we even find out if InnerFunction() ran successfully?

What you can do is pass a handler function to your @task decorated function, using a continuation passing style as discussed previously. But, sometimes we’d like something stronger.

In the ndb library there’s a future class which solves this problem for asynchronous code, like this:

import webapp2
from google.appengine.api import users

# Account is an ndb model with a view_counter property, defined elsewhere

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

You’re separating out an operation (in this case put()), from getting the result of that operation, allowing you to do other stuff while you wait. Nice, useful.
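You can try the same split between starting an operation and collecting its result outside App Engine. This sketch is not ndb; it uses Python 3's standard concurrent.futures module as a stand-in, and slow_put is a made-up function playing the part of put_async().

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_put(view_counter):
    # made-up stand-in for a slow datastore write
    time.sleep(0.05)
    return view_counter + 1


def handle_request():
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_put, 41)  # start the operation
        page = "Content of the page"        # do other work while it runs
        stored = future.result()            # block only when the result is needed
    return page, stored
```

Calling handle_request() gives back the page text along with the value the background write produced.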

Futures and tasklets in ndb are great (although all the yielding makes my mind bleed), but they suffer in the end from being stuck in the calling task; there are just hard limits on how much work you can do.

But what if that operation could happen in another task? That’s what the DHF Futures do.

from im_future import future

def foo(a):
    @future
    def bar(futurekey, b):
        return a + b + 12

    fut = bar(27)

Note: this uses the im-future package. Add it to your requirements.txt file like this:

im-future

So what’s going on here?

We’ve got a function bar() which is using @future as a decorator. It runs in another task, but notice it returns a result.

Also notice it has a weird first argument, futurekey. Just know that this is required for future functions and ignore it for now. I’ll come back to it later.

When we call bar(), we can pass it arguments as normal. The outer result, fut, is not the result returned by bar(), however. It’s a future object, a placeholder returned by the @future decorator, which we can interrogate later (ie: in the future) for a result, like this:

result = fut.get_result()

Now, you’ll find yourself thinking of the classic future use case, like with the ndb future, which looks like this:

def foo(a):
    @future
    def bar(futurekey, b):
        return a + b + 12

    fut = bar(27)
    # do stuff
    result = fut.get_result()

That probably even works in trivial cases. But never do it!

If you do this, I will find you

Or, you know, use ndb tasklets in this case, but not @future.

Let’s think about why.

We’re doing something presumably worthy of throwing into a separate task; something which takes some time, while we want to get our immediate task done and quit. So why did we use a future? Yes, to get the result back later, but no, not in this task. Instead, we want to be able to get the result back later, from another task.

The trick to this is that the future object is a persistent ndb model object, called _Future. It’s already saved to the datastore when you get it back from calling bar(). If you want to get bar()’s future object back later, you need to save its key somewhere. eg:

from google.appengine.ext import ndb
from im_future import future

class MyJob(ndb.model.Model):
    jobid = ndb.StringProperty()
    futurekey = ndb.KeyProperty()

def startjob(jobid, a):
    @future
    def dojob(futurekey, b):
        return a + b + 12

    jobfuture = dojob(27)
    job = MyJob(jobid = jobid, futurekey = jobfuture.key)
    job.put()

In this example we’re storing the key of the future object in the MyJob object, along with jobid which is presumably some meaningful identifier that we care about.

Then later, in some other task, we decide we want to know how the job is going:

def checkjob(jobid):
    job = MyJob.query(MyJob.jobid == jobid).get()
    jobfuture = job.futurekey.get() if job and job.futurekey else None
    if jobfuture and jobfuture.has_result():
        return jobfuture.get_result()
    else:
        return None

So what have we achieved here? We’ve been able to break a job into an invocation, a separate task running the actual job, and then a third task coming back later looking for the result of the job.
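If the three-task shape is hard to picture, here's a toy model of it in plain Python 3. It has nothing to do with the real library: DATASTORE and TASK_QUEUE are just a dict and a list standing in for the datastore and a task queue, and every function name here is made up for illustration.

```python
import itertools

DATASTORE = {}   # made-up stand-in for the datastore
TASK_QUEUE = []  # made-up stand-in for a task queue
_ids = itertools.count(1)


def start_future(func, *args):
    """Save a future record first, then queue the work; return the record's key."""
    key = "future-%d" % next(_ids)
    DATASTORE[key] = {"state": "underway", "result": None}
    TASK_QUEUE.append((key, func, args))
    return key


def run_next_task():
    """Task 2: the worker that actually runs the future function."""
    key, func, args = TASK_QUEUE.pop(0)
    DATASTORE[key].update(state="success", result=func(*args))


def start_job(jobid, a):
    """Task 1: kick off the work and remember the future's key under our job id."""
    DATASTORE[jobid] = {"futurekey": start_future(lambda b: a + b + 12, 27)}


def check_job(jobid):
    """Task 3: come back later and look the future up by its saved key."""
    job = DATASTORE.get(jobid)
    fut = DATASTORE.get(job["futurekey"]) if job else None
    return fut["result"] if fut and fut["state"] == "success" else None
```

Call start_job("job-1", 3), then check_job("job-1") returns None until run_next_task() has run, after which it returns the result. The only thing linking the three steps is the key saved under the job id.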

We could make this a little more efficient by adding a result field to the job:

class MyJob(ndb.model.Model):
    jobid = ndb.StringProperty()
    futurekey = ndb.KeyProperty()
    # PickleProperty can hold either the integer result or a raised exception;
    # an IntegerProperty would reject the exception stored below
    result = ndb.PickleProperty()
    success = ndb.BooleanProperty()

def updatejobresult(jobid):
    job = MyJob.query(MyJob.jobid == jobid).get()
    if job and job.result is None:
        jobfuture = job.futurekey.get() if job.futurekey else None
        if jobfuture and jobfuture.has_result():
            try:
                job.result = jobfuture.get_result()
                job.success = True
            except Exception as ex:
                job.result = ex
                job.success = False
            job.put()

That’s a bit better. Notice the handling of exceptions: if dojob() raises an exception, then get_result() will in turn re-raise that exception.

But wouldn’t it be nice if we didn’t have to go calling updatejobresult() over and over?

Result Handlers

Having to poll the future object for a result is a bit of a pain. Instead, you can use success and failure handlers:

from im_future import future, GetFutureAndCheckReady, \
    FutureReadyForResult

def startjob(jobid, a):
    def loadjob():
        job = MyJob.query(MyJob.jobid == jobid).get()
        if not job:
            raise Exception("Job not available")
        return job

    def onresult(futurekey):
        job = loadjob()
        jobfuture = GetFutureAndCheckReady(futurekey)
        try:
            job.result = jobfuture.get_result()
            job.success = True
        except FutureReadyForResult:
            raise
        except Exception as ex:
            job.success = False
            # result must be a property type that can hold an exception
            job.result = ex
        job.put()

    @future(onsuccessf = onresult, onfailuref = onresult)
    def dojob(futurekey, b):
        return a + b + 12

    jobfuture = dojob(27)
    job = MyJob(jobid = jobid, futurekey = jobfuture.key)
    job.put()

This handler will update the Job object with the result when it’s available.

Note that result handlers run in separate tasks from the future function, so if they raise exceptions they just retry. Thus, the logic of the handler includes raising exceptions, and forcing retries, when unexpected things happen.

eg: loadjob() raises if it can’t find the job. It indeed might fail to do this because this handler might run before the job is saved (look closely at the code to see this), or it might be saved but we’re not seeing it yet. Also, GetFutureAndCheckReady() (a helper function from the future library) will raise if the future isn’t loaded or isn’t in the right state yet. So by the time we check for the result in get_result(), we know job and jobfuture are available and in good order (hence the lack of defensive checks).

You’ll also note that in the exception handler for the get_result() call, there’s a special case for FutureReadyForResult. This is raised by get_result() if the result isn’t yet ready, and you can check for this instead of calling has_result(). Notice also it’s being re-raised, which again will force a retry.
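To see why raising inside a handler is a feature rather than a bug, here's a toy model in plain Python 3. It doesn't touch the real library: FlakyStore, make_onresult and run_with_retries are all made-up names, and the store just pretends that a freshly saved job only becomes visible after a couple of reads.

```python
class FlakyStore:
    """Made-up store: a saved job only becomes visible after a few reads."""

    def __init__(self, visible_after):
        self.reads = 0
        self.visible_after = visible_after
        self.job = {"result": None}

    def load(self):
        self.reads += 1
        return self.job if self.reads > self.visible_after else None


def make_onresult(store, result):
    def onresult():
        job = store.load()
        if not job:
            raise Exception("Job not available")  # raising forces a retry
        job["result"] = result
    return onresult


def run_with_retries(handler, max_attempts=10):
    """Call handler until it stops raising; return how many attempts it took."""
    for attempt in range(1, max_attempts + 1):
        try:
            handler()
            return attempt
        except Exception:
            pass  # same effect as the task queue retrying the handler
    raise RuntimeError("handler never succeeded")
```

With FlakyStore(visible_after=2), the handler fails twice and then succeeds on the third attempt. No defensive checks are needed inside it, because an early run simply gets retried.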

Progress

So we can kick off a future, and actually get a result from it. We can know if it’s failed or succeeded or still underway.

But, if it’s still underway, where are we up to? It might be slow. Are we 99% done, or just 1% done?

Futures support a progress calculation, using two numbers, weight and progress. Progress as a percentage would be calculated like so:

progress% = progress * 100 / weight 

Note that the futures library itself never makes this calculation; it is left for you to do. You can set weight when you create the future, or afterwards, and it is never used by the code (it’s just there for you to read back later if you’d like to). You set the progress as shown below, just as an integer, which can be between 0 and weight, or it can be whatever you’d like.
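Since the calculation is left to you, here's one way you might write it, as a small standalone Python helper. The name progress_percent is made up, and the clamping to 0..100 and the handling of a missing weight are my assumptions, not anything the library does.

```python
def progress_percent(progress, weight):
    """Return progress as a percentage of weight, clamped to the range 0..100."""
    if not weight:
        return 0.0  # no weight set, so there is nothing sensible to report
    percent = progress * 100.0 / weight
    return max(0.0, min(100.0, percent))
```

Multiplying by 100.0 rather than 100 keeps the division floating point on Python 2 as well as Python 3.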

You can report progress from a future function and retrieve it later:

def startjob(jobid, a):
    @future(weight = a)
    def doslowjob(futurekey, b):
        jobfuture = futurekey.get()
        accum = 0
        for i in range(a):
            # do slow thing (result_of_slow_thing is a placeholder)
            accum += result_of_slow_thing
            # now let's report progress
            jobfuture.set_localprogress(i)
        return accum

    jobfuture = doslowjob(27)
    job = MyJob(jobid = jobid, futurekey = jobfuture.key)
    job.put()

# this method returns a value between 0 and 1 inclusive
def getprogress(jobid):
    job = MyJob.query(MyJob.jobid == jobid).get()
    jobfuture = job.futurekey.get() if job and job.futurekey else None
    if jobfuture:
        if jobfuture.has_result():
            return 1.0
        else:
            # float() avoids Python 2 integer division
            return (float(jobfuture.get_calculatedprogress()) /
                    jobfuture.get_weight())
    else:
        return 0.0

Or, you can use a progress handler:

def startjob(jobid, a):
    def loadjob():
        job = MyJob.query(MyJob.jobid == jobid).get()
        if not job:
            raise Exception("Job not available")
        return job

    def onprogress(futurekey):
        job = loadjob()
        jobfuture = futurekey.get()
        if jobfuture:
            # assumes MyJob also defines progress = ndb.FloatProperty()
            if jobfuture.has_result():
                job.progress = 1.0
            else:
                # float() avoids Python 2 integer division
                job.progress = (float(jobfuture.get_calculatedprogress()) /
                                jobfuture.get_weight())
            job.put()
        else:
            raise Exception("couldn't load future, forcing retry")

    @future(onprogressf = onprogress, weight = a)
    def doslowjob(futurekey, b):
        jobfuture = futurekey.get()
        accum = 0
        for i in range(a):
            # do slow thing (result_of_slow_thing is a placeholder)
            accum += result_of_slow_thing
            # now let's report progress
            jobfuture.set_localprogress(i)
        return accum

    jobfuture = doslowjob(27)
    job = MyJob(jobid = jobid, futurekey = jobfuture.key)
    job.put()

Note that while success and failure are guaranteed to be reported in a consistent fashion based on transactional logic, progress is reported and calculated without transactions and can be inaccurate. This is to minimize the impact of progress reporting on the actual work being performed.

Cancelling a future

Cancelled.

Your future could run for a while; what if you’d like to cancel it? Do it like this:

fut = futurekey.get()
if fut:
    fut.cancel()

But you’ll also need to check this in a long running future function, eg:

from im_task import PermanentTaskFailure
from im_future import future, FutureNotReadyForResult, FutureCancelled

# a and onprogress come from the enclosing scope, as in the earlier examples
@future(onprogressf = onprogress, weight = a)
def doslowjob(futurekey, b):
    jobfuture = futurekey.get()
    accum = 0
    for i in range(a):
        try:
            jobfuture.get_result()
        except FutureNotReadyForResult:
            pass # expected
        except FutureCancelled:
            # the future was cancelled; stop without retrying
            raise PermanentTaskFailure("Future Cancelled")
        except Exception as ex:
            # unexpected!
            raise PermanentTaskFailure(repr(ex))
        # do slow thing (result_of_slow_thing is a placeholder)
        accum += result_of_slow_thing
        # now let's report progress
        jobfuture.set_localprogress(i)
    return accum

Oh, note the use of the PermanentTaskFailure exception to permanently cancel a future function; this works with tasks and futures, to error out without any retries. If you raise any other exception, it’ll cause a retry.
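The pattern in that future function is cooperative cancellation: nothing kills the loop from outside, it just checks on every pass whether it should stop. Here's that idea on its own in plain Python 3. Cancelled and is_cancelled are made-up stand-ins for the real exception and the check against the future object.

```python
class Cancelled(Exception):
    """Made-up stand-in for the cancellation signal."""


def slow_sum(values, is_cancelled):
    """Add up values, checking for cancellation before each step."""
    accum = 0
    for done, value in enumerate(values):
        if is_cancelled():
            raise Cancelled("stopped after %d items" % done)
        accum += value
    return accum
```

How often you check is up to you. Checking once per loop iteration means a cancel takes effect within one "slow thing" of being requested.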

Setting a result externally

I got your result… in… this box?

Sometimes you’d like a future to track an asynchronous process that, say, finished via a callback from somewhere else, rather than synchronously, ie: internally in the function. You can do this by raising the FutureReadyForResult exception instead of returning a result:

from im_future import future, FutureReadyForResult

@future
def KickSomethingOffAndWait(futurekey):
    # kick off a thing
    # now just signal that we're not done
    raise FutureReadyForResult()

KickSomethingOffAndWait()

Later, you can set a result:

def TheSomethingFinished(futurekey, theresult):
    fut = futurekey.get()
    if fut:
        fut.set_success(theresult)
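Here's a toy version of that handshake in plain Python 3, just to show the state changes. ToyFuture and ReadyForResult are made-up names, not the library's classes, and the "first result wins" rule in set_success is my assumption.

```python
class ReadyForResult(Exception):
    """Made-up signal: the work has started, and the result will arrive later."""


class ToyFuture:
    def __init__(self):
        self.state = "underway"
        self.result = None

    def run(self, func):
        try:
            self.set_success(func())
        except ReadyForResult:
            pass  # stay "underway" until someone calls set_success()

    def set_success(self, result):
        if self.state == "underway":  # assumption: the first result wins
            self.state = "success"
            self.result = result
```

A function that returns normally completes the future straight away. One that raises ReadyForResult leaves it underway until a callback supplies the result.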

Other stuff

If you have a future object, you can dump out a json dictionary of stats useful for reporting, using to_dict().

You can set a name for a future using the futurename argument, and then you’ll see that name in the to_dict() info as name.

Futures are transactional. The future function will only be launched in a new task if the future object is successfully put(), so you’ll never get the situation that the future function runs but the future object doesn’t exist unless you somehow delete the object after creating it (but don’t do that). Same goes for the reporting handlers (the future object will exist).

If you need to call a future decorated function inside a transaction, you’ll likely need to make it a cross-group transaction, because the implementation of the future uses transactions. Like this:

def myfuturefunc(futurekey):
    # stuff
    pass

@ndb.transactional(xg=True)
def dosomecrazythings(somekey):
    obj = somekey.get()
    obj.count += 1
    obj.put()
    f = future(myfuturefunc, queue="somequeueofmine")
    f()

Note also a couple of other things in that snippet; instead of using decorator syntax I’ve directly called future() on my future function (just as you can do with task()). Also notice how I set the task queue; you can pass through any task queue settings here just as with @task, and they’ll apply to the task the future function is invoked in, and any other tasks (eg: result handlers and progress handlers). nb: this might do weird things if you set eta or countdown, must check that :-)

There’s a maxretries argument. If you just count on your task queue to do retries, then once you run out of retries, your code will never run again, so your future will never record a result (ie: fail). If you set a maxretries value lower than the number of retries on your task queue, then it’ll terminate itself when it gets past maxretries, and fail correctly.
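A little simulation makes the difference visible. This is plain Python 3 and a made-up run_future function, not the library's logic: it only models a queue that stops retrying silently versus a future that notices it has passed maxretries and records a failure.

```python
def run_future(func, queue_retry_limit, maxretries=None):
    """Return the state the future record ends up in."""
    state = "underway"
    for retry in range(queue_retry_limit + 1):  # first run plus the queue's retries
        if maxretries is not None and retry > maxretries:
            state = "failure"  # the future gives up by itself and records it
            break
        try:
            func()
            state = "success"
            break
        except Exception:
            continue  # the queue retries, if it has any retries left
    return state
```

With a function that always raises, run_future(f, 5) is left "underway" forever, while run_future(f, 5, maxretries=2) ends in "failure", which is something a failure handler can react to.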

If you look in the code you’ll see a timeoutsec argument for future(). It does nothing and may be removed; it’s a relic of earlier work. If you’d like to create a timeout for a future, try just doing this:

def myfuturething():
    f = future(myfuturefunc)
    futureobj = f()
    futurekey = futureobj.key

    @task(countdown = 600) # ten minute delay
    def timeout():
        futureobj2 = futurekey.get()
        if futureobj2:
            futureobj2.cancel() # does nothing if future is complete

    timeout() # enqueue the delayed timeout task

But wait, what’s the hierarchical bit?

So you can see how these are distributed futures, but I haven’t discussed the hierarchical bit. That’s best left to subsequent articles; it gets kinda complex.

Here’s a teaser though:

def andthen(futurekey, level):
    # do some stuff
    if not noandthen():
        # oh, more work? We need another future:
        future(andthen, parentkey=futurekey)(level+1)
        raise FutureReadyForResult()

future(andthen)(0)

That makes a chain of connected futures. Such fun! But more about that in the next article.
