Angular and JSON streaming
This article is a continuation of the previous one, Optimizing Elasticsearch pagination with IAsyncEnumerable and NDJson, which addressed the issue of performing efficient searches on Elasticsearch using successive pagination and IAsyncEnumerable, and of producing a correct NDJson response from the controller.
As already described in previous articles, we managed to produce a streamed response from our controller:
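The exact shape of each document is not important for what follows, but to keep the snippets self-contained, assume a hypothetical MyRecord like the one below (the real fields depend on the documents stored in Elasticsearch); the controller then writes one JSON object per line:

// Hypothetical record shape used in all the snippets below;
// the real fields depend on the documents stored in Elasticsearch.
export interface MyRecord {
  id: string;
  name: string;
}

// The streamed NDJson response body is simply one complete JSON object per line:
//
// {"id":"1","name":"first"}
// {"id":"2","name":"second"}
// {"id":"3","name":"third"}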
But how can we correctly read this data stream from our Angular application?
The most obvious solution might be to use the classic approach with HttpClient and parse the response using the usual methods we all know.
Something like this:
private streamWithHttpClient(): Observable<MyRecord> {
  return new Observable<MyRecord>(observer => {
    // responseType: 'text' delivers the whole body as a single string once the request completes.
    this._httpClient.get("/stream", { responseType: 'text' }).subscribe(response => {
      // Wrap the full text in a Response only to obtain a ReadableStream to feed the generator.
      const reader = new ReadableStreamDefaultReader(new Response(response).body!);
      const gen = createStream(reader);
      (async () => {
        while (true) {
          const { done, value } = await gen.next();
          if (done) {
            observer.complete();
            return;
          }
          observer.next(value);
        }
      })();
    });
  });
}
This approach might seem correct, but it suffers from a major issue: HttpClient does not support response streaming, so the subscriber receives the entire result only once the backend call has completed. This negates the benefit of performing the request as a stream.
We can see from the simulation that the response arrives only after all NDJson rows have been transferred.
How can we overcome this limitation?
Very simply, by correctly integrating fetch with the creation of an observable:
private streamWithFetch(): Observable<MyRecord> {
  return new Observable<MyRecord>(observer => {
    fetch("/stream").then(res => {
      // fetch exposes the response body as a ReadableStream, so we can read it chunk by chunk.
      const reader = res.body!.getReader();
      const gen = createStream(reader);
      (async () => {
        while (true) {
          const { done, value } = await gen.next();
          if (done) {
            observer.complete();
            return;
          }
          observer.next(value);
        }
      })();
    });
  });
}
At this point, the observable emits a new value each time a record is transferred from the backend.
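As a usage sketch (the component and field names below are hypothetical, and streamWithFetch() is assumed to be the method shown above defined on the same class), each emitted record can be pushed into a signal so the template updates while the stream is still open:

import { Component, OnInit, signal } from '@angular/core';

// Hypothetical consuming component: names are illustrative only.
@Component({
  selector: 'app-records',
  template: `<p>Records received so far: {{ records().length }}</p>`,
})
export class RecordsComponent implements OnInit {
  // Each emission appends a record, so the count grows while the stream is still open.
  protected readonly records = signal<MyRecord[]>([]);

  ngOnInit(): void {
    // streamWithFetch() is the method shown earlier, assumed to live on this component.
    this.streamWithFetch().subscribe(record =>
      this.records.update(current => [...current, record])
    );
  }
}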
The only drawback of this approach is the lack of support for interceptors already configured in our Angular project.
Under the Hood
As we can see from the proposed example, everything is based on a simple generator function:
export async function* createStream(reader: ReadableStreamDefaultReader): AsyncGenerator<MyRecord, void> {
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      // Flush whatever is still sitting in the buffer once the stream is closed.
      if (buffer.length > 0) {
        yield JSON.parse(buffer);
      }
      return;
    }
    // Decode the chunk and keep the (possibly incomplete) last line in the buffer
    // until the rest of it arrives with the next chunk.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split(/\r?\n/);
    buffer = lines.pop()!;
    for (const line of lines) {
      // Skip blank lines so a stray extra newline does not break JSON.parse.
      if (line.trim().length > 0) {
        yield JSON.parse(line);
      }
    }
  }
}
A generator function can be exited and later re-entered, a very important concept that we have already encountered while developing pagination in our backend using IAsyncEnumerable and Elasticsearch.
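As a tiny standalone illustration of that pause-and-resume behaviour (unrelated to the HTTP stream itself), execution stops at each yield and picks up exactly where it left off on the next call:

// Minimal generator example: execution pauses at yield and resumes on the next next() call.
function* counter(): Generator<number, void> {
  let i = 0;
  while (true) {
    yield ++i;
  }
}

const gen = counter();
console.log(gen.next().value); // 1
console.log(gen.next().value); // 2 (the function resumed where it left off)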
A possible improvement would be introducing a class that extends HttpClient and supports JSON streaming in the usual Angular style, including interceptor support. However, every project has its own requirements, so this option should be evaluated carefully.
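As a very rough sketch of that direction (the service name and API below are invented for illustration, and it deliberately does not plug into the interceptor chain), the fetch-based logic could at least be centralized in an injectable service:

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';

// Hypothetical service: it simply centralizes the fetch + NDJson parsing pattern shown above.
@Injectable({ providedIn: 'root' })
export class NdjsonStreamClient {
  stream<T>(url: string): Observable<T> {
    return new Observable<T>(observer => {
      const controller = new AbortController();
      fetch(url, { signal: controller.signal })
        .then(async res => {
          const reader = res.body!.getReader();
          const decoder = new TextDecoder();
          let buffer = '';
          while (true) {
            const { done, value } = await reader.read();
            if (done) {
              // Flush the last record, if any, then complete.
              if (buffer.trim().length > 0) {
                observer.next(JSON.parse(buffer) as T);
              }
              observer.complete();
              return;
            }
            buffer += decoder.decode(value, { stream: true });
            const lines = buffer.split(/\r?\n/);
            buffer = lines.pop()!;
            for (const line of lines) {
              if (line.trim().length > 0) {
                observer.next(JSON.parse(line) as T);
              }
            }
          }
        })
        .catch(err => observer.error(err));
      // Teardown: cancel the HTTP request when the subscriber unsubscribes.
      return () => controller.abort();
    });
  }
}

Returning a teardown function that aborts the fetch also lets subscribers cancel the stream on unsubscribe, something the simpler examples above do not handle.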
Conclusion
With this article, our journey into the world of JSON streaming comes to an end. We have learned how to produce our responses as soon as the data becomes available, and how to retrieve documents from an Elasticsearch instance as efficiently as possible.
PS: The Angular sample application uses Signals and is configured to be zoneless!
All the code, including integration tests, is available on the GitHub project.
Please, Subscribe and Clap! Thank you!