Go言語のio.Pipeでファイルを効率よくアップロードする方法

James Kirk
Eureka Engineering
Published in
9 min readFeb 5, 2019
パイプ(土管)をGo言語でも楽しめる

はじめに

前回はGo言語のmime/multipartパッケージによるファイルのアップロードを見ましたが、パフォーマンスの特徴にはあまり触れませんでした。
大規模なETLジョブや、制限の厳しいサーバーレスの環境などでは、ファイルを扱うプログラムのリソースを慎重に考える必要があります。本記事ではメモリ使用量を大幅に減らすio.Pipeの使い方を見ていきます。
全てのコードはサンプルレポジトリにあります。

同期処理にある問題

前回のコードをもう一度見てパフォーマンスを考えてみましょう。

// ファイルを開く
file, _ := os.Open(filename)

// リクエストボディのデータを受け取るio.Writerを生成する。
body := &bytes.Buffer{}

// データのmultipartエンコーディングを管理するmultipart.Writerを生成する。
mw := multipart.NewWriter(body)
fw, _ := mw.CreateFormFile(fieldname, filename)

// fwで作ったパートにファイルのデータを書き込む
_, _ = io.Copy(fw, file)

// contentTypeとbodyを使ってリクエストを送信する
resp, _ := http.Post(url, contentType, body)

データの流れを考えるとステップが大きく2つあります。

  1. 12行目でファイルの中身をエンコードしながらio.Copyでバッファ(bytes.Buffer)に書き込む。
  2. http.Postの中でバッファの中身をio.CopyでHTTPコネクション(net.Conn)に書き込む。
同期処理によるファイルのアップロード

このステップが同期的なので、最初のio.Copyが終わるまで次のio.Copyが開始できません。つまり、バッファがファイルの中身全体を一度保持する必要があります。ファイルサイズが大きくなるほど、プログラムのメモリ使用量が増えるように見えますが、benchmemのベンチマークを確認してみましょう。

┌───────┬────────┐
│ File │ Memory │
├───────┼────────┤
│ 6b │ 39KB │
│ 5KB │ 44KB │
│ 29KB │ 71KB │
│ 51KB │ 169KB │
│ 500KB │ 1.94MB │
│ 1MB │ 1.96MB │
│ 10MB │ 33.5MB │
└───────┴────────┘

メモリ使用量がファイルサイズに比例していることが分かりますが、ファイルサイズ自体を大きく上回ることもあります。この実装では、ファイルサイズによってはメモリ不足のリスクが高くなります。

io.Pipeとは

multipart.Writerio.Writerhttp.Postio.Readerを受け取るので、繋げることができません。読み込みと書き込みを並行して進められれば、読み込んだ分のデータだけを保持すればいいはずです。Go言語でこれを可能にするのがio.Pipeです。

Pipeは、メモリ内に同期パイプを作成します。これはio.Readerを必要としているコードと、io.Writerを必要としているコード間を接続します。片方の読み込みは、もう一方の書き込みに対応し、データのコピーは二者間でバッファリングされることなくダイレクトに行われます。

公式ドキュメントのコード例を見てみましょう。

pr, pw := io.Pipe()

go func() {
fmt.Fprint(pw, "some text")
w.Close()
}()

buf := new(bytes.Buffer)
buf.ReadFrom(pr)
fmt.Print(buf.String())

コードの流れは以下のようになります。

  1. io.Pipeio.PipeReader (pr) と io.PipeWriter (pw) を返却する。
  2. 別のGoルーチンでpwにデータを書き込む。pr.Readの呼び出しまでpw.Writeがブロックする。
  3. pr.Readによりpwのデータがprにコピーされ、クリアされる。pwが再度書き込まれる、もしくはpw.Closeまでpr.Readがブロックする。
  4. pw.CloseによりpwのGoルーチンが開放されpr.ReadEOFを返却し、処理が終了する。

pr.Readの呼び出しごとにpwの中身がクリアされるので、最小限のデータしか一度に保持されません。しかし、prpwバッファなしチャネルでやり取りするので、pr.Readpw.Writeがブロックする可能性があります。

pr, pw := io.Pipe()
fmt.Fprint(pw, "Please don't lock")
log.Fatal("No deadlock!")

上記のコードを実行すると log.Fatalに届かずdeadlockが発生します。

fatal error: all goroutines are asleep - deadlock!

ファイルのアップロードを効率化する

では、io.Pipeを活かして効率の良いファイルのアップロードを実装してみましょう。

package main

import (
"io"
"log"
"mime/multipart"
"net/http"
"os"
)

func main() {
url := "http://localhost:3000/upload"
fieldName := "file"
fileName := "hello.txt"
file, err := os.Open(fileName)
handleError(err)

pr, pw := io.Pipe()
// io.PipeWriterをmultipart.Writerに渡す
mw := multipart.NewWriter(pw)

// Goルーチンを開始し非同期でpwに書き込む
go func() {
// prの処理が正常に終わるように必ずpwをクローズする
defer pw.Close()
defer mw.Close()
fw, err := mw.CreateFormFile(fieldName, fileName)
if err != nil {
return
}
if _, err := io.Copy(fw, file); err != nil {
return
}
}()

// io.PipeReaderをHTTPリクエストのボディに渡す
resp, err := http.Post(url, mw.FormDataContentType(), pr)
handleError(err)

err = resp.Body.Close()
handleError(err)
}

func handleError(err error) {
if err != nil {
log.Fatal(err)
}
}

今回はmultipart.NewWriterio.PipeWriterを渡し、別のGoルーチンでfwへ非同期的に書き込んでからCloseを呼び出します。同時にhttp.Postio.PipeReader を渡し、記事内の「同期処理にある問題」で紹介した2つのステップを並行して進めます。

io.Pipeによる非同期的なアップロード

io.Pipeを使うとHTTPリクエストのボディがチャンク(Chunk) に分け送信されるので、Content-Lengthヘッダの代わりにTransfer-Encodingヘッダにchunkedという値を設定する必要がありますが、Goの標準ライブラリに自動で設定されます。

上記で最小限のデータしか一度に保持されないと述べましたが、またベンチマークを確認してみましょう。

┌───────┬─────────┐
│ File │ Memory │
├───────┼─────────┤
│ 6b │ 71KB │
│ 5KB │ 71KB │
│ 29KB │ 71KB │
│ 51KB │ 71KB │
│ 500KB │ 71KB │
│ 1MB │ 71KB │
│ 10MB │ 76KB │
└───────┴─────────┘

今回はファイルサイズが小さい場合は若干オーバーヘッドがあるようですが、数十KBを超えるとメモリ使用量が大幅に下がってきます。これでは大きいファイルの扱いでもメモリ不足のリスクがほぼ無くなるはずです。

まとめ

今回はio.Pipeの特徴と利用について記述し、前回のファイルアップロードの実装を最適化する方法を紹介しました。
io.Copyのみの実装よりio.Pipeを用いた実装でメモリ使用量を抑えることができました。ところが、多少のオーバーヘッドやデッドロックの恐れなどもあるので、早すぎる最適化を避け、適切に使っていきましょう。

参考

Cover image by Jared Young / Go gopher created by Renée French

--

--