How we designed and built a production-scale close-to-real-time system to improve search relevance at Glassdoor
Note: This post is the second in a two-part series. Part 1 introduces the problem at hand and provides a high-level overview of the design, while Part 2 fleshes out some of the technical considerations and tradeoffs.
In the previous blog post, I discussed a potential solution for improving search relevance. We wanted a system which could keep track of all the jobs a user has seen, and avoid showing the same jobs in the future for specific use cases. To achieve this, I discussed the need for combined offline and nearline systems that would work together to process both the job listings we have sent the user and the ones the user has actually seen. In this post, I will specifically address some of the technical considerations that needed to be tackled for the nearline system, and how I used it in conjunction with the existing offline system. For simplicity, I will only discuss the email use case in this post, but the design applies equally to all the use cases in which we show users jobs.
The Real MVP
Consideration #1: What is the simplest thing we can do to implement the nearline system?
When building any system, it’s easy to get lost in the weeds and overwhelmed by small details. It’s important to take an iterative approach to system design: even for experienced engineers, the first approach one comes up with is rarely the one that makes it to the final product. It makes more sense to start with a basic solution and progressively improve upon it. So in our case, what’s the simplest approach that does what we want, and no more?
One straightforward approach is to have a table that stores all the job listings a user has been sent. Any application that sends users jobs then writes to that table. The table would have a simple schema: one column for the user ID and one for the sent job IDs, with a new row written every time we send that user jobs.
Whenever we want to retrieve the list of job IDs a user has been sent, we simply query all the rows for that user ID and aggregate them. This seems to do the trick!
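The following Python sketch shows this naive approach. It assumes an in-memory dictionary stands in for the table. The names `sent_jobs_table`, `record_send`, and `get_sent_job_ids` are hypothetical and are not Glassdoor code. Every send appends another row for the user, and every lookup has to walk all of that user’s rows:

```python
from collections import defaultdict

# Hypothetical stand-in for the sent-jobs table: rows are grouped by user ID
# (like a partition key), and every send appends one more row for that user.
sent_jobs_table: defaultdict[int, list[list[int]]] = defaultdict(list)


def record_send(user_id: int, job_ids: list[int]) -> None:
    """Called by any application that sends a user jobs."""
    sent_jobs_table[user_id].append(list(job_ids))


def get_sent_job_ids(user_id: int) -> list[int]:
    """Query-time aggregation: read every row for the user and dedupe the IDs.

    The work done here grows with the number of rows stored for the user.
    """
    rows = sent_jobs_table.get(user_id, [])
    return sorted({job_id for row in rows for job_id in row})


record_send(3000, [12, 223, 4490])
record_send(3000, [175, 223, 5000])
print(get_sent_job_ids(3000))  # [12, 175, 223, 4490, 5000]
```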
While this is very simple to implement and requires the fewest systems to maintain (both important factors to aim for), it suffers from two issues:
- We’d have to add the logic to read from this table and aggregate the data in every use case where we want to exclude sent jobs. Different applications would each have to reimplement this core logic, and their implementations may diverge.
- At query time, we need to retrieve and aggregate every job a user has been sent. Because we want to serve requests in real time, this may take prohibitively long given the potentially large number of rows to retrieve.
To address the first issue, we can extract this code into its own library or API service. We did in fact end up taking the latter approach, building the end logic for the combined pipeline into its own microservice. However, that is outside the scope of this blog post, which focuses on the design of the nearline pipeline specifically.
As for the second issue…
The Latency is Too Darn High
Consideration #2: What should be the schema of the table that the service queries from?
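It makes sense to keep the fields as they are, with one user ID column and one sent job IDs column, since the user ID will be used for querying while the sent job IDs are what we ultimately want. However, instead of having multiple rows per user, which can take a long time to query, we can store all of a user’s sent job IDs in a single row. We can summarize the two options and their tradeoff as follows:
- Option 1 (multiple rows per user): each send is a simple write of one new row, but each query must retrieve and aggregate all of the user’s rows.
- Option 2 (one row per user): each query reads a single row, but that row must be re-aggregated whenever the user is sent new jobs.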
It turns out that latency is a hard constraint. Adding latency to real-time requests would hurt the user experience and delay email sends; in our case, benchmarking revealed that Option 1 would be several times slower than Option 2. Consequently, we went with Option 2, which gives us the fastest queries.
However, Option 2 means we need some way to re-aggregate all of a user’s data in close to real time whenever it changes.
Aggregate All the Data!
Consideration #3: How do we aggregate all the sent jobs for one user in a nearline fashion?
In our initial approach, we only needed to read from and write to one table, which is easy to do. However, with the introduction of an aggregation process, we must leverage a data streaming framework which can do real-time computation. Every time we send the user jobs, we need to record all the jobs from that email, combine them with the jobs historically sent to the user, and write this final list of jobs for this user to an aggregated table.
For the first part of this process, we need to log the data in a way that lets us easily process it in real time and that decouples the logging and aggregation steps. To this end, we can use a pub/sub model: an event queue which different applications can write to (publish) and which other applications can read from (subscribe). This flexible framework lets us simply publish an event with the user ID and job IDs each time we show the user jobs, without having to worry about how that event is used. We can then have an aggregation process which subscribes to the queue and performs the aggregation. As part of this process, we also need to store the job IDs a user has previously been sent, so that we can combine them with the newly sent jobs. In essence, this table stores all the raw event data from the queue for use during aggregation.
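The following pub/sub queue illustrates the decoupling. It is deliberately simplified: synchronous, in-memory, and written in Python. A real event stream such as Kinesis is durable and asynchronous, so treat this only as a stand-in. The `EventQueue` class and the `"jobs-sent"` topic name are hypothetical:

```python
from collections import defaultdict
from typing import Callable

Event = dict  # e.g. {"user_id": 3000, "job_ids": [12, 223, 4490]}
Handler = Callable[[Event], None]


class EventQueue:
    """Hypothetical in-memory pub/sub queue for illustration only."""

    def __init__(self) -> None:
        self._subscribers: defaultdict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: Event) -> None:
        # The publisher neither knows nor cares who consumes the event;
        # every subscriber to the topic receives it.
        for handler in self._subscribers[topic]:
            handler(event)


queue = EventQueue()
aggregator_inbox: list[Event] = []
queue.subscribe("jobs-sent", aggregator_inbox.append)

# The email application only publishes; it has no knowledge of the aggregator.
queue.publish("jobs-sent", {"user_id": 3000, "job_ids": [12, 223, 4490]})
```

New consumers can subscribe to the same topic later without any change to the publishing application. That property is what makes the pub/sub model a good fit here.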
Consideration #4: What should be the schema of the raw events table?
In fact, we can reuse the same table schema we came up with in our initial design, which contains multiple rows per user. This is because the aggregation process is not subject to the same latency requirements as queries are. At query time, an extra 200ms is quite noticeable to the user; during aggregation, we are perfectly okay with taking 200ms to process each entry, since that latency does not reach the end user.
To sum up, we will have the following process:
- Whenever an application shows a user jobs, it also publishes the event to the event queue.
- A nearline aggregation process subscribes to the queue. Whenever a new event comes in:
  - The aggregation process writes the user ID and job IDs into the raw events table.
  - It then reads from the raw events table and aggregates all the sent jobs for the user into one list.
  - It writes all of the user’s job IDs into a final aggregated table, which will be used at query time.
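The steps above can be expressed as a single event handler. This Python version is a sketch that assumes in-memory dictionaries stand in for the raw events and aggregated tables. The names `raw_events_table`, `aggregated_table`, and `handle_event` are hypothetical. Sorting the deduplicated IDs is also an assumption, chosen only to make the output deterministic:

```python
from collections import defaultdict

# Hypothetical stand-ins for the two tables.
raw_events_table: defaultdict[int, list[list[int]]] = defaultdict(list)  # many rows per user
aggregated_table: dict[int, list[int]] = {}                              # one row per user


def handle_event(event: dict) -> None:
    """Process one event from the queue, following the steps described above."""
    user_id = event["user_id"]

    # 1. Write the user ID and job IDs into the raw events table as a new row.
    raw_events_table[user_id].append(list(event["job_ids"]))

    # 2. Read back all raw rows for the user, including the one just written,
    #    and aggregate them into one deduplicated list.
    rows = raw_events_table[user_id]
    all_job_ids = sorted({job_id for row in rows for job_id in row})

    # 3. Overwrite the user's single row in the aggregated table. This is the
    #    only table read at query time, so a lookup is a single-key read.
    aggregated_table[user_id] = all_job_ids


handle_event({"user_id": 3000, "job_ids": [175, 223, 5000]})
handle_event({"user_id": 3000, "job_ids": [12, 223, 4490]})
print(aggregated_table[3000])  # [12, 175, 223, 4490, 5000]
```

All of the multi-row reading and deduplication happens here, off the request path. That is the point of moving it out of query time.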
Given that we are combining the results from the nearline pipeline with our existing offline pipeline, we don’t need to store events forever: we only need nearline events to cover the gap resulting from the offline pipeline’s data latency. This also limits the amount of data we have to store in our tables. To be conservative, we set the TTL (time to live, i.e. how long a row is kept before it expires) to 2 days for both the raw and aggregated tables. An event will therefore disappear from the online tables 2 days after it is written. By then, the same jobs from that event should already have been written to our offline tables, so nothing is lost.
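The following Python sketch shows the TTL semantics. It passes timestamps in seconds explicitly so the expiry is easy to follow, and the `TtlTable` class is hypothetical. In Cassandra itself, expiry is set on the write, for example with `USING TTL 172800` (2 days in seconds) on an `INSERT`, or through a table’s `default_time_to_live` option:

```python
from dataclasses import dataclass, field

DAY_SECONDS = 24 * 60 * 60
TTL_SECONDS = 2 * DAY_SECONDS  # 2-day TTL, as described above


@dataclass
class TtlTable:
    """Hypothetical in-memory table whose rows expire `ttl` seconds after being written."""

    ttl: int = TTL_SECONDS
    # user ID -> list of (expires_at, job_ids) rows
    rows: dict[int, list[tuple[float, list[int]]]] = field(default_factory=dict)

    def write(self, user_id: int, job_ids: list[int], now: float) -> None:
        self.rows.setdefault(user_id, []).append((now + self.ttl, list(job_ids)))

    def read(self, user_id: int, now: float) -> list[list[int]]:
        # Expired rows are never returned (and are dropped from storage here).
        live = [(exp, ids) for exp, ids in self.rows.get(user_id, []) if exp > now]
        if live:
            self.rows[user_id] = live
        else:
            self.rows.pop(user_id, None)
        return [ids for _, ids in live]


table = TtlTable()
table.write(3000, [12, 223, 4490], now=0)
table.write(3000, [175, 223, 5000], now=DAY_SECONDS)
print(table.read(3000, now=DAY_SECONDS))      # both rows are still live
print(table.read(3000, now=2 * DAY_SECONDS))  # the first row has expired
```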
A quick note: to reduce the computational load of sending and processing events individually, events are sent to the queue in batches and processed asynchronously. This incurs some data latency (20s on average) but requires less computational power, because batching means fewer network calls. A bit of data latency is acceptable for this process; there’s little reason to need fully real-time functionality for this feature. This is why the system is a nearline system rather than a fully real-time one.
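The batching tradeoff can be sketched as a small client-side buffer. This hypothetical Python `BatchingPublisher` is not the actual Glassdoor or Kinesis client, and the batch size and wait time are illustrative parameters. It sends one batch per network call, flushing when the buffer is full or when the oldest buffered event has waited long enough:

```python
import time
from typing import Callable, Optional


class BatchingPublisher:
    """Hypothetical batcher: buffers events and sends each batch in one call.

    A batch is flushed when it reaches max_batch_size events, or when the
    oldest buffered event is at least max_wait_seconds old. For simplicity the
    age check only runs when a new event is published; a production client
    would also flush from a background timer.
    """

    def __init__(
        self,
        send_batch: Callable[[list[dict]], None],
        max_batch_size: int,
        max_wait_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._send_batch = send_batch
        self._max_batch_size = max_batch_size
        self._max_wait_seconds = max_wait_seconds
        self._clock = clock
        self._buffer: list[dict] = []
        self._oldest_at: Optional[float] = None

    def publish(self, event: dict) -> None:
        if not self._buffer:
            self._oldest_at = self._clock()
        self._buffer.append(event)
        if len(self._buffer) >= self._max_batch_size or self._oldest_is_due():
            self.flush()

    def _oldest_is_due(self) -> bool:
        return (
            self._oldest_at is not None
            and self._clock() - self._oldest_at >= self._max_wait_seconds
        )

    def flush(self) -> None:
        if self._buffer:
            self._send_batch(self._buffer)  # one network call for the whole batch
            self._buffer = []
            self._oldest_at = None
```

Raising `max_batch_size` or `max_wait_seconds` means fewer network calls but longer waits before events reach the queue. That is the same tradeoff between compute cost and data latency described above.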
Consideration #5: What technologies should be used for data storage, the event queue, and the nearline aggregation process?
For data storage, we leveraged our existing Cassandra database for storing the nearline information. Cassandra is a NoSQL database optimized for fast reads and writes, and therefore is ideal for our realtime search use-case. We also have many heavily optimized Cassandra clusters with built-in monitoring at Glassdoor.
For the event queue, we used AWS Kinesis. A number of different frameworks would have worked for our basic use case, such as Kafka or Redis, some of which we also use at Glassdoor. However, we have the most infrastructure support for Kinesis, which means we don’t have to worry about issues such as throughput and scaling.
Similarly, for the nearline aggregation process, we used Storm, which our teams already had experience working with.
As we can see, the common theme among all the technologies and frameworks we used is that they fit the needs of our use case and Glassdoor already had existing infrastructure for them, which made implementation much easier. After all, there is no need to reinvent the wheel if the existing wheel is well suited to the task.
Now let’s take a look at our Storm pipeline.
Weathering the Storm
Consideration #6: How should our Storm aggregation process function?
To get started, here is some basic Storm terminology that is useful for understanding the following example:
Tuple — The basic unit of data in Storm. It is just an ordered collection of values.
Spout — A connection to an external input data source. This is useful for subscribing to event queues like Kinesis. Data is brought in as tuples.
Bolt — A data processing step in Storm which takes in data from spouts and other bolts, and outputs zero or more tuples.
Topology — A complete data pipeline or workflow in Storm, built from a combination of spouts and bolts.
Below is a diagram of the final Storm topology we built:
We can understand the topology better through the lens of a single event/tuple. Suppose the following event is being processed:
User ID: 3000,
Sent Job Listing IDs: [12, 223, 4490]
This event will be processed as follows:
- Storm Kinesis Spout — The Kinesis spout converts the event into a Storm tuple, (3000, [12, 223, 4490]), and passes it on to the next bolt.
- Raw Data Cassandra Writer Bolt — Writes the event into the raw events Cassandra table as a new row: user ID 3000, job IDs [12, 223, 4490].
It then sends a tuple containing just the user ID to the next bolt: (3000)
- Raw Data Cassandra Reader Bolt — Queries all rows for the given user ID, 3000. Note that this returns every event previously recorded for that user (within the TTL), including the one just written. Suppose at that point the table had 2 entries for user 3000, with job IDs [12, 223, 4490] and [175, 223, 5000].
This bolt then passes the user ID along with the non-deduped job listings to the next bolt: (3000, [[12, 223, 4490], [175, 223, 5000]]).
- Aggregate Raw Data Bolt — Dedupes and aggregates all the job listings for the user and passes the result on to the next bolt: (3000, [12, 175, 223, 4490, 5000]).
- Aggregated Data Cassandra Writer Bolt — Updates user 3000’s row in the aggregated data table with the new list of job listings: [12, 175, 223, 4490, 5000].
Note that each user has exactly one corresponding row, which means queries on this table are fairly efficient.
The Storm topology processes every event in this manner, and the end result is a populated aggregated data table that supports efficient queries.
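The walkthrough can be tied together with a single-process Python simulation of the topology’s data flow. It is a minimal sketch that assumes dictionaries stand in for the Cassandra tables. Each bolt is modeled as a function that takes one tuple and returns zero or more tuples, matching the definition of a bolt given earlier. The function names are hypothetical, and real Storm bolts run in parallel across a cluster, which this sketch ignores:

```python
from collections import defaultdict
from typing import Callable, Iterable

# Hypothetical stand-ins for the raw events and aggregated Cassandra tables.
raw_table: defaultdict[int, list[list[int]]] = defaultdict(list)
aggregated_table: dict[int, list[int]] = {}

Bolt = Callable[[tuple], list[tuple]]


def kinesis_spout(events: Iterable[dict]) -> Iterable[tuple]:
    # Convert each queue event into a (user_id, job_ids) tuple.
    for event in events:
        yield (event["user_id"], event["job_ids"])


def raw_writer_bolt(tup: tuple) -> list[tuple]:
    user_id, job_ids = tup
    raw_table[user_id].append(list(job_ids))
    return [(user_id,)]  # pass on just the user ID


def raw_reader_bolt(tup: tuple) -> list[tuple]:
    (user_id,) = tup
    return [(user_id, [list(row) for row in raw_table[user_id]])]  # non-deduped rows


def aggregate_bolt(tup: tuple) -> list[tuple]:
    user_id, rows = tup
    return [(user_id, sorted({job_id for row in rows for job_id in row}))]


def aggregated_writer_bolt(tup: tuple) -> list[tuple]:
    user_id, job_ids = tup
    aggregated_table[user_id] = job_ids  # exactly one row per user
    return []  # terminal bolt: emits zero tuples


def run_topology(spout: Iterable[tuple], bolts: list[Bolt]) -> None:
    # Push each tuple through the bolts in order; a bolt may emit 0..n tuples.
    for tup in spout:
        pending = [tup]
        for bolt in bolts:
            pending = [out for t in pending for out in bolt(t)]


events = [
    {"user_id": 3000, "job_ids": [175, 223, 5000]},  # an earlier event
    {"user_id": 3000, "job_ids": [12, 223, 4490]},   # the event from the walkthrough
]
run_topology(
    kinesis_spout(events),
    [raw_writer_bolt, raw_reader_bolt, aggregate_bolt, aggregated_writer_bolt],
)
print(aggregated_table[3000])  # [12, 175, 223, 4490, 5000]
```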
That’s A Wrap
What is the finished design?
Revisiting the final design as presented in the previous blog post, we have the following:
For each email that we send out, we post the event data to our Kinesis event queue (or Eventbus) system in batches (which are dictated by the email event queue). Our Storm topology subscribes to this queue, performs real-time aggregation on these individual events, and writes all of a user’s seen jobs within the last 2 days to an aggregated table. We then query this table whenever we show the user jobs in the future, across various use cases.
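The query side is handled by the separate microservice mentioned earlier and is not detailed in this series. The sketch below is hedged accordingly. It assumes both the offline results and the user’s row from the aggregated nearline table are available as lists of job IDs, and that combining them is a simple union (also an assumption). Under those assumptions, excluding already-sent jobs could look like this (`exclude_sent_jobs` is a hypothetical name):

```python
from collections.abc import Iterable


def exclude_sent_jobs(
    candidate_job_ids: Iterable[int],
    offline_sent: Iterable[int],
    nearline_sent: Iterable[int],
) -> list[int]:
    """Drop candidates the user was already sent, keeping the candidates' order.

    offline_sent: job IDs from the offline pipeline, which lags behind.
    nearline_sent: the user's single row from the aggregated nearline table,
    which covers that lag (rows expire after the 2-day TTL).
    """
    already_sent = set(offline_sent) | set(nearline_sent)
    return [job_id for job_id in candidate_job_ids if job_id not in already_sent]


candidates = [175, 301, 12, 999, 4490]
print(exclude_sent_jobs(candidates, offline_sent=[999], nearline_sent=[12, 175, 223, 4490, 5000]))
# [301]
```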
How did we do?
The maximum data latency for this pipeline is around 20 seconds, almost all of it due to batching on the event queue side. This means that within 20 seconds of sending the user a job, we are able to avoid sending the user the same job again.
Furthermore, querying the aggregated online data table takes just a few milliseconds, and therefore has minimal impact on the final response time, which is on the order of several hundred milliseconds.
What did we learn?
As far as production systems go, this project was fairly straightforward from a design perspective, but it illustrates many important points about design:
- Despite the temptation to charge ahead and start building a system, it’s important to periodically evaluate and reevaluate design decisions. Although this post presents a straightforward design process, the path to the final design was by no means straightforward.
- Always keep any constraints in mind when designing a system. Any decisions made should respect those constraints. In our use case, it was important to keep the latency as low as possible, which resulted in our decision to create a separate aggregation process.
- DRY (Don’t Repeat Yourself) is an engineering mantra that proved very useful in building this system. Wherever possible and appropriate, we leveraged existing work that had already been done in the company.
- Always think about the data schema of different data processes in order to achieve optimal performance and reduce redundant computation. Doing this for our Cassandra tables and Storm topology helped reduce querying and processing time.