Building an Asynchronous Actor Model in Rust using Tokio

v166ne
Apr 20, 2024

Rust is known for its performance, memory safety, and concurrency support. Tokio, one of the most widely used asynchronous runtimes for Rust, provides the tasks and channels needed to build highly concurrent applications. In this article, we’ll explore how to use Tokio to implement the actor model, a popular concurrency pattern in which actors communicate asynchronously via message passing.

The actor model is a concurrency model that treats actors as fundamental units of computation. Each actor has its own state and communicates exclusively through asynchronous message passing. This model provides a structured approach to concurrency, avoiding shared mutable state and the associated complexities.
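
To make the idea concrete before bringing in Tokio, here is a minimal sketch of a counter actor built only on the standard library. It assumes one OS thread per actor and uses std::sync::mpsc as a stand-in for Tokio's channels; CounterMsg, spawn_counter, and counter_demo are illustrative names, not part of any library.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// Messages are the only way to reach the actor's state.
enum CounterMsg {
    Increment(u64),
    // Carries a reply channel so the caller can read the current total.
    Get(Sender<u64>),
}

// Spawn the actor on its own thread and return the sending half of its mailbox.
fn spawn_counter() -> Sender<CounterMsg> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut total: u64 = 0; // private state, owned by this thread only
        for msg in rx {
            match msg {
                CounterMsg::Increment(n) => total += n,
                CounterMsg::Get(reply) => {
                    let _ = reply.send(total);
                }
            }
        }
    });
    tx
}

// Send two increments, then ask the actor for its total.
fn counter_demo() -> u64 {
    let counter = spawn_counter();
    counter.send(CounterMsg::Increment(2)).unwrap();
    counter.send(CounterMsg::Increment(40)).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    counter.send(CounterMsg::Get(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}
```

Because the messages from one sender arrive in order, the Get request is handled after both increments. No lock is needed: total is never visible outside the actor's thread.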

In Rust, Tokio provides the necessary abstractions for building asynchronous applications. Let’s dive into a practical example of implementing actors using Tokio:

// Rust code snippet
use tokio::signal;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, Duration};

// Define the types of commands that can be sent to the actor
enum Command {
    Add {
        x: i32,
        y: i32,
        result_sender: oneshot::Sender<i32>,
    },
    Concatenate {
        x: String,
        y: String,
        result_sender: oneshot::Sender<Result<String, ()>>,
    },
}

// Define the actor struct
struct Actor {
    receiver: mpsc::Receiver<Command>,
    name: String,
}

impl Actor {
    // Process incoming commands asynchronously
    async fn process(mut self) {
        while let Some(command) = self.receiver.recv().await {
            match command {
                Command::Add { x, y, result_sender } => {
                    let _ = result_sender.send(x + y); // Send the result of addition
                }
                Command::Concatenate { x, y, result_sender } => {
                    let result = if x.is_ascii() && y.is_ascii() {
                        Ok(format!("{}{}", x, y)) // Concatenate strings if both are ASCII
                    } else {
                        Err(()) // Return error if either string is non-ASCII
                    };
                    let _ = result_sender.send(result); // Send the result of concatenation
                }
            }
        }
    }

    // Access the actor's name
    fn name(&self) -> &str {
        &self.name
    }
}

// Define the proxy struct for interacting with the actor
#[derive(Clone)]
struct Proxy {
    sender: mpsc::Sender<Command>,
}

impl Proxy {
    // Send an addition command to the actor and await the result.
    // Panics if the actor drops the reply channel without answering.
    async fn add(&self, x: i32, y: i32) -> Result<i32, mpsc::error::SendError<Command>> {
        let (result_sender, result_receiver) = oneshot::channel();
        self.sender.send(Command::Add { x, y, result_sender }).await?;
        Ok(result_receiver.await.expect("Failed to receive result from actor"))
    }

    // Send a concatenation command to the actor and await the result.
    // Panics if no reply arrives or if the actor reports a non-ASCII input.
    async fn concatenate(&self, x: String, y: String) -> Result<String, mpsc::error::SendError<Command>> {
        let (result_sender, result_receiver) = oneshot::channel();
        self.sender.send(Command::Concatenate { x, y, result_sender }).await?;
        let result = result_receiver.await.expect("Failed to receive result from actor");
        Ok(result.expect("Failed to concatenate"))
    }
}

// Initialize the actor and its corresponding proxy
fn init_actor_proxy(name: String, size: usize) -> (Actor, Proxy) {
    // Bounded channel: `size` is the number of commands that can be queued
    let (sender, receiver) = mpsc::channel(size);
    let actor = Actor { receiver, name };
    let proxy = Proxy { sender };
    (actor, proxy)
}

#[tokio::main]
async fn main() {
    // Initialize the actor and its proxy
    let (actor, proxy) = init_actor_proxy("MyActor".into(), 128);

    // Spawn the actor's processing task
    tokio::spawn(actor.process());

    // Send some requests and evaluate responses
    assert_eq!(proxy.add(1, 2).await.unwrap(), 3);
    assert_eq!(
        proxy.concatenate(String::from("foo"), String::from("bar")).await.unwrap(),
        String::from("foobar")
    );

    // Spawn a separate task to demonstrate concurrent message passing
    let proxy2 = proxy.clone();
    tokio::spawn(async move {
        for i in 0..5 {
            println!("cool {}", i);
            // Simulate other work before this task sends its request
            time::sleep(Duration::from_secs(3)).await;
        }
        // Send a request to the actor through the cloned proxy
        println!("10 + 5 = {}", proxy2.add(10, 5).await.unwrap());
    });

    // Wait for Ctrl+C before exiting
    let ctrl_c = signal::ctrl_c();
    println!("Press Ctrl+C to exit...");
    ctrl_c.await.expect("Ctrl+C signal failed");
    println!("Ctrl+C received. Exiting...");
}

Understanding the Code

1. Command Enumeration
The Command enum defines the types of commands that can be sent to the actor. It has two variants:

Add: Represents an addition operation with two integers (x and y) and a oneshot channel (result_sender) to send the result back to the sender.
Concatenate: Represents a string concatenation operation with two strings (x and y) and a oneshot channel (result_sender) to send the result back to the sender.
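
The Concatenate reply uses () as its error type, which tells the caller that something failed but not what. As a hedged sketch of one alternative, the block below defines a small error enum and applies the same ASCII rule in a plain function. ConcatError and concat_ascii are hypothetical names introduced for illustration; they do not appear in the listing above.

```rust
use std::fmt;

// Hypothetical error type that could replace `()` in the Concatenate reply.
#[derive(Debug, PartialEq, Eq)]
enum ConcatError {
    // Records which argument was rejected: 0 for x, 1 for y.
    NonAscii { argument: usize },
}

impl fmt::Display for ConcatError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConcatError::NonAscii { argument } => {
                write!(f, "argument {} contains non-ASCII characters", argument)
            }
        }
    }
}

impl std::error::Error for ConcatError {}

// Same rule the actor applies: both inputs must be ASCII.
fn concat_ascii(x: &str, y: &str) -> Result<String, ConcatError> {
    if !x.is_ascii() {
        return Err(ConcatError::NonAscii { argument: 0 });
    }
    if !y.is_ascii() {
        return Err(ConcatError::NonAscii { argument: 1 });
    }
    Ok(format!("{}{}", x, y))
}
```

With a type like this, the reply channel would carry Result<String, ConcatError>, and the caller could decide how to react instead of receiving an empty error.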

2. Actor Struct
The Actor struct represents an actor. It contains:

receiver: A multi-producer, single-consumer (mpsc) channel receiver through which the actor receives commands.
name: A string holding the actor’s name. The example stores it and exposes it through the name method, but never reads it.

The impl Actor block contains the actor’s behavior. The process method takes ownership of the actor and loops, awaiting the next command from the receiver and executing the matching action. The loop ends when recv returns None, which happens once every sender has been dropped and the queue is empty.
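
One property of this kind of receive loop is worth seeing in isolation: it stops on its own once the last sender is gone. The sketch below shows this with std::sync::mpsc and a thread rather than Tokio, so it runs without dependencies; drain_and_stop is an illustrative name. In the Tokio version, the equivalent signal is recv().await returning None.

```rust
use std::sync::mpsc;
use std::thread;

// Run a mailbox loop on a thread, close the mailbox, and wait for the loop to finish.
// Returns how many messages the loop handled before it saw the channel close.
fn drain_and_stop(messages: Vec<i32>) -> usize {
    let (tx, rx) = mpsc::channel::<i32>();

    let worker = thread::spawn(move || {
        let mut handled: usize = 0;
        // `recv` returns Err once all senders are gone and the queue is empty,
        // which ends the loop.
        while let Ok(_msg) = rx.recv() {
            handled += 1;
        }
        handled
    });

    for m in messages {
        tx.send(m).unwrap();
    }
    drop(tx); // last sender dropped: the loop exits after draining the queue

    worker.join().unwrap()
}
```

Messages already queued are still delivered before the loop ends, so dropping every handle is a simple way to let an actor finish its pending work and stop.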

3. Proxy Struct
The Proxy struct serves as a proxy for interacting with the actor. It contains:

sender: An mpsc channel sender through which commands are sent to the actor.

The impl Proxy block contains methods to interact with the actor:

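add: Sends an addition command to the actor and awaits the result. It returns an error if the command cannot be sent, and panics if the actor drops the reply channel without answering.
concatenate: Sends a concatenation command to the actor and awaits the result. In this example it also panics when the actor reports a non-ASCII input, so the Err(()) reply never reaches the caller.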
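
A proxy does not have to panic when a reply is missing. The following sketch, written against std::sync::mpsc so that it runs without Tokio, maps both failure modes to a single error type. ProxyError, spawn_adder, and orphaned_proxy are hypothetical names, and the Command enum is reduced to one variant for brevity.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

enum Command {
    Add { x: i32, y: i32, reply: Sender<i32> },
}

// Hypothetical error type covering both ways a request can fail.
#[derive(Debug, PartialEq, Eq)]
enum ProxyError {
    ActorUnavailable, // the command could not be delivered
    NoReply,          // the reply channel closed without an answer
}

#[derive(Clone)]
struct Proxy {
    sender: Sender<Command>,
}

impl Proxy {
    // Send an Add command and wait for the answer, reporting failures as values.
    fn add(&self, x: i32, y: i32) -> Result<i32, ProxyError> {
        let (reply, reply_rx) = mpsc::channel();
        self.sender
            .send(Command::Add { x, y, reply })
            .map_err(|_| ProxyError::ActorUnavailable)?;
        reply_rx.recv().map_err(|_| ProxyError::NoReply)
    }
}

// Start an actor thread that answers Add commands.
fn spawn_adder() -> Proxy {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        for command in receiver {
            match command {
                Command::Add { x, y, reply } => {
                    let _ = reply.send(x + y);
                }
            }
        }
    });
    Proxy { sender }
}

// Build a proxy whose actor has already gone away.
fn orphaned_proxy() -> Proxy {
    let (sender, receiver) = mpsc::channel();
    drop(receiver);
    Proxy { sender }
}
```

The same shape should carry over to the Tokio version by mapping the errors from send(...).await and from awaiting the oneshot receiver, at the cost of a custom return type in place of SendError<Command>.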

4. Initialization Function
The init_actor_proxy function creates the actor and its corresponding proxy. It takes a name for the actor and the capacity of the bounded command channel (128 in main), creates the channel, and returns a tuple containing the actor, which owns the receiver, and the proxy, which owns the sender. Spawning the actor is left to the caller.
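
The size argument matters because the channel is bounded: once that many commands are queued, further sends have to wait for the actor to catch up. The sketch below illustrates the effect with std::sync::mpsc::sync_channel, the standard library's bounded channel, used here in place of Tokio's; accepted_without_consumer is an illustrative name.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Try to enqueue `attempts` messages into a mailbox of the given capacity
// while nothing is receiving; returns how many were accepted.
fn accepted_without_consumer(capacity: usize, attempts: usize) -> usize {
    // `_rx` keeps the receiving half alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would wait here instead.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A small capacity applies backpressure to fast senders sooner, while a large one absorbs bursts at the cost of memory and queueing delay. With Tokio's bounded channel, send(...).await suspends the sending task instead of blocking a thread.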

5. Main Function
The main function is the entry point of the program. It performs the following steps:

Initializes the actor and its proxy using the init_actor_proxy function.
Spawns the actor’s processing task using tokio::spawn.
Sends some requests to the actor via the proxy and evaluates the responses.
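Spawns a separate task that, after a delay, sends a request through a cloned proxy to demonstrate concurrent message passing.
Waits for Ctrl+C and then returns from main, which shuts down the runtime along with any tasks that are still running.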
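
The concurrent part of main uses only one extra task. To show several clients sharing one actor, here is a sketch that uses threads and std::sync::mpsc instead of Tokio tasks so that it runs without dependencies. AddRequest and concurrent_clients are illustrative names.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// A request to add two numbers, with a channel for the answer.
struct AddRequest {
    x: i32,
    y: i32,
    reply: Sender<i32>,
}

// Spawn `clients` threads that each send one request through a cloned sender,
// and return the sum of all replies.
fn concurrent_clients(clients: i32) -> i32 {
    let (tx, rx) = mpsc::channel::<AddRequest>();

    // The actor handles requests one at a time, in arrival order.
    thread::spawn(move || {
        for req in rx {
            let _ = req.reply.send(req.x + req.y);
        }
    });

    let handles: Vec<_> = (0..clients)
        .map(|i| {
            let tx = tx.clone(); // each client gets its own handle
            thread::spawn(move || {
                let (reply, reply_rx) = mpsc::channel();
                tx.send(AddRequest { x: i, y: 10, reply }).unwrap();
                reply_rx.recv().unwrap()
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Each client receives its own answer on its own reply channel, so the order in which the actor happens to process the requests does not affect the results.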

Overall, the code demonstrates how to implement a single actor in Rust using Tokio, with a cloneable proxy that lets any number of tasks send it messages asynchronously. Because the actor owns its state and handles one command at a time, no locks or shared mutable state are needed, which makes concurrent code safer and easier to reason about.
