EventMachine internals and the Reactor pattern
Continuing from the previous posts about primitives and abstractions, in this part of the series we’ll pick a few interesting internals from EventMachine’s source code, and explain the core ideas behind these snippets. If you’re not familiar with EventMachine, this is a solid series of posts to get you started.
The Reactor pattern
A Reactor is a process running in an infinite loop, reacting to stimuli from the outside world. Its reactions are codified in the form of registered callbacks that get executed when appropriate conditions are met or certain events occur. EventMachine itself has several implementations of this idea targeting different platforms. In this post we’ll focus on the pure Ruby implementation.
Let’s see what Wikipedia has to say about the topic:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
In other words, it’s a way of managing concurrently occurring events in the outside world. We have various events coming in through one or more channels, and we use the Reactor to cherry-pick events we want to react to by defining callbacks.
In EM, whenever you provide a block to execute under a certain condition, you’re actually registering a callback in the Reactor. A condition is usually the completion of a lingering operation, for example:
- an HTTP request has been completed,
- a DB table has been read from,
- a file has been written to.
There’s a pattern to be noticed here. We use EM when we want to avoid waiting for IO operations to complete. Instead of waiting, we register a callback that gets executed once the operation is completed and move on to other things.
It’s an efficient way of doing IO and a nice way of abstracting away the gory details of multi-threading that’s usually deployed in similar scenarios.
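To make the idea concrete before we dive into EM itself, here is a toy sketch of callback registration and synchronous dispatch. It is not EventMachine code: the ToyReactor class and its on, emit and run methods are hypothetical names made up for this illustration, and events are plain in-memory values instead of real IO.

```ruby
# Toy reactor: handlers are registered per event name and dispatched
# synchronously, one event at a time. All names here are hypothetical.
class ToyReactor
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
    @events   = []
  end

  # Register a callback for an event we care about.
  def on(event, &block)
    @handlers[event] << block
  end

  # Record that something happened in the outside world.
  def emit(event, payload)
    @events << [event, payload]
  end

  # Drain pending events, handing each one to its registered callbacks.
  # Events nobody registered for are silently dropped.
  def run
    until @events.empty?
      event, payload = @events.shift
      @handlers[event].each { |handler| handler.call(payload) }
    end
  end
end

log = []
reactor = ToyReactor.new
reactor.on(:http_done) { |body| log << "got #{body}" }
reactor.emit(:http_done, "200 OK")
reactor.emit(:ignored, "nobody listens")
reactor.run
```

Only the event we registered a callback for produces a reaction. That is the cherry-picking part of the pattern.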
Let’s now see how this pattern works under the hood.
A Reactor in EventMachine is implemented as an object that wraps an infinite loop and provides methods for managing execution. The pure Ruby version implements the Reactor as a Singleton object with several methods for starting the loop, registering callbacks and timers, and signaling loop breaks.
First, the constructor that initializes the Reactor.
The constructor initialises quite a few variables. The most important ones are:
- @selectables: references to various IO-capable things (network connections, files, pipes and sockets in general),
- @timers: references to code blocks that need to be run at a certain point in time, sorted by time-to-execution,
- @next_heartbeat: the point in time when the next selectable-aliveness check should be done (explained later),
- @stop_scheduled: a flag that, if set to true, tells the Reactor to shut down in the next iteration.
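As a rough illustration of that state, here is a minimal sketch of a constructor holding the four variables described above. It is an assumption-laden stand-in, not EM’s source: the SketchReactor class, the add_timer helper, the HEARTBEAT_INTERVAL value and the choice of a Hash and an Array as containers are all made up for this example.

```ruby
# Hypothetical reactor state; the names mirror the variables described
# in the text, everything else is an assumption.
class SketchReactor
  HEARTBEAT_INTERVAL = 2 # seconds; an assumed value

  attr_reader :selectables, :timers, :next_heartbeat, :stop_scheduled

  def initialize
    @selectables    = {}    # id => wrapper around an IO object
    @timers         = []    # [fire_at, callback] pairs, sorted by fire_at
    @next_heartbeat = Time.now + HEARTBEAT_INTERVAL
    @stop_scheduled = false # flipped to true to end the loop
  end

  # Keep the timers ordered so the soonest one is always first.
  def add_timer(delay, &callback)
    @timers << [Time.now + delay, callback]
    @timers.sort_by!(&:first)
  end
end

reactor = SketchReactor.new
reactor.add_timer(5) { :later }
reactor.add_timer(1) { :sooner }
```

Keeping @timers sorted means the loop only ever has to look at the first entry to know whether anything is due.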
Other variables are almost self-explanatory. Let’s now introduce and explain a few terms used across EM’s source code.
A selectable is an instance of the Selectable class, which wraps a socket (or a file descriptor) and abstracts away IO operations on it. Since all IO operations are assumed to be non-blocking by default, the non-blocking flag is set in the constructor for anything the Selectable class wraps.
The class additionally implements methods for controlling a socket, reading from it and writing to it in a non-blocking way (for example, only writing a limited amount of data at a time so the reactor loop doesn’t get blocked).
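Here is a small sketch of what such a wrapper could look like. It is not EM’s Selectable: the class name SketchSelectable, the CHUNK_SIZE limit and the method bodies are assumptions for illustration. The only real APIs used are Ruby’s IO#fcntl, IO#read_nonblock and IO#write_nonblock, and the fcntl flags assume a Unix-like platform.

```ruby
require "fcntl"

# Hypothetical stand-in for a Selectable; not EM's actual class.
class SketchSelectable
  CHUNK_SIZE = 16 * 1024 # assumed per-cycle limit in bytes

  attr_reader :io

  def initialize(io)
    @io = io
    # Set O_NONBLOCK on the descriptor, keeping any flags already present.
    flags = @io.fcntl(Fcntl::F_GETFL, 0)
    @io.fcntl(Fcntl::F_SETFL, flags | Fcntl::O_NONBLOCK)
    @outbound = String.new # binary buffer of data waiting to be written
  end

  # Queue data; nothing touches the socket until the loop says it's writable.
  def send_data(data)
    @outbound << data.b
  end

  # Write at most one chunk and keep whatever did not fit for the next cycle.
  def eventable_write
    return if @outbound.empty?
    written = @io.write_nonblock(@outbound.byteslice(0, CHUNK_SIZE), exception: false)
    @outbound = @outbound.byteslice(written..-1) if written.is_a?(Integer)
  end

  # Read at most one chunk; returns nil when nothing is available (or at EOF).
  def eventable_read
    data = @io.read_nonblock(CHUNK_SIZE, exception: false)
    data.is_a?(String) ? data : nil
  end
end
```

Capping each read and write at one chunk is what keeps a single busy connection from monopolising an iteration of the loop.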
Next in line is the Loop, a method that actually runs the Reactor and starts the infinite loop.
Looking at the run method, we can see that all it does is open a loopbreaker (the purpose of which we’ll explain later) and start an infinite loop which, in each iteration, calls:
- run_timers, which runs the registered @timers whose specified point in time has been reached,
- crank_selectables, which checks the read/write state of @selectables (IO objects) and performs IO operations on them if they’re ready, and
- run_heartbeats, which checks if all selectables are alive (by invoking a heartbeat method on each selectable).
Finally, the cleanup is done after the Reactor has finished running (usually done when the process is terminated).
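The shape of that loop can be sketched roughly as follows. This is a skeleton under stated assumptions, not EM’s run method: the LoopSketch class is hypothetical, the three per-iteration methods are stubs that only record that they were called, and the loop stops itself after two passes so that the example terminates.

```ruby
# Hypothetical skeleton of the reactor loop; the step methods are stubs.
class LoopSketch
  attr_reader :trace

  def initialize
    @stop_scheduled = false
    @trace = []
    @passes = 0
  end

  def stop
    @stop_scheduled = true
  end

  def run
    open_loopbreaker
    loop do
      break if @stop_scheduled
      run_timers
      break if @stop_scheduled
      crank_selectables
      break if @stop_scheduled
      run_heartbeats
    end
  ensure
    cleanup # runs even if a step raises
  end

  private

  def open_loopbreaker;  @trace << :open_loopbreaker;  end
  def run_timers;        @trace << :run_timers;        end
  def crank_selectables; @trace << :crank_selectables; end
  def cleanup;           @trace << :cleanup;           end

  def run_heartbeats
    @trace << :run_heartbeats
    @passes += 1
    stop if @passes == 2 # demo only: end the loop after two passes
  end
end

sketch = LoopSketch.new
sketch.run
```

Checking the stop flag between steps is a design choice in this sketch: it lets a callback that schedules a stop take effect without waiting for the rest of the iteration.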
Apart from the initialization and the loop itself, the following three methods called in each iteration of the loop are the foundation of EventMachine.
The crank_selectables method is the only interesting one here. It sorts the selectables into two arrays, one with sockets we want to read from and one with sockets we want to write to. It then invokes the select system call and passes these two arrays to it. The select system call returns the sockets that are ready for reading or writing, and the Reactor invokes eventable reading or writing on them. Eventable in this context means a limited number of bytes per cycle.
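A simplified sketch of that step is shown below, built on Ruby’s IO.select. It should be read as an approximation, not EM’s code: the PipeEnd class, its select_for_reading? and select_for_writing? predicates, the CHUNK limit and the standalone crank_selectables function are all assumptions for this example, and a pipe stands in for a pair of sockets.

```ruby
# Hypothetical selectable wrapping one end of a pipe.
class PipeEnd
  CHUNK = 4096 # assumed per-cycle byte limit

  attr_reader :io, :received

  def initialize(io, mode)
    @io = io
    @mode = mode
    @outbound = String.new
    @received = String.new
  end

  def send_data(data)
    @outbound << data.b
  end

  def select_for_reading?
    @mode == :read
  end

  # Only ask select about writability when there is something to send.
  def select_for_writing?
    @mode == :write && !@outbound.empty?
  end

  def eventable_read
    data = @io.read_nonblock(CHUNK, exception: false)
    @received << data if data.is_a?(String)
  end

  def eventable_write
    written = @io.write_nonblock(@outbound.byteslice(0, CHUNK), exception: false)
    @outbound = @outbound.byteslice(written..-1) if written.is_a?(Integer)
  end
end

# One pass: ask select which descriptors are ready, then do a bounded
# amount of IO on each of them.
def crank_selectables(selectables, timeout = 0.1)
  by_io   = selectables.each_with_object({}) { |s, map| map[s.io] = s }
  readers = selectables.select(&:select_for_reading?).map(&:io)
  writers = selectables.select(&:select_for_writing?).map(&:io)

  ready = IO.select(readers, writers, nil, timeout)
  return unless ready # nil means the timeout expired with nothing ready

  ready[0].each { |io| by_io[io].eventable_read }
  ready[1].each { |io| by_io[io].eventable_write }
end

read_io, write_io = IO.pipe
reader = PipeEnd.new(read_io, :read)
writer = PipeEnd.new(write_io, :write)
writer.send_data("ping")
crank_selectables([reader, writer]) # the write end is ready, data goes out
crank_selectables([reader, writer]) # the read end is now ready, data comes in
```

The timeout matters: without it the loop would sleep in select indefinitely and never get around to running timers or heartbeats.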
The other two methods are pretty straightforward in their purpose. The run_timers method executes code blocks from the @timers set only once their time-to-execute has been reached, while the run_heartbeats method runs the heartbeat method on each selectable every HeartbeatInterval seconds. The details of the heartbeat method are covered further down in this post.
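The timer half of this can be sketched in a few lines. Again, this is an illustration and not EM’s run_timers: the TimerSketch class and its method signatures are hypothetical, and the current time is passed in as an argument so the behaviour is easy to follow.

```ruby
# Hypothetical timer list; fires everything whose time has come.
class TimerSketch
  def initialize
    @timers = [] # [fire_at, callback] pairs, kept sorted by fire_at
  end

  def add_timer(fire_at, &callback)
    @timers << [fire_at, callback]
    @timers.sort_by!(&:first)
  end

  # Because the list is sorted, we can stop at the first timer that is
  # still in the future.
  def run_timers(now = Time.now)
    until @timers.empty? || @timers.first.first > now
      _fire_at, callback = @timers.shift
      callback.call
    end
  end

  def pending
    @timers.size
  end
end

fired = []
start = Time.now
timers = TimerSketch.new
timers.add_timer(start + 10) { fired << :late }
timers.add_timer(start + 1)  { fired << :soon }
timers.run_timers(start + 5) # only the one-second timer is due
```

After this call the ten-second timer is still pending and will fire on a later pass, once the clock catches up with it.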
These few snippets cover the basic setup and operation of EventMachine, but we should cover a few more patterns at this point so we can see a clearer picture of what EM in its totality is. Most importantly, we’ll cover the Loopbreaker, the Heartbeat and the threadpool.
A loopbreak is a way to signal the Reactor that it should do something. Why do it this way? Well, since it’s running an infinite loop, it can’t respond to calls; you can think of it as being deaf to all messages while running. The loopbreaker is a type of communication channel between the reactor and the outside world. When something outside the loop signals a loopbreak, the reactor stops for a moment and lets other things happen. It does not terminate the loop, it just allows other things to run.
The loopbreak is signaled whenever we schedule something via the defer methods, since we need to tell the reactor that something has been scheduled. It can then, in turn, run those tasks.
However, since it is stuck in an infinite loop, the reactor can’t receive “hey, you have new stuff to do” messages directly, so we produce a loopbreak signal to command it to check if there are tasks to run.
Here’s a (slightly modified) EventMachine implementation of the loopbreaker:
Internally, a loopbreaker is usually implemented as a one-way IO pipe between entities that are communicating. So, a loopbreak signal is in effect just a few bytes sent over that pipe. When the reactor checks selectables for IO activity, it will see that there is something to read in the LoopbreakReader, the receiving end of the pipe wrapped in a Selectable, and will start the method to run scheduled blocks.
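The pipe trick is easy to reproduce in isolation. The sketch below is our own minimal version, not the EM implementation referred to above: the LoopbreakSketch class and its method names are hypothetical, a single byte serves as the wake-up signal, and a Queue holds the scheduled blocks.

```ruby
# Hypothetical loopbreaker: a pipe whose read end the loop selects on.
class LoopbreakSketch
  def initialize
    @reader, @writer = IO.pipe
    @next_tick_queue = Queue.new # thread-safe queue of scheduled blocks
  end

  # Can be called from any thread: queue the block, then poke the loop.
  def next_tick(&block)
    @next_tick_queue << block
    signal_loopbreak
  end

  # The signal itself is just one byte written into the pipe.
  def signal_loopbreak
    @writer.write_nonblock("+", exception: false)
  end

  # One pass of the loop: sleep in select until the pipe becomes readable
  # or the timeout expires. Returns true if a loopbreak was handled.
  def crank(timeout = 1)
    ready = IO.select([@reader], nil, nil, timeout)
    return false unless ready
    @reader.read_nonblock(1024, exception: false) # drain the wake-up bytes
    run_scheduled_blocks
    true
  end

  def run_scheduled_blocks
    @next_tick_queue.pop(true).call until @next_tick_queue.empty?
  end
end

output = []
breaker = LoopbreakSketch.new
Thread.new { breaker.next_tick { output << :woken } }.join
breaker.crank(1)
```

The nice property of this design is that the loop needs no special case for wake-ups. The read end of the pipe is just another descriptor that select reports as readable.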
A heartbeat is a way to continuously check the staleness of the selectables. Every selectable for which a stale state is possible (i.e. a long-lived connection) has a heartbeat method implementation. This method checks whether the connection is stale by looking for any activity within a specified interval.
Here’s the implementation for a StreamObject that represents any socket used for long-lived data streaming.
The heartbeat method checks for inactivity, and if it deduces that the selectable is inactive, it schedules it for closing.
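The inactivity check boils down to comparing timestamps, which the following sketch illustrates. It is a simplified stand-in and not the StreamObject code: the StreamSketch class, the touch method, the close_scheduled flag and the convention that a timeout of zero disables the check are assumptions made for this example.

```ruby
# Hypothetical long-lived connection that tracks its last activity.
class StreamSketch
  attr_reader :close_scheduled

  def initialize(inactivity_timeout, now = Time.now)
    @inactivity_timeout = inactivity_timeout # seconds; 0 disables the check
    @last_activity = now
    @close_scheduled = false
  end

  # Called whenever data is read or written.
  def touch(now = Time.now)
    @last_activity = now
  end

  # Called periodically by the loop; marks stale connections for closing.
  def heartbeat(now = Time.now)
    return unless @inactivity_timeout > 0
    @close_scheduled = true if now - @last_activity > @inactivity_timeout
  end
end

opened_at = Time.now
stream = StreamSketch.new(10, opened_at)
stream.heartbeat(opened_at + 5)  # still within the timeout
stream.touch(opened_at + 8)      # some activity resets the clock
stream.heartbeat(opened_at + 15) # 7 seconds idle, still fine
stream.heartbeat(opened_at + 20) # 12 seconds idle, scheduled for closing
```

Note that the heartbeat only schedules the close. Actually tearing the connection down is left to the loop, which keeps the check itself cheap.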
The last thing of interest is the threadpool. Basically, it is a combination of a queue of tasks, i.e. callable objects (blocks), and a pool of Threads that are used to perform those tasks. As we mentioned in the previous post, the threadpool is used to handle blocking IO. Whenever we need to perform a blocking IO call, we do it in a separate Thread. EM’s threadpool abstracts this away. We only need to defer the callable object and EM will take care of it when it can.
The following three methods represent the entirety of EM’s threadpool implementation.
The defer method checks whether a threadpool exists, and if there is none, it creates the @resultqueue and starts the threadpool creation routine. Afterwards, it simply pushes the task given to it onto a task queue.
The spawn_threadpool method is the one that actually creates the threadpool. It creates a number of threads and sets up each of them to continuously fetch tasks from the task queue. Since Queue#pop blocks execution whenever there are no tasks to fetch, threads will wait until there is a task that has to be executed. Once a task is available, the first thread to fetch it executes it and pushes the result, along with the callback that handles the result, to the @resultqueue. It then signals a loopbreak to notify the reactor that it should run the callback that handles the result.
The run_deferred_callbacks method is not strictly a part of the threadpool system, but since it’s tied to it, we’ll cover it as such. It’s used for running the result-handling callbacks: it pops results (along with callbacks) from the @resultqueue and runs them. The reason why it’s not strictly a part of the threadpool system is that it’s also used to run the callbacks scheduled via the next_tick mechanism. It (thread-safely) consumes the @next_tick_queue for executables to run until it empties the queue. That odd little next_tick call in the ensure block is just a way of telling the reactor to keep running and bubble up the exception (and not immediately stop) if one happens.
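To tie the three methods together, here is a compact sketch of the same flow. It is our own approximation and not EM’s code: the DeferSketch class, the @taskqueue name, the pool size, the crank method and the use of a pipe as the loopbreak are assumptions, and error handling (including the next_tick and ensure detail mentioned above) is left out.

```ruby
# Hypothetical threadpool: workers run tasks, the loop runs the callbacks.
class DeferSketch
  def initialize(pool_size = 4)
    @pool_size = pool_size
    @taskqueue = nil
    @resultqueue = nil
    @wake_reader, @wake_writer = IO.pipe # stands in for the loopbreaker
  end

  # Lazily create the queues and the pool, then enqueue the task.
  def defer(operation, callback = nil)
    unless @taskqueue
      @taskqueue = Queue.new
      @resultqueue = Queue.new
      spawn_threadpool
    end
    @taskqueue << [operation, callback]
  end

  def spawn_threadpool
    @pool_size.times do
      Thread.new do
        loop do
          operation, callback = @taskqueue.pop # blocks while the queue is empty
          result = operation.call              # the blocking work happens here
          @resultqueue << [result, callback]
          @wake_writer.write("+")              # loopbreak: results are waiting
        end
      end
    end
  end

  # Runs on the loop's thread, so callbacks never race with each other.
  def run_deferred_callbacks
    until @resultqueue.empty?
      result, callback = @resultqueue.pop
      callback.call(result) if callback
    end
  end

  # One pass of the loop: wait for a loopbreak, then run the callbacks.
  def crank(timeout = 1)
    return unless IO.select([@wake_reader], nil, nil, timeout)
    @wake_reader.read_nonblock(1024, exception: false)
    run_deferred_callbacks
  end
end

results = []
pool = DeferSketch.new(2)
pool.defer(-> { 21 * 2 }, ->(value) { results << value })
pool.crank(2)
```

The operation runs on a worker thread while the callback runs back on the loop’s thread. That split is the reason the result has to travel through a queue and a loopbreak instead of the worker calling the callback directly.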
I hope I have given you a somewhat understandable explanation of EM’s internals and that you have a clearer picture in your mind of what’s happening under the hood. If you have any questions or comments, be sure to leave them in the comments section below, and I’ll do my best to answer them.
Next up, Celluloid and the Actor pattern.
Originally published at pltconfusion.com.