Dude, I sharded (part 2)

Greyboi
The Infinite Machine
9 min read · May 9, 2017

In the last article I talked in depth about ndb Keys, entity groups, ancestor queries, and all that; so much that maybe we all forgot why?

Why we care about ndb Keys is that they are always present as the handle to an ndb object, and they are ordinal (albeit in an incredibly obtuse fashion). We can exploit this ordinality to allow us to adaptively shard (ie: recursively shard) over ndb entities, and so access them potentially in O(log n) time, rather than O(n).

Here’s the first part of this article:

Ok, this is a good goal in theory. But how do we manage it in practice?

Let’s grab our heavy gloves and go rummaging in Google’s dumpster, see if we can find some perfectly good code for this.

KeyRange

I am the Key Ranger!

Ok, so Google has needed exactly the same ability to access ndb Keys ordinally, for its mapreduce libraries, which also shard the ndb Key keyspace. To do this some clever person who didn’t mind getting their hands really dirty built the KeyRange class.

In the Key class you’ll see
__author__ = 'guido@google.com (Guido van Rossum)'
… yes, that Guido van Rossum.

I suspect the clever person who wrote KeyRange is also Guido. Thanks!

KeyRange is in your AppEngine libraries at

google.appengine.ext.key_range

on my machine that’s at the path

/usr/lib/google-cloud-sdk/platform/google_appengine/google/appengine/ext/key_range/__init__.py

If you don’t want to go digging, you can see it on the web here:

Here’s the declaration:

class KeyRange(object):
"""
Represents a range of keys in the datastore.
A KeyRange object represents a key range
(key_start, include_start, key_end, include_end)
and a scan direction (KeyRange.DESC or KeyRange.ASC).
"""

KeyRange is a class that’s meant to represent a range of keys. It has start and end keys, and a direction (ascending or descending).

KeyRange is not an iterator. Rather, it can apply filter criteria to an ndb query, to restrict it to return only objects with keys inside the KeyRange. ie: it can shard the query.

It also has some methods for splitting a KeyRange into smaller ranges.

Those two things, sharding a query and splitting a range, are exactly what we need to do to recursively shard a key space. Let’s look at the detail.

Sharding a query: filter_ndb_query()

So imagine I’ve got an ndb query, and I’ve got a KeyRange for a particular shard. I can shard the query using KeyRange.filter_ndb_query().

def filter_ndb_query(self, query, filters=None):
    """Add query filter to restrict to this key range.

    Args:
      query: An ndb.Query instance.
      filters: optional list of filters to apply to the query.
        Each filter is a tuple: (<property_name_as_str>,
        <query_operation_as_str>, <value>).
        User filters are applied first.

    Returns:
      The input query restricted to this key range.
    """

That means I can shard the query like this:

# given myquery and mykeyrange
shardedquery = mykeyrange.filter_ndb_query(myquery)

Simple!
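To get a feel for what "restricting a query to a key range" means, here's a toy, pure-Python version (no AppEngine needed). Plain strings stand in for datastore Keys; the real filter_ndb_query applies the equivalent __key__ >= / > and <= / < filters to the ndb query:

```python
# Toy illustration: membership in a (key_start, include_start,
# key_end, include_end) range, with string stand-ins for Keys.
def in_range(key, key_start, include_start, key_end, include_end):
    if key_start is not None:
        if key < key_start or (key == key_start and not include_start):
            return False
    if key_end is not None:
        if key > key_end or (key == key_end and not include_end):
            return False
    return True

keys = ["a", "b", "c", "d", "e"]
# include the start, exclude the end
shard = [k for k in keys if in_range(k, "b", True, "d", False)]
print(shard)  # ['b', 'c']
```

The include_start/include_end flags matter later: when we skip past a processed page, we'll construct a range that excludes its start key.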

Splitting a KeyRange: compute_split_points and split_range

We’ve got two methods available; something very specific to kick things off, and something more generic that can be used after that.

@classmethod
def compute_split_points(cls, kind, count):
    """Computes a set of KeyRanges that are split points for a kind.

    Args:
      kind: String with the entity kind to split.
      count: Number of non-overlapping KeyRanges to generate.

    Returns:
      A list of KeyRange objects that are non-overlapping.
      At most "count" + 1 KeyRange objects will be returned.
      At least one will be returned.
    """

This method takes a kind and a count, and returns a list of KeyRange objects.

Think of the kind as the type of object here, like “Account”. It is actually the kind of the keys of the objects (more about ndb Keys here).

You can get the kind for an ndb query very simply, like this:

ndbquery.kind

So, given an ndb query ndbquery and a number of initial splits (say 5), we can get an initial list of KeyRange objects as follows:

initialkeyranges = KeyRange.compute_split_points(ndbquery.kind, 5)

Great! Now, we’ll need to further shard these KeyRange objects, and compute_split_points won’t help us there because it works on an entire kind.

Instead, we can use split_range.

def split_range(self, batch_size=0):
    """Split this key range into a list of at most two ranges.

    This method attempts to split the key range approximately in half.
    Numeric ranges are split in the middle into two equal ranges and
    string ranges are split lexicographically in the middle. If the
    key range is smaller than batch_size it is left unsplit.

    Note that splitting is done without knowledge of the distribution
    of actual entities in the key range, so there is no guarantee (nor
    any particular reason to believe) that the entities of the range
    are evenly split.

    Args:
      batch_size: The maximum size of a key range that should not
        be split.

    Returns:
      A list of one or two key ranges covering the same space as
      this range.
    """

This is a method on a KeyRange instance; it returns either two KeyRange objects splitting the range, or one KeyRange object (itself).

So we can further split any of our ranges like so:

splitranges = myrange.split_range()

Simple.
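To see the splitting idea without any AppEngine machinery, here's a toy version over integer ranges (an assumed simplification; the real split_range also handles string keys and full Key paths):

```python
# Toy version of split_range: bisect [start, end]. If the midpoint
# collides with an endpoint, the range is too small to split.
def split(start, end):
    mid = (start + end) // 2
    if mid == start or mid == end:
        return [(start, end)]          # too small: return unsplit
    return [(start, mid), (mid, end)]  # two halves, same total space

print(split(0, 100))  # [(0, 50), (50, 100)]
print(split(3, 4))    # [(3, 4)] -- unsplit
```

Note the one-or-two-results contract, which our algorithm below has to be robust to.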

Processing a Page, then skipping that Page

If you recall the earlier discussion of recursive sharding, the general approach to processing a shard is to:

  • Get a page full of records.
  • Process them.
  • Are there more?
  • No: Stop
  • Yes: Further shard the query (skipping the processed records)

Processing a page is easy. We get the sharded query as above, then just grab a page and pass it off to be processed somewhere.

keys, _, more = shardedquery.fetch_page(pagesize, keys_only=True)
processthekeys(keys)

But then if there are more records, we need to shard the remaining range and process those new shards. To do this, we’ll skip the keyrange forward to after the last key retrieved, then split it in two, then process the two halves.

if more and keys:
    newkeyrange = KeyRange(keys[-1], keyrange.key_end,
                           keyrange.direction, False,
                           keyrange.include_end)
    krlist = newkeyrange.split_range()
    for kr in krlist:
        MapOverRange(kr)

You can see that we make a new keyrange (newkeyrange), by constructing a new KeyRange, with fields from the old KeyRange but using the last key retrieved as the start of the range.

I’ve just noticed that I might be able to do this:
newkeyrange = keyrange.advance(keys[-1])
I’ll check this out and come back and change this article if this works.

Then, the newkeyrange is split in two, and each resultant range is processed (expect this to be recursive). Note that if split_range only returns one result (ie: doesn’t split) our algorithm will still work, but it will become linear rather than splitting tree-wise. So we are robust to split_range returning only one result, but I’d much rather it returned two!
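Here's the whole recursive pattern as a pure-Python sketch, with a sorted list of integers standing in for the key space (no AppEngine needed, and half-open ranges as an assumed simplification):

```python
# Recursive page-then-split over the range [start, end).
def map_over_range(data, start, end, pagesize, out):
    page = [k for k in data if start <= k < end][:pagesize]
    out.extend(page)                 # "process" the page
    if len(page) == pagesize:        # maybe more: skip past the page
        new_start = page[-1] + 1
        mid = (new_start + end) // 2
        if new_start < mid < end:    # split in two and recurse
            map_over_range(data, new_start, mid, pagesize, out)
            map_over_range(data, mid, end, pagesize, out)
        else:                        # too small to split: go linear
            map_over_range(data, new_start, end, pagesize, out)

data = list(range(50))
out = []
map_over_range(data, 0, 50, pagesize=7, out=out)
print(sorted(out) == data)  # True: every key processed exactly once
```

Because sub-ranges never overlap, every key is processed exactly once even though the order of processing is scrambled by the splitting.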

Setting the End of a Range

If you go trawling through the implementation of KeyRange, you’ll notice that it’s not at all obvious what’s going on. The author was doing some hard stuff, and it reads like that. Thanks again probably Guido! I won’t bore you with the gruesome detail here, although if you find yourself trying to shard basic datatypes, eg: strings, do go digging, there’s great utility code to be found in there.

But one non-obvious thing about KeyRange is that you don’t have to specify the start or end of the range. Not specifying either of these just means the query keeps going until you run out of results in that direction; no criteria.

This is all well and good, except for these things:

  • compute_split_points returns some KeyRange objects without start or end (specifically the first and last ones)
  • split_range doesn’t work (well, it just returns one KeyRange, unsplit) if either start or end is missing.

I’d really like split_range to return two KeyRange objects. So I really need to always pass it ranges with both start and end specified.

It turns out that if we are working on a KeyRange with no start, this doesn’t matter, because after we process a page of results, we create newkeyrange and this always has a specified start (keys[-1]).

But the end of the range can be missing. This needs fixing.

There’s a very curious static method on KeyRange called guess_end_key:

@staticmethod
def guess_end_key(kind, key_start,
                  probe_count=30, split_rate=5):
    """Guess the end of a key range with a binary search of
    probe queries.
    """

This curious method does some probing to find a key which is suitable for the end of a range, given the start of the range and a kind. I haven’t tried to understand it in depth; it seems to do what it says on the tin, and for now that’ll do.
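The probing idea can be sketched in plain Python. Suppose all we can ask is "is there any key above x?" (the real method issues small __key__ > x probe queries against the datastore); then we can find a usable end key by doubling upward and binary-searching back down. This is a toy with integer keys, not the actual implementation:

```python
# Find an end for a range starting at key_start, given only a
# "has any key above x?" predicate.
def guess_end(key_start, has_key_above):
    hi = key_start + 1
    while has_key_above(hi):   # double until we overshoot the data
        hi *= 2
    lo = key_start
    while lo + 1 < hi:         # binary search for the boundary
        mid = (lo + hi) // 2
        if has_key_above(mid):
            lo = mid
        else:
            hi = mid
    return hi                  # smallest probe with no keys above it

data = [3, 17, 41, 99, 2040]
end = guess_end(1, lambda x: any(k > x for k in data))
print(end)  # 2040: a valid end for a range starting at 1
```

Each probe is cheap (a keys-only query with limit 1 in the real thing), so even a wide keyspace costs only a logarithmic number of probes.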

Using this, I’ve been able to write this utility method:

def _fixkeyend(keyrange, kind):
    if keyrange.key_start and not keyrange.key_end:
        endkey = KeyRange.guess_end_key(kind, keyrange.key_start)
        if endkey and endkey > keyrange.key_start:
            keyrange.key_end = endkey

which means at the start of a method for processing a shard, I can “fix” the range, putting an end key in place before splitting if necessary:

def MapOverRange(keyrange, **kwargs):
    _fixkeyend(keyrange, kind)
    # keyrange should now have a valid end

Putting it all together with ndbshardedpagemap

So, with all these pieces, and the @task decorator for running functions in separate AppEngine tasks, we can finally put together a function for recursively sharded mapping over ndb entities.

def ndbshardedpagemap(
    pagemapf=None, ndbquery=None, initialshards=10,
    pagesize=100, **taskkwargs
):
    @task(**taskkwargs)
    def MapOverRange(keyrange, **kwargs):  #3
        _fixkeyend(keyrange, kind)  #4
        filteredquery = keyrange.filter_ndb_query(ndbquery)  #5
        keys, _, more = filteredquery.fetch_page(
            pagesize, keys_only=True
        )  #6
        if pagemapf:
            pagemapf(keys)  #7

        if more and keys:
            newkeyrange = KeyRange(
                keys[-1], keyrange.key_end,
                keyrange.direction, False,
                keyrange.include_end
            )  #8
            krlist = newkeyrange.split_range()  #9
            for kr in krlist:  #10
                MapOverRange(kr)

    kind = ndbquery.kind
    krlist = KeyRange.compute_split_points(kind, initialshards)  #1
    for kr in krlist:
        MapOverRange(kr)  #2

So how’s this work? Let’s talk it through.

The method takes

  • pagemapf: a function which expects a list of ndb keys as an argument. This should process a page at a time of results.
  • ndbquery: a valid ndb query, which defines the entities you wish to map over
  • initialshards: the number of shards to use in the call to compute_split_points
  • pagesize: the size of a page of results (for fetching from the query and passing to pagemapf)

It works as follows:

#1: The query space (all entities for the kind of the query) is split using compute_split_points into a list of KeyRange objects. Each defines a top level shard.

#2: MapOverRange is called for each KeyRange, to process each shard.

#3: MapOverRange is a recursive function, implementing the algorithm discussed previously (basically, get a page of results, process them, split any remaining range).

It uses the @task decorator to run itself in a separate AppEngine task. It works as follows:

#4: The KeyRange is fixed to add a concrete end key if it doesn’t have one

#5: The sharded query (called filteredquery here) is generated using filter_ndb_query

#6: A page of results is retrieved from the sharded query…

#7: … and passed to pagemapf to process

#8: We have more results in the query. Here, we generate a new KeyRange which skips over the retrieved page

#9: We then split the new range into two ranges

#10: And finally we recursively call MapOverRange to process these two sub-ranges.

That’s pretty nifty really.

Check out the code here:

Using ndbshardedpagemap

Remember the problem of adding credit to all the accounts? You can do it like this:

def AddCreditToAccounts(creditamount=10):
    def AddCreditToAccounts(keys):
        accounts = ndb.get_multi(keys)
        for account in accounts:
            account.balance += creditamount
        ndb.put_multi(accounts)

    ndbshardedpagemap(AddCreditToAccounts, Account.query())

That’s it. The most complex bit is the inner AddCreditToAccounts, which loads all the Account entities for the keys, increments their balances, and then saves them again.

Note: Because we’re using sharding and multiple background tasks, incrementing accounts this way isn’t really safe. Why? Because AddCreditToAccounts could be called multiple times for the same set of keys due to task retries. In practice you’ll need to make sure your pagemapf function is idempotent.
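One way to get idempotency is to record which pages have already been processed and skip repeats. Here's a toy sketch of that shape; the names are hypothetical, and a plain set stands in for the marker record (in real AppEngine code the credit and the marker would be written together in a single datastore transaction, otherwise a crash between the two could still double-apply):

```python
# Idempotent page handler: a retried task sees its marker and bails.
processed_pages = set()
balances = {"a": 0, "b": 0}

def add_credit(keys, amount=10):
    page_id = tuple(keys)           # deterministic id for this page
    if page_id in processed_pages:  # a retried task: do nothing
        return
    for k in keys:
        balances[k] += amount
    processed_pages.add(page_id)    # mark done after the work

add_credit(["a", "b"])
add_credit(["a", "b"])              # simulated retry: no double credit
print(balances)  # {'a': 10, 'b': 10}
```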

Where to next?

This is a damned fine result, and ndbshardedpagemap is pretty useful. But there’s a long way to go from here.
