BigQueryにマイクロバッチ的にデータをロードする

Yuta Hono
google-cloud-jp
Published in
16 min readDec 3, 2020

BigQuery へのデータを取り込む際に、こんな言葉をみたことはないでしょうか?

「BigQuery にデータを取り込む方法は2つ、データの読み込みとストリーミングです。それに加えてデータを読み込まずにクエリを実行できます。」・・・でもマイクロバッチがしたい!そんな方のための記事です。

BigQuery には、以下の2つの取り込み方法がざっくりあります。

読み込みジョブ:無料で使える。DWH のリソース(スロット)を消費しない。Cloud Storage に置いたデータ (Avro, ORC,JSON, CSV, Parquet など)をスループット最適化のバッチジョブで読み込める。大きなファイルがたくさんある場合に向いている。派生として、BigQuery Data Transfer Service があり、 Amazon S3 や様々なデータウェアハウスからのデータ転送に対応している。

ストリーミング挿入:データをリアルタイムで BigQuery に挿入する。 サーバーの Fluentd からログ収集して直接送ったり、メッセージングサービスである Pub/Sub を経由して BigQuery にリアルタイムに反映することができる。

その上で、データを取り込まずに分析する フェデレーション(外部テーブル) という機能もあります。 Cloud SQL (MySQL/PostgreSQL)や、Bigtable 、Cloud Storage のデータを BigQuery のクエリエンジンを用いて分析を行うことができます。オンプレミスや他のクラウドの RDBMS を外部マスターとして設定して、BigQuery からフェデレーションしていることもよくあります。

(2020 12/03 21:20追記): Twitterでご指摘いただいたのですが、Cloud Storageにマイクロバッチ的に放り込んで、それらファイルをパスでまとめて外部テーブルで扱うことでもこれから紹介する方法に類似したことが可能です。 確かに〜。ありがとうございます。

「で、ストリーミングじゃなくて、CSV とか JSON 置いてマイクロバッチしたいんですけど、どうすればいいですか」 ー そんな話がこの記事の主眼です。前置きが長くなりましたが、そんな時に利用できるのが、 Dataflow テンプレートです。今回はこれを使って、マイクロバッチ的な実装をしてみたいと思います。

Dataflow テンプレートは、 GUI からサーバーレスのデータ処理サービスである Dataflow をパラメーターだけで起動し、データ処理のパイプラインが作れると言うものです。 Dataflow テンプレートについては、下記の記事で紹介していますので、興味がある方はご覧ください。

今回の実装方法

今回は Dataflow テンプレートの中でも、 Text Files on Cloud Storage to BigQuery (Stream) を利用していきます。このテンプレートを利用すると、 Cloud Storage に配置されたファイルを検知して BigQuery にストリーミング挿入ができます。

・・・結局ストリーミングです!ごめんなさい!しかし、今回のポイントは「マイクロバッチ的に」と言うことです。ファイルを短い頻度でアップロードすると、順次 BigQuery に反映される、ということが今回の構成では実現できます。 BigQuery の読み込みジョブは先に述べたとおりスループットに最適化されているので、大量のファイル・大きなファイルを読み込むのは高速です。しかし、随時来る小さなファイル1つに対し1ジョブのようなユースケースで、ぽちぽち叩くのには向きません。(ロード数のクオータは、サポートに問合せすれば増加可能なソフトリミットです。が、マイクロバッチ的に使うには最適化されていません。すぐに、Quota Exceeded : Load jobs per table という403エラーを見ることになると思います。)ストリーミング挿入は、 Fluentd を利用してサーバーサイドのログ更新を追いかけながら挿入したりと言うことが楽にできますが、諸事情により、どうしてもファイルでおきたい、と言う場合もあると思います。

今回はこのテンプレートを利用してCSVファイルを Cloud Storage に逐次アップロードし、 BigQuery に反映します。

下準備

まずはデータアップロード対象となる Cloud Storage バケットを作成します。 CLI から行いますが、 GUI から行っても構いません。バケット名は一意になるようにします。

# Cloud Storage バケットを Standard クラス、 asia-northeast1 リージョンで作成する <bucket-name> は自身の好きなユニークな値に変更してください$ gsutil mb -b on -c standard -l asia-northeast1 gs://<bucket-name>

次に、マイクロバッチ的にデータを投入する先のBigQueryテーブルを作成します。今回は keyvalue だけのシンプルなテーブルを作成しました。( GUI から行っても OK です)

# データセットの作成
$ bq mk --data_location=asia-northeast1 microbatch
# テーブルの作成 <project-id>は自身のGoogle CloudプロジェクトIDに置換してください
$ bq mk --table <project-id>:microbatch.table1 key:INTEGER,value:STRING

成功すると以下のようにテーブルが確認できるはずです。

# bq ls でmicrobatchデータセットの中のテーブル名を確認する
$ bq ls microbatch
tableId Type Labels Time Partitioning Clustered Fields
--------- ------- -------- ------------------- ------------------
table1 TABLE

ここまででデータ挿入先の準備は完了しました。次はマイクロバッチ的に挿入する CSV ファイルを用意してみましょう。(テキストエディタやCloud Shell Editor などでも OK です )

以下のような CSV ファイルを用意しておきます。input ディレクトリとして構成しました。 input/1.csv

1,value1
2,value2

input/2.csv

3,value3
4,value4

input/3.csv

5,value5

最後にエラーになりそうな CSV もおまけで用意しておきましょう。今までカンマ区切りだった CSV が突然スペース区切りでくる、リアルにありそうなケースです。 input/4_error.csv

6 value6

ここまででテストデータの準備ができました。ここから Dataflow テンプレート実行のための情報を用意します。 job_files としたディレクトリを作成し、以下のような BigQuery のスキーマ情報を作成します。

Text Files on Cloud Storage to BigQuery (Stream) テンプレートでは、Cloud Storageの所定のパスに配置されたファイルからデータを読み出し、1行ごとに Transform 用に定義された関数を実行します。そしてその関数の返り値を BigQuery に JSON 形式でデータを渡す、という仕組みになっています。これを用いて、 CSV をテーブルスキーマに変換することができます。

変換の中身を javascript で定義します。今回は、カンマ区切り CSV であるとして、以下のようにしてみました。

もっと複雑な変換を書くことももちろん可能です。改行なし JSON が1行ごとにあるパターンであれば、その内容をそのまま返す関数を作れば、 BigQuery にロードが可能です。

これらファイルの準備ができたら、 Cloud Storage の作成したバケットの配下に job_files ディレクトリとしてアップロードしておきましょう。

job_filesディレクトリとして、アップロードした

これで準備が完了です!

マイクロバッチ用のジョブを起動する

早速、ジョブを実行してみましょう。 Google Cloudの画面左メニューから Dataflow を選択し「テンプレートからジョブを作成」をクリックします。

Dataflowの画面から「テンプレートからジョブを作成」

表示された画面でジョブの情報を入力します。

Dataflow テンプレートで、パラメータを入力してジョブを実行する
  • GCS location of your JavaScript UDF : Cloud Storage 上の先ほどの作成した UDF へのパスを入力します。 gs://<bucket-name>/jobs_files/microbatch-csv-transform.js
  • GCS location of your BigQuery schema file, described as a JSON : Cloud Storage 上の先ほどの BigQuery スキーマファイルのパスを入力します。 gs://<bucket-name>/jobs_files/microbatch-csv-bq-schema.json
  • The name of Javascript function you with to call as your UDF : UDF の中で呼び出す関数を入力します。 split
  • Fully Qualified BigQuery table : BigQuery のテーブルへの完全パスを入力します。 <project-id>:microbatch.table1
  • The GCS location of the text you’d like to process : CSVをマイクロバッチ的に置いていきたいCloud Storageのパスを指定します。今回はワイルドカードを使って input ディレクトリの中のCSVを検知するようにしましょう。 gs://<bucket-name>/input/*.csv
  • Temporary directory for BigQuery Loading process : Dataflow が利用する中間ファイルのパスを設定します。 gs://<bucket-name>/temp/output-
  • 一時的なロケーション: Dataflow が利用する中間ファイルのディレクトリを設定します。 gs://<bucket-name>/temp

これらを入力し、「ジョブを実行」をクリックするとジョブが実行されます。これで準備は完了です!

ジョブが実行された

マイクロバッチ的にアップロードしてみる

まずは BigQuery のテーブルの中身に、まだデータがないことを確認してみましょう。先ほど作成した microbatch.table1 を確認します。

microbatch.table1にまだデータがないことがわかる

では早速ここに、 input/1.csv からデータをアップロードをしてみましょう。 Cloud Storage の gs:<bucket-name>/input/ ディレクトリに CSV をアップロードすると検知されます。以下は CLI で操作を行います。(もちろん、 Cloud Storage の GUI から行っても構いません)

# 先ほど作成したバケットの input ディレクトリに 1.csv をアップロードする
gsutil cp ./input/1.csv gs://<bucket-name>/input/
Copying file://./input/1.csv [Content-Type=text/csv].../ [1 files][ 17.0 B/ 17.0 B]Operation completed over 1 objects/17.0 B.

BigQuery のテーブルを選択し直すと、データの追加がすぐに反映されました。

データの追加がマイクロバッチ的に反映された

同様に 2.csv , 3.csv も追加します。

$ gsutil cp ./input/2.csv gs://<bucket-name>/input/
Copying file://./input/2.csv [Content-Type=text/csv]...
/ [1 files][ 17.0 B/ 17.0 B]
Operation completed over 1 objects/17.0 B.
$ gsutil cp ./input/3.csv gs://<bucket-name>/input/
Copying file://./input/3.csv [Content-Type=text/csv]...
/ [1 files][ 9.0 B/ 9.0 B]
Operation completed over 1 objects/9.0 B.

こちらもすぐに反映されました。

マイクロバッチ的に全CSVのデータが挿入できた

最後に、エラーが出るようにした CSV を追加してみましょう。

$ gsutil cp ./input/4_error.csv gs://<bucket-name>/input/
Copying file://./input/4_error.csv [Content-Type=text/csv]...
/ [1 files][ 9.0 B/ 9.0 B]
Operation completed over 1 objects/9.0 B.

データ挿入対象である table1 テーブルには反映されませんが、 table1_error_records テーブルにこのようなデータが反映されました。

table1_error_recordsテーブルにデータが挿入された

この table1_error_records テーブルは Dataflow テンプレートで自動作成される、デッドレター テーブルと呼ばれるものです。メッセージキューイングサービスにおけるデッドレターキューの概念に相当します。ここに、パースに失敗した不正なレコードが挿入されるようになっています。 payloadString カラムを見ると、 Dataflow から渡されたメッセージペイロードが記載されているので、これを利用してメッセージの復旧を行うことができます。今回は以下のようなクエリで整形し、データを table1 に復旧させてみましょう。

こんな感じでデータが挿入できました。デッドレター テーブルが BigQuery にあることで、 UDF (User Defined Function、ユーザー定義関数)や SQL ですぐにリカバリができます。

エラーのレコードも挿入できた

まとめ

今回は Dataflow テンプレートのText Files on Cloud Storage to BigQuery (Stream)を GUI 上から利用して BigQuery にマイクロバッチ的にデータを入れる方法について紹介しました。

Dataflow テンプレートは非常に気軽に使えるサーバーレスサービスで、今回設定しなかった以下のパラメーターを設定することで、データの処理量に応じて自動的にスケールアウト、スケールインをしてくれますので非常に楽です。

  • 最大ワーカー数 : スケールアウト時に許容する最大スケールアウト数
  • ワーカーの数 : 使用する初期ノード数、デフォルト 1
  • マシンタイプ : n1-standard-1 以上が利用可能、デフォルトはn1-standard-4 (コストをおさえたい時には変更しましょう)

また、今回は CSV としてパースしましたが、このテンプレートは行を読み込んでその行を JSON として BigQuery に渡せればいいので、改行なしJSONでも利用することができます。

ジョブのコードも公開されているのでカスタマイズして利用することも可能です。

マイクロバッチ的にデータを読ませたくなった場合にはぜひトライしてみてください。

BigQuery 的には正直サーバーサイドからのログ収集では fluentd を利用してストリーミングインサートする方が楽ではあると思います。そのような場合はこちらの記事を参考にすると良いでしょう。

Disclaimer: この記事は個人的なものです。ここで述べられていることは私の個人的な意見に基づくものであり、私の雇用者には関係はありません。

--

--

Yuta Hono
google-cloud-jp

Solutions Eng at @Google, @GoogleCloud. Opinions are my own, NOT the views of my employer. All posts here are my personal opinion.