NuConta is Nubank’s digital bank account. It’s built using the same amazing stack that powers our other products: Clojure, Datomic, and Kafka; all deployed to AWS cloud. At the team building NuConta, we’re also big fans of event sourcing.
Event sourcing is a fancy expression for an idea that’s simple at its core: instead of storing the current state of the system in a database, store each step of the story so far. Afterward, if necessary, we can always look back at that store to answer whatever questions we have or even reduce over the entire list to get some up-to-date “stateful” view.
This solves a whole host of problems that arise with distributed systems. We use a lot of microservices at Nubank, and keeping them consistent with each other is a complex problem. We found that event sourcing lets us break down things in a way that keeps this complexity at a comfortable level.
This is a story about when our event-sourced approach started going OOM, and how we solved that without compromising on consistency.
One core feature of a bank account is to keep a running score of how much money you have there. Of course, we also want to allow you to put more money in and take your money out.
Getting money in and out of Nubank involves a few hairy integrations which are interesting in their own right, but I’d like to talk about the “keep a running score” part. In this case, the score is the balance of the account.
One naive implementation of a balance service would look like this:
- The database model consists of an account entity
- This entity has a balance attribute
- Adding money means updating the balance attribute to increase it
- Remove money means updating the balance attribute to decrease it
Sounds simple, right?
Yet how would you build a bank statement on top of this? How would you debug if something goes wrong? How would you ensure you never accidentally add some amount twice, or remove it twice?
I won’t go into detail here, but an event sourcing approach solves all these problems (and some others, as well). So here’s one way to build an event-sourced balance service:
- The database model has an account entity
- It also has a deposit entity and a withdrawal entity, both belonging to an account. These entities each have two attributes: amount and date.
- Adding money means creating a new deposit
- Removing money means creating a new withdrawal
- The balance of the account on a certain date is the sum of all deposits up to that date, minus the sum of all withdrawals up to that date.
This is oversimplified but close enough. This is how I usually explain our system at talks. When we open for questions, there’s a predictable one:
“You run over the entire list every time someone asks for their balance? Isn’t that too slow?”
My answer was “it’s holding up fine so far, thanks for your concern!” up to a few months ago.
It turns out that speed wasn’t a problem, but the memory requirements ended up being one. To understand why, we need to talk a little about Datomic.
Datomic is not a traditional SQL database, but it also probably does not fit with what you think about when you read the “NoSQL” buzzword. It offers fully consistent ACID transactions, and has a bunch of superpowers right out of the box. We get easy audit trails, and we never lose any data. It’s really impressive, even after working with it for a few years.
As cool as they are, the intricacies of Datomic data model are not too important to this problem. What is relevant is a specific architectural decision: Datomic unbundles the jobs of writing, storing and querying data. In a traditional SQL server, a single machine does all three jobs. With Datomic, we can (and typically do) have three different machines or clusters, one for each job.
Storing data looks like an essential part of a database. In a sense it is — but Datomic does not solve this problem directly. Instead, it allows us to offload this to a separate service, such as DynamoDB.
The job of writing data is left to a specific process, called the transactor. All writes go through this one funnel, are serialized, processed, validated. If all goes well, our transaction is accepted and the result is persisted to our storage layer.
Because all data is in the storage layer, it is accessible from anywhere. This means we can have as many querying processes as we want, all independently searching, filtering and returning the data of interest. These read-only processes are called peers.
At Nubank, our peers are our microservice instances themselves. The same container, the same JVM that is running our application code is also running queries, loading things from DynamoDB into a huge cache in memory, preemptively loading data before we even ask it to. This means queries can be ridiculously fast, because most data you’ll need is in memory, no network trips involved.
This can also be very memory intensive. We’ve seen an instance of our balances service allocating several gigabytes in a few seconds.
So what happened is that we started seeing crashes. Machines going out of memory, stopping for minutes-long garbage collection pauses. At first, our response was to increase the available memory for these instances. At first, this worked.
When we got to 100GB instances that were hanging by a thread, we knew we could not keep this up any longer.
A task force was assembled. Profilers were the weapons of choice, and many load tests were run. Megabytes were cut, queries were optimized. There was peace for a while. It did not last, however.
The real problem rested on two basic facts about our system:
- Our event-sourced approach required us to load a lot of data from storage into the peers’ in-memory cache, where we could run the required queries.
- Any instance of our service could be called to answer any request. This means that if a customer made 4 requests for their balance in the course of navigating their app (which is typical), they could force 4 different instances to load the same data.
No instance could be so large as to hold all of the database in the memory cache — but yet all instances had to hold data for all customers active at each moment. All instances were spending lots of time and memory redoing work that some other machine had already done. All the time. For everyone.
If we could just get these machines to share some of the work, or even better, to avoid redoing unnecessary work altogether, then we’d have some hope. This meant adding a cache layer. But what do they say about those, again?
One of the hard things
I must admit I’m a bit scared of caches. I mean, I know they’re out there, I know it’s possible to make them work. I also know it’s possible to write a safe multithreaded program in assembly — but I’d rather not do that if I can avoid it.
I’ve seen cache implementations where several people would pour over the code, review it, conclude it was correct, and still we’d find invalid states in production.
One famous quote/joke about caching is attributed to Phil Karlton:
“There are only two hard things in Computer Science: cache invalidation and naming things.”
It turns out that this quote is quite precisely correct: the hard part is not caching, the hard part is cache invalidation. What if we could make a cache that never had to be invalidated?
We Clojure programmers boast about our out-of-the-box immutable data structures. We never lose anything until we don’t need it anymore, and at that moment we can just… let go of the reference, forget it, trust the GC and never look back.
Could we make a cache that worked kinda like that? Could we work around the hard thing, instead of solving it?
Think about this: a cache where any reference you held points either to nothing (a cache miss) or to something immutable, therefore valid for some point in time. When the world changes, we change the reference, not that value. The new reference should also either point to nothing or to a new, immutable, valid value.
It’s hard to trace where ideas come from, but looking back I think two sources were most influential.
Concepts, Techniques, and Models of Computer Programming is a mind-blowing book by Peter Van Roy and Seif Haridi. One of the first constructs one learns in that book is the logic variable. Logic variables can only be assigned once — they are either unbound or bound to a specific value for the rest of time (until the program ends). Logic variables remind me of futures and promises and of memoized functions.
That book also shows to use logic variables to build beautiful concurrent systems with fully deterministic behavior. It turns out that if your variables are immutable once bound and the process that binds them is deterministic, concurrency is pretty safe!
The other big inspiration was Datomic itself. Datomic stores data (on whatever storage layer we wish to use) in the form of a big tree of immutable nodes. Each immutable node is identified by a UUID. Whenever it must make a change to that tree, it can simply create new versions of the nodes that must be modified, all the way up to the root of the tree — and, finally, do a single, atomic swap to the part of the storage that holds the reference to the current root of the tree.
Now imagine a Datomic peer must run a query. It starts at the root of the tree, does some indexing logic, and decides that the data it needs will be found in the node with UUID EAE4D22F-CE79–47C4-AE5A-759A46B4D166. Now suppose it finds that node already loaded in its in-memory cache. How can it know if the node is still valid?
Of course it is valid! It is an immutable value! As long as we start at the correct reference, we can’t help but arrive at valid values. Datomic, under the hood, has exactly the kind of cache we were looking for!
Another suggestive bit of documentation from Datomic is this one:
“Datomic will always see a correct log pointer, which was placed via conditional put. If some of the tree nodes are not yet visible underneath that pointer, Datomic is consistent but partially unavailable, and will become fully available when eventually happens.”
This looks a lot like having an unbound logic variable which will be asynchronously bound by some other thread. As long as that other thread does its work correctly and deterministically, we should be safe!
So here’s what we need:
- An immutable store that maps references to some value (or to nothing)
- A way to derive the current value from the state of the database and push it to the store
- A way to get a valid reference to the latest value
Any key-value store will work fine as an immutable store. We decided to use DynamoDB, which is something we’re used to at Nubank.
We already know how to compute the results we want from the database — we’re doing that every time someone makes an HTTP request. So one simple strategy that we can use is a read-through cache: look for data in the cache; if it is missing, compute it from scratch, then write it to the cache and return.
All we’re missing now is that magic reference, which we should be able to use as a key to our immutable store. How do we get that?
Datomic has been a good model so far, let’s see if we can steal some more ideas from it. What is the valid reference for Datomic storage?
The answer is in that one special storage location that is only updated using strongly-consistent conditional-put: the key to the root node of the tree. This UUID points to the root of the latest version of the data. If you follow that reference, everything you’ll find is current. Anything else you may find in the storage layer is old garbage.
This idea of version identifiers shows up in many places as a way to uniquely identify stuff: git repositories, package managers, blockchains. They may be cryptographic hashes, or they may be names assigned by some central authority. It does not matter, as long as they name immutable values.
Datomic does have a kind of version number that can be used by application code. It is called a basis-t, a reference to a point-in-time in the database. One can share a basis-t between peers to ensure they are seeing the same version of the data. The basis-t can be used to look into the past (i.e. query the database as it was at that moment) or to wait for the future (i.e. block a thread until the local peer has received all data up to that point in time).
What if we use this basis-t number as our reference?
Well, every transaction makes the Datomic clock tick, every transaction bumps this version number. So the following would happen:
- We query Datomic and find that the current basis-t is 13. We’ll use this as part of our cache key until the current basis-t changes.
- We receive a request for the balance of account number 2042.
- Our cache key will have the form “account-number/basis-t”, so in this case, it will be “2042/13”.
- We look it up in the store and find nothing. Cache miss.
- We query the database and compute the result.
- We save the result to the store at key “2042/13” and return.
- Next, we receive a deposit for a different, unrelated account. We save it to the database. This transaction increases the basis-t to 14.
- If we now receive a new request to get the balance of account 2042, we’ll use a different cache key: “2042/14”. Cache miss again!
So if we use the basis-t as part of our cache key, we will lose our caches way too often, for no good reason. All these queries we’re running look at data for one account at a time. Losing the cache for account A because of a deposit in account B is just silly.
Can we do better?
All we have to do is to add an attribute to our account entity in Datomic. This attribute starts at zero. Every time we do anything related to that account, we increase the attribute by 1 in the same transaction. This gives us a version number for the account data, which we can use as a cache key.
Because this attribute ends up counting every event that happens for an account, we gave it a very creative name: event-counter.
The overall structure for our read-through cache looks like this:
- We receive a request for the balance of account 2042
- We query Datomic for the current event-counter value for account 2042.
- Datomic tells us the event-counter is currently 17. Our cache key will be “2042/17”
- We look up “2042/17” in the immutable store and find nothing. Cache miss.
- We query Datomic for the data we need and compute the balance.
- We save the balance to the immutable store, at key “2042/17” and return.
- Hours later, we receive a new request for the balance of account 2042.
- The current event-counter is still 17, so the cache key is still “2042/17”.
- Cache hit!
If we get a new deposit or withdrawal for account 2042, very well, we will increase the event-counter, and this will lead to a cache miss on the next request. This is inevitable, as the current balance has, indeed, changed! The important part is that this will not happen for silly reasons, as it did when we tried using the basis-t.
The cache and the database
We usually think of a cache as something that goes in front of the actual service, shielding it from requests — that’s what CDNs do, for example. This kind of cache also allows us to keep serving content even if the real service is completely down.
However, this only works well for static content. If we need dynamism and interaction and actions that go beyond whatever state we may hold on the client-side, we need to run some actual server-side application code anyway.
If that’s the kind of application we’re running, and if consistency is a strong requirement, it makes sense to get some cooperation from the server and its database, even when we’re trying to serve from cache.
Notice the difference: in a CDN, we query the cache first, then fall back to the actual server if it misses. In the scheme we described above, the first thing we do is to query the database to get the event-counter. Only then we can look for a cache, and after that fallback to the database and fetch the rest of the data.
We may cache almost every little bit of data, but the database still provides that little bit of solid ground that keeps everything consistent.
This makes sense for our use case because “availability at all costs” does our customers no good. What is the use of seeing a stale balance for their bank account, and not be able to do anything with it because the servers are down?
Instead of replacing the service, our main goal with the cache was to allow it to run using fewer resources. We achieved that: each instance of our service required 90GB of RAM, but now they need only 55GB. And even better: we are now able to scale the service horizontally if the load increases because work is effectively distributed between instances. Latency also went down for most requests, which was nice, even though it wasn’t the main goal.
With this approach, we managed to drastically reduce the amount of repeated work we did on our servers. It also allowed us to handle a larger load with smaller machines, all without compromising on consistency.
Now we just have to find a way to avoid naming things!