BigQueryとAirflowを活用したDataPlatform運用の10のケース・スタディ

Kurimura Takahisa
Eureka Engineering
Published in
12 min readDec 11, 2019

この記事は Eureka Advent Calendar 2019 11日目の記事です。
10日目は スーパーCompSREエンジニア恩田による「AWSのマルチアカウント管理におけるIAMマネジメントで試行錯誤した話」でした。

こんにちは。BIチームのデータアナリストの栗村(@t-kurimura)です。主にPairsの機能に関する分析を行っていますが、最近は分析を行うまでのデータを整えるデータアーキテクト的お仕事の比率も増えてきています。

さて、この記事では弊社のDataPlatform運用において、試行錯誤してきた中での運用の学びをケース・スタディ的にご紹介します。

エウレカのDataPlatformの現状

前提として、弊社では3つの層にわけて、アプリケーションログやマーケティングに関連するデータをより分析しやすいデータへと加工しています。

  • Dataをほぼそのまま保存しているDataLake層(DL)
  • 分析用の下処理を施したDataWarehouse層(DWH)
  • ビジネス視点で整理したDataMart層(DM)

それぞれの層の間のデータ加工にCloud Composerを採用しています。Cloud Composerは、ワークフロー管理エンジンのAppache Airflowのマネージメントサービスです。(弊社のデータパイプラインのより詳細は、心強いメンバーのこの記事このスライドが参考になると思います!)

10月頃にこのAppache Airflowで管理されたの弊社のDataPlatformのワークフローなどを大きくリファクタリングしました。また、データPipelineの1つであるDigDagのワークフロー、Tableauのデータソース、Redashのクエリの参照先、BigQueryのDatasetについても継続的に整理してきました。

そういった甲斐もあってか、社内においてDataPlatformに関する開発・議論がより活発になってきました。

既存のデータの整理

まず、既存のBigQuery上のデータをどのように整理してきたかをご紹介します。

監査ログとテーブルメタデータを利用して整理する

BigQuery上に保存されているデータを整理する上で、利用やアクセスがされているテーブルを明確にする必要があります。

Google Cloud Platformでは、様々なサービスで監査ログとゆうものが利用できます。ユーザーがシステムやデータに何らかのアクセス・変更をした場合やシステムが何らかを実行した場合のログが記録されます。このログはBigQueryのサービスでも利用することができます。StackDriverに自動的に送信されているものですが、このデータをBigQueryのテーブルにExportすることでより利用しやすくなります。

また、テーブルメタデータを情報を利用して、プロジェクト上にどのようなデータセットがありその中にどのようなテーブルがあるかを、こちらもBigQuery上で把握することがで来ます。

これら監査ログとテーブルメタデータの2つのデータを利用することで

  • 利用頻度の多いテーブル
  • 同時に使われていることが多いテーブル
  • 全く利用されてないテーブル

などの情報を明確に把握することができ、使われているテーブルを消してしまったりすることなく、的確な整理に役立ちます。

Persistent UDFを利用してSQL上のでロジックの共通化する

BigQuery上のViewテーブル、Airflowのタスクのクエリ内、Redashに保存されているクエリなど様々な場所にクエリがあります。その中では同じようなロジックをまとめるために、UDFが定義されていることがあると思います。

(画像削除)

ただ、別の場所で別の人がこんな定義をしているしているかもしれません。

(画像削除)

これでは2つのデータを合わせて利用するとき、再加工をする必要があり困ります。そこでPersistent UDF(Beta)を利用するとこれらの関数のUDFを、任意のDataset配下にまとめることができ、どこからでも同じ関数を呼ぶことができます。

このクエリを実行すると特定のDataset内に保存されます

(画像削除)

しかし、このPersistent UDFの定義もまたドメインロジックを持ちながら、様々なところで定義できてしまうので、弊社では基本的にAirflowのBigQueryOperatorを利用してコードで管理するようにしています。コード管理されるので定義変更などにも柔軟に対応できます。

アプトプットとしての”データ”を考える

実際に依頼されたり、こちらから提供したりする分析のもととなるデータを、どのようにDataMartとしてテーブル定義に落とし込んでいくかについてです。

ViewテーブルやScheduledQueryを用いてDataMartのモックアップを作る

最初から大掛かりにワークフローを組んでデータをつくると、もしDataMartのテーブルにカラム追加などが必要となったときに、DataWarehouseまで修正することになったりし手戻り作業が大きくなります。

プロダクトや機能の初期段階など分析データが固まっていないときは、Viewテーブルを利用したほうが変更が容易です。また、Viewテーブルの実行時に時間がかかるとゆうデメリットを回避したい場合は、ScheduledQueryを利用すると定期的に特定のクエリからテーブルを作成・追加することができます。

ある程度そのデータを運用し要件が固まってから、Airflowなどのワークフロー管理ツールに落とし込むことがおすすめです。

ただ、一度できてしまうとAirflowなどに移行するモチベーションが保てなくなるので用法・用量を守って利用していきたいところです… (いい方法があれば教えてください

BigQueryのSchemaを利用して定義を明確にする

分析者が扱おうとしたデータに、もしこのようなデータがあったとき、ageのカラムに対していくつかの解釈ができてしまう場合があるかもしれません。

  • データ作成時の年齢(≒今日の年齢
  • 登録時の年齢
  • アクション発生時の年齢

テーブルの文脈から推測したり、カラムの命名で工夫で解決できるかもしれませんが、正確を期すためにより明確にカラム情報を付け加えて置きたいことがあるかもしれません。BigQueryのSchema定義のdescriptionを利用するとより明確にカラムの定義を示すことができます。

もし、すべてを書くのが面倒であれば、モックアップで作ったテーブルからbq コマンドライン ツールを利用してdescription以外のプロパティを抜き出すと、型なども間違えることが減り楽に定義ファイルを作成できます。

bq --format=prettyjson show dataset_id.table_name | jq .schema > schema_def.json

ワークフローを実装する

Airflowで実際にワークフローを実装していきます。Airflowでのワークフローの管理に関係ない場合、読み飛ばして頂いて構いません。

冪等性を保つ一連のタスクをWrapする

冪等性とは、「何度同じ操作を行っても同じ結果を得られる性質」のことです。データ加工の文脈でも同じ操作を行えば同じデータが得られることが、データが回復可能かという観点で非常に重要です。この冪等性を実現するためにAirflow上で3つのタスクを連続して行っています。

  • 既存テーブルの削除(Delete)
  • 作成するテーブルのスキーマ定義して空テーブル作成(CreateEmpty)
  • 作成したテーブルにデータを流し込む(Load)

これらを1つのメソッドにまとめています。

(便宜的に一部省略しています)

最低限のRequiredな引数として、

  • 親Dagのdag_id
  • 保存先となるTable名

を引き渡し、返り値としてこれらが直列に実行されるSubdagを作ります。実際のコードでは、デフォルト引数を利用してイレギュラーなDatasetに保存したい場合などにも対応できるようにしています。

タスクの依存関係を工夫して実行時間を最小化する

1つのテーブルのみからDMのテーブルができることは稀だと思います。いくつかのテーブルを作って、それらを合わせた結果として1つのDMが完成することがあると思います。その場合、いくつかの処理を以下のように並列に行うことができます。

ワークフローはサンプルです

Airflowの基礎的な利用法ですが、このようにワークフローツールを活用しタスクの依存関係の設定を工夫することでデータの鮮度を保つことが可能になり、よりビジネスにデータ面から寄与することが可能になります。

https://airflow.apache.org/docs/stable/concepts.html

SubdagをSubdagでNestする

Airflowでは、SubdagのなかのOperatorとしてSubdagを定義し、NestされたSubdagを定義することができます。過去データのさかのぼり作成などに便利です。ただ、このSubdagのNestはハマりポイントがります。

SubdagOperatorにわたすDagのdag_idは必ず親のdag_idと.(ドット)で結合されている必要があります。

つまり、末端のDagのdag_idは

[おじいちゃんdag_id].[親dag_id].[自分のtask_id]

と定義される必要があります。これを行わないとAirflowのParserがうまく家族関係(?)を認識してくれません。SubdagOperatorに渡すDagのdag_idの命名には注意が必要です。

データ管理や開発環境の改善

整理したはずのデータの新たな氾濫や腐敗を防いだり、新たなメンバーに安心してDataPlatformを開発してもらえるように行っていることをご紹介します。

BigQueryのデータセットに期限をもたせる

BigQueryのDatasetの中に利用しないデータが溜まっていくのは利用者がどれを使えばいいか分からなくなり、データへのアクセスの心理的障壁が上がります。また多かれ少なかれBigQueryのデータの保持には金銭的コストがかかるので良い状態ではありません。

永続的にデータをためていくDatasetには必要ありませんが、冪等性を保ったワークフローから作成されるデータを格納するDatasetには期限を設けると不要なデータが自動的に消えます。この場合、日々使用されるデータは一定間隔で、上記のDelete/CreateEmpty/Loadの一連のタスクで新たに作られていれば自動削除の対象になることはありません。

有効的に継続的インテグレーションを行う

弊社の場合、SREチームのメンバーの1人がデータ基盤のインフラ面の整備を専任で担っています。基本的に、SREチームがAirflowのCircleCIで行っている継続的インテグレーションも整えています。

  • コードレビュー工数削減観点でのフォーマッターの適用確認
  • BigQuery上でSQLファイルのDryRun実行
  • dagファイルのComposer上でのAirflowCLIの実行(SyntaxCheck)
  • masterマージ時のCloud Composer環境へのデプロイ

これらをCircleCI上で行うようになっています。これらのおかげで開発者がより安心して、より速くDataPlatformの開発を行えるようになっています。

定例会で語らう

データ基盤を担当するSREエンジニア、DataPlatformの開発するメンバー、分析者者でDataPlatformに関する議論を行う定例会を実施しています。

  • 各種ツールのエラーログの確認
  • リファクタリングの方針相談・進捗共有
  • 開発上困ったことの解決
  • 現状の課題の整理
  • 今後のルール・戦略の共通認識

直近の話題ではこのようなものをトピックに、週次で集まってディスカッションをしています。開発でのネックを相談できたり、一人では解決しきれない課題を分担したりとボトルネックの解消のために非常に重要な時間となっています。

終わりに

いかがでしたでしょうか。エウレカでデータを活用していくためのDataPlatformの運用での知見をまとめてみました。BigQuery、Airflow、会議体といろんな角度から広く浅くになってしまいましたが、どれか一つでも参考になるものがあれば幸いです。

いかにデータ活用を浸透させていくか、またデータの再利用を可能にしていくかは、難しいものがあります。今年は少しDataPlatform充実への土台ができたので、来年はより手を動かさねばとゆう気持ちです。

それでは皆様良いお年を・・・(パタリ

--

--