Michal Pristas
Oct 28, 2018 · 7 min read

Recently I faced an interesting problem, one that has been bugging me at night to this day.

The problem is this: you let a user upload a file, a huge one, to your system. At the same time, you use multiple backing cloud storages, or whatever other storages you can think of. For the sake of simplicity, I will treat them as different locations on disk.

Now the user uploads their huge file and it’s up to you to back it up to all of these backing locations.

Thinking about Go code, you have a single reader as input, and the storage APIs expect readers (not writers) as well. To avoid timeouts, since this all happens inside an HTTP call, you don’t want to write to the storages sequentially.

I will assume a Storage implementation that writes to disk and provides a Save method:

type Storage struct {
    rootDir string
}

func NewStorage(rootDir string) *Storage {
    return &Storage{
        rootDir: rootDir,
    }
}

// Save drains r and persists it under rootDir; a minimal
// disk-backed implementation, enough for the examples below.
func (s *Storage) Save(c context.Context, b string, r io.Reader) error {
    f, err := os.Create(filepath.Join(s.rootDir, b))
    if err != nil {
        return err
    }
    defer f.Close()
    _, err = io.Copy(f, r)
    return err
}

The first solution that comes to mind (if you’re running on a machine with unbounded resources, where the sky is the limit for your memory) is to load the whole file into memory and create a brand new reader for each backing storage.

func uploadBlob(ctx context.Context, blobName string, blobReader io.Reader, storages []*Storage) error {
    // read the whole content into memory
    content, err := ioutil.ReadAll(blobReader)
    if err != nil {
        return err
    }
    var results = make(chan error, len(storages))
    defer close(results)
    // save to each backend concurrently, each with its own in-memory reader
    for _, store := range storages {
        go func(s *Storage) {
            rdr := bytes.NewReader(content)
            select {
            case <-ctx.Done(): // canceled elsewhere; report the context error so the collector below never blocks
                results <- ctx.Err()
            case results <- s.Save(ctx, blobName, rdr): // note: Save runs before the select picks a case
            }
        }(store)
    }
    var errs error
    for i := 0; i < len(storages); i++ {
        if r := <-results; r != nil {
            errs = multierror.Append(errs, r)
        }
    }
    return errs
}

Works OK-ish, but the drawback is memory consumption: the whole upload sits in memory for the duration of every save. A few concurrent multi-gigabyte uploads already cost you that many gigabytes of RAM; run it on a calculator and it will blow.

So what other options do we have? The io package provides a set of readers and writers that might come in handy.

TeeReader

First one mentioned is TeeReader:

func TeeReader(r Reader, w Writer) Reader

According to the docs: “TeeReader returns a Reader that writes to w what it reads from r.” Sounds good so far.

So if we feed it the reader we got from the HTTP call, plus a writer of our choice, we get back a reader we can pass along; whatever is read from it is also written to our writer as the reading happens.
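As a minimal illustration (a toy program, not the upload code; the strings are made up), everything read through the tee also lands in the buffer we supplied as the writer:

package main

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "strings"
)

func main() {
    var buf bytes.Buffer
    tee := io.TeeReader(strings.NewReader("huge file"), &buf)
    content, _ := ioutil.ReadAll(tee) // consume the source through the tee
    fmt.Printf("read %q, teed %q\n", content, buf.String())
    // read "huge file", teed "huge file"
}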

But we have multiple consumers. If we had only 2, the problem would be solved: one reads the tee reader, the other reads whatever our writer feeds it. For more, we will use another construct of the io package called MultiWriter:

func MultiWriter(writers ...Writer) Writer

MultiWriter creates a writer that duplicates its writes to all the provided writers, similar to the Unix tee(1) command.
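A quick sketch of what that buys us (a toy example with two in-memory buffers standing in for the pipes we will use shortly):

package main

import (
    "bytes"
    "fmt"
    "io"
)

func main() {
    var a, b bytes.Buffer
    w := io.MultiWriter(&a, &b)
    w.Write([]byte("fan out")) // a single Write is duplicated into both buffers
    fmt.Println(a.String(), "/", b.String()) // fan out / fan out
}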

OK, so now we can have a writer for each consumer. But a consumer does not want a writer; it wants a reader. The consumer wants to Read the content.

Luckily, the io package is powerful, one of the most powerful packages there is.

It gives us another tool: Pipe

func Pipe() (*PipeReader, *PipeWriter)

Pipe creates a synchronous in-memory pipe. It can be used to connect code expecting an io.Reader with code expecting an io.Writer.
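Here is a minimal sketch of a pipe in action (a toy program; note that the write blocks until somebody reads, which will matter later):

package main

import (
    "fmt"
    "io"
    "io/ioutil"
)

func main() {
    pr, pw := io.Pipe()
    go func() {
        pw.Write([]byte("streamed")) // blocks until the reader below consumes it
        pw.Close()                   // signals EOF to the reading side
    }()
    data, _ := ioutil.ReadAll(pr)
    fmt.Println(string(data)) // streamed
}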

Using these three tools, we can build this rocket.

For all but one of the consumers (why not all of them will become clear in a moment), we create a pipe, giving us a set of writers and readers:

for i := 0; i < count-1; i++ {
    pr, pw := io.Pipe()
    readers = append(readers, pr)
    pipeWriters = append(pipeWriters, pw)
}

We can pass those readers to consumers and use writers to write to them.

for i, store := range storages {
    go func(s *Storage, rdr io.Reader) {
        select {
        case <-ctx.Done(): // canceled elsewhere; report the context error
            results <- ctx.Err()
        case results <- s.Save(ctx, blobName, rdr):
        }
    }(store, readers[i])
}

So we can do all the writes at once, we combine the pipe writers using MultiWriter:

multiWriter := io.MultiWriter(pipeWriters...)

Now we just tap the source reader, so that whatever is read from it is duplicated to the MultiWriter:

teeReader := io.TeeReader(source, multiWriter)

But now nobody reads from teeReader, so no data flows at all. Before we actually call the consumers, we need to hand teeReader to one of them in place of a pipe reader. That is why the loop above thinks one step ahead and creates only N-1 pipes: the remaining consumer reads the tee directly.

When you try to run the code in this form, you will get a deadlock, because each PipeWriter has to be closed; until it is, the corresponding PipeReader never sees EOF and blocks forever.
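The failure mode is easy to reproduce with a single pipe (a toy sketch, assuming nothing ever calls Close):

package main

import (
    "io"
    "io/ioutil"
)

func main() {
    pr, pw := io.Pipe()
    go pw.Write([]byte("data")) // write once, but never Close
    ioutil.ReadAll(pr)          // receives "data", then blocks forever waiting for EOF
    // fatal error: all goroutines are asleep - deadlock!
}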

For this, we will construct a MultiCloser:

type MultiCloser struct {
    closers []io.Closer
}

func NewMultiCloser(closers []io.Closer) *MultiCloser {
    return &MultiCloser{
        closers: closers,
    }
}

func (m *MultiCloser) Close() error {
    var err error
    for _, c := range m.closers {
        if e := c.Close(); e != nil {
            err = multierror.Append(err, e)
        }
    }
    return err
}

So, in its final form, it looks like this:

func uploadBlob(ctx context.Context, blobName string, blobReader io.Reader, storages []*Storage) error {
    var results = make(chan error, len(storages))
    defer close(results)
    readers, closer := getReaders(blobReader, len(storages)) // compose a reader for each backend
    for i, store := range storages {
        go func(s *Storage, rdr io.Reader) {
            select {
            case <-ctx.Done(): // canceled elsewhere; report the context error
                results <- ctx.Err()
            case results <- s.Save(ctx, blobName, rdr):
            }
        }(store, readers[i])
    }
    var errs error
    for i := 0; i < len(storages); i++ {
        if r := <-results; r != nil {
            errs = multierror.Append(errs, r)
        }
        if i == 0 {
            // the first save to finish is the one draining teeReader, so the
            // source has been fully read; close the pipe writers to deliver
            // EOF to the remaining readers
            closer.Close()
        }
    }
    return errs
}

func getReaders(source io.Reader, count int) ([]io.Reader, io.Closer) {
    readers := make([]io.Reader, 0, count)
    pipeWriters := make([]io.Writer, 0, count)
    pipeClosers := make([]io.Closer, 0, count)
    for i := 0; i < count-1; i++ {
        pr, pw := io.Pipe()
        readers = append(readers, pr)
        pipeWriters = append(pipeWriters, pw)
        pipeClosers = append(pipeClosers, pw)
    }
    multiWriter := io.MultiWriter(pipeWriters...)
    teeReader := io.TeeReader(source, multiWriter)
    // prepend teeReader so that reading it populates the rest of the readers
    readers = append([]io.Reader{teeReader}, readers...)
    return readers, NewMultiCloser(pipeClosers)
}

Clever? No! Pipes are synchronous: every write to a PipeWriter blocks until the corresponding PipeReader consumes it, so reading happens in lockstep and the slowest consumer dictates the pace of processing for all the others.
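A toy demonstration of that blocking behavior (timings are illustrative, this is not the article’s code):

package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "time"
)

func main() {
    pr, pw := io.Pipe()
    go func() {
        start := time.Now()
        pw.Write([]byte("chunk")) // blocks until the slow reader picks it up
        fmt.Println("write unblocked after", time.Since(start).Round(time.Second)) // ~1s
        pw.Close()
    }()
    time.Sleep(time.Second) // simulate one slow consumer
    ioutil.ReadAll(pr)
}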

Complete sources can be found here: https://github.com/michalpristas/fan-out-writes

io.Copy

Looks like I forgot that io.Copy exists. Shame on me!

So let’s modify the previous solution with io.Copy.

The first thing that changes is that we now produce N pipes instead of N-1:

for i := 0; i < count; i++ {
    pr, pw := io.Pipe()
    readers = append(readers, pr)
    pipeWriters = append(pipeWriters, pw)
    pipeClosers = append(pipeClosers, pw)
}

Then we get rid of the TeeReader. In order to do that, we change the signature of the getReaders function a bit:

from:

getReaders(source io.Reader, count int) ([]io.Reader, io.Closer)

to:

getReaders(count int) ([]io.Reader, io.Writer, io.Closer)

We don’t need the source anymore, since it was only consumed by the TeeReader, and we added one more output in the form of a writer.

So now we return:

  • io.Readers for consumers,
  • io.Writer for io.Copy so readers can be populated,
  • io.Closer so we can close the pipes

The whole getReaders function got a bit shorter:

func getReaders(count int) ([]io.Reader, io.Writer, io.Closer) {
    readers := make([]io.Reader, 0, count)
    pipeWriters := make([]io.Writer, 0, count)
    pipeClosers := make([]io.Closer, 0, count)

    for i := 0; i < count; i++ {
        pr, pw := io.Pipe()
        readers = append(readers, pr)
        pipeWriters = append(pipeWriters, pw)
        pipeClosers = append(pipeClosers, pw)
    }

    return readers, io.MultiWriter(pipeWriters...), NewMultiCloser(pipeClosers)
}

In the previous example, TeeReader took care of copying the content between the readers; now it’s our job to do it. So in main we spin up a separate goroutine that copies the content from the source reader to the readers passed to our clients and, once copying is done, closes all the pipes.

go func() {
    io.Copy(writer, blobReader) // copy everything from the source into the MultiWriter
    closer.Close()              // then close every pipe so readers see EOF
}()

So once again, the final code looks like this:

func uploadBlob(ctx context.Context, blobName string, blobReader io.Reader, storages []*Storage) error {
    var results = make(chan error, len(storages))
    defer close(results)
    readers, writer, closer := getReaders(len(storages)) // compose a reader for each backend
    for i, store := range storages {
        go func(s *Storage, rdr io.Reader) {
            select {
            case <-ctx.Done(): // canceled elsewhere; report the context error
                results <- ctx.Err()
            case results <- s.Save(ctx, blobName, rdr):
            }
        }(store, readers[i])
    }
    go func() {
        io.Copy(writer, blobReader) // a copy error is not propagated here; consumers would see a short stream
        closer.Close()              // deliver EOF to all pipe readers
    }()
    var errs error
    for i := 0; i < len(storages); i++ {
        if r := <-results; r != nil {
            errs = multierror.Append(errs, r)
        }
    }
    return errs
}

func getReaders(count int) ([]io.Reader, io.Writer, io.Closer) {
    readers := make([]io.Reader, 0, count)
    pipeWriters := make([]io.Writer, 0, count)
    pipeClosers := make([]io.Closer, 0, count)

    for i := 0; i < count; i++ {
        pr, pw := io.Pipe()
        readers = append(readers, pr)
        pipeWriters = append(pipeWriters, pw)
        pipeClosers = append(pipeClosers, pw)
    }
    return readers, io.MultiWriter(pipeWriters...), NewMultiCloser(pipeClosers)
}

Source code can be found here: https://github.com/michalpristas/fan-out-writes/tree/io-copy

This code is just a tiny bit smaller, but the mental model is much simpler. There is no need to think about who populates the readers, which one of the clients is the primary, and so on: we do the populating ourselves.

The only drawback, if we compare this solution to the one with TeeReader, is the memory footprint. The difference is not that critical, though.

io.Copy internally allocates a 32 kB buffer, while TeeReader allocates nothing extra: it simply writes out the same chunks the consumer reads. This is a small price for a much cleaner solution.

If we wanted a memory footprint similar to the TeeReader version, we could use io.CopyBuffer, which accepts a buffer of any size except zero:

Look, the only change is:

go func() {
    buf := make([]byte, 1) // 1-byte buffer: minimal footprint, at the cost of throughput
    io.CopyBuffer(writer, blobReader, buf)
    closer.Close()
}()

Source code again: https://github.com/michalpristas/fan-out-writes/tree/io-copy-buf

Looks like we iterated our way to a really simple solution, one that is easy to understand and read. But in case you know a better way to do this, don’t be afraid to create a PR. I would love to see it.

