Menangani CPU Intensive Task di Node.js dengan RabbitMQ

ibnu musyaffa
Badr Interactive
Published in
6 min readMay 30, 2024
Photo by Aswathy N on Unsplash

Latar Belakang

Javascript secara umum bisa dibilang cukup bagus dalam menangani operasi yang bersifat I/O (interaksi ke disk/network) misalnya HTTP server, mengakses database, membaca atau menulis file.

Tetapi, Javascript relatif kurang bagus untuk menangani operasi CPU yang intensif, hal ini karena ketika kita menjalankan operasi CPU yang intensive, operasi tersebut akan membuat event loop sibuk dan tidak bisa memproses task yang lain, selain itu by default Javascript/Node.js akan menggunakan 1 core saja untuk 1 instance/process.

Sebagai contoh, saat kita menjalankan sebuah HTTP server, request baru tidak bisa dilayani selama operasi CPU intensif tersebut berlangsung.

Perhatikan contoh kode berikut.

const express = require("express");
const app = express();
const port = 5000;

function generateReport() {
const endTime = new Date().getTime() + 30 * 1000; // 30 seconds

//cpu intensive task in 30 seconds
while (new Date().getTime() < endTime) {
console.log("processing...");
}
}

app.get("/home", (req, res) => {
res.send("Hello World");
});

app.get("/report", (req, res) => {
generateReport();
res.send("done");
});


app.listen(port, () => {
console.log(`Example app listening on port ${port}`);
});

Dari kode di atas, kita bisa melihat ada 2 api endpoint yang tersedia yaitu “/home” dan “/report”. jika kita membuat request ke “/report” dimana api tersebut akan menjalankan proses CPU intensif selama 30 detik, hal itu akan membuat endpoint http lain semisal “/home” tidak bisa melayani request sampai proses tadi selesai.

Lalu bagaimana caranya untuk mengatasi hal tersebut ? secara umum kita mempunya 2 opsi berikut.

  1. Menggunakan worker thread, di mana Node.js akan mengeksekusi sebuah task di thread yang berbeda.
  2. Membuat service queueu/antrian, di mana suatu message atau data akan dikirim melalui message broker (misalnya RabbitMQ) yang kemudian akan diproses oleh suatu service secara bergantian sehingga tidak mengganggu service lain.

Di artikel kali ini kita akan menggunakan solusi yang kedua.

Gambaran Arsitektur

Untuk lebih jelasnya berikut gambaran arsitektur solusi yang akan kita pakai.

Dari gambaran arsitektur di atas bisa dijelaskan beberapa point berikut.

  • Sebuah task akan dikirim ke sebuah worker dengan mekanisme FIFO (first in first out).
  • By default sebuah task hanya dikerjakan sekali oleh suatu consumer/worker.
  • Kita bisa mempunyai lebih dari 1 instance worker, baik di server yang sama ataupun berbeda selama bisa terkoneksi dengan RabbitMQ.
  • Kita juga bisa membuat worker dengan bahasa lain selama bisa terkoneksi dengan RabbitMQ.

Pra Syarat

Sebelum melanjutkan tutorial ini, pastikan kita telah memenuhi syarat berikut

  • Pengetahuan dasar Node.js dan Express
  • Node.js dan NPM telah terinstall

Instalasi RabbitMQ

Selain Node.js, kita juga memerlukan RabbitMQ di sistem kita. Jika kamu mempunyai docker yang sudah terinstall, kita bisa menggunakan perintah berikut untuk menjalankan program RabbitMQ di port 5672 dan 15672 untuk web management.

docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Jika tidak menggunakan docker kamu bisa menggunakan beberapa pilihan instalasi di dokumentasi resmi RabbitMQ melalui link berikut.

Membuat Project Node.js

Pertama buat folder baru misalnya “nodejs-rabbitmq”, kemudian di dalam folder tersebut jalankan perintah berikut

npm init -y

selanjutnya kita install express sebagai HTTP server dan amqplib yang berfungsi sebagai driver untuk membuat koneksi dengan RabbitMQ.

npm install express amqplib --save

Setelah itu kita buat file server.js untuk membuat HTTP server dan rabbitmq.js untuk membuat koneksi dengan RabbitMQ.

// server.js

const express = require("express");
const app = express();
const port = 5000;

app.get("/", (req, res) => {
res.send("Hello World");
});

app.listen(port, () => {
console.log(`Example app listening on port ${port}`);
});
// rabbitmq.js

const amqp = require("amqplib");

let connection = null;

async function connect() {
if (connection === null) {
connection = await amqp.connect({
protocol: "amqp",
hostname: "localhost",
port: 5672,
username: "guest",
password: "guest",
});
console.log("connected to rabbitmq")
}

return connection;
}

module.exports = connect;

Untuk memastikan http server berjalan baik, kita bisa coba pastikan dengan command berikut.

node server.js

Kemudian kunjungi URL berikut http://localhost:5000, jika sukses kita bisa melanjutkan ke langkah berikutnya.

Publish sebuah message ke Queue/Antrian

Untuk mempublish sebuah message ke queue, pertama kita coba tambahkan kode berikut di endpoint “/generate-report” di server.js yang berfungsi sebagai pemicu untuk mempublish message ke queue RabbitMQ yang bernama “generate-report”

// server.js

const express = require("express");
const app = express();
const port = 5000;
//import rabbitmq
const rabbitmq = require("./rabbitmq");

app.get("/home", (req, res) => {
res.send("Hello World");
});

app.get("/generate-report", async (req, res) => {
try {
// ambil koneksi RabbitMQ
const connection = await rabbitmq();
// Buat channel
const channel = await connection.createChannel();
const queueName = "generate-report";
// Create queue dengan nama 'generate-report' dan opsi durable aktif
await channel.assertQueue(queueName, { durable: true });

// Kirim sebuah message ke queue 'generate-report' dengan opsi persisten aktif
const message = JSON.stringify({ month: "01" });
channel.sendToQueue(queueName, Buffer.from(message), {
persistent: true,
});

return res.send({ message: "Success" });
} catch (error) {
console.error(error);
return res.send({ message: error.message });
}
});

app.listen(port, () => {
console.log(`Example app listening on port ${port}`);
});

Untuk membuat sebuah message tidak hilang ketika RabbitMQ terhenti kita perlu menambahkan opsi durable pada queue dan opsi persistent pada method sendToQueue() ketika mengirim message.

Membuat Consumer/Worker untuk mengkonsumsi sebuah message dari Queue

Setelah membuat kode yang berfungsi untuk mengirim message ke queue, selanjutnya kita perlu membuat consumer/worker yang akan standby menerima message yang kemudian bisa kita proses.

Buat file bernama report-worker.js dengan kode sebagai berikut.

// report-worker.js

const rabbitmq = require("./rabbitmq");

(async () => {
try {

const connection = await rabbitmq();
const channel = await connection.createChannel();
const queueName = "generate-report";

await channel.assertQueue(queueName, { durable: true });

channel.consume(
queueName,
(message) => {
if (message) {
const data = JSON.parse(msg.content.toString());
console.log("Received");
console.log(data);
//tandai sebuah message sudah diterima dengan baik
channel.ack(msg)
}
},
{ noAck: false }
);

console.log("Waiting for messages. To exit press CTRL+C");
} catch (err) {
console.warn(err);
}
})();

Dari kode di atas kita bisa melihat ada opsi noAck, opsi tersebut berfungsi untuk menentukan apakah message yang dikonsumsi perlu dikonfirmasi secara manual atau tidak.

  • Jika noAck = false, message yang diterima dari queueu harus dikonfirmasi secara eksplisit oleh consumer menggunakan metode channel.ack(msg). Jika tidak dikonfirmasi, message tersebut akan tetap berada di queue dan akan dikirim ulang ke consumer lain jika koneksi consumer saat ini terputus atau mengalami kesalahan. Hal ini agar memastikan message tidak hilang dan bisa di proses dengan benar.
  • Jika noAck = true, message yang diterima dari queue dianggap sudah dikonfirmasi secara otomatis begitu diterima oleh consumer, consumer tidak perlu memanggil channel.ack(msg). Ini dapat meningkatkan performa dalam situasi di mana konfirmasi manual tidak diperlukan, tetapi memiliki risiko bahwa message bisa hilang jika consumer gagal memprosesnya setelah diterima.

Untuk menjalankan worker tersebut kita bisa menggunakan command berikut

node report-worker.js

Setelah command tersebut dijalankan, worker tersebut akan standby menunggu message yang masuk. Selain itu kita juga bisa menjalankan instance dari worker tersebut lebih dari sekali baik di server yang sama ataupun server yang berbeda.

Sebagai contoh, jika kita mempunyai 2 server yang masing-masing mempunya 4 core, kita bisa saja menjalankan 4 worker atau lebih di masing-masing server tersebut, Hal ini sangat berguna jika aplikasi yang kita buat berpotensi mempunyai banyak background proses.

Tidak hanya worker untuk satu fitur saja, kita juga bisa membuat worker untuk fitur lain, selama nama queue yang dibuat berbeda dengan yang sudah ada.

Kesimpulan

Pada artikel ini, kita telah membahas bagaimana menangani operasi CPU intensif pada aplikasi Node.js menggunakan worker queue yang diimplementasikan dengan RabbitMQ.

Di sini kita telah mempelajari :

  1. Mengapa Node.js kurang cocok untuk operasi CPU intensif
  2. Instalasi RabbitMQ
  3. Cara mengirimkan message ke queue RabbitMQ dari endpoint HTTP.
  4. Cara membuat worker/consumer yang akan memproses message dari queue RabbitMQ.
  5. Fungsi acknowledgment (ack) di RabbitMQ

Dengan menggunakan worker queue, kita dapat memproses tugas-tugas CPU intensif di background tanpa mengganggu kinerja HTTP server. Hal ini membuat aplikasi kita tetap responsif dan scalable.

Implementasi ini juga memungkinkan kita untuk memanfaatkan arsitektur distributed, di mana kita bisa menjalankan banyak worker di server yang terpisah dengan mudah.

Semoga bermanfaat.

--

--