Async IO for Rust (part II)

This is a second article about designing “rotor”, the library for doing asynchronous IO in Rust. This part describes what changed since the previous article as well as expands on discussion points of the previous write-up.

State Machines Are Good Enough

One of the hottest topics in the discussion of the previous article was whether threads are the better abstraction for I/O. And if OS threads are not good enough some kind of green threading should be introduced. There are two such libraries being developed: mioco and simplesched (both of them do not work in stable rust yet). Still I don’t believe it’s a good fit for low-level protocol handling.

This section looks at issues with the threading model for handling I/O in Rust. You may skip the section if you are more interested in Rotor rather than design decisions.

  1. Memory ownership

Typical request processing code looks like:

It looks nothing wrong at a glance. No memory leaks here. But note that both “buf” and “parsed” are still being allocated while the response is being sent. But they are useless at that point in time. Is it easy to fix? Yes, just pass them by value, or wrap code block in braces. Is it easy to find? No. The code is too simplistic to show you the complexity. But compare it to the state machine:

It’s easy to reason about memory usage in this case. Also, note that nothing is allocated for idle connection at all (except obviously the state machine itself).

2. Timeout handling

Timeout handling is inevitable in any networking code. In most languages with green threading timeouts are simple: spawn another micro thread that sleeps and throws an exception to parent. If parent finished earlier, kill timer thread. But Rust doesn’t have exceptions.

An example of timeout handling in go should show you the complexity. To give you the short breakdown: it adds a timer with a callback, which finds current “cancel callback” in a map, which in turn closes an underlying connection. The process involves at least 3 shared locked objects, and may involve sending a message through a channel. A cancel callbacks are changed during the lifetime of the request several times and special channels just for cancel operation are created in multiple places (if it’s not clear: that happens on every request even if no timeout occurred).

3. Connection pooling

Threaded code dealing with client connections usually works along the lines of:

  1. Acquire a connection from pool
  2. Do something with the connection
  3. Release the connection to pool

Sometimes it’s okay. But often things go out of control. For example, one may use two different connections for backends A and B, and keep both acquired at the same time.

When resource A becomes slow, connection pool B quickly becomes exhausted too, just because coroutines hold on the resource.

This is probably the norm for small python applications which handle ten simultaneous requests. But this can quickly become an issue for a server in Rust which can probably process a million requests per second (unproven yet, but I’ve got half of the million on 4 core i7).

Rotor forces the user to think about such cases. The easiest way to handle client connection in rotor is to send a message to a connection pool with the message being “do this unit of work for me”. Where examples of the unit of work are: execute a request, push message to Kafka, execute a transaction. This is possible with threading model, just much less common.

4. Unit tests

And it’s much easier to unit test a state machine. You can inspect it layer by layer because the state machines in Rotor are generic over the type of the next layer. You can test each state and each action separately without starting from the initial handshake. Often you can clone the state machine in test and continue by multiple paths. Obviously, you can test an assembled state machine. You can test with fake transport (i.e. without actually creating the sockets).

Many of the unit test features are easy and obvious on state machines but are impossible or very cumbersome on threaded code.

What’s New in Rotor?

So we keep state machines. We still pass them to action by value and rely on return value optimization (RVO) to do that fast. We still use the Context thing and have state machine types generic over it.

However, we get rid of Scope. This was an object that was carrying main loop operations to the state machine. It was too hard to handle because each layer of abstraction required a new unique type passed to the next layer and required that type to implement a number of traits. The traits could not be derived automatically in current rust language.

The most important subset of the functionality is now served by return value. We have a common return type that is used in all actions:

The M is a state machine, and V is a return value from the action to the lower layer. The V is defined by the specific lower layer.

You should think of it as of asynchronous counterpart of Result. Any action may return “Continue” to wait for the next event. “Stop” to stop the state machine and “Timeout” to set the timeout on a connection.

The value V is very dependent on layer used. For example on lowest layer there is a trait EventMachine, which has the following action:

If an action returns “Continue(m, Some(n))” this means n is a new state machine that must be inserted into the map of the state machines of the main loop. The “accept” transport uses it to accept connections. (Note the type of both things is the same because all connections are stored in the same slab, so are of the same type; “accept” transport uses enum to differentiate between the initial listening socket and a client connection).

Any communication between two subsequent layers may be performed as a series of the action calls and return values. For example, HTTP server implementation may accept full Response as value, but may also accept enum of Headers, ChunkOfBody, EndOfBody, to allow asynchronous response generation.

Timers work similarly: return the time of the next wake up. Next timer returned from the action replaces the previous one. This allows to get rid of possible timer leaks. The timers of each subsequent layer coalesce into a single timer (simple “min(x, y)” operation), so we have maximum one timer per state machine. We currently use a deadline-style timers instead of timeout-style, unlike in system calls.

The return type will probably slightly change in the future. For example, the Stop action may grow an error type. But overall Async type looks like a more deliberate decision than what we have used before.

Another thing we keep is Transport. It is the structure that contains network buffers which we pass from the stream to the protocol parser. We don’t pass the socket directly for a couple of reasons:

  1. This way protocol parser doesn’t need to be generic over streams (TCP, Unix, SSL). Transport type is the same for all of them.
  2. It’s easier to unit test protocol parser. Just fill the buffers, instead of opening real OS sockets (especially it’s important for testing partial sends, which might be coalesced by OS)
  3. It’s unproven, but by using this API it should be possible to pass RDMA buffers or TCP buffers of userspace TCP stack directly to the protocol without changing the code. If protocol itself handles buffers, it will do additional buffering anyway in those cases.

Future Work

In the near future, I’m going to figure out the shape of the “Async” object. In particular, whether it’s possible to use it as a Carrier for Trait-based exception handling RFC, and what consequences of this are.

Also, I’m going to play with timeouts more. The large problem of the timeouts is not how to handle them but rather how to define them. For example, HTTP application might have five timeout classes: idle, header-receiving, request-body-receiving, response-generation, response-sending. And at least two of them may depend on the request itself.

Another hot topic is messaging between state machines. Currently, it is accomplished by a “wakeup” event that is triggered by a message to the main loop that contains machine token and no payload. The action then propagates through the state machine to all layers. It’s expected that state machine will find out what is the payload itself by looking into some queue or a cell. Still better abstractions should be created to accomplish request-reply pattern on top of that (probably something that looks like a Future) similar thing for queues shared between connection pool and may be other things.

Another exciting task is to figure out a better interface for the Stream/Transport pair. This is interesting for two reasons:

  1. Simpler to write protocols
  2. Super-efficient RDMA and Userspace network stack

The second point needs a detailed explanation. For example, instead of the current “wake me up after the next read” abstraction we could provide a “wake me up when there are 128KiB in the buffer” operation. Then changing the underlying stream, into userspace TCP, the driver can arrange a network buffer of this size for the connection. So you don’t copy incoming packets and still have a contiguous chunk of data from the network, which can be used directly.

Disclaimer: my understanding of how RDMA or userspace TCP stack work are very sparse and theoretical, so I may be wrong.

Conclusions

Different languages have different idioms. I think I’ve found a good way to make asynchronous applications in rust.

This is not to say that threads are bad. Threads in rust are very great. Just threads for handling a large number of network connections and timers is not very good.

I believe that “rotor” library starts to gain its shape. Which means there will probably be no drastic changes in how it will work. Still some types, names, and other things will change in backwards-incompatible ways until it is feature complete. Which probably means that at least some critical number protocols implemented and at least few small applications.

Benchmark

And the fun part. I’ve just passed the point of 0.5 millions of requests per second served:

This works on i7–4790K (which is desktop-class CPU), on localhost (which also means that “wrk” runs on the same machine too).