My experience porting old Rust Futures to async/await

Jeff Hiner
Jan 17, 2020 · 9 min read

A discussion of how my migration went, and some tips to help you do the same

Spoilers: the road to async has many more twists than this one. Photo by salvatore ventura on Unsplash

There’s been a lot of excitement in the Rust community about the new async and await keywords that dropped on the stable channel last year, but until recently there wasn’t much in the way of documentation or library support. The futures and tokio developers have been working like mad to migrate their own crates over, finally pulling off their own releases in November of 2019. Many library crates using futures have followed suit, and the ecosystem is finally starting to settle into the new way of doing things. This really is a completely different way to express asynchronous code, which means in many cases code must be rewritten or tossed out. So there’s an obvious question for developers: is migrating all your existing code worth the trouble?

The answer is a resounding yes.

Why migrate?

There are also some other ergonomic benefits. Unlike the old and_then way of chaining futures together, async blocks allow the compiler to reason about safe borrowing, including self. Typically this means you no longer have to move self into and out of futures. You’ll also be able to use a lot fewer 'static lifetimes.
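As a rough illustration of the borrowing point, here is a minimal sketch that uses only the standard library. The Counter type, its methods, and the tiny block_on helper are hypothetical names invented for this example (a real program would use an executor such as Tokio's). It only aims to show that an async fn can take &mut self and hold that borrow across an .await, without moving self into and out of the future.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing; enough for futures that never park.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal stand-in executor (an assumption for this sketch, not a real runtime).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical type standing in for a connection or protocol object.
struct Counter {
    sent: u32,
}

impl Counter {
    // Borrows self mutably; no need to take `self` by value and hand it back.
    async fn send(&mut self, n: u32) -> u32 {
        self.sent += n;
        self.sent
    }

    // The `&mut self` borrow is held across both await points.
    async fn send_twice(&mut self, n: u32) -> u32 {
        self.send(n).await;
        self.send(n).await
    }
}

fn main() {
    let mut counter = Counter { sent: 0 };
    let total = block_on(counter.send_twice(5));
    println!("total sent: {}", total);
    // `counter` is still usable here because it was only borrowed.
    println!("counter.sent: {}", counter.sent);
}
```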

How to get started

Next, go into your Cargo.toml file and make sure any futures or tokio dependencies are versioned to their appropriate async support versions. If you’re using crates that currently depend on the 0.1 version of futures you’ll want to update those too. (In my case, I needed to run a master branch of the reqwest crate until the official 0.10 release arrived.) Thankfully, most crate authors are starting to provide versions that support async/await.

If your crates aren’t there yet, you’ll need to use the compat feature within your code to convert between the new and old future formats. Add features = ["compat"] to the futures dependency inside your Cargo.toml if you need to enable these shims. They’re ugly to use, but they work.

Assuming you’ve got a main function somewhere, you’ll likely need to mark it async and provide an executor. The Tokio crate provides a set of executors and an easy way to run your program with them. Follow the rustdoc to add the “full” feature set of Tokio to your Cargo.toml. Then simply add the #[tokio::main] annotation above async fn main.

Finally, add use futures::prelude::*; to the top of any file that uses sinks or streams. Just trust me on this one.

From here, you can tackle the conversion in a few different ways.

  • Replace Future<Item = T, Error = E> function returns with async fn returning Result<T, E>. If your function never returns an error, you can also use this opportunity to just return T, or nothing.
  • Update Stream types in generics, boxed returns, and impl Stream returns. For now your goal should be replacing old Stream<Item = T, Error = E> with Stream<Item = Result<T, E>>
  • Update Sink generic arguments in the same way; SinkItem is now just the first unnamed argument, and SinkError is simply Error.
  • Loops over streams can be refactored to allow more flexibility:
// old way, block on a future and possibly panic:
foo_stream.for_each(move |item| {
    item.bar().unwrap();
    // the 0.1 for_each closure has to return something future-like
    Ok(())
}).wait();
// new futures allow us to early return with an error:
while let Some(item) = foo_stream.next().await {
    item.bar()?;
}
  • Sometimes you can rework pieces that previously required laboriously moving self in and out to sneak past the borrow checker. Here’s some gnarly test code that was part of my refactor.
let action = MqttProtocol::new()
    .and_then(|mqtt| {
        // Send a publish with QoS 1, and terminate
        mqtt.publish(
            "path/to/faketopic".to_string(),
            QualityOfService::Level1,
            "test message".to_string().into_bytes(),
        )
        .map(|_| ())
        .into_future()
        .and_then(move |_| {
            Delay::new(Instant::now() + Duration::from_millis(100))
                .map_err(Into::into)
                .map(move |_| mqtt)
        })
        .and_then(|mqtt| mqtt.disconnect())
    })
    .map_err(|e| panic!("{}", e));
let mut runtime = Runtime::new().unwrap();
runtime.block_on(action).unwrap();

Pretty disgusting, isn’t it? Here’s the same code with async/await, with the publish and disconnect methods refactored to accept &mut self. Now it’s a lot more clear what the test is trying to do:

let mut mqtt = MqttProtocol::new().await.unwrap();
// Send a publish with QoS 1, and terminate
mqtt.publish(
    "path/to/faketopic".to_string(),
    QualityOfService::Level1,
    "test message".to_string().into_bytes(),
).await.unwrap();
tokio::time::delay_for(Duration::from_millis(100)).await;
mqtt.disconnect().await.unwrap();
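The first bullet in the list above (swap a Future<Item = T, Error = E> return for an async fn returning Result<T, E>) can be sketched with just the standard library. The function names parse_number and sum_numbers, and the small block_on helper, are made up for this illustration. It shows that ? inside an async fn returns early with the error, much like the while let loop earlier.

```rust
use std::future::Future;
use std::num::ParseIntError;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker and stand-in executor for this sketch only.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Previously this might have returned `impl Future<Item = u32, Error = ParseIntError>`.
async fn parse_number(input: &str) -> Result<u32, ParseIntError> {
    input.trim().parse()
}

// `?` after `.await` bails out on the first error, no combinator chain needed.
async fn sum_numbers(inputs: &[&str]) -> Result<u32, ParseIntError> {
    let mut total = 0;
    for &input in inputs {
        total += parse_number(input).await?;
    }
    Ok(total)
}

fn main() {
    println!("{:?}", block_on(sum_numbers(&["1", " 2", "3 "])));
    println!("{:?}", block_on(sum_numbers(&["1", "oops"])));
}
```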

Hiccups

Everything has moved

Some examples:

  • Sink generics still require two parameters (one with the name stripped out, and the other a required named parameter), but Stream only takes a single parameter. This does simplify turning infallible types like vector iterators into streams, because there’s no need for an empty Error type. But it also means that if you were forwarding streams directly to sinks with old futures, the types won’t line up anymore. You’ll have to adapt the item and error types yourself so they match.
  • Most of the things you can actually do with Stream and Sink are now located in the “extension” SinkExt and StreamExt traits for some reason. The extension traits are more or less automatically implemented for anything that implements their base traits, but only if you remember to use them. (Remember how I told you above to just import everything from prelude?) Not only does this make compiler messages incredibly confusing, but it also means the documentation is split in half. So if you’re wondering what happened to things like filter_map or inspect, that’s where they went.
  • futures::sync::mpsc is now futures::channel::mpsc
  • join and select are now macros. This is actually a nice change, because it means you can await on multiple futures even if they have different return types.
  • tokio::codec is now in the tokio-util crate, but won’t show up unless compiled with features = ["codec"]. A lot of other tokio_X things got moved to tokio::X and gated behind feature flags. Start by specifying features = ["full"] and then figure out what’s actually required after everything builds.

Traits and async fn (use the async-trait crate, mate)

trait Foo {
    async fn bar() -> ();
}

But the compiler complains.

error[E0706]: trait fns cannot be declared `async`

Huh. Okay, so why is that?

$ rustc --explain E0706
error: no extended information for E0706

Oh COME ON.

If you dig into the async handbook, the async keyword is actually just syntactic sugar for “this function returns impl Future<Output = T> and has some associated lifetimes”. But since each implementation of the trait’s future will be a slightly different type, and a trait method can’t return such heterogeneous types directly (for the same reason you can’t return a bare dyn Trait in a trait signature), you’ll need to return a boxed future. There’s obviously a heap allocation cost for doing this, which is why it doesn’t just happen automatically.

You could try to figure out how to do this manually for every async function. Or you can just import the async_trait crate and annotate both your trait and its impl with #[async_trait]. This is such an obvious omission I imagine it’s going to get moved into the standard library at some point.
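For a sense of what “doing this manually” involves, here is a hedged sketch of a trait method that returns a boxed future, using only the standard library. It is roughly the shape of code that a macro like #[async_trait] writes for you, not its exact output. The BoxFuture alias, the Fixed and Doubled types, and the block_on helper are all names invented for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker and stand-in executor for this sketch only.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// One type-erased, heap-allocated future type that every impl can return.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait Foo {
    // Instead of `async fn bar(&self) -> u32`, return the boxed future.
    fn bar(&self) -> BoxFuture<'_, u32>;
}

struct Fixed(u32);
struct Doubled(u32);

impl Foo for Fixed {
    fn bar(&self) -> BoxFuture<'_, u32> {
        // Each impl has its own anonymous future type; Box::pin erases it.
        Box::pin(async move { self.0 })
    }
}

impl Foo for Doubled {
    fn bar(&self) -> BoxFuture<'_, u32> {
        Box::pin(async move { self.0 * 2 })
    }
}

fn main() {
    let impls: Vec<Box<dyn Foo>> = vec![Box::new(Fixed(7)), Box::new(Doubled(7))];
    for f in &impls {
        println!("{}", block_on(f.bar()));
    }
}
```

The heap allocation mentioned above is the Box::pin call in each impl.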

Unpin, Pin, and Box

Photo by Lisa Woakes on Unsplash

The async/await feature also introduced something called “pinning” which I found incredibly difficult to wrap my head around. I tried to read the official documentation which started out making some sense before rapidly devolving into headache territory. A Pin<Box<T>> is also somehow Unpin? Well is it pinned or not? And it has something to do with self-referential structs? I’m not trying to write linked list implementations here, I’m trying to write async code!

Well, behind the scenes an async block creates a sort of invisible struct scoped with a bunch of lifetimes and references for every local variable in scope. Some of these struct elements may contain references to each other, so the compiler needs some way to reason about its own borrowing guarantees. The Unpin trait is like Send or Sync in that it is automatically generated in most cases. It’s used to make sure you’re not trying to pass around references to things that might be moved.

The short of it is that boxed futures and streams and sinks generally aren’t usable unless they are Unpin. So if you find you can’t actually do anything with the futures you’re passing around, make sure your generics and your boxed returns specify that trait. Many methods, such as next on a stream, only work if the object is Unpin. If you need to box and are reaching for Box::new, you’ll instead want to use Box::pin or the boxed method, which give you a Pin<Box<...>> that is itself Unpin. It used to be the case that all futures had to be 'static as well, but that’s apparently no longer the case.
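A small standard-library sketch may make the “is it pinned or not?” question less mysterious. The helper names here (assert_unpin, block_on_unpin, answer) are hypothetical. The sketch assumes nothing beyond std and shows that the future from an async fn is not Unpin, while the same future behind Box::pin is.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker for this sketch only.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Like some combinators, this helper demands `Unpin` so that it can
// call `Pin::new` on a plain `&mut` without any unsafe code.
fn block_on_unpin<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}

// Compiles only when T is Unpin.
fn assert_unpin<T: Unpin>(_: &T) {}

async fn answer() -> u32 {
    42
}

fn main() {
    let fut = answer();
    // assert_unpin(&fut); // would not compile: the async fn's future is !Unpin

    // Pinning it on the heap yields a handle that can be moved around freely.
    let boxed: Pin<Box<dyn Future<Output = u32>>> = Box::pin(fut);
    assert_unpin(&boxed);
    println!("{}", block_on_unpin(boxed));
}
```

The value inside the box never moves once pinned; only the pointer to it does, which is why the Pin<Box<T>> handle can be Unpin even when T is not.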

Tests marked async need to use #[tokio::test]

error: async functions cannot be used for tests

For reasons beyond my fathoming, there is no default async executor to run tests! But if you import Tokio as a dev-dependency you can work around this issue by replacing #[test] with #[tokio::test] for each async test. This will wrap your function in code that sets up a simple runtime for you.

Avoid async blocks when future::ready will do

// If there's a stream decode error, close the stream
let stream = stream.take_while(|x| { x.is_ok() });

But with new futures you can’t just use this as-is. The take_while method in 0.3 requires a closure that returns a future resolving to a bool, not just a closure that returns a bool. So your first instinct might be to just slap the async keyword on that block and be done with it. But if you do that, you’ll run into issues with the borrow checker, because it can’t guarantee that the reference x lives as long as the future that captured it.

Instead, create a future that’s immediately ready. The following doesn’t need to borrow anything across an async boundary:

let stream = stream.take_while(|x| future::ready(x.is_ok()));
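To show why the ready version sidesteps the borrow problem, here is a sketch that uses only the standard library. The take_while_async function is a hypothetical stand-in that copies the general shape of a 0.3-style take_while (a predicate that takes a reference and returns a future of bool). It is not the futures crate's implementation, and it works on a Vec instead of a real Stream. std::future::ready plays the role of future::ready.

```rust
use std::future::{ready, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker and stand-in executor for this sketch only.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Stand-in with a similar signature shape: the predicate borrows the item
// but must hand back a future type `Fut` that does not depend on that borrow.
async fn take_while_async<T, F, Fut>(items: Vec<T>, mut pred: F) -> Vec<T>
where
    F: FnMut(&T) -> Fut,
    Fut: Future<Output = bool>,
{
    let mut kept = Vec::new();
    for item in items {
        if !pred(&item).await {
            break;
        }
        kept.push(item);
    }
    kept
}

fn main() {
    let items: Vec<Result<u32, String>> =
        vec![Ok(1), Ok(2), Err("decode error".to_string()), Ok(3)];

    // `x.is_ok()` is evaluated right away, so the returned future holds a
    // plain bool. `|x| async move { x.is_ok() }` would instead try to carry
    // the reference `x` inside the future, which this signature rejects.
    let kept = block_on(take_while_async(items, |x| ready(x.is_ok())));
    println!("{:?}", kept);
}
```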

Avoid std::sync

There is a futures-aware Mutex available in futures::lock (but no corresponding RwLock yet). Since async code typically doesn’t hold locks very long anyway, you can often get away with just replacing RwLock with Mutex — the latter is still quite performant. If you share your Mutex between synchronous and asynchronous code, getting access in sync-land requires some finesse. You won’t be able to await within the synchronous code; you might have to play some tricks with futures::executor::block_on in order to get things working.

Problems with format! and .await

tokio::spawn(async move {
^^^^^^^^^^^^ `*mut (dyn std::ops::Fn() + 'static)` cannot be shared between threads safely

So if you see the above, do a ctrl-F for any format macros and bind them to temporaries on a separate line above the await.
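The workaround might look like the sketch below. The snippet that triggered the error isn’t shown here, so this assumes the culprit is a format! call whose arguments stay alive across an .await inside a spawned task. The names report, log_line, assert_send, and block_on are all hypothetical, and assert_send merely mimics the Send requirement that tokio::spawn places on a future. Whether the inline form still fails depends on the compiler version.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker and stand-in executor for this sketch only.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Mimics the `Send` bound a multi-threaded spawn would require.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Hypothetical async sink for a log line; returns the number of bytes "written".
async fn log_line(line: &str) -> usize {
    line.len()
}

async fn report(id: u32) -> usize {
    // Build the string on its own line, above the await...
    let line = format!("device {} connected", id);
    // ...so only the finished String is alive across the await point,
    // rather than the formatting machinery's temporaries.
    log_line(&line).await
}

fn main() {
    let fut = assert_send(report(7));
    println!("{}", block_on(fut));
}
```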

Despite the hurdles, I’m glad I tackled this project. The code is a lot easier to read, the tests are simpler, and everything just seems cleaner. Hopefully I’ve left you a bit more informed about why now is a good time to pull the trigger on a Rust futures refactor, and given you an idea of some of the issues you might encounter along the way. Good luck!

Dwelo Research and Development

All the Dwelo R&D news that is fit to render.
