Understanding Reactive IO and Back-Pressure with (your own) Akka Http Server
Reactive programming, particularly for writing web services, has been getting a lot of attention in the last few years. It’s changing the way programmers write code and what they can assume about the underlying threading mechanisms.
Most web servers used to use a thread-per-connection model to handle HTTP requests. In this model, servers also used blocking IO libraries to read requests from connections and write responses. This works well when connections are short lived, and traditionally most HTTP requests were: a client opened a connection, sent a request, read the response and closed the connection. This allowed servers to serve requests from a fixed thread pool. Using blocking IO also allowed synchronous APIs, which are a simple programming model to work with.
This model became a big problem as the patterns of web requests changed.
To support Keep-Alive connections and HTTP pipelining, servers could no longer rely on blocking IO and threads alone. Apache, as of version 2.4, uses evented IO to handle connections (https://httpd.apache.org/docs/2.4/mod/event.html).
In the JVM world, most popular web servers like Jetty use Java NIO as a backbone. Jetty 9.3, for example, uses fully non-blocking IO (https://webtide.com/jetty-9-3-features/).
With non blocking IO and evented APIs, programming is harder.
The most primitive APIs are callback based and they are extremely difficult to compose.
When we start using these APIs beyond hello world programs, we start seeing these difficulties. So there are a few efforts under way to make the APIs more composable and easier to use. One such effort in the JVM world was RxJava. There is now a widespread effort to standardize these APIs as the Reactive Streams API (http://www.reactive-streams.org/). Apart from composability, one of the major features of reactive streams is back pressure, and we will see what it means later. With JDK 9 supporting reactive programming through its Flow API, I expect most web frameworks to provide a reactive streams version of their programming APIs.
HTTP/2, with its use of a single persistent connection, will push every web server and web framework in that direction. So it’s mandatory for every programmer to understand what reactive programming is and how it builds on top of more fundamental IO libraries like Java NIO.
Akka Http is one of the web frameworks that is reactive from the ground up. So it’s a very good framework to study in order to understand what it means to be reactive and how reactive APIs make use of Java NIO underneath.
To understand and study Akka Http, Reactive Streams, and how it all maps to Java NIO, we tried writing three very simple HTTP servers: one with Java NIO, one with Akka Actors on top of Java NIO, and one with Akka Streams. The code for these is available on Github (https://github.com/unmeshjoshi/reactiveio).
The servers are very minimal, supporting only HTTP GET requests, and in places copy code from the Akka Http code base. The aim of this blog post and the source code is that once you understand how they work, you should be able to understand the Akka Http code base, and it should become easier to understand the internals of any other reactive streams based HTTP framework or server.
To start with let’s see how a very simple web server can be written using Java NIO
2. A Simple HTTP Server with Java NIO.
Java NIO provides a non-blocking IO API in Java. An excellent overview of the API can be found in this presentation by Doug Lea. The concepts are not very new; they were already well-known ‘patterns’ back in the 1990s, documented in the POSA2 book (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/). A typical HTTP server can be implemented by applying patterns from this book such as Reactor, Acceptor-Connector and Asynchronous Completion Token. (If you browse through any HTTP server code like Jetty, you will find these patterns.)
An HTTP server can be written with Java NIO quite easily by using the Reactor pattern. The primary abstractions provided by Java NIO are Selectors, Channels and Buffers. Channels can be registered with a Selector for read, write or accept events. Once a channel is ready to be read, written or accepted, an event is generated which can then be handled. Because the event tells us the channel is ready, the subsequent read, write or accept call on the channel does not block.
The basic implementation is as follows.
This will start a server on port 5555 and register interest in OP_ACCEPT events. This means that whenever a new connection is made to the server and it’s ready to be accepted, we will get an event.
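The original listing is not reproduced here, so the following is only a minimal sketch of the setup described above, using the standard java.nio.channels API. The class and method names (NioServerSetup, listen) are made up for illustration and are not taken from the post's repository.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper, for illustration only.
class NioServerSetup {
    /** Opens a non-blocking server channel on the given port and registers it for OP_ACCEPT. */
    static SelectionKey listen(Selector selector, int port) {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);           // required before registering with a selector
            server.bind(new InetSocketAddress(port));  // the post uses 5555; 0 picks any free port
            return server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `NioServerSetup.listen(Selector.open(), 5555)` would correspond to the server described in the text.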
The server then runs a continuous event loop as follows.
The event loop will run continuously and look for available events.
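As a rough sketch of such an event loop (not the post's actual code), one iteration can be written as a select call followed by a walk over the selected keys. The names EventLoop, pollOnce and run are assumptions made for this example; the handler is whatever the caller wants to do with a ready key.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical event loop, for illustration only.
class EventLoop {
    /** Waits up to timeoutMillis (> 0) for ready keys, dispatches each one, and returns how many were handled. */
    static int pollOnce(Selector selector, long timeoutMillis, Consumer<SelectionKey> handler) {
        try {
            selector.select(timeoutMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int handled = 0;
        Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
        while (ready.hasNext()) {
            SelectionKey key = ready.next();
            ready.remove();                 // the selected-key set is not cleared automatically
            if (key.isValid()) {
                handler.accept(key);        // e.g. branch on isAcceptable / isReadable / isWritable
                handled++;
            }
        }
        return handled;
    }

    /** Keeps polling until the selector is closed. */
    static void run(Selector selector, Consumer<SelectionKey> handler) {
        while (selector.isOpen()) {
            pollOnce(selector, 1000, handler);
        }
    }
}
```

Splitting out `pollOnce` is a choice made here so that a single iteration can be exercised on its own; a real server would simply call `run`.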
To start with, if we run “curl -i http://localhost:5555/test”, an Accept event will be generated and the server will handle it.
The handler will accept the connection and register its interest in reading data from the connection. We also register the HTTP parser object that will be used for parsing the data we read from the socket channel.
The important thing to note here is state management. With non-blocking, event-driven IO, we need to know up to what point the data has been processed. This state is maintained as part of the registration itself: the channel allows us to store any object as an attachment on the channel’s registration key. Let’s look in a little more detail at what state management means in the context of an HTTP server.
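A minimal sketch of such an accept handler is shown below, assuming a placeholder ConnectionState class in place of the real HTTP parser object. Both AcceptHandler and ConnectionState are hypothetical names; only the java.nio calls are the real API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical accept handler, for illustration only.
class AcceptHandler {
    /** Placeholder for per-connection state; in the post this is the HTTP parser object. */
    static final class ConnectionState {
        final StringBuilder requestSoFar = new StringBuilder();
    }

    /**
     * Accepts a pending connection and registers it for OP_READ with fresh
     * per-connection state as the key attachment. Returns null if nothing was pending.
     */
    static SelectionKey onAcceptable(SelectionKey serverKey) {
        try {
            ServerSocketChannel server = (ServerSocketChannel) serverKey.channel();
            SocketChannel connection = server.accept();   // null on a non-blocking channel with no pending connection
            if (connection == null) {
                return null;
            }
            connection.configureBlocking(false);
            return connection.register(serverKey.selector(), SelectionKey.OP_READ, new ConnectionState());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Later, when a READ event arrives for that connection, `key.attachment()` returns the same ConnectionState object, which is how the state survives between events.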
2.1. State Management
In reactive web servers, because a single thread handles multiple connections, managing state becomes an important concern. In the thread-per-connection style of programming, the whole request -> process -> response cycle happens in a single thread, so state can be easily managed in the functions or objects processing the data from the connection.
With a single thread managing multiple connections, there needs to be a way to store per-connection state.
For an HTTP server, one of the first things that needs to happen is parsing incoming requests.
Writing request parsers is tricky with non blocking IO.
2.1.1. Stateful HTTP request parser
One of the important considerations when writing request parsers with non blocking IO is that they need to manage parsing state. All the request data might not be available at a time.
With blocking IO, the parser can just block on IO read. With non blocking IO, that’s not an option.
Let’s see how state management happens with blocking IO.
If we need to parse the first line of an HTTP request, we read data from the InputStream of the TCP connection.
A very simple parser
Let’s take an example of an HTTP GET request to be parsed by the server:
GET /resource HTTP/1.1
Let’s say we start parsing the HTTP request by parsing the HTTP method as a first step. The code will look something like the following.
Because the InputStream.read call blocks when data is not available, we do not need to worry about the state of the parser. It’s maintained naturally by the method call stack.
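A minimal sketch of that first step, assuming the method is simply everything up to the first space (the class name BlockingMethodParser is made up for this example):

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical blocking parser step, for illustration only.
class BlockingMethodParser {
    /** Reads bytes until the first space (or end of stream) and returns them as the HTTP method. */
    static String readMethod(InputStream in) throws IOException {
        StringBuilder method = new StringBuilder();
        int b;
        // read() blocks until a byte arrives, so the loop position *is* the parser state
        while ((b = in.read()) != -1 && b != ' ') {
            method.append((char) b);
        }
        return method.toString();
    }
}
```

For the request line above, `readMethod` would return the string `GET` and leave the stream positioned at `/resource`.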
The situation changes drastically with non-blocking IO. We get an event when data is available to be read on the socket, but we never know whether that data is enough to parse the request. So we need to save the state for each connection. Assuming HTTP parsing is the only thing we do, we have the HTTP parser object attached to the channel key.
After reading the available bytes from the socket, the parseBytes method of the HTTP parser is called. It’s the responsibility of the HTTP parser implementation to maintain the state and know up to what point the request has been parsed.
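To make the idea concrete, here is a deliberately small stateful parser that only handles the request line. It is a sketch under assumptions: the class name RequestLineParser is hypothetical, and an empty Optional stands in for the parser's "needs more data" answer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stateful parser, for illustration only. One instance per connection.
class RequestLineParser {
    // Bytes received so far that do not yet form a complete request line.
    private final StringBuilder buffered = new StringBuilder();

    /** Returns the request line once a CRLF has been seen, or empty when more data is needed. */
    Optional<String> parseBytes(ByteBuffer chunk) {
        buffered.append(StandardCharsets.US_ASCII.decode(chunk));
        int end = buffered.indexOf("\r\n");
        if (end < 0) {
            return Optional.empty();          // keep what we have and wait for the next READ event
        }
        String line = buffered.substring(0, end);
        buffered.delete(0, end + 2);          // keep any bytes that follow the request line
        return Optional.of(line);
    }
}
```

Feeding it `GET /reso` and then `urce HTTP/1.1\r\n` yields an empty result first and the full request line on the second call.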
Akka Http keeps this state in a partially applied function: the parsing state and the request bytes still needed for parsing are captured as parameters of that function.
The parsing function is defined something like the following.
Any time there is not enough data to parse an HTTP request, the parser throws NotEnoughtDataException.
Every byte is read as follows.
Whenever this exception is thrown, the state member of the parser is changed to a new partially applied function, with the bytes read so far from the socket passed as function parameters.
The continue function, which implements this key functionality, looks like the following.
As you can see, in the most basic case, where the offset is always 0, the state will be a new partially applied function whose first parameter holds the bytes available so far. The next time data is read from the socket channel, it will be appended to the previously read bytes and passed to the startNewMessage function to be parsed again.
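Akka Http's parser is written in Scala and is not reproduced here. The sketch below only imitates the idea in Java, under stated assumptions: the state is a function held in a field, and when input runs out it is swapped for a closure that has captured the bytes seen so far. It returns an empty Optional rather than throwing an exception, handles only the request line, and drops any bytes after the CRLF. ContinuationParser and continueWith are hypothetical names (continue is a reserved word in Java); startNewMessage is borrowed from the text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical imitation of continuation-style parser state, for illustration only.
class ContinuationParser {
    // Current parsing step; replaced whenever the input ends in the middle of a request line.
    private Function<byte[], Optional<String>> state = input -> startNewMessage(input, 0);

    Optional<String> parseBytes(byte[] input) {
        return state.apply(input);
    }

    private Optional<String> startNewMessage(byte[] input, int offset) {
        for (int i = offset; i + 1 < input.length; i++) {
            if (input[i] == '\r' && input[i + 1] == '\n') {
                state = next -> startNewMessage(next, 0);   // ready for the next request
                return Optional.of(new String(input, offset, i - offset, StandardCharsets.US_ASCII));
            }
        }
        return continueWith(input, offset);
    }

    /** Not enough data: remember the unparsed bytes inside the next state function. */
    private Optional<String> continueWith(byte[] soFar, int offset) {
        byte[] remaining = Arrays.copyOfRange(soFar, offset, soFar.length);
        state = more -> {
            byte[] joined = Arrays.copyOf(remaining, remaining.length + more.length);
            System.arraycopy(more, 0, joined, remaining.length, more.length);
            return startNewMessage(joined, 0);              // re-parse old bytes plus new bytes
        };
        return Optional.empty();
    }
}
```

The point of the design is that no separate "buffer" field is needed: the captured `remaining` array inside the closure is the saved state.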
The read handler keeps registering for READ events on the connection for as long as the HTTP parser returns NeedsMoreData.
Writing the response
Once the parsing is successfully completed, and we are ready to process the request, the handler can register for WRITE events.
When we get the event that the socket channel is ready to be written, we can process the request and write the response to the channel.
There is a small caveat here. Just as there is no guarantee about how many bytes we get when we read from a socket channel, there is no guarantee about how many bytes get written to the socket when we write the response. So it’s necessary to keep track of the bytes written so far and keep writing until the whole response has been written to the socket channel.
So state management is needed for writing the response as well. It’s not covered in this write-up.
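Although the write-up does not go into it, the write-side state can be as small as a ByteBuffer whose position records how much has already been sent. The following is a sketch only; PendingWrite is a hypothetical name.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical write-side state, for illustration only. One instance per response.
class PendingWrite {
    private final ByteBuffer remaining;

    PendingWrite(byte[] response) {
        this.remaining = ByteBuffer.wrap(response);
    }

    /** Writes as much as the channel accepts right now; returns true once everything has been written. */
    boolean writeTo(WritableByteChannel channel) throws IOException {
        channel.write(remaining);          // advances the buffer position by the number of bytes written
        return !remaining.hasRemaining();
    }
}
```

A write handler would call `writeTo` on each WRITE event and keep its interest in WRITE events until the method returns true.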
2.2. Issues with Single Threaded Server.
While the use of non-blocking IO allows a single-threaded server to support hundreds of connections, there is a major limitation: it does not leverage multiple cores. Because the server is single threaded, we need to run multiple processes on a single machine to use all the cores. This is called the n-copy approach. Typically, as many processes are started as there are cores. Servers like node.js use this approach.
On the JVM, though, using threads is the better and proven way to utilize all the cores of the underlying hardware.
2.2.1. Using ThreadPool or Thread per connection.
To leverage multi-core architectures, we could use thread pools and dedicate one thread per connection (or request). But this has its limitations as well. With HTTP/2 and persistent connections, we cannot dedicate a thread per connection, because when there are thousands of connections to a particular web server we cannot have a thread dedicated to each of them. (Thousands seems too few nowadays, when people are talking about millions of connections per server.)
Using lightweight tasks per connection that share a small number of threads (possibly one per core) works really well.
3. HTTP Server with Actors
Actors fit this task very well. We can create an actor per connection. Actors are lightweight, so we can create thousands of them. They work very well for maintaining per-connection state and are able to react to IO events from NIO selectors.
Asynchronous message passing between actors is a perfect fit for the evented nature of Java NIO. We can also leverage multi-core processors by configuring suitable thread pool dispatchers for the actors.
In Akka Http, which uses Akka IO internally, the structure of actors is as follows. There is a TCPListner actor which maintains the ServerSocketChannel and listens for new connections. It creates a new actor called TCPIncomingConnection for each newly accepted connection.
To create a server, we create a Server actor which bootstraps the process and then a TCPConnectionHandler actor to do application specific processing.
The overall structure looks as follows.
As we saw in the NIO server above, the selector event loop is an isolated activity which can run on its own. So we could have a Selector actor running on a dedicated thread pool dispatcher, typically needing only one thread per selector. (We can potentially use multiple NIO selectors; by default only one selector is used, with a dedicated dispatcher.) The while loop is converted into a task repeatedly scheduled on the selector dispatcher. There are two tasks, one for the Selector and one for Registrations.
The Selector task, when executed, translates NIO events into actor messages as follows.
As you can see, the state per connection is maintained in an actor, which is stored as the attachment of the selection key. (In our previous single-threaded example, the attachment was an HTTP request parser object.) The actor reference is attached to the selection key whenever a TCPIncomingConnection actor is created to handle a connection. One interesting point is that each of these actors is created in the context of the SelectionHandler, making it the supervisor of all the connection handling actors. The SelectionHandler also maintains a ChannelRegistery object which wraps the NIO selector, and passes this registry to every new TCPIncomingConnection actor that’s created. The actors can then register their interest with the selector through this registry.
And TCPIncomingConnection is as follows.
The SelectionHandler communicates with the TCPListner and TCPIncomingConnection actors using the following set of messages.
In response to these messages, the actors can read from or write to the corresponding channel.
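The translation from NIO readiness to messages can be illustrated without Akka. In the sketch below, a plain Queue plays the role of an actor mailbox and is stored as the key attachment. The message names in IoEvent are assumptions chosen for this example, not a listing of the post's actual messages.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical translation of selector readiness into messages, for illustration only.
class SelectorEvents {
    enum IoEvent { CHANNEL_ACCEPTABLE, CHANNEL_READABLE, CHANNEL_WRITABLE }

    /** Maps the ready-operation bits of a selection key to one message per ready operation. */
    static List<IoEvent> translate(int readyOps) {
        List<IoEvent> events = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_ACCEPT) != 0) events.add(IoEvent.CHANNEL_ACCEPTABLE);
        if ((readyOps & SelectionKey.OP_READ) != 0) events.add(IoEvent.CHANNEL_READABLE);
        if ((readyOps & SelectionKey.OP_WRITE) != 0) events.add(IoEvent.CHANNEL_WRITABLE);
        return events;
    }

    /** Delivers the messages for one ready key to the mailbox stored as its attachment. */
    @SuppressWarnings("unchecked")
    static void dispatch(SelectionKey key) {
        Queue<IoEvent> mailbox = (Queue<IoEvent>) key.attachment();  // stands in for an actor reference
        mailbox.addAll(translate(key.readyOps()));
    }
}
```

In the actor version, `mailbox.addAll(...)` would be replaced by sending each message to the actor attached to the key.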
As can be seen, the TCPListner actor doesn’t directly create the TCPIncomingConnection actor. Instead, a factory method is passed to the SelectionHandler so that it can act as the supervisor of every connection handling actor.
TCPIncomingConnection actor handles messages corresponding to NIO Selectors events as well as the application specific handlers for writing data to the connection.
The message contract between the connection actor and the application handling actors is
It’s a set of Commands which are sent to TCPListner and TCPIncomingConnection actors from application actors and corresponding set of events which are sent from TCPListner and TCPIncomingConnection actors to application actors.
The application server can be written as follows. There needs to be a Server actor.
The TcpConnectionHandler actor is where the application-specific processing happens. The HTTP request parsing can happen in this actor. It’s implemented as follows.
As you can see, every time a Received(data) message is handled, the httpParser.parseBytes method is called. The HTTP request parser is the same one that was explained in the previous section.
As can be clearly seen, it’s much easier to maintain the state per connection in an actor.
There are several advantages to using actors over the single-threaded server of the first section. It’s much easier to maintain state. It’s easier to utilise multi-core processors well. As we saw in the case of the SelectionHandler, the IO event loop can run on a dedicated thread of its own, while request processing happens in separate actors scheduled on separate dispatchers.
(Note: It’s not clear to me yet, if there is an overhead of context switching when the messages are passed from IO actors like TCPIncomingConnection to application actors. E.g. Jetty has optimization to reduce thread context switching overhead by having EatWhatYouKill strategy (https://webtide.com/eat-what-you-kill/))
3.1. Back Pressure
One of the key aspects of HTTP server design is how it responds to spikes in load. Spikes can show up in two ways: in the number of connections made to the server, and in the amount of data read from those connections.
- A lot of connections are made to the server
If the server is under load processing requests, connections keep flowing in, and there is no bound on the number of connections accepted, it will just keep creating connection handling actors. Because the server is already busy with existing connections, the new actors will not get processed, but they still add to the load on the server in terms of resources to be managed.
- A lot of data is read from connections
The server is under load processing requests from a few connections, but it keeps reading data from connections. If we continue reading data and pushing it to the server handlers, the actor mailboxes grow with data that is pushed without being consumed.
If there is a mechanism to tell the connection handling actors to stop or start accepting connections and to stop or start reading data, the server always carries only the load it is able to handle and accepts more work only when it has free capacity.
This is where the concept of ‘back pressure’ comes in. The Akka Streams framework has back pressure as a first-class concept, so it’s possible to implement an HTTP server with back pressure built on top of the actor-based implementation discussed in the previous section.
4. HTTP Server with Akka Streams
Akka Streams is a framework for stream processing built around the concept of back pressure. The primary abstraction that the framework provides is the graph stage, so all the application processing can be modelled as graph stages connected to each other. A graph stage can have an input port through which it reads data and/or an output port to which it writes data. Data is read or written only if downstream stages ask for it. There is an explicit API to signal demand for more data to process.
A GraphStage can have both input and output (it’s called a Flow), only output (a Source) or only input (a Sink). The program is then structured as a sequence of connected ports and their handlers, forming a graph. The consumer calls pull whenever it has demand, and the provider responds by handling an onPull event to produce more.
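The demand-driven idea can be illustrated with the JDK 9 Flow interfaces mentioned earlier, where request(n) plays a role similar to pull. This is not Akka Streams code, and the publisher below is a simplified, single-threaded sketch that does not aim to satisfy the full Reactive Streams specification. DemandDemo, CountingPublisher and CollectingSubscriber are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demand-driven producer and consumer, for illustration only.
class DemandDemo {
    /** Emits 1..max, but never more elements than the subscriber has requested. */
    static final class CountingPublisher implements Flow.Publisher<Integer> {
        private final int max;

        CountingPublisher(int max) {
            this.max = max;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {       // demand signalled by the consumer
                    for (long i = 0; i < n && !done; i++) {
                        if (next > max) {
                            done = true;
                            subscriber.onComplete();
                        } else {
                            subscriber.onNext(next++);
                        }
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Collects what it receives; it signals no demand on its own. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }
}
```

After subscribing, nothing is emitted until the subscriber calls `subscription.request(2)`, at which point exactly two elements arrive. That is the essence of back pressure: the producer is paced by the consumer.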
Unless there is an asynchronous boundary between stages, all the stages are executed in a single actor, called ActorGraphInterpreter.
An HTTP server can be implemented in terms of graph stages. The steps in HTTP request/response processing where we need to receive demand from downstream stages or propagate demand to upstream stages are the following.
- A stage for accepting new connections
- A stage for reading and writing data to a connection
- A stage for parsing HTTP request
- A stage for rendering HTTP response
These stages can then be wired up using the Akka Streams DSL to form a graph as follows.
4.1. Integrating GraphStages with IO Actors.
We have an IO layer based on actors, as discussed in the previous section. With streams we add a layer which integrates the IO actors with Akka Streams graph stages to implement back pressure. The four graph stages mentioned above are built on top of the IO actors.
4.1.1. A stage for accepting new connections.
TcpHandlingGraphStage accepts incoming connections on demand. The key is creating a stage logic which registers the graph stage actor so that it can communicate with the IO actor.
At startup, the logic binds the server to the specified IP address and port.
Very similar to the actor-based server in the previous section, the receive method of the stage actor is defined as follows. The key here is how to implement back pressure.
Back pressure for accepting connections means accepting a connection only when the downstream stages ask for a new one.
4.1.2. Accepting connections on demand
As discussed in the first section, to get events from NIO channels we need to register our interest with a Selector instance. For example, to get an event when a new connection is ready to be accepted, we register our interest as follows.
With this, once there is a new connection to be accepted, we get an event as follows.
Once we have the key for a channel, there is a way to de-register our interest in a specific type of event. It is done by clearing the corresponding bit in the key’s interest set.
With the above code, the server socket channel will stop getting events for newly available connections until the interest is registered again.
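The interest set of a SelectionKey is a bit mask, so switching one kind of event off and on again comes down to two bitwise operations. A minimal sketch (the helper class InterestOps is hypothetical; interestOps is the real SelectionKey API):

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper for toggling interest bits, for illustration only.
class InterestOps {
    /** Stops the selector from reporting the given operation for this key. */
    static void disable(SelectionKey key, int op) {
        key.interestOps(key.interestOps() & ~op);   // clear only this bit, keep the others
    }

    /** Asks the selector to report the given operation for this key again. */
    static void enable(SelectionKey key, int op) {
        key.interestOps(key.interestOps() | op);    // set this bit, keep the others
    }
}
```

For the listener described here, the calls would be `InterestOps.disable(key, SelectionKey.OP_ACCEPT)` after accepting a connection and `InterestOps.enable(key, SelectionKey.OP_ACCEPT)` when more connections are wanted.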
For letting the selector know when to start giving ACCEPT events again, we need to register the interest on the SelectionKey corresponding to the channel again. For communicating this to the TCPListener actor, it accepts a message called ResumeAccepting.
So the way it works is as follows:
- The TCPListener actor accepts a connection and then deregisters its interest in the ACCEPT event. The Java NIO selector will not give any more ACCEPT events for new connections.
- A downstream handler sends a ResumeAccepting message to TCPListener.
- TCPListener re-enables ACCEPT interest on the SelectionKey.
- It gets the next connection accept event.
- The cycle continues.
The TCPListener actor handles the ResumeAccepting message as follows.
Once we have this basic mechanism of de-registering and re-registering interests with Java NIO selectors, it’s relatively straightforward to integrate IO actors with the push-pull handlers of graph stages.
As we can see, whenever there is demand from downstream for new connections, a ResumeAccepting message is sent to the TCPListener actor. With this message, the SelectionKey for TCPListener is registered for the ACCEPT event again and a new connection is accepted when available.
The important thing to notice here is that no new connection is accepted until there is a pull from downstream.
4.1.3. A stage for reading and writing data to connections.
Once a new connection is accepted, data needs to be read from and written to the connection. As discussed in the actor-based server implementation, every time data is read, a Received(data) message is sent from the TCP connection handling actor to the application handler. Every time data needs to be written to a connection, a Write(data) message is sent to the actor.
To implement back pressure, both these actions need to happen on demand. Data should be read only when there is demand from downstream handlers, and more data should be written only when the connection handling actor is ready to write more data to the connection.
4.1.4. Reading data from connections on demand
As discussed in the previous section, the Selector event loop deregisters interest in READ events on a socket channel. The TcpIncomingConnection actor responds to ResumeReading messages by re-registering interest in READ events on the SocketChannel key associated with the NIO selector. So no more data is read unless the downstream handlers send a ResumeReading message to the connection handling actor.
Once this mechanism is in place, it can be hooked into the graph stage in TcpHandlingGraphStage. Each time a new connection is accepted, it is handled in TcpHandlingGraphStage by creating a new object of type IncomingConnection, which is pushed to the output port of the Source.
The IncomingConnection object exposes a Flow to handle connection reads and writes.
IncomingConnectionStage is the graph stage which handles the back pressure mechanism for reading and writing data to connection.
TcpStreamLogic is where the key read/write handling happens. An onPull from downstream stages sends a ResumeReading message to the connection handling actor. An onPush from upstream stages sends a Write(data) command.
When data is received in response to a ResumeReading message, it is pushed downstream.
If there is no pull from downstream stages, no more data is read from the connection.
4.1.5. HTTP Parsing data from connection on demand
Once we have these basic connection handling graph stages, we can hook in an HTTP parsing stage as follows. This stage reads ByteString and emits HttpRequest. It can reuse the stateful HTTP parser we built in the earlier section. Whenever the parser responds with NeedsMoreData, a pull is issued in the graph stage, which enables read interest on the connection so that more data is read.
4.1.6. Writing HTTP response to connection on demand
The last part of the puzzle is writing the HTTP response to the connection. We can have a rendering graph stage as follows. This stage accepts an HttpResponse object and pushes ByteString to the connection.
Once we have these building blocks in place, the HTTP server can be wired together as follows.
4.1.7. A runnable graph for accepting incoming connections
A Source is created from the TCP connection handling graph stage. The source is mapped with a parallelism of 1024. This means that as long as fewer than 1024 connections are being handled, the pull keeps happening from this stage and connections are accepted. When the count reaches 1024, there is no further pull and no more connections are accepted.
The mapping function gets a handle to each IncomingConnection. This mapping function is where the HTTP handling of each incoming request happens.
4.1.8. A runnable graph for processing incoming connection
There are three steps in handling each incoming HTTP request.
1. Parsing the request
2. Handling the http request to produce HTTP response
3. Rendering and writing the HTTP response.
So for each incoming connection, we need to wire together HttpRequestParsingStage, HttpResponseRenderer and the user-specified handler function which takes an HttpRequest and returns an HttpResponse.
The parsing flow is created as follows.
The rendering flow is created from the HttpResponseRenderer stage as follows.
The handler function is used to create a mapping flow.
These two flows are then wired together with the request handling flow as follows.
The resulting flow can then be used to handle the IncomingConnection.
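The shape of this wiring (parse, then handle, then render) can be illustrated with plain function composition. This is only an analogy: ordinary functions carry no demand signalling, so the sketch shows how the three steps chain together, not how Akka Streams applies back pressure between them. All names below are hypothetical stand-ins, and the parser handles only a request line.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical parse -> handle -> render chain, for illustration only.
class ConnectionPipeline {
    static final class HttpRequest {
        final String method;
        final String path;
        HttpRequest(String method, String path) { this.method = method; this.path = path; }
    }

    static final class HttpResponse {
        final int status;
        final String reason;
        final String body;
        HttpResponse(int status, String reason, String body) {
            this.status = status; this.reason = reason; this.body = body;
        }
    }

    /** Parses a request line such as "GET /test HTTP/1.1". */
    static HttpRequest parse(String requestLine) {
        String[] parts = requestLine.split(" ");
        return new HttpRequest(parts[0], parts[1]);
    }

    /** Renders a status line, a Content-Length header and the body. */
    static String render(HttpResponse response) {
        int length = response.body.getBytes(StandardCharsets.UTF_8).length;
        return "HTTP/1.1 " + response.status + " " + response.reason + "\r\n"
                + "Content-Length: " + length + "\r\n\r\n"
                + response.body;
    }

    /** Wires parsing, the user handler and rendering into one function per connection. */
    static Function<String, String> connectionFlow(Function<HttpRequest, HttpResponse> handler) {
        Function<String, HttpRequest> parsing = ConnectionPipeline::parse;
        Function<HttpResponse, String> rendering = ConnectionPipeline::render;
        return parsing.andThen(handler).andThen(rendering);
    }
}
```

A handler such as `req -> new ConnectionPipeline.HttpResponse(200, "OK", "hello " + req.path)` passed to `connectionFlow` gives a function from request text to response text, mirroring how the user-supplied handler sits between the parsing and rendering flows.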
Each runnable graph runs in its own actor (an ActorGraphInterpreter executing the onPull and onPush handlers of the graph stages). So, depending on the parallelism set (the number of connections accepted in parallel, which is 1024 in the example above), that many actors will be created to process requests from connections. These actors will be scheduled on a dispatcher thread pool, which can be tuned based on the number of cores available.
The same actor (and so possibly the same thread) produces and consumes the request payloads. (I think this will give an effect similar to the EatWhatYouKill policy of Jetty (https://webtide.com/eat-what-you-kill/). I have not done any performance measurements, only code inspection, so this is just a guess.)
The code for this post is available at https://github.com/unmeshjoshi/reactiveio. It contains working copies of all three HTTP servers, with some code copied from Akka IO as needed.
This is by no means production code; its sole purpose is to help understand how reactive IO and, more importantly, Akka HTTP work.
I hope this will be useful for developers who are trying to understand what happens under the hood of Akka HTTP.