[Rust] Event-Driven Programming with Rust and RabbitMQ using Crosstown Bus [Updated]

Paolo Posso
Geek Culture
Published in
5 min readApr 10, 2022
Photo by Jonathan Borba on Unsplash

Event-driven design pattern helps us to create distributed applications which are execute asynchronously and scalable separately.

When you have data being processed in a service, for example, an API, and you want this process to trigger a different and independent process (or more processes) in a asynchronous way, you can trigger them using a message broker such as RabbitMQ.

Crosstown Bus makes it easier to use RabbitMQ with this purpose, but providing a SDK where you can connect to RabbitMQ with the bare minimum configuration, allowing you to have multiple publishers and multiple subscriber, which can, for its turn, subscribe to one or more events using strongly typed objects.

Check Crosstown Bus documentation here.

Let’s get our hands dirty to understand this better.

Pre-requisites:

  • Rust
  • VS Code (or your favorite IDE / Editor)
  • RabbitMQ — I’m using docker to run it locally

In your terminal create a folder and, right after, two Rust projects inside of it:

mkdir crosstown_poccd crosstown_poccargo new --bin publishercargo new --bin subscriber

Subscriber

First let’s create the subscriber. For that, open your editor in the subscriber folder.

Edit the Cargo.toml file to add crosstown_bus dependency. Borsh is also needed for seralizing, desserializing our objects.

The import below is an example. Please adjust it to the current version.

crosstown_bus = "1.0.1"
borsh = "1.4.0"
borsh-derive = "1.4.0"

Every time the publisher, that we are gonna write right away, publishes a message (which in this case is a CustomerCreatedEvent), the handler function will receive the published message. Notice that this event struct must derive BorshSerialize and BorshDeserialize.

Let’s then create the struct that represents the event to be published.

#[derive(Debug, Clone, BorshDeserialize, BorshSerialize)]
pub struct UserCreatedMessage {
pub user_id: String,
pub user_name: String,
pub user_email: String,
}

I’m also adding Debug so we are able to print the object if we want to.

Now let’s call the function to subscribe the event. For that we will:

  1. Create a Subscriber object, using the new_queue_subscriber method.
  2. Call the method to listen to the event.
let subscriber = CrosstownBus::new_subscriber("amqp://guest:guest@localhost:5672".to_owned())?;subscriber.subscribe(

subscriber.subscribe(
"user_created".to_owned(),
NotifyUserHandler::new(received_messages.clone()),
QueueProperties {
auto_delete: false,
durable: false,
use_dead_letter: true,
consume_queue_name: Some("queue2".to_string()),
},
)?;

Notice that the handler implementation must be passed as parameter. In this case, UserCreatedHandler will receive the message and handle it. It must implement the MessageHandler from CrosstownBus.

The handler in our example will be defined like this:

pub struct UserCreatedHandler;

impl MessageHandler<UserCreatedEventMessage> for UserCreatedHandler {
fn handle(&self, message: Box<UserCreatedEventMessage>) -> Result<(), HandleError> {
println!("Message received on handler 1: {:?}", message);
Ok(())
}
}

CrosstownBus now also supports QueueProperties, where you can specify configurations such as auto delete and a dead letter policy.

Full code for the Subscriber’s main file:

use borsh::{BorshDeserialize, BorshSerialize};
use crosstown_bus::{CrosstownBus, MessageHandler, HandleError};


#[derive(Debug, Clone, BorshDeserialize, BorshSerialize)]
pub struct UserCreatedMessage {
pub user_id: String,
pub user_name: String,
pub user_email: String,
}

pub struct UserCreatedHandler;

impl MessageHandler<UserCreatedEventMessage> for UserCreatedHandler {
fn handle(&self, message: Box<UserCreatedEventMessage>
) -> Result<(), HandleError> {
println!("Message received: {:?}", message);
Ok(())
}
}

fn main() {
let listener = CrosstownBus::new_queue_listener("amqp://guest:guest@localhost:5672".to_owned()).unwrap();
_ = listener.listen("user_created".to_owned(), UserCreatedHandler{}, crosstown_bus::QueueProperties { auto_delete: false, durable: false, use_dead_letter: true });

loop {
}
}

Publisher

First, navigate to the Publisher project folder, created previously, and open your editor (preferably a second window of it).

Add the dependencies to the .toml file. The same ones we used in the Subscriber.

Now, in the main.rs file, let’s create the same struct we created in the Subscriber project. This way we will have the same type being sent and received.


#[derive(Debug, Clone, BorshDeserialize, BorshSerialize)]
pub struct UserCreatedEventMessage {
pub user_id: String,
pub user_name: String
}

Side note: In a real-world application you would probably have a package or another mechanism to share this struct not to have to duplicate this code.

Now we have to create the publisher object.

After that, let’s call the publish method to publish multiple messages.

let mut publisher = CrosstownBus::new_publisher("amqp://guest:guest@localhost:5672".to_owned())?;

_ = publisher.publish("notify_user".to_owned(),
UserCreatedMessage {
user_id: "asdf".to_owned(),
user_name: "Billy Gibbons".to_owned(),
email: "bg@test.com".to_owned()
});

The type of the message is the same one the Listener is expecting and the event name is also the same.

For the last message, notice that I mistyped the event name. This message will be sent to a different RabbitMQ exchange and will not be received by our Listener.

Testing our Application

For that we will need:

  1. Start RabbiMQ
  2. Start Subscriber
  3. Start Publisher

Start RabbitMQ

In my case I’m running it on Docker.

Open you RabbitMQ Admin on your browser. Mine is running on localhost:15672.

RabbitMQ Admin

Start Subscriber

Navigate to the subscriber folder and run the following command:

cargo run
Subscriber program being executed

Start Publisher

In a different command / terminal window, run cargo run inside of the publisher program folder.

Publisher executed

Checking the result

Go back to the publisher terminal window.

   Running `target/debug/subscriber`
Message received: UserCreatedEventMessage { user_id: "5", user_name: "Zer0" }
Message received: UserCreatedEventMessage { user_id: "90", user_name: "Salvador" }
Message received: UserCreatedEventMessage { user_id: "8", user_name: "Maya" }
Message received: UserCreatedEventMessage { user_id: "7", user_name: "Rhys" }

Check it out! Both events were sent and were received by both subscribers.

And this is all!

See the full code here at my github: GitHub — paoloposso/crosstown_bus_client_poc.

Thank you and see you on the next one!

--

--