Stateful WebSockets with Elixir’s GenStage
Can we utilise these technologies to create an incremental search solution?
A colleague recently asked me how I might go about implementing incremental search against a third party API. While this is a well documented problem, there are a variety of ways to handle it, and I thought it made a good candidate to try something a little different.
I’ll walk through a few ways in which this might typically be solved, before going into why I chose to go for a solution using GenStage, and WebSockets.
Breaking down the problem
Perhaps the first solution to consider is one in which we fire off a request after every keystroke into your search field.
To the end user this should appear snappy, giving a nice “autocomplete” feeling, which is precisely what we’re after. This is what a solution like Algolia will give you out of the box, for their instant search solution.*
* I tried searching for debounce in Algolia’s docs, and I see a result returned, but upon clicking this, I don’t see the string anywhere on the page... Search is hard.
There might however be reasons to limit when, or how often, you send requests to the server. Perhaps you’re connecting to an API with a rate limit, or perhaps you feel it’s not helpful to send this request on every single keystroke. Perhaps you own the server and just don’t want to send so many requests to it! If so, you might look at a solution closer to the following:
Under this setup, rather than submitting every single value the input holds to the server, we use a debounce or a throttle to send these requests less frequently. The API provider is happy about this.
For many use cases one of these solutions will be perfectly acceptable, but each has potential downsides:
- When sending a request for every keypress, we have no control over how often we hit the server. If the server implements a rate limit, this may become a problem;
- With a
debounce(), we only send a search once the user has stopped typing for a given interval. This perhaps takes away from the instant “autocomplete” feeling;
- With a
throttle(), we send a request every time a given interval has passed. This is fine, but it may be the case that under certain circumstances we can send requests more frequently than others;
- For all of the above, whenever a request is sent, we have to wait for a response. Ideally, we wouldn’t have to wait at all; we’ve done our bit, so why shouldn’t we take a break, and just handle results later, if the server tells us it has some?
Thinking about a different solution
The truth is, any of the above solutions could work, depending on your needs. In my case, I’m attempting to implement incremental search against a 3rd party API endpoint, which has a rate limit I need to avoid running into.
At the same time, I’m keen to display results as the user types, without worrying about mistakenly running over this limit. This effectively rules out the debounce.
The API I’m connecting to (GitHub’s GraphQL API) gives me 5000 points to play with per hour, and the requests I need to make cost me 1 point each. This means that as long as I don’t exceed an average of 1.38 reqs/s over any given hour, I will not run up against my limit.
I could look at using a throttle here, but this has a downside in that it doesn’t know about what’s happened previously. If I’ve not sent any requests in the last 5 seconds, I could send 5 right now, and still be in no danger of running into the limit.
Imagine a world in which our implementation can tell the difference between the following states, and act appropriately:
- I know I can send no more than 1 request per second on average, and I have sent a request in the last second, so I need to wait;
- I know I can send no more than 1 request per second on average, however I have not sent any requests in the last 2 seconds, so the next 2 can be sent immediately.
The benefits of this approach vs a throttle are two-fold. Firstly, we don’t waste any calls — we “bank” calls which are unused, for use later. Secondly, if calls are “banked”, we can make use of them immediately when a new query comes through.
The official announcement can likely explain GenStage better than I am able to:
GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes. Developers who use GenStage only need to worry about how the data is produced, manipulated and consumed. The act of dispatching the data and providing back-pressure is completely abstracted away from the developers.
By using GenStage, we can create stages, either acting as producers which emit events, or consumers which ask producers for events when they are ready. For us, this means we can setup separate stages for each of the following:
- Holding the latest query input by the user;
- Asking for the latest query at a rate which will not exceed a given limit;
- Calling out to the API endpoint with our query.
I’ll explain the setup in more detail below, but this diagram shows roughly how data will flow through the system, with
QueryRunner being the stages in our GenStage setup:
So, under this new setup, the browser sends all events to our Elixir server. Here, we have a GenStage pipeline setup to process events, with 3 stages:
Querya producer. This module receives user input, buffering or emitting each value as an event.
QueryRateLimitera producer_consumer. Once per second this module sends 1 demand to
Queryhas an event ready, the
QueryRateLimiterwill receive it. Otherwise, the demand is buffered, and the next event dispatched when it comes in.
QueryRunnera consumer. This sends new demand whenever it receives an event, meaning it receives the events from
QueryRateLimiteras they arrive, before sending these off to the external API.
This should handle limiting the frequency of queries sent to the API more intelligently than under the previous strategies. However, this does mean that for any query coming from the browser, there is no guarantee that we will hit the API.
In addition, when we do want to hit the API, we now need to make 2 jumps, once from browser to the Elixir server, and then once from here to the API. Generally I’d try to avoid having this sort of work done in request. In this case however, the user does care about the response, so we need to send it back to them somehow.
Fortunately , I’ve already made the decision to use Elixir, which the web framework Phoenix is also written in. Phoenix makes it easy as pie to set up WebSockets.
By setting up a GenStage pipeline when a client connects through our WebSocket, we can process events in our own time, as described above, and then send results back across the same connection whenever they arrive:
So, now that we’ve got an idea of how this system can be architected, we can dive into implementation. I’ll show the code for each of the key modules, then describe what they’re doing.
Query module is fairly small. We use it by:
start_link(socket)with socket being a struct representing a WebSocket connection, which this query is associated with;
- This triggers
init/1, which specifies that this stage is a producer, and that we only want to keep 1 event at a time in our buffer (more on this later);
- Now, whenever a query comes over the socket, we call
update/2with the pid of the previously started process, and the incoming query;
- This triggers
handle_cast/2, which immediately attempts to dispatch the event to a subscribed consumer;
- We don’t care about incoming demand here (as we make use of the buffer), so we return no results in
This setup is good for us, for a number of reasons:
- If the events dispatched outpace the demand (e.g. more than once per second), they will be stored in GenStage’s internal buffer. We’ve limited this buffer to 1, so we only ever buffer the latest event (we don’t want to show stale results to a user);
- If the incoming demand outpaces the events (e.g if we don’t send any events for 5 seconds, but still ask for demand 4–5 times) this is also buffered. As we’ve not sent any events in the last 5 seconds, we are safe to send 5 in quick succession, so we will likely send one for 5 incoming keystrokes in quick succession.
Ultimately it might be good to look into handling these buffers manually, so that we have more control. I’ve not looking into that here, as I think there is enough to take in without it, but as far as I can see, demand buffering has no ceiling, and I would like to limit this if I can.
This module is a bit more involved:
- As with the producer, we start the stage by calling
start_link/1, which calls through to
init/1. This initializes the stage as a
producer_consumer, with it’s state represented by an empty map;
- Now, when this module subscribes to a producer (e.g
handle_subscribe/4is called. This takes the producer pid and reference in
from, and stores them in the
producersstate, along with values for
interval. We next call
ask_and_schedule/2with the updated state, and the new producer, before finally returning a tuple, which specifies that this stage will manually ask for demand;
ask_and_schedule/2, we make use of
interval. First, we ask the producer for
handle_events/3will take care of any which arrive by simply emitting them unchanged;
- Finally we make use of
interval, to send a message to
intervalhas elapsed. When this message is received in
handle_info/2, we simply call back into
ask_and_schedule/2,and the loop begins anew.
Finally we have the consumer. This module simply sends demand to
QueryRateLimiter on subscribe, and after receiving events, handling them as they arrive. When the event coming in has a query value, we send this off to the external API (code omitted as that is something of a distraction right now). Once we get these results back, we can push them over the WebSocket also referenced in the event.
Handling the server side of our WebSocket is easy, thanks to Phoenix channels. The only unusual part is the associating of GenStage with the socket.
- Upon join of our channel, we setup the GenStage pipeline, storing the socket in the producer’s state, and the producer’s pid in the socket assigns;
- When we receive search events into the channel, we send this to
Query, along with the socket pid. This triggers an event to be sent by the producer.
For the front end of this, I used Elm. This isn’t the focus of this article (and I’m a complete novice with it) but you can find the code here, and the TLDR is:
- When the Elm program loads, it initializes a model, which stores the current query and any results. We also join the
"typeahead:public"channel, and create a subscription which listens here.
- We render an input.
onInputhere we send our update function a
Searchmessage, in which we send the value currently in the input across the socket.
- When results come back across the channel, our subscription triggers a
Resultsupdate message. This attempts to decode the received JSON, updating the model with the results. This in turn triggers an update of our view.
Once we’ve plugged all these pieces together, we have a working solution, which I’m pretty pleased with! I feel this made a good experiment, however if I wanted to try to push this farther, in order to make it production ready, I would need to look more closely at some areas, for instance.
- We would not currently handle a rate limit that needed to be shared across multiple clients;
- The GenStage processes are not supervised;
- Phoenix will drop our WebSocket connection after 60 seconds of inactivity.
Having said all that, I think this was a great way to get to know GenStage better, and ultimately I think that if these issues were ironed out this could be a good solution to the problem. More generally, I think this is a nice way of adding state to a WebSocket connection, using it to do some async work, and sending the result of this work back across the socket later on.
You can find the full source for this work here. I’d love to hear any thoughts.