Apache Airflowでエンドユーザーのための機械学習パイプラインを構築する Part4
Part4はパイプラインの「運用」にフォーカスします。Part3は構築からデプロイまでを扱ったため、その先の話となります。「運用」という言葉がさす範囲は広いですが、本記事ではAirflowにおける「スケジューリング」と「障害復旧」にフォーカスします。スケジューリングは日常の、障害復旧は文字通り障害時の対応を行うにあたり重要なポイントとなります。本記事では、以下2つにわけて解説を行います。
- Airflowにおけるスケジューリング
- Ariflowにおける障害復旧
Airflowにおけるスケジューリング
Airflowのスケジューリングで重要な項目は、以下3つとなります。
start_date
: ワークフローの開始日付end_date
: ワークフローの終了日付schedule_interval
: ワークフローのスケジュール間隔
3つの項目は、ワークフロー(DAG)を定義する際に指定します。基本的には start_date
から end_date
までの間、 schedule_interval
の間隔で実行されるという挙動になります。 ただ、この動きにはちょっとクセがあります。以下が、Airflowの挙動を示した図になります。
start_date
を4/15に設定した場合、普通は4/15になったらスケジュールされると考えると思います。実際は4/17に4/16のジョブ(DagRun)がスケジュールされます。これは以下2点の仕様のためです。
start_date
+schedule_interval
が経過したらスケジュールされる- スケジュールされるタイミングは
schedule_interval
の経過後
start_date
が4/15でschedule_interval
が@daily(=1日)の場合「4/16」日付の実行が最初にスケジュールされるジョブになります。「4/16」日付のジョブがスケジュールされるタイミングはschedule_interval
経過後のため、4/17になります。その後「4/17」日付のジョブは4/18にスケジュールされ、と以後schedule_interval
分ずれて進んでいきます。
スケジュールに際しては、start_date
からschedule_interval
間隔でジョブが実行されているかがチェックされます。実行されていない日付があると、その日付からスケジュールが行われます。以下は、正常に毎日実行した場合と2日間の停止があった場合を比較した図です。Airflowでは、空いている日付の実行がまずスケジュールされます(Backfillを行う場合は、(指定した期間の)空いている日付全てを埋めるようスケジュールされます)。
空いているときは意図的に止めていたので、その分はスケジュールしたくないケースもあります。DAGを作成する際 catchup=False
を指定すると、空いている日付の実行を無視することができます。この「無視」は、内部的には start_date
を「現在- schedule_interval
」にすることで実現されています。そのため、catchup=False
を設定していてstart_date
が「現在- schedule_interval
」より過去の日付である場合、start_date
は「現在- schedule_interval
」に修正されます。つまり、catchup=False
を指定した場合schedule_interval
より前の日付の実行はできなできなくなります。この点は注意してください。
end_date
を設定した場合、end_date
からschedule_interval
経過後に「 execution_date=end_date
」のジョブがスケジュールされ、これが最終となります。以下は、この動き図にしたものです。4/17にスケジュールされる「4/16」日付のジョブが最終で、4/18になるとスケジュールは行われません。
本番環境にDAGを持って行く際は、事前に意図した日にスケジュールされるかテストしたいものです。スケジュールをテストする方法についてはAirflowのGitHub内にあるtest_jobs.pyが参考になります。test_jobs.pyを参考に実装したテストコードは以下になります。こちらも参考にしていただければ幸いです。
Ariflowにおける障害復旧
ジョブは当然落ちる場合があります。その場合、どのようにリカバリができるのかを検証してみます。
Airflowではエラー時のリトライ数、リトライ間隔を設定することができます。以下では、失敗した場合のリトライを3( retries: 3
)、リトライ間隔を5秒に設定しています( retry_delay: timedelta(seconds=5)
)。
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2019, 4, 1),
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(seconds=5)
}
意図的に失敗するOperatorを作成し、定義通りに実行されるかを確認してみます。
実行してログを見て見ると、 fail_task
については計4回実行されています。初回実行+リトライ3回と思われるので、定義通りの動作です(間隔は定義したよりちょっと長かったですが・・・)。
失敗したジョブは、「失敗した」とマークされます。これは「未実行」とは異なるため、 backfill
などで再度スケジュールされることはありません。再実行を行うには、ステータスをクリアする必要があります。クリアの操作は、GUI上か airflow clear
のコマンドで行うことができます。以下は、GUI場で行う場合です。Clearのボタンをクリックするだけです。
Clearしてから実行した場合、その実行は「当時の実行(execution_date
)」として扱われます。以下は意図的に失敗するタスク(fail_task
)を修正して再実行した際のログです。最初の実行ログ(初回実行+リトライ3回の計4ログ)に追加される形(5番目のログ)でログが記録されています。
まとめると、Airflowで障害が起こった場合の対応手順は以下の通りとなります。
- 指定パラメーターに沿い、リトライが行われる。リトライでもエラーとなる場合、ジョブはfailとしてマークされる。マークされたジョブは、以後再スケジュールされることはない。
- 再スケジュールを行いたい場合は、ステータスのクリアを行う。ステータスがクリアされるとスケジュール対象となり、実行当時のパラメーターで再実行される。
リトライについてはかなりわかりやすい印象です。スケジュールのスタートさえミスらずに軌道に乗ってしまえば、あとはつつがなくいける?かもしれません。
次回Part5は、Part3で構築したワークフロー(EDINETからのデータ取得)を本稼働させてみようと思います。そこから得られた知見をまとめる予定です。なお、Cloud Composerはちょっと高すぎるので(月額3万くらい持ってかれる)普通のインスタンスを使う予定です。