Elasticsearch サーバーレスで Reindex APIを外付け実装する

Kunihiko Kido
Hello! Elasticsearch.
11 min readDec 16, 2015

--

AWS Lambda と Amazon API Gateway を使ってサーバーレスで、Elasticsearch の API を拡張する。Reindex API 編

この記事は Elasticsearch Advent Calendar 2015 16日目のエントリーです。気がついたらもうこんな季節、今年もエントリーさせていただきました。

Elasticsearch を運用していると、インデックスの作成やマッピング定義、データの登録、削除など API を使用してオペレーションすることも多いかと思います。Elasticsearch は様々なAPIを提供しているので、一つのAPIをコールして完結できるオペレーションは困ることはないのですが、例えば、リインデックス (インデックス済みのデータを再度インデックスし直す)や データのダンプなど、たまに複数のAPIを組み合わせないとできないオペレーションもあります。

リインデックスのオペレーション

リインデックスのオペレーションを見てみましょう。

  1. Search API で scroll_id を取得する。
  2. 取得した scroll_id を Scrall API に渡してドキュメント群と次の scroll_id を取得する。
  3. 取得したドキュメント群をBulk API に渡してインデックスする。
  4. 2〜3を繰り返す。

少なくとも3つのAPIを使わないと処理できません。手動でオペレーションするのは無理ですよね?

今回は、リインデックスのオペレーションを例に、Amazon API Gateway と AWS Lambda を使って、サーバーレスで Elasticsearch の API を拡張する方法について紹介したいと思います。

システム構成

[Client: [cURL/etc]] — [Reindex API Endpoint: [Amazon API Gateway]] — [Reindex Processing: [AWS Lambda]] — [Elasticsearch: [EC2/Amazon Elasticsearch Service]]

すごく簡単な図ですみません。。

イメージ的には、Elasticsearchの他のAPI同様に、cURLなどのクライアントからリクエスト可能なリインデックス用のAPIを公開して、クライアントからそのAPIを呼び出すと、あとは先ほどのオペレーションをうまいこと自動でやってくれる感じの構成です。

Amazon API Gateway とは?

Amazon API Gatewayとは、フルマネージド型のサービスで、APIを作成・公開が簡単にできるサービスです。バックエンドには、他のAPIをバックエンドにプロキシ的に使用したり、AWS Lambda で稼働中のコードを使ってバックエンドからのデータ、ビジネスロジック、機能にアクセスするエンドポイントのAPIを作成することができるサービスです。

AWS lambda とは?

AWS Lambdaとは、フルマネージド型のサービスで、コードさえあればサーバーレスで様々なロジックを実行することができるサービスです。コードは、他の AWS サービスから自動的にトリガーするよう設定することも、ウェブやモバイルアプリケーションから直接呼び出すよう設定することもできます。

Reindex Processing: AWS Lambda

え〜と、Elasticsearchはセットアップ済みとしてまずは、AWS Lambda で動かす Reindex Processing の作成から。

Input Event

まずは Input Event の設計です。Input Event は、作成したLambdaハンドラーの event パラメータに渡されてくるオブジェクトです。今回作成するLambdaファンクションは、S3やDynamoDBなど他のAWSサービスからから自動的に呼び出すものではないので、自由に設計できます。

リインデックス用のInput Event は以下のように設計しました。他のインデックスにデータをインデックスするなど(コピーする)の用途も考慮して、参照用とインデックス用のElasticsearchを別々に設定できるようにしました。

{
"source_host": "http://<your_elasticsearch_server:9200>/",
"source_index": "blog",
"target_host": "http://<your_elasticsearch_server:9200>/",
"target_index": "blog",
"scroll": "5m",
"scan_options": {
"size": 100
},
"bulk_options": {
"chunk_size": 100
}
}
  • source_host: インデックス済みのドキュメントを取得する対象のElasticsearchクラスターのホスト
  • source_index: 参照するドキュメントがインデックスされているsource_host上のインデックス名
  • target_host: (optional) 取得したドキュメントをインデックスする対象のElasticsearchクラスターのホスト
  • target_index: (optional) 取得したドキュメントをインデックスするtarget_host上のインデックス名
  • scroll: (optional) 次のドキュメント参照するまでの保持期間
  • scan_options.size: (optional)1回のドキュメント取得で取得するシャードごとの最大ドキュメント数(size * number_of_primary_shards )
  • bulk_options.chunk_size: (optional)1回のBulk APIで送信する最大ドキュメント数

この Input Event が以下のようなPythonで作成したLambdaハンドラーの event に渡されてくる想定です。

# lambda_function.pydef lambda_handler(event, context):
....

Lambda Handler

今回、Lambda の処理を記述する言語は Python を選択しました。Python は外部APIをコールするような処理でも基本的には同期処理のため、直列の処理が記述しやすいですし(Node.js は非同期なのでちょっと慣れが必要?)、Bulk 処理を簡単に実装できるヘルパーメソッド付属の Elasticsearch 公式 Python クライアントモジュールも使用できるので、個人的にはオススメです。

その他の言語では、Java や Node.js で記述することもできます。この辺りはお好きな言語で。。

Lambda で処理を設計する時は、処理実行時間やメモリ、ディスク容量など制限があるのでそれらを考慮して設計してください。

※ 参考:AWS Lambda の制限

リインデックスの処理では、少ないデータ問題ありませんが、データが多くなってくると、処理実行時間が最大300秒というLambdaの制限をクリアする必要がありそうです。

そこで、Lambda の提供しているAPIを使用して、先ほど設計した Input Event の内容に取得した scroll_id を追加して、Lambda関数内部から自分自身を非同期で Invoke (Lambdaの関数を呼び出すことをInvokeと言うらしい。)して、処理をチェーンすることにしました。こうすることで、一回の Scroll API で取得したドキュメント群を処理する単位が300秒以内に完了すれば良いことになります。

以下はそのコードです。

作成したコードは関連する他のPythonモジュールも含めZip形式にアーカイブして、AWSのウェブコンソールまたはAPIを使用してリインデックス用のLambda関数として登録することができます。登録が完了すると、サンプルの Input Event を使用してテストすることもできます。

Lambda 関数の実行ログは、CloudWatch Logs に出力されるため、print または、logger を使用して情報を出力することができるので便利です。

※ AWS Lambda Python の設定手順は以下のサイトが参考になります。

Reindex API Endpoint: Amazon API Gateway

次に、Amazon API Gateway を使って、クライアントからのリクエストを受付て、先ほどのLambda関数を実行する API を作成します。作成と言っても、ぽちぽちと設定するだけですので、詳細は省きます。

※ Amazon API Gateway の設定手順は以下のサイトが参考になります。

Reindex API の設計

POST /{indexname}/_reindex

Elasticsearch 風に API を設計するとこんな感じでしょうか? {indexname} に任意のインデックス名を指定して、POST するとリリンデックス処理が流れる感じです。

curl -XPOST https://<api_gateway_url>/blog/_reindex

Amazon API Gateway 設定例

Amazon API Gateway 設定例

図「Amazon API Gateway 設定例」の右下のテンプレートの内容が、Lambda 関数に渡ってくる感じです。souece_host は固定で、source_index は {indexname} から動的に変わるように設定しています。

# Request mapping template:{
"source_host": "http://<your_elasticsearch_server>/",
"source_index": "$input.params('indexname')"
}

API Gateway は作成したメソッドのテストをすることもできるので、ここでテストして問題なければ公開します。また、必要に応じてAPIへのアクセス認証を API キーや AWS Identity and Access Management (IAM) を使用して、API に対するアクセス認証を実施してください。

まとめ

AWS Lambda 関数はスケジュール実行もできるので、API Gateway と組み合わせる他に、定期的にスナップショットを取るとか、Elasticsearch の運用自動化を実装する最適な仕組みだと感じました。是非チャレンジしてみてください。

※ 今回作成した Lambda 関数のソース一式は Github で公開しましたので詳細は以下を参照してください。

--

--