Sitemap
We-Code Digital Studio

Software Development

Follow publication

Featured

Angular and JSON streaming

--

This article is a continuation of the previous article Optimizing Elasticsearch pagination with IAsyncEnumerable and NDJson and IAsyncEnumerable which addressed the issue of performing efficient searches on Elasticsearch using successive pagination and IAsyncEnumerable and produce a correct NDJson response from the controller.

Streaming is an architectural choice.

As already described in previous articles, we managed to produce a streamed response from our controller:

Result streamed

But how can we correctly read this data stream from our Angular application?

The most obvious solution might be to use the classic approach with HttpClient and parse the response using the usual methods we all know.

Something like this:

private streamWithHttpClient(): Observable<MyRecord> {
return new Observable<MyRecord>(observer => {
this._httpClient.get("/stream", { responseType: 'text' }).subscribe(response => {
const reader = new ReadableStreamDefaultReader(new Response(response).body!);
const gen = createStream(reader);
(async () => {
while (true) {
const { done, value } = await gen.next();
if (done) {
observer.complete();
return;
}
observer.next(value);
}
})();
});
});
}

This approach might seem correct, but it suffers from a major issue. HttpClient does not support response streaming, meaning that the client will wait for the backend call to complete before returning the entire result. This negates the benefits of performing the request as a stream.

We can see from the simulation that the response arrives only after all NDJson rows have been transferred.

HttpClient example

How can we overcome this limitation?

Very simply, by correctly integrating fetch with the creation of an observable:

private streamWithFecth(): Observable<MyRecord> {
return new Observable<MyRecord>(observer => {
fetch("/stream").then(res => {
const reader = res.body!.getReader();
const gen = createStream(reader);
(async () => {
while (true) {
const { done, value } = await gen.next();
if (done) {
observer.complete();
return;
}
observer.next(value);
}
})();
});
});
}

At this point, the observable will emit a new value each time a new record is transferred from the backend:

Fetch example

The only drawback of this approach is the lack of support for interceptors already configured in our Angular project.

Under the Hood

As we can see from the proposed example, everything is based on a simple generator function:

export async function* createStream(reader: ReadableStreamDefaultReader): AsyncGenerator<MyRecord, void> {
const decoder = new TextDecoder();
let buffer = '';

while (true) {
const { done, value } = await reader.read();

if (done) {
if (buffer.length > 0) {
yield JSON.parse(buffer);
}
return;
}

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split(/\r?\n/);
buffer = lines.pop()!;

for (const line of lines) {
yield JSON.parse(line);
}
}
}

A generator function can be exited and later re-entered, a very important concept that we have already encountered while developing pagination in our backend using IAsyncEnumerable and Elasticsearch.

A possible improvement could be introducing a class to extend HttpClient and support JSON streaming in the Angular style with interceptor support. However, as we know, every project has its requirements, so this possibility should be carefully evaluated.

Conclusion

With this article, our journey into the world of JSON streaming comes to an end. We have learned how to correctly handle the generation of our responses as soon as they become available to efficiently retrieve documents from an Elasticsearch instance and do so in the most optimal way possible.

PS: The Angular sample application uses Signals and is configured as a zoneless one!

All the code, including integration tests, is available on the GitHub project.

Please, Subscribe and Clap! Grazie!

--

--

Diego Bonura
Diego Bonura

Written by Diego Bonura

Software engineer with a strong sense of humor. Anyway yesterday it worked.

No responses yet