From a legacy relational DB to an event queue
Most companies start with a single instance of a relational database. It is convenient and can scale quite far, and it can hold pretty much all the required data: admin accounts, user accounts, product data, but also analytics and payment data.
Since it is a single database, it is easy to query and update it from anywhere in an application, which leads to many cross-referenced tables. If an organization outgrows its single relational database, it has to untangle those links to give every team its independence.
Leboncoin started with a single PostgreSQL database, called blocketDB, inherited from “Blocket”, a Swedish website. The schema evolved in convoluted ways to follow the evolution of features. At first, there weren’t any accounts: they were added later, and existing ads had to be reattached to them afterwards. All of these features were added in a backward-compatible way, leading to specific exceptions. All of this makes our main database hard to consume and even harder to update.
Nowadays, leboncoin is organized in feature teams and uses microservices to give each team as much autonomy as possible. The original database has no clear ownership and is a challenge to maintain. Every new feature has specific database needs, and we are trying to focus on event-based communication to prevent tight coupling.
We would like to handle every feature using a similar pattern, but with all the features using blocketDB as their primary datastore, it’s not that easy. Our legacy monolithic codebase is still heavily involved in updating blocketDB, and these updates have to be taken into account. We therefore need a communication channel between the old codebase and all the microservices. Currently, this is mostly done through polling and complicated queries against blocketDB, not only from microservices, but also from the data team that feeds the analytics databases and applications.
Creating a Kafka dataflow containing every update made to any ad in the database would allow event sourcing, even from the legacy database. This should eliminate polling and simplify rules such as:
- If an ad is updated, then index it into the search engine.
- If an edit is refused, then notify the user by email, explaining which rule was broken.
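Such rules are just subscribers keyed on event types. A minimal sketch of that dispatch, with a hypothetical event shape and handler names (none of this is leboncoin’s actual API):

```python
# Hypothetical sketch: routing ad events to the rules above.
# The event dict shape and handler names are illustrative assumptions.

def index_ad(event):
    # Rule 1: re-index the ad in the search engine.
    return f"indexed ad {event['ad_id']}"

def notify_refusal(event):
    # Rule 2: email the user about the refused edit.
    return f"emailed user about ad {event['ad_id']}"

HANDLERS = {
    "ad_updated": [index_ad],
    "edit_refused": [notify_refusal],
}

def dispatch(event):
    # Run every handler registered for this event type.
    return [handler(event) for handler in HANDLERS.get(event["type"], [])]
```

New rules are added by registering another handler, without touching the producer side.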
However, event sourcing requires that every update to a domain object triggers at least one event. One way to ensure that all database updates generate events is to use two tables: the first holds the object itself, the other is a log of all updates. As long as every write to the domain object table also produces a write to the log table, using a transaction to ensure atomicity, we can guarantee that every update is recorded. From there, the log table can be consumed to generate events. To track which events have been sent, either a dedicated column is updated after sending, or the sent entry is simply deleted from the log.
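The transactional part of this pattern can be sketched with SQLite standing in for PostgreSQL; the table names (`ads`, `ads_log`) and the `sent` column are illustrative, not our real schema:

```python
import sqlite3

# Two-table pattern sketch: a domain table plus an update log,
# written together inside one transaction.
db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE ads (id INTEGER PRIMARY KEY, subject TEXT);
CREATE TABLE ads_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ad_id INTEGER NOT NULL,
    action TEXT NOT NULL,
    sent INTEGER NOT NULL DEFAULT 0  -- flipped to 1 once the event goes out
);
""")

def update_ad(ad_id, subject):
    # One transaction covers both writes: the domain update and its
    # log entry either both commit or both roll back.
    with db:
        db.execute("INSERT OR REPLACE INTO ads (id, subject) VALUES (?, ?)",
                   (ad_id, subject))
        db.execute("INSERT INTO ads_log (ad_id, action) VALUES (?, 'edit')",
                   (ad_id,))

update_ad(42, "bike for sale")
pending = db.execute("SELECT ad_id, action FROM ads_log WHERE sent = 0").fetchall()
```

A consumer then reads rows where `sent = 0`, emits the events, and either flips the flag or deletes the rows.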
blocketDB already has a table called “action_states” that resembles the log described above. It doesn’t contain each version of each ad, but one entry for each update on every ad. The entry contains the date, the type of update (edit, create, review…) and a few more details. What is missing to apply the pattern above is a column to update after sending an event. But adding a column is too expensive: it’s the biggest table on disk, it has lots of columns, comprehensive indexing, and data is inserted continuously. We just don’t want to make it even bigger, add yet another index to it, and so on.
Some features already use this “log” to keep up with ad statuses. The indexer that fills our Elasticsearch cluster is based on the IDs in the action_states table. They come from a sequence and are continuously increasing. However, since rows are inserted inside transactions of variable duration, the effective insertion order doesn’t strictly match ID order. To cope with this, the indexer uses a pivot ID: the ID below which all action states have been processed. It isn’t the biggest ID, since some transactions may still be running. After processing all existing states higher than the pivot, it has to select a new pivot, but not too big a one, to be sure no state is missed, and then start again.
This method of fetching ads is complicated and error-prone, since you have to be careful when choosing the pivot ID. It’s also very hard to parallelize across multiple workers, and to restart a single worker you must either persist its pivot somewhere or start from scratch.
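One polling pass of the pivot scheme described above can be sketched like this; the function and its inputs are illustrative simplifications (in reality the open-transaction floor has to be derived from the database), not the indexer’s actual code:

```python
# Sketch of one pivot-ID polling pass: process every committed action
# state above the pivot, then advance the pivot, but never past an ID
# that a still-open transaction might commit later.

def poll_once(pivot, committed_ids, lowest_open_txn_id):
    """Return (ids_to_process, new_pivot).

    committed_ids: sequence IDs currently visible in action_states.
    lowest_open_txn_id: smallest ID held by a still-running transaction
    (None if there is none). The pivot must stay below it, otherwise a
    row committed later with a smaller ID would be skipped forever.
    """
    to_process = sorted(i for i in committed_ids if i > pivot)
    ceiling = max(committed_ids, default=pivot)
    if lowest_open_txn_id is not None:
        ceiling = min(ceiling, lowest_open_txn_id - 1)
    return to_process, max(pivot, ceiling)

# IDs 10, 11 and 13 are committed, but a transaction holding ID 12 is
# still open: 13 is processed now, yet the pivot can only move to 11,
# so 13 will be seen again on the next pass.
done, new_pivot = poll_once(pivot=9, committed_ids=[10, 11, 13],
                            lowest_open_txn_id=12)
```

The example shows the fragility: state 13 is processed on this pass but sits above the new pivot, so the next pass re-fetches it, and every worker has to share or persist the pivot.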
It’s clear that a real event flow is the solution, and since we can’t retrofit the required column onto our schema, we need a way to consume each action state at most once. Thankfully, most relational databases, PostgreSQL in particular, already have a feature that amounts to “shipping each change exactly once”: replication, which usually serves to create read-only replicas of a database. If you set up replication, you can be sure that every update will be replicated.
Technically, replication is just PostgreSQL shipping a log of all the updates it records. In PostgreSQL this log is called the WAL, for “Write-Ahead Log”; it is the basis for crash recovery as well as replication. Since PostgreSQL 9.4, the WAL can carry logical decoding information and supports output plugins that emit custom data formats, and PostgreSQL 10 added granular per-table publications, making interesting replication schemes possible.
The first idea was to create a service that registers as a replica and reads the WAL stream as JSON, using an extension like wal2json. The risk is that if the service doesn’t keep up with the data rate, the retained WAL segments pile up on disk and might fill up the available space on the primary. A safer solution was found: replicate the table to another PostgreSQL database. The service only interacts with the replicated database. It also means that the service doesn’t have to read the WAL stream, but gets its data through normal SQL, which developers are more familiar with.
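For illustration, decoding a wal2json change record is mostly zipping column names with values. The payload below is a hand-written example of the format-v1 shape (a `change` array with `columnnames`/`columnvalues`); check the wal2json documentation for the exact schema your version emits:

```python
import json

# Decode a (hand-written, illustrative) wal2json v1 change record.
payload = json.loads("""
{"change": [
  {"kind": "insert", "schema": "public", "table": "action_states",
   "columnnames": ["id", "ad_id", "action"],
   "columnvalues": [1, 42, "edit"]}
]}
""")

def rows(payload):
    # Yield (table, kind, row-dict) for each change in the payload.
    for change in payload["change"]:
        yield (change["table"], change["kind"],
               dict(zip(change["columnnames"], change["columnvalues"])))

events = list(rows(payload))
```

This is the path we did not take: it works, but the consumer has to keep pace with the WAL or risk filling the primary’s disk.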
At first, the idea was to replicate the entire ads table, but since it was too heavy to stream, only the log table (i.e. action_states) was replicated. Since the log table doesn’t contain the full ad information, it is still necessary to query the primary datastore to produce self-sufficient events.
With this setup, it is then possible to connect a service to the replicated database and generate the events. The service works like this:
- Read an event from the replicated database.
- Query the missing information from blocketDB.
- Send all the data as a Kafka event.
- Delete the already sent event line from the replica.
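The loop above can be sketched with SQLite standing in for both databases and a plain list standing in for the Kafka producer; every table and column name here is illustrative:

```python
import sqlite3

# Sketch of the read -> enrich -> publish -> delete loop.
replica = sqlite3.connect(":memory:")  # stand-in for the replicated DB
replica.execute("CREATE TABLE action_states (id INTEGER PRIMARY KEY, ad_id INTEGER, action TEXT)")
replica.execute("INSERT INTO action_states VALUES (1, 42, 'edit')")

blocket = sqlite3.connect(":memory:")  # stand-in for blocketDB
blocket.execute("CREATE TABLE ads (id INTEGER PRIMARY KEY, subject TEXT)")
blocket.execute("INSERT INTO ads VALUES (42, 'bike for sale')")

kafka = []  # stand-in for a real Kafka producer

def pump_one():
    # 1. Read the oldest pending event from the replica.
    row = replica.execute(
        "SELECT id, ad_id, action FROM action_states ORDER BY id LIMIT 1").fetchone()
    if row is None:
        return False
    state_id, ad_id, action = row
    # 2. Enrich it with the full ad from the primary datastore.
    subject, = blocket.execute(
        "SELECT subject FROM ads WHERE id = ?", (ad_id,)).fetchone()
    # 3. Publish. If this succeeds but the delete below fails, the row
    #    is re-sent on the next pass: at-least-once delivery.
    kafka.append({"ad_id": ad_id, "action": action, "subject": subject})
    # 4. Only then remove the consumed row from the replica.
    with replica:
        replica.execute("DELETE FROM action_states WHERE id = ?", (state_id,))
    return True

while pump_one():
    pass
```

The ordering of steps 3 and 4 is what decides the delivery guarantee: deleting before publishing would risk losing events, while publishing first only risks duplicates.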
This ensures that at least one event is sent for each update to the ads table. Duplicate events can still happen if the service sends an event but then fails to delete the corresponding line, so consumers get at-least-once delivery.