Cloud Composerでデータ基盤のワークフローを作る

この記事は eureka Advent Calendar 2018 11日目の記事です。

どうも、みなさん、おはこんばんにちは。
エウレカBIチームで、データ基盤をコソコソ作っているsotaroです。
今年一番みて良かったアニメは『ヴァイオレット・エヴァーガーデン』と『ゆるキャン△』です。一番といったのに二つ選んですみません。キャンプしたいです。

私は、主にPairsのデータを分析したり、分析するためのデータを作ったり整えたりするお仕事をしています。
特に最近では、Pairsのデータ基盤を刷新するプロジェクトに注力しており、今日はそのプロジェクトの中で最近導入&試験運用を始めたGCPのマネージドAirflow、Cloud Composerについてご紹介します。

なぜCloud Composerを使うことにしたのか?

本題に入る前に、まずは「なぜ?何を?」などの外堀を埋めていきたいと思います。 
ただの「使ってみた」よりは、導入にあたっての課題感や前提などが分かった方が読む人にとって意味があると考えているためです。

以下では、「そもそもデータ基盤とは?」という話と、どういう課題や背景があって、弊チームでCloud Composerを導入したのかという話を簡単にします。

そもそもデータ基盤って?

先ほど、「データ基盤を刷新するプロジェクトをやっている」という話をしましたが、「分析基盤」とか「データ基盤」といったワードは話す人のやっていることやスタンスによって微妙に異なる意味を帯びて使われているようです。
なので、まずこの記事における(というかエウレカBIにおける)スタンスを言語化します。

データ基盤: 「求めている人が正確なデータに必要な限り速くアクセスできる」ための基盤のこと

利用者は機械学習エンジニアかもしれないし、データサイエンティストかもしれないし、プロダクトマネージャーかもしれません。
特定のユーザーを想定するというよりは、より広範に「プロダクトのデータを活用しやすくするために作るいろいろ」といったところです。
「データ基盤」という大きな括りの中に、特定の用途向けのデータマートなんかが作られるイメージが分かりやすいでしょうか。

ところで、先日、データ基盤界隈?で有名なゆずたそさんがこんな記事を書かれていました。

この記事でも、以下のゆずたそさんによる定義のような意味合いで、「データ基盤」の話をしてると思っていただければ分かりやすいかなと思います。

なお、ここでは分析基盤ではなくデータ基盤と呼びます。 データ分析だけでなく、集計・レポーティングや機械学習にも使うことを想定するからです。 そのため「分析」の基盤ではなく「データ」を活用するための基盤であることを強調するために「データ基盤」と呼称します。

ちなみに、ゆずたそさんの元記事では、データレイク/DWH/データマートの分類が分かりやすく書かれていたりしてとてもタメになるので、まだ読んでいない方はぜひ。(個人的に「進化的データモデリング」という考え方がすごく良かったです。)

データ分析で困っていたこと

上記のようなデータを活用していくための基盤を作るとなると、「本番DBのデータや各種行動ログなどをとりあえずBigQueryに置いておく」だけではつらくなってきます。

「とりあえずBigQueryに突っ込んどけばおk」のようなやり方は、初期段階では妥当なやり方かもしれませんが、データ活用の機運が高まるにつれ、以下のような問題を引き起こします。※ 弊社事例です😇

  • 分析のたびに似たようなデータセットを何度も作り直すハメになる
  • 同じような指標を微妙に異なる定義で算出していることもあり、単純に他の人が作ったものを使いまわすことにも問題が発生する
  • モデルに組み込むためのデータセットが実行時に生成されて管理されておらず再現性がない
  • etc…
こんなことがよくありました(初めのうちは別に良かった)。

特に1つめの問題は、どう考えても頭が悪いし、そんな作業に私たちの貴重な日々を費やしたくはありません。頑張らなくて良いところでは極力楽をするべきです。 
また、そんな怠けた理由以外にも、上記のような問題があることで、分析結果の正確性や提供スピードの速さに悪い影響が出ることは想像に難くありません。

そんなこんなで、ある程度明確になっているニーズに関しては、データマートという形で用途別のデータソースを作って、それをいつでも簡単にTableauで見られるようにしよう!というプロジェクトが始まりました。

左側のまとまりがログ系データなどがとりあえず置かれたデータレイクのようなもの

そうすることで、共通の(よく検証された)定義の下に作られた正確なデータソースを、適切なアクセス権限で、適切な人に届けることができます。

用途別のデータソースの作成には、「Aのデータが前日分まで揃っていてかつBの更新が終わっていたら、Cのデータソースを作れる」みたいな依存関係がよくあるので、DAGという形式でタスク間の依存関係を管理できるワークフローエンジンAirflowのマネージドサービスであるCloud Composerを導入することにしました。
導入背景はざっとこんなところです。


Cloud Composerのアーキテクチャとワークフローを動かすまで

自分たちの話はこの辺にしておいて、ここからはCloud Composer自体の使い方などの話をしていきたいと思います。 
実際に、Cloud Composerの環境を用意して、ワークフローを定義しタスクを動かしていくところまでやっていきます。

Cloud Composerのアーキテクチャ

出典:https://cloud.google.com/composer/docs/images/architecture.svg?hl=ja

Cloud Composerのアーキテクチャは上記のようになっています。

GCPのマネージドGKE(フルマネージドではない)の上にAirflowが立っています。私たち利用者は、Airflowにおける各コンポーネントの管理などの運用はそれほど気にせずに、「DAGファイル」と呼ばれる.pyのファイルをGCSの指定のバケット下に置くことで、ワークフローをどんどん追加して、やりたいタスクを動かしていくことができます。

利用する上で最低限覚えておくべき構成要素は以下あたりでしょうか。(※ 使っていくとDatabaseとかRedisとかも気になってきますが…)

  • Airflow Worker(s)
    各タスクを実行するワーカー。node_countの設定で増やせる。
  • StackDriver
    Logs: 各ワーカー上で出力されたログをほぼリアルタイムに見ることができる
  • Cloud Storage
    DAGs: ワークフローを記述するファイルを配置する
    Logs: 各ワーカー上で出力されたログのファイルが配置される
    plugin: DAGのなかで読み込みたい外部ファイルやモジュールなどを配置する

その他、アーキテクチャに関する詳細な話は、公式ドキュメントに書かれているので、ご興味があれば読んでみてください。

Cloud Composerでワークフローを動かす

では、実際に動かしていきましょう。

まずCloud Composerの環境を作成する必要があります。
GCPのコンソール上でポチポチやCLIでやるでも良いのですが、後々airflow configやnode数などのCloud Composer環境の設定諸々をアップデートすることが出てくると思うので、Terraformで管理するのがおすすめです。

下記のように書くと、node数の設定などだけでなく、Airflow側の設定や外部Pythonパッケージのインストールまで行えるので、必要に応じてupdateするなど、うまく管理できます。

ただ、Terraform公式に「Due to limitations of the API, Terraform will not be able to automatically find or manage many of these underlying resources.」とあるように、Cloud ComposerがのっているGKE下で管理される他のresourcesについてはTerraform側で自動で管理してくれるわけではないので、必要であればサービスアカウントの設定なども一緒にしておきましょう。
(ドキュメントを見ればわりと簡単に書けました)

resource “google_composer_environment” “test-environment” {
name = “environment-name”
project = “your-project”
region = “asia-northeast1”
config {
node_count = 4
node_config {
zone = “asia-northeast1-c”
machine_type = “n1-standard-1”
disk_size_gb = 100
}
# Airflow config
software_config {
airflow_config_overrides {
dag_concurrency = 20
}
pypi_packages {
numpy = “”
pandas = “”
}
env_variables {
foo = “bar”
}
}
}
}

上記の.tfファイルを保存して、 terraform applyすれば該当のプロジェクト下にCloud Composerの環境ができているはずです。
ここまでで環境の用意ができたので、次は早速DAGファイルを書いてタスクを動かしてみましょう。

実際に弊チームでやっている以下のようなタスクを簡単に作ってみたいと思います。

先ほど載せた弊社で動かしているワークフローを簡略化した感じです。

ワークフローを記述するためのDAGファイルは以下のように簡単に書けます。

今回は、BigQueryのtest_aテーブルにクエリを投げて、同じBigQueryデータセット内にtest_bを作りたいので、 BigQueryOperatorを利用しています。

こちらのDAGファイルをdag.pyなど名付けておいて、以下を実行します。
( --locationには先ほどのTerraform側で指定したregionを入れておけばおkです。)

gcloud composer environments storage dags import \
--environment $(ENVIRONMENT) \
--location $(LOCATION) \
--source ./dag.py

実行できたら、GCPのコンソール上からアクセスできる「Airflowウェブサーバー」に行ってみると、該当のタスクが登録され、実行されていることがわかります。

とりあえず動くものは作れましたが、このままDAGファイル内に直接SQLを記述しておいては、今回のような短いクエリが1つや2つくらいの場合はまだ良いですが、クエリが複雑になってきたり、数が多くなってくると管理するのが嫌になってきます。
なので、以下のようにクエリをSQLファイルとして別にしておきます。

先ほどのDAGファイルの該当箇所も修正します(該当箇所だけ載せています)。主SQLファイルの読み込み部分が追加されてるくらいです。

下記のようなディレクトリ構成にしておくと、SQLファイル≒該当データソースのスキーマのように管理できるので、楽です。

.
├── dag.py
└── sql
├── data_source_a.sql
├── data_source_b.sql
└── data_source_c.sql

ただ、このままでは先ほどのDAGファイルのGCSへのアップロードだけではうまくいかなくなるので、先ほどのSQLファイルをpluginとして環境内にインポートする必要があります。

pluginのインポートはファイルだけでなく、ディレクトリごとやることもできるので、下記のようにgcloudコマンドを叩くことで、先ほどのsqlディレクトリ以下のSQLファイルを全てインポートできます。

gcloud composer environments storage plugins import \
--environment $(ENVIRONMENT) \
--location $(LOCATION) \
--source sql/

これで、Cloud Composer環境内にSQLファイルをインポートすることができたので、問題なく実行できるはずです。

弊チームでは、SQLファイルとDAGファイル(その他plugin含めて)をgit管理することで、データソースの定義を管理しつつ、これらのデータソースをTableauに接続することで、正確な定義のデータに利用者がすぐにアクセスできるようにしています。

その他、ちょっとしたtips

Subdag

Cloud Composer(というかAirflow)では、DAGという単位で1つのワークフローを管理することになりますが、このDAGをネストさせることも可能です。
先ほどのDAGファイルを少し書き換えて、複数のデータソース作成タスクを記述してみます。

SubdagOperatorというクラスが出てきました。
依存関係を持たず、並列に実行できるタスクは以下のようにsubdagという単位でまとめて動かすことができます。
概念的に同一のグループに属するタスク群は、subdagの形式でまとめるとワークフロー定義がすっきりして管理もしやすくなります。

上記のsubdagファイルも、先ほどのSQLと同様に以下コマンドで環境内にインポートできます。

gcloud composer environments storage plugins import \
--environment $(ENVIRONMENT) \
--location $(LOCATION) \
--source subdag/bq_to_bq_subdag.py

このあと見るように、環境にインポートしたpluginは、 /home/airflow/gcs/plugins以下に配置され、元のDAGファイルからはplugins以下をパッケージとしてimportすることができます。

airflow.cfg

Airflowでは、タスクの並行実行数など諸々の設定をairflow.cfgというtomlのような設定ファイルに記述して管理します。
Cloud Composerでも当該環境のGCSバケット下に airflow.cfg が置いてあるので、そちらを見てみるといろいろと設定項目があることがわかります。
マネージドなので、そんなにいじらなくてもよさそうですが、以下くらいは覚えておくと良さそう(一部省略しています)。

[core]
dags_folder = /home/airflow/gcs/dags
base_log_folder = /home/airflow/gcs/logs
plugins_folder = /home/airflow/gcs/plugins
dags_are_paused_at_creation = False
load_examples = False
donot_pickle = True
parallelism = 30
dag_concurrency = 15
max_active_runs_per_dag = 15
enable_xcom_pickling = False
[celery]
celeryd_concurrency = 6
celery_ssl_active = False

plugins_folder以下はdas_folder以下に配置したDAGファイルからは、パッケージやモジュールとしてインポートできます。変更も可能です。

parallelism, dag_concurrency, max_active_runs_per_dag, celeryd_concurrencyあたりは並列、並行実行に関わるパラメータなので、場合によってはチューニングが必要そうです。
この辺りはまだ運用を始めたばかりで知見がないので、貯まってきたらまた記事にでもしたいと思います。

また、これらの設定は環境に対して適用するものなので、都度WebUI上からいじるのは管理上あまりよろしくありません。
そういった点でもCloud Composerの環境はTerraformで管理するのが良さそうです。 software_config.airflow_config_overridesの項目で変更ができます。

おわり

以上、ざっと弊チームでのデータ基盤にまつわる課題感と、その一部を解決するために導入したCloud ComposerというGCPのマネージドサービスについて、紹介しました。

まだ弊チームでも導入したばかりなので、より良い使い方を模索中ではありますが、抱えていた以下のような問題は解消できているように思います。

・分析のたびに似たようなデータセットを何度も作り直すハメになる
・微妙に異なる定義を使っていたりして、単純に他の人が作ったものを使いまわすことにも問題が発生したりする

結果として、自分たちも楽になるし、利用者にとっても快適な環境が提供できるようになってきたと思います。これからももっと楽で、快適な世界を作っていきたいと思います。

明日は、Tableau用データマート作成のタスクを一緒に進めてくれているミニオンこと鈴木さんがAppsFlyerの話をしてくれるようです。
鈴木さんは5人以上の飲み会に来ないことで有名です。特に僕がいると来てくれません。
明日もお楽しみに!

おわり。