Channels in Rust. Part 1

Serhij S.
Mar 16, 2024

Rust offers a powerful concurrency tool called channels. A channel is a way to send data between threads. You can think of it as a pipe: one end sends data, and the other end receives it.

This is the first part of the article, in which I explain the basic concepts: what channels are and which programming patterns they are typically used in.

A few words about the author

I have been using Rust in mission-critical production since 2019, in both high-performance and real-time applications. Our company, Bohemia Automation, provides solutions for industrial IoT projects, mostly in the high-energy and other heavy-industry sectors.

What are channels?

Channels exist in many programming languages. As already mentioned, a channel is essentially a pipe with two ends: a producer and a consumer. The producer sends data into the channel, and the consumer receives it. The data is usually delivered in FIFO (first-in, first-out) order. Some channel implementations allow multiple producers or multiple consumers.

From a technical point of view, a channel is a data buffer guarded by a lock, with methods to put data in and take data out. The lock synchronizes access to the buffer.

When there are no consumers left, attempts to send data fail with an error. When there are no producers left, consumers can still receive data until the buffer is empty. After that, the channel is considered “closed”.
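The closing rules above can be observed with the standard library channel. The following is only a minimal sketch: the helper names `send_after_receiver_dropped` and `drain_after_sender_dropped` are made up for illustration.

```rust
use std::sync::mpsc;

/// Returns true if sending fails once the only receiver has been dropped.
/// (Hypothetical helper, used only for this illustration.)
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx); // no consumers left
    tx.send(1).is_err()
}

/// Returns the values that can still be received after the only sender
/// has been dropped. (Hypothetical helper as well.)
fn drain_after_sender_dropped() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // no producers left
    let mut received = Vec::new();
    // recv() keeps returning buffered values and fails only when the
    // buffer is empty and all senders are gone
    while let Ok(value) = rx.recv() {
        received.push(value);
    }
    received
}

fn main() {
    println!("send refused: {}", send_after_receiver_dropped());
    println!("drained: {:?}", drain_after_sender_dropped());
}
```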

Popular channel implementations:

  • Standard Rust channels. Part of the standard Rust library. Well tested and reliable, versatile, acceptable for most cases.
  • Crossbeam channels. Perfect for high-performance applications.
  • Flume. A high-performance channel implementation, similar to Crossbeam. It provides both synchronous and asynchronous methods.
  • Tokio channels. Part of the Tokio library. Designed for asynchronous applications.
  • async_channel. A high-performance asynchronous channel implementation.
  • Specialized channels for specific use cases (for example, real-time communication or embedded systems). We will talk about them in part two. One example is my RoboPLC project, which focuses on real-time application development.

One-shot channels

A one-shot channel can be used to send only a single value. It is implemented differently and works more like a one-time signal. Because one-shot channels serve a different purpose, we will not focus on them in this article, unless I get a chance to write Part 3.
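To give a rough idea of the "one-time signal" semantics, here is a sketch that only emulates a one-shot interface on top of a standard bounded channel. The types `OneshotSender` and `OneshotReceiver` and the function `oneshot` are hypothetical and written for this example. Real one-shot implementations are built differently and do not need a general-purpose buffer.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical wrappers: both ends are consumed on use, so a second
// send or a second recv does not compile.
struct OneshotSender<T>(mpsc::SyncSender<T>);
struct OneshotReceiver<T>(mpsc::Receiver<T>);

fn oneshot<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
    // capacity 1: the single send never blocks
    let (tx, rx) = mpsc::sync_channel(1);
    (OneshotSender(tx), OneshotReceiver(rx))
}

impl<T> OneshotSender<T> {
    /// Sends the value, giving it back if the receiver is already gone.
    fn send(self, value: T) -> Result<(), T> {
        self.0.send(value).map_err(|e| e.0)
    }
}

impl<T> OneshotReceiver<T> {
    /// Waits for the value; None means the sender was dropped without sending.
    fn recv(self) -> Option<T> {
        self.0.recv().ok()
    }
}

/// Runs a computation in a background thread and waits for its single result.
fn compute_in_background() -> Option<u64> {
    let (tx, rx) = oneshot();
    thread::spawn(move || {
        let result: u64 = (1..=10).sum();
        let _ = tx.send(result);
    });
    rx.recv()
}

fn main() {
    println!("{:?}", compute_in_background());
}
```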

Typical channel programming patterns

Common concepts

Channels can be either bounded or unbounded.

  • Bounded channels have a limited buffer capacity. When the buffer is full, producers are either blocked or their send attempts are rejected. This is the safest way of using channels, however it can lead to deadlocks if not used properly.
  • Unbounded channels have an unlimited buffer capacity. Producers are never blocked and can always send data. This can lead to memory exhaustion if a producer is faster than a consumer, so unbounded channels should always be used with caution.
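The difference can be sketched with the standard library, where `mpsc::sync_channel` creates a bounded channel and `mpsc::channel` an unbounded one. The function names below are invented for the example, and the consumer is deliberately idle so that the buffer fills up.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Fills a bounded channel and reports how many messages were accepted
/// before `try_send` rejected one. (Hypothetical helper.)
fn accepted_before_full(capacity: usize) -> usize {
    // `_rx` is kept alive but never read, simulating a stalled consumer
    let (tx, _rx) = mpsc::sync_channel::<usize>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
}

/// Sends `count` messages into an unbounded channel without any consumer
/// running, then counts what has piled up. (Hypothetical helper.)
fn unbounded_backlog(count: usize) -> usize {
    let (tx, rx) = mpsc::channel();
    for i in 0..count {
        tx.send(i).unwrap(); // never blocks, the buffer just grows
    }
    drop(tx);
    rx.iter().count()
}

fn main() {
    println!("bounded accepted: {}", accepted_before_full(3));
    println!("unbounded backlog: {}", unbounded_backlog(10_000));
}
```

In the unbounded case nothing stops the backlog from growing, which is exactly the memory exhaustion risk mentioned above.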

In the majority of implementations, a channel is split after creation into a “Sender” instance with the methods “send” (blocking) and “try_send” (non-blocking), and a “Receiver” instance with the methods “recv” (blocking) and “try_recv” (non-blocking). The receiver can usually also be used as a blocking iterator.
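A short sketch of the receiving side with the standard library follows. Note that in `std::sync::mpsc` only the bounded `SyncSender` has `try_send`, as `send` on an unbounded `Sender` never blocks. The functions `describe`, `poll_states` and `collect_all` are illustrative names, not part of any library.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;

/// Turns the result of a non-blocking receive into a readable label.
fn describe(result: Result<i32, TryRecvError>) -> String {
    match result {
        Ok(value) => format!("value {}", value),
        Err(TryRecvError::Empty) => "empty".to_string(),
        Err(TryRecvError::Disconnected) => "disconnected".to_string(),
    }
}

/// Polls a channel in three situations: nothing sent yet, one message
/// waiting, and all senders dropped.
fn poll_states() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let mut states = Vec::new();
    states.push(describe(rx.try_recv())); // nothing has been sent
    tx.send(7).unwrap();
    states.push(describe(rx.try_recv())); // one message is buffered
    drop(tx);
    states.push(describe(rx.try_recv())); // buffer empty, no senders left
    states
}

/// Uses the receiver as a blocking iterator: it ends when the channel closes.
fn collect_all() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        }
        // tx is dropped here, which closes the channel
    });
    let values: Vec<i32> = rx.iter().collect();
    producer.join().unwrap();
    values
}

fn main() {
    println!("{:?}", poll_states());
    println!("{:?}", collect_all());
}
```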

Producer-consumer

A simple pattern where one or multiple producers send data to a single consumer. Good for simple parallel processing. It should be used with caution in large projects: its flexibility can lead to an unclear project architecture.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    // Producer
    let producer = thread::spawn(move || {
        let data = vec![1, 2, 3, 4, 5];
        for x in data {
            tx.send(x).unwrap();
            println!("Sent {}", x);
        }
    });
    // Consumer
    let consumer = thread::spawn(move || {
        for received in rx {
            println!("Received {}", received);
        }
    });
    producer.join().unwrap();
    consumer.join().unwrap();
}

Pipeline

A pattern where multiple threads are connected in a chain. Useful in certain cases:

  • Data processing is divided into multiple stages.
  • Conversion between different channel implementations.
  • Conversion between unbounded and bounded channels (e.g. to prevent memory exhaustion, unprocessed data is dropped or processed in a different way).

use std::sync::mpsc;
use std::thread;

fn main() {
    // Let us use a scope to avoid having to join all the threads
    thread::scope(|scope| {
        let (tx, rx) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();
        // Stage 1
        scope.spawn(move || {
            let data = vec![1, 2, 3, 4, 5];
            for x in data {
                tx.send(x * x).unwrap(); // Squares each number
            }
        });
        // Stage 2
        scope.spawn(move || {
            for x in rx {
                tx2.send(x + 1).unwrap(); // Increments each number
            }
        });
        // Final output
        scope.spawn(move || {
            for y in rx2 {
                println!("{}", y); // Prints result
            }
        });
    });
}
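The third use case in the list above (converting an unbounded channel into a bounded one and dropping what does not fit) can be sketched as a pipeline stage like the one below. The names `bridge` and `run_bridge` are invented for this example, and dropping the newest message is just one possible policy.

```rust
use std::sync::mpsc::{self, TrySendError};
use std::thread;

/// A pipeline stage that forwards messages from an unbounded channel to a
/// bounded one. Messages that do not fit are dropped and counted.
fn bridge(input: mpsc::Receiver<u32>, output: mpsc::SyncSender<u32>) -> usize {
    let mut dropped = 0;
    for message in input {
        match output.try_send(message) {
            Ok(()) => {}
            // the bounded buffer is full: drop the message instead of blocking
            Err(TrySendError::Full(_)) => dropped += 1,
            // the consumer is gone: nothing left to do
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    dropped
}

/// Pushes `total` messages through the bridge while the consumer is stalled.
/// Returns (delivered, dropped).
fn run_bridge(total: u32, capacity: usize) -> (usize, usize) {
    let (tx, rx) = mpsc::channel();
    let (bounded_tx, bounded_rx) = mpsc::sync_channel(capacity);
    for i in 0..total {
        tx.send(i).unwrap();
    }
    drop(tx); // close the input so the bridge stage can finish
    // The consumer does not read until the bridge is done, so the bounded
    // buffer fills up and the overflow is dropped
    let dropped = thread::spawn(move || bridge(rx, bounded_tx)).join().unwrap();
    let delivered = bounded_rx.iter().count();
    (delivered, dropped)
}

fn main() {
    let (delivered, dropped) = run_bridge(10, 4);
    println!("delivered {}, dropped {}", delivered, dropped);
}
```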

Worker pool

A pattern where multiple workers process data from a single channel. It is typically used to parallelize data processing when all workers do the same job.

use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::{mem, thread};

struct Worker {
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Option<i32>>>>) -> Self {
        let thread = thread::spawn(move || loop {
            // None is the signal to exit
            let Some(job) = receiver.lock().unwrap().recv().unwrap() else {
                println!("Worker {} exited", id);
                break;
            };
            println!("Worker {} received job {}", id, job);
            //thread::sleep(std::time::Duration::from_secs(1));
        });
        Self { thread }
    }
}

struct WorkerPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Option<i32>>,
}

impl WorkerPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel();
        // The standard Rust library does not provide multi-consumer channels
        // (at the moment this article is published). In other implementations,
        // a receiver can usually be cloned, so no Arc+Mutex is required.
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, receiver.clone()));
        }
        Self { workers, sender }
    }

    fn execute(&self, job: i32) {
        self.sender.send(Some(job)).unwrap();
    }
}

impl Drop for WorkerPool {
    fn drop(&mut self) {
        // Send all workers a signal to exit
        for _ in 0..self.workers.len() {
            self.sender.send(None).unwrap();
        }
        // Wait for all workers to exit
        for worker in mem::take(&mut self.workers) {
            worker.thread.join().unwrap();
        }
    }
}

fn main() {
    let pool = WorkerPool::new(4);
    for job in 0..8 {
        pool.execute(job);
    }
}

Pub-sub (Data hub)

A pattern where consumers are subscribed to a channel, usually via a “hub” object that manages subscriptions.

The hub is usually a passive instance that does not process data. When a producer calls its methods, the pub/sub logic is executed in the producer’s thread.

Pros:

  • A clear and flexible project architecture.
  • Subscriptions can be managed in a centralized way and may have additional logic, e.g. conditions.

Cons:

  • Harder to implement.
  • The hub can become a bottleneck if not implemented properly.
  • Messages usually need to implement the “Clone” trait.

use std::collections::BTreeMap;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread;

type SubscriberId = usize;

struct PubSub<T> {
    subscribers: Arc<Mutex<BTreeMap<SubscriberId, SyncSender<T>>>>,
    next_id: AtomicUsize,
}

impl<T: Send + 'static> PubSub<T> {
    fn new() -> Self {
        Self {
            subscribers: Arc::new(Mutex::new(BTreeMap::new())),
            next_id: <_>::default(),
        }
    }

    fn subscribe(&self) -> (SubscriberId, Receiver<T>) {
        let (tx, rx) = mpsc::sync_channel(512);
        let mut subscribers = self.subscribers.lock().unwrap();
        let id = self
            .next_id
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        subscribers.insert(id, tx);
        (id, rx)
    }

    fn unsubscribe(&self, id: SubscriberId) {
        let mut subscribers = self.subscribers.lock().unwrap();
        subscribers.remove(&id);
    }

    // An example of a wrong implementation: the lock is held while sending,
    // so the whole hub is blocked (and can deadlock) if any channel is full
    fn publish_deadlocking(&self, message: T)
    where
        T: Clone,
    {
        let subscribers = self.subscribers.lock().unwrap();
        for tx in subscribers.values() {
            tx.send(message.clone()).unwrap();
        }
    }

    // An example of a correct implementation: the subscribers are cloned
    // before sending and the lock is released immediately. This adds a tiny
    // cloning overhead and a race condition (a subscriber may receive a
    // message after it has been unsubscribed), however the deadlock-free
    // benefits fully compensate for all of that. Note that "send" can still
    // block the publishing thread while a subscriber's channel is full
    fn publish(&self, message: T)
    where
        T: Clone,
    {
        let subscribers = self.subscribers.lock().unwrap().clone();
        for tx in subscribers.values() {
            // Ignore the error if a consumer has already been unsubscribed
            // and dropped
            let _ = tx.send(message.clone());
        }
    }
}

fn main() {
    let hub = PubSub::new();
    let (sub_id, rx) = hub.subscribe();
    let handle = thread::spawn(move || {
        for received in rx {
            println!("Received: {:?}", received);
        }
    });
    hub.publish("Hello, world!");
    // In production code, subscribers should unsubscribe themselves
    // automatically, by implementing the Drop trait
    hub.unsubscribe(sub_id);
    handle.join().unwrap();
}
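The last comment in the example mentions automatic unsubscription via the Drop trait. One possible way to do that is a guard object that owns the receiver and removes its own entry from the hub when dropped. The sketch below is a simplified, stand-alone variant of the hub. The names `Hub`, `Subscription` and `subscriber_count`, as well as the channel capacity of 16, are assumptions made for this illustration.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::sync::{Arc, Mutex};

type Subscribers<T> = Arc<Mutex<BTreeMap<usize, SyncSender<T>>>>;

struct Hub<T> {
    subscribers: Subscribers<T>,
    next_id: Mutex<usize>,
}

/// Hypothetical guard: owns the receiver and unsubscribes itself on drop.
struct Subscription<T> {
    id: usize,
    subscribers: Subscribers<T>,
    receiver: Receiver<T>,
}

impl<T: Clone> Hub<T> {
    fn new() -> Self {
        Self {
            subscribers: Arc::default(),
            next_id: Mutex::new(0),
        }
    }

    fn subscribe(&self) -> Subscription<T> {
        let (tx, rx) = mpsc::sync_channel(16);
        let mut next_id = self.next_id.lock().unwrap();
        let id = *next_id;
        *next_id += 1;
        self.subscribers.lock().unwrap().insert(id, tx);
        Subscription {
            id,
            // the guard keeps a handle to the subscriber map
            subscribers: self.subscribers.clone(),
            receiver: rx,
        }
    }

    fn publish(&self, message: T) {
        // clone the map so the lock is not held while sending
        let subscribers = self.subscribers.lock().unwrap().clone();
        for tx in subscribers.values() {
            let _ = tx.send(message.clone());
        }
    }

    fn subscriber_count(&self) -> usize {
        self.subscribers.lock().unwrap().len()
    }
}

impl<T> Drop for Subscription<T> {
    fn drop(&mut self) {
        // remove our own sender from the hub
        self.subscribers.lock().unwrap().remove(&self.id);
    }
}

/// Returns (subscribers before drop, received message, subscribers after drop).
fn demo() -> (usize, Option<&'static str>, usize) {
    let hub = Hub::new();
    let subscription = hub.subscribe();
    hub.publish("hello");
    let before = hub.subscriber_count();
    let received = subscription.receiver.try_recv().ok();
    drop(subscription); // unsubscribes automatically
    (before, received, hub.subscriber_count())
}

fn main() {
    println!("{:?}", demo());
}
```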

Fan-in / Fan-out

A pattern where multiple producers send data to a single channel (fan-in). After that, one or multiple consumers collect data from the channel (fan-out).

Good for parallel processing and for aggregating data for a single operation. The channel is used as a common result storage. This is probably the only pattern where unbounded channels are always safe to use, because the number of tasks is strictly defined.

A consumer can either wait for all producers to finish or start receiving data immediately. After the producers are finished, their threads are stopped.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    // Fan-in: multiple workers send their results into the same channel
    for i in 0..5 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            tx_clone.send(i * 2).unwrap(); // Each worker sends a message
        });
    }
    // Drop the original sender, otherwise the channel is never closed
    // and the loop below never ends
    drop(tx);
    // Collecting the results from all workers
    for received in rx {
        println!("Received {}", received);
    }
}
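When the channel serves as a common result storage, the consumer can simply fold everything it receives into one value. Below is a small sketch of such an aggregation. The function `parallel_sum_of_squares` is a made-up example, and spawning one thread per input is done only for brevity.

```rust
use std::sync::mpsc;
use std::thread;

/// Squares every input in its own thread and sums up the results.
/// The number of messages equals the number of inputs, so the unbounded
/// channel cannot grow beyond that.
fn parallel_sum_of_squares(inputs: Vec<u64>) -> u64 {
    let (tx, rx) = mpsc::channel();
    for n in inputs {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(n * n).unwrap();
        });
    }
    // drop the original sender so the iterator ends after the last worker
    drop(tx);
    rx.iter().sum()
}

fn main() {
    println!("{}", parallel_sum_of_squares(vec![1, 2, 3, 4]));
}
```

The order in which the results arrive is not defined, so this approach fits operations where the order does not matter, such as a sum.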

Actor model

A pattern which may look similar to a worker pool, however it is much more sophisticated. Each actor has its own channel to communicate through and performs a different job.

From a technical point of view, an actor is usually a complex object. More information about the actor model can be found in the Wikipedia article on the subject.

use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

struct Actor {
    sender: Sender<Option<String>>,
    receiver: Mutex<Option<Receiver<Option<String>>>>,
    name: String,
    quiet: bool,
    handle: Mutex<Option<thread::JoinHandle<()>>>,
}

impl Actor {
    fn new(name: String, quiet: bool) -> Arc<Self> {
        let (tx, rx) = mpsc::channel();
        Actor {
            sender: tx,
            // Put the receiver under a Mutex to get it back when the actor
            // is started
            receiver: Mutex::new(Some(rx)),
            name,
            quiet,
            handle: <_>::default(),
        }
        .into()
    }

    fn start(self: &Arc<Self>) {
        let receiver = self.receiver.lock().unwrap().take().unwrap();
        let me = self.clone();
        let handle = thread::spawn(move || {
            for message in receiver {
                if let Some(msg) = message {
                    if !me.quiet {
                        println!("{} received: {}", me.name, msg);
                    }
                } else {
                    // None is the signal to stop
                    break;
                }
            }
            println!("{} stopped", me.name);
        });
        self.handle.lock().unwrap().replace(handle);
    }

    fn send(&self, message: String) {
        self.sender.send(Some(message)).unwrap();
    }

    fn stop(&self) {
        if let Some(handle) = self.handle.lock().unwrap().take() {
            self.sender.send(None).unwrap();
            handle.join().unwrap();
        }
    }
}

fn main() {
    let actor1 = Actor::new("Actor1".to_string(), true);
    let actor2 = Actor::new("Actor2".to_string(), false);
    actor1.start();
    actor2.start();
    actor1.send("Hello from Main to Actor1".to_string());
    actor2.send("Hello from Main to Actor2".to_string());
    actor1.stop();
    actor2.stop();
}

Conclusion

Channels are a powerful tool for parallel programming. They are versatile and can be used in many different ways. However, they can also be a source of many problems if not used properly.

In the next part, we will focus on some technical aspects of channel implementations and the problems that can occur if these aspects are not taken into account.

Serhij S.

IT Engineer since 1994. In industrial automation since 2012. CTO/Co-founder of Bohemia Automation