My experience porting old Rust Futures to async/await

Jeff Hiner
Dwelo Research and Development
9 min readJan 17, 2020

A discussion of how my migration went, and some tips to help you do the same

Spoilers: the road to async has many more twists than this one. Photo by salvatore ventura on Unsplash

There’s been a lot of excitement in the Rust community about the new async and await keywords that dropped on the stable channel last year, but until recently there wasn’t much in the way of documentation or library support. The futures and tokio developers have been working like mad to migrate their own crates over, finally pulling off their own releases in November of 2019. Many library crates using futures have followed suit, and the ecosystem is finally starting to settle into the new way of doing things. This really is a completely different way to express asynchronous code, which means in many cases code must be rewritten or tossed out. So there’s an obvious question for developers: is migrating all your existing code worth the trouble?

The answer is a resounding yes.

Why migrate?

The biggest benefit to developers is that async/await code is easier to write, easier to read, and easier to maintain. I can’t tell you how many times I had to dig into documentation to review the differences between old futures map and and_then, or do a bunch of map_err or flatten_stream to make sure the error types along the chain all lined up. When you await a future that returns Result, you get something you can treat like a Result in normal Rust: you can unwrap it, or you can do an early return via ? operator. The code is much more straightforward, and generally looks exactly like corresponding synchronous code.

There are also some other ergonomic benefits. Unlike the old and_then way of chaining futures together, async blocks allow the compiler to reason about safe borrowing, including self. Typically this means you no longer have to move self into and out of futures. You’ll also be able to use a lot fewer 'static lifetimes.

How to get started

First, make sure you have the most recent Rust compiler. Support for async/await keywords arrived in 1.39, but it’s still a relatively new feature and the Rust dev team is still fixing issues. You aren’t doing yourself any favors by not running the latest stable compiler, so make it a habit to rustup update frequently.

Next, go into your Cargo.toml file and make sure any futures or tokio dependencies are versioned to their appropriate async support versions. If you’re using crates that currently depend on the 0.1 version of futures you’ll want to update those too. (In my case, I needed to run a master branch of the reqwest crate until the official 0.10 release arrived.) Thankfully, most crate authors are starting to provide versions that support async/await.

If your crates aren’t there yet, you’ll need to use the compat feature within your code to convert between the new and old future formats. Add feature = ["compat"] to futures inside your Cargo.toml if you need to enable these shims. They’re ugly to use, but they work.

Assuming you’ve got a main function somewhere, you’ll likely need to mark it async and provide an executor. The Tokio crate provides a set of executors and an easy way to run your program with them. Follow the rustdoc to add the “full” feature set of Tokio to your Cargo.toml. Then simply add the #[tokio::main] annotation above async fn main.

Finally, add use future::prelude::*; to the top of any file that uses sinks or streams. Just trust me on this one.

From here, you can tackle the conversion in a few different ways.

  • Replace Future<Item = T, Error = E> function returns with async fn returning Result<T, E>. If your function never returns an error, you can also use this opportunity to just return T, or nothing.
  • Update Stream types in generics, boxed returns, and impl Stream returns. For now your goal should be replacing old Stream<Item = T, Error = E> with Stream<Item = Result<T, E>>
  • Update Sink generic arguments in the same way; SinkItem is now just the first unnamed argument, and SinkError is simply Error.
  • Loops over streams can be refactored to allow more flexibility:
// old way, block on a future and possibly panic:
foo_stream.for_each(move |item| {
item.bar().unwrap();
}).wait();
// new futures allow us to early return with an error:
while let Some(item) = foo_stream.next().await {
item.bar()?;
}
  • Sometimes you can rework pieces that previously required laboriously moving self in and out to sneak past the borrow checker. Here’s some gnarly test code that was part of my refactor.
let action = MqttProtocol.new()
.and_then(|mqtt| {
// Send a publish with QoS 1, and terminate
mqtt.publish(
"path/to/faketopic".to_string(),
QualityOfService::Level1,
"test message".to_string().into_bytes(),
)
.map(|_| ())
.into_future()
.and_then(move |_| {
Delay::new(Instant::now() + Duration::from_millis(100))
.map_err(Into::into)
.map(move |_| mqtt)
})
.and_then(|mqtt| mqtt.disconnect())
})
.map_err(|e| panic!("{}", e));
let mut runtime = Runtime::new().unwrap(); runtime.block_on(action).unwrap();

Pretty disgusting, isn’t it? Here’s the same code with async/await, with the publish and disconnect methods refactored to accept &mut self. Now it’s a lot more clear what the test is trying to do:

let mqtt = MqttProtocol.new().await.unwrap();// Send a publish with QoS 1, and terminate
mqtt.publish(
"path/to/faketopic".to_string(),
QualityOfService::Level1,
"test message".to_string().into_bytes(),
).await.unwrap();
tokio::time::delay_for(Duration::from_millis(100)).await;
mqtt.disconnect().unwrap();

Hiccups

Not everything is rosy. Porting your old project over is going to be some serious work, and you’re going to run into problems. Here are a few I got snagged on.

Everything has moved

Your most immediate headache is going to be figuring out all of the little things that have moved around or changed. The reasons are generally good, but there’s no reference guide telling you where things have moved to. You’re going to be spending a good bit of time figuring out where everything is, and a lot of stuff is gated behind Cargo feature flags.

Some examples:

  • Sink generics still require two parameters (one with the name stripped out, and the other a required named parameter), but Stream only takes a single parameter. This does simplify turning inerrant types like vector iterators into streams, because there’s no need for an empty Error type. But it also means that if you were forwarding streams directly to sinks with old futures, the types won’t line up anymore. You’ll have to rework the alignment.
  • Most of the things you can actually do with Stream and Sink are now located in the “extension” SinkExt and StreamExt traits for some reason. The extension traits are more or less automatically implemented for anything that implements their base traits, but only if you remember to use them. (Remember how I told you above to just import everything from prelude?) Not only does this make compiler messages incredibly confusing, but also means the documentation is split in half. So if you’re wondering what happened to things like filter_map or inspect, that’s where they went.
  • futures::sync::mpsc is now futures::channel::mpsc
  • join and select are now macros. This is actually a nice change, because it means you can await on multiple futures even if they have different return types.
  • tokio::codec is now in the tokio-util crate, but won’t show up unless compiled with features = ["codec"]. A lot of other tokio_X things got moved to tokio::X and gated behind feature flags. Start by specifying features = ["full"] and then figure out what’s actually required after everything builds.

Traits and async fn (use the async-trait crate, mate)

You’d think async trait functions should be as simple as:

trait Foo {
async fn bar() -> ();
}

But the compiler complains.

error[E0706]: trait fns cannot be declared `async`

Huh. Okay, so why is that?

$ rustc --explain E0706
error: no extended information for E0706

Oh COME ON.

If you dig into the async handbook, the async keyword is actually just syntactic sugar for “this block returns impl Future<T> and has some associated lifetimes”. But since each implementation of the trait’s future will be slightly different and you can’t return heterogeneous types from traits (for the same reasons you can’t return a dyn &Trait in a trait signature), you’ll need to return a boxed future. There’s obviously a heap allocation cost for doing this, which is why it doesn’t just happen automatically.

You could try to figure out how to do this manually for every async function. Or you can just import the async_trait crate and annotate both your trait and its impl with #[async_trait]. This is such an obvious omission I imagine it’s going to get moved into the standard library at some point.

Unpin, Pin, and Box

Photo by Lisa Woakes on Unsplash

The async/await feature also introduced something called “pinning” which I found incredibly difficult to wrap my head around. I tried to read the official documentation which started out making some sense before rapidly devolving into headache territory. A Pin<Box<T>> is also somehow Unpin? Well is it pinned or not? And it has something to do with self-referential structs? I’m not trying to write linked list implementations here, I’m trying to write async code!

Well, behind the scenes an async block creates a sort of invisible struct scoped with a bunch of lifetimes and references for every local variable in scope. Some of these struct elements may contain references to each other, so the compiler needs some way to reason about its own borrowing guarantees. The Unpin trait is like Send or Sync in that it is automatically generated in most cases. It’s used to make sure you’re not trying to pass around references to things that might be moved.

The short of it is that boxed futures and streams and sinks generally aren’t usable unless they are Unpin. So if you find you can’t actually do anything with the futures you’re passing around, make sure your generics and your boxed returns specify that trait. Pretty much any chaining combinator like map only works if the object is Unpin. If you need to box and are reaching for Box::new, you’ll instead want to use the boxed method to ensure Unpin is applied properly. It used to be the case that all futures had to be 'static as well, but that’s apparently no longer the case.

Tests marked async need to use #[tokio::test]

You should obviously write tests to validate your shiny new async functions. But if you need to await before unwrapping results, your test function of course needs to be marked async. You’ll find that as soon as you do this, the Rust compiler complains at you:

error: async functions cannot be used for tests

For reasons beyond my fathoming, there is no default async engine to run tests! But if you import Tokio as a dev-dependency you can work around this issue by replacing #[test] with #[tokio::test] for each async test. This will wrap your function in code that sets up a simple engine for you.

Avoid async blocks when future::ready will do

You might have some existing code that uses references to items to filter or adjust a stream somehow. For example, your old code might look like this:

// If there's a stream decode error, close the stream
let stream = stream.take_while(|x| { x.is_ok() });

But with new futures you can’t just use this as-is. The take_while method in 0.3 requires a future to a bool, not just a bool closure. So your first instinct might be to just slap the async keyword on that block and be done with it. But if you do that, you’ll run into issues with the borrow checker, because it can’t guarantee that x lives across the async boundary.

Instead, create a future that’s immediately ready. The following doesn’t need to borrow anything across an async boundary:

let stream = stream.take_while(|x| future::ready(x.is_ok()));

Avoid std::sync

The standard library sync primitives std::sync::{Mutex, RwLock} block the current thread until the lock is acquired. If you’re using async code that means it will block one of the async executors, and async code isn’t supposed to block at all. This is especially bad because if all runner threads are waiting for a lock then no futures can make progress. This can deadlock your entire program.

There is a futures-aware Mutex available in futures::lock (but no corresponding RwLock yet). Since async code typically doesn’t hold locks very long anyway, you can often get away with just replacing RwLock with Mutex — the latter is still quite performant. If you share your Mutex between synchronous and asynchronous code, getting access in sync-land requires some finesse. You won’t be able to await within the synchronous code; you might have to play some tricks with futures::executor::block_on in order to get things working.

Problems with format! and .await

If you use the format! macro in the same statement line as await, you get pages of opaque error messages. Yes, really. I ran into this myself and spent the better part of a day trying to track it down:

tokio::spawn(async move {
^^^^^^^^^^^^ `*mut (dyn std::ops::Fn() + 'static)` cannot be shared between threads safely

So if you see the above, do a ctrl-F for any format macros and bind them to temporaries on a separate line above the await.

Despite the hurdles, I’m glad I tackled this project. The code is a lot easier to read, the tests are simpler, and everything just seems cleaner. Hopefully I’ve left you a bit more informed about why now is a good time to pull the trigger on a Rust futures refactor, and given you an idea of some of the issues you might encounter along the way. Good luck!

--

--

Jeff Hiner
Dwelo Research and Development

I’m an IoT software engineer at Dwelo, a company that is working to make smart apartments a reality.