Starfighter in C++: The Task Queue

Foster Brereton
Jan 11, 2016


The similarities between single-threaded and multithreaded applications end about the time a comparison begins. It is akin to comparing a tomahawk (the axe) to a Tomahawk (the cruise missile.)

A multithreaded task queue is nothing new. But I had never written one before, and I’d like to think I learned some interesting things along the way.

Task Queue Overview

The high-level architecture is a structure containing two basic elements. The first, the queue, holds units of work called tasks. The second, a pool of subservient worker threads, takes tasks off the queue and executes them. Other member variables exist for the sake of synchronization; they will be introduced as they become necessary. My task_queue_t starts with the following:

struct task_queue_t {
    typedef std::function<void ()>    task_t;
    typedef std::deque<task_t>        task_deque_t;
    typedef std::mutex                mutex_t;
    typedef std::unique_lock<mutex_t> lock_t;

We must ensure the deque is only ever modified by a single thread at a time; the mutex and lock types are used to control that ownership.
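The remaining members get introduced as they come up; for reference, here is a sketch of the declarations the later snippets assume (the names are taken from the code below, but the exact declarations are my inference):

    task_deque_t             deque_m;       // pending tasks
    mutex_t                  mutex_m;       // guards deque_m
    std::condition_variable  condition_m;   // wakes sleeping workers
    std::atomic<bool>        done_m{false}; // flipped once, at termination
    std::vector<std::thread> pool_m;        // the worker threads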

The Task

In general it is a noble pursuit to make tasks as self-contained as possible. By far the most important reason is to minimize blocking. When more than one thread needs a shared resource, they have to take turns; if they do not, they will stomp on each other while working with said resource (to disastrous effect). Taking turns means all but one thread must wait, immediately reducing the progress of the waiting threads (and their tasks) to zero. There are a number of ways to minimize blocking in a task-based application, but those are conversations for another post.
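To make that concrete, here is a hedged sketch (process and make_snapshot are hypothetical, and queue is an instance of the task queue we are about to build). The first task must take turns on shared state; the second carries everything it needs by value:

std::vector<int> shared_data; // shared across tasks; requires a lock
queue.push([&shared_data] { process(shared_data); }); // contends with peers

auto snapshot = make_snapshot(); // copy what the task needs up front
queue.push([snapshot] { process(snapshot); });        // self-contained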

The Queue

The queue itself is straightforward: I chose a std::deque as the basis for my queue as it affords constant-time pushing and popping from either end. A std::list would achieve the same goal, but it allocates memory for every push and pop (a deque allocates in chunks, so most pushes and pops perform no allocation at all.)

The current implementation uses a single queue per task queue. The problem with this approach is that the queue becomes the shared resource that forces threads to block. An improved implementation would have a thread-local queue per worker thread, and the ability for each thread to steal tasks from the others once its own queue has emptied. (My Stockfighter client was never in need of such performance in the task queue, though.)

The Worker Threads

When a thread is idle it should consume a minimal amount of machine resources. Idle threads should not destruct, however: threads are somewhat expensive to spin up and tear down, and the queue should be ready to work at a moment’s notice. Therefore the number of worker threads remains stable, and we launch them all when the queue is constructed. I kept track of the threads in a std::vector so I could join them at destruction time.

task_queue_t(std::size_t pool_size =
                 std::thread::hardware_concurrency()) {
    for (std::size_t i(0); i < pool_size; ++i) {
        pool_m.emplace_back(std::bind(&task_queue_t::worker, this));
    }
}

The pool size was originally fixed, but I parameterized it to handle task priorities. My problem was that I had N > pool_size tasks that took too long to complete, blocking other tasks and effectively halting the application.

My first solution was to prioritize the task queue: these lengthy tasks could run at a lower priority, and the normal tasks would still get processor time. Unfortunately that added complexity to the code, and did nothing to solve the problem. The reason was that the tasks were not interruptible: once each thread was blocked on a low priority task, no other task would progress. Eventually all the threads would get swamped in low priority tasks, and I was back to square one.

My second solution unblocked the problem for me: use two task queues, one for each priority. The low priority tasks might block one another, but they would be unable to block tasks on the other queue (where the normal tasks were being processed.)
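In code, the fix is nothing more than two instances; the task bodies here are hypothetical:

task_queue_t normal_queue;       // short, latency-sensitive tasks
task_queue_t low_priority_queue; // lengthy tasks that may block one another

normal_queue.push([] { handle_quote_update(); });
low_priority_queue.push([] { replay_order_history(); }); // slow, non-urgent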

Terminating the Task Queue

All good things must come to an end; task queues are no different:

~task_queue_t() {
    lock_t lock{mutex_m};

    signal_done();

    lock.unlock();

    for (auto& thread : pool_m) {
        thread.join();
    }
}

Originally I did not acquire the mutex on destruction. After a handful of race-condition-based crashes, I realized the mutex is still implicitly in use here: worker threads may be touching the queue even as it is being destructed. We unlock the mutex after signalling termination; otherwise the worker threads could never re-acquire it inside wait, and the joins below would deadlock.
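For contrast, here is a hedged sketch of the deadlocking variant. The joins wait on the workers, but the just-woken workers are stuck inside wait trying to re-acquire mutex_m, which we still hold:

~task_queue_t() {
    lock_t lock{mutex_m};
    signal_done();       // wakes every sleeping worker...

    for (auto& thread : pool_m) {
        thread.join();   // ...but each blocks re-acquiring
    }                    // mutex_m, and join never returns
}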

Once the termination signal has been sent to the worker threads, we join on each to give them time to complete normally. Finally we exit, safe in the knowledge that our threads are gone, too. Any remaining tasks on the queue are destroyed.

(I think there’s a lingering crash on quit here, but haven’t taken the time to iron it out.)

Adding a Task

Adding a task is pretty simple: own the mutex, push the task, and notify a condition variable:

template <typename F>
void push(F&& function) {
    lock_t lock{mutex_m};

    deque_m.emplace_back(std::forward<F>(function));

    condition_m.notify_one();
}

The parameter type to the routine, F&&, is new to C++11. Scott Meyers calls it a universal reference. In Effective Modern C++, he states that when forwarding a universal reference, std::forward<F> should be used in lieu of std::move, because the caller may have passed an lvalue, and std::forward preserves the argument’s value category where std::move would not. If you haven’t picked up that book, do so and read it before moving on.

Go on, I’ll wait. Ready? OK…
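Here is a quick sketch of what the forwarding buys us (queue is a hypothetical instance, send a hypothetical routine):

std::string payload("order data");
auto fn = [payload] { send(payload); };

queue.push(fn);            // fn is an lvalue: F deduces to an lvalue
                           // reference, and the task is copied in
queue.push(std::move(fn)); // F deduces to a non-reference; the task
                           // (and its captured string) is moved in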

If you haven’t seen a std::condition_variable yet, they’re amazing. They are a means of putting an unused thread to sleep until it is woken up by another thread. The state a thread falls into while waiting is quite efficient, far more so than a busy-wait loop that polls for changes. In this case, notify_one wakes up a single thread watching the condition variable (as opposed to notify_all.)
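For comparison, a hedged sketch of the polling the condition variable replaces (ready_to_work is hypothetical):

while (!ready_to_work())
    std::this_thread::yield(); // keeps waking up to check; wastes cycles

// versus:
condition_m.wait(lock, ready_to_work); // sleeps until notified

Speaking of notify_all…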

Signalling Termination

Recall from the destructor that in order to bring the task queue down, we called signal_done. Here is the implementation, a small but important routine:

void signal_done() {
    if (done_m.exchange(true))
        return;

    condition_m.notify_all();
}

done_m is a std::atomic<bool>, owned by the task queue. Atomic variables are modified transactionally with respect to every thread, meaning an ill-formed (unsafe, or intermediate) state is never observed. Atomic intrinsics accomplish this without mutexes and are considerably faster than locking (though not as fast as a nonatomic assignment.) Otherwise, it’s a familiar construct: setting a boolean to true to signal that something is done.
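The exchange call is what makes signal_done idempotent: it sets the flag and returns the prior value in one indivisible step. A small sketch:

std::atomic<bool> flag(false);

bool was_set = flag.exchange(true); // set the flag and fetch the value
                                    // it held, atomically

// was_set is false for exactly one caller; every later caller sees true
// and bails out, which is the early return in signal_done above.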

In this case we want to wake up all sleeping worker threads, not just one. We do this because they need to awaken before they can terminate appropriately. Non-sleeping threads are working on a task, and will detect the done flag is set once the task has completed.

The Worker Routine

The worker thread encapsulates logic related to queue management and termination detection. It is easily the most complex routine in the task queue:

void worker() {
    task_t task;

    while (true) try {
        lock_t lock{mutex_m};

        condition_m.wait(lock, [=]() {
            return done_m || !empty_unsafe();
        });

        if (done_m)
            return;

        if (try_pop_unsafe(task)) {
            lock.unlock();

            task();
        }
    } catch (...) {
        // Drop it on the floor. Not ideal, but really
        // there's nowhere for them to go right now.
    }
}

The worker loops forever, barring a signal on the done boolean. The start of the loop acquires the mutex, guaranteeing this thread is the only one modifying the deque. The condition_variable then waits, blocking (sleeping) until the thread is woken up. The wakeup may be intentional (a notify) or spurious, but either way the same thing happens: the predicate is checked and, if true, the wait is over.

Once wait returns, we know two things: 1) the predicate passed to wait was true (though we don’t know which condition in the predicate made it so), and 2) we own the mutex and can operate on the deque freely. If there is more than one condition that could have awoken the thread, each must be checked.

try_pop_unsafe is a non-blocking operation that checks whether the task queue is empty and, if not, moves the front task into the worker thread’s local task variable. I suffix the routine with the _unsafe tag to remind myself that it does no mutex acquisition of its own, yet still modifies a shared resource. Instead, it relies on the calling routine to acquire the necessary locks (which the worker thread does.)
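The article never shows the two _unsafe helpers, so here is my best-guess sketch, inferred from their descriptions (note that neither touches mutex_m; the caller already holds it):

bool empty_unsafe() const {
    return deque_m.empty();
}

bool try_pop_unsafe(task_t& task) {
    if (deque_m.empty())
        return false;

    task = std::move(deque_m.front()); // move the task out of the deque...
    deque_m.pop_front();               // ...then discard the empty slot

    return true;
}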

Thinking Ahead

There are several things this implementation does not do that I would like to incorporate at some point:

  • Improved exception handling: Right now any exception that makes its way to the worker thread is dropped on the floor. It would be ideal if they could be forwarded to an awaiting caller for improved handling. This would require the incorporation of…
  • Futures and promises (packaged_tasks). This is the current state of the art in C++ if you are looking to send data asynchronously between disparate parts of the application. I am still untrained in their use, and since there was never a top-level recipient of a task’s results, the use of packaged_task never arose. (A sketch follows this list.)
  • Thread local task queues and task stealing. As mentioned before, having a single queue for the task queue makes it a blocking bottleneck for the threads looking to get work done. A more performant implementation would have a local queue per thread, and a worker thread would first empty its own queue before jumping into another worker’s queue to steal a task.
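As a taste of the futures-and-promises item, here is a hedged sketch of how a packaged_task could ride the existing queue without changing it. std::function requires copyable callables and packaged_task is move-only, so the sketch wraps it in a shared_ptr (compute is hypothetical):

auto work = std::make_shared<std::packaged_task<int ()>>(
    [] { return compute(); });

std::future<int> result(work->get_future());

queue.push([work] { (*work)(); }); // the lambda is copyable, so it
                                   // satisfies task_t (std::function)

int value(result.get()); // blocks until a worker runs the task;
                         // rethrows any exception it threw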
