At uSTADIUM, we use a task scheduling system to send thousands of push notifications. The need for a task queue and scheduler wasn’t obvious at first: our server simply processed notifications as they were needed during a request. Over time, that load began to overwhelm the system, and since I wasn’t sure how to fix it, solving the problem turned into an interesting journey. In this article, I’ll discuss our approach, how we built it using Redis, and our experience as the system scaled.
Building an API isn’t that complicated once you understand the basics. We send HTTP requests to the server, it does some work, and then it returns the requested data. Simple. But what happens when a request requires work that’s outside of its scope? For example, when I mention a user, the system needs to send a push notification to the affected users. Processing the notification during that request’s lifecycle would delay the final response. As our notification system became more complex, it was clear we needed to put more thought into it.
Processing a notification and then sending a push notification requires calls to the database and external APIs. The process breaks down as follows:
- An action occurs that requires a notification to be generated.
- A notification is constructed and inserted into the database.
- That notification is mapped to the set of users who will receive it.
- We retrieve a list of all devices for the users we need to notify.
- We send a push notification to every device they have registered with us.
- We update the send status of that notification and remove invalid device tokens.
Each of those six steps involves at least one database query. The process can finish quickly when a single notification goes to a single user’s device, but when it takes longer, the request is at risk of timing out. We have to separate this logic out so it can be handled outside of the request/response lifecycle.
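The six steps above can be sketched as one async pipeline. Everything below is illustrative rather than uSTADIUM’s actual code: the helper names and the in-memory stand-ins for the database and push provider are assumptions made so the sketch runs on its own.

```javascript
// In-memory stand-ins for the database and push provider (assumptions,
// not real infrastructure), so this sketch is runnable on its own.
const db = {
  notifications: [],
  devices: new Map([["u1", ["tok-a", "tok-b"]], ["u2", ["tok-c"]]]),
};

async function createNotification(event) {             // steps 1–2: build + insert
  const n = { id: db.notifications.length + 1, ...event, status: "pending" };
  db.notifications.push(n);
  return n;
}
async function mapRecipients(n) { return n.mentions; } // step 3: who receives it
async function getDevices(userIds) {                   // step 4: all registered devices
  return userIds.flatMap(u => db.devices.get(u) || []);
}
async function sendPush(token, n) {                    // step 5: fake push provider
  return { token, ok: true };
}
async function updateSendStatus(n, results) {          // step 6: mark sent
  n.status = results.every(r => r.ok) ? "sent" : "partial";
}

async function processNotification(event) {
  const notification = await createNotification(event);
  const userIds = await mapRecipients(notification);
  const devices = await getDevices(userIds);
  const results = await Promise.all(devices.map(d => sendPush(d, notification)));
  await updateSendStatus(notification, results);
  return results;
}

processNotification({ type: "mention", mentions: ["u1", "u2"] })
  .then(results => console.log(results.length)); // 3 device tokens pushed to
```

Doing all of this inline during a request means the response waits on every step; moving the whole pipeline behind a queue is what the rest of this article sets up.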
The Task Queue
A task queue manages a list of work that needs to be completed in a separate process. One system adds work to the end of the queue while another pops work items off the front. We need to create a Task object that represents the work described above and then add it to the task queue. Before we could begin, I needed to answer a few fundamental questions.
1. Where will the Task Queue live?
We were already using Redis as a caching system, so when I began looking for ways to build a queue, Redis was the obvious choice. Not only is it well suited to this pattern, but there are a lot of resources online discussing how to build queues with it. There are many other options; if you’re using Google App Engine, for example, you should look into Google Cloud Tasks, which offers more built-in features.
2. How do we know when an item has been added to the queue?
I spent a bit of time trying to figure this out. I did not want to poll Redis every n milliseconds for new jobs. Two methods showed up on my radar. The first is the Redis Pub/Sub system: a function subscribes to a channel and receives messages on it, and those messages alert us that a new Task is ready to run. The second is to use a simple Redis list as the queue and the blocking list pop primitive, BLPOP, to wait until an item is ready to be removed from the queue.
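To make the BLPOP idea concrete, here is a minimal sketch. The `FakeRedisList` class is an in-memory stand-in (an assumption, not a Redis client) that mimics the blocking semantics; with a real client such as ioredis, the calls would be `redis.rpush("task_queue", id)` and `await redis.blpop("task_queue", 0)` against the server instead.

```javascript
// In-memory stand-in that mimics a Redis list with BLPOP semantics, so the
// worker loop below runs without a server. This is NOT a Redis client.
class FakeRedisList {
  constructor() {
    this.items = [];
    this.waiters = [];
  }
  rpush(item) {
    const waiter = this.waiters.shift();
    if (waiter) waiter(item);   // hand the item straight to a blocked pop
    else this.items.push(item);
  }
  blpop() {
    // Resolves as soon as an item is available, like BLPOP with timeout 0.
    if (this.items.length) return Promise.resolve(this.items.shift());
    return new Promise(resolve => this.waiters.push(resolve));
  }
}

const queue = new FakeRedisList();
const processed = [];

async function worker(stopAfter) {
  for (let i = 0; i < stopAfter; i++) {
    const taskId = await queue.blpop(); // waits here until work arrives
    processed.push(taskId);
  }
}

const done = worker(2);
queue.rpush("task:1"); // the producer side: push task IDs onto the list
queue.rpush("task:2");
done.then(() => console.log(processed)); // ["task:1", "task:2"]
```

The key property is that the consumer consumes no CPU while the queue is empty; it simply parks on the blocking pop until a producer pushes something.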
During our first iteration of this design, we used the Pub/Sub pattern, but it added a layer of complexity that wasn’t needed. Also, once our system scaled, we had to do extra work to verify that a message wasn’t processed on multiple machines. Therefore, we switched over to the list and BLPOP method.
3. What do we submit to the task queue?
“Well, we submit the task, duh…” is what you might be thinking, but Redis lists only support strings, so we can’t really push an object. We have to push a key onto the end. I’ll admit this question confused me more than it should have, mainly because I wasn’t sure what the “best” method was. Was the key a primary ID in our database, or should it be a reference to some object in Redis? Where do we draw the line on the work that has to occur? I decided to send the event’s primary key ID to the queue and allow the Task to decide how to handle it. For example, if a user upvotes a post, I push the ID of the vote action to the vote_queue; once it’s popped off the queue, the service knows how to handle it.
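As a sketch of that decision (a plain array stands in for the Redis list, and the vote table and handler names are hypothetical), only the vote’s primary key travels through the queue; the worker re-hydrates the full object from the database:

```javascript
// A plain array stands in for the Redis list "vote_queue"; the table and
// handler names are hypothetical.
const voteQueue = [];
const votesTable = new Map([[42, { postId: 7, userId: "u1", direction: "up" }]]);

function onUpvote(voteId) {
  // Queues hold strings, so we push only the primary-key ID.
  voteQueue.push(String(voteId));
}

function handleVoteTask() {
  const id = voteQueue.shift();
  // The Task decides what the ID means and re-hydrates it from the database.
  return votesTable.get(Number(id));
}

onUpvote(42);
console.log(handleVoteTask()); // { postId: 7, userId: "u1", direction: "up" }
```

Pushing only the ID keeps queue entries tiny and guarantees the worker always reads the freshest version of the record, at the cost of one extra database lookup per task.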
The Set Up
Ok, I’ve laid out the problem and answered some of the questions I had (hopefully those answer some of yours). Now let’s take a look at a big-picture diagram of how this will work.
You can see from the diagram that we have two systems running on the server. The TaskScheduler creates a new task, adds it to the database, and then pushes the task’s ID to the end of the task queue. The TaskManager waits for a Task to be added to the queue and handles it appropriately.
The Code Sample
The TaskScheduler.js file is a basic example of how we can add a task to the database and then push it onto the end of a task queue. Once the task is on the queue, it will be processed as soon as the TaskManager starts listening.
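Since the gist itself isn’t embedded here, below is a hedged reconstruction of what a TaskScheduler might look like. The `db` and `queue` objects are in-memory stand-ins, not uSTADIUM’s code; in production they would be a real database client and a Redis client (where the push would be `redis.rpush(queueKey, String(id))`).

```javascript
// Sketch of a TaskScheduler: persist the task first, then enqueue its key.
class TaskScheduler {
  constructor(db, queue, queueKey) {
    this.db = db;
    this.queue = queue;
    this.queueKey = queueKey;
  }
  async schedule(task) {
    const id = await this.db.insertTask(task);         // 1: insert the task row
    await this.queue.rpush(this.queueKey, String(id)); // 2: push its ID onto the queue
    return id;
  }
}

// In-memory stand-ins so the sketch runs without any infrastructure.
const db = {
  rows: [],
  async insertTask(task) {
    this.rows.push(task);
    return this.rows.length; // pretend auto-increment primary key
  },
};
const queue = {
  lists: new Map(),
  async rpush(key, value) {
    if (!this.lists.has(key)) this.lists.set(key, []);
    this.lists.get(key).push(value);
  },
};

const scheduler = new TaskScheduler(db, queue, "task_queue");
scheduler.schedule({ type: "notification", eventId: 17 })
  .then(id => console.log("scheduled task", id)); // scheduled task 1
```

Inserting into the database before pushing to the queue matters: if the push fails, the task row still exists and can be re-enqueued, whereas the reverse order could leave a queued ID that points at nothing.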
The TaskQueue.js gist is a basic example of how to implement it in NodeJS with async/await.
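The gist isn’t included here either, but one refinement worth sketching is routing: real BLPOP accepts multiple keys (`BLPOP vote_queue mention_queue 0`) and returns a `[queueName, value]` pair, so a single TaskManager can dispatch each popped item to the right handler. In this sketch the queue names and handlers are hypothetical, and the pair is passed in directly rather than popped from Redis:

```javascript
// Hypothetical handler registry: each queue name maps to the function that
// knows how to process IDs popped from that queue.
const handlers = {
  vote_queue: id => `processed vote ${id}`,
  mention_queue: id => `processed mention ${id}`,
};

// BLPOP on multiple keys returns [queueName, value]; dispatch on the name.
function dispatch([queueName, id]) {
  const handler = handlers[queueName];
  if (!handler) throw new Error(`no handler registered for ${queueName}`);
  return handler(id);
}

console.log(dispatch(["vote_queue", "42"]));   // "processed vote 42"
console.log(dispatch(["mention_queue", "7"])); // "processed mention 7"
```

This keeps the consumer loop generic: adding a new task type means registering one more handler, not writing another worker.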
There are many areas for improvement, since the code I’ve given is just a basic example. One question you might need to ask is where to place your TaskManager. If you add it directly to your server, it could overload the system during periods of high usage, though this depends on what type of work your tasks perform. In our system, we extracted all of this out into a new microservice with a simple API for checking its state.
Also, in the sample code, we run one task at a time. This isn’t ideal, because a long-running task can back up the whole queue. Instead, we should maintain a pool of running tasks that are added and removed as needed; once the pool is full, the loop waits for a slot to open.
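One way to sketch that pool (the limit and handler below are illustrative assumptions): keep up to `limit` task promises in flight in a set, and `Promise.race` the set whenever it is full so the loop pauses until any slot opens.

```javascript
// Run at most `limit` tasks concurrently; wait for a free slot before
// pulling the next item off the (local) list of tasks.
async function runPool(tasks, limit, handler) {
  const running = new Set();
  for (const task of tasks) {
    const p = handler(task).finally(() => running.delete(p));
    running.add(p);
    if (running.size >= limit) {
      await Promise.race(running); // pool is full: wait for any task to finish
    }
  }
  await Promise.all(running);      // drain whatever is still in flight
}

// Illustrative handler that tracks how many tasks ran at once.
let active = 0;
let peak = 0;
async function handler(task) {
  active += 1;
  peak = Math.max(peak, active);
  await new Promise(resolve => setTimeout(resolve, 10)); // simulated work
  active -= 1;
  return task;
}

runPool([1, 2, 3, 4, 5], 2, handler)
  .then(() => console.log(peak)); // 2 — the pool never exceeds its limit
```

With this shape, one slow task only occupies one slot; the other slots keep draining the queue instead of waiting behind it.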
The method described isn’t too complex, but it decouples the business logic from the application logic. With this minor change in place, we can begin iterating on the performance of the system and building more robust queues and services. We can also reuse this pattern to handle various long-running processes like recommendation systems, text processing, and so on.