Python Function Serialisation with YCCloudPickle
In the article I’ve linked above, I presented the following problem:
Say I’ve got an App Engine app managing user accounts. They have a store credit “balance”. As a promotion, I’d like to add $10 of free credit to everyone’s account.
I had a model class Account, with an attribute balance, and I wanted to increment every balance by 10.
That article was talking about the idea of serialising code, and presented the decorator @task to let me do this in background tasks. Here’s the code:
accounts, cursor, kontinue = Account.query().fetch_page(
    100, start_cursor = cursor
)
for account in accounts:
    account.balance += creditamount
This was great, it made sure we could map over (ie: visit) every entity no matter how many there were, by using App Engine tasks.
Note: If you haven’t read about @task, have a read at the link above. You can grab a proper library providing @task here.
But this code has a really big flaw: it maps over the entities linearly. That’s pretty disappointing. With a really large number of accounts, the runtime grows as O(n), because only one App Engine task runs at a time, each one finishing its page and kicking off the next until done.
I’m writing about App Engine as the Infinite Machine, ie: that I have endless computation at my disposal. And the Datastore can handle as many requests as I can throw at it in parallel. So isn’t there some way to do all this at once?
Brilliant! That’s the thing.
But there’s a catch…
The catch is that when you access the Datastore, it only presents you with the ability to access data in a linear fashion.
There are a few variants, but basically all the usable* APIs work like this:
accounts, cursor, kontinue = Account.query().fetch_page(
    100, start_cursor = cursor
)
That code says:
- Using the Account model,
- Make a query with no criteria (you could add criteria here like Account.name=="fred", but here we don’t need any), then
- fetch a page (fetch_page) of entities matching the query,
- up to 100 if you’ve got ‘em,
- starting from the cursor (if it’s None, just start from the beginning).
And it returns 3 things:
1 — The list of up to 100 Account entities,
2 — A cursor marking where we got up to,
3 — A boolean which tells us whether we’ve got more Account entities after this batch (often called more, but I’ve used kontinue).
This is a problem if we want to go at the entities in parallel, because this API requires us to visit every page in order; we need to get a cursor for the end of one page, so that we can use it as the start of the next one.
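To make that ordering constraint concrete, here is a minimal sketch that uses a plain Python list instead of the Datastore. The fetch_page() function below is a hypothetical in-memory stand-in (it is not ndb's API, and its cursor is just a list position); it only mimics the "page, cursor, more" shape described above.

```python
def fetch_page(entities, page_size, start_cursor=None):
    """Hypothetical in-memory stand-in for a paged query.

    Returns (page, next_cursor, more), mirroring the three values
    described above. The cursor here is simply a list position.
    """
    start = start_cursor or 0
    page = entities[start:start + page_size]
    next_cursor = start + len(page)
    more = next_cursor < len(entities)
    return page, next_cursor, more


def visit_all(entities, page_size=100):
    """Visit every entity page by page, counting sequential round trips."""
    visited = []
    round_trips = 0
    cursor, kontinue = None, True
    while kontinue:
        # Each call needs the cursor returned by the previous call,
        # so the pages can only be fetched one after another.
        page, cursor, kontinue = fetch_page(entities, page_size, cursor)
        visited.extend(page)
        round_trips += 1
    return visited, round_trips
```

Calling visit_all(list(range(250))) takes three round trips, and none of them can start before the previous one has returned its cursor.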
You said usable*…
Ok, you can also access the Datastore like this:
allaccounts = []
kontinue = True
offset = 0
while kontinue:
    accounts = Account.query().fetch(limit=100, offset=offset)
    allaccounts.extend(accounts)
    offset += len(accounts)
    kontinue = len(accounts) == 100
Using limit and offset sounds like just the trick for concurrent access; just let tasks use different offsets, each doing one page full.
But never do this! Have a look at the definition of the offset & limit arguments (found here):
“Number of query results to skip” is telling you that you are using Shlemiel the Painter’s algorithm.
Shlemiel gets a job as a street painter, painting the dotted lines down the middle of the road. On the first day he takes a can of paint out to the road and finishes 300 yards of the road. “That’s pretty good!” says his boss, “you’re a fast worker!” and pays him a kopeck.
The next day Shlemiel only gets 150 yards done. “Well, that’s not nearly as good as yesterday, but you’re still a fast worker. 150 yards is respectable,” and pays him a kopeck.
The next day Shlemiel paints 30 yards of the road. “Only 30!” shouts his boss. “That’s unacceptable! On the first day you did ten times that much work! What’s going on?”
“I can’t help it,” says Shlemiel. “Every day I get farther and farther away from the paint can!”
When you say this
Account.query().fetch(limit=100, offset=0)
you’re ok; fetch has to deal with 100 entities. But when you say this
Account.query().fetch(limit=100, offset=1000000)
you’re telling ndb to read through from the zeroth entity to the 999,999th entity, skipping each one, then read and return the next 100 entities.
This algorithm performs at O(N²), in CPU time maybe and in Datastore reads definitely. That second thing will upset you greatly, because Datastore reads are $.
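To put rough numbers on that, here is a small counting sketch. It is only a model, not a measurement: it assumes every skipped result costs the same as a returned one, and the function names are made up for illustration.

```python
def entities_touched_with_offset(total, page_size=100):
    """Count entities scanned when paging through `total` entities by offset.

    Assumption: each page at a given offset has to walk past `offset`
    entities before it can return its own results.
    """
    touched = 0
    for offset in range(0, total, page_size):
        returned = min(page_size, total - offset)
        touched += offset + returned
    return touched


def entities_touched_with_cursor(total):
    """With cursors each entity is scanned once, whatever the page size."""
    return total
```

Under this model, a thousand entities cost 5,500 scans by offset against 1,000 by cursor, and a million entities cost 5,000,500,000 scans against 1,000,000. The offset total grows with the square of the entity count.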
This caused a huge issue some years back when Google started charging per Datastore read. I wrote a series of articles about that, you can read more here:
AppEngine Tuning - Schlemiel, you're fired!
This is the concluding post in the series that begins with The Amazing Story of AppEngine and the Two Orders Of…
But the tl;dr is, limit+offset isn’t usable as a mapping paradigm in the Datastore.
So what do we do instead, intrepid reader?
Instead, we shard.
Sharding works like this:
Say we’ve got a whole chunk of records in a table in a database (or Accounts in Datastore). And say they’ve all got a random integer ID assigned to them.
Now say we’d like to break these records into ten partitions. In each partition we want roughly the same amount of records, although we don’t really care about being precise.
But what we don’t want to have to do is to actually know anything about the records themselves in order to do this; that would entail reading them. We just want to use schema information that we can know beforehand.
We can use the randomly assigned IDs to do this.
Say that the random ID is of a known size, eg: between zero and 2³²-1 inclusive.
We can think of this size as defining a key space. That key space is the space of all numbers between zero and 2³²-1 inclusive.
We can break this key space into ten smaller key spaces, using 11 more or less equally spaced numbers in the range from zero to 2³². eg:
[0, 429496729, 858993459, 1288490188, 1717986918, 2147483648, 2576980377, 3006477107, 3435973836, 3865470566, 4294967296]
(I used [int(i*math.pow(2, 32)/10) for i in range(11)] to generate this)
Using these numbers, I can use each adjacent pair to define a smaller key space.
eg: the 5th key space (position 4 if we count from zero) includes all integers from 1717986918 to 2147483647 (ie: the lower boundary 1717986918 is included and the upper boundary 2147483648 is excluded).
Because these random IDs are random, I can say with a straight face that if I retrieve all records from my table where 1717986918 ≤ ID < 2147483648, I will get roughly one tenth of the records.
Most importantly, if I do this for every one of these key spaces, then each record will be included in exactly one of the key spaces, and the combination of every record in every key space is exactly the original set of records, with no duplicates and no missing entries.
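Here is a small sketch of that claim, using made-up random IDs instead of real records. The names key_space_index() and partition() are hypothetical helpers written for this example; the boundaries are the same eleven numbers listed above, computed with integer arithmetic.

```python
import bisect
import random

KEY_SPACE = 2 ** 32
NUM_SHARDS = 10

# The same 11 boundaries as listed above.
boundaries = [i * KEY_SPACE // NUM_SHARDS for i in range(NUM_SHARDS + 1)]


def key_space_index(record_id):
    """Index k of the half-open range [boundaries[k], boundaries[k+1])
    that contains record_id."""
    return bisect.bisect_right(boundaries, record_id) - 1


def partition(ids):
    """Drop each ID into the one key space that contains it."""
    shards = [[] for _ in range(NUM_SHARDS)]
    for record_id in ids:
        shards[key_space_index(record_id)].append(record_id)
    return shards


# Pretend these are the random IDs of 100,000 records.
rng = random.Random(42)
ids = [rng.randrange(KEY_SPACE) for _ in range(100000)]
shards = partition(ids)
```

Every ID lands in exactly one list, the ten lists together contain all of the IDs, and each list holds roughly a tenth of them because the IDs are spread evenly over the key space.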
These partitions are called shards; sharding is usually thought of in the context of sharding data storage.
Note: A key space definitely doesn’t have to be integers. Another very common space is a space of strings, which can also be successfully sharded. In general, any data type which has a strict order can be sharded.
Sharding Data Storage
In the example above, say I’ve got 10 database servers, and I’d like to spread my records across them. This is called Horizontal Scaling, or Scaling Out.
What I can do is to use each database server to store a separate partition. The partition is called a shard.
The 5th shard (4th counting from zero) stores only records with IDs from 1717986918 to 2147483647. And so on with the other shards.
An ID is mapped to its shard using this formula:
shardnum = floor(10 × ID / 2³²)
eg: if ID is 1,900,000,000, then shardnum is 4 (counting from zero), ie: the 5th shard.
Whenever I create a new record, I can randomly assign an ID to it, then use that ID to determine the correct shard, and write it to the database server storing that shard.
Whenever I want to look up a record by its ID, I can similarly use the ID to determine the correct shard, then go straight to the correct server and query just that shard.
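As a rough sketch of that routing, the snippet below uses ten Python dicts as pretend database servers. Everything here (servers, shard_for_id(), write_record(), read_record()) is a hypothetical illustration of the formula above, not any real database client.

```python
KEY_SPACE = 2 ** 32
NUM_SHARDS = 10

# Ten dicts stand in for ten database servers (purely illustrative).
servers = [{} for _ in range(NUM_SHARDS)]


def shard_for_id(record_id):
    """shardnum = floor(NUM_SHARDS * ID / 2**32), in integer arithmetic."""
    if not 0 <= record_id < KEY_SPACE:
        raise ValueError("ID outside the key space: %r" % (record_id,))
    return NUM_SHARDS * record_id // KEY_SPACE


def write_record(record_id, record):
    """Store the record on the one server that owns its shard."""
    servers[shard_for_id(record_id)][record_id] = record


def read_record(record_id):
    """Go straight to the owning server; no other server is consulted."""
    return servers[shard_for_id(record_id)].get(record_id)


write_record(1900000000, {"name": "fred"})
```

After that write, the record sits only in servers[4], and read_record(1900000000) finds it by touching just that one dict.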
That’s pretty cool, and it’s an essential underpinning of technologies that horizontally scale storage.
But that’s not how we’re going to use it. Instead, we need sharding to scale data access.
Sharding Data Access
Ok, say I’m not handling my own data storage, instead I’m using a massively scalable data storage service which handles all of that for me. However, it only allows me to access data in a linear fashion, and I would like to access it concurrently. eg: say I’m using the Datastore.
I can use sharding here for concurrent access, by adding shard information to my queries.
eg: Say I have a list of User entities in the Datastore. Each User has several attributes, including a name, address, etc. It also has a userid, a randomly allocated integer, and a boolean active, which tells us if the User is active.
I want to visit every User that is active, and run some code to check that they’re really active (and maybe turn off the active flag if they’re not).
The query I’d use to get all the active users looks like this:
User.query(User.active == True)
I’d like to visit the Users concurrently, so I need to use sharded queries.
To shard a query, I simply add criteria to it to split it into shards.
Say I’d like to use ten shards. I can create ten separate queries to do this job:
User.query(User.active == True).filter(User.userid < 429496729)
User.query(User.active == True).filter(User.userid >= 429496729).filter(User.userid < 858993459)
...
User.query(User.active == True).filter(User.userid >= 3865470566)
Then I could use code like this:
def VisitActiveUsers():
    boundaries = [0, 429496729, 858993459, 1288490188,
        1717986918, 2147483648, 2576980377, 3006477107,
        3435973836, 3865470566, 4294967296]

    for index in range(10):
        query = User.query(User.active == True)
        # the first shard needs no lower bound
        if index > 0:
            query = query.filter(User.userid >= boundaries[index])
        # the last shard needs no upper bound
        if index < 9:
            query = query.filter(User.userid < boundaries[index+1])

        @task
        def VisitUsersForShard(query):
            # ... use the query to work on this shard,
            # ... this runs in its own task
            pass

        VisitUsersForShard(query)
VisitUsersForShard() is called ten times, each in its own task, each with its own shard of the data. So that’s great, we’ve got concurrency!
In general, this approach lets us take advantage of an even spread across a key space, to split our entity access across that key space, and achieve concurrency where we would otherwise only have linear access.
What’s VisitUsersForShard() going to look like though?
def VisitUsersForShard(query, cursor = None):
    users, cursor, kontinue = query.fetch_page(
        100, start_cursor = cursor
    )

    for user in users:
        if not user.CheckIsActive():
            user.active = False
            user.put()

    if kontinue:
        # more entities in this shard: carry on from the cursor
        VisitUsersForShard(query, cursor)
Here we’re back to linear access.
In many cases that’s good enough. If we know that we want to limit our shards to 10 (or some other fixed number), and are happy with linear access within that, then this is fine.
But what if we really don’t know how much work there is to do? And we want maximum concurrency?
So I’ve split my query into N queries. Each one can be used just like the original query was being used. But it also has the same problems; linear access, we don’t know how much work there is to do.
If it’s the same problem, does it have the same solution?
Sure it does! We can break the query into shards, then break those into further shards, etc.
There’s just one problem:
We need a stop case, which is strongly related to the fact that we need to stop sharding and instead do the actual work at some point.
So when do we stop? When do we do real work?
The decision could look like this:
Will I get less than a page full of records from this query?
- Yes: just process them.
- No: shard the query.
But how do we know if there are more or less than a page of records? The only way is to go retrieve them.
If we’re going to retrieve them, we should definitely process them.
So, we can use an algorithm like this:
- Get a page full of records.
- Process them.
- Are there more?
- No: Stop
- Yes: Further shard the query *
Why the asterisk? To remember this tweak: we want to further shard the query, but we don’t want to include the records that we just processed.
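So what we end up with is a tree style fanning out across the key space. Each level of the tree can run in parallel, so we’re getting roughly O(log N) execution time to traverse N entities, as long as there are always enough tasks available to run a whole level at once. That’s pretty cool!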
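Before getting to the real thing, here is a toy simulation of that algorithm over a sorted list of integers. It is a sketch under loud assumptions: the IDs are unique, the recursion runs sequentially (each recursive call stands in for what would be a separate task), and split() and visit() are hypothetical helpers invented for this example.

```python
import bisect
import random

PAGE_SIZE = 100
FANOUT = 10


def split(start, end, fanout=FANOUT):
    """Hypothetical helper: cut [start, end) into up to `fanout` sub-ranges."""
    span = end - start
    if span <= fanout:
        return [(start, end)]
    cuts = [start + i * span // fanout for i in range(fanout + 1)]
    return list(zip(cuts, cuts[1:]))


def visit(sorted_ids, start, end, depth, stats):
    """Process one page of IDs in [start, end), then shard what is left."""
    lo = bisect.bisect_left(sorted_ids, start)
    hi = bisect.bisect_left(sorted_ids, end)
    page = sorted_ids[lo:min(lo + PAGE_SIZE, hi)]
    stats["visited"].extend(page)
    stats["tasks"] += 1
    stats["depth"] = max(stats["depth"], depth)
    if lo + PAGE_SIZE < hi:
        # More records remain. Start after the last processed ID so
        # nothing is visited twice (this relies on IDs being unique).
        for sub_start, sub_end in split(page[-1] + 1, end):
            visit(sorted_ids, sub_start, sub_end, depth + 1, stats)


rng = random.Random(1)
ids = sorted(rng.sample(range(2 ** 32), 50000))
stats = {"visited": [], "tasks": 0, "depth": 0}
visit(ids, 0, 2 ** 32, 0, stats)
```

After the run, stats["visited"] holds every ID exactly once. Walking the same 50,000 IDs a page at a time would take 500 sequential pages, whereas here the longest chain of dependent steps is stats["depth"] + 1 levels of the tree.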
Let’s look at actually coding this. The first problem we hit is that we’ve been using a hard coded set of partition boundaries, but now we’re going to need to generate them algorithmically. Like this:
def calculate_boundaries(start, end, numshards):
    span = end - start
    if span <= numshards:
        # key space too small to split: one shard covering all of it
        return [start, end]
    else:
        return [
            int(index * span / numshards) + start
            for index in range(numshards+1)
        ]
The meat of calculate_boundaries() is the final return statement. But notice the first clause; if we’re trying to shard too small a key space, we don’t bother and instead return one shard equal to the whole space. Code that calls this function will have to deal with this.
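A quick usage sketch of how a caller might consume the result. It repeats the function so that the snippet runs on its own; shard_ranges() is a hypothetical helper added for this example. It turns the boundary list into (start, end) pairs, which also takes care of the single-shard case without any special handling.

```python
def calculate_boundaries(start, end, numshards):
    span = end - start
    if span <= numshards:
        # Key space too small to split: one shard covering all of it.
        return [start, end]
    return [
        int(index * span / numshards) + start
        for index in range(numshards + 1)
    ]


def shard_ranges(start, end, numshards):
    """Hypothetical helper: adjacent boundary pairs as half-open ranges."""
    boundaries = calculate_boundaries(start, end, numshards)
    return list(zip(boundaries, boundaries[1:]))


whole_space = calculate_boundaries(0, 2 ** 32, 10)
quarters = shard_ranges(100, 200, 4)
tiny = shard_ranges(5, 12, 10)
```

Here whole_space is the same list of eleven numbers used earlier, quarters is [(100, 125), (125, 150), (150, 175), (175, 200)], and tiny is the single range [(5, 12)] because a span of 7 is too small to cut into ten.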
So, given this algorithmic calculation of boundaries, what’s the code look like where we really do this? Let’s rewrite the User example above:
def VisitActiveUsers():
    def OriginalQuery():
        return User.query(User.active == True)

    @task
    def VisitUsersForShard(start, end):
        # first, do one page
        query = OriginalQuery().filter(
            User.userid >= start, User.userid < end
        )
        users, _, kontinue = query.fetch_page(100)

        lastid = 0
        for user in users:
            lastid = user.userid
            if not user.CheckIsActive():
                user.active = False
                user.put()

        # next, if there are more records, shard them
        # note we're using lastid instead of start, so we
        # skip the processed records
        if kontinue:
            boundaries = calculate_boundaries(lastid+1, end, 10)
            for index in range(len(boundaries) - 1):
                VisitUsersForShard(
                    boundaries[index], boundaries[index+1]
                )

    VisitUsersForShard(0, 2 ** 32)
You can see VisitUsersForShard() uses the algorithm we described above. Instead of passing a query around, I pass the boundaries of the range for the given shard. Then, I construct the original query, add criteria to restrict to the boundaries given, and grab a page of records. These are processed, then, if there might be more records in the shard, I shard the remaining space, and call VisitUsersForShard() recursively for each shard.
VisitUsersForShard() is marked with the @task decorator, which means that each call to it runs in a separate App Engine task.
Note also the magic that @task provides, supplied by yccloudpickle (see previous articles). For instance, the function OriginalQuery() is available inside VisitUsersForShard() even though it’s an inner function and we’re running in many different tasks; this is possible because it is captured by VisitUsersForShard()’s lexical closure, and so shipped over the wire (serialised + deserialised). But you don’t have to worry about this; it just works.
So what’s been achieved?
So far I’ve shown how to use sharding to access entities in the Datastore concurrently. And using sharding inception (ok, recursion), you can break up a task of unknown size horizontally into finite chunks of work, and do them all in parallel. Which is a pretty awesome result!
What I haven’t shown you is how to solve the original problem, stated at the start of the article. That’s because our Account entity doesn’t have a nice, randomly generated numeric ID, to use for sharding.
I could add one, but wouldn’t it be nicer to have a more general solution?
What if we could use the Datastore entity’s key to accomplish this sharding?
Stay tuned for the next amazing episode, where I’ll do just that. It’ll require some serious dumpster diving for code, so bring a pair of heavy gloves and a garbage bag poncho, and we’ll see what we can find.
Note: In this article I talk about Sharding Inception. I now call this Adaptive Sharding.
All about ndb keys:
Are you the Keymaster?
In the last article I showed how you can use recursive sharding (Sharding Inception!) and the taskutil @task to visit n…
Dude I sharded (part 2):