Goで作るDBレプリケーションツール

timakin
9 min readDec 24, 2016

--

この記事は、Goアドベントカレンダー(その2)の25日目の担当記事です。

さて、先日DBの中身をコピーしてくるツールをGoで書きまして、その話をgolang.tokyo#2の方でもお話させていただきました。

ツールの名前はgopli (go replication)で、意図どおりの名前ですね。

これは何

tomlで書いた、ssh/db接続用の設定ファイルを元に、特定の環境間でデータを同期させるツールです。今の所MySQLのみ対応で、postgresqlに対応しろと言われたものの、なかなか手が回っていない状況です。

使い方は簡単で、

まず以下のように、

sshでのサーバーへの接続と、MySQLへの接続設定をtomlで書きます。

[database]
[database.local]
host = "localhost"
management_system = "mysql"
name = "app_development"
user = "root"
password = ""

[database.staging]
host = "xxx.xxx.xxx.xxx"
management_system = "mysql"
name = "app_staging"
user = "root"
password = ""

[database.production]
host = "yyy.yyy.yyy.yyy"
management_system = "mysql"
name = "app_production"
user = "root"
password = ""

[ssh]
[ssh.local]
host = "localhost" # or "127.0.0.1"

[ssh.staging]
host = "xxx.xxx.xxx.xxx"
port = "22"
user = "timakin"
key = "~/.ssh/id_rsa_staging"

[ssh.production]
host = "yyy.yyy.yyy.yyy"
port = "22"
user = "remoteuser"
key = "~/.ssh/id_rsa_prod"

その後、

gopli sync -f production -t local -c path/to/setting.toml

とか書くとproductionからlocalにデータを同期してくるという感じです。

我々人類にはseedデータを作る時間的余裕はないので、本番データをそのままローカルに持ってきたいですが、 ほどよいツールはありませんでした。

具体的にはmydumperとかmysqlpumpとかいう並列実行できるツールがあるんですが、stagingにレプリケーション用の設定がまだされてない状態でひとまずデータをコピーしてきたい、とか、毎回オプションでipアドレスやportを指定するという、人類に許されてない作業をやらなくて済むようにした、というニーズが僕にはあったので、環境をまたいで設定ファイルを読みさえすればコピーできるツールが欲しくなりました。

おまけに、Goを使えばgoroutineを利用してチャネル数に応じた並行処理が組めるので、1クエリシングルスレッドで処理するMySQLのクライアントに対して、高速に処理する手段として有用だと思われます。

実装

toml

tomlの読み込みは

を使えば本当に簡単に読み込めます。これはcubicdaiyaさんの以下の資料を読めばわかると思うので省略します。

ただ、2点ほど気をつけたのは、

  • configファイル等のロード結果をグローバル変数で持つ前提でいるとテストができないので、ファイルパスはstringで持つこと
  • キーパスを~/.sshとか書いた場合、ファイルパスとしてその文字列を使うのはいけてないので、ちゃんとユーザーのホームディレクトリパスに置き換えてあげること

とかでしょうか。

usr, _ := user.Current() 
keypath = strings.Replace(keypath, “~”, usr.HomeDir, 1)

みたいな感じです。設定ファイルは人によって好きに書いてしまったりするので、上記のような補完は必要な気がします。

データコピー

手順は、ソースにも書いたのですが、メインのコマンドの実装が以下のような関数呼び出しになっておりまして、

SetupMultiCore() 
loadTomlConf(c)
connectToSrcHost()
defer srcHostConn.Close()
fetchTableList(srcHostConn)
defer DeleteTmpDir(loadDirName)
fetchTables(srcHostConn)
connectToDstHost()
if dstHostConn != nil {
defer dstHostConn.Close()
}
deleteTables(dstHostConn)
loadInfile(dstHostConn)
  • srcとなるデータ元をfetchしてくる(SELECT結果のタブ区切りtxt)
  • dstとなるデータ挿入先のデータをDELETEする
  • 挿入先にテーブル単位でLOAD LOCAL DATA INFILEする

という手順になっていて、dump等はしていません。

これは後々データを途中で書き換えたい時とかに、いちいちmysqlにつなぎに行くよりも、ファイルを編集した方が高速というのと、テーブル単位で並行実行したかったので、goroutineとの相性を考えてこうしました。

ちなみにですが、上記のようにメインの関数に処理を並べて書いていくのは、サーバーのAPIとかだと難しいとは思いますが、CLIの場合は処理が追いやすいので便利だなと感じています。

MySQL接続

go-mysql-driver/mysqlというのがmysql接続での有名どころなんだと思いますが、こちらは使いませんでした。というのも、goroutineの関係で、定数で決められたパケットサイズを超えると即落ちるので、データ量が一気に増える場合は使えませんでした。

また、os.Execで、mysqlのコマンドでリモートに接続するという手もあるのですが、これだとMySQLが起動しているホストへの接続でコネクションが使いまわせません。

ということで、sshでリモートホストにつないだ後、goroutine経由で同時に複数セッションを作成して、session.Runで、セッション内部でコマンドを実行します。コネクション自体は1つを使い回せるので、これでまかなうことにしました。

以下が、MaxFetchSession(ここでは3)の数だけセッションを作り、すべてのテーブルをfetchしてくるコードです。

ありがちだと思いますが、

  • sync.WaitGroupで、すべてのテーブルのfetchが完了するまで待つ
  • 許されたsession数だけチャネルを作成し、並行に処理する

ということをしています。

sem := make(chan int, MaxFetchSession)
var wg sync.WaitGroup
for _, table := range tables {
wg.Add(1)
go func(table string) {
sem <- 1
defer wg.Done()
defer func() { <-sem }()
session, err := conn.NewSession()
if err != nil {
panic("Failed to create session: " + err.Error())
}
defer session.Close()
var fetchTableStdoutBuf bytes.Buffer
session.Stdout = &fetchTableStdoutBuf
fetchRowsCmd := fmt.Sprintf(SelectTablesCmd, srcDBConf.User, srcDBConf.Password, srcDBConf.Name, table)
log.Print("\t\t[Fetch] fetcing " + table)
err = session.Run(fetchRowsCmd)
if err != nil {
pp.Fatal(err)
}
fetchTableRowsResultFile := loadDirName + "/" + srcDBConf.Name + "_" + table + ".txt"
ioutil.WriteFile(fetchTableRowsResultFile, fetchTableStdoutBuf.Bytes(), os.ModePerm)
log.Print("\t\t[Fetch] completed fetcing " + table)
}(table)
}
wg.Wait()

挿入先のDBのテーブルのDELETE、LOAD LOCAL DATA INFILEもこれと同様の手段を取っています。

セッションでの制御について

CLIとかだと複数のAPIにつなぎに行って、その結果を待つとかよくやりますが、その場合goroutineを利用すると、容易に限界のパケットサイズを超えることがあると思います。

その場合は、

  • golang.org/x/time/rateによる実行時間単位での制御
  • juju/ratelimitによるリクエストパケットサイズの制御
  • 同時セッション数をチャネル数によって制御

のいずれかの手段をとることでカバーできると思います。

ただ、レスポンスのサイズを制御する方法はまだ自分も調べ切れてないので、そこは考慮の余地があると思います。

まとめ

golangによるDBレプリケーションツールを作成しました。

mysql用のクライアントライブラリは特に使わず、セッションをgoroutine経由で複数作成して効率よくデータを取ってきています。

今後はシークレットデータのマスクや、MySQL以外への対応をやっていきます。

--

--