A beginner’s guide to streams in Deno
Introduction
Streams are exactly what they sound like: a stream of data. You can think of it as a real stream, with data flowing from one place to another until it reaches the end.
A stream is very similar to an array, except that it’s a flow of items that arrive over time rather than being available all at once.
Processing large amounts of data
The most common use case for streams is processing a large amount of data that cannot be held in memory, or that would cause a memory spike if it were. For example, say there is a 10 GB file, and we need to run some transformation on each line in it. Reading the entire 10 GB file into memory would take quite a bit of time, and in some cases it may not even be possible; we may run out of memory unless we have a really large amount of RAM. A better and more efficient way is to read one line at a time from the file and process each line individually. This is slower than having everything in memory, but it won’t cause a memory spike. We can attach a handler to the stream and process one line at a time until we’re done with the huge file. This way, we can start processing the file right away, and we won’t be using much memory at all.
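As a rough sketch of the idea (the file name and the per-line handling are placeholders), the standard library’s TextLineStream can be hooked onto a file’s readable stream to hand us one line at a time:
import { TextLineStream } from "https://deno.land/std/streams/mod.ts";

const file = await Deno.open("./someLargeFile.txt");
const lines = file.readable
  .pipeThrough(new TextDecoderStream()) // raw bytes -> text
  .pipeThrough(new TextLineStream()); // text -> one chunk per line
for await (const line of lines) {
  // process a single line here
}
Don’t worry if the pipeThrough calls look unfamiliar; they’re covered in detail later in the article.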
A simple test
Let’s run a simple test by loading a 100 MB file into memory. This file contains a comma-separated list of numbers, and we’ll count the total number of readings in it. The usual way is to load the entire file into memory, split it by comma, and count the length of the resulting array.
Here is the code:
const file = await Deno.readTextFile("./readings100M.txt");
const numReadings = file.split(",").length;
console.log(`Total readings found=${numReadings}`);
When we run the above code, we notice that the file gets processed pretty fast:
> time deno run --allow-read app.ts
Total readings found=20971521
real 0m1.994s
user 0m2.423s
sys 0m0.572s
The problem is the memory spike. According to macOS process sampling, here is the peak memory usage:
Physical footprint (peak): 641.1M
This is the peak memory usage for a single file. Imagine what would happen if multiple such files are processed in parallel.
Streams to the rescue
With streams, we can process the file slowly, in chunks. A chunk of the file is loaded into memory and processed; once it’s done, the next chunk is loaded. This repeats until the file is exhausted. The procedure is surely slower than loading the entire file into memory, but memory usage stays under control no matter how large the file is.
Another important use case of streams is transforming flowing data into something else. In this case, the input and output are both streams, with the output stream differing from the input stream. For example, a large zipped file can be passed through a transformer that unzips it as the contents flow through. This way, we don’t have to load the entire zip file into memory before unzipping it.
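As a quick sketch (the file path here is hypothetical), decompressing a gzipped file on the fly looks like this; zipped bytes stream in and decompressed bytes stream out, without the whole file ever sitting in memory:
const zipped = await Deno.open("./data.txt.gz");
const unzipped: ReadableStream<Uint8Array> = zipped.readable
  .pipeThrough(new DecompressionStream("gzip"));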
That’s all for the introduction. The rest of the article focuses on stream handling in Deno.
Types of streams
As this article is about stream handling in Deno, we’ll learn the basics of the three types of streams Deno supports. Deno is web compatible, so it supports the web standard streams:
ReadableStream
A ReadableStream represents a source of data from which you can read. In other words, data comes out of a readable stream. As the name indicates, a ReadableStream is read-only: it can be built from any compatible source, but data only ever comes out of it.
Some examples of ReadableStream are:
- HTTP request body
- HTTP response body
- A file opened for reading
- Stdout and stderr of a child process
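For instance, here is how two of these sources appear in code (the URL and the file name are placeholders):
const res = await fetch("https://example.com/data.bin");
const resBody = res.body; // HTTP response body: ReadableStream<Uint8Array> | null

const file = await Deno.open("./input.txt");
const fileData = file.readable; // file opened for reading: ReadableStream<Uint8Array>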
WritableStream
A WritableStream represents a destination for data into which you can write. In other words, data goes into a writable stream. Again, as the name indicates, a WritableStream is write-only: data always goes in and never comes out. If data needs to come out, it has to come through a ReadableStream interface.
Some examples of WritableStream are:
- A file opened for writing
- Stdin for the child process
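Here is a minimal sketch of writing into a WritableStream (the file name is a placeholder): a writer is acquired from the stream, data goes in, and closing the writer closes the underlying file:
const file = await Deno.open("./output.txt", { create: true, write: true });
const writer = file.writable.getWriter();
await writer.write(new TextEncoder().encode("Hello, streams!"));
await writer.close();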
TransformStream
A TransformStream represents both ReadableStream and WritableStream. A transform stream consists of a pair of streams: a writable stream, known as its writable side, and a readable stream, known as its readable side. Writes to the writable side result in new data being made available for reading from the readable side.
The primary purpose of TransformStream is to take data in some format and emit data in another format.
Some examples of TransformStream are:
- Zip a file
- Unzip a file
- Raw data to string
- String to raw data
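Besides built-in transformers like CompressionStream, a TransformStream can be hand-rolled. Here is a minimal sketch of one that uppercases every string chunk flowing through it:
const upperCaseStream = new TransformStream<string, string>({
  transform(chunk, controller) {
    // receive a chunk on the writable side, emit a new one on the readable side
    controller.enqueue(chunk.toUpperCase());
  },
});
A readable stream of strings could then be piped through it: readable.pipeThrough(upperCaseStream).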
That’s all for the background on streams. Let’s move on to some of the places where streams are found.
Stream interfaces
Streams are present in a number of places. Being a web standards advocate, Deno strives to provide the web standard stream interfaces wherever possible.
Here is a list of common places along with the type of stream:
- Request.body: Contains the HTTP request body stream (Type: ReadableStream)
- Response.body: Contains the HTTP response body stream (Type: ReadableStream)
- FsFile.readable: Contains the data stream from a file opened in reading mode (Type: ReadableStream)
- FsFile.writable: Contains the data stream towards a file opened in writing mode (Type: WritableStream)
- Child.stdin: Contains the data stream towards the standard input of a child process (Type: WritableStream)
- Child.stdout: Contains the data stream from the standard output of a child process (Type: ReadableStream)
- Child.stderr: Contains the data stream from the standard error of a child process (Type: ReadableStream)
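As a small sketch combining two of these interfaces (the URL and the file name are placeholders), an HTTP response body can be streamed straight into a file:
const res = await fetch("https://example.com/bigfile.bin");
const file = await Deno.open("./bigfile.bin", { create: true, write: true });
await res.body!.pipeTo(file.writable); // assumes the response has a body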
In earlier releases, Deno offered its own Deno.Reader and Deno.Writer interfaces. However, these have gradually been replaced by the web standard ReadableStream and WritableStream interfaces.
The only remaining piece is connecting readable and writable streams. Let’s have a look at that.
Stream pipes
A pipe is a connection between two streams. Imagine placing a pipe between the source and the destination of the data. Once the pipe is in place, data starts flowing from the source to the destination, as they’re now connected. There are two web standard piping methods.
PipeTo
The pipeTo() method of the ReadableStream interface pipes the current ReadableStream to a given WritableStream. This is an async procedure that completes sometime in the future.
Piping a stream will generally lock it for the duration of the pipe, preventing other readers from locking it.
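Here is a minimal sketch of pipeTo() in action (the file names are placeholders): a file copy, where the source’s ReadableStream is piped into the destination’s WritableStream:
const src = await Deno.open("./source.txt");
const dst = await Deno.open("./copy.txt", { create: true, write: true });
await src.readable.pipeTo(dst.writable); // resolves when all data has been copied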
PipeThrough
The pipeThrough() method of the ReadableStream interface provides a chainable way of piping the current stream through a transform stream or any other writable/readable pair.
Piping a stream will generally lock it for the duration of the pipe, preventing other readers from locking it.
The output of pipeThrough is a transformed ReadableStream. Usually, the transformed stream will be different from the original stream.
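Here is a minimal sketch of pipeThrough() (the file name is a placeholder): a file’s byte stream is passed through a TextDecoderStream, yielding a new ReadableStream of strings:
const file = await Deno.open("./notes.txt");
const textStream = file.readable
  .pipeThrough(new TextDecoderStream()); // Uint8Array in, string out
for await (const chunk of textStream) {
  console.log(chunk);
}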
Use cases of streams
Save HTTP request body in a file
The HTTP request body is always offered as a ReadableStream. A file opened for writing offers a WritableStream. These two can be connected using pipeTo() for the data to flow from the request body to the file.
Here is the code of an HTTP server that pipes the request body to a file:
import { serve } from "https://deno.land/std/http/mod.ts";

const reqHandler = async (req: Request) => {
  const destFile = await Deno.open("./uploadedFile", {
    create: true,
    write: true,
    truncate: true,
  });
  await req.body?.pipeTo(destFile.writable);
  return new Response(null, { status: 201 });
};

serve(reqHandler, { port: 8082 });
Here is a test run:
> deno run --allow-write=./ --allow-net=:8082 app.ts
Listening on http://localhost:8082/
> ls -l readings100M.txt
-rw-r--r-- 1 mayankc staff 104857600 Apr 6 2021 readings100M.txt
> curl http://localhost:8082 --data-binary "@./readings100M.txt" -v
> POST / HTTP/1.1
> Content-Length: 104857600
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
< HTTP/1.1 201 Created
Serve a file
Serving a file is the reverse of the previous use case. A file opened for reading offers a ReadableStream. As the Response object accepts a ReadableStream, the file’s ReadableStream can be handed directly to the Response object; no piping is required.
Here is the code of an HTTP server that serves a file:
import { serve } from "https://deno.land/std/http/mod.ts";

const reqHandler = async (req: Request) => {
  const fileName = `./${new URL(req.url).pathname}`;
  const srcFile = await Deno.open(fileName);
  return new Response(srcFile.readable);
};

serve(reqHandler, { port: 8082 });
Here is a test run:
> deno run --allow-read=./ --allow-net=:8082 app.ts
Listening on http://localhost:8082/
> cat testdata/sample.txt
Learning Deno Is Fun!
> curl http://localhost:8082/testdata/sample.txt
Learning Deno Is Fun!
Note: While streaming a file, the content-length header is not sent because there is no way to get the length of a stream. The content-length header needs to be set explicitly (get the size of the file using Deno.stat).
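For instance, the request handler above could be adjusted like this to include the header (a sketch; error handling is omitted):
const reqHandler = async (req: Request) => {
  const fileName = `./${new URL(req.url).pathname}`;
  const srcFile = await Deno.open(fileName);
  const fileInfo = await Deno.stat(fileName); // file size known ahead of streaming
  return new Response(srcFile.readable, {
    headers: { "content-length": fileInfo.size.toString() },
  });
};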
Zip a file
To zip a file, we need to deploy both piping methods:
- pipeThrough() to transform file’s ReadableStream
- pipeTo() to send zipped stream to destination file’s WritableStream
The source file would be opened for reading to get a ReadableStream. The destination file would be opened for writing to get a WritableStream. The source stream would be transformed using CompressionStream.
Here is the code that zips a file:
const srcFile = await Deno.open("./testdata/sample.pdf");
const dstFile = await Deno.open("./sample.gz", {
  create: true,
  write: true,
  truncate: true,
});
await srcFile.readable
  .pipeThrough(new CompressionStream("gzip"))
  .pipeTo(dstFile.writable);
Here is a test run:
> deno run --allow-read=./ --allow-write=./ app.ts
> ls -ltr ./testdata/sample.pdf
-rw-r--r--@ 1 mayankc staff 69273 Apr 3 2021 ./testdata/sample.pdf
> ls -ltr sample.gz
-rw-r--r-- 1 mayankc staff 64602 May 29 22:42 sample.gz
Save output of a child process in a file
A child process created through the spawnChild API offers three streams:
- stdout: ReadableStream
- stderr: ReadableStream
- stdin: WritableStream
The data flows from child process to parent process through stdout and stderr. The data flows from parent process to child process through stdin.
Here is the code that creates a child process and saves the output streams in a file:
const child = Deno.spawnChild("./someScript.sh", {
  args: [
    "THIS SHOULD BE ON STDOUT",
    "THIS SHOULD BE ON STDERR",
  ],
});
const dstFileOut = await Deno.open("./childOutput", {
  create: true,
  write: true,
});
const dstFileErr = await Deno.open("./childErr", {
  create: true,
  write: true,
});
await Promise.all([
  child.stdout.pipeTo(dstFileOut.writable),
  child.stderr.pipeTo(dstFileErr.writable),
]);
Here is a test run:
> deno run --allow-read=./ --allow-write=./ --allow-run --unstable app.ts
> cat someScript.sh
echo "Hello world to $1"
echo "Hello world to $2" >&2
> cat childOutput
Hello world to THIS SHOULD BE ON STDOUT
> cat childErr
Hello world to THIS SHOULD BE ON STDERR
Unzip & process HTTP request line-by-line
This is a more complex example that deploys a number of stream APIs along with both kinds of piping. An HTTP server receives a zipped request body. The body is unzipped, converted to a string, and then split into lines, all on streams. Finally, the total number of words is counted and returned.
Here is the code of such a server:
import { serve } from "https://deno.land/std/http/mod.ts";
import { TextLineStream } from "https://deno.land/std/streams/mod.ts";

const reqHandler = async (req: Request) => {
  if (!req.body) {
    return new Response(null, { status: 415 });
  }
  const lines = req.body
    .pipeThrough(new DecompressionStream("gzip"))
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TextLineStream());
  let numWords = 0;
  for await (const line of lines) {
    numWords += line.split(" ").length;
  }
  return new Response("Total words:" + numWords);
};

serve(reqHandler, { port: 8082 });
Here is a test run:
> curl http://localhost:8082 --data-binary "@./500K.txt.gz"
Total words:75830
Before closing, let’s go back to the first problem. We processed a 100 MB file containing comma-separated readings, and we wanted to count the number of readings. Without streaming, we observed a memory spike: the peak memory usage reached ~650 MB.
Let’s rewrite the same code using streams:
import { DelimiterStream } from "https://deno.land/std/streams/mod.ts";

const delim = new TextEncoder().encode(",");
const srcFile = await Deno.open("./readings100M.txt");
const readings = srcFile.readable
.pipeThrough(new DelimiterStream(delim));
let numReadings = 0;
for await (const _reading of readings) {
numReadings++;
}
console.log(`Total readings found=${numReadings}`);
First, let’s look at the time taken:
> time deno run --allow-read=./ app.ts
Total readings found=20971521
real 0m21.813s
user 0m21.433s
sys 0m0.277s
The streaming approach is definitely slower than loading the entire file into memory.
Now, let’s have a look at the peak memory usage:
Physical footprint (peak): 111.7M
The peak memory usage stays at ~110 MB for a 100 MB file, about six times less than loading the entire file into memory.
Let’s repeat the test for a 1 GB file:
> time deno run --allow-read=./ app.ts
Check file:///Users/mayankc/Work/source/denoExamples/app.ts
Total readings found=200000001
real 3m11.084s
user 3m10.007s
sys 0m2.164s
The peak memory usage remains modest even for a 1 GB file:
Physical footprint (peak): 158.7M