Синхронные / асинхронные вызовы в Node.js
Когда несёшь свои совочки в чужую песочницу
Бытует мнение, что Node.js - это бесконечно вложенные друг в друга callback’и, и нормальному программисту лезть в это болото не комильфо. И всё же, js сам по себе крут именно из-за своей асинхронной природы. Уверен в этом (-:
Опыта программирования в ноде пока не имею, но захотелось сделать простое тестирование, и столкнулся с этим sync / async адом. Как-то мне везёт на нетипичные задачи при изучении новых языков. Сразу отмечу, что моё решение ни разу не правильное, и лучше смотреть в сторону каких-нибудь естественных для языка Async/Await.
Суть задачи
Есть последовательность асинхронно выполняемых функций, которую выстраиваем в очередь. В моём случае это строго последовательный набор:
- Подключение к БД - Connect
- Создание тестовой таблицы - CREATE TABLE
- Наполнение таблицы данными - INSERT
- Отключение от БД - Disconnect
Напоминаю, в ноде каждая из этих функций выполняется асинхронно. Чтобы не начать вставку данных раньше создания таблицы, используем async.series.
На шаге 3 - Наполнение таблицы данными - хотелось бы вставить не одну запись, а 100 тыс., к примеру. Причём, без всяких там batch режимов (поддерживаются некоторыми СУБД и драйверами к ним). Batch - это когда несколько запросов добавляются в одну строку или пакет (пакетная обработка, собственно) и такой кучкой уходят в БД. Один общий запрос - быстрее результат. Так вот, никаких пакетных режимов. Такой вот тест, такие условия.
Batch нельзя, но можно выполнять вставку в несколько потоков (да, я в курсе, что нода не многопоточная). Если просто асинхронно постараться выполнить 100 тыс. запросов, хороший результат не получим. Нужно одновременно выполнять не больше 25 запросов, или что-то типа того. Не в количестве дело, а в самом подходе.
Проблема на примере кода
Попытка решить задачу привела к чему-то такому:
var async = require('async');var db = ...; // некий объект для работы с БД
async.series([
function f01_Connect(next) {
// асинхронное подключение к БД с переходом к следующему шагу
db.connect(params, next);
}, function f2_CreateTable(next) {
db.execute("CREATE TABLE ...", next);
}, function f3_Insert(next) {var query = 'INSERT INTO test (num) VALUES (?)';
for (var num = 0; num < 100000; ++num) {
db.execute(
query, // параметрический запрос
[num], // очередное значение параметра
{ prepare: true }, // полезные флаги для execute
CALLBACK); // <-- ВОТ ТУТ ПРОБЛЕМА
}
},
function f4_Disconnect(next) {
db.disconnect();
// на случай, если next не вызывается нашим методом,
// можем вызвать вручную (иначе очередь не закончится)
next();
}], function (err) {
// сработает в самом конце или при возникновении ошибки;
// callback'и БД посылают первым параметром ошибку, либо null,
// поэтому мы смотрим, задана ли ошибка
if (err) {
console.error('There was an error', err.message, err.stack);
}
});Функция f3_Insert вызывается в порядке очереди, как и другие функции в массиве (в первом параметре вызова async.series). Каждая функция, например, f1_Connect, вызывает внутри себя очередной асинхронный метод объекта db. В качестве callback’а вызову передаётся next, чтобы очередь продвинулась к следующему методу по завершению этого асинхронного метода из db.
Но что делать внутри f3_Insert? Попытка указать вместо CALLBACK метод next приведёт к переходу на f4_Disconnect после первой же вставки записи в базу. Если не указывать никакой CALLBACK методу db.execute, все 100 тыс. INSERT’ов начнут выполняться условно одновременно. Плохо.
Более правильное решение
Код в целом остаётся прежним, но f3_Insert существенно усложняется:
function f3_Insert(next) {var query = 'INSERT INTO test (num) VALUES (?)';
// num eval in range [0, 100000] (inclusive)
var num = 0;
var queueLength = 10;
var maxNum = 100000;
var concurrency = 5;
// called every item callback, so we waiting
// when err is mean something
var makerEveryHandle = function(err) {
if (err) next(err);
};
var makerFunc = function() {
// function for each item
var q = async.queue(function (props, callback) {
db.execute(
query, // параметрический запрос
[props.num], // очередное значение параметра
{ prepare: true }, // полезные флаги для execute
callback); // следующий шаг вложенной очереди
}, concurrency);// finish callback
q.drain = function() {
setTimeout(function(){
// exit f3_Insert() when enough of num
if (num > maxNum) next();
// else make another try
makerFunc();
}, 0);
};
// next block of items to handle
for (var k = 0; k < queueLength; ++k) {
if (num <= maxNum)
q.push({ num: num++ }, makerEveryHandle);
}
};// end of makerFunc()
// first try
makerFunc();
}, // end of f3_Insert()
При каждом вызове метода makerFunc (первый раз вызывается в конце метода f3_Insert) создаётся очередь выполнения однотипной задачи. В нашем случае задача - db.execute. После выполнения задачи над всеми заданными через q.push параметрами, срабатывает q.drain. Внутри этого финализирующего метода либо прекращаем f3_Insert, либо повторяем весь блок ещё одним вызовом makerFunc.
Идея в том, чтобы повторять блок db.execute(INSERT) до тех пор, пока num не доползёт до 100 тыс. Переменная concurrency определяет, сколько “потоков” одновременно будут выполняться. queueLength задаёт количество новых параметров в очереди на каждом шаге. Для нас параметры - сами вставляемые в базу num.
Не самое элегантное решение, но вполне удобное для моих задач тестирования быстродействия вставки записей в БД.