Logstash 1対多の MySQL データを Elasticsearch にインデックスする(Aggregate Filter 編)

Aggregate filter で1対多の情報を1つにまとめる

Kunihiko Kido
VELTRA Engineering
5 min readSep 12, 2017

--

by Kunihiko Kido

こんにちは、クニです。写真は、無印良品が運営するカンパーニャ嬬恋キャンプ場にて、焚き火をした時の写真です。各サイトには写真の焚き火台が用意されていて思う存分焚き火を楽しむことができます。焚き火の炎は癒されますね。

さて、今回は Logstash を使って、リレーショナルデータベースで管理されている1対多のデータを Elasticsearch にインデックスする方法について紹介したいと思います。

Logstash の基本的な手順については、以下を参照してください。

当記事では、1対多のデータ処理方法にフォーカスして説明します。

複数レコードを1つにまとめる

当たり前ですが、メインのテーブルに対して、1対多のテーブルを Join して SELECT すると、その結果はメインテーブルの一意なデータごとに複数のレコードが一覧されます。

とくに検索用途の場合、これらの複数レコードの情報を1つの情報として取り扱いたいと言うシナリオは一般的です。

例えば、以下のような SELECT 結果を

1つのデータとしてまとめます。

まとめた結果を Elasticsearch へインデックスし、例では movie_id で重複のない検索結果を提供します。

Aggregate Filter

このような処理を実現するには、Aggregate Filter を使用します。以下の設定例は、先ほどの処理を実現するための設定例です。

task_id に movie_id を設定し、同じ movie_id に対して処理するように設定しています。

各種フィールドの値を取得し、map を更新します。同じ movie_id でも異なる genre の値は、genres 配列フィールドを定義して、追加します。

ページネーションが必要な場合は要注意

SELECT 結果をページングして取得する場合は、別の方法を検討してください(次回紹介)。ページングしてしま うと、同じ task_id がページをまたぐ可能性があるためです。ページをまたぐと処理の単位も別になってしまうので、後から処理したページの内容で上書きされてしまい正確な情報になりません。

SELECT する時のソートも要注意

これもページネーション時の注意点と同じです。新しい task_id を検出すると、以前の集計処理が終了し、次の task_id 処理用に新しいマップが作成され処理されます。そのため、同じ task_id を正確に処理するには、連続して並んでいる必要があります。SELECT 時のソート条件が task_id で指定したフィールドの値で連続するように指定しましょう。

1対多の情報が多いとメモリを消費する

これは、感覚値でしかないのですが、仕組み上同じ task_id が処理されるまでデータがメモリにマップされる仕組みのようです。そのため、1対多の情報が多いほどメモリを消費する印象です。

まとめ

今回は、Logstash の Aggregate Filter を使った1対多のデータを処理する方法について紹介しました。実際に使ってみて、Input がリレーショナルデータベースで管理しているデータであれば、その制限や特徴からデータ量の多くないマスタデータの取り込みであれば使えそうと言う印象です。データ量の多いトランザクション系のマスタ参照用途であれば、別の方法を検討した方が良いかもしれません。次回はその別の方法について紹介したいと思います。

--

--