Rust Server Side Event Client with Hyper 0.4

Martyn PS
2 min readJul 23, 2023

--

Server Side Events / Event Sources are a useful tool to avoid the overhead of constantly polling a REST API for state change.

Once the HTTP request has been made, the connection stays open until the connection is dropped by either end. In this example only data fields are processed, but the events can also be tagged with identifiers.

The following module provides a basic interface for recieving data from an SSE source:

use bytes::Bytes;
use hyper::{body::HttpBody, Request, Uri};

use tokio::net::TcpStream;
use tokio::sync::mpsc::Sender;

pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

pub struct SseClient {
pub join_handle: tokio::task::JoinHandle<Result<()>>,
}

#[derive(Debug)]
pub enum Event<T> {
Failed,
Data(T),
Shutdown,
}

impl SseClient {
pub async fn spawn(url: Uri, tx: Sender<Event<Bytes>>, timeout_ms: u64) -> Result<Self> {
let host = url.host().expect("Uri has no host");
let port = url.port_u16().unwrap_or(80);
let addr = format!("{}:{}", host, port);
let stream = TcpStream::connect(addr).await?;

let (mut sender, conn) = hyper::client::conn::handshake(stream).await?;

// Spawn the TCP connection, this will stay alive for the life of the
// connection
let conn_tx = tx.clone();
tokio::task::spawn(async move {
if let Err(_) = conn.await {
conn_tx.send(Event::Failed).await.ok();
}
conn_tx.send(Event::Shutdown).await.ok();
});

// Set the HOST header to match the request URL
let authority = url.authority().unwrap().clone();
let req = Request::builder()
.uri(url)
.header(hyper::header::HOST, authority.as_str())
.body(hyper::Body::empty())?;

// Generate a future so we can monitor for timeout on the initial connection
let work = sender.send_request(req);

// Do the timeout on getting the headers, just in case SSE source is not a HTTP server
let mut res =
match tokio::time::timeout(std::time::Duration::from_millis(timeout_ms), work).await {
Ok(result) => result?,
Err(_) => {
return Err(Box::new(tokio::io::Error::new(
tokio::io::ErrorKind::TimedOut,
"Timeout",
)))
}
};

Ok(Self {
join_handle: tokio::spawn(async move {
// Stream the body to the producer channel
while let Some(next) = res.data().await {
let chunk = next?;
tx.send(Event::Data(chunk)).await?;
}

Ok(())
}),
})
}
}

A “multiple producer single consumer” (MPSC) channel is used to consume data from the event source when needed by the main program.

For example (./src/main.rs):

use hyper::Uri;

pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[tokio::main]
async fn main() -> Result<()> {
// Create channel, max of 100 messages
let (tx, mut rx) = tokio::sync::mpsc::channel(100);

// Spawn a client and get some data
let sse_client = SseClient::spawn(
hyper::Uri::from_static("http://localhost:5000/sse_demo"),
tx,
2000,
)
.await
.unwrap();

// Receive 10 events
let mut count = 10;
while let Some(event) = rx.recv().await {
println!("got = {:?}", event);
count = count - 1;
if count == 0 {
break;
}
}

// Stop the client
sse_client.join_handle.abort();

Ok(())
}

(Cargo.toml)

[package]
name = "sse"
version = "0.1.0"
edition = "2021"

[dependencies]
bytes = "1.4.0"
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time", "test-util"] }

That the channel will be destroyed if the event source shuts down, so the while loop will not block if the event stream is disconnected.

--

--