Apache Airflowでエンドユーザーのための機械学習パイプラインを構築する Part2
Part2では、実際にAirflowを使ってパイプラインを構築してみたいと思います。Part1でのサーベイを受けて構成を考える、というのがセオリーですがまずはAirflowの機能がどんなものかをざっくりと体感しようと思います。知らないツールでいきなり高度なものは作れないためです。
本編では、シンプルなワークフローの実装を通じてAirflowの特性を把握します。基本的には以下のチュートリアルをなぞる形で進めていきます。
目次は以下の通りとなっています。
- Apache Airflowとは
- Apache Airflowによるワークフローの定義と実行
Apache Airflowとは
Workflow as a Codeという言葉が、Airflowを表すのに最も適していると思います。各タスク(Operator)の依存関係をPythonで記述することで、その依存関係通りにタスクを実行してくれます。「依存関係通りの実行」というとジョブスケジューラーが思い浮かびますが、ジョブの場合は実行順序が固定的です。Airflowではコード上の依存関係を変更することでいわば動的にジョブを生成することが可能です。
コードの変化に追随し動的にワークフローを生成する、という点がAirflow最大の特徴になると思います。動的だと実際どう実行されるのか心配、という感じもしますがGUIのツールが付属しており構築されたワークフローを参照することが可能です。
動的なワークフロー生成、「動的」を支えるリッチなGUI。これらはもちろんApache Airflowの魅力ですが、スケール性という点でAirflowをサポートしているクラウドサービスがあるというのも魅力の一つです。GCPでは、Airflowで定義したワークフローをクラウド上で実行するCloud Composeという機能があります。
機械学習では、クラウド上のリソースを使用することが多いです。その点から考えても、クラウド環境と親和性の高いAirflowは良いのではないかと思います(なので、連載に採用しました)。
ちなみに、AirflowはもともとAirbnbで作成されていました。2014年に開発、2015年GitHub公開、2016年にApache Foundationに参加となっています。開発はMaxime Beaucheminさん(通称”Max”)によって率いられています。”Max”によるApache Airflowに関するQ&Aを以下サイトで見ることができます。他ワークフローツールについての印象や、AirbnbからApacheへの移転にまつわるあれこれなどについて語られています。
Apache Airflowによるワークフローの定義と実行
では、実際にAirflowでワークフローを構築してみましょう。実装に使用したリポジトリは以下となっています。
Setupにも記載していますが、2019/3/16の段階だとインストールが少し手間です。GPL依存ライブラリの導入確認、またPython3.7で使う場合依存ライブラリ(tenacity)のバージョンを上げる必要があります。最終的なインストールスクリプトは以下のように長くなります。tenacityへの依存は、1.10.3で解決されるようです。
SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow tenacity==4.12.0 --no-binary=python-slugify
なお、残念ながらAirflowはWindowsで動かすことができません(不可能ではないが、不都合が多い)。そのためWindowsで使う場合はDockerを使用するか、Windows Subsystem for Linuxを利用しましょう。
インストール後、 airflow initdb
を実行することでAIRFLOW_HOMEに設定したディレクトリに設定ファイルやワークフロー定義を格納するDBが作成されます。 airflow webserver --port 8080
でサーバーを立ち上げるとWeb UIの起動が確認できると思います。
デフォルトでは、AIRFLOW_HOME/dagのフォルダにワークフロー定義を格納することになっています。AIRFLOW_HOMEには前述の通りAirflowの稼働に必要な様々なファイルが含まれるため、この設定ではワークフロー定義のGit管理が行いにくいです。そのため、 airflow.cfg
の dags_folder
を修正することでワークフロー定義を格納するディレクトリを変更しています。
これで準備が整いました。実際に、ワークフローを定義してみましょう。チュートリアルを参考に、今後メインで使うと思われる PythonOperator
で簡単なワークフローを作成しています。
基本的には、まずワークフローである DAG
を作成します。そこにタスク(Operator
)を登録していき、登録したタスク間の依存関係を定義する、という流れになっています。依存関係はあくまでタスク同士が把握しており、 DAG
は関与していないという点にAirflowらしさが感じられます。
依存関係はset_upstream
/set_downstream
だけでなく、ビットシフトの演算子(>>
)でも定義が可能です。これは直感的でわかりやすいと思います。
実装をしたら、 airflow list_dags
で実装したワークフローが出てくるか確認します。もし出てこなかったら、 dags_folder
の設定などに誤りがあります。
テストをする場合はairflow test
、実行する場合はairflow run
で個別のタスクを実行できます。DAG全体を依存関係に基づき実行する場合は airflow backfill
を実行します。
run
/backfill
では、実行済みのタスクは実行されません。例えば一回3/16にAというタスクを部分的に実行していたら、3/16のタスクAは実行済みとしてマークされ、次回の実行ではスキップされます。 backfill
では日付範囲を指定しますが、指定範囲のうち実行済みの日付/タスクに関してはスキップされます。このあたりの挙動については、以下の資料が詳しいです。
実行履歴は、airflow clear
でクリアが可能です。そのため、再実行したい場合はclearをして再度 backfill
などを実行する、という流れになります。
Airflowにおいては、ワークフローはテストが完了したら何らかのスケジュールに沿って実行されるものと想定しています。そのため、スケジュールから外れた実行を何度も行う(手動で任意のタイミングで実行するなど)は想定されていないようです。この点にはちょっと注意が必要です。
今回は、Airflowの基本的な使い方を把握しました。次回は、もう少し機械学習寄りのワークフローを構築してみようと思います。