Where Does The Work Belong In The Data Divide?

What makes remote execution hard to get right, and a possible optimization for microservice architectures
Luis De Pombo, Alejandro Guillen, David Ellis
In backend applications, separation of data storage and compute is the gold standard. Whether you’re using Go on AWS Lambda, Ruby-on-Rails on Heroku, or Perl and C++ CGI scripts on your own server in a rack you rented at Peak Hosting, your application layer doesn’t maintain any responsibility for the storage of the data it is working with. Instead, you have a database cluster, whether it’s DynamoDB, Cassandra, or PostgreSQL. One of these applications is deployed to be in charge of your data.
Your application is “stateless” and your databases “only” store and retrieve data. You don’t write distributed compute code because it’s hard to get right and will make things more difficult to follow.
Except that, regardless of which combination of application layer and database you choose, you are already writing distributed compute in all but the most trivial of cases. We’ll use JavaScript and SQL as the compute and query languages for the following examples, as they are commonly understood (any web development requires familiarity with JavaScript, and most databases use a dialect or derivative of SQL).
Suppose you were writing some sort of trip-managing application and you wanted to show a user a list of their own trips ordered from most recent to least. It is highly unlikely that you wrote it like this:
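Something like the following sketch, which assumes a node-postgres-style `db.query` client and hypothetical `trips` columns (`userId`, `destLat`, `destLng`, `startTime`):

```javascript
// Anti-pattern sketch: pull the whole table, then filter, reshape, and sort
// in the application. Client and column names are hypothetical.
const trips = (await db.query('SELECT * FROM trips')).rows
  .filter((row) => row.userId === userId)
  .map((row) => ({ id: row.id, destination: { lat: row.destLat, lng: row.destLng }, startTime: row.startTime }))
  .sort((a, b) => b.startTime - a.startTime); // most recent first
```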
It’s more likely you would write something like:
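Perhaps along these lines, with the same hypothetical client and columns:

```javascript
// Same result, but the filter and ordering are pushed into the query itself,
// so only this user's rows ever leave the database.
const result = await db.query(
  'SELECT * FROM trips WHERE "userId" = $1 ORDER BY "startTime" DESC',
  [userId],
);
const trips = result.rows.map((row) => ({
  id: row.id,
  destination: { lat: row.destLat, lng: row.destLng },
  startTime: row.startTime,
}));
```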
There are actually more lines of code in the second form than the first, but it looks “right” in a way the first doesn’t. They both accomplish exactly the same goal, so why is the first one wrong and the second one right?
The primary issue is data transfer. The first one gets every trip record in the database and loads it into your application, then it filters out the data it doesn’t need, then it transforms the singular row into a nested object and finally sorts them so the latest time is on top. The second one tells the database to filter the trips and provide them in latest-first ordering, and then you transform them into nested objects for consumption in your application.
You have now pushed the filter and sorting computation to the database instead of doing it in your application. You have written distributed compute code and it wasn’t scary! You have merged computation and data storage into the same layer!
Without any other information, this still causes the database to scan the entire table and apply the filter to each row. It would only be able to immediately rule out data if you also instructed the database to construct an index on the userId column (a B-Tree or Hash index would work here). Conceptually, that index is an object of arrays, where the keys are the various values of userId and the values are the row numbers of each matching record. The database mutates these indexes on each change to the table in question, which is more computational work pushed to the database, with the intent of minimizing the total amount of work; this pays off when the table is queried more often than it is mutated relative to its size, so it very quickly becomes worthwhile even for infrequently-queried tables if the table is massive.
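As a concrete sketch, assuming PostgreSQL:

```javascript
// One-time migration sketch: index userId so the query above can rule out
// other users' rows without scanning the whole table.
await db.query('CREATE INDEX trips_user_id_idx ON trips ("userId")');
```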
Now, suppose you want to show the user some notable locations near their destination while they’re on their trip. You have a destination lat and lng, and you also have your notable places in a table. You only want to show the 4 closest locations so you don’t overwhelm the user, but unfortunately that isn’t just a maximum: the UX designer wants exactly four to fit into the square set aside for them in the client-side application. Since the destination may end up being far away from the locations in your table, you have to compute the haversine distance between the user and each location, sort by that distance, and then slice the first four from the list.
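In application code, that might look like the following sketch, assuming a notable_places table and a dest object holding the destination coordinates (both hypothetical):

```javascript
// Application-side sketch: pull every notable place, compute the haversine
// distance to the destination, sort, and keep the closest four.
const degsToRads = (deg) => (deg * Math.PI) / 180;

const haversineKm = (lat1, lng1, lat2, lng2) => {
  const R = 6371; // mean Earth radius in kilometers
  const dLat = degsToRads(lat2 - lat1);
  const dLng = degsToRads(lng2 - lng1);
  const a =
    Math.sin(dLat / 2) ** 2 +
    Math.cos(degsToRads(lat1)) * Math.cos(degsToRads(lat2)) * Math.sin(dLng / 2) ** 2;
  return 2 * R * Math.asin(Math.sqrt(a));
};

const { rows } = await db.query('SELECT * FROM notable_places');
const closest = rows
  .map((p) => ({ ...p, distance: haversineKm(dest.lat, dest.lng, p.lat, p.lng) }))
  .sort((a, b) => a.distance - b.distance)
  .slice(0, 4);
```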
Most SQL dialects let you do math to create virtual columns that you can select on, and then sort on the output of those virtual columns. Some require inner select statements to do the sorting, some do not. But if you simply translated the degrees-to-radians and haversine functions into SQL, besides being more verbose (most dialects do not allow defining functions in the select statement, so everywhere degsToRads is called in the example above you would have to inline the statements), it doesn’t actually reduce the amount of computation that needs to be done: the database still has to perform the haversine calculation on every row in the table in order to sort them and let you select the top 4.
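Inlined into SQL, a sketch assuming PostgreSQL’s built-in math functions might look like:

```javascript
// Database-side sketch: the haversine math inlined as a virtual column.
// $1/$2 are the destination lat/lng; radians(), asin(), etc. are PostgreSQL
// built-ins, so every degsToRads call above becomes an inline radians().
const { rows } = await db.query(`
  SELECT *,
    2 * 6371 * asin(sqrt(
      pow(sin(radians($1 - lat) / 2), 2) +
      cos(radians(lat)) * cos(radians($1)) *
      pow(sin(radians($2 - lng) / 2), 2)
    )) AS distance
  FROM notable_places
  ORDER BY distance
  LIMIT 4
`, [dest.lat, dest.lng]);
```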
Since this reduces the total amount of data being transmitted, it may seem like a win. But if you are using a classic vertically-scaled database, applying this computation to the entire table on each query may put more load on the database than you can afford, while your horizontally-scalable application layer could easily be grown to handle the increased traffic. This is a case where reducing the amount of data transferred to the application layer can decrease the reliability of the application. (If you have a collection of read-followers attached to the read/write-main database, you can mitigate that, at the cost of eventual consistency on the data in question, but it’s still quite inefficient.)
Cleverness is necessary to reduce the amount of data transferred without overwhelming the database. With some probing queries on the application side, we can constrain the query while still making sure we return 4 results for the UI.
This approach probes Mercator-projected squares on the Earth’s surface surrounding the destination coordinates for notable locations. If there are B-Tree indexes on the lat and lng columns, this quickly reduces the set of places to consider to only the likely candidates for the list, and returns how many were found. If the list is too short, it quadruples the search area (by doubling the bounding box sides) and searches again until at least 4 records are returned.
Once the probing is complete, it performs the actual query to get the records, applies the transformation and the sorting to this smaller set, and then slices out the top 4 entries.
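A sketch of that probing loop, reusing haversineKm from the earlier example (the starting box size is a tunable assumption, and a real implementation would cap the expansion):

```javascript
// Probing sketch: expand a lat/lng bounding box around the destination until
// it contains at least four candidates, then fetch and rank just those rows.
let halfSide = 0.05; // degrees
let found = 0;
while (found < 4) {
  const { rows } = await db.query(
    `SELECT COUNT(*) AS count FROM notable_places
      WHERE lat BETWEEN $1 AND $2 AND lng BETWEEN $3 AND $4`,
    [dest.lat - halfSide, dest.lat + halfSide, dest.lng - halfSide, dest.lng + halfSide],
  );
  found = Number(rows[0].count);
  if (found < 4) halfSide *= 2; // doubling the sides quadruples the area
}
const { rows: places } = await db.query(
  `SELECT * FROM notable_places
    WHERE lat BETWEEN $1 AND $2 AND lng BETWEEN $3 AND $4`,
  [dest.lat - halfSide, dest.lat + halfSide, dest.lng - halfSide, dest.lng + halfSide],
);
const closest = places
  .map((p) => ({ ...p, distance: haversineKm(dest.lat, dest.lng, p.lat, p.lng) }))
  .sort((a, b) => a.distance - b.distance)
  .slice(0, 4);
```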
Ideally, this logic could be pushed to the database, as well, to eliminate the back-and-forth of request and response during probing, but it is often very difficult, if possible at all, to express this sort of logic in SQL syntaxes. Difficult enough that even when a database does support it, it is often through an extension written in C and loaded into the database, such as PostGIS for PostgreSQL. That’s a very high barrier to entry for adding efficient computation local to the data in question.
Considerations of data and compute affect both the application layer and the database layer, and the trade-offs can be hard to quantify. The programming language and model on each side are very different, so it is difficult to predict ahead of time how long rewriting the computational logic from application side to database side (or vice-versa) will take, or how efficient it will be on either side. It’s also hard to see when the volume of data transfer to the application layer is the bottleneck and the compute should be pushed into the database layer in the first place, and even harder to tell when that compute will produce a bottleneck in the database itself.
The answer to these questions often comes down to developer intuition built up through experience running head-first into each of these bottlenecks in each of these scenarios. But that intuition will always be fuzzy, and it will bias towards code clarity over ideal performance when the logic is complicated, and rightly so. These are engineering trade-offs, but does it have to be that way?
Let’s take our application layer / database layer setup described above, and make two tweaks:
- The database layer can be scaled horizontally for any given query like our application layer.
- The database layer can execute the exact same code that our application layer can execute with the same performance characteristics, if desired.
This lets us side-step the “too much compute on the database layer negatively impacts other queries” problem, and also side-step the “how do I rewrite this for the database, and how well will it perform?” question, since we don’t need to rewrite at all. The main question then becomes: “where do I put this computation to minimize the total latency of responding to the user?”
Minimizing latency when dealing with data stored in a cluster is the simplest of these problems to abstract, and it is doable with current languages, but it is often very awkward and difficult. In a situation where a request comes in to node N1 but the relevant data is located on node N2, there are two possible choices to make:
- Request the relevant data from N2 to N1, then execute the code using the relevant data from the request and return the result.
- Grab the relevant data from the request to N1 and push it and the code to execute to N2, then execute the code and push the result from N2 back to N1.
Option 1 is taken the vast majority of the time because your distributed data usually lives in a cluster (a database) distinct from your request/response application layer. Doing the compute within the database is made difficult by needing to write it in a language other than your main application language, requiring a context switch, and it is often made more difficult still by that language being primarily a query language, likely SQL, in which the desired computation is awkward or impossible to represent, as described above.
But if that cost to you were zero, because your programming language could determine when one path or the other was the optimal choice, what would the actual switching point be? Let’s write these two options out as equations on the total response latency:
T_option1 = T_data + T_execute
T_option2 = T_closure + T_execute + T_result
Where T_data is the time to transmit the raw data from N2 to N1, T_closure is the time to transmit the relevant metadata for the execution from N1 to N2, T_result is the time to transmit the output result from N2 to N1, and T_execute is the time required to perform the relevant computation, which we assumed earlier to be identical on both nodes.
Therefore, choose Option 2 when:
T_closure + T_result < T_data
When the total data to compute on is larger than the metadata from the request plus the size of the output payload, it makes sense to push the compute to the remote machine; otherwise it makes sense to pull the data from that node and perform the computation locally.
A handshaking protocol between the two nodes, negotiating whether the data or the closure will be transmitted, can make the sizes of those two payloads known and the decision automatic (assuming symmetric bandwidth between the two nodes). But wait, what about T_result? You have to perform the computation to know which choice is correct, right?
While true in the general case, there are a few ways the uncertainty can be reduced. First, in a statically-typed language, types that are constant in size (unitary types like int, float, and bool, or compound struct types composed only of those types or of other struct types similarly composed) have a compile-time-known size, so the exact size of T_result can be known ahead of time. If a variable-length type is included, such as Array or string, then prediction becomes more difficult without also tracing the origin of the data in question. If T_data was of type Array<T> and T_result is of type Array<U>, and the result was produced by something like data.filter(filterFn).map(mapFn), then it is known that the length of the output Array<U> is less than or equal to that of the input Array<T>. Then if len(Array<T>) * sizeof(U) + sizeof(closureData) < len(Array<T>) * sizeof(T), you are similarly guaranteed that the computation should be pushed to the remote end.
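To make that concrete, here is a sketch with purely illustrative sizes:

```javascript
// Illustrative numbers: 10k input rows of 256 bytes each, mapped down to
// 64-byte output records, with a 1 KiB closure payload. All hypothetical.
const len = 10_000;
const sizeofT = 256;
const sizeofU = 64;
const closureBytes = 1_024;

// A filter().map() can never grow the list, so len * sizeof(U) is a safe
// ceiling on the result size even before the filter runs.
const resultCeilingBytes = len * sizeofU;

// Push remotely when shipping closure + (bounded) result beats shipping data:
const pushRemote = closureBytes + resultCeilingBytes < len * sizeofT; // true here
```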
There will still be cases where it is not possible to explicitly determine on which node to perform the computation. But with some reasonable assumptions, we can create an approximation that will choose the correct execution location most of the time. The input data comes from the closure and the remote data, so the entropy of the output result should not exceed the entropy of those inputs. It is of course possible to create highly compressible data that still fits that definition, but assuming the compressibility is approximately the same and the bandwidth is symmetric between the nodes, we don’t have to worry about entropy and can just consider time. Therefore T_result <= T_data + T_closure, with the total range for T_result being [0..T_data + T_closure]. The exact distribution of times for T_result depends on the use-case, but considered in bulk across many different queries for many different remote executions it is likely a normal-ish distribution with a low average and a long tail, due to the Central Limit Theorem. If we instead conservatively assume a uniform distribution, we overestimate the expected size of the output result somewhat while simplifying the math, and can define T_result = (T_data + T_closure) / 2. Substituting this back into our original inequality, we get:
T_closure + (T_data + T_closure) / 2 < T_data
(3/2) * T_closure < (1/2) * T_data
3 * T_closure < T_data
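Expressed as code, the whole decision rule might look like this sketch, with byte counts standing in for transmission times under the symmetric-bandwidth assumption:

```javascript
// Minimal decision sketch. resultCeilingBytes is null whenever the result
// size can't be bounded ahead of time from the type system.
function shouldPushCompute(closureBytes, dataBytes, resultCeilingBytes = null) {
  if (resultCeilingBytes !== null) {
    // Exact rule: T_closure + T_result < T_data.
    return closureBytes + resultCeilingBytes < dataBytes;
  }
  // Fallback heuristic: assume T_result averages (T_data + T_closure) / 2,
  // which reduces to 3 * T_closure < T_data.
  return 3 * closureBytes < dataBytes;
}
```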
This approximation will likely bias towards local execution in some cases where remote execution would have been the better bet, but generally only when the stakes are low and the impact of one choice or the other has low variance in the output latencies. And for static languages we can produce exact results in many circumstances (first via the type system, and later with a more intelligent compiler proving relations between T_result and T_data). All of this analysis depends on being able to ship identically-behaving code between the nodes, and it becomes considerably more difficult if the code in question must be rewritten in another language for the data-storage side of things, which is why it has tended to be done in an ad hoc, “squeaky wheel gets the grease” manner, and only when a developer has a “hunch” that it will pay off.
Now, there are no databases that run the same language as your application logic with the same runtime characteristics (no, not even MongoDB: it uses a different JavaScript engine than Node.js, and even if it didn’t, the exposed standard library and the implementation of the implicit event loop would differ, and so would be subject to compatibility and timing issues between the two). So what can this analysis be useful for?
- A cluster of nodes in a consistent or rendezvous hash ring sharing (probably) ephemeral data with each other.
- Two or more microservices written in the same language with one querying the other for work to handle.
The first situation is rarer (though we have seen it in production systems before), while the second is very common in microservice architectures. When the outermost “gateway” microservice receives a request from a client, the request can often trigger a chain of microservice requests throughout the backend, with varying levels of depth and breadth in the query. Since microservice topologies tend to match organizational topologies, rather than a performance-oriented architecture, there can be a lot of “fat” in the request handling, with much of that precisely being the wasted IO as too much data is passed up from the bottom and then filtered and mapped into higher precision forms as it trickles back up to the original gateway service.
There are two models for how microservice-oriented remote execution can work: either sending the function to execute over the wire, or sending a reference to a function that both services already have a copy of locally.
The first model is simpler to understand and requires less centralization to start gaining benefits. Microservice A makes a request to Microservice B for some data. Depending on the protocol used, metadata may also be transmitted indicating that Microservice A would like to upgrade the query to a remote execution and receive back those results instead of the raw data. For an HTTPS-based REST service, this can simply be a special x- extension header that provides the size in bytes of the function to be transmitted and of the extra arguments to be provided to it, if any.
If Service B does not recognize that header, it will ignore it and the request continues normally. If it does recognize it, and it determines that the response payload is less than 3 times the size of the closure payload, it can also behave as normal and just return the data. But if the data is large enough, it can instead respond with a 202 Accepted response whose body is a URL to push the closure payload to in order to get the result of that execution back.
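A sketch of Service A’s side of that negotiation; the x-remote-exec-offer header name, serializeClosure, and the internal URL are all hypothetical:

```javascript
// Offer an upgrade on an otherwise-normal request by advertising the byte
// size of the function + args Service A is willing to ship.
const closure = serializeClosure(fn, extraArgs); // hypothetical -> Uint8Array
const res = await fetch('https://service-b.internal/trips?userId=42', {
  headers: { 'x-remote-exec-offer': String(closure.byteLength) },
});
if (res.status === 202) {
  // Service B accepted the upgrade: push the closure to the URL it returned
  // and take the (much smaller) computed result instead of the raw data.
  const { executeUrl } = await res.json();
  const result = await (await fetch(executeUrl, { method: 'POST', body: closure })).json();
} else {
  // Service B declined or didn't understand the header: raw data as usual.
  const data = await res.json();
}
```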
The implementation of this negotiation process can vary. If Service B is able to cheaply calculate the size of the data for the comparison without actually performing the query, then whichever node handles the remote execution URL request can do the actual underlying query and then execute the provided code and return its output. If that is not possible, then the first Service B node will have to query the data and then make the decision, and the URL needs to have the property of having that data available on the other side, which can be done by sticky routing back to the exact same node and accessing the data from an internal cache.
It is tempting to simply re-request the data on the second node, or to put the data into a cache layer to pull back out on the second node, but both approaches just transmit the very data we were trying to avoid transmitting to improve response times in the first place, and are counterproductive.
The “sticky routing” isn’t nearly as difficult to do as you might imagine, though. The Connection and Keep-Alive headers in HTTP/1.1 are perfect for this. You can simply mark the initial request as a keep-alive connection and then the follow-up request to the same service should continue to use the same socket on the same process, so an in-process LRU cache with a randomly-generated key for the data should be enough to handle this.
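A sketch of Service B’s side, using Express-style handlers; queryTrips, byteSize, and deserializeClosure are hypothetical helpers:

```javascript
// The first request stashes the rows in an in-process cache under a random
// key; keep-alive routing brings the follow-up POST back to this process.
const crypto = require('crypto');
const cache = new Map(); // a real service would bound this (LRU + TTL)

app.get('/trips', async (req, res) => {
  const rows = await queryTrips(req.query.userId);
  const offer = Number(req.headers['x-remote-exec-offer']);
  if (Number.isFinite(offer) && 3 * offer < byteSize(rows)) {
    const key = crypto.randomUUID();
    cache.set(key, rows);
    res.status(202).json({ executeUrl: `/execute/${key}` });
  } else {
    res.json(rows); // too small to bother, or no offer was made
  }
});

app.post('/execute/:key', (req, res) => {
  const rows = cache.get(req.params.key);
  cache.delete(req.params.key);
  const fn = deserializeClosure(req.body); // hypothetical, e.g. eval-based
  res.json(fn(rows));
});
```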
The other mechanism, in which both services share the remote-execution function in a common library, further improves the possible efficiency gains at the cost of greater coupling: Service A only needs to provide Service B with the name of the function it wants to execute, along with any extra arguments (beyond the data in question), so the closure passed is very tiny and there are many more situations where total response times go down by performing the execution remotely. But now, any time you want to change that function, you need to create a new function name in the common library, wait for the services to pick up the newer version, and only then delete the older function from the list.
If the upgrade/fallback negotiation mechanism silently ignores unknown function names, this isn’t too painful a trade-off: you won’t ever fail requests, only suffer a performance regression until Service B upgrades the common library and gains access to the new remotely-callable function. This approach is also easier for languages that can’t eval new source code well (or at all), since the functions can be included at compile time.
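A sketch of what such a common library might look like; the function names are hypothetical, and haversineKm comes from the earlier example:

```javascript
// Shared-library sketch: both services depend on a common module mapping
// stable names to functions, so the wire carries only a name + extra args.
const remoteFns = {
  closestPlacesV1: (rows, dest) =>
    rows
      .map((p) => ({ ...p, distance: haversineKm(dest.lat, dest.lng, p.lat, p.lng) }))
      .sort((a, b) => a.distance - b.distance)
      .slice(0, 4),
  // closestPlacesV2 would ship alongside V1; V1 is deleted only once every
  // service runs a library version that includes V2.
};

// Receiving side: unknown names silently fall back to returning raw data.
const lookupRemoteFn = (name) =>
  Object.prototype.hasOwnProperty.call(remoteFns, name) ? remoteFns[name] : null;
```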
In an ideal situation, such a mechanism could cut through multiple layers of microservices, pushing all of the compute to the lowest-level microservice and simply streaming the results back up to the higher levels with near-zero additional latency. Service A gets a request for data and makes a request to Service B for the data it needs, along with the function to transform it into the output to return to the user, which it starts streaming back as soon as it receives the first byte from Service B. Service B does the exact same thing to the “Database API” service, which could be colocated with the database read nodes (or at least on the same rack as one), making the actual database query near-zero latency, as well. The Database API service is then running a remote-execution function that took as an argument another remote-execution function, and it runs them both with zero data-transmission latency between them. Intra-cloud network bandwidth is reduced, the response time converges to almost match the CPU time, and the tug-of-war over where application-specific logic lives no longer needs to be fought, at least along the traditional lines of team expertise versus architectural performance needs.
Finally, “Service A” doesn’t even have to live in the datacenter. It could be your web or mobile app pushing its own compute needs to the gateway service, which then handles it for them, or not. That is definitely trickier, though, as web and mobile apps often use a different programming language from the backend, have wildly different compute performance, and often sit behind an asymmetric bandwidth configuration, so a much more complicated model would be required. But considering the “straw” through which data flows into most consumer devices, it could possibly be even more impactful than the optimizations within the server, and such a solution could open this up for heterogeneous backend services (a Node-based API gateway to a Python-based ML service to a Go-based Database API service), too.
If you found this interesting and would like to discuss more, you can email us at hello@alantechnologies.com.