【GCP】Cloud Composerを営業時間だけ使いたい

Shintaro Uchiyama
JDSC Tech Blog
Published in
11 min readFeb 5, 2020

Airflowを動かしてくれるCloud Composer非常に便利ですが
起動させているだけでも月10万円近くかかり、一時停止機能もない
そこで、営業時間だけ自動でComposerが起動するピタゴラスイッチを作成してみた。これを用いれば6割以上コストカットができる

ソースコードは以下リポジトリに格納されているので、参照ください
https://github.com/shintaro-uchiyama/gcp-cloud-composer

補足:
Airflowは複数タスクの実行順序を定義できるワークフローエンジン
様々なケースで使用することができるが、以下イメージ図を例にするとBigQueryでsqlを実行してテーブルを生成する「task」の実行順序を定義
これにより「task1で生成されたテーブルをtask2で参照する」と言った
依存関係を考慮したワークフローの実現が可能

Airflow image

実現イメージ

朝自動でComposer作成
夜自動でComposer削除

ディレクトリー構成

以下のようなディレクトリ構成で上記イメージを実現

.
├── build # 5. dagをAirflowにuploadするCloud Build
│ └── upload_dags_to_airflow
│ ├── cloudbuild.yaml
│ └── trigger_config.yaml
├── composer # 4. Airflowにuploadするdag
│ └── dags
│ └── sample
│ └── dag.py
└── functions
└── manipulate_composer
├── docker # 2. Docker Container on Instance for Polling
│ ├── Dockerfile
│ │ # 1. Containerで動作するProgram
│ ├── manipulate_composer
│ │ ├── cloud_build.py
│ │ ├── composer.py
│ │ ├── compute_engine.py
│ │ ├── env.py
│ │ ├── main.py
│ │ └── utils.py
│ └── requirements.txt
├── .env # 6. 各環境毎に設定するファイル
└── main.py # 3. Polling GCE作成用のCloud Functions

数ステップに分けて、それぞれのファイルでどのような処理をしているか記載していく

1. Containerで動作するProgram

Composerは作成・更新に数十分かかるので、環境作成をしばらく待ち、完成の後にPyPIのupdateみたいなPolling箇所が必要となる。
そこで、上記作成イメージの④⑤⑥を実施するProgramを作成

Dockerから呼び出されるmain.py
GCP各サービスクラスに指示して作成や更新を実施

functions/manipulate_composer/docker/manipulate_composer/main.py

処理の完了待ち、Slack通知機能はutilsファイルに配置

functions/manipulate_composer/docker/manipulate_composer/utils.py

GCPサービスを操作するファイル群
Composer作成・更新といった操作周りを一通り実現するファイル
PyPIの追加はComposer作成時に指定できないため作成完了後にupdate
また、Composerを削除してもGCSや一部ディスクは削除されないため、一通り消す処理も記載

functions/manipulate_composer/docker/manipulate_composer/composer.py

Composer(Airflow)にDAGをuploadするためのCloudBuild

functions/manipulate_composer/docker/manipulate_composer/cloud_build.py

最後に自分自身(Polling Instance)を削除するComputeEngineファイル

functions/manipulate_composer/docker/manipulate_composer/compute_engine.py

2. Docker Container on Instance for Polling

1.で作成したファイル群を実行するコンテナイメージの作成
Python3.8.1のイメージにてpip installを実行
補足としては、Dockerfile起動時に引数を渡すためにCMDではなく
exec形式([]で囲む形式)のENTRYPOINTを指定

functions/manipulate_composer/docker/Dockerfile
functions/manipulate_composer/docker/requirements.txt

デプロイ
project_nameに自身のGCPプロジェクト名を指定して以下コマンドを実施
Container Registry にDocker imageをpush

project=project_name
cd functions/manipulate_composer/docker
docker build -t manipulate-composer .
docker tag manipulate-composer:latest asia.gcr.io/${project}/manipulate-composer:latest
docker push asia.gcr.io/${project}/manipulate-composer

3. Polling GCE作成用のCloud Functions

GCEインスタンスを作成し、前項のDocker imageを元にContainerを作成するCloud Functionsの作成

Cloud Schedulerから作成、削除等の指示を受け取ることを想定して
Dockerコンテナ起動時の引数を渡す形で作成

functions/manipulate_composer/main.py

4. Airflowにuploadするdag作成

検証のために特に何もしないDummyOperatorを使用したdag作成

composer/dags/sample/dag.py

5. dagをAirflowにuploadするCloud Build

masterにpushされたらdagをuploadするCloud Buildのトリガーを作成

Step作成
作成されたComposerにdagをuploadするCloud Buildのステップを作成

build/upload_dags_to_airflow/cloudbuild.yaml

設定ファイル作成
以下の通り設定ファイルに自身の希望する設定を記載
owner:接続させたい自身のGitHub owner
name:接続させたい自身のGitHub name
例)https://github.com/[owner]/[name]
_PROJECT_ID:自身のプロジェクト名
_COMPSER_ENV:作成したいComposer名

build/upload_dags_to_airflow/trigger_config.yaml

デプロイ
GCPコンソールでリポジトリの接続を完了させた上で
以下コマンドによりCloud Buildのトリガーを作成

gcloud beta builds triggers create github \
--trigger-config="build/upload_dags_to_airflow/trigger_config.yaml"

権限設定
IAMと管理ページにてCloud Buildのサービスアカウントに
「環境とストレージ オブジェクトの管理者」役割を追加
Composer(Airflow)にDAGをアップロードするために必要な作業

6. 各環境毎に設定するファイル

以下の通り自身の環境に応じた情報を設定
ENVIRONMENT_NAME:好きなComposer環境名を設定
WEBHOOK_URL:操作状況をSlackに通知するためのwebhook url
以下URLからwebhook URLを作成し設定
https://slack.com/services/new/incoming-webhook
DAG_NAME_TO_RUN:今回作成したDAG IDのsampleを指定
BRANCH_NAME:DAGを反映させたいGitHub branch。master指定

functions/manipulate_composer/.env.yaml

デプロイ
作成した設定ファイルを読み込みCloud Functionsをデプロイ
project_nameには自身のGCP project名を指定

project=project_name
gcloud functions deploy manipulate_composer \
--runtime python37 \
--trigger-topic manipulate_composer \
--source functions/manipulate_composer \
--project ${project} \
--region asia-northeast1 \
--env-vars-file functions/manipulate_composer/.env.yaml

Cloud Schedulerの作成

自身のGCP project_nameを指定して以下の通りデプロイすることで
平日特定の時間のみComposerを起動させておくことができる

朝8:00にComposerを作成するJob

project=project_name
gcloud beta scheduler jobs create pubsub create_composer \
--project ${project} \
--time-zone='Asia/Tokyo' \
--schedule='0 8 * * 1-5' \
--topic=manipulate_composer \
--message-body='create'

夜19:00にComposerを削除するJob

project=project_name
gcloud beta scheduler jobs create pubsub delete_composer \
--project ${project} \
--time-zone='Asia/Tokyo' \
--schedule='0 19 * * 1-5' \
--topic=manipulate_composer \
--message-body='delete'

1回だけAirflowを実行するJob

タイトルとはずれますが
月に1回だけAirflowを実行したい時に使えるSchedulerも作成
以下の例は毎月1日の10:00にAirflowを実行するScheduler

project=project_name
schedule="0 10 1 * *"
gcloud beta scheduler jobs create pubsub run_composer_once \
--project ${project} \
--time-zone='Asia/Tokyo' \
--schedule=${schedule} \
--topic=manipulate_composer \
--message-body='run_once'

まとめ

GoogleがComposer停止機能を作るまでの短命な機能だが
使用していない時間もコストがかかるのはもったいないのでおすすめです

--

--