Fixing request queuing with server-sent events using Ruby on Rails

thilonel
5 min read · Nov 14, 2022


In this article I’ll demonstrate a request queuing problem on a Ruby on Rails application and walk you through solving it using server-sent events.

The problem

There’s a big slowdown during high-traffic hours; the app becomes unusable.

(Screenshot: requests are waiting for seconds to be served)

As soon as I opened NewRelic, I saw that something was wrong.
The picture above is straight from the dashboard.

Today — finally — enough users reported slowness in the system for me to be able to dedicate time to solving this issue.

These web transaction times (also from the dashboard) indicate that we have some crazy slow endpoints. That means we’d need ridiculously high concurrency (more than the total number of tabs our users open at the same time) to stay responsive and serve all the requests fast.
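As a rough sanity check, Little’s law says the number of busy threads equals the request arrival rate multiplied by the average response time. A quick sketch with made-up numbers shows how bad that gets:

# Little’s law: busy threads = arrival rate * average response time.
# The numbers below are invented for illustration only.
arrival_rate   = 50 # requests per second at peak
response_time  = 4  # seconds per request on a slow endpoint
threads_needed = arrival_rate * response_time
puts threads_needed # => 200, far more than a typical Puma thread pool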

These endpoints contain complicated business logic, and while they could all be optimized, that would take a lot of time and testing while our users keep suffering.

The idea

By decoupling the HTTP response from the request, we can serve other requests while a background worker prepares the response.

How can we do this? How can we notify our user that the response is ready? The usual candidates are WebSockets, polling, and server-sent events (SSE).

With SSE we can make HTTP requests async, and it’s a lighter, easier solution than WebSockets. You can try it straight from the browser console:

const source = new EventSource('/my_endpoint');
source.onmessage = (event) => {
  console.log("EventSource message received:", event);
};

And from Rails:

class SseController < ActionController::Base
  include ActionController::Live

  def index
    response.headers['Content-Type'] = 'text/event-stream'
    sse = SSE.new(response.stream, retry: 300, event: "open")
    sse.write("Hello!", event: "message")
  ensure
    sse&.close # sse may be nil if SSE.new raised
  end
end

And ta-dam! Well, almost. Here’s my article on how to start using SSE with Rails.
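For reference, every sse.write pushes a plain-text frame onto the open connection; the write above comes out roughly like this, terminated by a blank line (field order may differ between Rails versions):

retry: 300
event: message
data: Hello!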

Verifying the idea

There’s a bit more to this. Is this really going to free up our precious threads? Let’s test!

With the following endpoint:

class SseController < ActionController::Base
  include ActionController::Live

  def index
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Last-Modified'] = Time.now.httpdate

    sse = SSE.new(response.stream, retry: 300, event: "open")

    # Push the current time to the client every 2 seconds, forever.
    loop do
      sleep 2
      sse.write("The current time is: #{Time.current}", event: "message")
    end
  rescue ActionController::Live::ClientDisconnected
    # the client went away; the ensure block closes the stream
  ensure
    sse&.close
  end
end

I will start Rails with 1 thread, since the idea is that this way a single thread will be able to serve multiple requests without blocking:

RAILS_MAX_THREADS=1 PORT=3001 bundle exec rails s
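If you manage Puma through its config file, the equivalent would be (a sketch of the standard Puma DSL):

# config/puma.rb: pin the thread pool to a single thread
threads 1, 1                    # min and max threads per worker
port ENV.fetch("PORT") { 3001 }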

Then I open 3 browser tabs with my Rails app and run the following from the console:

const source = new EventSource('/sse');
source.onopen = (event) => {
  console.log("The connection has been established.", event);
};
source.onmessage = (event) => {
  console.log("EventSource message received:", event);
};
source.onerror = (err) => {
  console.error("EventSource failed:", err);
};
  • First tab connects nicely and starts printing out the message as expected.
  • Second tab does the same.
  • Third tab is stuck pending.
  • I try to open a 4th tab, but of course, the request doesn’t even reach my web server.

Well of course. :)

From the Ruby docs: sleep(*args) suspends the current thread for duration seconds.

No surprise here: we basically put our server into a permanent sleep. This is exactly the state requests are in when NewRelic reports them as “queuing”.
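By the way, you don’t need a browser to reproduce this: each command below holds one connection open too (curl’s -N flag disables output buffering, so the events print as they arrive):

curl -N http://localhost:3001/sse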

True asynchronicity

To free up the Rails thread while the response is being computed, and so let it serve new requests, we can use Rack hijacking. We’ll be using partial hijack, as full hijack is only supported with HTTP/1, and HTTP/1 has its own limitations on SSE (see the browser connection limit below).

class SseController < ActionController::Base
  include ActionController::Live

  def index
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Last-Modified'] = Time.now.httpdate

    # Partial hijack: Rack calls this proc with the raw stream after the
    # headers have been sent, so the Rails thread is freed immediately.
    response.headers["rack.hijack"] = proc do |stream|
      Thread.new { perform_task(stream) }
    end

    head :ok
  end

  def perform_task(stream)
    sse = SSE.new(stream, retry: 300, event: "open")

    loop do
      sleep 3
      sse.write("Time is: #{Time.current}", event: "message")
    end
  rescue ActionController::Live::ClientDisconnected, Errno::EPIPE
    # the client disconnected; the ensure block closes the stream
  ensure
    sse&.close
  end
end

Browser limit

Using the code above, I could open 6 tabs and they all promptly received their time updates. However, I couldn’t open a 7th tab.
I warned you about the limitation of HTTP/1… since we’re on Rails we can’t serve HTTP/2, so the browser limits us to 6 open connections to this domain. Opening a private/incognito window, which uses a separate connection pool, is an easy way to get around this while testing.

That said, our users would still complain, rightfully, about how the 7th tab fails to open.

In my case, where I just want to unblock the server, the solution is to let the client retry the connection and to close it from the server side once the first message has been delivered (this is what the final code below does).

Server limit

On the server side, think about where your code runs: Heroku’s thread limits, for example, could surprise you.

Also, now that you can accept a huge number of slow, resource-heavy requests, you can easily run out of memory (on the web servers, the workers, or the DB). It might be a good idea to offload the processing to background workers (e.g. Sidekiq) and have the hijacked thread only poll for the rendered response, which the worker can store under a Redis key.

Final code

We’d like to close the connection once the first message has arrived, or when the request times out (like when you forget to start Sidekiq :)).

const source = new EventSource('/sse?delay=0&wait_limit=5');
source.onopen = (event) => {
  console.log("The connection has been established.", event);
};
source.onmessage = (event) => {
  console.log("EventSource message received:", event);
  source.close();
};
source.onerror = (err) => {
  if (err.data === "timeout") {
    source.close();
    console.error("EventSource timeout", err);
  } else {
    console.error("EventSource error", err);
  }
};

and we’ll offload processing to a Sidekiq worker and wait for the response:

class SseController < ActionController::Base
  include ActionController::Live

  def index
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Last-Modified"] = Time.now.httpdate

    # Partial hijack: the proc runs after the headers are sent, on a new
    # thread, so the Rails thread is freed immediately.
    response.headers["rack.hijack"] = proc do |stream|
      Thread.new { perform_task(stream, params) }
    end

    head :ok
  end

  def perform_task(stream, params)
    sse = SSE.new(stream, retry: 300, event: "open")

    delay = params[:delay].present? ? params[:delay].to_i : 5
    response_key = "sse_#{SecureRandom.uuid}"
    SseWorker.perform_in(delay.seconds, response_key)

    # Poll Redis for the worker's result, at most wait_limit times.
    wait_limit = params[:wait_limit].present? ? params[:wait_limit].to_i : 30
    delivered = false
    wait_limit.times do
      sleep 1

      result = Redis::HashKey.new(response_key).all
      if result.present?
        sse.write({ response: result }, event: "message")
        delivered = true
        break
      end
    end

    sse.write("timeout", event: "error") unless delivered
  rescue ActionController::Live::ClientDisconnected, Errno::EPIPE
    # the client disconnected; the ensure block closes the stream
  ensure
    sse&.close
  end
end

the worker will set the response:

class SseWorker
  include Sidekiq::Worker

  def perform(response_key)
    # Redis::HashKey comes from the redis-objects gem
    Redis::HashKey.new(response_key).bulk_set({ hello: "world!" })
  end
end
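If you use the plain redis gem instead of redis-objects, the equivalent calls would look roughly like this (a sketch, assuming a reachable Redis instance):

require "redis"

redis = Redis.new
redis.hset(response_key, "hello", "world!") # what bulk_set does
redis.hgetall(response_key)                 # what .all returns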

To test, try opening as many connections as you can and watch how all the responses arrive nicely.
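If you’d rather script the test, here’s a minimal sketch using Net::HTTP from Ruby’s standard library; it assumes the app above is running on port 3001:

require "net/http"

# Open 10 concurrent SSE connections and print whatever each one streams.
threads = 10.times.map do |i|
  Thread.new do
    uri = URI("http://localhost:3001/sse?delay=#{i % 5}&wait_limit=10")
    Net::HTTP.start(uri.host, uri.port) do |http|
      http.request(Net::HTTP::Get.new(uri)) do |res|
        res.read_body { |chunk| puts "connection #{i}: #{chunk}" }
      end
    end
  end
end
threads.each(&:join)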

Production results

(Screenshot: the NewRelic dashboard after the change)

You can easily compare this image with the first one, as both the usage and the time frame were almost identical. We have practically no request queuing anymore, and our users stopped complaining as well.

We did have to add some memory to our DB instance though, as the workers now run these expensive DB queries too, not just the web servers.

That’s it!

If you’re interested in the implementation details, see my other article.
