Streaming an avalanche of data with Go’s io package

Jason Lui
Published in Xendit Engineering
May 17, 2022

In Go, the io package (io.Pipe(), io.Reader/io.WriteCloser) lets you stream data instead of reading it all into memory.

If you prefer reading code to my babbling, here’s a gist with the revisions, plus the Playground links before and after the refactor (the last part, The Final Fine-tuning, is not covered there).

One fine day I started working on a task to extract a bunch of data based on a user-specified filter, transform it into CSV format, and upload it to a network location.

“That’s rather typical,” thought I naively. So here I went:
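Roughly, the first version looked like this. This is a minimal sketch with illustrative names and fields; the real code is in the Playground link.

```go
package export

import (
	"bytes"
	"context"
	"encoding/csv"
	"strconv"
)

// Illustrative domain types; the real ones are specific to the task.
type item struct {
	ID     string
	Amount int64
}

type filter struct{ /* user-specified criteria */ }

type datastore interface {
	// Everything matching the filter comes back in one slice,
	// i.e. all of it has to fit in RAM.
	getData(ctx context.Context, f filter) ([]item, error)
}

type uploader interface {
	// The whole CSV payload is handed over as a byte slice.
	upload(ctx context.Context, data []byte) error
}

func exportCSV(ctx context.Context, db datastore, up uploader, f filter) error {
	items, err := db.getData(ctx, f)
	if err != nil {
		return err
	}

	// Render every item into an in-memory buffer...
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	for _, it := range items {
		if err := w.Write([]string{it.ID, strconv.FormatInt(it.Amount, 10)}); err != nil {
			return err
		}
	}
	w.Flush()
	if err := w.Error(); err != nil {
		return err
	}

	// ...and ship the whole buffer in one go.
	return up.upload(ctx, buf.Bytes())
}
```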

Nice, modular and testable. (See the Playground link for a demo.) What could go wrong?

Enter load testing…

Memory Consumption During Load Testing

This was just a few million records, and since it’s Xendit we’re talking about, a single request could easily hit that number. This wouldn’t work.

Solutioning

Let’s identify the root cause of this RAM spike.

Apparently, with the interface getData(context.Context, filter) ([]item, error), I was holding all the items in RAM. To reduce memory usage, I needed a way to stream the data from the DB directly to the network location.

io Package to the Rescue

Taking a closer look, the upload SDK takes an io.Reader as input, like most other Go libraries that deal with input/output. Therefore, the upload interface that accepted the fully built payload should be refactored to accept an io.Reader instead.
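Sketched out (the concrete signatures are in the gist), the change is essentially:

```go
import (
	"context"
	"io"
)

// Before: the uploader needs the entire payload in memory.
//
//	type uploader interface {
//		upload(ctx context.Context, data []byte) error
//	}

// After: the uploader consumes a stream, so the payload never has to be
// fully materialized.
type uploader interface {
	upload(ctx context.Context, r io.Reader) error
}
```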

Now that I have a Reader, how do I write to it? Go’s io package has a function Pipe() that returns a connected pair of a Reader (*io.PipeReader) and a WriteCloser (*io.PipeWriter).

Hence, I should create the pair, pass the reader to the upload interface, and use another goroutine to perform the writing (if I don’t use a goroutine, each write blocks until it is matched by a read, so the reader and writer deadlock each other).

To perform the writing, I need to iterate over the database rows and write each one to the pipe. So the main logic becomes:
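Here is a sketch of that main logic, reusing the illustrative names from before; getRows and StructScan assume the sqlx-backed datastore shown next, and error handling is kept minimal.

```go
import (
	"context"
	"encoding/csv"
	"io"
	"strconv"
)

func exportCSV(ctx context.Context, db datastore, up uploader, f filter) error {
	rows, err := db.getRows(ctx, f)
	if err != nil {
		return err
	}

	pr, pw := io.Pipe()

	// Writer side: runs in its own goroutine so the pipe's reads and writes
	// can't deadlock each other.
	go func() {
		defer rows.Close()

		w := csv.NewWriter(pw)
		for rows.Next() {
			var it item
			if err := rows.StructScan(&it); err != nil {
				pw.CloseWithError(err)
				return
			}
			if err := w.Write([]string{it.ID, strconv.FormatInt(it.Amount, 10)}); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
		if err := rows.Err(); err != nil {
			pw.CloseWithError(err)
			return
		}
		w.Flush()
		// Closing the write half signals EOF (or the error) to the reader.
		pw.CloseWithError(w.Error())
	}()

	// Reader side: the upload SDK drains the pipe while the CSV is produced.
	return up.upload(ctx, pr)
}
```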

The datastore interface now returns the rows directly:
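Something along these lines, using sqlx since that’s the library mentioned above (again, names are illustrative):

```go
import (
	"context"

	"github.com/jmoiron/sqlx"
)

type datastore interface {
	// Returns the live result set; the caller iterates over it and closes it.
	getRows(ctx context.Context, f filter) (*sqlx.Rows, error)
}
```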

It worked! (See the Playground link for an updated demo.)

Memory Consumption Comparison

Most of the allocations came from the upload/reader part. To put a cherry on top, the time consumption was cut in half because the read and write were performed concurrently, albeit still being O(n).

Time Consumption Comparison

Package Boundary

That’s not the end of the story! In reality, I had separated the DB access into another package in the hope of reusing it. This meant the package would expose *sql.Rows (actually *sqlx.Rows, since I’m using sqlx), making it tightly coupled to the database, or even to a specific DB library.

To address this, I decided to communicate over a channel of items instead, so the DB interface no longer returns the rows but a receive-only channel.
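In sketch form (the error-reporting shape below, a second receive-only channel, is my assumption rather than the original code):

```go
import "context"

// Before: the package boundary leaks the DB library.
//
//	type datastore interface {
//		getRows(ctx context.Context, f filter) (*sqlx.Rows, error)
//	}

// After: only domain items cross the boundary; the DB details stay inside.
type datastore interface {
	getItems(ctx context.Context, f filter) (<-chan item, <-chan error)
}
```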

The CSV writer part was updated accordingly; see the Playground link for the updated demo.
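For completeness, the writer goroutine from the earlier sketch would now range over the channel instead of the rows (same caveat: this mirrors my assumed channel shape, and expects the datastore to send exactly one value on the error channel):

```go
import (
	"context"
	"encoding/csv"
	"io"
	"strconv"
)

// writeCSV drains the item channel and streams CSV into the pipe writer.
// It replaces the body of the writer goroutine from the earlier sketch.
func writeCSV(ctx context.Context, db datastore, f filter, pw *io.PipeWriter) {
	items, errc := db.getItems(ctx, f)

	w := csv.NewWriter(pw)
	for it := range items {
		if err := w.Write([]string{it.ID, strconv.FormatInt(it.Amount, 10)}); err != nil {
			pw.CloseWithError(err)
			return
		}
	}
	w.Flush()

	// Surface any error the datastore hit while producing the items.
	if err := <-errc; err != nil {
		pw.CloseWithError(err)
		return
	}
	pw.CloseWithError(w.Error())
}
```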

Let’s check the performance again!

Memory Consumption Comparison with Refactor

Yup, the almost-overlapping red and yellow lines indicated that the refactor didn’t burden memory much. Time shouldn’t be impacted either, as I didn’t…

Time Consumption Comparison with Refactor

Wait… What? The refactor (in yellow) slowed it down even worse than the original solution. We’ve got a new problem.

The Final Fine-tuning

Why was it slowed down? Time to pull out the big gun — pprof.

I added the following lines to capture the CPU profile:
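This is roughly the standard runtime/pprof setup at the top of main; see the gist for the exact code.

```go
import (
	"log"
	"os"
	"runtime/pprof"
)

func main() {
	// Dump the CPU profile to a file; inspect it later with
	// `go tool pprof cpu.prof`.
	f, err := os.Create("cpu.prof")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	if err := pprof.StartCPUProfile(f); err != nil {
		log.Fatal(err)
	}
	defer pprof.StopCPUProfile()

	// ... run the export as before ...
}
```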

(See the gist for the final code.) Before the refactor (where *rows was returned), the pprof visualization partly looked like this:

CPU Profile when *rows is returned

After that (where a channel is used), it looked like this:

CPU Profile when channel is used

There was a significant amount of time spent waiting. As this great article explains,

Each channel internally holds a mutex lock which is used to avoid data races in all kinds of operations.

I thought buffering the channel might help, and it did.
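Concretely, the only change on the producer side is the channel’s capacity. A sketch of such a producer follows; the buffer size of 1,000, the store type, and the s.query helper are illustrative, not the actual code.

```go
import "context"

// getItems streams rows into a buffered channel so the DB side doesn't have
// to wait for the CSV writer on every single send. It sends exactly one value
// on the error channel before returning.
func (s *store) getItems(ctx context.Context, f filter) (<-chan item, <-chan error) {
	items := make(chan item, 1000) // buffered: the producer can run ahead of the consumer
	errc := make(chan error, 1)

	go func() {
		defer close(items)

		rows, err := s.query(ctx, f) // hypothetical helper returning *sqlx.Rows
		if err != nil {
			errc <- err
			return
		}
		defer rows.Close()

		for rows.Next() {
			var it item
			if err := rows.StructScan(&it); err != nil {
				errc <- err
				return
			}
			select {
			case items <- it:
			case <-ctx.Done():
				errc <- ctx.Err()
				return
			}
		}
		errc <- rows.Err()
	}()

	return items, errc
}
```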

Time Consumption Comparison with Buffered Channel

Buffered channels (green/orange lines) consumed less time than the original solution (blue line), but still more than the solution where *rows was used (red line). The size of the buffer hardly mattered beyond a certain threshold, because the upload part was able to drain the items quickly enough.

The memory consumption shouldn’t be impacted much as I didn’t…

Memory Consumption Comparison with Buffered Channel

No more surprises. All good now.

As a last thought, I don’t think the use of a channel is well justified here: the only reason for it was to separate the DB access part, which, on second thought, might not be reusable anyway because the filter is quite specific to this use case. Hence, I decided to merge the DB access back into the main logic and use *rows directly.

In summary, when a large amount of data is flowing out of your Go system:

  1. use io.Pipe() to create a connected pair of io.Reader and io.WriteCloser;
  2. pass the io.Reader to your data output component (e.g., an S3 SDK or a Kafka library);
  3. write the data to the io.WriteCloser in another goroutine (don’t forget to Close it at the end!);
  4. if necessary and feasible, use a buffered channel to pass the data to the writer to facilitate loose coupling.

This way you wouldn’t need to hold all data in RAM.

Thank you for reading and I hope this helps you!
