Efficient parallel data processing from Cloud Datastore on Google App Engine
We run on App Engine with automatic scaling and rely heavily on the managed services it offers. Namely, we use Cloud Datastore for storage, Task Queues for background processing, Stackdriver Logging for logs and BigQuery for analysis.
Over the years we have gathered a lot of data, and increasingly our users need to export it for offline analysis (think Excel or other business intelligence tools). Initially, exporting data from our platform was fast, since most clients did not have that much data anyway. Over time, however, our users have accumulated large amounts of data, which they export frequently.
Cloud Datastore, a NoSQL database, is our main storage. The canonical way to fetch data from the Datastore is to compose a query and perform a series of fetches on it.
A query like Contact.all(), when executed, returns all contacts. Typically we fetch and process data serially in batches, so code like the following is very common:
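Below is a minimal sketch of that serial pattern. It assumes a hypothetical in-memory `FakeContactQuery` standing in for `Contact.all()`. Its `fetch_page()` method is modelled loosely on ndb's `Query.fetch_page()`, which returns `(results, cursor, more)`. Here the "cursor" is just an integer offset, not a real Datastore cursor.

```python
from typing import Dict, List, Optional, Tuple


class FakeContactQuery:
    """Hypothetical in-memory stand-in for Contact.all().

    fetch_page() loosely mirrors ndb's Query.fetch_page(), which returns
    (results, cursor, more); here the "cursor" is simply an integer offset.
    """

    def __init__(self, entities: List[Dict]):
        self._entities = entities

    def fetch_page(self, page_size: int,
                   start_cursor: Optional[int] = None) -> Tuple[List[Dict], int, bool]:
        start = start_cursor or 0
        end = start + page_size
        return self._entities[start:end], end, end < len(self._entities)


def export_serially(query: FakeContactQuery, batch_size: int = 100) -> List[str]:
    """Fetch one batch, process it, then fetch the next: strictly serial."""
    rows: List[str] = []
    cursor: Optional[int] = None
    more = True
    while more:
        # Each fetch waits for the previous one, so total time grows with the batch count.
        batch, cursor, more = query.fetch_page(batch_size, start_cursor=cursor)
        for contact in batch:
            rows.append(f"{contact['id']},{contact['name']}")  # stand-in for real processing
    return rows


contacts = [{"id": i, "name": f"contact-{i}"} for i in range(250)]
rows = export_serially(FakeContactQuery(contacts))
print(len(rows))  # 250
```

With a batch size of 100, the 250 contacts take three fetches, each issued only after the previous one returns.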
The code above works fine, but the time taken to process the entities grows roughly linearly with their number. This is because the batches are fetched one after another and each fetch is expensive. In our case we use the F1 instance class (128 MB RAM, 600 MHz CPU).
On average, fetching 100 entities from the Datastore takes 900 ms. Processing 100,000 entities therefore needs 1,000 fetches, roughly 900 seconds of fetching alone. With the processing on top, the export takes more than 1,000 seconds (~17 minutes).
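As a back-of-the-envelope check, the hypothetical helper below multiplies the number of fetches by the 900 ms average. It ignores processing time entirely. In the parallel case, it also assumes batches divide evenly across tasks, which is exactly the assumption the following sections put to the test.

```python
import math


def estimate_fetch_seconds(n_entities: int, batch_size: int = 100,
                           seconds_per_fetch: float = 0.9,
                           parallel_tasks: int = 1) -> float:
    """Time spent on fetches alone, assuming batches are split evenly across tasks.

    Processing time is ignored, so real exports take longer than this estimate.
    """
    fetches = math.ceil(n_entities / batch_size)
    fetches_per_task = math.ceil(fetches / parallel_tasks)
    return fetches_per_task * seconds_per_fetch


print(estimate_fetch_seconds(100_000))                    # about 900 s serially
print(estimate_fetch_seconds(100_000, parallel_tasks=4))  # about 225 s with 4 even tasks
```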
Seventeen minutes is a long time to wait for an export, yet it is not the longest our clients have had to wait. Some clients export upwards of 300,000 entities, which translates to a wait of roughly an hour. As more clients started exporting data, these long wait times became unacceptable.
We started looking for ways to speed up data exports efficiently. We found that the key bottleneck in processing data from Cloud Datastore on App Engine was that fetching the results of a query is serial by design: each batch is fetched only after the previous one has been returned. There was no way we could speed up exports without processing the query in parallel.
The first approach
With this in mind, we decided to bypass this inherently serial design by splitting the query over a number of ranges of a property (ideally a datetime). For example, a query like q = Contact.all() can be split on the create_date property into, say, four queries. Each covers a quarter of the span between the minimum create_date (2012) and the maximum (2018).
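A minimal sketch of the even split follows, using a hypothetical helper `split_date_range`. The ranges are half-open (`lo <= create_date < hi`), so the end bound must lie just past the maximum create_date. Here 2019-01-01 is an assumed bound for data whose latest create_date falls in 2018. The comment shows how a range could map onto a legacy `db` query filter.

```python
from datetime import datetime
from typing import List, Tuple


def split_date_range(start: datetime, end: datetime,
                     parts: int) -> List[Tuple[datetime, datetime]]:
    """Split [start, end) into `parts` contiguous, equal-length sub-ranges."""
    step = (end - start) / parts
    bounds = [start + step * i for i in range(parts)] + [end]
    return list(zip(bounds[:-1], bounds[1:]))


ranges = split_date_range(datetime(2012, 1, 1), datetime(2019, 1, 1), 4)
for lo, hi in ranges:
    # With the legacy db API each range would become something like
    # Contact.all().filter('create_date >=', lo).filter('create_date <', hi)
    print(lo.date(), "->", hi.date())
```

Equal-length ranges only yield equal work when the data is spread evenly over time, which is the weakness discussed next.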
With that, we can run the four queries in parallel and gain a speed-up of roughly four times.
However, the data is not always evenly distributed across the time ranges; for example, there might be more data between 2014 and 2015. This results in a skewed query. For maximum efficiency, a skewed query requires redistributing the ranges so that the four queries process roughly the same amount of data. Processing 260,000 entities serially took 23 minutes; with this approach, using 4 parallel tasks, we cut that to 15 minutes.
This approach worked fairly well but suffered from poorly distributed data across the parallel tasks. To redistribute the ranges, we first counted the entire query result set (which is not very expensive). We then aimed for every parallel task to process an equal share of it.
For example, if the total count was 100,000 and we had 4 parallel tasks, every task should process roughly 25,000 entities. We would count what the first range returned, and if it was more or less than 25,000, we adjusted the date ranges until it returned roughly 25,000.
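The count-driven redistribution can be sketched as a bisection on each date boundary. This technique was chosen for illustration and is not necessarily the exact adjustment procedure we used. `make_counter` is a hypothetical in-memory stand-in for a Datastore count query, and `rebalance_ranges` is likewise a hypothetical helper. Note that every bisection step issues another count, which is where the redistribution overhead comes from.

```python
from bisect import bisect_left
from datetime import datetime, timedelta
from typing import Callable, List, Tuple

# count(lo, hi) -> number of entities with lo <= create_date < hi.
CountFn = Callable[[datetime, datetime], int]


def make_counter(timestamps: List[datetime]) -> CountFn:
    """Hypothetical stand-in for a Datastore count query over create_date values."""
    ordered = sorted(timestamps)
    return lambda lo, hi: bisect_left(ordered, hi) - bisect_left(ordered, lo)


def rebalance_ranges(start: datetime, end: datetime, parts: int, count: CountFn,
                     tolerance: timedelta = timedelta(seconds=1)
                     ) -> List[Tuple[datetime, datetime]]:
    """Move each boundary until the ranges before it hold about i * total / parts entities."""
    target = count(start, end) / parts
    bounds = [start]
    for i in range(1, parts):
        lo, hi = bounds[-1], end
        # Bisect: count(start, lo) stays below the goal, count(start, hi) reaches it.
        while hi - lo > tolerance:
            mid = lo + (hi - lo) / 2
            if count(start, mid) < i * target:
                lo = mid
            else:
                hi = mid
        bounds.append(hi)
    bounds.append(end)
    return list(zip(bounds[:-1], bounds[1:]))


# Skewed sample data: most entities were created during 2014.
early = [datetime(2012, 1, 1) + timedelta(days=7 * i) for i in range(100)]
busy_2014 = [datetime(2014, 1, 1) + timedelta(hours=12 * i) for i in range(700)]
later = [datetime(2015, 1, 1) + timedelta(days=5 * i) for i in range(200)]
count = make_counter(early + busy_2014 + later)
ranges = rebalance_ranges(datetime(2012, 1, 1), datetime(2018, 1, 1), 4, count)
print([count(lo, hi) for lo, hi in ranges])  # [250, 250, 250, 250]
```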
This approach did not scale well. We had to choose the split property at run time, which sometimes picked properties we had no indexes for, causing the query to fail. It also utilized resources poorly, since some parallel tasks completed much earlier than others.
The second approach
Previously, we were able to parallelize processing a query by splitting it over ranges of a date property. However, this approach was not very efficient, for two main reasons:
- Since data is unevenly spread across time, we were not able to spread it evenly over the parallel tasks. This led to some parallel tasks completing well before others.
- To compensate for the uneven spread, we redistributed the date ranges so that all the tasks got roughly equal amounts of data to process. Redistribution adds a lot of overhead even for simple queries, increasing the time it takes to process a query.
The new approach relies on the fact that fetching keys from the Datastore is very cheap. Say we want to process a query q = Contact.all() using four parallel tasks in batches of 100 entities. All four tasks use the same query (no splitting on a property), but instead of fetching entities, they fetch keys first.
Every task fetches 4 * batch_size = 400 keys. Task 0 then performs a Contact.get() on keys 0–99, task 1 on keys 100–199, task 2 on keys 200–299 and task 3 on keys 300–399. In this way the four tasks process disjoint batches in parallel.
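Here is a minimal sketch of the key-slicing scheme. `FakeKeysOnlyQuery` is a hypothetical stand-in for `Contact.all(keys_only=True)`, and its `fetch(limit, offset)` mirrors the legacy `db.Query.fetch` signature. A dictionary lookup stands in for the batch `Contact.get(keys)`.

The code assumes each task repeats the 400-key step with the next window until the keys run out. On the real Datastore, skipped offset results are still read internally, so a query cursor is likely the cheaper way to advance. The thread pool only simulates parallelism; in production the tasks run separately.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class FakeKeysOnlyQuery:
    """Hypothetical stand-in for Contact.all(keys_only=True) with a stable key order."""

    def __init__(self, keys: List[int]):
        self._keys = keys

    def fetch(self, limit: int, offset: int = 0) -> List[int]:
        return self._keys[offset:offset + limit]


def run_task(query: FakeKeysOnlyQuery, store: Dict[int, dict], task_index: int,
             num_tasks: int, batch_size: int = 100) -> List[dict]:
    """Process slot `task_index` of every window of num_tasks * batch_size keys."""
    processed: List[dict] = []
    window_size = num_tasks * batch_size
    round_no = 0
    while True:
        # Every task reads the same window of keys (cheap)...
        window = query.fetch(window_size, offset=round_no * window_size)
        if not window:
            break
        # ...but only loads the entities in its own slice of that window.
        my_keys = window[task_index * batch_size:(task_index + 1) * batch_size]
        if my_keys:
            processed.extend(store[k] for k in my_keys)  # stands in for Contact.get(my_keys)
        round_no += 1
    return processed


def run_parallel(query: FakeKeysOnlyQuery, store: Dict[int, dict],
                 num_tasks: int = 4, batch_size: int = 100) -> List[List[dict]]:
    with ThreadPoolExecutor(max_workers=num_tasks) as pool:
        futures = [pool.submit(run_task, query, store, i, num_tasks, batch_size)
                   for i in range(num_tasks)]
        return [f.result() for f in futures]


store = {k: {"id": k} for k in range(1050)}
results = run_parallel(FakeKeysOnlyQuery(sorted(store)), store)
print([len(r) for r in results])  # [300, 300, 250, 200]
```

Because the slices are disjoint and cover each window, every entity is processed exactly once. The tasks differ by at most one batch.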
This approach distributes the data almost evenly across the parallel tasks and performs up to five times faster than the first approach. We were able to process 260,000 entities using 4 tasks in 8 minutes, compared with 15 minutes for the first approach and 23 minutes serially.
How we run the solution in production
In production, once we split a query, we run the code that processes each part on a different sub version. A sub version behaves more like a separate instance, which guarantees parallelization. (When you use automatic scaling on App Engine, your instances are anonymous and therefore not addressable.)
Initially, we ran all the background tasks for a query on the same sub version. With that setup we could not guarantee true parallelization, since some of the tasks were processed by the same instance. To work around this problem, we deployed eight resident sub versions dedicated to processing queries in parallel.
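The routing idea can be sketched as pinning each parallel task to its own version. The version names, the handler URL and the `query_id` parameter below are all hypothetical. In the legacy Python `taskqueue` API, `taskqueue.add()` accepts `url`, `params` and `target`, where `target` names the version the task should run on. The sketch only builds those keyword arguments, so it runs without App Engine.

```python
from typing import Dict, List

# Hypothetical names for the eight resident versions dedicated to parallel work.
PARALLEL_VERSIONS = [f"parallel-{i}" for i in range(8)]


def plan_task_kwargs(query_id: str, num_tasks: int,
                     versions: List[str] = PARALLEL_VERSIONS) -> List[Dict]:
    """Build one task description per parallel task, each pinned to a distinct version."""
    if num_tasks > len(versions):
        raise ValueError("more parallel tasks than dedicated versions")
    return [
        {
            "url": "/tasks/process-query",  # hypothetical handler path
            "target": versions[i],          # routes the task to that version's instances
            "params": {"query_id": query_id,
                       "task_index": str(i),
                       "num_tasks": str(num_tasks)},
        }
        for i in range(num_tasks)
    ]


plans = plan_task_kwargs("export-42", 4)
for kwargs in plans:
    # With the legacy Python taskqueue API this would be taskqueue.add(**kwargs).
    print(kwargs["target"], kwargs["params"]["task_index"])
```

Refusing to plan more tasks than versions keeps the one-task-per-version guarantee explicit rather than silently letting two tasks share an instance.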
This has enabled us to serve reports to our users up to five times faster than we used to.