Синхронные / асинхронные вызовы в Node.js

Когда несёшь свои совочки в чужую песочницу

Бытует мнение, что Node.js - это бесконечно вложенные друг в друга callback’и, и нормальному программисту лезть в это болото не комильфо. И всё же, js сам по себе крут именно из-за своей асинхронной природы. Уверен в этом (-:

Опыта программирования в ноде пока не имею, но захотелось сделать простое тестирование, и столкнулся с этим sync / async адом. Как-то мне везёт на нетипичные задачи при изучении новых языков. Сразу отмечу, что моё решение ни разу не правильное, и лучше смотреть в сторону каких-нибудь естественных для языка Async/Await.

Суть задачи

Есть последовательность асинхронно выполняемых функций, которую выстраиваем в очередь. В моём случае это строго последовательный набор:

  1. Подключение к БД - Connect
  2. Создание тестовой таблицы - CREATE TABLE
  3. Наполнение таблицы данными - INSERT
  4. Отключение от БД - Disconnect

Напоминаю, в ноде каждая из этих функций выполняется асинхронно. Чтобы не начать вставку данных раньше создания таблицы, используем async.series.

На шаге 3 - Наполнение таблицы данными - хотелось бы вставить не одну запись, а 100 тыс., к примеру. Причём, без всяких там batch режимов (поддерживаются некоторыми СУБД и драйверами к ним). Batch - это когда несколько запросов добавляются в одну строку или пакет (пакетная обработка, собственно) и такой кучкой уходят в БД. Один общий запрос - быстрее результат. Так вот, никаких пакетных режимов. Такой вот тест, такие условия.

Batch нельзя, но можно выполнять вставку в несколько потоков (да, я в курсе, что нода не многопоточная). Если просто асинхронно постараться выполнить 100 тыс. запросов, хороший результат не получим. Нужно одновременно выполнять не больше 25 запросов, или что-то типа того. Не в количестве дело, а в самом подходе.

Проблема на примере кода

Попытка решить задачу привела к чему-то такому:

var async = require('async');
var db = ...;    // некий объект для работы с БД
async.series([
   function f01_Connect(next) {
// асинхронное подключение к БД с переходом к следующему шагу
db.connect(params, next);
},
   function f2_CreateTable(next) {
db.execute("CREATE TABLE ...", next);
},
   function f3_Insert(next) {
      var query = 'INSERT INTO test (num) VALUES (?)';
for (var num = 0; num < 100000; ++num) {
db.execute(
query, // параметрический запрос
[num], // очередное значение параметра
{ prepare: true }, // полезные флаги для execute
CALLBACK); // <-- ВОТ ТУТ ПРОБЛЕМА
}
   },
   function f4_Disconnect(next) {
db.disconnect();
// на случай, если next не вызывается нашим методом,
// можем вызвать вручную (иначе очередь не закончится)
next();
}
], function (err) {
// сработает в самом конце или при возникновении ошибки;
// callback'и БД посылают первым параметром ошибку, либо null,
// поэтому мы смотрим, задана ли ошибка
if (err) {
console.error('There was an error', err.message, err.stack);
}
});

Функция f3_Insert вызывается в порядке очереди, как и другие функции в массиве (в первом параметре вызова async.series). Каждая функция, например, f1_Connect, вызывает внутри себя очередной асинхронный метод объекта db. В качестве callback’а вызову передаётся next, чтобы очередь продвинулась к следующему методу по завершению этого асинхронного метода из db.

Но что делать внутри f3_Insert? Попытка указать вместо CALLBACK метод next приведёт к переходу на f4_Disconnect после первой же вставки записи в базу. Если не указывать никакой CALLBACK методу db.execute, все 100 тыс. INSERT’ов начнут выполняться условно одновременно. Плохо.

Более правильное решение

Код в целом остаётся прежним, но f3_Insert существенно усложняется:

function f3_Insert(next) {
   var query = 'INSERT INTO test (num) VALUES (?)';
   // num eval in range [0, 100000] (inclusive)
var num = 0;
var queueLength = 10;
var maxNum = 100000;
var concurrency = 5;
   // called every item callback, so we waiting
// when err is mean something
var makerEveryHandle = function(err) {
if (err) next(err);
};
   var makerFunc = function() {
// function for each item
var q = async.queue(function (props, callback) {
db.execute(
query, // параметрический запрос
[props.num], // очередное значение параметра
{ prepare: true }, // полезные флаги для execute
callback); // следующий шаг вложенной очереди
}, concurrency);
      // finish callback
q.drain = function() {
setTimeout(function(){
// exit f3_Insert() when enough of num
if (num > maxNum) next();
// else make another try
makerFunc();
}, 0);
};
      // next block of items to handle
for (var k = 0; k < queueLength; ++k) {
if (num <= maxNum)
q.push({ num: num++ }, makerEveryHandle);
}
   };// end of makerFunc()
   // first try
makerFunc();
}, // end of f3_Insert()

При каждом вызове метода makerFunc (первый раз вызывается в конце метода f3_Insert) создаётся очередь выполнения однотипной задачи. В нашем случае задача - db.execute. После выполнения задачи над всеми заданными через q.push параметрами, срабатывает q.drain. Внутри этого финализирующего метода либо прекращаем f3_Insert, либо повторяем весь блок ещё одним вызовом makerFunc.

Идея в том, чтобы повторять блок db.execute(INSERT) до тех пор, пока num не доползёт до 100 тыс. Переменная concurrency определяет, сколько “потоков” одновременно будут выполняться. queueLength задаёт количество новых параметров в очереди на каждом шаге. Для нас параметры - сами вставляемые в базу num.

Не самое элегантное решение, но вполне удобное для моих задач тестирования быстродействия вставки записей в БД.