Async iterators, operators and pull-based streams

Laurent Renard
Published in DailyJS · Feb 20, 2018


Async iteration is now officially part of the ECMAScript 2018 specification, and as different JavaScript engines have started to support async generators and the for await statement, the async iterable/iterator protocol begins to show all its promise (no pun intended).

Async Iterable

An async iterable must implement the Symbol.asyncIterator method. This method must return an async iterator (see below). By implementing it, an object tells consumers aware of this protocol that they will be handling asynchronous iterations. The for await statement is an example of such a consumer.
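A minimal sketch of that shape (emptyAsyncIterable is just an illustrative name, not from the specification): an object whose Symbol.asyncIterator method returns an async iterator whose very first result is already done.

const emptyAsyncIterable = {
  [Symbol.asyncIterator]() {
    // must return an async iterator (see the next section)
    return {
      next: async () => ({value: undefined, done: true})
    };
  }
};

// for await (const v of emptyAsyncIterable) {} // the loop body never runs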

Async Iterator

Async iterators follow an interface similar to regular iterators: they implement a next method and, optionally, a return method (usually used to release underlying resources) and a throw method. However, the iterator results they produce are wrapped within a Promise. They can easily be implemented without any transpilation on platforms which already support async methods.

The following example is an iterator which produces a sequence of integers at a given time interval.

// wait
const wait = time => new Promise(resolve => {
  setTimeout(() => resolve(), time);
});

const asyncCounterIterator = (limit = 10, delay = 100) => {
  let iter = 1;
  let exhausted = false;

  return {
    async next() {
      if (iter > limit || exhausted) {
        return {done: true};
      }
      await wait(delay);
      const iteratorResult = {value: iter, done: false};
      iter++;

      return iteratorResult;
    },
    async throw(e) {
      console.log('oops something is wrong');
      throw e;
    },
    async return() {
      exhausted = true;
      console.log('I have been released !!!');
      return {done: true};
    }
  };
};

The related iterable could be:

const asyncCounter = (limit = 10, delay = 100) => ({
  [Symbol.asyncIterator]() {
    return asyncCounterIterator(limit, delay);
  }
});

And you could consume it in this way:

const consume = async asyncIterable => {
  const iterator = asyncIterable[Symbol.asyncIterator]();

  while (true) {
    const {done, value} = await iterator.next();

    if (done) {
      break;
    }
    console.log(value);
  }
};

consume(asyncCounter());

This program will print the first ten integers with an interval of 100ms.

But be careful! In the same way a regular iterator can be a leaky abstraction (Reginald Braithwaite), manually iterating over an async iterator without care can be tedious and error prone. For example, if you run the previous example you will not see “I have been released !!!” in the console, as you never explicitly called the return method: you have potentially created a resource leak!
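If you do iterate by hand, a more careful consumer should call return whenever it stops before exhaustion. A rough sketch (consumeSafely is a hypothetical name, not from the original code):

const consumeSafely = async asyncIterable => {
  const iterator = asyncIterable[Symbol.asyncIterator]();
  let done = false;
  try {
    while (!done) {
      const result = await iterator.next();
      done = result.done;
      if (!done) {
        console.log(result.value);
      }
    }
  } finally {
    // release the iterator if we left the loop early (break, throw, ...)
    if (!done && typeof iterator.return === 'function') {
      await iterator.return();
    }
  }
};

This is essentially the bookkeeping the for await statement does for you, as shown in the next section.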

For await

The for await statement is a safer way to consume async iterators, as it calls the iterator's return method whenever the loop exits early, whether intentionally (a break or return) or because of an exception.

const consume = async asyncIterable => {
  const overwriteLimit = 5;
  for await (const i of asyncIterable) {
    if (i > overwriteLimit) {
      break;
    }
    console.log(i);
  }
};

This consumer will correctly release our iterator even though the loop is broken before the iterator gets exhausted.

Async generator functions

Just as you can use generator functions to create iterators (one of their main use cases), you can use an async generator function as an async iterator factory. They usually make writing async iterators much easier and more explicit, thanks to regular try/catch/finally statements.

For example, our async counter could be written:

const counterGen = async function * (limit = 10, delay = 100) {
  let iter = 1;
  try {
    while (true) {
      if (iter > limit) {
        break;
      }
      await wait(delay);
      yield iter;
      iter++;
    }
  } catch (e) {
    console.log('oops something is wrong');
    throw e;
  } finally {
    console.log('I have been released !!!');
  }
};

const asyncCounter = (limit = 10, delay = 100) => ({
  [Symbol.asyncIterator]() {
    return counterGen(limit, delay);
  }
});

A few remarks:

1. The [Symbol.asyncIterator] method cannot take any arguments (at least when consumed with the for await statement).
2. An iterator can be an iterable as well (if its Symbol.asyncIterator method returns the iterator itself).
3. The value returned by an async generator function is also an iterable.

However, in situations 2 and 3 your iterable will likely be consumable only once.

const counter = counterGen();
consume(counter);
consume(counter);

This program will print the sequence only once, as the generator will already have been exhausted by the time it is consumed a second time.
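By contrast, the asyncCounter iterable above creates a fresh generator on every call to its [Symbol.asyncIterator] method, so it can safely be consumed several times (a quick illustrative sketch):

const iterable = asyncCounter(3, 10);
// each consume call gets its own generator through [Symbol.asyncIterator]()
consume(iterable); // prints 1, 2, 3
consume(iterable); // prints 1, 2, 3 as well (the two outputs interleave)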

Streams

You have probably already understood how useful these new constructs can be for building streams and stream transformations.

Libraries such as lorenzofox3/for-await offer operators with an API very similar to that of traditional, well-known synchronous collections (such as the good old Array).

For instance, you'll be able to quickly transform your integer stream:

import {stream} from '@lorenzofox3/for-await';

const myStream = stream(asyncCounter())
  .filter(i => i % 2 === 0) // keep even numbers only
  .map(i => i * i); // square them

(async function () {
  for await (const t of myStream) {
    console.log(t);
  }
})();

This may sound familiar if you have already done a bit of reactive programming or used abstractions on top of Node.js streams (see through for example). However, with async iterables the implementation is usually straightforward (the library is around 250 sloc) and does not depend on any platform particularity, as it is regular ECMAScript only.
A simple map function replaces a whole transform stream.
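To give an idea of how thin such an operator can be, here is a minimal sketch of a map operator built with an async generator (an illustration, not the library's actual implementation):

// a map "operator": takes a mapping function and returns a function
// which wraps an async iterable into a new, mapped async iterable
const map = fn => asyncIterable => ({
  [Symbol.asyncIterator]: async function * () {
    for await (const item of asyncIterable) {
      yield fn(item);
    }
  }
});

// usage: square the values produced by our async counter
const squares = map(i => i * i)(asyncCounter());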

What is left to you, however, is to make sure the platform-specific data sources implement the async iterable protocol. And again, this becomes more or less trivial with async generators.

Let's build a basic CSV parser for Node.js and the browser.

CSV Parser

Our parser will have to understand files starting with a first line made of headers, followed by a list of data records based on these labels (one per line).

id, foo, bar, bim
1, 42, 66, 33
2, 21, 99, 4
// etc (10 000 lines)

We will abstract away our data source so our parser does not need to run on a specific platform. The contract the data source must fulfill is pretty simple: it should be an async iterable producing “chunks” of data.

With these considerations in mind, our parser could be written:

// produces lines from a stream of chunks
const lines = chunkStream => ({
  [Symbol.asyncIterator]: async function * () {
    let remaining = ''; // a chunk may end in the middle of a line
    for await (const chunk of chunkStream) {
      const chunkLines = (remaining + chunk).split('\n');
      remaining = chunkLines.pop();
      yield * chunkLines;
    }
    yield remaining;
  }
});

// parser
export async function parser(source) {
  const iterable = stream(lines(source))
    .map(i => i.split(',')); // lines will be produced as arrays
  // interpret the headers first
  const headers = await iterable
    .slice(0, 1)
    .reduce(acc => acc);
  // produces every record as an object
  return stream(iterable)
    .slice(1)
    .map(line => {
      const item = {};
      for (let i = 0; i < headers.length; i++) {
        item[headers[i]] = line[i];
      }
      return item;
    });
}

And that’s it…
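Since the only contract is “an async iterable producing chunks”, the parser can be exercised without any platform-specific IO. A quick sketch with a hypothetical in-memory source (fromString is not part of the original code):

// hypothetical in-memory data source: yields the content in fixed-size chunks
const fromString = (content, chunkSize = 16) => ({
  [Symbol.asyncIterator]: async function * () {
    for (let i = 0; i < content.length; i += chunkSize) {
      yield content.slice(i, i + chunkSize);
    }
  }
});

(async function () {
  const records = await parser(fromString('id,foo,bar\n1,42,66\n2,21,99'));
  for await (const record of records) {
    console.log(record); // {id: '1', foo: '42', bar: '66'}, then {id: '2', ...}
  }
})();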

What is left is to create our data source depending on the platform we want our code to run on.

For Node.js we could write the following program:

import {fromReadable} from '@lorenzofox3/for-await-node';
import {parser} from './csv-parser.mjs';
import fs from 'fs';

const fromFile = (file, opts = {encoding: 'utf8'}) => ({
  [Symbol.asyncIterator]() {
    return fromReadable(fs.createReadStream(file, opts));
  }
});

// program which consumes our stream
(async function () {
  const stream = await parser(fromFile('./examples/fixture.csv'));
  for await (const line of stream) {
    console.log(line);
  }
})();

Note: the fromReadable function simply converts any Node.js Readable stream into an async iterable.
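For the sake of illustration, here is one possible way such an adapter could be written with an async generator; this is a rough sketch under my own assumptions, not the library's actual code:

// sketch: adapt a Node.js Readable stream into an async iterable by
// draining readable.read() and waking up on 'readable'/'end'/'error' events
const toAsyncIterable = readable => ({
  [Symbol.asyncIterator]: async function * () {
    let ended = false;
    let error = null;
    let wake = () => {};
    readable.on('readable', () => wake());
    readable.on('end', () => { ended = true; wake(); });
    readable.on('error', err => { error = err; wake(); });
    try {
      while (true) {
        let chunk;
        while ((chunk = readable.read()) !== null) {
          yield chunk;
        }
        if (error) throw error;
        if (ended) break;
        // wait until more data is available (or the stream ends/errors)
        await new Promise(resolve => { wake = resolve; });
      }
    } finally {
      // release the underlying resource if the consumer stops early
      readable.destroy();
    }
  }
});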

But wait, browsers also have their own semantics for readable streams (they are used with the Fetch API for example). So if we could make those an async iterable, we could use our parser in the browser as well. And that is quite easy with async generators:

export default async function * (readable) {
  let exhausted = false;
  const reader = readable.getReader();
  try {
    while (true) {
      const {value, done} = await reader.read();
      exhausted = done;
      if (done) {
        break;
      }
      yield value;
    }
  } finally {
    if (!exhausted) {
      reader.cancel();
    }
  }
}

Now we could use the Fetch API to get our file through the network and parse it within the browser (in a Web Worker for example):

import iterable from './browser-adapter.js';
import {parser} from './csv-parser.mjs';

const fromFetch = (path, opts = {}) => {
  return {
    [Symbol.asyncIterator]: async function * () {
      const res = await fetch(path, {cache: 'default'});
      for await (const chunk of iterable(res.body)) {
        // decode
        yield String.fromCodePoint(...chunk);
      }
    }
  };
};

(async function () {
  const csvData = await parser(fromFetch('./examples/fixture.csv'));

  for await (const line of csvData) {
    console.log(line);
  }
})();

Thanks to async iterators and libraries such as lorenzofox3/for-await, we have been able to share a good amount of code between our two implementations while decoupling the different stages of the parser from the platform-specific code which deals with IO operations.

On side effects

lorenzofox3/for-await provides a good abstraction to manipulate streams and asynchronous values in a familiar way. However, you have to understand the underlying data source. By definition a stream is stateful and very different from an array (even one of 10,000 items) held in memory. When you call the reduce operator you need to create and consume a stream (even if it is to throw it away after the first line), and the same goes for map. It may be cheap to create a file system stream on Node.js, but a fetch request will go through the network, and that is not without consequence, especially if you do not cache the response properly.
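To make the cost visible, imagine a hypothetical source that logs every time it is opened (loggingSource is not part of the original code): with the parser written above, “source opened” is printed twice, once when reduce extracts the headers and once when the record stream itself is consumed.

// hypothetical data source that logs each time its iterator is created
const loggingSource = content => ({
  [Symbol.asyncIterator]: async function * () {
    console.log('source opened'); // printed twice with the parser above
    yield content;
  }
});

(async function () {
  const records = await parser(loggingSource('id,foo\n1,42\n2,21'));
  for await (const record of records) {
    console.log(record);
  }
})();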

An implementation based on a side effect may be better in this case (even though it is more difficult to reason about):

export async function parser(source) {
  let headers;
  return stream(lines(source))
    .map(line => line.split(','))
    .map((line, index) => {
      if (index === 0) {
        headers = line;
      } else {
        const item = {};
        for (let i = 0; i < headers.length; i++) {
          item[headers[i]] = line[i];
        }
        return item;
      }
    })
    .slice(1);
}

Programming is about creating and manipulating abstractions; however, keeping in mind what lies behind these abstractions is just as important, more than ever when it can impact the performance of your program.

Nevertheless, async iterators (especially combined with async generators and the for await statement) provide a neat new way to create abstractions which deal with asynchronous data.

Note: you can find the code related to these examples in the @lorenzofox3/for-await repository.
