My experience porting old Rust Futures to async/await
A discussion of how my migration went, and some tips to help you do the same
There’s been a lot of excitement in the Rust community about the new async
and await
keywords that dropped on the stable channel last year, but until recently there wasn’t much in the way of documentation or library support. The futures
and tokio
developers have been working like mad to migrate their own crates over, finally pulling off their own releases in November of 2019. Many library crates using futures have followed suit, and the ecosystem is finally starting to settle into the new way of doing things. This really is a completely different way to express asynchronous code, which means in many cases code must be rewritten or tossed out. So there’s an obvious question for developers: is migrating all your existing code worth the trouble?
The answer is a resounding yes.
Why migrate?
The biggest benefit to developers is that async/await code is easier to write, easier to read, and easier to maintain. I can’t tell you how many times I had to dig into documentation to review the differences between old futures map
and and_then
, or do a bunch of map_err
or flatten_stream
to make sure the error types along the chain all lined up. When you await
a future that returns Result
, you get something you can treat like a Result
in normal Rust: you can unwrap it, or you can do an early return via the ?
operator. The code is much more straightforward, and generally looks exactly like corresponding synchronous code.
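Here is a minimal sketch of that claim, using only the standard library so it runs without extra crates. The names parse_number and sum_of are made up for illustration, and block_on is a toy busy-polling executor standing in for a real one such as Tokio's:

```rust
use std::future::Future;
use std::num::ParseIntError;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor (an assumption for this sketch): polls in a loop until ready.
// Real code would use #[tokio::main] or futures::executor::block_on.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical async function that can fail.
async fn parse_number(text: &str) -> Result<u32, ParseIntError> {
    text.trim().parse::<u32>()
}

// Awaiting yields a plain Result, so `?` works exactly as in sync code.
async fn sum_of(a: &str, b: &str) -> Result<u32, ParseIntError> {
    let x = parse_number(a).await?;
    let y = parse_number(b).await?;
    Ok(x + y)
}
```

Calling block_on(sum_of("2", "40")) drives the future to completion and hands back an ordinary Result, with no map_err or and_then in sight.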
There are also some other ergonomic benefits. Unlike the old and_then
way of chaining futures together, async
blocks allow the compiler to reason about safe borrowing, including self
. Typically this means you no longer have to move self
into and out of futures. You’ll also be able to use a lot fewer 'static
lifetimes.
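To make the borrowing point concrete, here is a small sketch with a made-up FakeClient type (not from my project). Its async methods take &mut self, and the caller keeps ownership the whole time instead of threading the value through a chain of closures. As before, block_on is a toy std-only executor, included only so the example runs on its own:

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor (an assumption for this sketch); use a real runtime in practice.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical client that just records what was published.
#[derive(Default)]
struct FakeClient {
    sent: Vec<String>,
}

impl FakeClient {
    // Borrows self mutably for the duration of the call; nothing is moved.
    async fn publish(&mut self, topic: &str, payload: &str) -> Result<(), String> {
        if topic.is_empty() {
            return Err("empty topic".to_string());
        }
        self.sent.push(format!("{}: {}", topic, payload));
        Ok(())
    }

    // Returns how many messages were published before disconnecting.
    async fn disconnect(&mut self) -> usize {
        self.sent.len()
    }
}

async fn scenario() -> Result<usize, String> {
    let mut client = FakeClient::default();
    client.publish("a/b", "one").await?;
    client.publish("a/b", "two").await?;
    Ok(client.disconnect().await)
}
```

With old-style combinators, each step would have had to take the client by value and hand it back to the next closure.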
How to get started
First, make sure you have the most recent Rust compiler. Support for async/await keywords arrived in 1.39, but it’s still a relatively new feature and the Rust dev team is still fixing issues. You aren’t doing yourself any favors by not running the latest stable compiler, so make it a habit to rustup update
frequently.
Next, go into your Cargo.toml
file and make sure any futures
or tokio
dependencies are versioned to their appropriate async support versions. If you’re using crates that currently depend on the 0.1 version of futures
you’ll want to update those too. (In my case, I needed to run a master branch of the reqwest
crate until the official 0.10 release arrived.) Thankfully, most crate authors are starting to provide versions that support async/await.
If your crates aren’t there yet, you’ll need to use the compat
feature within your code to convert between the new and old future formats. Add features = ["compat"]
to futures
inside your Cargo.toml
if you need to enable these shims. They’re ugly to use, but they work.
Assuming you’ve got a main
function somewhere, you’ll likely need to mark it async
and provide an executor. The Tokio crate provides a set of executors and an easy way to run your program with them. Follow the rustdoc to add the “full” feature set of Tokio to your Cargo.toml
. Then simply add the #[tokio::main]
annotation above async fn main
.
Finally, add use futures::prelude::*;
to the top of any file that uses sinks or streams. Just trust me on this one.
From here, you can tackle the conversion in a few different ways.
- Replace Future<Item = T, Error = E> function returns with async fn returning Result<T, E>. If your function never returns an error, you can also use this opportunity to just return T, or nothing.
- Update Stream types in generics, boxed returns, and impl Stream returns. For now your goal should be replacing old Stream<Item = T, Error = E> with Stream<Item = Result<T, E>>.
- Update Sink generic arguments in the same way; SinkItem is now just the first unnamed argument, and SinkError is simply Error.
- Loops over streams can be refactored to allow more flexibility:
// old way, block on a future and possibly panic:
foo_stream.for_each(move |item| {
    item.bar().unwrap();
    Ok(())
}).wait();

// new futures allow us to early return with an error:
while let Some(item) = foo_stream.next().await {
    item.bar()?;
}
- Sometimes you can rework pieces that previously required laboriously moving
self
in and out to sneak past the borrow checker. Here’s some gnarly test code that was part of my refactor.
let action = MqttProtocol::new()
    .and_then(|mqtt| {
        // Send a publish with QoS 1, and terminate
        mqtt.publish(
            "path/to/faketopic".to_string(),
            QualityOfService::Level1,
            "test message".to_string().into_bytes(),
        )
        .map(|_| ())
        .into_future()
        .and_then(move |_| {
            Delay::new(Instant::now() + Duration::from_millis(100))
                .map_err(Into::into)
                .map(move |_| mqtt)
        })
        .and_then(|mqtt| mqtt.disconnect())
    })
    .map_err(|e| panic!("{}", e));

let mut runtime = Runtime::new().unwrap();
runtime.block_on(action).unwrap();
Pretty disgusting, isn’t it? Here’s the same code with async/await, with the publish
and disconnect
methods refactored to accept &mut self
. Now it’s a lot more clear what the test is trying to do:
let mut mqtt = MqttProtocol::new().await.unwrap();

// Send a publish with QoS 1, and terminate
mqtt.publish(
    "path/to/faketopic".to_string(),
    QualityOfService::Level1,
    "test message".to_string().into_bytes(),
).await.unwrap();

tokio::time::delay_for(Duration::from_millis(100)).await;
mqtt.disconnect().await.unwrap();
Hiccups
Not everything is rosy. Porting your old project over is going to be some serious work, and you’re going to run into problems. Here are a few I got snagged on.
Everything has moved
Your most immediate headache is going to be figuring out all of the little things that have moved around or changed. The reasons are generally good, but there’s no reference guide telling you where things have moved to. You’re going to be spending a good bit of time figuring out where everything is, and a lot of stuff is gated behind Cargo feature flags.
Some examples:
- Sink generics still require two parameters (one with the name stripped out, and the other a required named parameter), but Stream only takes a single parameter. This does simplify turning inerrant types like vector iterators into streams, because there’s no need for an empty Error type. But it also means that if you were forwarding streams directly to sinks with old futures, the types won’t line up anymore. You’ll have to rework the alignment.
- Most of the things you can actually do with Stream and Sink are now located in the “extension” SinkExt and StreamExt traits for some reason. The extension traits are more or less automatically implemented for anything that implements their base traits, but only if you remember to use them. (Remember how I told you above to just import everything from prelude?) Not only does this make compiler messages incredibly confusing, but also means the documentation is split in half. So if you’re wondering what happened to things like filter_map or inspect, that’s where they went.
- futures::sync::mpsc is now futures::channel::mpsc
- join and select are now macros. This is actually a nice change, because it means you can await on multiple futures even if they have different return types.
- tokio::codec is now in the tokio-util crate, but won’t show up unless compiled with features = ["codec"]. A lot of other tokio_X things got moved to tokio::X and gated behind feature flags. Start by specifying features = ["full"] and then figure out what’s actually required after everything builds.
Traits and async fn (use the async-trait crate, mate)
You’d think async trait functions should be as simple as:
trait Foo {
async fn bar() -> ();
}
But the compiler complains.
error[E0706]: trait fns cannot be declared `async`
Huh. Okay, so why is that?
$ rustc --explain E0706
error: no extended information for E0706
Oh COME ON.
If you dig into the async handbook, the async keyword is actually just syntactic sugar for “this function returns impl Future<Output = T> and has some associated lifetimes”. But since each implementation of the trait’s future will be a different concrete type and you can’t return heterogeneous types from traits (for the same reasons you can’t return a bare dyn Trait in a trait signature), you’ll need to return a boxed future. There’s obviously a heap allocation cost for doing this, which is why it doesn’t just happen automatically.
You could try to figure out how to do this manually for every async function. Or you can just import the async_trait
crate and annotate both your trait and its impl
with #[async_trait]
. This is such an obvious omission I imagine it’s going to get moved into the standard library at some point.
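For the curious, here is roughly what doing it by hand looks like. This is a sketch of the boxed-future pattern, not the exact expansion the async_trait crate generates; Greeter, English, and the BoxFuture alias are names invented for this example, and block_on is a toy std-only executor. (Newer compilers may accept async fn in traits directly, in which case none of this is needed.)

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor (an assumption for this sketch); use a real runtime in practice.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// One nameable return type for every implementation: a pinned, boxed future.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hypothetical trait with an "async" method written out manually.
trait Greeter {
    fn greet<'a>(&'a self, name: &'a str) -> BoxFuture<'a, String>;
}

struct English;

impl Greeter for English {
    fn greet<'a>(&'a self, name: &'a str) -> BoxFuture<'a, String> {
        // The async block is the real body; Box::pin pays the heap allocation.
        Box::pin(async move { format!("Hello, {}!", name) })
    }
}
```

Every implementation returns the same boxed type, which is what lets the trait signature name it. The price is one allocation per call.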
Unpin, Pin, and Box
The async/await feature also introduced something called “pinning” which I found incredibly difficult to wrap my head around. I tried to read the official documentation which started out making some sense before rapidly devolving into headache territory. A Pin<Box<T>>
is also somehow Unpin
? Well is it pinned or not? And it has something to do with self-referential structs? I’m not trying to write linked list implementations here, I’m trying to write async code!
Well, behind the scenes an async block creates a sort of invisible struct scoped with a bunch of lifetimes and references for every local variable in scope. Some of these struct elements may contain references to each other, so the compiler needs some way to reason about its own borrowing guarantees. The Unpin
trait is like Send
or Sync
in that it is automatically generated in most cases. It’s used to make sure you’re not trying to pass around references to things that might be moved.
The short of it is that boxed futures and streams and sinks generally aren’t usable unless they are Unpin. So if you find you can’t actually do anything with the futures you’re passing around, make sure your generics and your boxed returns specify that trait. Anything that polls the object in place, such as calling next on a stream or awaiting a future through a mutable reference, only works if the object is Unpin. If you need to box and are reaching for Box::new, you’ll instead want to use the boxed method (or Box::pin), which hands back a Pin<Box<...>> that is always Unpin. It used to be the case that all futures had to be 'static as well, but that’s apparently no longer the case.
Tests marked async need to use #[tokio::test]
You should obviously write tests to validate your shiny new async functions. But if you need to await
before unwrapping results, your test function of course needs to be marked async
. You’ll find that as soon as you do this, the Rust compiler complains at you:
error: async functions cannot be used for tests
For reasons beyond my fathoming, there is no default async engine to run tests! But if you import Tokio as a dev-dependency you can work around this issue by replacing #[test]
with #[tokio::test]
for each async test. This will wrap your function in code that sets up a simple engine for you.
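Conceptually, the attribute turns your async test into an ordinary synchronous one that blocks on the async body. The sketch below shows that shape by hand, with a toy std-only block_on standing in for the Tokio runtime; the double function and the test name are invented for illustration, and this is not the macro's literal expansion:

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor (an assumption for this sketch); #[tokio::test] builds a real
// Tokio runtime instead.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical async function under test.
async fn double(x: u32) -> u32 {
    x * 2
}

// A plain #[test] is fine as long as the function itself is synchronous;
// the async part lives inside the block handed to the executor.
#[test]
fn double_works() {
    block_on(async {
        assert_eq!(double(21).await, 42);
    });
}
```

In practice you would let #[tokio::test] write that wrapper for you.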
Avoid async blocks when future::ready will do
You might have some existing code that uses references to items to filter or adjust a stream somehow. For example, your old code might look like this:
// If there's a stream decode error, close the stream
let stream = stream.take_while(|x| { x.is_ok() });
But with new futures you can’t just use this as-is. The take_while
method in 0.3 requires a future to a bool
, not just a bool
closure. So your first instinct might be to just slap the async keyword on that block and be done with it. But if you do that, you’ll run into issues with the borrow checker, because it can’t guarantee that x
lives across the async boundary.
Instead, create a future that’s immediately ready. The following doesn’t need to borrow anything across an async boundary:
let stream = stream.take_while(|x| future::ready(x.is_ok()));
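Here is a std-only sketch of why that works. The helper take_while_ok is a hypothetical stand-in that mirrors the predicate shape take_while expects (a closure from &T to some future of bool); it is not the futures crate's implementation. The standard library's ready function plays the role of future::ready, and block_on is a toy executor:

```rust
use std::future::{ready, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor (an assumption for this sketch); use a real runtime in practice.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical helper: keeps items until the async predicate says no.
// `Fut` is a single type, so it cannot borrow from the `&T` argument.
async fn take_while_ok<T, F, Fut>(items: Vec<T>, mut keep: F) -> Vec<T>
where
    F: FnMut(&T) -> Fut,
    Fut: Future<Output = bool>,
{
    let mut kept = Vec::new();
    for item in items {
        if !keep(&item).await {
            break;
        }
        kept.push(item);
    }
    kept
}

fn ok_prefix(items: Vec<Result<i32, &'static str>>) -> Vec<Result<i32, &'static str>> {
    // `x.is_ok()` is evaluated right away; the ready future only carries a bool.
    // Writing `|x| async move { x.is_ok() }` would try to carry `x` itself.
    block_on(take_while_ok(items, |x| ready(x.is_ok())))
}
```

The closure reads the borrowed item before the future is even created, so nothing borrowed has to survive an await.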
Avoid std::sync
The standard library sync primitives std::sync::{Mutex, RwLock}
block the current thread until the lock is acquired. In async code that means blocking one of the executor’s worker threads, and async code isn’t supposed to block at all. This is especially bad because if every worker thread is waiting for a lock, no futures can make progress, including the one that would release it. This can deadlock your entire program.
There is a futures-aware Mutex
available in futures::lock
(but no corresponding RwLock
yet). Since async code typically doesn’t hold locks very long anyway, you can often get away with just replacing RwLock
with Mutex
— the latter is still quite performant. If you share your Mutex
between synchronous and asynchronous code, getting access in sync-land requires some finesse. You won’t be able to await
within the synchronous code; you might have to play some tricks with futures::executor::block_on
in order to get things working.
Problems with format! and .await
If you use the format!
macro in the same statement line as await
, you get pages of opaque error messages. Yes, really. I ran into this myself and spent the better part of a day trying to track it down:
tokio::spawn(async move {
^^^^^^^^^^^^ `*mut (dyn std::ops::Fn() + 'static)` cannot be shared between threads safely
So if you see the above, do a ctrl-F for any format macros and bind them to temporaries on a separate line above the await
.
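A small sketch of the workaround, with invented names (send, report) and a toy std-only executor. The assert_send helper checks at compile time that the future can cross threads, which is what tokio::spawn demands. Newer compilers may no longer trip on the inline form, but binding to a temporary is harmless either way:

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor (an assumption for this sketch); use a real runtime in practice.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical async sink that reports how many bytes it "sent".
async fn send(line: String) -> usize {
    line.len()
}

async fn report(id: u32) -> usize {
    // Build the string on its own line, so no formatting temporaries
    // are alive across the await point below.
    let line = format!("device {} online", id);
    send(line).await
}

// Compile-time check: only accepts values that are Send.
fn assert_send<T: Send>(value: T) -> T {
    value
}
```

If assert_send(report(7)) compiles, the future is safe to hand to a multi-threaded spawn.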
Despite the hurdles, I’m glad I tackled this project. The code is a lot easier to read, the tests are simpler, and everything just seems cleaner. Hopefully I’ve left you a bit more informed about why now is a good time to pull the trigger on a Rust futures refactor, and given you an idea of some of the issues you might encounter along the way. Good luck!