Published in Mint Digital

Stateful WebSockets with Elixir’s GenStage

Can we utilise these technologies to create an incremental search solution?

I am not a visual designer

Breaking down the problem

Perhaps the first solution to consider is one in which we fire off a request after every keystroke in the search field.

  • With a debounce(), we only send a search once the user has stopped typing for a given interval. This detracts somewhat from the instant “autocomplete” feeling;
  • With a throttle(), we send a request every time a given interval has passed. This is fine, but under some circumstances we could afford to send requests more frequently than under others;
  • For all of the above, whenever a request is sent, we have to wait for a response. Ideally, we wouldn’t have to wait at all: we’ve done our bit, so why shouldn’t we take a break, and just handle results later, if the server tells us it has some?
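For concreteness, a debounce can be sketched in Elixir as a GenServer that restarts a timer on every keystroke. This is a hypothetical module, not part of the final solution; the module name and the 300ms default are my own:

```elixir
defmodule Debounce do
  use GenServer

  # Hypothetical sketch: run `fun` with the latest value only once `delay`
  # milliseconds have passed with no new input.

  def start_link(fun, delay \\ 300) do
    GenServer.start_link(__MODULE__, {fun, delay})
  end

  def input(pid, value), do: GenServer.cast(pid, {:input, value})

  @impl true
  def init({fun, delay}) do
    {:ok, %{fun: fun, delay: delay, timer: nil, value: nil}}
  end

  @impl true
  def handle_cast({:input, value}, state) do
    # A new keystroke cancels any pending timer and starts the wait again.
    if state.timer, do: Process.cancel_timer(state.timer)
    timer = Process.send_after(self(), :fire, state.delay)
    {:noreply, %{state | timer: timer, value: value}}
  end

  @impl true
  def handle_info(:fire, state) do
    # The user went quiet for `delay` ms, so finally run the callback.
    state.fun.(state.value)
    {:noreply, %{state | timer: nil}}
  end
end
```

A throttle differs only in that the timer is never reset by new input: events arriving during the interval simply replace the stored value.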

Thinking about a different solution

The truth is, any of the above solutions could work, depending on your needs. In my case, I’m attempting to implement incremental search against a third-party API endpoint with a rate limit I need to stay within.

  • I know I can send no more than 1 request per second on average; however, if I have sent no requests in the last 2 seconds, the next 2 can be sent immediately.
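That reasoning is essentially a token bucket, where idle seconds bank a small burst allowance. A hypothetical illustration (this exact module is not part of the final solution, which leans on GenStage instead):

```elixir
defmodule TokenBucket do
  # Hypothetical sketch: one token refills per second, up to a small cap,
  # so time spent idle earns a short burst of back-to-back requests.
  defstruct tokens: 2, max: 2, last_refill: 0

  def refill(%__MODULE__{} = bucket, now_seconds) do
    earned = now_seconds - bucket.last_refill
    %{bucket | tokens: min(bucket.max, bucket.tokens + earned), last_refill: now_seconds}
  end

  # Take a token if one is available; a request may only be sent on :ok.
  def take(%__MODULE__{tokens: t} = bucket) when t > 0, do: {:ok, %{bucket | tokens: t - 1}}
  def take(bucket), do: {:error, bucket}
end
```

After 2 idle seconds the bucket holds 2 tokens, so 2 requests can go out immediately before the 1-per-second pace resumes.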

Enter GenStage

The official announcement can likely explain GenStage better than I am able to:

GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes. Developers who use GenStage only need to worry about how the data is produced, manipulated and consumed. The act of dispatching the data and providing back-pressure is completely abstracted away from the developers.

By using GenStage, we can create stages, acting either as producers, which emit events, or as consumers, which ask producers for events when they are ready. For us, this means we can set up separate stages for each of the following:

  • Asking for the latest query at a rate which will not exceed a given limit;
  • Calling out to the API endpoint with our query.

These responsibilities map onto the following modules:

  • Query, a producer. This receives each query as it comes over the socket, dispatching the latest one whenever there is demand for it;
  • QueryRateLimiter, a producer_consumer. Once per second this module sends demand for 1 event to Query. If Query has an event ready, the QueryRateLimiter will receive it. Otherwise, the demand is buffered, and the next event is dispatched when it comes in;
  • QueryRunner, a consumer. This sends new demand whenever it receives an event, meaning it receives the events from QueryRateLimiter as they arrive, before sending these off to the external API.

Enter WebSockets

Fortunately, I’d already made the decision to use Elixir, the language in which the web framework Phoenix is also written. Phoenix makes it easy as pie to set up WebSockets.


So, now that we’ve got an idea of how this system can be architected, we can dive into implementation. I’ll show the code for each of the key modules, then describe what they’re doing.
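First, a sketch of what the Query producer might look like, reconstructed from the description below; the state shape and exact function signatures are assumptions:

```elixir
defmodule Query do
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  # Called from the channel with the producer's pid and the incoming query.
  def update(pid, event) do
    GenStage.cast(pid, {:update, event})
  end

  @impl true
  def init(:ok) do
    # Keep at most 1 event in the buffer, discarding older ones:
    # a stale query is of no use to anybody.
    {:producer, :ok, buffer_size: 1, buffer_keep: :last}
  end

  @impl true
  def handle_cast({:update, event}, state) do
    # Immediately attempt to dispatch to a subscribed consumer; if no
    # demand is pending, GenStage buffers the event instead.
    {:noreply, [event], state}
  end

  @impl true
  def handle_demand(_demand, state) do
    # Demand is served from the buffer, so we emit no events here.
    {:noreply, [], state}
  end
end
```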


  • Calling start_link/1 triggers init/1, which specifies that this stage is a producer, and that we only want to keep 1 event at a time in our buffer (more on this later);
  • Now, whenever a query comes over the socket, we call update/2 with the pid of the previously started process and the incoming query;
  • This triggers handle_cast/2, which immediately attempts to dispatch the event to a subscribed consumer;
  • We don’t care about incoming demand here (as we make use of the buffer), so we return no events in handle_demand/2;
  • If the incoming demand outpaces the events (e.g. if we don’t send any events for 5 seconds, but are still asked for demand 4–5 times), this is also buffered. As we’ve not sent any events in the last 5 seconds, we are safe to send 5 in quick succession, so the next 5 incoming keystrokes will each be dispatched immediately.
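The QueryRateLimiter might be sketched as follows, closely modelled on the RateLimiter example from the GenStage announcement; the pending and interval values are assumptions:

```elixir
defmodule QueryRateLimiter do
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    {:producer_consumer, %{}}
  end

  @impl true
  def handle_subscribe(:producer, _opts, from, producers) do
    # Track each producer with a demand budget (pending) and a refill
    # interval (assumed here: 1 event per second).
    producers =
      producers
      |> Map.put(from, %{pending: 1, interval: 1_000})
      |> ask_and_schedule(from)

    # :manual means we ask for demand ourselves, on our own schedule.
    {:manual, producers}
  end

  def handle_subscribe(:consumer, _opts, _from, producers) do
    {:automatic, producers}
  end

  @impl true
  def handle_events(events, _from, producers) do
    # Emit events unchanged; the limiting happens in when we *ask*.
    {:noreply, events, producers}
  end

  @impl true
  def handle_info({:ask, from}, producers) do
    # The interval has elapsed: ask again, and the loop begins anew.
    {:noreply, [], ask_and_schedule(producers, from)}
  end

  defp ask_and_schedule(producers, from) do
    case producers do
      %{^from => %{pending: pending, interval: interval}} ->
        GenStage.ask(from, pending)
        Process.send_after(self(), {:ask, from}, interval)
        producers

      %{} ->
        producers
    end
  end
end
```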


  • Now, when this module subscribes to a producer (e.g. Query), handle_subscribe/4 is called. This takes the producer’s pid and subscription reference in from, and stores them in the producers state, along with values for pending and interval. We then call ask_and_schedule/2 with the updated state and the new producer, before finally returning a tuple which specifies that this stage will manually ask for demand;
  • In ask_and_schedule/2, we make use of pending and interval. First, we ask the producer for pending events. handle_events/3 takes care of any which arrive, by simply emitting them unchanged;
  • Finally, we make use of interval to send a message to self() after interval has elapsed. When this message is received in handle_info/2, we simply call back into ask_and_schedule/2, and the loop begins anew.
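QueryRunner is then a minimal consumer. In this sketch, the API call and the shape of the event are assumptions; SomeApi.search/1 is a placeholder for the real third-party client:

```elixir
defmodule QueryRunner do
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    {:consumer, :ok}
  end

  @impl true
  def handle_events(events, _from, state) do
    for {socket_pid, query} <- events do
      # Placeholder API call; results are sent back to the channel
      # process, which pushes them down the WebSocket.
      results = SomeApi.search(query)
      send(socket_pid, {:results, results})
    end

    {:noreply, [], state}
  end
end
```

Because consumers ask for more events automatically once the current batch is processed, subscribing with max_demand: 1 gives the “new demand whenever it receives an event” behaviour described above.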



Handling the server side of our WebSocket is easy, thanks to Phoenix channels. The only unusual part is associating the GenStage pipeline with the socket.
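A sketch of the channel; the module name, topic, and the :query_pid assign are assumptions:

```elixir
defmodule MyAppWeb.SearchChannel do
  use Phoenix.Channel

  def join("search:lobby", _params, socket) do
    # Associate a fresh GenStage pipeline with this particular socket.
    {:ok, query} = Query.start_link()
    {:ok, limiter} = QueryRateLimiter.start_link()
    {:ok, runner} = QueryRunner.start_link()
    {:ok, _} = GenStage.sync_subscribe(limiter, to: query)
    {:ok, _} = GenStage.sync_subscribe(runner, to: limiter, max_demand: 1)

    {:ok, assign(socket, :query_pid, query)}
  end

  def handle_in("search", %{"query" => query}, socket) do
    # Send the query to the producer, along with our pid, so that
    # results can find their way back to this channel process.
    Query.update(socket.assigns.query_pid, {self(), query})
    {:noreply, socket}
  end

  def handle_info({:results, results}, socket) do
    push(socket, "results", %{results: results})
    {:noreply, socket}
  end
end
```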

  • When we receive search events into the channel, we send this to Query, along with the socket pid. This triggers an event to be sent by the producer.

Front end

For the front end of this, I used Elm. This isn’t the focus of this article (and I’m a complete novice with it) but you can find the code here, and the TLDR is:

  • We render an input. On each onInput event, we send our update function a Search message, which sends the value currently in the input across the socket.
  • When results come back across the channel, our subscription triggers a Results update message. This attempts to decode the received JSON, updating the model with the results. This in turn triggers an update of our view.

Wrap up

Once we’ve plugged all these pieces together, we have a working solution, which I’m pretty pleased with! I feel this made a good experiment; however, if I wanted to push it further and make it production-ready, I would need to look more closely at some areas. For instance:

  • The GenStage processes are not supervised;
  • Phoenix will drop our WebSocket connection after 60 seconds of inactivity.


