Adventures in data structures: inventing an external, sequential key-value store

Jacob Evelyn
Building Panorama Education
8 min read · Sep 29, 2020

This post was co-written with Mike DiScala.

A scaling challenge

Event sourcing is at the core of our data pipeline: each night we extract data from our client systems and construct an event log describing everything that happened within the data (e.g., a student was added, attendance was taken, grades were recorded). We replay these events in our code to construct Postgres tables that our application queries to answer questions like “What is student XYZ’s name?” and “How many days of school has student XYZ missed this year?”

In event-sourcing parlance, these tables are called projections. Even though we only add one day’s worth of events each night, we re-project the entire history of events daily to ensure we can always change a projection’s interpretation of a past event — or even build new projections over our complete event stream.

When we first implemented projections there weren’t many events, and so we took the easy approach of mapping events to simple INSERT/UPDATE/DELETE statements against our Postgres tables. For example, when we see a StudentAdded event we INSERT a row with that student’s data into our students_projection table. If we later see a StudentAttributesChanged event we UPDATE that original students_projection table row to reflect the new attributes.
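As a rough sketch of that per-event approach (using the Sequel gem for illustration; the event classes, table, and column names here are illustrative, not our real schema), the handler for each event looked something like this:

require "sequel"

DB = Sequel.connect(ENV.fetch("DATABASE_URL"))

# Each event is translated directly into its own SQL statement.
case event
when StudentAdded
  DB[:students_projection].insert(
    student_id: event.student_id,
    first_name: event.first_name,
    last_name: event.last_name
  )
when StudentAttributesChanged
  DB[:students_projection]
    .where(student_id: event.student_id)
    .update(event.changed_attributes)
when StudentRemoved
  DB[:students_projection].where(student_id: event.student_id).delete
end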

While simple, this approach was inefficient for a few reasons:

  1. We end up inserting data that’s later going to get overwritten (updated) or deleted.
  2. Each separate SQL statement carries overhead (network round trips, parsing, committing to disk) that we pay over and over.

As the number of events we were processing nightly grew (more days of data in our system, and more schools with data), these inefficiencies became bottlenecks in our projection system, and we knew we needed a better strategy.

The idea

In order to avoid both of the above inefficiencies, we decided to try computing the projection’s entire state locally in a single Heroku dyno before sending it over to the database. Once we had processed all events, we would stream the local dataset directly into our SQL database using Postgres’ COPY FROM STDIN functionality.

The trick would be doing it efficiently. We needed a local datastore with:

  1. Fast operations — quickly execute the INSERTs, UPDATEs, and DELETEs required by each event.
  2. Constant memory usage — handle datasets with unbounded sizes (some are millions of records) with Heroku’s very limited available memory.

Attempt #1: Using an embedded key-value library

Our first thought was to use an embedded key-value store. These libraries provide dictionary-like interfaces, allowing us to store key-value pairs and quickly read from random keys (fast operations). Rather than storing their data in-memory like a regular Ruby hash, however, these libraries are implemented with disk-backed data structures and do not consume significant amounts of memory (constant memory usage).

There are many high-quality key-value stores available that meet these needs (e.g. RocksDB, LevelDB, LMDB)¹, most of which are implemented in C/C++ and have existing gems that bind their lower-level code into Ruby. Unfortunately these gems are not widely used within the Ruby community: each has gone months without updates and none has historically had a large number of contributors. This lack of activity made us nervous about relying on these bindings, because projections are a core part of our application and our team does not have significant expertise in maintaining these kinds of non-Ruby extensions.

To get a better sense of whether this reluctance was justified, we decided to prototype an implementation of our projection code using the most actively maintained library we could find: the LMDB gem. We were happy to find that we could quickly build a working implementation of one of our projections that built up state within a local LMDB database and then streamed the final data to Postgres with a COPY FROM STDIN query.
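For a sense of what that prototype looked like, the LMDB gem exposes a hash-like interface. A minimal sketch (the directory path, keys, and serialized values here are illustrative, not our actual projection code, so check the gem's documentation for exact details):

require "fileutils"
require "lmdb"

# Open (or create) an LMDB environment backed by a directory on disk,
# then treat its main database like a persistent hash.
FileUtils.mkdir_p("tmp/projection_store")
env = LMDB.new("tmp/projection_store")
db = env.database

db["student123"] = '{"first_name":"Jake","last_name":"Evelyn"}'
db["student123"] # => '{"first_name":"Jake","last_name":"Evelyn"}'
db.delete("student123")

env.close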

Despite those positive signs, we found that our unit tests covering the new projections would segfault and crash the interpreter non-deterministically 😱. This was enough to discourage us from pursuing this approach any further.

Attempt #2: Designing our own data structure

After running into these obstacles with embedded key-value stores, we decided to try to implement our own pure-Ruby data structure for this use case.

As a first step in our design process, we wrote out the operations our application actually requires and realized that we did not need the ability to read random records back after they were written. Our problem is more constrained: writes need to be able to fully overwrite the data stored at a given key, but our reads will always just be a full stream of the entire data structure into the Postgres server.

We can therefore constrain our data structure’s API to the following operations:

  • put(key, row) → void
  • delete(key) → void
  • each_row → lazy enumerator returning all rows in any order (so we can stream it into Postgres with Sequel#copy_into)

For example, a series of StudentAdded, StudentAttributesChanged, and StudentRemoved events would result in the following calls:

data_structure = DataStructure.new

# StudentAdded events
data_structure.put("student123", { first_name: "Jake", last_name: "Evelyn" })
data_structure.put("student789", { first_name: "Lisa", last_name: "Alagna" })
data_structure.put("student456", { first_name: "Mike", last_name: "DiScala" })
# StudentRemoved event
data_structure.delete("student789")
# StudentAttributesChanged event
data_structure.put("student123", { first_name: "Jacob", last_name: "Evelyn" })
data_structure.each_row.to_a # use `.to_a` to convert lazy enumerator to array
# => [{ first_name: "Mike", last_name: "DiScala" }, { first_name: "Jacob", last_name: "Evelyn" }]

We implemented this API with a data structure based on two files:

  1. Index file: a log of all put/delete operations that have been applied to the data structure. Each log line contains:
    • a sequence number (a global counter indicating the order of all the operations)
    • the key that we modified
    • the type of operation (put vs. delete)
    • the starting byte position and number of bytes of data in the data file (for put operations only)
  2. Data file: completely unstructured — just the raw bytes stored for each call to put

Our put and delete methods are implemented with simple appends to the ends of these files. For example, the method calls above would result in:

# index file
student123,1,put,0,10
student789,2,put,10,10
student456,3,put,20,11
student789,4,delete
student123,5,put,31,11
# data file
JakeEvelynLisaAlagnaMikeDiScalaJacobEvelyn
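To make these appends concrete, here is a minimal sketch of how put and delete can be implemented (the class skeleton, the constructor taking the two file paths, and the value serialization are simplified for illustration; the real code also has to escape separators, handle UTF-8, and so on):

class DataStructure
  def initialize(index_path, data_path)
    @index_file = File.open(index_path, "a")
    @data_file = File.open(data_path, "ab")
    @sequence = 0
    @data_position = 0
  end

  # Append one index line and the row's raw bytes to the data file.
  def put(key, row)
    bytes = serialize(row)
    @sequence += 1
    @index_file.puts("#{key},#{@sequence},put,#{@data_position},#{bytes.bytesize}")
    @data_file.write(bytes)
    @data_position += bytes.bytesize
  end

  # Deletes only need an index line; the data file is untouched.
  def delete(key)
    @sequence += 1
    @index_file.puts("#{key},#{@sequence},delete")
  end

  private

  # For illustration we just concatenate the row's values, which matches the
  # example data file above; the real encoding is more careful.
  def serialize(row)
    row.values.join
  end
end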

Once all of the events have been processed and the data structure has been filled up with put and delete calls, we’re ready to stream the final data from the each_row method into Postgres.

In this method we use the UNIX sort utility to efficiently sort the index file by key (ascending) and then by sequence number (descending), writing the results to a tempfile. This sort groups all of the index entries for the same key together, with the latest operation for each key coming first in the file:

student123,5,put,31,11
student123,1,put,0,10
student456,3,put,20,11
student789,4,delete
student789,2,put,10,10
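One way to produce that ordering from Ruby is to shell out to sort, roughly like this (a sketch rather than our exact invocation; index_path and sorted_index_path are illustrative variable names):

# Sort by key (field 1, ascending), then by sequence number (field 2,
# numeric, descending). Assumes keys never contain the comma separator.
system(
  "sort",
  "-t", ",",
  "-k", "1,1",
  "-k", "2,2nr",
  "-o", sorted_index_path,
  index_path
)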

We can then read the sorted tempfile one line at a time, filtering to only the rows that describe the latest operation for each key:

student123,5,put,31,11
student456,3,put,20,11
student789,4,delete

We can further ignore rows where the latest operation is a delete, since these keys have been removed from the dataset, giving us:

student123,5,put,31,11
student456,3,put,20,11

These remaining rows represent only the put operations that should appear in our final dataset. For each row we read the necessary bytes from the data file and add these onto our output enumerator.

Putting this all together, our each_row method looks something like this:

previous_key = nil

sorted_index_file_entries.lazy.map do |entry|
  if entry.key == previous_key
    # We've already seen a newer operation for this key; skip it.
    :ignore
  elsif entry.operation == :delete
    # The latest operation for this key is a delete; skip it.
    previous_key = entry.key
    :ignore
  else
    # The latest operation for this key is a put; read its bytes.
    previous_key = entry.key
    read_bytes_from_data_file(entry.start_position, entry.n_bytes)
  end
end.reject { |value| value == :ignore }

The each_row enumerator can be streamed directly into our Postgres database.
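A sketch of that final step, assuming each_row yields row hashes as in the earlier example (the connection, table, and column names are illustrative):

require "csv"
require "sequel"

DB = Sequel.connect(ENV.fetch("DATABASE_URL"))

# Lazily turn each surviving row into one CSV line...
csv_lines = data_structure.each_row.map do |row|
  CSV.generate_line([row[:first_name], row[:last_name]])
end

# ...and stream those lines straight into Postgres via COPY FROM STDIN.
DB.copy_into(
  :students_projection,
  columns: [:first_name, :last_name],
  format: :csv,
  data: csv_lines
)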

This approach has a number of nice advantages:

  1. Deleted/overwritten values are never read back into the Ruby process.
  2. Writes are implemented as simple appends to file streams and therefore are very fast. (We don’t need durability here — if the process crashes we’ll just restart the job from the beginning.)
  3. Our memory use is constant: the sort utility uses a bounded-memory external sort and the each_row method only needs to keep the previous_key and the current row in memory at once.
  4. The index file is human-readable and easy to debug (no binary encoding of numbers/operations).

The production code needs to be slightly more complex to handle real-world data: making sure we support UTF-8 properly, and enforcing that keys do not include the characters used as separators in the index file. For some of our more complex projections we added an additional read method to this data structure that returns the full history of values for each key rather than just the latest, but the underlying architecture is the same.

Results (and what we learned)

We had five big takeaways from this project:

1. Streaming data into Postgres in bulk has big benefits

This project reduced our nightly projection runtimes by a staggering 72%. This is despite the fact that our new data structure requires a lot of local I/O, and that these projections have other costs (like downloading events) that we didn’t optimize.

Pay attention to where your application performs lots of database writes, and see if you can change them to be done in bulk.

2. Determinism is extremely valuable

Our projections are designed to be deterministic transformations of our input event streams: the same event stream will always result in the exact same data in the final Postgres table. This property allowed us to be extremely thorough when rolling out this technique across all of our projections — we compared snapshots of each projection using both our old and new code in order to confirm that our refactored code produced exactly the same results. By extensively testing the before and after behavior of our code, we were able to rewrite 20+ of our projections without introducing a single regression.

Where possible, design your system as a deterministic transformation of your input, so that you can do regression testing in this way.

3. When adding new dependencies, think about maintainability

We could have gotten a solution working using an existing embedded key-value library and worked around our issues with it. While this might have saved us some upfront development time, we likely would have had to spend more time later on in order to maintain those libraries across Ruby version upgrades and future bugs.

Remember to account for this type of maintenance burden when adding new gems to your projects.

4. UNIX utilities are awesome

We were able to greatly simplify our code by relying on the UNIX sort utility for an efficient external-sort implementation.

Don’t forget that these battle-tested programs can be leveraged from within your own code.

5. Be careful about building your own data stores

Databases and embedded data stores typically are very complex pieces of software, as they often must consider durability, file encodings, escape characters, etc.

We therefore do not recommend rolling your own if you can avoid it. This type of project likely only makes sense after you have found issues with your off-the-shelf options and if your use case does not require more complex functionality such as concurrent access and durability. Even then, be sure to allot extra time for carefully testing your implementation.

¹ We later learned about SDBM, a key-value store that ships as part of the Ruby standard library. SDBM isn’t a good fit for us here, as it does not support key/value pairs larger than 1008 bytes. More info on this in an upcoming blog post!
