Implementing GenStages Under Umbrella
GenStage Under Umbrella — Part 3
With the tests in place, we can start the actual implementation of our application. We will implement two parallel GenStage information flows that closely follow the tests we wrote in the last article.
First, we need to add GenStage as a dependency. We will put it in the deps list in mix.exs: {:gen_stage, "~> 0.12"}. Then we start with the producer.
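For reference, the dependency entry would sit in the deps function of mix.exs, roughly as in this fragment. It is a sketch only: which umbrella app's mix.exs holds it is not stated here, and any other dependencies are omitted.

```elixir
# Fragment of mix.exs (inside the project module); other deps omitted.
defp deps do
  [
    {:gen_stage, "~> 0.12"}
  ]
end
```

Running mix deps.get afterwards fetches the package.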
We use a basic GenStage producer implementation, heavily inspired by the official docs. As said before, the main topic of this article is not GenStage itself, but its use as a means of communication between umbrella apps. This is why we do not dwell on optimizations, demand management, etc. We let the default GenStage implementation handle these things for us.
We start the server with :no_state, as we will not need any state for our demo app. The receive_info/1 function is the entry point to our receive GenStage flow. When we receive a new stock market message, the handle_call callback will either:
- pass it to the Converter.ReceiveProducerConsumer, if there is any demand
- or queue it in the GenStage buffer, waiting for demand
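A producer along these lines might look like the sketch below. It follows the pattern from the GenStage docs rather than the exact demo code: the {:receive_info, info} message shape and the start_link/1 signature are assumptions, while :no_state, receive_info/1 and handle_call come from the description above.

```elixir
defmodule GerMarket.ReceiveProducer do
  use GenStage

  # Registered under the module name so other apps can reach it by name.
  def start_link(_args \\ []) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  # Entry point of the receive flow.
  # The {:receive_info, info} message shape is an assumption for this sketch.
  def receive_info(info) do
    GenStage.call(__MODULE__, {:receive_info, info})
  end

  def init(:no_state) do
    {:producer, :no_state}
  end

  # Emitting [info] lets GenStage dispatch the event if there is demand,
  # or keep it in its internal buffer until demand arrives.
  def handle_call({:receive_info, info}, _from, state) do
    {:reply, :ok, [info], state}
  end

  # Events are pushed through handle_call, so demand is not handled here.
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end
```

Returning the event from handle_call keeps the producer free of any queueing logic of its own, which is the "let the default implementation handle things" approach described above.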
Do not forget to start the server in the supervision tree. We will repeat this step for all GenStages we create in our example, so I will not post this step again in the article.
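As a rough illustration, adding the producer to the supervision tree could look like this. The GerMarket.Application and GerMarket.Supervisor names are assumptions, and the sketch relies on the producer exposing start_link/1 and a child_spec/1 (which use GenStage is expected to define on Elixir 1.5 and later).

```elixir
defmodule GerMarket.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Listing the module alone relies on its child_spec/1.
      GerMarket.ReceiveProducer
    ]

    # The supervisor name is an assumption for this sketch.
    opts = [strategy: :one_for_one, name: GerMarket.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```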
I’m using Elixir 1.5 for our example. Please check Streamlined Child Specs if you are not familiar with the syntax above.
The implementation of UsaMarket.ReceiveProducer is similar to that of GerMarket, so we skip it. In the next (and last) article of the series, once we have put everything in place, you will find the link to the GitHub repository with the full code of the demo.
Converter will ask the producers for stock market information (events).
The code is quite simple. We receive events, and we map them to GBP, using pattern matching to find USD and EUR.
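The mapping step could be sketched as below. The event shape (a map with :currency and :price keys) and the exchange rates are hypothetical and chosen only for illustration; the real demo may represent stock information differently.

```elixir
defmodule Converter.ReceiveProducerConsumer do
  use GenStage

  # Hypothetical fixed rates, for illustration only.
  @usd_to_gbp 0.75
  @eur_to_gbp 0.875

  def start_link(_args \\ []) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  # No subscribe_to option here: subscriptions are made manually.
  def init(:no_state) do
    {:producer_consumer, :no_state}
  end

  # Every incoming event is converted and emitted downstream.
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &to_gbp/1), state}
  end

  # Pattern matching on the currency picks the right rate.
  defp to_gbp(%{currency: "USD", price: price} = info) do
    %{info | currency: "GBP", price: price * @usd_to_gbp}
  end

  defp to_gbp(%{currency: "EUR", price: price} = info) do
    %{info | currency: "GBP", price: price * @eur_to_gbp}
  end
end
```

An event in any other currency would raise a FunctionClauseError in this sketch, which is acceptable for a demo that only has two sources.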
If you have used GenStage before, you are right to ask why we do not subscribe the producer_consumer to the producers with the subscribe_to option in init/1. In a “monolith” application you can control the start order of the GenStage processes. In a “flat umbrella” you cannot: there is no guarantee that GerMarket.ReceiveProducer starts before Converter.ReceiveProducerConsumer tries to subscribe to it. That’s why we manually subscribe the consumers to the producers in the tests. We will come back to this issue in the next article, where we will see how to handle the subscriptions automatically.
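The manual subscription can be sketched with GenStage.sync_subscribe/2. This assumes all three stages are already running and registered under their module names; the ReceiveFlowSubscriptions helper module is hypothetical and exists only to wrap the calls.

```elixir
defmodule ReceiveFlowSubscriptions do
  # Hypothetical helper, not part of the demo app.
  # Subscribes the converter to both market producers.
  def subscribe do
    producers = [GerMarket.ReceiveProducer, UsaMarket.ReceiveProducer]

    for producer <- producers do
      # sync_subscribe/2 returns {:ok, subscription_tag} on success.
      {:ok, _tag} =
        GenStage.sync_subscribe(Converter.ReceiveProducerConsumer, to: producer)
    end

    :ok
  end
end
```

Calling ReceiveFlowSubscriptions.subscribe() from a test setup, after the stages have started, avoids depending on the start order of the umbrella apps.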
The last stage of the receive flow is the consumer. The ReceiveConsumer asks the Converter.ReceiveProducerConsumer for events. When it receives any, it calls Shared.Interface.process_info/2. As discussed in the last article, we implemented this interface only to forward the received info to the test process, so that we can assert on it.
At this point, the ReceiveInfoTest will pass. We have a fully functional GenStage communication between different apps in the umbrella.
The send information flow is very similar to the receive flow from the implementation point of view. The only major difference is that we have two information consumers this time.
MyUkApp will have the producer role.
Nothing new here. The implementation is similar to the other producer above.
This time the Converter will convert GBP info to USD and EUR. Note the dispatcher: GenStage.BroadcastDispatcher option. It ensures that the SendProducerConsumer sends the events to all the subscribed consumers, rather than to a single consumer, as the default DemandDispatcher does.
For each message received from the producer, the Converter will create two of them, one for each converted currency.
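A sketch of such a stage follows. The Converter.SendProducerConsumer module name is inferred from the text, and the event shape and exchange rates are hypothetical; only the dispatcher option and the one-message-in, two-messages-out behavior come from the description above.

```elixir
defmodule Converter.SendProducerConsumer do
  use GenStage

  # Hypothetical fixed rates, for illustration only.
  @gbp_to_usd 1.25
  @gbp_to_eur 1.125

  def start_link(_args \\ []) do
    GenStage.start_link(__MODULE__, :no_state, name: __MODULE__)
  end

  # BroadcastDispatcher delivers every event to all subscribed consumers.
  def init(:no_state) do
    {:producer_consumer, :no_state, dispatcher: GenStage.BroadcastDispatcher}
  end

  # flat_map because each GBP event yields two converted events.
  def handle_events(events, _from, state) do
    {:noreply, Enum.flat_map(events, &from_gbp/1), state}
  end

  defp from_gbp(%{currency: "GBP", price: price} = info) do
    [
      %{info | currency: "USD", price: price * @gbp_to_usd},
      %{info | currency: "EUR", price: price * @gbp_to_eur}
    ]
  end
end
```

Because both consumers receive both converted events, each consumer would need to pick out the currency it cares about, for example through the selector option that BroadcastDispatcher supports on subscription or by filtering in handle_events.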
The consumer is again identical to the one above, so there is no point in dwelling on this code. The UsaMarket.SendConsumer is similar as well.
SendInfoTest will pass and we have a fully working 2-way communication between “flat umbrella” apps.
One thing remains to be done: automatically handle the subscriptions between consumers and producers. We will take care of this in the next article.