Logstash を使って MySQL データを Elasticsearch にインデックスする(基本編)

Kunihiko Kido
VELTRA Engineering
Published in
9 min readJul 4, 2017

リレーショナルデータベースで管理しているデータを Elasticsearch で検索・分析したい場合、Logstash が便利です。

Logstash とは?

Logstash はオープンソースのサーバーサイドデータ処理パイプラインです。様々な数のソースからデータを取り込み、変換し、指定された任意のストア先にデータを格納することができます。

処理の内容はシンプルで、Input ステージでソース元の接続先情報を管理し、Filter ステージで変換をし、Output ステージで格納先接続先情報を定義します。Input 及び Output プラグインはデフォルトで様々なソースをサポートしています。そのため、Logstash を使えば、プログラミングレスで MySQL のデータを取り込み、変換し、Elasticsearch へインデックスすることができるのです。

事前準備

MySQL と Elasticsearch の他には、Logstash と Logstash から MySQL へ接続するための JDBC Driver (MySQL Connector/J) が必要ですので、それぞれダウンロードしてインストールしてください。なお、Logstash のバージョンは、5.4.2 を使用してます。

MySQL のデータを取り込む設定

Logstash の設定ファイルは、input{} ブロック、filter{} ブロック、output{} ブロックから構成されます。MySQL のデータを取り込む設定は、input{} ブロックに記述します。ここでは、sample.conf として話を進めます。

# sample.conf
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/product"
jdbc_user => "product"
jdbc_password => "1234"
statement => "SELECT * FROM product"
type => "products"
}
}
filter {
...
}
output {
...
}
  • jdbc_driver_library: ダウンロードした MySQL Connector/J のパスを記述します。相対パスで記述する場合は、Logstash を実行するディレクトリからの相対パスです。
  • jdbc_driver_class: これは例のまま記述すれば良いです。
  • jdbc_connection_string: jdbc:mysql://{MYSQL_HOST}:{PORT}/{DATABASE_NAME}
  • jdbc_user: MySQL のユーザー名を記述します。
  • jdbc_password: MySQL ユーザーのパスワードを記述します。
  • statement: MySQL からデータを抽出するためおSELECT文を記述します。
  • type: filter や output ステージで特定のイベントを識別するための識別名を設定しておきます。input ステージは複数の

必要最低限の設定は以上です。スケジュールを設定したり、ページングの設定をしたり色々な設定ができますので詳細は以下のページを参考にしてください。

データの加工処理を定義する

リレーショナルデータベースで定義されているデータは、フラグ系のフィールドなど分かりづらいデータ設計になっている場合が多いので、コードを名称などの理解しやすいデータに変換したり、必要な加工処理は Filter ステージで定義します。

input {...}
filter {
if [type] == "products" {
# ---------------------------------
# {"lang_id": "1"}
# convert to
# {"language": "Japanese"}
# ---------------------------------
translate {
field => "lang_id"
destination => "language"
dictionary => {
"1" => "English"
"2" => "Japanese"
"3" => "Chinese"
}
remove_field => "lang_id"
}
# ---------------------------------
# {"colors": "Red,Blue,Yellow"}
# Convert to
# {"colors": ["Red", "Blue", "Yellow"]}
# ---------------------------------
mutate {
split => {"colors" => ","}
}
}
}
output {...}

if [type] == “products” {} は、input ステージで、定義した MySQL から取得したデータを対象に処理するための条件式です。

注意 translate プラグインを使用する場合はインストールする必要があります。

Elasticsearch のインデックス設定

MySQL からデータを取得し、加工したデータを Elasticsearch にインデックスする設定は、output ステージで定義します。

input {...}
filter {...}
output {
if [type] == "products" {
elasticsearch {
manage_template => false
hosts => ["localhost:9200"]
index => "product"
document_type => "%{type}"
document_id => "%{id}"
}
stdout {codec => rubydebug {metadata => true }}
}
}

設定した内容を実行する

設定した内容を実行するには、以下のコマンドを実行します。設定に問題がなければ、MySQL のデータが加工され Elasticsearch へインデックスされるはずです。

bin/logstash -f conf/sample.conf

設定内容は環境変数から取得することもできる

Logstash の設定ファイル内では、OS の環境変数も参照できるため、環境に依存する設定内容は環境変数を参照するようにした方が良いでしょう。環境変数は参照するだけでなく、デフォルトを設定しておくこともでき流ので便利です。 port => "${TCP_PORT:3306}”

まとめ

いかがでしたでしょうか? Logstash を使えば、リレーショナルデータベースで管理されているデータをほとんどプログラミングする必要なく、加工して Elasticsearch へインデックスすることができます。

また、Logstash は様々なプラグインを標準で用意していますので、今回紹介した例以外にも様々なデータストア間で利用することができます。

そろそろ Logstash 使ってみてはどうだろうか。

--

--