Easy Manual/Async Asking in GenStage

Adrian Dunston
5 min readNov 29, 2016

--

Most of the GenStage examples I see (and the one I wrote) are A → B → C, subscribe, subscribe, go! Data flows from producer to producer_consumer to consumer as quick as it can. And that’s great. But what if we want something slower, more measured?

Maybe your consumer is subscribed to two producers and must combine their content. Maybe your consumer is waiting for some other process to call in and trigger a specific level of demand asynchronously. In the example below, the user manually demands words from a poem via interactive Elixir, iex.

Long Story Short

Q: How do we set up a situation where we can manually/asynchronously trigger a consumer to ask its producer for a specific number of events?

A: We implement the handle_subscribe callback, capture its from argument, return {:manual, state}, and then call GenStage.ask with the from. Simple as pie. Let’s go to the code!

Example

This is the simplest example I could think of. If you want something with more meat on it, check out the RateLimiter example that’s now in the official documentation.

Requirements

Dear Elixir,

Please give us the words from a famous song, but only dole them out as we ask for them. Manual.ask(pid, 4) to ask for four words, etc.

Many thanks,
Th
e User

P.S. Also please give us a pony.

Code

I generally like to do a code dump first, and then go through it in detail further on. If you’re here to learn, please scroll down to Example in Detail. If you’re here for reference, here’s your code:

Usage

Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)iex(1)> pid = Manual.go
#PID<0.136.0>
iex(2)> Manual.ask(pid, 3)
I am the:ok
iex(3)> Manual.ask(pid, 2)
very model:ok
iex(4)> Manual.ask(pid, 1)
of:ok
iex(5)> Manual.ask(pid, 3)
a modern Major-General,:ok
iex(6)>

Example in Detail

string = "I am the very model of a modern Major-General, I've information vegetable, animal, and mineral, I know the kings of England, and I quote the fights historical From Marathon to Waterloo, in order categorical"
defmodule Manual.Producer do
use GenStage
def init(string), do: {:producer, String.split(string, " ")} def handle_demand(demand, state) do
{first, last} = Enum.split(state, demand)
{:noreply, first, last}
end
end

Here the producer takes a string, chops it into a list of words, and stores that in state. When demand comes in, it grabs that many words out of the list, and sends them as its events.

defmodule Manual.Consumer do
use GenStage
def init(:ok), do: {:consumer, :ok}

When the consumer starts up, it doesn’t have meaningful state.

  def handle_events(words, from, state) do
words |> Enum.join(" ") |> IO.write
IO.puts ""
{:noreply, [], state}
end

Its handle_events callback is just like any other. handle_events is the same whether you’re doing manual subscriptions or automatic. Remember that handle_events returns :noreply, an empty list, and the state. For producer_consumers, that list is the events being passed along, but for consumers it’s always empty.

  def handle_subscribe(:producer, _opts, from, _state), do: {:manual, from}

This is the simplest form of handle_subscribe ever. I return a tuple starting with the atom :manual . This sets the context for this GenStage process. The tuple also contains the new state. I’m storing the from argument as state. It’s (currently) the usual {pid, ref}, process identifier and communication reference that you’d expect from GenStage (and GenServer).

  def handle_cast({:ask, number}, state) do
GenStage.ask(state, number)
{:noreply, [], state}
end

And here we are, asking the producer for number events. Remember our handle_subscribe set state to the from value it got in its arguments. That tuple now represents that particular subscription.

We could subscribe this consumer to multiple producers, or to the same producer multiple times (though I’ve not tried this). We need this from tuple to identify not just the producer process, but the specific subscription we’re asking through. That said, look at that call. GenStage.ask({pid, ref}, demand) That is dead-simple. Thanks again, GenStage!

{:noreply, [], state}

In GenStage, a handle_cast returns :noreply, a list of events to add to the producer queue, and the state. That event list can be empty or contain events if you’re working with a producer or producer_consumer. It must be empty, if you’re dealing with a consumer.

handle_call has a similar return. It’s {:reply, info_to_send_back_to_caller, events_list, state}. Again, events_list has to be empty in a consumer context.

Summary

Again, this was the most-stripped-down implementation of manual asking I could think of. I hope it’s helped you get started. For more information please check out the marvelous documentation on the subject here.

Postscript

Elixir didn’t give us a pony. I’ll include one here, along with a bit of bonus information.

I don’t actually recommend you set up a situation where you’re manually asking for a variable number of events. You can, and it works. But DemandDispatcher sets its expectations for max demand on a subscription based on the first ask that comes in. If you ask for a few events first and then a greater number of events second, it will give you a warning.

Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)iex(1)> pid = Manual.go
#PID<0.116.0>
iex(2)> Manual.ask(pid, 1)
I:ok
iex(3)> Manual.ask(pid, 2)
am the:ok
iex(4)>
16:52:40.893 [warn] GenStage producer DemandDispatcher expects a maximum demand of 1. Using different maximum demands will overload greedy consumers. Got demand for 2 events from #PID<0.116.0>

Big thanks to José Valim (and other contributers) for writing GenStage. And thanks to folks with examples up already.

--

--