Playing with Node.JS 10.5.0 threads

Vitaly Senko
Sep 5, 2018

Hello everyone

Recently I had a conversation with some .NET developers about threads in Node.JS 10.5.0. We were wondering whether threads need to be synchronized. As a first step, we decided to solve a simple task: parallel file writing.

About threads in Node.JS

This is an experimental technology introduced in Node.JS 10.5.0. To get access to this feature, you have to start your Node.JS program with the --experimental-worker flag. To simplify application start, let us put this flag into the start script in package.json:
{
  "name": "worker-test",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "start": "node --max-old-space-size=4096 --experimental-worker app.js"
  },
  "author": "",
  "license": "ISC"
}

After that, you will be able to import the threads module with require('worker_threads').
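As a quick sanity check that the module loads, here is a minimal sketch that starts one worker from an inline script and waits for its single message. It uses the eval: true option of the Worker constructor so that no second file is needed; the helper name helloWorker is only for illustration. On Node.JS 10.5.0 it still has to be run with --experimental-worker (for example via npm start); since Node.JS 11.7.0 the flag is no longer required.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper: start a worker from inline source and resolve with its first message.
function helloWorker() {
  return new Promise((resolve, reject) => {
    // With eval: true the first argument is JavaScript source, not a file path.
    const worker = new Worker(
      `const { parentPort, workerData } = require('worker_threads');
       parentPort.postMessage('Hello from worker ' + workerData.id);`,
      { eval: true, workerData: { id: 1 } }
    );
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

helloWorker().then((msg) => console.log(msg)); // prints "Hello from worker 1"
```

The worker exits on its own after posting, because nothing in it keeps the event loop alive.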

Application logic

The main thread creates N worker threads. Each worker writes a line into the same file 1000 times, without any delay between writes. Seems simple, right?

Application code

To keep the codebase simple, I decided to split the main thread code and the worker thread code into separate files. This seems very natural and clean.

Main file: ./app.js:

const { Worker } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;

console.log('Hello from main!');
// Start WORKERS_NUMBER writer threads, each with its own id
for (let i = 1; i <= WORKERS_NUMBER; i++) {
  new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}

Here we simply create 100 new threads, passing each one its id through workerData.

Worker thread code: ./writer-worker-app/app.js:

const { workerData } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;

console.log(`Worker ${id} initialized.`);
// Write 1000 lines as fast as possible, with no delay between writes
for (let i = 0; i < 1000; i++) {
  sendMessage();
}

function sendMessage() {
  logger.log(`Hello from worker number ${id}\r\n`);
}
One thousand writes per worker. Simple as that.
Basic logger file: ./writer-worker-app/logger.js:
const fs = require('fs');

function log(message) {
  return fs.appendFileSync('./my-file.txt', message);
}

module.exports = {
  log
};

First results

On the first launch, all of us expected some crashes or overwritten lines. But the file was clean, with exactly WORKERS_NUMBER * 1000 lines. Success! No semaphores, no locks; the .NET devs were amazed at how simply and beautifully Node.js works :-)

Here is part of the file:

Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
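To check this result automatically rather than by eye, here is a single-file sketch of the same experiment that counts the lines and verifies that none of them is torn or merged. It inlines the writer with eval: true, writes to a temporary file instead of ./my-file.txt, and uses 10 workers instead of 100; these choices and the function names are illustrative assumptions. A plausible reason the file stays clean is that appendFileSync() opens the file in append mode and writes each short line with a single call, so the operating system places every write at the current end of the file. Treat that as an explanation for typical local file systems, not as a guarantee from Node.js.

```javascript
const { Worker } = require('worker_threads');
const fs = require('fs');
const os = require('os');
const path = require('path');

// Smaller than the article's 100 workers so the check runs quickly (assumption, tune freely).
const WORKERS_NUMBER = 10;
const WRITES_PER_WORKER = 1000;

// Same logic as writer-worker-app/app.js + logger.js, inlined with eval: true.
const WRITER_SOURCE = `
  const fs = require('fs');
  const { workerData } = require('worker_threads');
  for (let i = 0; i < workerData.writes; i++) {
    fs.appendFileSync(workerData.file, 'Hello from worker number ' + workerData.id + '\\n');
  }
`;

// Resolve once the worker thread has finished all of its writes and exited.
function runWriter(id, file) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(WRITER_SOURCE, {
      eval: true,
      workerData: { id, file, writes: WRITES_PER_WORKER },
    });
    worker.once('error', reject);
    worker.once('exit', resolve);
  });
}

// Hypothetical checker: run all writers in parallel, then count and validate the lines.
async function checkParallelAppend() {
  // A fresh temp directory per run, so concurrent runs never share a file.
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'parallel-append-'));
  const file = path.join(dir, 'my-file.txt');
  const ids = Array.from({ length: WORKERS_NUMBER }, (_, i) => i + 1);
  await Promise.all(ids.map((id) => runWriter(id, file)));

  const lines = fs.readFileSync(file, 'utf8').split('\n').filter(Boolean);
  // Every line must be complete: no torn or merged writes.
  const wellFormed = lines.every((line) => /^Hello from worker number \d+$/.test(line));
  fs.unlinkSync(file);
  fs.rmdirSync(dir);
  return { count: lines.length, wellFormed };
}

checkParallelAppend().then((result) => console.log(result));
```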

Cool experiment, but maybe we want something more?

Threads synchronization

What if we need to sync our threads? For example, let us change our task: now we need to write lines to the file ordered by worker number! First the first worker, then the second, then the third… then the first again, the second, and so on.

To manage the threads, we created an Orchestrator thread (we could have just used the main thread, but it was easy and fun to create an isolated one). For the Orchestrator and the worker threads to communicate, we need some sort of channel. This is where the MessageChannel class comes in. A MessageChannel instance has two fields, port1 and port2, and each port can post messages to and receive messages from the other one. To receive messages, add a handler via .on('message', handler). To post a message, call .postMessage(data).
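Here is a minimal sketch of that port API on its own. For brevity both ends of the channel live in the same thread, which is only for illustration; in the application each end goes to a different thread. The function name pingOverChannel is hypothetical.

```javascript
const { MessageChannel } = require('worker_threads');

// Hypothetical demo: post on port1, receive on port2.
function pingOverChannel(text) {
  return new Promise((resolve) => {
    const { port1, port2 } = new MessageChannel();
    port2.on('message', (data) => {
      resolve(data);
      // Closing one side closes the whole channel, so the open ports
      // no longer keep the process alive.
      port1.close();
    });
    port1.postMessage({ text });
  });
}

pingOverChannel('ping').then((data) => console.log(data)); // prints { text: 'ping' }
```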

This is important: objects that we simply pass to a thread (at creation or in a message) are copied! MessagePort objects, the two ends of a MessageChannel, are transferred rather than copied, which is why we use them for communication between threads.
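The copying is easy to observe. In this sketch (the function name sendAndCompare is hypothetical), a plain object is posted through a channel and then mutated: the receiver gets a different object that still holds the value from the moment postMessage() was called, because the value is serialized during that call.

```javascript
const { MessageChannel } = require('worker_threads');

// Sketch: a plain object sent through postMessage arrives as a copy, not a shared reference.
function sendAndCompare(original) {
  return new Promise((resolve) => {
    const { port1, port2 } = new MessageChannel();
    port2.once('message', (received) => {
      port1.close(); // release both ports so the process can exit
      resolve({ received, sameObject: received === original });
    });
    port1.postMessage(original);
    // Mutating after posting does not affect the copy already queued on the channel.
    original.counter = 999;
  });
}

sendAndCompare({ counter: 1 }).then(({ received, sameObject }) => {
  console.log(received.counter, sameObject); // prints "1 false"
});
```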

To connect two threads, we need to give one port to each of them.

Important: with Node.JS 10.5.0 it is impossible to supply a thread with a port through its constructor. You have to do it with worker.postMessage(), listing the port object in the transferList parameter!
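Here is a minimal sketch of that hand-over in one file. A stand-in writer (inlined with eval: true; its source and the function name transferPortDemo are illustrative) waits for its port, and the main thread then talks to it over the other end of the channel. Because a MessagePort can only be moved, not copied, leaving port1 out of the transferList makes postMessage() throw.

```javascript
const { Worker, MessageChannel } = require('worker_threads');

// Hypothetical stand-in for a writer: wait for its port, then answer commands over it.
const WRITER_SOURCE = `
  const { parentPort } = require('worker_threads');
  parentPort.once('message', ({ orchestratorPort }) => {
    orchestratorPort.on('message', (data) => {
      orchestratorPort.postMessage({ status: 'completed', command: data.command });
    });
  });
`;

function transferPortDemo() {
  return new Promise((resolve, reject) => {
    const worker = new Worker(WRITER_SOURCE, { eval: true });
    worker.once('error', reject);

    const { port1, port2 } = new MessageChannel();
    // Hand port1 to the worker: it appears in the message AND in the transferList.
    worker.postMessage({ orchestratorPort: port1 }, [port1]);

    port2.once('message', (reply) => {
      port2.close();      // closes the channel on both sides
      worker.terminate(); // the demo worker would otherwise keep running
      resolve(reply);
    });
    // Messages are queued until the other end starts listening, so this can be sent right away.
    port2.postMessage({ command: 'write' });
  });
}

transferPortDemo().then((reply) => console.log(reply));
```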

The Thread-Manager (our Orchestrator) will send write commands to the writer threads in increasing order of their ids. The next command is sent only after a successful response to the previous one.

Noob UML diagram of application

Main app

Our new main app: ./app.js:

const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;

console.log('Main app initialized and started.');

const workersMeta = [];
for (let i = 1; i <= WORKERS_NUMBER; i++) {
  // One channel per writer: port1 goes to the writer, port2 to the orchestrator
  const channel = new MessageChannel();
  const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
  workersMeta.push({ id: i, worker, channel });
}

// Ports cannot be supplied through the constructor in 10.5.0, so transfer them with postMessage()
workersMeta.forEach(({ worker, channel }) => {
  worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
});

// Give the threads some time to initialize before starting the orchestrator
setTimeout(() => {
  const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
  const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 }));
  orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map((w) => w.port));
  console.log('All worker threads have been initialized');
}, WORKERS_NUMBER * 10);

Here we create the writer workers and give each one a port (port1 of its own channel).

After that, we create the Orchestrator thread and give it the list of the other ports (port2 of each channel) for communicating with the writers.

Important: empirically, I found that it is better to wait some time until the threads are fully initialized. The best way is to handle some “I am ready” message, but for our purpose setTimeout() is enough.
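For comparison, here is a sketch of that “I am ready” handshake replacing the fixed setTimeout(). Each writer posts a ready message once its setup is done, and the main thread waits for all of them with Promise.all() before starting the Orchestrator. The { type: 'ready', id } message shape and the helper names are our own assumptions, not part of worker_threads; the inline stub writers exit right after announcing themselves, whereas real writers would keep running.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical writer stub: after its setup code it announces readiness to the parent.
const WRITER_SOURCE = `
  const { parentPort, workerData } = require('worker_threads');
  // ...attach parentPort / orchestrator listeners here first...
  parentPort.postMessage({ type: 'ready', id: workerData.id });
`;

// Resolve with the worker once it has said "I am ready".
function startWriter(id) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(WRITER_SOURCE, { eval: true, workerData: { id } });
    worker.once('error', reject);
    worker.once('message', (msg) => {
      if (msg.type === 'ready') resolve({ id: msg.id, worker });
      else reject(new Error(`Unexpected first message from worker ${id}`));
    });
  });
}

// Wait for every writer instead of guessing a timeout; the orchestrator would start after this.
async function startAllWriters(count) {
  const ids = Array.from({ length: count }, (_, i) => i + 1);
  const writers = await Promise.all(ids.map((id) => startWriter(id)));
  return writers.map((w) => w.id);
}

startAllWriters(5).then((ids) => console.log('All worker threads are ready:', ids));
```

The wait now lasts exactly as long as initialization takes, instead of WORKERS_NUMBER * 10 ms regardless of machine speed.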

Writer thread

Let us change the writer thread so that it writes only on command and sends a response.
./writer-worker-app/app.js:

const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;

console.log(`Worker ${id} initialized.`);

// The first message from the main thread carries our end of the channel
parentPort.on('message', (value) => {
  const orchestratorPort = value.orchestratorPort;
  orchestratorPort.on('message', (data) => {
    if (data.command === 'write') {
      console.log(`Worker ${id} received write command`);
      sendMessage();
      sendResult(orchestratorPort);
    }
  });
  console.log(`Worker ${id} started.`);
});

function sendMessage() {
  logger.log(`Hello from worker number ${id}\r\n`);
}

// Report completion so the orchestrator can move on to the next writer
function sendResult(port) {
  port.postMessage({ id, status: 'completed' });
}

Here we initialize the worker properly from the parent's message. After that, we start listening on the orchestrator communication channel. After receiving a command, we write a line into the file and respond with a success status.
A little note: our logger is synchronous, so we do not need any Promises/callbacks here.

Orchestrator thread

./orchestrator-worker-app/app.js:

const { parentPort } = require('worker_threads');

console.log('Orchestrator initialized.');

let workerPorts;
parentPort.on('message', (value) => {
  // Sort by writer id (just in case) so that workerPorts[0] is writer number 1
  workerPorts = value.workerPorts.sort((a, b) => a.id - b.id);
  workerPorts.forEach((wp) => wp.port.on('message', handleResponse));
  console.log('Orchestrator started.');
  sendCommand(workerPorts[0]);
});

// When a writer reports completion, command the next id, wrapping around to the first
function handleResponse(status) {
  const responseWorkerId = status.id;
  let nextWorker = workerPorts.find((wp) => wp.id === responseWorkerId + 1);
  if (!nextWorker) {
    nextWorker = workerPorts[0];
  }
  sendCommand(nextWorker);
}

function sendCommand(worker) {
  worker.port.postMessage({ command: 'write' });
}

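We receive the list of ports, sort it by thread id (just in case), and set up a response callback on each port. After that, we send a write command to the first writer. Every response triggers a command to the next writer, so the writers keep taking turns indefinitely.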
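To see the whole protocol run to completion, here is a single-file sketch under a few assumptions of our own: the main thread plays the Orchestrator (as mentioned above, that would also work), the writers are inlined with eval: true, output goes to a temporary file, and the loop stops after ROUNDS full rounds instead of running forever. The names runOrdered, WRITER_SOURCE and ROUNDS are illustrative. It resolves with the writer ids in the order they appear in the file.

```javascript
const { Worker, MessageChannel } = require('worker_threads');
const fs = require('fs');
const os = require('os');
const path = require('path');

const WORKERS_NUMBER = 5;
const ROUNDS = 3; // assumption: stop after 3 full rounds

// Inline writer: write one line per 'write' command, then report completion.
const WRITER_SOURCE = `
  const fs = require('fs');
  const { parentPort, workerData } = require('worker_threads');
  parentPort.once('message', ({ orchestratorPort }) => {
    orchestratorPort.on('message', (data) => {
      if (data.command === 'write') {
        fs.appendFileSync(workerData.file, 'Hello from worker number ' + workerData.id + '\\n');
        orchestratorPort.postMessage({ id: workerData.id, status: 'completed' });
      }
    });
  });
`;

function runOrdered() {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'ordered-writes-'));
  const file = path.join(dir, 'my-file.txt');
  const writers = []; // built in id order, so writers[0] is writer number 1
  for (let id = 1; id <= WORKERS_NUMBER; id++) {
    const worker = new Worker(WRITER_SOURCE, { eval: true, workerData: { id, file } });
    const { port1, port2 } = new MessageChannel();
    worker.postMessage({ orchestratorPort: port1 }, [port1]);
    writers.push({ id, worker, port: port2 });
  }

  return new Promise((resolve) => {
    const total = WORKERS_NUMBER * ROUNDS;
    let sent = 0;
    const sendCommand = (writer) => {
      sent++;
      writer.port.postMessage({ command: 'write' });
    };
    const handleResponse = ({ id }) => {
      if (sent === total) {
        // The last command has been answered: shut down and read the file back.
        writers.forEach((w) => {
          w.port.close();
          w.worker.terminate();
        });
        const lines = fs.readFileSync(file, 'utf8').split('\n').filter(Boolean);
        fs.unlinkSync(file);
        fs.rmdirSync(dir);
        resolve(lines.map((line) => Number(line.split(' ').pop())));
        return;
      }
      // Next writer is id + 1, wrapping around to the first one.
      const next = writers.find((w) => w.id === id + 1) || writers[0];
      sendCommand(next);
    };
    writers.forEach((w) => w.port.on('message', handleResponse));
    sendCommand(writers[0]);
  });
}

runOrdered().then((order) => console.log(order.join(' ')));
```

Since only one command is ever in flight, the order of lines in the file is fully determined by the orchestrator, with no locks in the writers.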

Conclusion

That's it: our multithreaded, orchestrator-managed application is ready. We have learned how to create threads and how to manage them. In my opinion, this thread architecture in Node.JS is one of the best solutions for parallel programming. It is clean and safe, with very easy communication.

Source code can be found here

Thanks for your attention.
