Readable Streams in Deno

Mayank C
Tech Tonic

--

An update of this article has been published here.

Introduction

Streams

Streams are a one of the commonly used data structures for chunked handling of a large amount of flowing data. The primary motivation behind using streams is to process a large volume of data in chunks without increasing memory pressure. They key is volume and chunks.

For example — a large file (say 5G) should be read/processed in chunks rather than loading it completely in memory and having a spike in the process’s memory usage.

The web standards support readable, writable, transformable streams for handling data like files, media, HTTP request/response bodies, etc.

Reader & Writer

Since it’s inception, Deno came with Reader and Writer interfaces that works almost like streams. These interfaces support reading and writing in chunks, similar to what streams do. Deno follows the web standards as much as possible. One of the most common use of Deno would be building web apps, REST APIs, etc. Therefore, one of the most common places where streams would be seen is in HTTP body handling.

For example — the request/response body of the fetch API could be a readable stream that can be read using either the default reader (proposed by web standards) or the Deno reader. There are places/utility functions in Deno, where Deno’s Reader and Writer interfaces are supported, but web standard’s readable/writable streams aren’t.

The question is:

how to deal with the two ways of handling streams: web standard’s streams and Deno’s reader/writer

Purpose

ReadableStreams are useful in getting data, while WritableStreams are useful in sending data into a sink. Considering Deno runtime’s position & use cases, ReadableStreams would be more commonly used than WritableStreams. Therefore, in this article, we’ll go over converting web standard’s streams to/from Deno’s reader. We’ll see how to convert:

  • readable stream to Deno reader
  • Deno reader to readable stream

We’ll discuss WritableStream in a future article.

ReadableStream <-> Reader

Readable streams are useful in processing data flowing towards a destination. Generally, it would be from HTTP request and response body handling. In web standards, both the Request and Response object supports ReadableStream for bodies.

One example is fetch API, other example is serveHttp:

We’ll see how to convert ReadableStream to/from Deno’s Reader. The usage is the same whether it’s fetch or serveHttp.

Readable stream to Reader

Consider the following fetch API call:

const res=await fetch('https://deno.land');
res.body;
//ReadableStream { locked: false }

The response object contains a body that’s of type ReadableStream because fetch is a web API. There are easy ways to get the response body: r.text() or r.json() or r.formData(), etc. but these are useful in cases when the user wants to read the data completely (definitely good and useful for small sized bodies).

To read data from the body as a stream (i.e. in chunks), the stream need to be converted to the web’s ReadableStreamDefaultReader.

const res=await fetch('http://deno.land');
res?.body?.getReader();
//ReadableStreamDefaultReader

Now we’ve ReadableStreamDefaultReader, still reading from this reader is a bit cumbersome (Refer https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/getReader to understand how to read from the default reader). Fortunately, there is an easy way!

At this point, the Deno’s Reader comes into the picture. If the default reader can be converted to Deno’s Reader, we can:

  • read chunks from it easily
  • use it in places where reader is supported

Read chunks easily

Deno’s standard library’s io module comes with a function readerFromStreamReader to convert ReadableStreamDefaultReader into Deno.Reader. There is an import required:

import { readerFromStreamReader } from "https://deno.land/std/io/mod.ts";

The readerFromStreamReader is a small but amazing function! It returns Deno.Reader that can be used to read in the chunks easily. The chunk size would be capped at 8K (8192 bytes). Usually, reading would go in a loop till the reader is empty.

import { readerFromStreamReader } from "https://deno.land/std/io/mod.ts";const res=await fetch('https://deno.land');
const wrdr=res?.body?.getReader();
if(wrdr) {
const r=readerFromStreamReader(wrdr);
let n=0;
do {
const b=new Uint8Array(10000);
n=await r.read(b) || 0;
} while(n>0)
}

Here is the output of a run:

//Iteration 1
//n=8192 Uint8Array(10000) [60, 33, 68, 79, ....]
//Iteration 2
//n=8192 Uint8Array(10000) [103, 114, 97, 121, ....]
//Iteration 3
//n=8192 Uint8Array(10000) [110, 101, 32, 116, ....]
//Iteration 4
//n=1823 Uint8Array(10000) [67, 73, 34, 32, ....]
//Iteration 5
//n=0

It’d be tempting to read all 26399 bytes at once using readAll function (part of the same io module). That’s fine if the data is small. However, if the data is big (say 10MB), the Deno process would observe a memory spike. The spike would get worse when there are multiple such requests at the same time. For production systems, this might raise an alert in the monitoring systems.

Use in places where reader is used

The other use of converting to reader is to use in places where reader can be used. For example — save the response stream into a file. The following code sends the response stream directly to a file using Deno.copy function. Deno.copy function would keep copying chunks of data from reader to writer till it empties the reader.

import { readerFromStreamReader } from "https://deno.land/std/io/mod.ts";const res=await fetch('https://deno.land');
const wrdr=res?.body?.getReader();
if(wrdr) {
const r=readerFromStreamReader(wrdr);
const file=await Deno.open('/var/tmp/d.txt', {create: true, write: true});
await Deno.copy(r, file)
file.close();
}
//26399 bytes copied to file

Very simple! Utilizing built-in functions like Deno.copy make the work easier. In just one line code, the stream is piped.

As mentioned earlier, Deno’s native HTTP server (based on hyper) also follows web standards (Request and Response). It presents a Request to user, and expects a Response. The request contains a ReadableStream for any uploaded data. The ReadableStream can be converted to Reader in the same way. Similarly, Response could take a ReadableStream, and that’s what we’ll see next (Reader to ReadableStream).

Reader to Readable stream

Consider another fetch call, but this time it’s going to upload data. The fetch API can take a ReadableStream to send body in the HTTP request. This time we need to build a ReadableStream object. Again, building a ReadableStream object using only web standards is a bit cumbersome.

Deno’s reader comes to the rescue again! It’d be easier to create a Reader and then convert it to ReadableStream. We’ll see how it makes the process easier.

To convert Reader into ReadableStream, a function readableStreamFromReader need to be imported from Deno’s standard library’s io module:

import { readableStreamFromReader } from "https://deno.land/std/io/mod.ts";

Here is an example of a file upload. The file is read in chunks by the fetch function through Readable stream.

const file=await Deno.open('/var/tmp/d.txt');
const r=readableStreamFromReader(file);
const req=new Request('https://localhost:1234', {method: 'POST', body: r});
const res=await fetch(req);

Similar to fetch, serveHttp takes Response object as input. The Response object could take a body as a ReadableStream. The above function can be used to build a ReadableStream. Before closing, let’s see an example of serveHttp for upload and download of a file.

ServeHttp upload and download

Now that we know about both of the useful methods, let’s write a small example using serveHttp. First, we’ll see a file upload example, and then we’ll see a file download example.

Here is the code for file upload using serveHttp. The uploaded file is directly saved on the disk.

import { readerFromStreamReader } from "https://deno.land/std/io/mod.ts";const listener = Deno.listen({ port: 5000 });
for await(const conn of listener) {
for await(const { request, respondWith } of Deno.serveHttp(conn)) {
const wrdr=request?.body?.getReader();
if(wrdr) {
const r=readerFromStreamReader(wrdr);
const file=await Deno.open('/var/tmp/z1.txt', {create: true, write: true});
await Deno.copy(r, file);
file.close();
await respondWith(new Response(undefined, {status: 200}));
}
}
}
//--> curl -F 'data=@./readings100M.txt' http://localhost:5000
> du -kh /var/tmp/z1.txt
100M /var/tmp/z1.txt

The code is exactly the same! This is because both fetch and serveHttp use Request object.

Lastly, let’s see an example of file download. Here we’ll send the same file back (/var/tmp/z1.txt) using Response object. This code is also the same!

import { readerFromStreamReader, readableStreamFromReader } from "https://deno.land/std/io/mod.ts";const listener = Deno.listen({ port: 5000 });
for await(const conn of listener) {
for await(const { request, respondWith } of Deno.serveHttp(conn)) {
const file=await Deno.open('/var/tmp/z1.txt');
const r=readableStreamFromReader(file);
await respondWith(new Response(r, {status: 200}));
file.close();
}
}
//--curl http://localhost:5000 -o /dev/null
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed
100 100M 0 100M 0 0 178M 0 --:--:-- --:--:-- --:--:-- 178M

--

--