Logstash 1対多の MySQL データを Elasticsearch にインデックスする(Aggregate Filter 編)

Aggregate filter で1対多の情報を1つにまとめる

by Kunihiko Kido

複数レコードを1つにまとめる

当たり前ですが、メインのテーブルに対して、1対多のテーブルを Join して SELECT すると、その結果はメインテーブルの一意なデータごとに複数のレコードが一覧されます。

| movie_id | name             | genre     |
|:---------|:-----------------|-----------|
| 1        | Toy Story (1995) | Adventure |
| 1        | Toy Story (1995) | Animation |
| 1        | Toy Story (1995) | Children  |
| movie_id | name             | genres                           |
|:---------|:-----------------|----------------------------------|
| 1        | Toy Story (1995) | [Adventure, Animation, Children] |

Aggregate Filter

このような処理を実現するには、Aggregate Filter を使用します。以下の設定例は、先ほどの処理を実現するための設定例です。

filter {
  aggregate {
    task_id => "%{movie_id}"
    code => "
      map['movie_id'] = event.get('movie_id')
      map['title'] = event.get('title')
      map['genres'] ||= []
      map['genres'] << event.get('genre')
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}

ページネーションが必要な場合は要注意

SELECT 結果をページングして取得する場合は、別の方法を検討してください(次回紹介)。ページングしてしま うと、同じ task_id がページをまたぐ可能性があるためです。ページをまたぐと処理の単位も別になってしまうので、後から処理したページの内容で上書きされてしまい正確な情報になりません。

SELECT する時のソートも要注意

これもページネーション時の注意点と同じです。新しい task_id を検出すると、以前の集計処理が終了し、次の task_id 処理用に新しいマップが作成され処理されます。そのため、同じ task_id を正確に処理するには、連続して並んでいる必要があります。SELECT 時のソート条件が task_id で指定したフィールドの値で連続するように指定しましょう。

1対多の情報が多いとメモリを消費する

これは、感覚値でしかないのですが、仕組み上同じ task_id が処理されるまでデータがメモリにマップされる仕組みのようです。そのため、1対多の情報が多いほどメモリを消費する印象です。

まとめ

今回は、Logstash の Aggregate Filter を使った1対多のデータを処理する方法について紹介しました。実際に使ってみて、Input がリレーショナルデータベースで管理しているデータであれば、その制限や特徴からデータ量の多くないマスタデータの取り込みであれば使えそうと言う印象です。データ量の多いトランザクション系のマスタ参照用途であれば、別の方法を検討した方が良いかもしれません。次回はその別の方法について紹介したいと思います。

VELTRA Engineering

Posts from the VELTRA Engineering team. www.veltra.com

Kunihiko Kido

Written by

木戸 国彦

VELTRA Engineering

Posts from the VELTRA Engineering team. www.veltra.com