Logstash 1対多の MySQL データを Elasticsearch にインデックスする(jdbc streaming 編)

JDBC Streaming Filter で1対多の情報を管理する

by Kunihiko Kido

こんにちは、クニです。写真は前回同様無印良品が運営するカンパーニャ嬬恋キャンプ場近くの湖の写真です。

さて、前回は、Aggregate Filter を使って、1対多のデータを Elasticsearch へインデックスする方法について説明しました。今回は、JDBC Streaming Filter を使って、1対多の情報を扱う方法について説明したいと思います。

Aggregate Filter vs JDBC Streaming Filter

Aggregate Filter はデータベースを SELECT して、複数のデータを1つにまとめるアプローチでしたが、JDBC Streaming Filter は、Input ステージで取得したデータを元に、Filter ステージから直接外部のデータベースに接続して1対多の情報を取得する方法です。

1対多の情報を取得する

今回使用するデータは以下の通りです。1つの映画1レコードで管理されている Movies テーブル。ユーザー毎に映画にタグ付けした履歴を管理する Tags テーブル。Movies 1レコードを Elasticsearch の1ドキュメントとしてインデックスを作成する要件とします。

Movies:

| movie_id | title                   |
|:---------|:------------------------|
| 1 | Toy Story (1995) |
| 2 | Jumanji (1995) |
| 3 | Grumpier Old Men (1995) |

Tags:

| user_id | movie_id | tag      | timestamp  |
|:--------|:---------|:---------|:-----------|
| 15 | 1 | anime | 1193435061 |
| 73 | 1 | children | 1170560997 |
| 91 | 1 | Pixar | 1170626366 |

Input 処理はメインのテーブルを SELECT するだけ

Aggregate Filter では、JDBC Input Plugin で、これら2つのテーブルをJOINしてデータを取得していましたが、JDBC Streaming Filter を使用する場合は、メインのデータ Movies のテーブルのみを SELECT します。

input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/moviedb"
jdbc_user => "mysql"
jdbc_password => "secret"
statement => "SELECT * from movies"
}
}

Filter 処理で関連するタグ情報を取得する

filter 処理に、jdbc_streaming 処理を追加し、movie_id に関連するタグ情報を取得します。以下の設定れでは、該当する情報を user_tags フィールドに追加する例です。

filter {
jdbc_streaming {
jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => ""jdbc:mysql://localhost:3306/moviedb"
jdbc_user => "mysql"
jdbc_password => "secret"
statement => "select * from tags WHERE movie_id = :movie_id"
parameters => {"movie_id" => "movie_id"}
target => "user_tags"
}
}

SELECT した Movies データは、この処理を通ることで、movie_id と title のみのデータに、タグ情報が追加されます。先ほどの設定例では、user_tags フィールドにその SELECT 結果が追加されます。

{
"movie_id": 1,
"title": "Toy Story (1995)",
"user_tags": [
{
"user_id": 15,
"movie_id": 1,
"tag": "anime",
"timestamp": 1193435061
},
{
"user_id": 73,
"movie_id": 1,
"tag": "children",
"timestamp": 1170560997
},
{
"user_id": 91,
"movie_id": 1,
"tag": "Pixar",
"timestamp": 1170626366
}
]
}

1対1のデータでも、JDBC Streaming Filter で取得したデータは配列になってしまうので、必要に応じで加工しましょう。

注意点

Aggregate Filter と比べて、処理の制限は少ない印象です。あえて注意するなら、複数の JDBC Streaming Filter 設定できるので、データベース側のコネクション数に注意するくらいでしょうか?あと、参照するデータ数が多すぎる場合の制限の考慮など。

まとめ

いかがでしたでしょうか?Logstash を使って、1対多のリレーショナルデータを管理するのは、Aggregate Filter よりも JDBC Streaming の方が適しているように思います。

また、メインのデータがデータベースでなくても、JDBC Streaming Filter を使って、データベースで管理しているマスターデータを参照することも可能です。例えば、ログにはコードやIDのみ出力されているデータをデータベースで管理されている名称に置き換えるなどでも利用できます。