GenStage: My Alternative To Certain Parallel Stages

adam mokan
6 min read · Sep 28, 2016


UPDATE (2016–09–29): The following approach should not be used, because it will trigger more demand while your work is being done asynchronously. A DynamicSupervisor can be used to limit concurrency and act as a GenStage consumer. See the detailed response from José Valim.

I apologize to anyone who followed this path, as I was just really excited that it ‘functioned’ without resorting to the manual subscription mode, but that perceived functionality comes at a cost. I will update with a link to a future post related to this once I have a new solution in place.

Ever since this post first hit the elixir-lang website, I have been a bit obsessed with the idea of GenStage and finding ways to use it in my projects at work. If you’re unfamiliar with GenStage, check out the intro blog post, the documentation, or start at the 17-minute mark of this video.

What I’ve noticed as I’ve used GenStage more often is that some patterns have emerged to handle scenarios I encounter on a regular basis. Below is one of them, involving processing events in parallel. Maybe I’m doing it wrong and should not be using GenStage the way I am? So take these notes with a grain of salt, and understand that the GenStage API is still under development, so things could change in future releases.

I am also assuming you are somewhat familiar with GenStage as we move ahead. If not, I encourage you to check the recent keynote video from ElixirConf or look at the blog post introducing it. Both links are mentioned above.

Code can be found below if you do not feel like reading.

Multiple Parallel Stages vs Single Worker Pool Stage

I process a lot of data and have to do much of this processing concurrently to meet deadlines each day. This concurrency has traditionally been done at the hardware level prior to using Elixir/Erlang.

Before using GenStage, I found myself using a lot of pooling techniques to limit the concurrency on a node. Not so much because the VM couldn’t handle the work, but because external systems I need to integrate with became overwhelmed with the load. Worker pools are a logical step in the right direction. Having said that, it can be tricky to manage the queue(s) feeding these pools to make sure you have just enough data on-deck to keep the workers running consistently at all times. Basically, I do not want idle worker processes but at the same time I do not want to load my entire workload into RAM on a single node or even across connected nodes. I think this is a fairly common scenario.

How can GenStage help this specific scenario? I can build a :producer stage that integrates with external systems to provide a source of work, process that work with :producer_consumer stages, and finish with a :consumer stage. These stages are tied together with what amount to simple message contracts. GenStage also gives me inherent back-pressure, making sure I pull only as much work into my pipeline as my processing can safely deal with at a given time. If something slows down in a middle stage, or an external resource later in my pipeline goes down, the whole system slows itself down to accommodate. This works very, very well out of the box.
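To make that shape concrete, here is a minimal sketch in the spirit of the official announcement example. The modules mirror the (a), (b), (c) stages in the diagram below; the counter and doubling logic are placeholders for real integration work.

```elixir
defmodule A do
  use GenStage

  def init(counter), do: {:producer, counter}

  # Downstream demand arrives here; emit exactly `demand` events.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule B do
  use GenStage

  def init(:ok), do: {:producer_consumer, :ok}

  # Transform each event and pass it downstream.
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &(&1 * 2)), state}
  end
end

defmodule C do
  use GenStage

  def init(:ok), do: {:consumer, :ok}

  # Consumers never emit events; just do the final work.
  def handle_events(events, _from, state) do
    Enum.each(events, &IO.inspect/1)
    {:noreply, [], state}
  end
end

# Data flows A -> B -> C; demand flows C -> B -> A.
{:ok, a} = GenStage.start_link(A, 0)
{:ok, b} = GenStage.start_link(B, :ok)
{:ok, c} = GenStage.start_link(C, :ok)

{:ok, _} = GenStage.sync_subscribe(c, to: b)
{:ok, _} = GenStage.sync_subscribe(b, to: a)
```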

Now let us assume that there is a long-running portion of the pipeline I need to run in parallel. These are just multiple processes of the same stage to get more throughput. That idea in itself is not difficult to work through, but how do I handle it at scale? Do I start up multiple GenStage processes to handle this data source? If I only need a couple of processes, this is probably the best way to handle it. What if I need 100 processes? 200? 1000? I’m not sure. I’m guessing it would work, but it just didn’t feel right to me.

A very simple example of parallel :producer_consumer stages for processing. (B) could be scaled to N.

In the diagram above, (a) represents a :producer stage pulling data from an external data source, the (b) stages are multiple processes of a :producer_consumer stage, and (c) is a final :consumer stage. The arrows show the direction of data flow, but GenStage works on demand, so demand actually flows from right to left. This means that (c) will need to be subscribed to all of the (b) processes. You can do this manually, or use something like a comprehension to programmatically bind the consumer stage to a bunch of producers, and it works alright.
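For instance, assuming the (b) stages were started under known registered names (the names and count here are made up, and `consumer` is the pid of stage (c)), a comprehension can wire up the subscriptions:

```elixir
# Hypothetical registered names for the parallel (b) stages.
producers = for n <- 1..50, do: :"stage_b_#{n}"

# Subscribe the single (c) consumer to every (b) process.
for producer <- producers do
  {:ok, _tag} = GenStage.sync_subscribe(consumer, to: producer, max_demand: 10)
end
```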

But looking at the diagram above, which is clearly a simple case, you can imagine how much more difficult this becomes when faced with 50 instances of stage (b), or with other parts of your processing pipeline that have similar needs. Imagine implementing this across 15–20 stages, right to left, with multiple portions needing to go highly parallel. Honestly, it became difficult for me to manage.

Here is what I’ve come up with.

The same scenario as above, but with a worker pool fed by stage (B).

The end result is the same, but now I have a much simpler demand contract between each stage. I can also use config settings to manage the size and overflow of the worker pool. This approach is possible thanks to the very cool way the folks who worked on GenStage piggybacked on the GenServer behavior, in particular the callbacks handle_cast/2 and handle_call/3.
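For reference, GenStage extends those GenServer-style callbacks so their return tuples can also carry a list of events to dispatch downstream. The message shapes below are illustrative, not from the original code:

```elixir
# Inside a GenStage module: reply to the caller *and* emit events
# to subscribed consumers in a single return tuple.
def handle_call({:work_done, result}, _from, state) do
  {:reply, :ok, [result], state}
end

# The cast variant: no reply, but events are still dispatched.
def handle_cast({:work_done, result}, state) do
  {:noreply, [result], state}
end
```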

The flow goes like this:

  • (a) sends the events requested by (b)’s demand
  • The callback handle_events/3 in (b) fires and the list of events is available for action.
  • Rather than processing the items in this list, I store the data in the state of (b) and return an empty list. This means (c) receives no data at this time. Update: by returning the empty list to (c), you cause it to trigger demand again, which may cause issues with certain types of producers.
  • Now that I have a list of work to perform stored in the state of (b), I can begin pushing that work to the worker pool.
  • When each process of the worker pool finishes a single item in the list, it performs a call against (b) with the results. Note that these pool processes are asynchronous in nature, so there is no guarantee of ordering. Basically, do not expect FIFO processing on (b).
  • (c) receives each item as it is completed by the worker pool that (b) is managing.

What do I gain here? Well, I keep the automatic demand management of the GenStage pipeline, which throttles how much data I feed into my worker pools and the other stages upstream or downstream. This has been really nice in scenarios where I need a worker pool strategy, because those almost always involve external data for me. If you are not concerned with pooling or limiting concurrency, or you just have very short-running operations you want to run in parallel, you could simply use a Task/Task.Supervisor approach to perform the work and return the results, similar to what I have done here. I am sure there will be cases where I do that as well.
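As a rough sketch of that alternative (MyApp.TaskSupervisor and Worker.process/1 are placeholders, not from this post), the stage could fan each batch out to tasks and block until they finish, which keeps the back-pressure semantics intact:

```elixir
def handle_events(events, _from, state) do
  results =
    events
    |> Enum.map(fn event ->
      Task.Supervisor.async(MyApp.TaskSupervisor, fn -> Worker.process(event) end)
    end)
    |> Enum.map(&Task.await(&1, 30_000))

  # Because we waited for every result, new demand is only issued
  # once this batch is truly finished.
  {:noreply, results, state}
end
```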

I’m not sure that what I have come up with here is news to anyone working with GenStage today, but I hope I have given you an idea of how to handle this case if you are wondering whether GenStage is right for your project.

Here is a simple example of the code described in the second version of stage (b).
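What follows is a sketch of that stage (b), not verbatim code: StageA and WorkerPool.process_async/2 are stand-ins for your upstream producer and pool of choice (poolboy or similar), and, per the update at the top, note that returning an empty event list re-triggers upstream demand.

```elixir
defmodule StageB do
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    # Subscribe to the upstream :producer stage (a).
    {:producer_consumer, %{in_flight: []}, subscribe_to: [StageA]}
  end

  # Events arrive from (a): stash them in state, hand each one to the
  # worker pool, and emit nothing downstream yet.
  def handle_events(events, _from, state) do
    for event <- events do
      # Each pool worker processes one item asynchronously, then
      # calls back into this stage with the result.
      WorkerPool.process_async(event, self())
    end

    # Empty list: (c) receives nothing right now.
    {:noreply, [], %{state | in_flight: state.in_flight ++ events}}
  end

  # A pool worker reports a finished item via GenStage.call/2.
  # Reply :ok to the worker and dispatch the result on to (c).
  def handle_call({:work_done, event, result}, _from, state) do
    {:reply, :ok, [result], %{state | in_flight: state.in_flight -- [event]}}
  end
end
```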


adam mokan

Husband and father. Director of Engineering @ HiringSolved. Functional programming fan. Electronics guy. Music nerd. Habitual button-pusher. From Michigan.