新卒1年目data engineerがdbt×Dagsterでオーケストレーションしてみた
はじめに
この記事は「Eureka Advent Calendar 2023」17日目の記事です。
自己紹介
BIチーム新卒data engineerのYu Sakamotoです!会社ではhoppyと呼ばれており、業務としてはproductの利用傾向の分析等を主に行っています。
入社してからもう8ヶ月も経つなんて時間の早さは恐ろしいですね、、
このように記事を書いて公開するのが初めてなので温かい目で読んでいただけたらうれしいです!
記事の概要ときっかけ
この記事は自分がデータ分析基盤の理解を深めるためにローカルで実装した内容をまとめたものになります。
BIチームにとってデータ分析基盤は必要不可欠なものであり業務では常にお世話になっています。データ分析基盤に触れていく中でふと、どのように基盤は動いているのだろうと思ったのが、今回話していく内容について勉強しようと思ったきっかけになります。
想定読者
- データ基盤に興味がある
- なんとなくdbtやデータパイプライン関連を触ってみたい
- クエリは書いたことある
今回の記事で書かれていないことは主に
- ツールごとの細かい挙動の説明
- 実運用を考えた実装
ざっくりローカルでDWH(data warehouse)を作ってみようといった内容なのでゴールは動けばヨシって感じです。
実装
ローカルで実装するにあたって使用するものは以下になります。
・BigQuery
・dbt
・Dagster
「ローカルで」と言っているのにBigQueryか、って言いたい方もいると思うのですが許してください。
詳細は個々に書いていきます。
BigQuery
BigQueryをDWHとして今回は使用します。
今回自分はkaggleのPredict Future Salesからデータをもってきました。
このコンペで公開されているデータは、商業利用も含めいかなる目的で利用可能です。他のコンペのデータはわからないのでkaggle以外で使用する際は利用規約を確認しましょう。
まずrawというdatasetを作成してそこにCSVデータをインポートします。
rawをdata lakeに見立ててる感じです。
data lakeは雑に説明するとさまざまなデータソース(アプリのDBや、ログ、ファイル)からのデータを集約させた場所といった感じです。ETLの考え方ではDWHの中にdata lakeを作る場合も多いようです。
dbt
dbt(data build tool)はETLにおけるT (transform)の部分を担うツールで、SQLを書くだけで簡単にデータを変換することができます。またテーブルが増えてくると依存関係の把握が大変になるのですが、その辺をいい感じにやってくれます。
詳しい情報は公式docsを参照してください。
今回はdbt cloudではなく完全無料で使うことのできるdbt-coreを用います。dbt cloudはSaaS製品でGUI上でdbtの操作ができますが、dbt-coreはCLIでの操作のみになります。
- install
dbtを触る中で必要になるものをインストールします。
$ brew update
$ brew install git
$ brew tap dbt-labs/dbt
$ brew install dbt-bigquery
下記コマンドでdbtをインストールできたか確認します。
$ dbt --version
下記のような表示であればOKです。
Core:
- installed: 1.6.4
- latest: 1.7.3 - Update available!
Your version of dbt-core is out of date!
You can find instructions for upgrading here:
https://docs.getdbt.com/docs/installation
Plugins:
- bigquery: 1.6.6 - Update available!
At least one plugin is out of date or incompatible with dbt-core.
You can find instructions for upgrading here:
https://docs.getdbt.com/docs/installation
2. dbt projectを立ち上げてみる
下記のコマンドで初期プロジェクトを立ち上げることができます。
$ dbt init
注意点としてはlocationの選択肢で自分がBigQuery側で設定したものがでてこない場合もあります。その際は直接profiles.ymlを書き換えれば大丈夫です。profiles.ymlはBigQueryとの接続に関するconfigを記載しているファイルで、init後にもし存在しなければ自動で作成され、存在する場合は追記されます。
dbtとBigQueryの接続テストをしたい場合は下記コマンドを入力します。
$ dbt debug
これでエラーを吐かなければOKです。
profiles.ymlはホームディレクトリ直下に配置してもいいのですが、Dagsterをいじる際にdbtの作業ディレクトリ直下にある方が都合がいいのでそちらに配置します。
3. rawにインポートしたtableをdbt上で参照できるようにする
BigQueryの部分でraw datasetにCSVからインポートしたデータをdbtで参照できるようにします。
下記のコードをmodels配下に置きます。今回自分はmodels/raw/raw.ymlとしました。
version: 2
sources:
- name: raw
dataset: raw
tables:
- name: item_categories
- name: items
- name: shops
4. modelを作成してみる
ここでは下記の2点について進めていきます。
- dbtのbest practiceに沿って(raw->)staging->intermediate->martsで依存関係のあるmodelを作成する
- 作成したmodelを任意のBigQueryのdatasetに格納する
各レイヤーの説明は今回は省略します。
また本来であればmodelを作成する際に各レイヤーでデータを加工するのが一般的ですが、今回は依存関係のあるmodelを複数用意したいだけなのでその辺も省略しています。
クエリを書いてmodelを作成してみます。
select
hoge
from
{{ ref('参照モデル名') }}
-- {{ source('raw', 'table_name') }} raw->stagingの場合のみこのように書く
ここで大事なのはfrom句の書き方で、{{ ref(‘参照したいmodel’) }}という書き方をすることによって依存関係をdbtが勝手に認識してくれるというメリットがあります。
また1つのmodelとセットでymlも書きます。
dbt_project.ymlでdataset単位(レイヤー単位)でconfigを書くこともできるのですが自分は個々に書いていく方が慣れている、かつmodelごと柔軟に設定ができるのでそうしています。
version: 2
models:
- name: stg_test
description: "test staging table"
config:
materialized: table
cluster_by: item_id
columns:
- name: item_name
type: string
description: "name"
- name: item_id
type: integer
description: "id"
- name: item_category_name
type: string
description: "category name"
作成したmodelを任意のdatasetに格納して管理したいのでdbt_project.ymlに変更を加えます。
models:
dbt_project_name:
staging:
+dataset: staging
+timezone: Asia/Tokyo
intermediate:
+dataset: intermediate
+timezone: Asia/Tokyo
marts:
+dataset: marts
+timezone: Asia/Tokyo
またそれぞれのクエリ、ymlを下記のディレクトリ構成で置きます。
.
└── dbt_project/
├── models/
│ ├── raw/
│ │ └── raw.yml
│ ├── staging/
│ │ ├── stg.sql
│ │ └── stg.yml
│ ├── intermediate/
│ │ ├── hoge.sql
│ │ └── hoge.yml
│ └── marts/
│ ├── hoge.sql
│ └── hoge.yml
├── dbt_project.yml
└── profiles.yml
ここまでできたらrunしてみましょう。
$ dbt run
BigQuery上でテーブルが作成されていることを確認できると思います。
リネージはBigQuery上でも確認できるのですが、今回は下記コマンドで確認してみようと思います。
$ dbt docs generate
$ dbt docs serve
そうすると自動でブラウザに飛ばされ、作成したdbt projectのドキュメントを参照することができます。
簡単にmodelの依存関係を確認できるのがdbtの強みです。
Dagster
Dagsterはデータオーケストレーションツールで、ざっくりいうとデータパイプラインのジョブ管理(データ更新の実行やスケジューリング)をしてくれるものといった感じです。airflowやprefectが類似したツールとなっています。
こちらも公式docsを参考に進めていきます。
- install
今回はdbtのmodelをDagsterで管理したいのでdagster-dbtをインストールします。
$ pip install dagster-dbt
2. Dagsterのprojectを立ち上げる
とりあえずprojectを下記のコマンドで立ち上げます。
このときprofiles.ymlがdbt_projectディレクトリ直下に配置しておきます。ホームディレクトリ直下に置いておくとエラーを吐いてしまうので注意です。
$ dagster-dbt project scaffold --project-name 任意のproject_name
このコマンドでdbtの作業ディレクトリ直下にDagsterのprojectが立ち上がります。構成は下記のようになるはずです。
.
└── dbt_project/
├── models
├── hoge
├── .
├── .
└── dagster_project
3. Dagster UIに接続する
Dagsterのprojectが作成されたら、そのディレクトリに移動してから下記のコマンドを入力します。
$ DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1 dagster dev
そうするとterminal上にURLが表示されるので、そこにアクセスするとdbtで作成したmodel(Dagsterでいうところのasset)が表示されていると思います。
materialize allを押して緑色でmaterializedとなってればOKです。
BigQuery上でも確認するとtableが作成されている、もしくは更新されているはずです。
4. スケジューラーの設定
最後にジョブのスケジューリングの設定は下記画像のrunningにチェックを入れておけば自動でDagsterのジョブが走り、データの更新が行われます。
デフォルトではUTCで0:00に更新となっているので変更したい場合は、schedules.pyのcron_scheduleの部分を書き換えてあげれば任意の時間で設定することができます。
"""
To add a daily schedule that materializes your dbt assets, uncomment the following lines.
"""
from dagster_dbt import build_schedule_from_dbt_selection
from .assets import dbt_project_dbt_assets
schedules = [
build_schedule_from_dbt_selection(
[dbt_project_dbt_assets],
job_name="materialize_dbt_models",
cron_schedule="0 0 * * *",
dbt_select="fqn:*"
),
]
実装の説明は以上になります。
おわりに
今回はBigQuery, dbt, Dagsterを用いてローカルで動くデータ基盤(のようなもの)の作成方法について説明させていただきました!
実際に運用をするとなるとDagsterをクラウドサービス上で動かす、またはdbt cloudやDagster cloudを用いて運用していく感じになるかと思います。
dbtは日本コミュニティも活発でdbtに関連する日本語の記事がたくさんあるのですが、Dagsterに関しては僕の観測する範囲で日本のコミュニティはなく日本語で書かれた記事も少ないです。これから先多くの人が使うようになってdbtくらい盛り上がったらいいなと思っています。
長くなってしまいましたが最後まで読んでいただきありがとうございました!記事に関してのコメントや意見のある方は旧twitter @ its_my_hoppyまでお願いしますmm