Playing with Ruby Threads and Queues

code example

Threads are the Ruby implementation for a concurrent programming. Threads are existing within an Operating System Process and have access to its memory. Actually, any code written in Ruby executes within a thread — main thread.

Threads are useful when code can be executed independently, especially when code spends time waiting for external events. This kind of situations happens when you dealing with Input and Output operations (I/O).

I have an example.


Enter Worker.

class Worker
end

The whole purpose of the Worker is to do work. But what makes it useful is that it can do it in separate thread or threads.

class Worker
def initialize(num_threads:)
@num_threads = num_threads
@threads = []
end
  attr_reader :num_threads, :threads
private :threads
  def spawn_threads
num_threads.times do
threads << Thread.new do
# there will be work that the worker performs
end
end
end
end

Since threads are useful for heavy I/O operations this worker is perfect to do HTTP requests, manipulate with files on disk, make DB requests.

“Hey, worker! Send this data to API and fetch some data from another API, after save some of it data in my database and don’t forget to log all you have done into a log file” — this is a perfect job for the worker.


How we may pass work to the worker?

It is straightforward If you have to perform one single monotonous task every time.

class Worker
# rest of the class omitted
  def spawn_threads
num_threads.times do
threads << Thread.new do
HealthService::API.ping # send a HTTP request
end
end
end
end

But what if you need to perform various kind of work depending on external circumstances.


Queues to the rescue!

class Worker
def initialize(num_threads:)
@num_threads = num_threads
@threads = []
@queue = Queue.new
end
  # rest of the class omitted

def enqueue(action, payload)
queue.push([action, payload])
end
end

With Worker#enqueue method, it is now possible to pass work to the Worker. This can be done in many ways. For example, action can be a Proc and payload can be arguments for the Proc.

require "net/http"
require "json"
action = proc do |data|
Net::HTTP.post(
URI("https://api.some-ping-service.com"),
data.to_json,
"Content-Type" => "applicatoin/json",
)
end
worker_instance.enqueue(action, { ok: true })

What is great about Ruby implementation of Queues it that they are thread-safe by nature.


To perform actions that enqueued into Worker and do not take all CPU resources we need to do arrange dequeued algorithm in a smart way.

queue = Queue.new
loop do
puts "we need dequeue actions and do some job" unless queue.empty?
end

A loop like above will eat all you free CPU time.

Here is the output of top command when the loop is running:

$ top -o cpu
PID    COMMAND      %CPU  TIME
56681 ruby 99.9 01:58.17

The most common approach to solve this problem is to use sleep statement:

queue = Queue.new
loop do
puts "we need dequeue actions and do some job" unless queue.empty?
sleep 5
end

And it will help, but this is not perfect.

Imagine how the Ruby interpreter has to spend the time to switch between the main thread and worker’s threads every sleep interval to just realize that we have nothing to do because the worker queue is empty. This issue will be multiplied by a number of threads and get worse when the sleep interval has to become smaller.

sleep is not an efficient way to catch something in the future.


Again, Queues to the rescue!

Queue#pop(non_block = false) method, when non_block = false, suspends current thread If the queue is empty until data is pushed onto the queue.

This means that worker’s thread that has nothing to do will wait for the next enqueued action. No sleep required.

For convinienc Worker has domain specific methods that describes Worker state.

class Worker
# rest of the class omitted
  private
  attr_reader :queue, :threads
  def actions?
!queue.empty?
end
  def running?
!queue.closed?
end
  def dequeue_action
queue.pop(true)
end
  def wait_for_action
queue.pop(false)
end
end

Most important here is #wait_for_action. It suspends a thread of the Worker, as described above, when we have no actions in the queue.

Finally, it is time for the main part of the Worker class:

class Worker
# rest of the class omitted
  def spawn_threads
num_threads.times do
threads << Thread.new do
while running? || actions?
action_proc, action_payload = wait_for_action
action_proc.call(action_payload) if action_proc
end
end
end
end
  # rest of the class omitted
end

With #wait_for_action that equals to queue.pop(false) the worker starts to drain the queue exactly when the queue starts to fill up.

Perfecto!


The last thing. The Worker needs to have a method to stop it.

class Worker
# rest of the class omitted
  def stop
queue.close
threads.each(&:exit)
threads.clear
true
end
  # rest of the class omitted
end

Full example with some tweaks available in this gist.

If you enjoy this writeup, make sure to clap, follow me on Medium, follow me on Twitter, and share this with your friends or retweeting this 👇!