Implementing AMQP in an Elixir Application
Here at Casper, we use a service-oriented architecture to separate concerns around placing orders, fulfilling orders, and sending transactional notifications. Much of this system relies on AMQP (specifically RabbitMQ) as the protocol for communication between these services.
In our Ruby on Rails applications, we use a gem called Hutch to structure this implementation. Hutch is an opinionated framework built around Bunny, the RabbitMQ Ruby client. In a lot of ways, Hutch made the AMQP setup in our Rails applications painless by taking out some of the guesswork. Hutch uses topic exchanges and “makes some assumptions about how consumers and publishers should work.”
One of our services, however, is an Elixir application that is used to send transactional emails and SMS messages. Here, we use the AMQP package as our RabbitMQ client. Since there was no opinionated wrapper equivalent to Hutch available in Elixir, we had some design decisions to make when we first set this up.
Today, I’d like to talk about how we structured our AMQP implementation in Elixir by leveraging a supervision tree to ensure a fault-tolerant system. We first needed a way to establish and maintain the connection to RabbitMQ and to monitor the consumers so that they could be restarted if a process crashed. The problem we faced was how to do this without creating a new connection for each consumer, and without creating a new channel when the existing channel had survived the death of the consumer process.
We decided to use a supervision tree to:
- Monitor our consumers and restart them when necessary
- Establish and retrieve the connection as needed
- Ensure that a consumer can recover its existing channel or obtain a new one
- Maintain a mapping of opened channels to consumers, so that if a consumer process dies and restarts, it is given the same channel it used previously
Let’s take a look at each of these points in more detail.
Monitoring the Consumers
A supervision tree is basically made up of “supervisors that supervise other supervisors.” Our supervision tree is represented in the diagram below:
When the application boots, the main application supervisor, MyApp.Supervisor, starts MyApp.AMQPConnectionManager, which is also a supervisor. A supervisor is a process that monitors child processes, and it is the building block of the hierarchical structure known as a supervision tree in fault-tolerant Elixir systems. For a more detailed look at supervisors, great documentation can be found here.
Our connection manager creates a linked supervisor process, MyApp.ConsumerSupervisor, which then starts and monitors our consumers. A link ties two processes together so that when one exits abnormally, the other is notified. A supervisor relies on this to detect that a supervised process has failed and to start a new one in its place.
We next define the consumers as workers in the init function:
We then pass them as children to Supervisor.start_link(), which calls start_link for each of our consumers (line 23 in the example above). The connection manager is now supervising these consumer processes and will restart them if they crash.
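The original listing is not included in this text, so here is a minimal sketch of what a consumer supervisor along these lines might look like. The module names Sketch.Consumer and Sketch.ConsumerSupervisor and the queue names are hypothetical, the consumer is only a stub, and the sketch uses child specifications (the current way of declaring workers) rather than whatever the original code used.

```elixir
defmodule Sketch.Consumer do
  # Stand-in for a real consumer; it only remembers the queue it was given.
  use GenServer

  def start_link(queue), do: GenServer.start_link(__MODULE__, queue)

  @impl true
  def init(queue), do: {:ok, %{queue: queue}}
end

defmodule Sketch.ConsumerSupervisor do
  use Supervisor

  def start_link(queues) do
    Supervisor.start_link(__MODULE__, queues, name: __MODULE__)
  end

  @impl true
  def init(queues) do
    # One worker per queue; each child needs a unique id.
    children =
      for queue <- queues do
        Supervisor.child_spec({Sketch.Consumer, queue}, id: {Sketch.Consumer, queue})
      end

    # :one_for_one restarts only the consumer that crashed.
    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

In a setup like the one described here, the connection manager would call something like Sketch.ConsumerSupervisor.start_link(["orders", "emails"]) from its own init function, which links the two processes.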
Establishing the AMQP Connection
Great! Our consumer processes are now running and supervised. But what about our connection to RabbitMQ? Fortunately, the connection manager also handles establishing a new connection. As the last part of the init function, establish_new_connection() is called. This is defined below:
If the connection is opened successfully, we link the connection manager to the connection process. If the connection manager dies, the connection itself is terminated, and if the connection dies, the connection manager is killed as well. Since the connection manager is a supervised process, we can rely on it to be restarted and to reestablish the connection.
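Since the original function is not shown in this text, the following is only a sketch of the linking idea. Sketch.ConnectionManager and its open_fun argument are hypothetical; open_fun exists so the sketch runs without a broker. With the AMQP package it could be fn -> AMQP.Connection.open() end, which returns {:ok, %AMQP.Connection{pid: pid}} on success. The retry on failure is an assumption, as the post does not say what happens when the connection cannot be opened.

```elixir
defmodule Sketch.ConnectionManager do
  use GenServer

  @retry_ms 1_000

  def start_link(open_fun), do: GenServer.start_link(__MODULE__, open_fun)

  def connection(manager), do: GenServer.call(manager, :connection)

  @impl true
  def init(open_fun) do
    {:ok, establish_new_connection(%{open_fun: open_fun, conn: nil})}
  end

  @impl true
  def handle_call(:connection, _from, state), do: {:reply, state.conn, state}

  @impl true
  def handle_info(:connect, state), do: {:noreply, establish_new_connection(state)}

  defp establish_new_connection(state) do
    case state.open_fun.() do
      {:ok, conn} ->
        # Linking ties the two lifetimes together: if either process dies,
        # the other receives an exit signal and goes down with it.
        Process.link(conn.pid)
        %{state | conn: conn}

      {:error, _reason} ->
        # Assumption: retry shortly instead of crashing when the broker is down.
        Process.send_after(self(), :connect, @retry_ms)
        %{state | conn: nil}
    end
  end
end
```

Because the manager does not trap exits, a dying connection takes the manager down with it, and the manager's own supervisor is then responsible for starting a fresh one that reconnects.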
Retrieving the Connection and Managing Channels
We need to ensure that our consumers are always given a channel opened on the existing RabbitMQ connection, instead of creating a new channel each time we need to restart a consumer process. In the init() function of MyApp.Consumer, we request a new channel from the connection manager:
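The consumer code itself is not reproduced in this text. As a rough illustration, a consumer that asks for a channel in init and stores it once it arrives might look like the sketch below. Sketch.WaitingConsumer, the request_fun argument, and the {:channel, chan} message shape are all assumptions made for the example, not the original implementation.

```elixir
defmodule Sketch.WaitingConsumer do
  use GenServer

  # `request_fun` is a hypothetical one-arity function that asks the
  # connection manager for a channel on behalf of the given pid.
  def start_link(request_fun), do: GenServer.start_link(__MODULE__, request_fun)

  def channel(consumer), do: GenServer.call(consumer, :channel)

  @impl true
  def init(request_fun) do
    # Fire the request and return immediately; no reply is awaited here.
    request_fun.(self())
    {:ok, %{chan: nil}}
  end

  @impl true
  def handle_call(:channel, _from, state), do: {:reply, state.chan, state}

  @impl true
  def handle_info({:channel, chan}, state) do
    # With the AMQP package, this is where a consumer would typically
    # subscribe, for example with AMQP.Basic.consume(chan, queue).
    {:noreply, %{state | chan: chan}}
  end
end
```

Keeping init non-blocking means the consumer starts even when no connection exists yet; it simply holds no channel until the manager delivers one.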
request_channel() is defined below. You will notice that it casts an asynchronous :chan_request message, which is handled by the handle_cast() function on line 9. This is so that the consumer will not expect a response in the event that the connection manager does not yet have a connection ready to create the channel (i.e. if the connection itself has died and is being reestablished). Instead, the consumer will wait for the connection manager to return the channel when ready.
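The listing this paragraph refers to is not included here. The sketch below illustrates the general idea of an asynchronous channel request that is answered later. Sketch.ChannelBroker, its connected/2 function, and the waiting list are hypothetical; in particular, queueing requesters until a connection exists is an assumption about how "return the channel when ready" could be done.

```elixir
defmodule Sketch.ChannelBroker do
  use GenServer

  # `conn` is nil until a connection exists.
  def start_link(conn \\ nil), do: GenServer.start_link(__MODULE__, conn)

  # Asynchronous: returns :ok immediately; the channel arrives later as a message.
  def request_channel(manager, consumer \\ self()) do
    GenServer.cast(manager, {:chan_request, consumer})
  end

  # In this sketch, called once a connection has been established.
  def connected(manager, conn), do: GenServer.cast(manager, {:connected, conn})

  @impl true
  def init(conn), do: {:ok, %{conn: conn, waiting: []}}

  @impl true
  def handle_cast({:chan_request, consumer}, %{conn: nil} = state) do
    # No connection yet: remember who asked and answer later.
    {:noreply, %{state | waiting: [consumer | state.waiting]}}
  end

  def handle_cast({:chan_request, consumer}, state) do
    send_channel(consumer, state.conn)
    {:noreply, state}
  end

  def handle_cast({:connected, conn}, state) do
    Enum.each(state.waiting, &send_channel(&1, conn))
    {:noreply, %{state | conn: conn, waiting: []}}
  end

  # A real implementation would open the channel with AMQP.Channel.open(conn);
  # here the "channel" is just a tagged tuple.
  defp send_channel(consumer, conn), do: send(consumer, {:channel, {:chan, conn}})
end
```

A synchronous GenServer.call would time out or crash the consumer while the connection is being reestablished, which is why a cast with a deferred reply fits this case.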
Maintaining the Consumer/Channel Mapping
Let’s break down the function on line 9 above. A new mapping is created by the store_channel_mapping() function on line 17. Map.put_new_lazy will only create a new channel if the consumer does not already exist in the mapping. This avoids creating multiple channels for a given consumer. We then get the channel for the requesting consumer on line 11 and return it to the consumer.
This is then handled back in the requesting consumer:
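The mapping and hand-back steps described in this section can be sketched in isolation as follows. Sketch.ChannelMap, hand_back/4, and the {:channel, chan} message are hypothetical, and open_fun stands in for a function that opens a channel (with the AMQP package it could wrap AMQP.Channel.open(conn)). The sketch assumes the map is keyed by a stable consumer name rather than a pid, since a restarted process gets a new pid.

```elixir
defmodule Sketch.ChannelMap do
  # Adds a channel for `consumer` only if none is stored yet.
  # Map.put_new_lazy/3 invokes `open_fun` only when the key is missing.
  def store_channel_mapping(channels, consumer, open_fun) do
    Map.put_new_lazy(channels, consumer, open_fun)
  end

  # Looks up (or lazily opens) the consumer's channel and sends it to `pid`.
  # Returns the updated mapping so the caller can keep it in its state.
  def hand_back(channels, consumer, pid, open_fun) do
    channels = store_channel_mapping(channels, consumer, open_fun)
    send(pid, {:channel, Map.fetch!(channels, consumer)})
    channels
  end
end

# Usage: the second request reuses the channel stored by the first.
open_fun = fn -> make_ref() end
channels = Sketch.ChannelMap.hand_back(%{}, :orders_consumer, self(), open_fun)
^channels = Sketch.ChannelMap.hand_back(channels, :orders_consumer, self(), open_fun)
```

Because the opener is passed as a function, no channel is opened at all when the consumer already has an entry, which is the point of using the lazy variant over Map.put_new/3.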
The remainder of our consumer implementation is fairly similar to the setup outlined in the AMQP package documentation here: https://github.com/pma/amqp. With the use of our supervision tree, Casper customers can rely on receiving email and SMS messages for their orders even in the event of a connection, channel, or consumer failure.
If you are currently using AMQP in your Elixir application, or if you’re interested in learning more about the way we use AMQP at Casper, we’d love to hear from you! And you can learn more about joining the Casper Tech Team here.