Paralelizando promesas

Concurrencia, throttling y manejo de errores

Lupo Montero
Laboratoria Devs

--

Estos últimos meses me ha tocado trabajar con Firebase, y ha sido una oportunidad para profundizar un poquito en las promesas (Promise), ya que el SDK de Firebase hace uso de promesas para todas las operaciones asíncronas.

Para la gran mayoría de operaciones, podemos valernos de promesas nativas (tal y como vienen con el leguaje). Es una interfaz simple y elegante, pero hay ciertos casos donde necesitamos ejecutar muchas tareas asíncronas y se convierte en una gran limitación no poder controlar la concurrencia de las operaciones con Promise.all (entre otras cosas).

Para ilustrar, veamos un ejemplo, el caso de uso real que me llevó a escribir esto… Un poco de contexto... Mencioné que estoy chambeando con Firebase y hace un tiempo me puse a ver cómo hacer para borrar todos los usuarios de un proyecto de Firebase de forma programática (esto es algo que necesitamos para entornos de desarrollo, testing y staging).

El SDK de Firebase (Firebase Admin SDK) no incluye una manera de borrar todos los usuarios (algo tipo auth.deleteAllUsers()), ni tampoco una manera programática de consultar todos los usuarios (algo como auth.getAllUsers() para saber cuáles borrar). Lo que el SDK nos ofrece es auth.deleteUser(uid), que nos va a permitir borrar usuarios por uid, uno por uno… 😰

Para sacar la lista de usuarios a borrar (todos los usuarios), por ahora podemos usar la herramienta de CLI de Firebase:

firebase auth:export auth.json

Esto va a exportar todos los usuarios a un archivo (auth.json), que ahora podemos importar en un script y usar como índice sobre el que iterar.

// El archivo `auth.json` es un objeto con una propiedad `users` que
// es un arreglo de objetos (cada uno representando a un usuario).
const { users } = require('./auth.json');

Ahora que tenemos la data de los usuarios y la función que queremos invocar para cada usuario (auth.deleteUser()), ya estamos listos para empezar a borrar usuarios 💣💀

En paralelo (todas a la vez con Promise.all)

El enfoque más naive, usando Promise.all(), podría ser algo así:

const firebaseAdmin = require('firebase-admin');
const serviceAccountKey = require('./service-account-key.json');
const { users } = require('./auth.json');
firebaseAdmin.initializeApp({
credential: firebaseAdmin.credential.cert(serviceAccountKey),
});
Promise.all(users.map(user => auth.deleteUser(user.localId)))
.then(console.log)
.catch(console.error);

Todo parece simple y claro. Por lo menos queda clara nuestra intención: queremos ejecutar auth.deleteUser(user.localId) para cada objeto user (cada elemento del arreglo users). Sin embargo, si ejecutamos nuestro código veremos este error:

{ Error: An internal error has occurred. Raw server response: "{"error":{"code":400,"message":"QUOTA_EXCEEDED : Exceeded quota for deleting accounts.","errors":[{"message":"QUOTA_EXCEEDED : Exceeded quota for deleting accounts.","domain":"global","reason":"invalid"}]}}"
at FirebaseAuthError.FirebaseError [as constructor] (/Users/lupo/work/lupomontero/paralelizando-promesas/node_modules/firebase-admin/lib/utils/error.js:39:28)
at FirebaseAuthError.PrefixedFirebaseError [as constructor] (/Users/lupo/work/lupomontero/paralelizando-promesas/node_modules/firebase-admin/lib/utils/error.js:85:28)
at new FirebaseAuthError (/Users/lupo/work/lupomontero/paralelizando-promesas/node_modules/firebase-admin/lib/utils/error.js:143:16)
at Function.FirebaseAuthError.fromServerError (/Users/lupo/work/lupomontero/paralelizando-promesas/node_modules/firebase-admin/lib/utils/error.js:173:16)
at /Users/lupo/work/lupomontero/paralelizando-promesas/node_modules/firebase-admin/lib/auth/auth-api-request.js:723:45
at process._tickCallback (internal/process/next_tick.js:68:7)
errorInfo:
{ code: 'auth/internal-error',
message:
'An internal error has occurred. Raw server response: "{"error":{"code":400,"message":"QUOTA_EXCEEDED : Exceeded quota for deleting accounts.","errors":[{"message":"QUOTA_EXCEEDED : Exceeded quota for deleting accounts.","domain":"global","reason":"invalid"}]}}"' },
codePrefix: 'auth' }

😱 🙀 😱

Esto se debe a que cuando usamosPromise.all, todas las promesas se inician a la vez (en paralelo). Promise.all agrupa promesas de tal forma que podemos ser notificados cuando todas las promesas se resuelven (o cuando alguna se rechazada), pero no se encarga de iniciar la acción (eso lo hacemos al mapear sobre users) lo que significa que estamos iniciando miles de peticiones HTTP, todas a la vez 😜, y Google se queja diciendo que hemos excedido nuestra quota para esta operación (que de acuerdo a la documentación, para borrar usuarios es un máximo de 10 peticiones por segundo).

Tasks (promise creators)

El primer paso para poder solucionar este problema es separar la invocación que inicia las peticiones individuales (auth.deleteUser()) de la creación de la promesa que engloba a todas las demás. En el ejemplo anterior estábamos creando un arreglo de promesas usando users.map(), e invocando auth.deleteUser() para cada usuario, lo cual directamente inicia la acción, y map no espera para la siguiente invocación.

La manera más sencilla de crear esa separación es anidando las invocaciones a auth.deleteUser() dentro de otra función. En el resto de los ejemplos me voy a referir a estas funciones anidadas como tasks o tareas.

// retorna una promesa
const promise = auth.deleteUser(user.localId);
// promesa anidada en una función
const task = () => auth.deleteUser(user.localId);
// creando un arreglo de promesas
const promises = users.map(user => auth.deleteUser(user.localId));
// creando un arreglo de tareas (funciones que crean una promesa)
const tasks = users.map(user => _ => auth.deleteUser(user.localId));

En series (secuencialmente)

Ahora que ya podemos hablar de un arreglo de tareas en vez de un arreglo de promesas, podemos irnos al otro extremo, lo contrario de Promise.all, y procesar las tareas en serie (una por una, cada una esperando a la anterior), es decir, con concurrencia 1.

La función series() hace uso recursión y va encadenando las promesas secuencialmente usando .then(). Esta función series() ahora la podríamos invocar así:

// ejecuta todas las tareas en series (en secuencia)
series(tasks)
.then(console.log)
.catch(console.error);

Quizás útil en otros casos, y un bonito ejemplo de recursión con promesas, pero no soluciona nuestro problema: evitar hacer más de 10 peticiones por segundo. Para eso tenemos que romper las tareas en pedacitos más chiquitos…

En batches

Teniendo en cuenta que nuestro input (entrada) es un arreglo, y este puede tener un montón de elementos (cientos, miles, decenas de miles, …), quizás tenga sentido comenzar por dividir el arreglo de entrada en lotes (batches), y así manejar las tareas por grupos. Para hacer esto implementamos una función splitArrayintoBatches(array, limit), que recibe un arreglo y un número (limit), y retorna un nuevo arreglo donde cada elemento es a su vez un arreglo de como máximo limit elementos. Veamos la función en sí para ver a qué me refiero…

Ahora podemos invocar la función y ver cómo divide el arreglo en un arreglo de arreglos:

splitArrayintoBatches(['a', 'b', 'c', 'd', 'e'], 1);
// => [['a'], ['b'], ['c'], ['d'], ['e']]
splitArrayintoBatches(['a', 'b', 'c', 'd', 'e'], 2);
// => [['a', 'b'], ['c', 'd'], ['e']]
splitArrayintoBatches(['a', 'b', 'c', 'd', 'e'], 3);
// => [['a', 'b', 'c'], ['d', 'e']]
splitArrayintoBatches(['a', 'b', 'c', 'd', 'e'], 4);
// => [['a', 'b', 'c', 'd'], ['e']]
splitArrayintoBatches(['a', 'b', 'c', 'd', 'e'], 5);
// => [['a', 'b', 'c', 'd', 'e']]

Usando la función splitArrayIntoBatches() podemos dividir nuestro input (las tareas) para agruparlas en lotes o batches, y de esta forma tener cierto control sobre la concurrencia (cuántas operaciones realizamos a la vez — en paralelo). La idea es que queremos procesar las tareas en lotes (de diez en diez), y estos lotes (batches) se deben procesar en series (un lote a la vez) pero dentro de cada lote las peticiones individuales las hacemos en paralelo.

Veamos un poquito de código:

La función batched usa splitArrayIntoBatches para dividir el arreglo de entrada en batches, y declara y usa una función processBatches() para procesar cada lote de forma recursiva, cada uno esperando al anterior (en series, parecido a lo que hicimos en la función series de antes). Esto nos permite asegurar que nunca hayan más de concurrency tareas ejecutándose a la vez.

// ejecuta todas las tareas de 5 en 5 (en batches/lotes)
batched(tasks, 5)
.then(console.log)
.catch(console.error);

Throttling

Nos vamos acercando a lo que necesitamos. Ya podemos limitar la concurrencia, pero todavía nos falta agregar el factor tiempo. La limitación de Firebase dice que debemos limitar a no más de 10 peticiones por segundo. Usando la función batched() podemos decir que se hagan de 10 en 10, pero si cada petición demora 10ms por ejemplo, cada lote estará demorando aprox.10ms, lo cual no nos garantiza que nos mantengamos dentro del límite establecido por Firebase.

Para solventar esto, podemos agregar un tiempo de espera entre lotes/batches y así tener control del acelerador (throttle) y poder espaciar los batches en el tiempo. Para implementar esto, partimos de la función batched(tasks, concurreny), agregamos un argumento (interval) con el número de milisegundos que queremos esperar entre batches. La nueva firma de la función es throttled(tasks, concurrency, interval).

Además del nuevo argumento, en el caso recursivo vamos a reemplazar la invocación a processBatches() (que retorna una promesa), por una nueva promesa que se va a encargar de invocar processBatches() después de esperar interval milisegundos.

// antes invocábamos `processBatches` directamente
processBatches(batches.slice(1), results)
// ahora vamos a reemplazar la expresión anterior por ésta, una
// nueva promesa que se encarga de invocar a `processBatches`
// después de un tiempo de espera
new Promise((resolve, reject) => setTimeout(
() => processBatches(batches.slice(1), results)
.then(resolve, reject),
interval,
))

Un detalle más, para evitar tener que esperar el timeout cuando ya no queden batches que procesar, vamos a agregar un condicional parecido al de nuestro caso base para comprobar que todavía quede algo que hacer antes de volver a invocar recursivamente a nuestra función processBatches.

(batches.length <= 1)
? results
: new Promise((resolve, reject) => setTimeout(
() => processBatches(batches.slice(1), results)
.then(resolve, reject),
interval,
));

Veamos los cambios en contexto:

Ahora podemos invocar nuestra función throttled() así:

// ejecuta todas las tareas de 5 en 5, esperando 1s entre cada lote
throttled(tasks, 5, 1000)
.then(console.log)
.catch(console.error);

Llegado a este punto, ya tenemos una función capaz de funcionar dentro de las limitaciones presentadas al principio. En el ejemplo anterior (throttled(tasks, 5, 1000)), al ejecutar las tareas de 5 en 5, y dejar un espacio de 1s entre lotes, podemos estar seguros de que las tareas siempre se procesarán a una velocidad menor de 5 operaciones por segundo.

Manejo de errores

Una vez que iniciamos una tarea que retorna una promesa, solo podemos esperar que pasen dos cosas:

  1. Que la promesa resuelva correctamente a un valor y lo comunique a través del método .then(result).
  2. Que la promesa resulte en un error y lo comunique a través del método .catch(error). (o then((result, err))).

Esto implica que el comportamiento por defecto es que si una de las tareas falla, la promesa se va directamente a .catch() y solo recibimos ese error. No hay una manera de saber qué pasó con las otras peticiones, o continuar con el resto de tareas y reportar los errores al final.

Siguiendo el caso de uso original (borrar todos los usuarios de un proyecto de Firebase), nuestro input (el arreglo users) puede ser bastante grande, y además la operación es idempotent, así que resulta muy útil poder continuar procesando tareas independientemente de si una o más fallan, y ver los resultados completos (resultado exitosos y errores) al final.

Para implementar esto agregamos otro argumento más (failFast) y le damos un valor por defecto de true. Cuando failFast sea true vamos a tener el comportamiento que hemos visto hasta ahora, pero cuando sea false vamos a evitar que se frene la ejecución y que nos vayamos al .catch(). Para ello vamos a manejar directamente el error con el método .catch(), y convertir el valor al que resuelve en el error en sí. De esta forma, el arreglo final de resultados tendrá el error en lugar del resultado exitoso para la entrada en cuestión, pero no frenará al resto del proceso.

Continuando con la función throttled(), cambiemos las invocaciones a las tareas mismas (fn) para manejar el caso de failFast=false.

// así procesábamos las tareas de un batch (en parallelo)
Promise.all(
batches[0].map(fn => fn()),
)
// agregamos la opción `failFast` y manejejamos errores cuando
// `failFast` sea `false`
Promise.all(
batches[0].map(fn => (failFast ? fn() : fn().catch(err => err))),
)

En contexto:

A esta última implementación le he dado el nombre de pact (pacto en inglés, ya que se trata de un conjunto de tareas basadas en promesas) 😉

// ejecuta todas las tareas de 5 en 5, esperando 1s entre cada lote
// si alguna falla, las demás se ejecutarán igual y los resultados
// pasados a `.then()` tendrán o el resultado exitoso o el error
// para cada tarea
pact(tasks, 5, 1000, false)
.then(console.log);

Comparando las funciones anteriores, vale la pena mencionar que la siguientes son equivalentes:

series(tasks)
equivale a pact(tasks)
equivale a pact(tasks, 1, 0, true)
batched(tasks, 2)
equivale a pact(tasks, 2)
equivale a pact(tasks, 2, 0, true)
throttled(tasks, 2, 1000)
equivale a pact(tasks, 2, 1000)
equivale a pact(tasks, 2, 1000, true)

--

--