Cloud Data Fusion を最速で触ってみる

Yuki Nagae
google-cloud-jp
Published in
17 min readApr 11, 2019

Cloud Data Fusion とは?

Cloud Data Fusion とはGoogle Cloud Next 2019でアナウンスされた、フルマネージドのデータ統合プラットフォームです。

今回の新サービスに関する情報は以下の公式ブログが詳しいです。

参照: From data ingestion to insight prediction: Google Cloud smart analytics accelerates your business transformation | Google Cloud Blog

今回やること

Quickstart | Cloud Data Fusionの通りにやってみます。

内容としては

  1. Cloud StorageにあるJSONファイル(本のベストセラーに関するデータ)を読み込む
  2. データをパースして綺麗にする
  3. 先週出版されて評価が高かった$25以下の本のデータをBigQueryに投入

です。

事前に必要なこと

  • プロジェクトの用意(既存のプロジェクトを利用することもできますが、試した後に削除しやすいので新規にプロジェクト作成をオススメします)
  • プロジェクトのbillingが有効であること
  • Cloud Data Fusion APIが有効であること

インスタンスの作成

Cloud Data Fusionのページにアクセスし、「CREATE AN INSTANCE」をクリックします。

適当な Instance name を入力して「CREATE」します。

インスタンス作成に20分程度かかるらしいのでゆっくり待ちましょう。 (実際、13分かかりました)

パーミッションを設定する

「Instance name(上記例では data-fusion-1 )」をクリックし、「Service Account」の値を確認。

以下の場合だと、cloud-datafusion-management-sa@yd9c10377068300f0-tp.iam.gserviceaccount.com です。

IAM & adminサービスでパーミッションを設定します。以下画面で「ADD」をクリックします。

  • 「New members」に先程確認した「Service Account」の値を設定します。
  • この場合は、 cloud-datafusion-management-sa@yd9c10377068300f0-tp.iam.gserviceaccount.com です。
  • 「Role」には Cloud Data Fusion API Service Agent を設定します。
  • 「Save」をクリックします。

注: これの手順を忘れていたせいで30分ほど時間を無駄にしてしまった。どのようなエラーになるかは最下段の「おまけのエラー1」を参照してください。ドキュメントをちゃんと読むことが大事ですね。

IPのQuotaの上限を増やす

この手順は公式ドキュメントに記載ないです。この手順を行うとご使用のメールアドレスに確認メールがきますが、その内容によると申請が通るのに早くても2〜3日かかるらしいですが、運がよければ数時間以内に結果がきます。(※もしこの手順がめんどくさいユーザーは、後述の「(おまけ手順)「IPのQuotaの上限を増やす」の申請がめんどくさいユーザー向け」を実施してください)

  1. IAM & adminサービスから「Quota」を選択
  2. 対象のService(※この場合は Compute Engine API(In-use IP addresses) で、かつインスタンス作成したlocationである us-west1を選択)
  3. 「EDIT QUOTAS」をクリックします

ほしいquotaの数を入力し、「Submit request」をクリックします。

注: これの手順がないとどのようなエラーになるかは最下段の「おまけのエラー2」を参照してください。

Cloud Data FusionのGUIを見てみる

Cloud Data Fusionサービスに戻り、「Instance name(上記例では data-fusion-1 )」をクリックし、Instance URLの「View Instance」をクリックするとGUIに遷移します。

初回なので何もありませんね。

サンプルのpipelineをデプロイする

すでに Cloud Data Fusion Hub という仕組みでpipelineが共有されているみたいなので、そのサンプルをそのまま使います。

Cloud Data FusionのGUI画面から、「HUB」をクリックします。

色々選べるみたいですね。

左のサイドバーの「Pipelines」をクリックします。

Cloud Data Fusion Quickstart を選び、「Create」をクリックします。

BEST SELLER と書いてあるとおり、本のベストセラーデータのようです。

pipelineに適当な名前をつけ、「Finish」をクリックします。

pipelineができました! 次に「Customize Pipeline」をクリックします。

GUIのクールな画面が出てきましたね。この画面のことを Data Fusion Studio と呼びます。

pipelineは作成済みですが、まだデプロイされていません。右上の「Deploy」をクリックしましょう。

デプロイ完了しました。

(おまけ手順)「IPのQuotaの上限を増やす」の申請がめんどくさいユーザー向け

「IPのQuotaの上限を増やす」の申請がめんどくさいユーザーは以下の設定のworker数を減らせば大丈夫です。

  1. Data Fusion Studio上部の「Configure」をクリックします
  2. 「Compute config」から「Customize」をクリックします

Worker Nodesの「Number of Workers」の値を小さい数字(※7以下の値なら問題ないはずです。Master NodeとWorker Nodesの総計が、許可されているIP数以下になるようにします。本記事では許可されているIPは8を想定しています)を設定し、「Done」をクリックします。

pipelineを実行する

準備は万端なので、試しに実行してみましょう。上部の「Run」をクリックします。 実行すると、 Cloud Dataprocクラスターがプロビジョニングされてpipelineが実行されるようです。 (Apache Hadoop MapReduce もしくは Apache Spark選んで使用することができます)

「Status」が Succeeded になっていれば成功です!

BigQueryで結果を確認する

結果をBigQueryで確認しましょう。以下のようなテーブルが新規作成されているはずです。

プレビューを見ると、以下のような結果が格納されています。

Good job!

おまけのエラー1

「パーミッションを設定する」の手順を忘れていたせいで、pipelineの「Run」でエラーになってしまった。 その一部始終をご覧ください。

あれ、Failedしてしまいました汗。

Logsを見てみましょう。

以下のエラーを見ると、「compute.firewalls.list」のpermissionがないためにエラーになっているように見えます。

com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "Required 'compute.firewalls.list' permission for 'projects/cloud-data-fusion-test'",
"reason" : "forbidden"
} ],
"message" : "Required 'compute.firewalls.list' permission for 'projects/cloud-data-fusion-test'"
}

原因がわからず30分ほど右往左往する。エラーメッセージでgoogle検索しても原因がわからない。

おまけのエラー2

別のエラー: PROVISION task failed in REQUESTING_CREATE state for program run program_run:default.DataFusionQuickstart.-SNAPSHOT.workflow.DataPipelineWorkflow.05950624–5c12–11e9–8aba-129dddfa0155.

com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Insufficient 'IN_USE_ADDRESSES' quota. Requested 11.0, available 8.0.
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49) ~[na:na]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) ~[na:na]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) ~[na:na]
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:95) ~[na:na]
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:61) ~[na:na]
at com.google.common.util.concurrent.Futures$4.run(Futures.java:1123) ~[com.google.guava.guava-13.0.1.jar:na]
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:435) ~[na:na]
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:900) ~[com.google.guava.guava-13.0.1.jar:na]
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:811) ~[com.google.guava.guava-13.0.1.jar:na]
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:675) ~[com.google.guava.guava-13.0.1.jar:na]
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:492) ~[na:na]
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:467) ~[na:na]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:41) ~[na:na]
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:684) ~[na:na]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:41) ~[na:na]
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:392) ~[na:na]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:475) ~[na:na]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) ~[na:na]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:557) ~[na:na]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:478) ~[na:na]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:590) ~[na:na]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[na:na]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[na:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_212]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_212]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Insufficient 'IN_USE_ADDRESSES' quota. Requested 11.0, available 8.0.
at io.grpc.Status.asRuntimeException(Status.java:526) ~[na:na]
... 19 common frames omitted

IAM & adminの「Quota」を確認(※us-west1 のlocationだけ確認)すると、Limitが8になっています。

つまり、Requested 11.0, available 8.0. というエラーの意味は、IPを11個要求したが使用可能なIPが8個しかないということです。

--

--