Go言語のio.Pipeでファイルを効率よくアップロードする方法
はじめに
前回はGo言語のmime/multipart
パッケージによるファイルのアップロードを見ましたが、パフォーマンスの特徴にはあまり触れませんでした。
大規模なETLジョブや、制限の厳しいサーバーレスの環境などでは、ファイルを扱うプログラムのリソースを慎重に考える必要があります。本記事ではメモリ使用量を大幅に減らすio.Pipe
の使い方を見ていきます。
全てのコードはサンプルレポジトリにあります。
同期処理にある問題
前回のコードをもう一度見てパフォーマンスを考えてみましょう。
// ファイルを開く
file, _ := os.Open(filename)
// リクエストボディのデータを受け取るio.Writerを生成する。
body := &bytes.Buffer{}
// データのmultipartエンコーディングを管理するmultipart.Writerを生成する。
mw := multipart.NewWriter(body)
fw, _ := mw.CreateFormFile(fieldname, filename)
// fwで作ったパートにファイルのデータを書き込む
_, _ = io.Copy(fw, file)
// contentTypeとbodyを使ってリクエストを送信する
resp, _ := http.Post(url, contentType, body)
データの流れを考えるとステップが大きく2つあります。
- 12行目でファイルの中身をエンコードしながら
io.Copy
でバッファ(bytes.Buffer
)に書き込む。 http.Post
の中でバッファの中身をio.Copy
でHTTPコネクション(net.Conn
)に書き込む。
このステップが同期的なので、最初のio.Copy
が終わるまで次のio.Copy
が開始できません。つまり、バッファがファイルの中身全体を一度保持する必要があります。ファイルサイズが大きくなるほど、プログラムのメモリ使用量が増えるように見えますが、benchmem
のベンチマークを確認してみましょう。
┌───────┬────────┐
│ File │ Memory │
├───────┼────────┤
│ 6b │ 39KB │
│ 5KB │ 44KB │
│ 29KB │ 71KB │
│ 51KB │ 169KB │
│ 500KB │ 1.94MB │
│ 1MB │ 1.96MB │
│ 10MB │ 33.5MB │
└───────┴────────┘
メモリ使用量がファイルサイズに比例していることが分かりますが、ファイルサイズ自体を大きく上回ることもあります。この実装では、ファイルサイズによってはメモリ不足のリスクが高くなります。
io.Pipeとは
multipart.Writer
はio.Writer
、http.Post
は io.Reader
を受け取るので、繋げることができません。読み込みと書き込みを並行して進められれば、読み込んだ分のデータだけを保持すればいいはずです。Go言語でこれを可能にするのがio.Pipe
です。
Pipe
は、メモリ内に同期パイプを作成します。これはio.Reader
を必要としているコードと、io.Writer
を必要としているコード間を接続します。片方の読み込みは、もう一方の書き込みに対応し、データのコピーは二者間でバッファリングされることなくダイレクトに行われます。
公式ドキュメントのコード例を見てみましょう。
pr, pw := io.Pipe()
go func() {
fmt.Fprint(pw, "some text")
w.Close()
}()
buf := new(bytes.Buffer)
buf.ReadFrom(pr)
fmt.Print(buf.String())
コードの流れは以下のようになります。
io.Pipe
がio.PipeReader
(pr
) とio.PipeWriter
(pw
) を返却する。- 別のGoルーチンで
pw
にデータを書き込む。pr.Read
の呼び出しまでpw.Write
がブロックする。 pr.Read
によりpw
のデータがpr
にコピーされ、クリアされる。pw
が再度書き込まれる、もしくはpw.Close
までpr.Read
がブロックする。pw.Close
によりpw
のGoルーチンが開放されpr.Read
がEOF
を返却し、処理が終了する。
pr.Read
の呼び出しごとにpw
の中身がクリアされるので、最小限のデータしか一度に保持されません。しかし、pr
とpw
がバッファなしチャネルでやり取りするので、pr.Read
とpw.Write
がブロックする可能性があります。
pr, pw := io.Pipe()
fmt.Fprint(pw, "Please don't lock")
log.Fatal("No deadlock!")
上記のコードを実行すると log.Fatal
に届かずdeadlockが発生します。
fatal error: all goroutines are asleep - deadlock!
ファイルのアップロードを効率化する
では、io.Pipe
を活かして効率の良いファイルのアップロードを実装してみましょう。
package main
import (
"io"
"log"
"mime/multipart"
"net/http"
"os"
)
func main() {
url := "http://localhost:3000/upload"
fieldName := "file"
fileName := "hello.txt"
file, err := os.Open(fileName)
handleError(err)
pr, pw := io.Pipe()
// io.PipeWriterをmultipart.Writerに渡す
mw := multipart.NewWriter(pw)
// Goルーチンを開始し非同期でpwに書き込む
go func() {
// prの処理が正常に終わるように必ずpwをクローズする
defer pw.Close()
defer mw.Close()
fw, err := mw.CreateFormFile(fieldName, fileName)
if err != nil {
return
}
if _, err := io.Copy(fw, file); err != nil {
return
}
}()
// io.PipeReaderをHTTPリクエストのボディに渡す
resp, err := http.Post(url, mw.FormDataContentType(), pr)
handleError(err)
err = resp.Body.Close()
handleError(err)
}
func handleError(err error) {
if err != nil {
log.Fatal(err)
}
}
今回はmultipart.NewWriter
にio.PipeWriter
を渡し、別のGoルーチンでfw
へ非同期的に書き込んでからClose
を呼び出します。同時にhttp.Post
にio.PipeReader
を渡し、記事内の「同期処理にある問題」で紹介した2つのステップを並行して進めます。
io.Pipe
を使うとHTTPリクエストのボディがチャンク(Chunk) に分け送信されるので、Content-Length
ヘッダの代わりにTransfer-Encoding
ヘッダにchunked
という値を設定する必要がありますが、Goの標準ライブラリに自動で設定されます。
上記で最小限のデータしか一度に保持されないと述べましたが、またベンチマークを確認してみましょう。
┌───────┬─────────┐
│ File │ Memory │
├───────┼─────────┤
│ 6b │ 71KB │
│ 5KB │ 71KB │
│ 29KB │ 71KB │
│ 51KB │ 71KB │
│ 500KB │ 71KB │
│ 1MB │ 71KB │
│ 10MB │ 76KB │
└───────┴─────────┘
今回はファイルサイズが小さい場合は若干オーバーヘッドがあるようですが、数十KBを超えるとメモリ使用量が大幅に下がってきます。これでは大きいファイルの扱いでもメモリ不足のリスクがほぼ無くなるはずです。
まとめ
今回はio.Pipe
の特徴と利用について記述し、前回のファイルアップロードの実装を最適化する方法を紹介しました。io.Copy
のみの実装よりio.Pipe
を用いた実装でメモリ使用量を抑えることができました。ところが、多少のオーバーヘッドやデッドロックの恐れなどもあるので、早すぎる最適化を避け、適切に使っていきましょう。
参考
- Package io#Pipe
- io.Pipe関数の2つのdeadlockポイント
- Streaming data in Go, without bytes.Buffer
- Examples For Using io.Pipe in Go
- MDN web docs — Transfer-Encoding
Cover image by Jared Young / Go gopher created by Renée French