Scaling Node.js Services at Fleetx: Challenges and Solutions

Handling CPU-Intensive Tasks with Worker Threads and Scaling Horizontally with Clusters: Fleetx Reporting Service

Satish Kumar
fleetx engineering
7 min read · Sep 17, 2024


At Fleetx, we generate a large number of reports within our dashboard. All of these reports are processed by our Node.js reporting server, which we affectionately refer to as “Rapp” (short for reporting app). In another article, I’ve detailed why we transitioned report generation from the browser to the Node.js server and the benefits this change has brought.

To handle a large number of requests, we needed to scale our application. Node.js operates on a single-threaded model: all JavaScript executes on the main thread. This execution model offers certain advantages, such as simplified concurrency management, lower memory usage, and an event-driven architecture designed to handle I/O operations asynchronously. But it also comes with drawbacks: limited CPU utilisation, blocking operations that degrade performance, scalability constraints, and difficulty handling CPU-intensive tasks.
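
To see the blocking drawback in action, consider a minimal sketch (the endpoint names and loop bound here are purely illustrative): while the synchronous loop runs, the single thread cannot respond to any other request.

const http = require('http');

http.createServer((req, res) => {
  if (req.url === '/block') {
    // CPU-bound work: nothing else on this thread runs until the loop ends
    let sum = 0;
    for (let i = 0; i < 5e9; i++) sum += i;
    res.end(`done: ${sum}\n`);
  } else {
    // A concurrent request here stalls while /block is computing
    res.end('pong\n');
  }
}).listen(8000);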

Scaling the Application:

Although our machine had ample resources (CPU cores and RAM), only a single process was running, using a single thread. Whenever that process was blocked by data processing or computation, other requests were delayed and the application often became unresponsive.

We had two options to address this issue:

  1. Deploying multiple instances of the application on different machines and using a load balancer to distribute requests. However, this approach comes with additional management overhead.
  2. Using Node.js clusters, which start multiple instances of the same application on the same machine and run on the same port. This approach requires minimal management effort while efficiently utilising the machine’s resources.

Using Node.js clusters:

  • Purpose: Designed to scale your Node.js server across multiple CPU cores by creating worker processes, each running an instance of your server application.
  • Use Case: An I/O-bound or CPU-bound server that needs to handle many concurrent client requests; cluster is often used to horizontally scale web servers by distributing incoming requests across multiple worker processes.
  • How It Works: cluster forks multiple child processes (workers), and each worker is a full-fledged Node.js instance that listens on the same server port. The master process handles the worker management and distribution of connections.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // This is the master process
  console.log(`Master process is running with PID: ${process.pid}`);

  // Fork workers based on the number of CPUs
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Listen for workers that exit (crash) and replace them
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died. Spawning a new worker...`);
    cluster.fork();
  });
} else {
  // This is a worker process
  console.log(`Worker process started with PID: ${process.pid}`);

  // Worker processes can share the same port
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Hello from worker process ${process.pid}\n`);
  }).listen(8000);

  console.log(`Worker ${process.pid} is now listening on port 8000`);
}

By leveraging clusters to run multiple instances across multiple cores, we were able to handle a large number of concurrent requests far more efficiently.

For a long time, this approach allowed us to efficiently generate reports (in Excel, CSV, and PDF formats) for our clients within a reasonable timeframe, and it worked wonderfully. However, issues arose when too many requests were processed simultaneously. Since data preparation and report generation are both memory and CPU-intensive tasks, our CPU and memory usage would occasionally spike to 100%. Additionally, we had no control over the number of concurrent requests being processed, leading to performance bottlenecks.

Additionally, we needed greater control over our reporting system, such as the ability to queue report requests for later processing, automatically retry failed requests at least once, or pause request processing while still accepting new client submissions. We also wanted manual control over retrying failed requests, fine-grained control over concurrency, and the ability to prioritise certain reports during processing.

Using a Queuing System:

For this purpose, we implemented a queuing system and integrated BullMQ to efficiently manage and process tasks asynchronously.

Benefits of a Queuing System (BullMQ):

  1. Scalability: BullMQ allows for easy scaling by distributing tasks across multiple workers, making it ideal for handling high volumes of jobs.
  2. Concurrency Control: It supports concurrent job processing, ensuring multiple tasks can be executed simultaneously without blocking.
  3. Job Prioritisation: You can assign priorities to jobs, ensuring that high-priority tasks are processed first.
  4. Delayed and Scheduled Jobs: BullMQ allows you to delay tasks or schedule them to run at a specific time, adding flexibility in task management.
  5. Automatic Retries: In case of failures, BullMQ automatically retries failed jobs based on a configurable retry policy.
  6. Job Rate Limiting: It provides rate-limiting capabilities to ensure that tasks are processed at a controlled rate, preventing system overload.
  7. Persistence and Reliability: Jobs are persisted in Redis, ensuring they are not lost even if the system crashes, making BullMQ highly reliable for critical tasks.
  8. Monitoring and Metrics: BullMQ offers comprehensive monitoring tools to track job progress, status, and performance, allowing you to manage workloads effectively.
const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');

// ---------- Adding reports ----------

// Create a connection to Redis
// (BullMQ workers require maxRetriesPerRequest: null on the ioredis connection)
const connection = new Redis({ maxRetriesPerRequest: null });

// Create a new queue
const myQueue = new Queue('myQueue', { connection });

// Add a report to the queue
const addJob = async (reportData) => {
  await myQueue.add('JOB_SUMMARY_REPORT', reportData);
  console.log('Report added to the queue');
};

// ---------- Starting a worker to process reports ----------

// Create a worker to process jobs
const worker = new Worker('myQueue', async (job) => {
  console.log('Processing job:', job.id, job.name);
  console.log('Report data:', job.data);

  // Simulate task processing
  await new Promise((resolve) => setTimeout(resolve, 2000));

  console.log('Report completed');
}, { connection });

// Handle worker errors
worker.on('error', (error) => {
  console.error('Worker encountered an error:', error);
});
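
Most of the capabilities listed above map directly onto BullMQ job and worker options. Here is a minimal sketch; the option values are illustrative rather than our production settings.

// Retries, priorities, and delays are per-job options
const addPriorityJob = async (reportData) => {
  await myQueue.add('JOB_SUMMARY_REPORT', reportData, {
    attempts: 2,                                   // retry a failed report once
    backoff: { type: 'exponential', delay: 5000 }, // wait before retrying
    priority: 1,                                   // lower number = higher priority
    delay: 60000,                                  // optionally defer processing by 1 minute
  });
};

// Concurrency and rate limiting are configured on the worker
const limitedWorker = new Worker('myQueue', async (job) => {
  // ... generate the report as above
}, {
  connection,
  concurrency: 1,                        // one report at a time per process
  limiter: { max: 10, duration: 60000 }, // at most 10 jobs per minute
});

// Processing can be paused and resumed while the queue keeps accepting jobs
// await limitedWorker.pause();
// await limitedWorker.resume();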

We also developed a user-friendly dashboard to manage the queue and provide complete visibility into the status of reports in the queue.

Fleetx Report Queue Dashboard

With this in place, we gained complete control over the report generation process, along with full visibility into the status and progress of each report.

With the combination of clustering and a queuing system, our application could handle a large volume of requests while maintaining full control over report generation. Clustering allowed us to serve more requests concurrently by running multiple instances of the same application. Meanwhile, the queuing system enabled us to accept requests quickly by adding reports to the queue for later processing, rather than processing them immediately. We configured each worker to handle one report at a time, but because multiple instances of the application were running, we could process as many reports simultaneously as there were active instances.

Everything ran smoothly until we encountered a new challenge. Some of our reports required heavy computation over very large datasets, taking 10 minutes or more to process. These intensive tasks blocked the main Node.js thread, choking the process and leaving it unable to accept new requests. And since the cluster module distributes connections to worker processes round-robin, a request forwarded to an already busy process would hang, further compounding the issue.

Since JavaScript runs everything on the main thread, we needed a way to offload these heavy computations to a separate thread or process, keeping the main thread unblocked so it could continue handling incoming requests smoothly.

Using Worker Threads (worker_threads):

  • Purpose: Designed for running heavy CPU-bound tasks (e.g., data processing, cryptography, machine learning) in parallel without blocking the main event loop.
  • Use Case: When you have a single-threaded Node.js application but need to offload CPU-intensive tasks to background threads for parallel processing.
  • How It Works: You create worker threads in your Node.js program that run in parallel and communicate with the main thread using messages. Each worker thread runs a piece of code independently and can share memory with the main thread.

We offloaded all heavy CPU-intensive tasks to separate worker threads, preventing the main thread from being blocked.

// worker.js
const { parentPort, workerData } = require('worker_threads');

const { data } = workerData;

for (let i = 0; i < data.length; i++) {
  const item = data[i];
  // perform computation
  item.result = 'computedValue';
}

// Send the result back to the main thread
parentPort.postMessage(data);
// main.js
const { Worker } = require('worker_threads');
const path = require('path');

// Run a worker thread and resolve with the result it posts back
const startWorkerThread = (fileName, workerData = {}) => {
  return new Promise((resolve, reject) => {
    const worker = new Worker(path.resolve(__dirname, fileName), { workerData });
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
};

async function getResult() {
  // ... some other code
  const workerData = { data: [] };
  // While the worker thread computes, the main thread is free to pick up other tasks
  const result = await startWorkerThread('worker.js', workerData);
  // ... use the result here
  console.log(result);
}
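
To tie the pieces together, here is a hypothetical sketch of the glue between the two systems: a BullMQ processor that hands the heavy computation to a worker thread, so each clustered process keeps its event loop free while a report is being generated. The file name and job shape are illustrative, not our actual code.

// Hypothetical glue: a BullMQ processor that offloads computation to a worker thread
const { Worker: QueueWorker } = require('bullmq');
const Redis = require('ioredis');

const connection = new Redis({ maxRetriesPerRequest: null });

const reportWorker = new QueueWorker('myQueue', async (job) => {
  // The event loop stays free while the worker thread computes
  return startWorkerThread('generateReport.js', { data: job.data });
}, { connection, concurrency: 1 });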

By combining clusters, worker threads, and a queuing system, we can build a highly robust and scalable system.

Satish Kumar
AVP Engineering - Frontend Apps at fleetx.io · Ex-Aviso Inc. · NITian 🎓