Apache Airflowでエンドユーザーのための機械学習パイプラインを構築する Part4

piqcy
programming-soda
Published in
8 min readApr 18, 2019

Part4はパイプラインの「運用」にフォーカスします。Part3は構築からデプロイまでを扱ったため、その先の話となります。「運用」という言葉がさす範囲は広いですが、本記事ではAirflowにおける「スケジューリング」と「障害復旧」にフォーカスします。スケジューリングは日常の、障害復旧は文字通り障害時の対応を行うにあたり重要なポイントとなります。本記事では、以下2つにわけて解説を行います。

  • Airflowにおけるスケジューリング
  • Ariflowにおける障害復旧

Airflowにおけるスケジューリング

Airflowのスケジューリングで重要な項目は、以下3つとなります。

  • start_date : ワークフローの開始日付
  • end_date : ワークフローの終了日付
  • schedule_interval : ワークフローのスケジュール間隔

3つの項目は、ワークフロー(DAG)を定義する際に指定します。基本的には start_date から end_date までの間、 schedule_interval の間隔で実行されるという挙動になります。 ただ、この動きにはちょっとクセがあります。以下が、Airflowの挙動を示した図になります。

Airflowにおける、スケジュール開始のタイミング

start_date を4/15に設定した場合、普通は4/15になったらスケジュールされると考えると思います。実際は4/17に4/16のジョブ(DagRun)がスケジュールされます。これは以下2点の仕様のためです。

  • start_date + schedule_interval が経過したらスケジュールされる
  • スケジュールされるタイミングはschedule_intervalの経過後

start_date が4/15でschedule_interval が@daily(=1日)の場合「4/16」日付の実行が最初にスケジュールされるジョブになります。「4/16」日付のジョブがスケジュールされるタイミングはschedule_interval経過後のため、4/17になります。その後「4/17」日付のジョブは4/18にスケジュールされ、と以後schedule_interval 分ずれて進んでいきます。

スケジュールに際しては、start_date からschedule_interval 間隔でジョブが実行されているかがチェックされます。実行されていない日付があると、その日付からスケジュールが行われます。以下は、正常に毎日実行した場合と2日間の停止があった場合を比較した図です。Airflowでは、空いている日付の実行がまずスケジュールされます(Backfillを行う場合は、(指定した期間の)空いている日付全てを埋めるようスケジュールされます)。

Airflowのスケジューリングで、正常に実行された場合と2日間の停止があった場合の比較

空いているときは意図的に止めていたので、その分はスケジュールしたくないケースもあります。DAGを作成する際 catchup=False を指定すると、空いている日付の実行を無視することができます。この「無視」は、内部的には start_date を「現在- schedule_interval 」にすることで実現されています。そのため、catchup=False を設定していてstart_dateが「現在- schedule_interval 」より過去の日付である場合、start_date は「現在- schedule_interval 」に修正されます。つまり、catchup=False を指定した場合schedule_interval より前の日付の実行はできなできなくなります。この点は注意してください。

end_date を設定した場合、end_dateからschedule_interval経過後に「 execution_date=end_date」のジョブがスケジュールされ、これが最終となります。以下は、この動き図にしたものです。4/17にスケジュールされる「4/16」日付のジョブが最終で、4/18になるとスケジュールは行われません。

Airflowにおける、スケジュール終了のタイミング

本番環境にDAGを持って行く際は、事前に意図した日にスケジュールされるかテストしたいものです。スケジュールをテストする方法についてはAirflowのGitHub内にあるtest_jobs.pyが参考になります。test_jobs.pyを参考に実装したテストコードは以下になります。こちらも参考にしていただければ幸いです。

Ariflowにおける障害復旧

ジョブは当然落ちる場合があります。その場合、どのようにリカバリができるのかを検証してみます。

Airflowではエラー時のリトライ数、リトライ間隔を設定することができます。以下では、失敗した場合のリトライを3( retries: 3)、リトライ間隔を5秒に設定しています( retry_delay: timedelta(seconds=5))。

default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2019, 4, 1),
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(seconds=5)
}

意図的に失敗するOperatorを作成し、定義通りに実行されるかを確認してみます。

意図的にfailするワークフローの定義(fail_taskで例外発生)

実行してログを見て見ると、 fail_task については計4回実行されています。初回実行+リトライ3回と思われるので、定義通りの動作です(間隔は定義したよりちょっと長かったですが・・・)。

失敗したジョブは、「失敗した」とマークされます。これは「未実行」とは異なるため、 backfillなどで再度スケジュールされることはありません。再実行を行うには、ステータスをクリアする必要があります。クリアの操作は、GUI上か airflow clearのコマンドで行うことができます。以下は、GUI場で行う場合です。Clearのボタンをクリックするだけです。

Taskの実行画面

Clearしてから実行した場合、その実行は「当時の実行(execution_date)」として扱われます。以下は意図的に失敗するタスク(fail_task)を修正して再実行した際のログです。最初の実行ログ(初回実行+リトライ3回の計4ログ)に追加される形(5番目のログ)でログが記録されています。

まとめると、Airflowで障害が起こった場合の対応手順は以下の通りとなります。

  1. 指定パラメーターに沿い、リトライが行われる。リトライでもエラーとなる場合、ジョブはfailとしてマークされる。マークされたジョブは、以後再スケジュールされることはない。
  2. 再スケジュールを行いたい場合は、ステータスのクリアを行う。ステータスがクリアされるとスケジュール対象となり、実行当時のパラメーターで再実行される。

リトライについてはかなりわかりやすい印象です。スケジュールのスタートさえミスらずに軌道に乗ってしまえば、あとはつつがなくいける?かもしれません。

次回Part5は、Part3で構築したワークフロー(EDINETからのデータ取得)を本稼働させてみようと思います。そこから得られた知見をまとめる予定です。なお、Cloud Composerはちょっと高すぎるので(月額3万くらい持ってかれる)普通のインスタンスを使う予定です。

参考資料

--

--

piqcy
programming-soda

All change is not growth, as all movement is not forward. Ellen Glasgow