Streaming gRPC with Rust

Lately I’ve been working on a side project that involves aggregating the indexes of media libraries and allowing one library to “lend” a file to another. For example, I might use this system if I want my home tower to “lend” a file to my laptop before I get on a plane. I’ll be committing my work-in-progress on this project to GitHub shortly (I just want to get an MVP working first and do some refactoring), but I thought I would take a step back for a moment and share my thoughts on one aspect of this project: gRPC, and gRPC streaming in particular.

If you’re not familiar with gRPC streaming, the elevator pitch is that the full-duplex nature of HTTP/2 (the transport gRPC is built on) allows you to simultaneously send and receive streams of messages over a single connection. This has the potential to enable all kinds of new service communication patterns.

The first thing I needed to do when building out my application code for the gRPC client and server was to figure out which library or libraries to use. For Rust, that meant three main contenders: Stepan Koltsov’s gRPC implementation, Tower-gRPC, and PingCAP’s grpcio.

My first exposure to gRPC and Rust was through Stepan Koltsov’s implementation. I found it pretty easy to get started, but I struggled quickly once I got into some more complicated scenarios like bi-directional streaming. His library is excellent but I felt like I needed a library with a bit more active maintenance and more robust examples.

I then tried the Tower-gRPC library. This felt far more advanced: it uses prost for its protocol buffers library, and it certainly looked like it had made trade-off decisions that favored performance. The repository itself warns you that the APIs are not yet stable and, despite having examples of streaming available, I found them nearly inscrutable. I’m not a Rust expert, but I’m not a newbie, either, and I was not able to get a bi-directional streaming sample to work the way I wanted. I would say use this library at your own risk, though Rust developers more experienced than I am may find it easier to use than I did. Tower-gRPC is used as the underpinnings of the Conduit Kubernetes service mesh’s data plane.

Interestingly enough, both Stepan Koltsov’s gRPC library and PingCAP’s gRPC library use Stepan Koltsov’s protocol buffers library. Ultimately I found that the PingCAP library was mature enough, had enough maintainers, and had clear enough examples that I could muddle my way through with it. It wasn’t without its share of head-scratching moments, but a working application was worth more to me than the incomplete one I had with Tower-gRPC.

Okay, so now that I had a bunch of smoldering piles of ruined code where my early experiments and prototypes lay, it was time to write some real-world streaming gRPC code.

The first thing I needed server-side was a struct to hold my data store and implement the trait from the gRPC generated code:

use std::sync::Arc;

// Holds any thread-safe DataStore behind a shared, reference-counted pointer.
#[derive(Clone)]
pub struct AlexandriaApiServer<T>
where
    T: 'static + DataStore + Send + Sync,
{
    data_store: Arc<T>,
}

impl<T> AlexandriaApiServer<T>
where
    T: 'static + DataStore + Send + Sync,
{
    pub fn new(store: T) -> AlexandriaApiServer<T> {
        AlexandriaApiServer {
            data_store: Arc::new(store),
        }
    }
}

This is pretty basic Rust and hasn’t gotten into the gRPC stuff yet. This struct is just set up to hold anything that is thread-safe and implements my DataStore trait. It’s worth pointing out that I’m using an Arc<T> and not an Arc<Mutex<T>>. This is because my data store is stateless and just makes connections to Redis, so there is no shared mutable state to guard with a mutex. Also, I really don’t like mutex blocking this high up in an RPC call stack, as that can have serious consequences for performance.
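To make the Arc-without-Mutex point concrete, here is a minimal sketch using only the standard library. The DataStore trait below is a hypothetical one-method stand-in for the real trait, and StaticStore and read_concurrently are names invented for illustration. It assumes, as the paragraph above describes, that every store method takes &self.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the article's DataStore trait: every method
// takes `&self`, so callers never need exclusive access.
trait DataStore {
    fn get_bindings(&self) -> Vec<String>;
}

// Hypothetical stateless store. A real one would talk to Redis on each call.
struct StaticStore;

impl DataStore for StaticStore {
    fn get_bindings(&self) -> Vec<String> {
        vec!["tower->laptop".to_string()]
    }
}

// Spawns `workers` threads that all read through the same Arc, with no lock.
// Returns the total number of bindings read across all threads.
fn read_concurrently<T>(store: Arc<T>, workers: usize) -> usize
where
    T: 'static + DataStore + Send + Sync,
{
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || store.get_bindings().len())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let total = read_concurrently(Arc::new(StaticStore), 4);
    println!("bindings read across threads: {}", total);
}
```

If the store ever grew mutable in-memory state, the `&self` methods would stop being enough and some form of interior synchronization would be needed.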

My first function that has streaming is the server function get_bindings. In my app, a binding is essentially a lending agreement between autonomous agent processes. So, if there’s a binding between my tower and my laptop for a file, the cooperating agents will ensure that a copy of the file exists on my laptop.

fn get_bindings(&self, ctx: RpcContext, req: BindingQuery,
                resp: ServerStreamingSink<Binding>) {
    info!("Handling get bindings request: {}", req.get_agent_id());

    match self.data_store.get_bindings() {
        // TODO: filter by agent
        Ok(raw_bindings) => {
            // Pair each protobuf Binding with its write flags.
            let bindings: Vec<(Binding, WriteFlags)> = raw_bindings
                .into_iter()
                .map(|binding| (binding.into(),
                                WriteFlags::default()))
                .collect();
            // Stream the whole vector back to the client.
            let f = resp.send_all(
                    stream::iter_ok::<_, Error>(bindings))
                .map(|_| ())
                .map_err(|e| log_fail("get bindings", e));
            ctx.spawn(f);
        }
        Err(_) => {
            // Report the data store failure as an Internal status.
            let f = resp.fail(
                    RpcStatus::new(RpcStatusCode::Internal, None))
                .map_err(|e| log_fail("get bindings", e));
            ctx.spawn(f);
        }
    }
}

The first thing to notice is that the response is of type ServerStreamingSink<Binding>, where Binding is one of my data types from the protobuf IDL. This sink requires that each item be a tuple, with the first element being the item (Binding) and the second element being WriteFlags. In my case I’m using WriteFlags::default(). The flags let me control the stream’s buffering behavior.

The binding.into() in this code is converting the internal domain model binding struct into the protobuf generated struct.

When the data store operation succeeds, I ctx.spawn the call to send_all on the sink. If it fails, I ctx.spawn the call to resp.fail. I am still haunted by the duplicate calls to ctx.spawn here. Although both of these match arms produce a future, the two futures have different concrete types, so I can’t assign them to a single sized value and thus haven’t been able to reduce this to a single call to ctx.spawn.
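One common way around mismatched arm types is to box both futures as the same trait object. The sketch below shows that technique with today's std::future::Future rather than the futures 0.1 trait this code is built on, so treat it as an illustration of the idea and not as a drop-in fix. FailWith, run, and handle are hypothetical names, and run is only a toy stand-in for ctx.spawn. Whether the boxed equivalent satisfies ctx.spawn's bounds in this code base is untested here.

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// One boxed type that both match arms can coerce to.
type BoxedTask = Pin<Box<dyn Future<Output = String> + Send>>;

// Hypothetical failure future: a different concrete type from `Ready<String>`.
struct FailWith(String);

impl Future for FailWith {
    type Output = String;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<String> {
        Poll::Ready(format!("failed: {}", self.0))
    }
}

// Waker that does nothing; enough for futures that never park.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy stand-in for ctx.spawn: polls the task until it is ready.
fn run(mut task: BoxedTask) -> String {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = task.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn handle(result: Result<Vec<u32>, String>) -> String {
    // Each arm builds a different future type, then erases it behind the box.
    let task: BoxedTask = match result {
        Ok(items) => Box::pin(ready(format!("sent {} items", items.len()))),
        Err(e) => Box::pin(FailWith(e)),
    };
    run(task) // the single call site
}

fn main() {
    println!("{}", handle(Ok(vec![1, 2, 3])));
    println!("{}", handle(Err("store unavailable".to_string())));
}
```

The trade-off is one heap allocation and dynamic dispatch per call, which is usually negligible next to a network round trip.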

Let’s take a look at the client side that invokes get_bindings:

pub fn get_bindings(&self) -> Result<Vec<Binding>> {
    let mut results = Vec::<Binding>::new();
    let mut req = BindingQuery::new();
    req.set_agent_id("agent007".to_string());
    let mut bindings = self.client.get_bindings(&req)?;
    loop {
        let f = bindings.into_future();
        match f.wait() {
            Ok((Some(binding), s)) => {
                bindings = s;
                let client_binding = self::Binding::from(binding);
                results.push(client_binding);
            }
            Ok((None, _)) => break,
            Err((e, _)) => return Err(ClientError::RPC(e)),
        }
    }
    info!("query bindings succeeded.");
    Ok(results)
}

The BindingQuery struct also came from my protobuf IDL. The way this implementation of protobufs works (from Stepan Koltsov’s library), you have to create a new mutable object and invoke a bunch of setters. This feels very “un-Rustful”, whereas the prost protobuf library surfaces much more of a “plain old struct” approach. If I could have prost’s simpler protobuf API underneath PingCAP’s more approachable and production-ready gRPC library, I think I’d finally be happy.

The client pattern for receiving a stream of responses from the server is to convert the response stream into a future, and then loop to drain the stream. If we get Ok((Some(value), stream)), then we extract the value and re-assign the returned stream for the next loop iteration (this lets ownership of the stream move through the loop without running afoul of the borrow checker). If we get Ok((None, _)), then we know that the stream has ended with no errors. Finally, getting Err((e, _)) indicates there was an error in receiving the stream. You’ll see this pattern every time you are on the receiving end of a stream.
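The ownership hand-off in that loop is easier to see in isolation. Below is a minimal sketch using only the standard library: OwnedStream and next_owned are hypothetical stand-ins whose return shape mirrors what into_future().wait() yields, namely the next item plus the rest of the stream on success, or the error plus the stream on failure.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for a stream of fallible items.
struct OwnedStream<T> {
    items: VecDeque<Result<T, String>>,
}

impl<T> OwnedStream<T> {
    // Consumes the stream and hands it back alongside the next item.
    fn next_owned(mut self) -> Result<(Option<T>, Self), (String, Self)> {
        match self.items.pop_front() {
            Some(Ok(item)) => Ok((Some(item), self)),
            Some(Err(e)) => Err((e, self)),
            None => Ok((None, self)),
        }
    }
}

// Drains the stream, stopping at the first error.
fn drain<T>(mut stream: OwnedStream<T>) -> Result<Vec<T>, String> {
    let mut results = Vec::new();
    loop {
        match stream.next_owned() {
            Ok((Some(item), rest)) => {
                results.push(item);
                stream = rest; // re-assign so the next iteration owns the stream
            }
            Ok((None, _)) => break,       // stream ended cleanly
            Err((e, _)) => return Err(e), // receive error
        }
    }
    Ok(results)
}

fn main() {
    let mut items = VecDeque::new();
    items.push_back(Ok(1));
    items.push_back(Ok(2));
    println!("{:?}", drain(OwnedStream { items }));
}
```

Because next_owned takes self by value, the compiler only accepts the loop when every path either re-assigns the stream or leaves the loop, which is exactly the shape of the three match arms.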

I’m relying heavily on the Into trait to create a little anti-corruption layer (ACL), so that the business logic on both my client and my server operates not on the protobuf-generated structs directly but on types that belong in their respective domains. It’s a little extra work, but I like the isolation it gives me and it keeps my tiers from “bleeding” concerns. If I were using prost, where the structs carry less protobuf-specific baggage, I might not be so aggressive with the ACL. You may have also noticed I’m still hard-coding some stuff (like the “agent007” agent ID), which is another reason I haven’t yet pushed this project to GitHub.
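As a rough sketch of what such an anti-corruption layer looks like, the example below converts between a domain struct and a setter-style wire struct. Both types and their fields are hypothetical: WireBinding only imitates the getter/setter shape of generated code, and the real Binding messages in this project will have different fields.

```rust
// Hypothetical wire type imitating generated code: private fields,
// a `new()` constructor, and getter/setter pairs.
#[derive(Default)]
pub struct WireBinding {
    agent_id: String,
    file_hash: String,
}

impl WireBinding {
    pub fn new() -> Self {
        Self::default()
    }
    pub fn set_agent_id(&mut self, v: String) {
        self.agent_id = v;
    }
    pub fn set_file_hash(&mut self, v: String) {
        self.file_hash = v;
    }
    pub fn get_agent_id(&self) -> &str {
        &self.agent_id
    }
    pub fn get_file_hash(&self) -> &str {
        &self.file_hash
    }
}

// Hypothetical domain type: a plain struct the business logic works with.
#[derive(Debug, Clone, PartialEq)]
pub struct Binding {
    pub agent_id: String,
    pub file_hash: String,
}

// Domain -> wire: the only place that touches the setters.
impl From<Binding> for WireBinding {
    fn from(b: Binding) -> Self {
        let mut wire = WireBinding::new();
        wire.set_agent_id(b.agent_id);
        wire.set_file_hash(b.file_hash);
        wire
    }
}

// Wire -> domain: the only place that touches the getters.
impl From<WireBinding> for Binding {
    fn from(w: WireBinding) -> Self {
        Binding {
            agent_id: w.get_agent_id().to_string(),
            file_hash: w.get_file_hash().to_string(),
        }
    }
}

fn main() {
    let domain = Binding {
        agent_id: "agent007".to_string(),
        file_hash: "abc123".to_string(),
    };
    let wire: WireBinding = domain.clone().into();
    let back: Binding = wire.into();
    println!("round trip preserved the binding: {}", back == domain);
}
```

Implementing From gives the matching Into for free, which is why call sites can write binding.into() and let the expected type pick the conversion.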

Now let’s take a look at bi-directional streaming. In this function, the server receives a stream of media library updates (LibraryEntry instances containing file hashes, paths, and file sizes) uploaded by an individual agent monitoring a media library. It replies with a stream of acknowledgments indicating whether each library entry was successfully persisted:

fn update_library_entries(
    &self,
    ctx: RpcContext,
    entries: RequestStream<LibraryEntry>,
    resp: DuplexSink<LibraryEntryAck>,
) {
    info!("Handling library update");

    let store = self.data_store.clone();

    let to_send = entries.map(move |entry| {
        let mut ack = LibraryEntryAck::from(&entry);
        if store.put_library_entry(entry).is_ok() {
            ack.set_saved(true);
        }
        (ack, WriteFlags::default())
    });

    let f = resp.send_all(to_send)
        .map(|_| ())
        .map_err(|e| log_fail("library update", e));
    ctx.spawn(f);
}

Here I have to clone my data store (since it’s an Arc, I’m really just creating another reference-counted pointer, not cloning the real store) because the closure behind to_send is moved into a stream that may not finish before my function returns, so it can’t borrow from self. If you don’t do this clone, Rust’s borrow checker will hit you in the face with a club.
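The clone-then-move idiom can be shown without any gRPC machinery. In this minimal sketch, Store and make_handler are hypothetical names; the returned boxed closure plays the role of the mapped stream that outlives the function that built it.

```rust
use std::sync::Arc;

// Hypothetical store with one field so the closure has something to read.
struct Store {
    name: String,
}

// Builds a closure that must not borrow from the caller, because it is
// returned and may run long after this function has finished.
fn make_handler(shared: &Arc<Store>) -> Box<dyn Fn(&str) -> String + Send> {
    // Cloning the Arc copies a pointer and bumps a counter; the Store
    // itself is not duplicated.
    let store = Arc::clone(shared);
    Box::new(move |entry| format!("{} saved {}", store.name, entry))
}

fn main() {
    let shared = Arc::new(Store { name: "redis".to_string() });
    let handler = make_handler(&shared);
    println!("{}", handler("song.flac"));
    println!("owners while handler is alive: {}", Arc::strong_count(&shared));
}
```

Dropping the `move` keyword or capturing `shared` by reference would make the closure borrow from the caller, and the compiler would reject it for not living long enough.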

Now I can just call send_all on the sink, which takes a stream of tuples. As before, each tuple holds the stream’s item type and a WriteFlags value. The client side gets back a stream of acknowledgments. Because we’re now doing bi-directional streaming, the client will start getting acknowledgments while it is still sending more library entry updates. This is where gRPC streaming starts to feel amazing. If this were client-side streaming only, the request would still be a RequestStream<T>, but the response would go through a ClientStreamingSink<T>, which, like the UnarySink used in simple request-reply semantics, carries a single response.

The first map call on the send_all return value initially confused me. The syntax .map(|_| ()) essentially means that we’re mapping whatever value the future produces into a unit, or empty, response. We have to do this because ctx.spawn takes a future with a unit item type, while the future returned by send_all resolves to a tuple (the sink and the stream it was given).

Here’s the client side code that shows sending a stream at the same time as receiving streamed responses:

pub fn update_library(&self, entries: Vec<LibraryEntry>) ->
    Result<Vec<EntryAck>> {
    let (mut sink, mut receiver) =
        self.client.update_library_entries()?;
    let h = thread::spawn(move || {
        for entry in entries {
            let entry: super::bemisapi::LibraryEntry = entry.into();
            sink = sink.send((entry, WriteFlags::default()))
                .wait()
                .unwrap();
        }
        future::poll_fn(|| sink.close()).wait().unwrap();
    });

    let mut acks: Vec<EntryAck> = Vec::new();
    loop {
        match receiver.into_future().wait() {
            Ok((Some(ack), r)) => {
                acks.push(ack.into());
                receiver = r;
            },
            Ok((None, _)) => break,
            Err((e, _)) => return Err(ClientError::RPC(e)),
        }
    }

    h.join().unwrap();
    println!("acks: {:?}", acks);
    Ok(acks)
}

The first thing I do here is create a new background thread. In this thread, I call send on the sink to transmit each library entry (again in stream-tuple form). Once every entry has been sent, I wrap sink.close() in a poll_fn future and wait on it, which closes the sending side of the stream and then allows the thread to die.

At this point I think I might be able to refactor that for loop into a call to send_all on a stream built from entries.into_iter().map(|e| (e.into(), WriteFlags::default())), the same way the server wraps its vector with stream::iter_ok, but I haven’t tried that yet.

Next, I set up a receiver loop (this should look very much like the receive loop in the get_bindings client) to add received acknowledgments to the acks vector. Finally, I wait for everything to finish by calling join on the sending thread.
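The overall shape of that client (a sender thread, a receive loop, then a join) can be mimicked with standard library channels. The sketch below is only an analogy: std::sync::mpsc channels stand in for the gRPC sink and receiver, a second thread plays a hypothetical server that acknowledges each entry, and dropping the sender plays the part of sink.close().

```rust
use std::sync::mpsc;
use std::thread;

// Sends every entry on one channel while collecting acknowledgments from
// another, mirroring the sender-thread-plus-receive-loop structure.
fn update_library(entries: Vec<String>) -> Vec<String> {
    let (entry_tx, entry_rx) = mpsc::channel::<String>();
    let (ack_tx, ack_rx) = mpsc::channel::<String>();

    // Hypothetical "server": acknowledges each entry as soon as it arrives.
    let server = thread::spawn(move || {
        for entry in entry_rx {
            ack_tx.send(format!("ack:{}", entry)).unwrap();
        }
        // ack_tx is dropped here, which ends the acknowledgment stream.
    });

    // Sender thread: pushes entries, then closes the request side by
    // dropping entry_tx when the closure returns.
    let sender = thread::spawn(move || {
        for entry in entries {
            entry_tx.send(entry).unwrap();
        }
    });

    // Receive loop: runs while the sender thread is still sending.
    let mut acks = Vec::new();
    loop {
        match ack_rx.recv() {
            Ok(ack) => acks.push(ack),
            Err(_) => break, // all senders gone: the stream is finished
        }
    }

    sender.join().unwrap();
    server.join().unwrap();
    acks
}

fn main() {
    let acks = update_library(vec!["a.flac".to_string(), "b.flac".to_string()]);
    println!("acks: {:?}", acks);
}
```

Sending from a separate thread matters for the same reason in both versions: if the caller sent everything before starting to receive, a full buffer on either side could stall the exchange.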

Wrapping Up

Once I got beyond building hello world samples, I needed something that would let me build rock-solid, stable, reliable gRPC servers and clients. Whatever library I used needed to be fast and functional, but it also needed to be ergonomic enough that the code I wrote against it would be easy for me to read six months from now, and easy for other contributors to understand with limited exposure to the GitHub repository.

I haven’t yet written this part, but I’ll be writing functions that allow me to stream an entire file from a client to a server (e.g. from my tower to my laptop). Now that I’ve figured out some of the patterns around building streaming servers and clients, I’m confident that I can forge ahead and get that part of my project done.

In an ideal world, I’d be able to get the simpler protobuf structs from the prost library under the power and ease of use of PingCAP’s grpcio library, but for now I’m happy with the trade-off and will be moving forward using the PingCAP gRPC library. Your mileage may vary, and I definitely encourage you to try each of the alternatives yourself, rather than just looking at the examples in their repositories. Unless you’re actually building the RouteGuide example, you should explore and see how well your domain fits with the code those libraries generate. The knowledge I gained from struggling with each library will carry forward through the rest of my project.