Implementing GenStages Under Umbrella

GenStage Under Umbrella — Part 3

Sep 29, 2017

With the tests in place, we can start the actual implementation of our application. We will implement two parallel GenStage information flows. They closely follow the tests we wrote in the last article.

Receive GenStages



We use a basic GenStage producer implementation, heavily inspired by the official docs. As said before, the main topic of the article is not GenStage itself, but its use as a way of communication between umbrella apps. This is why we do not dwell on optimizations, demand management, etc. We let the default GenStage implementation handle these things for us.

We start the server with :no_state, as we do not need any state for our demo app. The receive_info/1 function is the entry point to our Receive GenStage. When we receive a new stock market message, the handle_call callback will either:

  • pass it to the Converter.ReceiveProducerConsumer, if there is any demand
  • or queue it in the GenStage buffer, waiting for demand
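A minimal sketch of such a producer, modeled on the broadcaster example in the GenStage docs. The module name and receive_info/1 come from the text above; registering the process under its module name and the {:receive_info, info} call message are assumptions made for illustration.

```elixir
defmodule GerMarket.ReceiveProducer do
  use GenStage

  # Assumption: the stage is registered under its module name, so other
  # umbrella apps can reach it without holding its pid.
  def start_link(_args \\ []) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  # Entry point: hands a stock market message to the stage.
  def receive_info(info) do
    GenStage.call(__MODULE__, {:receive_info, info})
  end

  def init(:no_state), do: {:producer, :no_state}

  # Returning the message in the events list lets GenStage dispatch it right
  # away when there is pending demand, or keep it in its internal buffer.
  def handle_call({:receive_info, info}, _from, state) do
    {:reply, :ok, [info], state}
  end

  # Nothing is produced on demand; events only enter through receive_info/1.
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end
```

The caller gets :ok back as soon as the message is accepted, whether or not a consumer has asked for it yet.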

Do not forget to start the server in the supervision tree. We will repeat this step for all GenStages we create in our example, so I will not post this step again in the article.
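As a rough sketch, the supervision tree of the GerMarket app could look like this. The GerMarket.Application and GerMarket.Supervisor names are assumptions, and the producer is expected to define start_link/1.

```elixir
defmodule GerMarket.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Elixir 1.5 child spec: the module alone is enough, provided it defines
      # child_spec/1 (which `use GenStage` does) and start_link/1.
      GerMarket.ReceiveProducer
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: GerMarket.Supervisor)
  end
end
```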


I’m using Elixir 1.5 for our example. Please check Streamlined Child Specs if you are not familiar with the child syntax used in the supervision tree.

The implementation of UsaMarket.ReceiveProducer is similar to GerMarket, so we skip it. In the next (and last) article of the series, once we have put everything in place, you will find the link to the GitHub repository with the full code of the demo.



The Converter.ReceiveProducerConsumer is quite simple. It receives events and maps them to GBP, using pattern matching to tell USD from EUR.
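A sketch of how that mapping might look. The message shape (a map with :currency and :price keys), the to_gbp/1 helper and the conversion rates are all hypothetical placeholders, not the values used in the demo.

```elixir
defmodule Converter.ReceiveProducerConsumer do
  use GenStage

  # Placeholder rates, for illustration only.
  @usd_to_gbp 0.75
  @eur_to_gbp 0.88

  def start_link(_args \\ []) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  # No :subscribe_to option here; subscriptions are made from the outside.
  def init(:no_state), do: {:producer_consumer, :no_state}

  # Every incoming event is converted and passed downstream.
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &to_gbp/1), state}
  end

  # Pattern matching on the currency selects the conversion.
  def to_gbp(%{currency: "USD", price: price} = info) do
    %{info | currency: "GBP", price: price * @usd_to_gbp}
  end

  def to_gbp(%{currency: "EUR", price: price} = info) do
    %{info | currency: "GBP", price: price * @eur_to_gbp}
  end
end
```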

If you have used GenStage before, you are right to ask why we do not subscribe the producer_consumer to the producers in init/1, using subscribe_to. In a “monolith” application you can control the start order of the GenStage processes. In a “flat umbrella” you cannot: there is no guarantee that GerMarket.ReceiveProducer starts before Converter.ReceiveProducerConsumer tries to subscribe to it. That’s why we manually subscribe the consumers to the producers in the tests. We will come back to this issue in the next article, where we will see how to handle the subscriptions automatically.
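A manual subscription of this kind can be made with GenStage.sync_subscribe/2 once all the stages are running. The sketch below assumes every stage is registered under its module name; the ManualSubscriptions helper is hypothetical and only wraps the calls.

```elixir
defmodule ManualSubscriptions do
  # Hypothetical helper, e.g. called from a test setup after all stages started.
  # Returns one {:ok, subscription_tag} tuple per producer on success.
  def subscribe_receive_flow do
    for producer <- [GerMarket.ReceiveProducer, UsaMarket.ReceiveProducer] do
      GenStage.sync_subscribe(Converter.ReceiveProducerConsumer, to: producer)
    end
  end
end
```

Because the call happens after startup, the start order of the umbrella apps no longer matters.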



The ReceiveConsumer asks the Converter.ReceiveProducerConsumer for events. When it receives any, it calls Shared.Interface.process_info/2. As discussed in the last article, we implemented this interface only to send the received info to the test process, so that we can assert on it.
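A sketch of such a consumer. The module is shown without an app namespace, and the arguments passed to Shared.Interface.process_info/2 (the message and the reporting module) are a guess made for illustration; only the arity is taken from the text.

```elixir
defmodule ReceiveConsumer do
  use GenStage

  def start_link(_args \\ []) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  def init(:no_state), do: {:consumer, :no_state}

  def handle_events(events, _from, state) do
    # Assumption: process_info/2 takes the message and the module reporting it.
    Enum.each(events, &Shared.Interface.process_info(&1, __MODULE__))

    # A consumer never emits events, so the events list stays empty.
    {:noreply, [], state}
  end
end
```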

At this point, the ReceiveInfoTest will pass. We have a fully functional GenStage communication between different apps in the umbrella.

Send GenStages



Nothing new here. The implementation is similar to the other producer above.


Note the dispatcher: GenStage.BroadcastDispatcher option. It ensures that the SendProducerConsumer sends every event to all subscribed consumers, instead of handing each event to a single consumer as the default GenStage.DemandDispatcher does.

For each message received from the producer, the Converter will create two of them, one for each converted currency.
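Put together, the broadcasting and the one-to-two conversion might be sketched as follows. The Converter namespace, the message shape, the from_gbp/1 helper and the rates are assumptions for illustration only.

```elixir
defmodule Converter.SendProducerConsumer do
  use GenStage

  # Placeholder rates, for illustration only.
  @gbp_to_usd 1.25
  @gbp_to_eur 1.125

  def start_link(_args \\ []) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  # BroadcastDispatcher delivers every event to all subscribed consumers.
  def init(:no_state) do
    {:producer_consumer, :no_state, dispatcher: GenStage.BroadcastDispatcher}
  end

  # flat_map, because each incoming event yields two outgoing events.
  def handle_events(events, _from, state) do
    {:noreply, Enum.flat_map(events, &from_gbp/1), state}
  end

  # One GBP message becomes one USD message and one EUR message.
  def from_gbp(%{currency: "GBP", price: price} = info) do
    [
      %{info | currency: "USD", price: price * @gbp_to_usd},
      %{info | currency: "EUR", price: price * @gbp_to_eur}
    ]
  end
end
```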


The SendInfoTest will pass and we have a fully working 2-way communication between “flat umbrella” apps.

One thing remains to be done: automatically handle the subscriptions between consumers and producers. We will take care of this in the next article.


Written by @iac0bs0n (elixir dev)