Rethinking the Foundry job orchestration back end: From CRUD to event-sourcing
In his talk Microservices: A retrospective at Reversim 2019, Tomer Gabel provided an overview of the benefits of event-based software design over the canonical CRUD choice. Tomer advised studying the CAP theorem in this context: it applies just as much to event-sourced designs, in which the hard distributed-systems problems won’t magically disappear.
Inspired by his talk, I put together this blog post to reflect on my experience of rewriting the CRUD data store of Palantir Foundry’s job orchestration system in terms of event sourcing and memory images. I’ll first describe the role of the orchestration system in Foundry’s architecture, explain why we hit the limits of the CRUD storage backend, and then illustrate the event-sourced re-implementation in terms of a concrete minimal end-to-end example in Java. The focus of the example is on the consistency problems encountered when maintaining a graph data structure. Note that this post won’t give much background on event-sourcing or CQRS, but you will easily find a ton of material on the internet.
Warning: the sample implementation is not optimized for anything. It’s meant purely as educational material to highlight the conceptual differences and similarities between the CRUD and the event-sourced implementation.
Job orchestration in Foundry
Our customers employ Palantir Foundry as the backbone for data-driven decision-making; however, for the narrow purpose of this blog post, let us focus on Foundry as a platform for authoring and executing data transformations. For example, the code listing below shows a PySpark data transformation that computes a join between a dataset of temperature sensor readings and corresponding weather station locations in order to produce a new dataset called temperature_by_region.
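A sketch of such a transformation, written against Foundry’s Python transforms API (transforms.api); the dataset paths and the station_id join key are illustrative assumptions, not the exact listing:

```python
from transforms.api import transform_df, Input, Output

# Dataset paths and the join column are illustrative assumptions.
@transform_df(
    Output("/examples/temperature_by_region"),
    stations=Input("/examples/stations"),
    temperatures=Input("/examples/temperatures"),
)
def compute_temperature_by_region(stations, temperatures):
    # Join each temperature reading with the location of its weather station.
    return temperatures.join(stations, on="station_id")
```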
More formally, each data transformation is a function from a set of input datasets to a set of output datasets. Executing the transformation produces the output datasets as a result of the transformation function applied to the input datasets. We call the specification of such a transformation a job specification (or job, for short); it comprises names of input and output datasets (e.g., Parquet files in S3) as well as the source code (e.g., a PySpark program) of the transformation. The compute_temperature_by_region transformation has two input datasets (stations and temperatures) and one output dataset (temperature_by_region).
If a job JOB1 declares inputs that are the output of another job JOB0, then we say that JOB1 depends on JOB0: JOB0 needs to execute before JOB1 in order to produce JOB1’s input. The set of all job specifications spans a bipartite graph in which nodes are job specifications (turquoise) and datasets (purple), and edges are job inputs and outputs. The diagram on the left shows an example of such a graph; it’s clear that we need to compute the temperature by region dataset with JOB0 before running JOB1.
The job orchestration problem is as follows: given a target output dataset, compute a partially ordered set of jobs required to produce it. We call this set the orchestration plan. Note that a plan exists only if the relevant subgraph is acyclic.
There is a straightforward algorithm for finding orchestration plans: starting from the target output dataset, traverse the graph to collect all transitive job dependencies; the edges encountered on those paths define the partial order. We found that the semantics of job orchestration are much simpler for users to understand if we impose two additional constraints on the job graph:
- The full job spec graph should be acyclic. This implies that all subgraphs are acyclic and thus that there exists an orchestration plan for every dataset.
- Every output dataset must be produced by at most one job. This constraint implies that plans are deterministic.
Together, acyclicity and determinism imply that there exists exactly one orchestration plan for every dataset. In plain English, we can use the job dependencies to derive an execution order of the data transformations that produce a given target output dataset. (… Not rocket science… )
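The plan derivation can be sketched as a post-order traversal; the map-based graph encoding, the method names, and the dataset/job names are illustrative assumptions, not Foundry’s API:

```java
import java.util.*;

// Sketch of plan computation over a bipartite job/dataset graph.
// jobByOutput maps each dataset to the (at most one) job producing it;
// inputsOf maps each job to its input datasets.
class Orchestrator {
    static List<String> plan(String targetDataset,
                             Map<String, String> jobByOutput,
                             Map<String, List<String>> inputsOf) {
        List<String> plan = new ArrayList<>();
        collect(targetDataset, jobByOutput, inputsOf, new HashSet<>(), plan);
        return plan;  // jobs in a valid execution order, dependencies first
    }

    private static void collect(String dataset, Map<String, String> jobByOutput,
                                Map<String, List<String>> inputsOf,
                                Set<String> planned, List<String> plan) {
        String job = jobByOutput.get(dataset);
        if (job == null || !planned.add(job)) return;  // raw dataset, or job already planned
        for (String input : inputsOf.getOrDefault(job, List.of()))
            collect(input, jobByOutput, inputsOf, planned, plan);
        plan.add(job);  // post-order: only after all dependencies
    }
}
```

For a chain where JOB0 produces dataset d1 from d0, and JOB1 produces d2 from d1, planning for d2 yields [JOB0, JOB1].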
CRUD orchestration service
Foundry users typically interact with Foundry’s orchestration system by authoring data transformations in a Git-backed Web IDE; see diagram on the left. A pre-commit hook creates a job specification for each transformation and stores it in an orchestration service through a simple CRUD API.
The pre-commit hook extracts the dataset names in the Input/Output annotations (see code listing above) to determine the dependencies between different jobs. The orchestration service stores all job specifications and provides APIs for executing orchestration plans for output datasets by schedule or user request.
Our first implementation of the orchestration service used a simple passthrough from its CRUD API to a CRUD database: we stored the graph of job specifications as adjacency lists in Cassandra. The diagram on the left illustrates this model. While this model was a simple and effective solution for the basic CRUD portion of the orchestration service, it had two major shortcomings:
- In order to find a plan for a target dataset, we need to traverse the job graph to find all transitive dependencies. Since the graph is stored by adjacency lists, this procedure requires O(N) database lookups where N is the length of the target’s dependency chain. The orchestration service was no longer able to provide interactive performance when some of our customers reached hundreds of thousands of datasets and transformations.
- Similarly, the acyclicity check requires traversing a large portion of the graph. As the number of job updates grew, we eventually turned off the proactive acyclicity check (thus allowing cycles in the graph) and only checked that subgraphs were acyclic when computing and executing plans. (There are far fewer job executions than job modifications.) This model is undesirable since we prefer to fail fast and inform users that a given transformation is illegal when they write the transformation rather than when they execute it.
Event-sourced orchestration service
To solve these two shortcomings, we rewrote the orchestration service storage layer in terms of event-sourcing and memory images of the job specification graph; see diagram on the left. Instead of maintaining the always-up-to-date graph adjacency lists in a database, we store a linearized sequence of graph mutation events.
To serve reads against the graph, we maintain eventually-consistent in-memory copies of the graph; of course, graph traversal is much faster against in-memory graphs than against database-backed graphs. (You’ll notice an interesting limitation of the event-sourced variant here: what if the graph no longer fits in memory?)
The orchestration service checks the graph for acyclicity and determinism before appending mutation events to the event log. The crux of this problem is consistency, both in the CRUD and in the event-sourced implementation. In the CRUD model we guarantee atomic updates (or, in other words, we avoid phantom reads) by using AtlasDb as an MVCC database: we can check constraints and perform graph updates transactionally. In the event-sourced model, we guarantee atomic updates via a dense linear order of mutation events: we check the constraints against a graph at version N and insert the mutation event at version N+1. The following sections describe this model in much more detail.
Let’s now look at a concrete sample implementation of the CRUD and the event-sourced job orchestration service. At the heart of the orchestration service is a simple graph database API:
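A minimal sketch of such an API; the method names, signatures, and integer node ids are illustrative, not the exact Foundry interface:

```java
import java.util.Map;
import java.util.Set;

/** Sketch of a simple graph database API; names and signatures are illustrative. */
interface GraphDb {
    /** Adds a new node; rejects nodes that already exist. */
    void addNode(int node);

    /** Adds a directed edge between two existing nodes; rejects redundant
     *  edges and edges that would make the graph cyclic. */
    void addEdge(int from, int to);

    /** Returns a read-only snapshot of the current adjacency lists. */
    Map<Integer, Set<Integer>> graph();
}
```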
Users can interact with the GraphDb API to create graphs by adding nodes and edges. The API mentions a couple of constraints that the implementations will have to maintain: a user can’t add existing nodes or redundant edges, and the managed graph must be acyclic.
Although this dummy implementation runs in a single JVM, the imagined scenario is a distributed system (e.g., a Web service) with multiple concurrent users for the read and the write code paths. In the case of Foundry, both the CRUD and the event-sourced implementation support running the job orchestration service in a highly-available configuration on multiple nodes. In particular, we need to provide consistency guarantees when multiple users access the graph concurrently.
CRUDding it up
The CRUD implementation uses a GraphStore to persist graph nodes and edges and a DistributedLock to coordinate mutations between different nodes of the service. The store API is designed so it can be readily implemented on top of a Cassandra-like data model (or, by extension, any relational database). Note: In order to simplify the presentation in this example, the store does not provide any transaction or MVCC guarantees and the concurrency concern is instead implemented with a global distributed lock.
The CrudGraphDb implementation is simple: before storing a mutation (i.e., adding a node or an edge), we check whether the relevant API constraints are satisfied by the mutation. Since the GraphStore database does not support transactions, we simply wrap every read/write code path in a distributed read-write lock (think Zookeeper). This guarantees correctness in the case of concurrent requests.
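A sketch of this implementation, with two simplifying assumptions: an in-memory GraphStore stands in for Cassandra, and a local ReentrantReadWriteLock stands in for the distributed lock:

```java
import java.util.*;
import java.util.concurrent.locks.*;

// In-memory stand-in for a Cassandra-backed adjacency-list store.
class GraphStore {
    private final Map<Integer, Set<Integer>> adjacency = new HashMap<>();
    void putNode(int node) { adjacency.put(node, new HashSet<>()); }
    void putEdge(int from, int to) { adjacency.get(from).add(to); }
    Map<Integer, Set<Integer>> readAll() {  // full re-hydration on every check
        Map<Integer, Set<Integer>> copy = new HashMap<>();
        adjacency.forEach((node, succ) -> copy.put(node, new HashSet<>(succ)));
        return copy;
    }
}

class CrudGraphDb {
    private final GraphStore store = new GraphStore();
    // Local stand-in for a distributed read-write lock (think ZooKeeper).
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    void addNode(int node) {
        lock.writeLock().lock();
        try {
            if (store.readAll().containsKey(node))
                throw new IllegalArgumentException("node exists: " + node);
            store.putNode(node);
        } finally { lock.writeLock().unlock(); }
    }

    void addEdge(int from, int to) {
        lock.writeLock().lock();
        try {
            Map<Integer, Set<Integer>> graph = store.readAll();  // re-hydrate
            if (!graph.containsKey(from) || !graph.containsKey(to))
                throw new IllegalArgumentException("unknown node");
            if (graph.get(from).contains(to))
                throw new IllegalArgumentException("edge exists");
            graph.get(from).add(to);  // tentatively apply the mutation ...
            if (hasCycle(graph)) throw new IllegalArgumentException("would create a cycle");
            store.putEdge(from, to);  // ... then persist it
        } finally { lock.writeLock().unlock(); }
    }

    // Kahn's algorithm: a DAG can be fully peeled by repeatedly removing
    // nodes with in-degree zero; leftovers indicate a cycle.
    static boolean hasCycle(Map<Integer, Set<Integer>> graph) {
        Map<Integer, Integer> inDegree = new HashMap<>();
        graph.keySet().forEach(node -> inDegree.put(node, 0));
        graph.values().forEach(succ -> succ.forEach(node -> inDegree.merge(node, 1, Integer::sum)));
        Deque<Integer> ready = new ArrayDeque<>();
        inDegree.forEach((node, degree) -> { if (degree == 0) ready.add(node); });
        int peeled = 0;
        while (!ready.isEmpty()) {
            int node = ready.poll();
            peeled++;
            for (int succ : graph.get(node))
                if (inDegree.merge(succ, -1, Integer::sum) == 0) ready.add(succ);
        }
        return peeled != graph.size();
    }
}
```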
A standard defect of the CRUD approach is that graph re-hydration (i.e., reading the graph from the database) is expensive: since we cannot maintain the full graph incrementally, our only hope for checking the acyclic constraint is to read a full copy of the graph from the database and then check whether it’s acyclic; we then throw away the graph and read it again when the next edge gets added. Of course we could try to be a bit smarter here: for example, if the backing database supports versioning, then we could check whether the graph was updated since we last read it, and use a cached graph in case it was not. (All of this has to happen under the lock.)
One may wonder if we could avoid the distributed lock by pushing the constraint checking into a transactional backing store. The answer is (usually) “No, we can’t”: The expressive power of typical database backends (including key-value stores, Cassandra-like models, and even SQL databases) does not suffice to check non-local constraints such as graph connectivity or acyclicity. SQL databases can indeed maintain local-ish constraints such as non-null, primary key, or join integrity, but I suspect that most real-life use cases will have constraints that are more complex than this.
Event sourcing: a broken attempt
The first attempt at an event-sourced implementation uses an EventStore and a MemoryImage (aka projection or aggregate, see also Fowler:MemoryImage), but no distributed lock. The idea is that we append mutation events like NodeAdded and EdgeAdded to the event store, and that the memory image subscribes to the stream of events and maintains the current graph in memory, for instance as linked Java objects. Since mutation events are immutable, we never have to look back in time. The big benefit over the CRUD approach is that we only pay the incremental maintenance cost rather than having to re-hydrate the full graph from the store for every mutation.
The implementation looks simple: we check the constraints (the same ones as in the CrudGraphDb implementation) against the current in-memory graph, and if the constraints are satisfied we emit the corresponding event to the event store. The MemoryImage itself simply subscribes to new events and then updates the graph when it gets notified of a new event:
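A sketch of this broken variant, with an in-memory stand-in for the event store that assigns sequence numbers implicitly (the list index) and notifies subscribers synchronously for simplicity:

```java
import java.util.*;
import java.util.function.Consumer;

// Mutation events for the graph.
interface Event {}
record NodeAdded(int node) implements Event {}
record EdgeAdded(int from, int to) implements Event {}

// In-memory stand-in for a durable event log (think Kafka).
class EventStore {
    private final List<Event> log = new ArrayList<>();
    private final List<Consumer<Event>> subscribers = new ArrayList<>();
    synchronized void append(Event event) {
        log.add(event);  // the list index is the event's sequence number
        subscribers.forEach(subscriber -> subscriber.accept(event));
    }
    synchronized void subscribe(Consumer<Event> subscriber) { subscribers.add(subscriber); }
}

// Folds the event stream into the current in-memory graph, one event at a time.
class MemoryImage {
    final Map<Integer, Set<Integer>> graph = new HashMap<>();
    MemoryImage(EventStore store) { store.subscribe(this::apply); }
    synchronized void apply(Event event) {
        if (event instanceof NodeAdded e) graph.put(e.node(), new HashSet<>());
        else if (event instanceof EdgeAdded e) graph.get(e.from()).add(e.to());
    }
}

class BrokenEventGraphDb {
    private final EventStore store = new EventStore();
    private final MemoryImage image = new MemoryImage(store);

    Map<Integer, Set<Integer>> graph() { return image.graph; }

    void addNode(int node) {
        // RACE: a concurrent writer can append a conflicting event between
        // this constraint check and the append below.
        if (image.graph.containsKey(node))
            throw new IllegalArgumentException("node exists: " + node);
        store.append(new NodeAdded(node));
    }

    void addEdge(int from, int to) {
        if (!image.graph.containsKey(from) || !image.graph.containsKey(to))
            throw new IllegalArgumentException("unknown node");
        if (image.graph.get(from).contains(to))
            throw new IllegalArgumentException("edge exists");
        // (Acyclicity check against image.graph elided; the same race applies.)
        store.append(new EdgeAdded(from, to));
    }
}
```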
Sadly, this implementation is broken since the memory image graph can change under us while we check the constraints. In database parlance, BrokenEventGraphDb suffers from phantom reads. This is extra disappointing as we’ve already tried to be super smart: events in the event store are ordered by sequence number, and the store verifies that we insert events in the right order: 0, 1, 2, 3, etc.
Fixing the race condition
One way to fix the race condition is to use optimistic concurrency control: in EventGraphDb we check the constraints against a graph at a known, fixed version, and then emit the mutation event at exactly the next sequence number. If the event store rejects the event, then we know that someone else has changed the graph under us by inserting an event with a conflicting sequence number. We can then either give up or try again with a hopefully more up-to-date memory image:
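A sketch of the optimistic variant, reduced to the addNode path for brevity; the store API and the fixed retry budget are illustrative assumptions:

```java
import java.util.*;

// Mutation events; only node additions shown for brevity.
record NodeAdded(int node) {}

// Event store that rejects an append unless the writer names the exact next
// sequence number; this is the optimistic-concurrency primitive.
class VersionedEventStore {
    private final List<NodeAdded> log = new ArrayList<>();
    synchronized void append(long expectedVersion, NodeAdded event) {
        if (expectedVersion != log.size())
            throw new ConcurrentModificationException("stale version " + expectedVersion);
        log.add(event);
    }
    synchronized List<NodeAdded> eventsSince(long version) {
        return new ArrayList<>(log.subList((int) version, log.size()));
    }
}

class EventGraphDb {
    private final VersionedEventStore store = new VersionedEventStore();
    private final Map<Integer, Set<Integer>> graph = new HashMap<>();  // memory image
    private long version = 0;

    // Fold events that are newer than the image's current version.
    private void catchUp() {
        for (NodeAdded event : store.eventsSince(version)) {
            graph.put(event.node(), new HashSet<>());
            version++;
        }
    }

    synchronized void addNode(int node) {
        for (int attempt = 0; attempt < 3; attempt++) {
            catchUp();
            // Check the constraint against the graph at a known, fixed version...
            if (graph.containsKey(node))
                throw new IllegalArgumentException("node exists: " + node);
            try {
                // ...and insert the event at exactly the next sequence number.
                store.append(version, new NodeAdded(node));
                return;
            } catch (ConcurrentModificationException stale) {
                // Someone else changed the graph under us; refresh and retry.
            }
        }
        throw new IllegalStateException("too much contention");
    }
}
```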
This simple example demonstrates the structural similarities between the CRUD and the event-sourced approaches with respect to concurrency: in both cases, we need to perform constraint checking and state mutation atomically, for instance by linearizing state mutations. In the CRUD implementation, we chose to linearize mutations with a distributed lock; in the event-sourced implementation, we chose to linearize mutations with a dense, ordered sequence of events. Note that this difference is by choice and is not a principled difference between the CRUD and the event-sourced styles: for instance, we could linearize the mutations with pessimistic locks in the event-sourced implementation.
The optimistic concurrency approach chosen here exhibits the typical performance penalty when N users perform concurrent mutations: N-1 out of N concurrent mutation attempts fail because of sequence number mismatches; notably, these N-1 attempts have already checked the graph constraints, thus wasting considerable compute resources. A better strategy would likely be to linearize the mutations: locally via a synchronized queue, or in a distributed system via leader election… just like in the CRUD case. The lesson here is that changing the paradigm from CRUD to event-sourced does not magically resolve the fundamental coordination and consistency problems in distributed systems.
Like before, could we avoid the concurrency control in the application layer altogether by pushing the constraints into the backing store? Unfortunately, the “No” answer is at least as emphatic in the event-sourced world because the backing databases (MySQL, Kafka, Kinesis, etc.) cannot express complex constraints such as graph connectivity or acyclicity.
The MemoryImage class supports snapshotting: it periodically stores snapshots of the current graph to a simple key-value backing store. To re-hydrate a memory image (for instance when starting a new node or after pruning caches to free memory), we can then load the latest snapshot and apply only the subset of events that are newer (by sequence number) than the snapshot. The snapshotting approach trades memory against time: the more snapshots we store, the more memory we consume, but the faster (aspirationally…) we can re-hydrate a memory image that is up-to-date with the events. In our experience, keeping start-up time low is critical in continuously-deployed systems; for instance, a blue-green deployment strategy becomes untenable when nodes require hours of start-up time before becoming available.
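A snapshotting sketch, reduced to node-addition events for brevity; in-memory records stand in for the serialized key-value snapshot store:

```java
import java.util.*;

// Only node additions shown for brevity.
record NodeAdded(int node) {}

// A snapshot pairs a deep copy of the graph with the sequence number of the
// last event folded into it; a real system would serialize it to a key-value store.
record Snapshot(long version, Map<Integer, Set<Integer>> graph) {}

class SnapshottingMemoryImage {
    private Map<Integer, Set<Integer>> graph = new HashMap<>();
    private long version = 0;

    void apply(NodeAdded event) {
        graph.put(event.node(), new HashSet<>());
        version++;
    }

    Snapshot snapshot() {  // deep-copy so the snapshot stays immutable
        Map<Integer, Set<Integer>> copy = new HashMap<>();
        graph.forEach((node, succ) -> copy.put(node, new HashSet<>(succ)));
        return new Snapshot(version, copy);
    }

    // Re-hydration: load the snapshot, then replay only the newer events.
    static SnapshottingMemoryImage rehydrate(Snapshot snapshot, List<NodeAdded> fullLog) {
        SnapshottingMemoryImage image = new SnapshottingMemoryImage();
        image.graph = new HashMap<>(snapshot.graph());
        image.version = snapshot.version();
        for (NodeAdded event : fullLog.subList((int) snapshot.version(), fullLog.size()))
            image.apply(event);
        return image;
    }
}
```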
So far, we have assumed that the event store has unlimited retention, i.e., that it never deletes events, and that the snapshot store is merely an optimization. In pathological cases, this implies a linear memory overhead compared to the CRUD world; for instance, the sequence addNode(1), removeNode(1), addNode(1), removeNode(1), ... yields a graph of constant size, but the event store grows linearly.
I am aware of two classes of solutions to this problem: first, we can prune events older than N from the event store if all consumers (i.e., MemoryImage and friends) have a handle on a durable snapshot >N. Second, we could inspect events and delete redundant or idempotent ones via domain-specific logic: for instance, if subsequent events add and remove the same node, then we can prune redundant “earlier” events and just keep the latest one.
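The second, domain-specific strategy can be sketched as keep-the-latest-event-per-node compaction; the event shape and the choice of node id as compaction key are illustrative assumptions (akin to Kafka's log compaction):

```java
import java.util.*;

// A generic event addressed by sequence number, node key, and type
// (e.g., "addNode" or "removeNode").
record Event(long seq, String key, String type) {}

class LogCompactor {
    // Keep only the latest event per node key, in original log order.
    static List<Event> compact(List<Event> log) {
        Map<String, Event> latestByKey = new HashMap<>();
        for (Event event : log)
            latestByKey.put(event.key(), event);  // later events overwrite earlier ones
        List<Event> survivors = new ArrayList<>(latestByKey.values());
        survivors.sort(Comparator.comparingLong(Event::seq));
        return survivors;
    }
}
```

Note that a realistic compactor would also have to consider edge events that reference pruned nodes.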
Of course, both of these approaches invalidate some of the invariants and benefits of the “pure” event-sourcing approach in favor of a smaller memory footprint: we no longer have a complete audit trail, cannot go back in time, and—maybe most importantly—we materialize our business logic bugs into snapshots and delete the event information we would likely need to fix the state.
This concludes my blog post on job orchestration in Foundry and the relevance of CRUD and event-sourcing in Foundry’s architecture. The main takeaway is that many of the hard distributed systems problems, in particular consistency, don’t magically disappear in the event-sourced model. Like most things in computer science and software engineering, the choice between the CRUD and the event-sourced model is a question of tradeoffs.
I hope you find the code samples and explanations instructive. I would love to learn from your experience with systems architecture, CRUD, or event-sourcing and am happy to discuss on GitHub or Twitter, or at the next conference. Also, please do point out the bugs in my sample code.
Robert has shaped the engineering culture and architecture behind Palantir Foundry since day one of the data analysis platform. He enjoys technical writing as a means for education and knowledge dissemination.