Apache Airflowでエンドユーザーのための機械学習パイプラインを構築する Part3
Part2では、Airflowの基本的な使い方を学びました。Part3では、実際にパイプラインの実装に入っていきたいと思います。とはいえ一度に全部は作れないため、パートに分けて作成をしていきます。今回は、データ収集を担当するパートの一部を実装しようと思います。
具体的には、以下の処理のうち1~2を実装してみます。
- EDINETから、決算関連文書のリストを取得する
- リストに掲載された文書をダウンロードして、ストレージに格納する
- ダウンロードされた文書からテキストを抽出し、DBに格納する
- 抽出したテキストから特徴量を計算し、ストレージに格納する
EDINETは、企業の決算文書などを公開している金融庁のシステムです。最近APIで文書にアクセスできるようになったため、こちらを題材に使ってみたいと思います。
基本的には、ファイルダウンロード・テキスト抽出・特徴量抽出、という3点セットです。Part3では、ベースとなるファイルダウンロードまでを実装してみます。
実装コードは以下となります。
ここからは、実装の詳細について解説を行っていきます。
- APIを通じたデータの取得
- ワークフローの実装
- ワークフローのテスト
- ワークフローのデプロイ
APIを通じたデータの取得
EDINETのAPI仕様については、仕様書が公開されています。
基本的なAPIは、一覧取得(書類一覧API)と文書取得(書類取得API)の2種類です。まず一覧を取得し、そこから必要な文書をダウンロードする、という形ですね。本記事はEDINETの解説記事ではないので、詳細については割愛しようと思います。APIの仕様、またAPIにアクセスするための実装については本連載以外のところでまとめようかなと思います。
ワークフローの実装
今回実装するワークフローは、以下のような構成となっています。
edinet_get_document_list
: 実行日付における、公開文書の一覧を取得edinet_get_document
: 文書一覧を基に、各文書をダウンロードしてストレージに保存する
一覧を取得して、ダウンロード。ワークフローはとてもシンプルです。実際に実装を行う際は、以下のような構成要素を把握しておくとよいです。
- Definition
ワークフロー上で行わる処理は、OperatorかSensorで実装する。 - DAG Definition
ワークフロー(DAG)は、OperatorかSensorをインスタンス化したTaskをつなげることで定義する。 - DAG Run
ワークフローが実行されると、実行結果(DAG Run)が生成される。DAG Runは各タスクの実行を担うTask Instanceから構成される。Task Instanceは、実行ステータスを持つ。
Task Instance間のデータの受け渡しを行う仕組みとして、XComsが提供されている。また、共通の設定値などはVariableで管理できる。
Operatorは前回登場しましたが、Sensorは今回が初登場です。実行が長時間であり、定期的な監視を行いたい場合に使用できます。今回は、文書の取得処理の方をSensorとして作成しています。
OperatorはBaseOperator、SensorはBaseSensorOperatorを継承することで作成できます。シンプルな処理は前回使用したPythonOperatorでも実装可能ですが、複雑になってきたらこちらを使用したほうが良いと思います。BaseOperatorの場合はexecute
、BaseSensorOperatorの場合はpoke
をオーバーライドします。実装については、こちらを参照していただければと思います。
XComsを利用することで、タスク間でデータの受け渡しを行うことができます。今回は、取得した文書の一覧を渡すのに使用しています。一覧の文書を全部ダウンロードするのは時間がかかるため、ダウンロード処理はSensorを使用しpoke_intervalごと文書をダウンロードするように実装しています。
他にもいくつか構成要素がありますが、そちらは公式のConceptに詳しいです。
ワークフローのテスト
ワークフローのテストは、前回使用した airflow test
のコマンドで実行します。テストの体系については、以下の記事が詳しいです。
端的には、以下のようなテストの種類があります。
- Component Check
DAGの構成要素のチェック(意図したタスク数が含まれているかなど) - Flow Check
DAGのフローのチェック(循環などがないか) - Task Unit Test
個々のTaskが正常に稼働するかテストする - Task Relation Test
特別な依存関係があるTask間の接続をテストする(XComを利用した値の受け渡しがあるなど) - Integration Test
DAG全体のテスト
今回は、テストについては airflow test
を使用したTask Unit Testしか行っていません。そのため、ワークフローのテストについては後日しっかりと検証しようと思います。
ワークフローのデプロイ
Airflowでは、ワークフローはスケジュール実行されるものとなっています。自分のパソコンをスケジュール実行のためにずっとオンしておくのは酷であるため、ここはクラウド環境にデプロイしたいところです。
GCPでは、Airflowのホスティング環境としてCloud Composerを提供しています。そこで、今回はCloud Composerを使用してホスティングをしようと思います。Cloud Composerにホスティングする手順については、公式のチュートリアルが参考になります。
Cloud Composerは以下のような構成となっています。ご覧いただいてわかる通りかなり大規模で、作成するのに結構時間がかかります(ちなみに料金も結構かかり、本検証で2000円くらい持ってかれました。手軽に試すには向いてないかもしれません・・・)。
Cloud Composerの作成が終了すると、GCSにAIRFLOW_HOMEが作成されています。その中に dags
というフォルダがあるので、ローカルのDAG定義を丸ごとこちらにアップロードすればGCP上でもワークフローが見えるようになるはずです(本来はGitHubリポジトリなどと連携できた方が良いのですが、それはまだ検証中です。zipファイルでもいいはずなんですが、こちらはうまくいきませんでした)。
何のPythonパッケージも使わずに実装をしている、というケースはレアだと思います。その場合は、依存ライブラリのインストールがサポートされています。
お手軽な方法としては、Cloud Composerの管理画面からパッケージを追加する方法です。ある処理でしか使わない、という場合はAirflow側でPythonVirtualenvOperatorという指定環境で実行するOperatorを使用することもできます。
準備が完了できたら、実際に実行してみましょう。上手く稼働できていれば、クラウド上でも実行されるはずです。
GCSにも、文書が保存されることを確認できました(テストのため、上限を3文書に制限しています)。
ただ、肝心のスケジュール実行についてはまだ実装できていません。毎日実行する場合は万全を期したいという小心のせいでもありますが、この点については今後の連載で検証していこうと思います。
以上で、ワークフローの構築・テスト・デプロイという基本的な作業を確認できました。以後のパートでは、前処理などの実装に入る前に、テストをしっかり行う、デプロイを自動化するなど、下支えとなるプロセスの完成度を高めていこうと思います。