Comedy: Node.JS actors for flexible scalability
In this article I introduce Comedy, a Node.JS actor model implementation. Actors are all about flexible scalability: they allow you to scale your Node.JS modules across multiple processes and multiple machines without any code changes.
About actors
What is an actor? According to Wikipedia, an actor is a thing that can:
- send messages
- receive messages
- create child actors
It’s really as simple as that! But this simple abstraction is very useful for scalability. Why? Just think about it: the only thing you can do to an actor is send it a message. The internal actor state is completely isolated from the outside world. This makes an actor a universal unit of scalability: given its isolation, you can run it anywhere in any number of instances, and given the messaging abstraction, you can always reach it. It’s only a matter of implementing the message transports and scaling mechanisms, and that’s exactly what Comedy does.
The actor’s ability to create children is another useful thing. It lets us form a clear module structure with hierarchical job separation. This job separation is somewhat similar to the one in a human organization, where a manager receives high-level requirements and delegates the work to subordinates according to their job roles.
This all still might sound a bit abstract. We’ll jump into practice shortly, but first let’s step back and see why we need to use actors for scalability in the first place.
Motivation
If you have been programming in Node.JS for some time, you probably know that it is single-threaded. And this is a strength of Node.JS, because single-threaded asynchronous communication simplifies your developer life a lot. You are free from a whole class of the nastiest and hardest-to-reproduce problems: multithreading bugs.
But single-threadedness is also a weakness of Node.JS when it comes to CPU-intensive computations. These computations can easily block your precious single thread and make your application completely unresponsive. So, in practice, we tend to use Node.JS for network-intensive applications and avoid using it for CPU-intensive ones.
However, even if our application is network-intensive, it still consumes some amount of CPU: it needs to run business logic, parse protocol packets, encode data and so on. So, as our network load grows, we can still reach a situation where we consume 100% of our single CPU core. And what happens then? We lack CPU for handling incoming traffic, our task queue grows, our heap grows, our event loop lag grows, and then BOOM!!! Our app crashes with an out-of-memory error.
And that is exactly the point where we need to scale our application to more CPUs. Ideally, we don’t want to limit ourselves to the cores of the local machine: we may need multiple machines with many cores. And the less we change the application code while we scale, the better. Wouldn’t it be nice if we could scale by just changing the configuration, without any code change at all?
This is where Comedy comes into play.
Hands-on example: Prime numbers service
To demonstrate how Comedy works, I made a simple example: a microservice that finds prime numbers. The service is accessed via a REST API.
Prime number search is, of course, a purely CPU-intensive task. If we were building such a service in real life, we would think twice before choosing Node.JS. But for the sake of this example we intentionally picked a CPU-intensive task, so that we can easily reproduce a situation where we run out of CPU time.
So, let’s start with the core of our service and implement an actor that finds prime numbers. Here is the code:
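(What follows is a reconstruction: the PrimeFinderActor class name and the nextPrime() method come from the description below, while the _isPrime() helper and its deliberately naive implementation are illustrative assumptions.)

class PrimeFinderActor {
  // Finds the first prime number greater than n (n itself is not necessarily prime).
  // Written in a tail-recursive style: prime gaps are small, so recursion depth stays low.
  nextPrime(n) {
    const candidate = n + 1;

    if (this._isPrime(candidate)) return candidate;

    return this.nextPrime(candidate);
  }

  // Deliberately naive primality check: the example needs a CPU-hungry task.
  // (Assumed helper; the original code may test primality differently.)
  _isPrime(x) {
    for (let i = 2; i < x; i++) {
      if (x % i === 0) return false;
    }

    return x > 1;
  }
}

module.exports = PrimeFinderActor;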
The nextPrime() method of the PrimeFinderActor class finds the prime number that follows the number given in its input parameter (the input itself is not necessarily prime). The method is written in a tail-recursive style.
To run the example, use Node.JS 8: it is needed for several features, including async/await.
What we see in the code snippet above looks like a simple class. And it is. But we can use it as an actor definition, i.e. a description of actor behavior. It describes what messages the actor can accept (each method is a handler for a message whose topic name equals the method name), what it does when it receives these messages (the method implementation), and what result it returns in response (the method return value).
At the same time, because it’s a plain class, we can write a unit test for it and easily verify the correctness of the implementation.
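The unit test could look something like this (a sketch, assuming a Mocha test runner and Node’s built-in assert module; the require path is hypothetical):

const assert = require('assert');
const PrimeFinderActor = require('./prime-finder-actor'); // Hypothetical path to the actor definition.

describe('PrimeFinderActor', () => {
  it('finds the prime number that follows the input', () => {
    const finder = new PrimeFinderActor();

    assert.strictEqual(finder.nextPrime(1), 2);
    assert.strictEqual(finder.nextPrime(30), 31);
  });
});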
Now we have a prime finder actor!
Our next step is to implement a REST server actor. Here is how it looks:
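(Again a reconstruction based on the description that follows: initialize(), getLog(), createChild() and sendAndReceive() are Comedy’s documented API, and Restify handles the HTTP part; the exact wiring details are assumptions.)

const restify = require('restify');
const PrimeFinderActor = require('./prime-finder-actor'); // Hypothetical path to the actor definition.

class RestServerActor {
  async initialize(selfActor) {
    this.log = selfActor.getLog();
    // Create a PrimeFinderActor child and wait until it is initialized.
    this.primeFinder = await selfActor.createChild(PrimeFinderActor);

    return this._initializeServer();
  }

  _initializeServer() {
    return new Promise(resolve => {
      const server = restify.createServer();

      server.get('/next-prime/:n', (req, res, next) => {
        const n = parseInt(req.params.n, 10);

        this.log.info(`Handling next-prime request for number ${n}`);

        // Delegate the CPU-intensive work to the child actor.
        this.primeFinder.sendAndReceive('nextPrime', n)
          .then(result => {
            this.log.info(`Handled next-prime request for number ${n}, result: ${result}`);
            res.send(result);
            next();
          })
          .catch(next);
      });

      server.listen(8080, () => resolve());
    });
  }
}

module.exports = RestServerActor;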
In the actor definition above we see the initialize() method, which is called by Comedy during actor initialization. initialize() receives an actor instance as an argument. The actor instance is exactly the thing you can send messages to, and it has a number of other useful methods: the getLog() method returns a logger instance for the actor (we’ll be using it), and the createChild() method creates a child actor. We pass an actor definition to createChild() and get back a promise, which yields an instance of the newly-created child actor once it is initialized. We use createChild() to create a child instance of PrimeFinderActor.
As you can see, actor initialization is an asynchronous operation. Our initialize() method is also asynchronous (it returns a promise), so our RestServerActor will be considered initialized only when the promise we return is resolved.
After we have created the PrimeFinderActor child actor, awaited its initialization and saved a reference to the child actor instance in the primeFinder field, the only thing left is to initialize and configure the REST server. We do it in the asynchronous _initializeServer() method, using the Restify library.
We create a single request handler for the GET /next-prime/:n request, which computes the first prime number that follows the input by sending a message to the child PrimeFinderActor and receiving the response. We send the message with the sendAndReceive() method, which takes a topic name as its first parameter and a message as its second. The topic name in our case is nextPrime (matching the handler method name), and the message is just the input number corresponding to the method’s input parameter. The sendAndReceive() method is asynchronous and returns a promise of the computation result.
You can send not only primitive numbers or strings as messages, but data objects as well. You can even send objects of your custom classes, provided they have a toJSON() method or a configured marshaller/unmarshaller.
Almost done. The only thing left is to launch all this, which takes just a couple more lines:
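(A sketch based on Comedy’s documented bootstrap API; the exact lines in the original example may differ.)

const actors = require('comedy');

actors() // Create an actor system.
  .rootActor() // Get a reference to its root actor.
  .then(rootActor => rootActor.createChild(RestServerActor)); // Put our RestServerActor at the top of the hierarchy.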
Here we create an actor system, take its root actor, and create our RestServerActor at the top of the hierarchy. Once the system is initialized, we get the following hierarchy:
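Root
 └─ RestServerActor
     └─ PrimeFinderActor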
We are lucky: our hierarchy is quite simple! An example of a real-world hierarchy is here.
So, let’s run and test!
$ nodejs prime-finder.js
Mon Aug 07 2017 15:34:37 GMT+0300 (MSK) - info: Resulting actor configuration: {}
$ curl http://localhost:8080/next-prime/30; echo
31
It works! We get the prime number 31 in response to the input number 30.
Let’s experiment a bit more. Now we’ll measure timing for various inputs.
$ time curl http://localhost:8080/next-prime/30
31
real 0m0.015s
user 0m0.004s
sys 0m0.000s
$ time curl http://localhost:8080/next-prime/3000000
3000017
real 0m0.045s
user 0m0.008s
sys 0m0.000s
$ time curl http://localhost:8080/next-prime/300000000
300000007
real 0m2.395s
user 0m0.004s
sys 0m0.004s
$ time curl http://localhost:8080/next-prime/3000000000
3000000019
real 5m11.817s
user 0m0.016s
sys 0m0.000s
As the starting number grows, the request handling time grows as well. The transition from 3 million to 3 billion is particularly impressive. Let’s try parallel requests:
$ curl http://localhost:8080/next-prime/3000000000 &
[1] 32440
$ curl http://localhost:8080/next-prime/3000000000 &
[2] 32442
In top we see that one CPU core is fully loaded:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
32401 weekens 20 0 955664 55588 20956 R 100,0 0,7 1:45.19 node
In the server logs we see:
Mon Aug 07 2017 16:05:45 GMT+0300 (MSK) - info: InMemoryActor(5988659a897e307e91fbc2a5, RestServerActor): Handling next-prime request for number 3000000000
This means that the first request is being handled while the second is just waiting in the queue. This is exactly the situation described before: we are lacking CPU cores. Let’s scale now!
Scaling!
So, it’s time for us to scale. Important: none of the actions that follow require any code modification.
Let’s first move PrimeFinderActor to a separate sub-process. This step on its own is useless, but I’ll do it anyway to gradually demonstrate the capabilities of the framework.
In the project root directory we create an actors.json file with the following content:
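{
  "PrimeFinderActor": {
    "mode": "forked"
  }
}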
After creating the actors.json file, we restart the example. What happens? Let’s look at the process list:
$ ps ax | grep nodejs
12917 pts/19 Sl+ 0:00 nodejs prime-finder.js
12927 pts/19 Sl+ 0:00 /usr/bin/nodejs /home/weekens/workspace/comedy-examples/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
$ pstree -a -p 12917
nodejs,12917 prime-finder.js
├─nodejs,12927 /home/weekens/workspace/comedy-examples/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
│ ├─{V8 WorkerThread},12928
│ ├─{V8 WorkerThread},12929
│ ├─{V8 WorkerThread},12930
│ ├─{V8 WorkerThread},12931
│ └─{nodejs},12932
├─{V8 WorkerThread},12918
├─{V8 WorkerThread},12919
├─{V8 WorkerThread},12920
├─{V8 WorkerThread},12921
├─{nodejs},12922
├─{nodejs},12923
├─{nodejs},12924
├─{nodejs},12925
└─{nodejs},12926
We see that we now have 2 nodejs processes. One is our main starter process. The other is a child process that runs PrimeFinderActor, because it is configured to be in "forked" mode. We did this with the actors.json file, without any code change.
We now have the following picture:
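prime-finder.js (main process)
 └─ RestServerActor --(IPC)--> PrimeFinderActor (forked process)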
Running the test again:
$ curl http://localhost:8080/next-prime/3000000000 &
[1] 13240
$ curl http://localhost:8080/next-prime/3000000000 &
[2] 13242
Looking in the logs:
Tue Aug 08 2017 08:54:41 GMT+0300 (MSK) - info: InMemoryActor(5989504694b4a23275ba5d29, RestServerActor): Handling next-prime request for number 3000000000
Tue Aug 08 2017 08:54:43 GMT+0300 (MSK) - info: InMemoryActor(5989504694b4a23275ba5d29, RestServerActor): Handling next-prime request for number 3000000000
Good news: it still works! Bad news: it works almost like before. The difference is that now it’s our child process that loads the CPU at 100%. Let’s check it out:
PID PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12927 20 0 907160 40892 20816 R 100,0 0,5 0:20.05 nodejs
So, we’ve moved our load to a child process, but it still can’t handle more than one request simultaneously: requests are still handled one by one. Let’s make more child processes! We’ll scale PrimeFinderActor up to 4 instances. It’s as easy as setting the clusterSize option in actors.json:
{
"PrimeFinderActor": {
"mode": "forked",
"clusterSize": 4
}
}
After restarting the example, here is what we see:
$ ps ax | grep nodejs
15943 pts/19 Sl+ 0:01 nodejs prime-finder.js
15953 pts/19 Sl+ 0:00 /usr/bin/nodejs /home/weekens/workspace/comedy-examples/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
15958 pts/19 Sl+ 0:00 /usr/bin/nodejs /home/weekens/workspace/comedy-examples/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
15963 pts/19 Sl+ 0:00 /usr/bin/nodejs /home/weekens/workspace/comedy-examples/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
15968 pts/19 Sl+ 0:00 /usr/bin/nodejs /home/weekens/workspace/comedy-examples/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
There are 4 child processes now, just like we wanted. By a mere configuration change, we moved to the following process structure:
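prime-finder.js (main process)
 └─ RestServerActor
     └─ RoundRobinBalancerActor (in-memory)
         ├─ PrimeFinderActor (forked process 1)
         ├─ PrimeFinderActor (forked process 2)
         ├─ PrimeFinderActor (forked process 3)
         └─ PrimeFinderActor (forked process 4)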
So, Comedy has spawned 4 PrimeFinderActor instances, each in a separate process, and between these child actors and their parent RestServerActor it has inserted an intermediary RoundRobinBalancerActor, which distributes incoming messages among the child actors in a round-robin fashion.
Running the test again:
$ curl http://localhost:8080/next-prime/3000000000 &
[1] 20076
$ curl http://localhost:8080/next-prime/3000000000 &
[2] 20078
We see that 2 CPU cores are busy now:
PID PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
15953 20 0 909096 38336 20980 R 100,0 0,5 0:13.52 nodejs
15958 20 0 909004 38200 21044 R 100,0 0,5 0:12.75 nodejs
In the logs we see 2 requests being handled simultaneously:
Tue Aug 08 2017 11:51:51 GMT+0300 (MSK) - info: InMemoryActor(5989590ef554453e4798e965, RestServerActor): Handling next-prime request for number 3000000000
Tue Aug 08 2017 11:51:52 GMT+0300 (MSK) - info: InMemoryActor(5989590ef554453e4798e965, RestServerActor): Handling next-prime request for number 3000000000
Tue Aug 08 2017 11:57:24 GMT+0300 (MSK) - info: InMemoryActor(5989590ef554453e4798e965, RestServerActor): Handled next-prime request for number 3000000000, result: 3000000019
Tue Aug 08 2017 11:57:24 GMT+0300 (MSK) - info: InMemoryActor(5989590ef554453e4798e965, RestServerActor): Handled next-prime request for number 3000000000, result: 3000000019
The scaling works!
More cores!
Currently, our sample service can handle up to 4 prime search requests simultaneously. All additional requests get queued and wait. My machine has only 4 logical CPUs (2 physical cores with hyper-threading), so if I want to handle more than 4 parallel requests, I need to scale out to other hosts in the neighborhood. Let’s do it!
A little bit of theory first. In the last example we moved PrimeFinderActor to "forked" mode. What other modes are possible? Each actor can be in one of the following modes:
- "in-memory" (default): the actor works in the same process as the code that created it. Sending messages to such an actor boils down to a mere method call, so the overhead of communicating with an "in-memory" actor is zero (or near zero).
- "forked": the actor is launched in a separate process on the same machine as the creator process. Communication with a "forked" actor goes through IPC (Unix pipes on Unix, named pipes on Windows).
- "remote": the actor is launched in a separate process on a remote machine. Communication with a "remote" actor goes over TCP/IP.
As you have probably guessed, we now need to switch PrimeFinderActor from "forked" to "remote" mode. We want to get the following scheme:
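RestServerActor (local machine)
  --(TCP/IP)--> 4 x PrimeFinderActor (remote machine 192.168.1.101, one process each)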
Let’s edit the actors.json file. Simply specifying "remote" instead of "forked" is not enough in this case: we also need to specify a host where we want to launch the actor. I’ve got a nice machine with IP address 192.168.1.101 in the neighborhood, and that is the one I’ll take:
{
"PrimeFinderActor": {
"mode": "remote",
"host": "192.168.1.101",
"clusterSize": 4
}
}
The trouble is that this remote machine has no clue about Comedy. We need to launch a special listener process there on a well-known port. It is done like this (all commands are run on the remote machine):
weekens@192.168.1.101 $ mkdir comedy
weekens@192.168.1.101 $ cd comedy
weekens@192.168.1.101 $ npm install comedy
...
weekens@192.168.1.101 $ node_modules/.bin/comedy-node
Thu Aug 10 2017 19:29:51 GMT+0300 (MSK) - info: Listening on :::6161
Now our listener process is ready to accept actor creation requests on the well-known port 6161. Let’s try it:
$ nodejs prime-finder.js
$ curl http://localhost:8080/next-prime/3000000000 &
$ curl http://localhost:8080/next-prime/3000000000 &
Looking at the processes on the local machine:
$ top
PID PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
25247 20 0 1978768 167464 51652 S 13,6 2,2 32:34.70 chromium-browse
No Node.JS-related activity.
Looking at the remote machine:
weekens@192.168.1.101 $ top
PID PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
27956 20 0 908612 40764 21072 R 100,1 0,1 0:14.97 nodejs
27961 20 0 908612 40724 21020 R 100,1 0,1 0:11.59 nodejs
Prime calculation is running, just as we wanted!
There is only one tiny step left: to use the CPU cores on both the local and the remote machine. This is easy: we just need to specify several hosts in the "host" option in actors.json:
{
"PrimeFinderActor": {
"mode": "remote",
"host": ["127.0.0.1", "192.168.1.101"],
"clusterSize": 4
}
}
Comedy will distribute actors evenly between the specified hosts and will route messages to them in a round-robin fashion. Let’s check it out.
First we need to launch the comedy-node listener process on the local machine as well:
$ node_modules/.bin/comedy-node
Fri Aug 11 2017 15:37:26 GMT+0300 (MSK) - info: Listening on :::6161
Then we restart our example:
$ nodejs prime-finder.js
Let’s see the process list on the local machine:
$ ps ax | grep nodejs
22869 pts/19 Sl+ 0:00 /usr/bin/nodejs /home/weekens/workspace/comedy-examples/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
22874 pts/19 Sl+ 0:00 /usr/bin/nodejs /home/weekens/workspace/comedy-examples/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
And on the remote machine:
192.168.1.101 $ ps ax | grep node
5925 pts/4 Sl+ 0:00 /usr/bin/nodejs /home/weekens/comedy/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
5930 pts/4 Sl+ 0:00 /usr/bin/nodejs /home/weekens/comedy/node_modules/comedy/lib/forked-actor-worker.js PrimeFinderActor
Each machine has 2 PrimeFinderActor processes running, just like we wanted.
Sending requests:
$ curl http://localhost:8080/next-prime/3000000000 &
[1] 23000
$ curl http://localhost:8080/next-prime/3000000000 &
[2] 23002
Checking the load on the local machine:
PID PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
22869 20 0 908080 40344 21724 R 106,7 0,5 0:07.40 nodejs
And checking the load on the remote machine:
PID PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
5925 20 0 909000 40912 21044 R 100,2 0,1 0:14.17 nodejs
One CPU core is loaded on each machine. This means we are distributing the load evenly between the local and remote machines. Notice that we’ve done this without changing a single line of code! Comedy and the actor model made it possible.
Wrap Up
In this article we have seen an example of a Node.JS application that scales flexibly thanks to the actor model and its implementation, Comedy. We took the following steps to achieve scalability:
- We described our application in terms of actors.
- We configured actors appropriately to spread load across multiple CPU cores evenly.
How do you describe an application in terms of actors? It’s analogous to the question: “How do you describe an application in terms of classes and objects?” Programming with actors is very similar to Object-Oriented Programming (OOP); we could say it is OOP++. In OOP there are well-established and successful design patterns. Similarly, the actor model has its own patterns, which may be quite useful to you. In any case, if you are already familiar with OOP, you’ll surely have no problems with actors.
What if your application is already written? Should you “rewrite it into actors”? You will have to make code modifications in this case, that’s true. But you will not necessarily have to perform a massive refactoring. You can pick out several major, “large” actors, each representing a big part of your application, and start scaling right away. “Large” actors can later be broken into smaller ones. Again, if your application is already written in an OOP style, the transition to actors is very likely to be painless. The only thing you’ll probably have to work out is isolation: actors, in contrast to objects, are completely isolated from each other.
A couple of words regarding the framework’s maturity. The first working version of Comedy was developed inside the SAYMON project in June 2016. Since that first version, the framework has been working in production and has proven its stability. In April 2017 Comedy was open-sourced under the Eclipse Public License. It continues to be a part of the SAYMON project, where it is used to scale the system and to provide robustness and fault tolerance.
The upcoming feature list is here.
In this article I haven’t mentioned a number of features Comedy has: actor respawn, resource injection, named clusters, marshalling of user classes, TypeScript support. You can find this information in the documentation, as well as in the tests and examples.
Use Comedy! File issues! I’m waiting for your comments!