みなさまこんにちわ!大西と申します。gcpugへの投稿は今回が初めての投稿です。

筆者は、普段、小売企業でデータエンジニアとして働いており、主に各種システムからBigQueryへデータをロードするパイプラインをComposerやDataflowで開発する業務をやっています。

皆さん、GCP上で各種デプロイを自動化する場合、Cloud Buildを活用されていると思います。筆者も、業務で開発しているDataflowジョブのデプロイを自動化する際に、まずCloud Buildだけを使ってお手軽にビルド・単体テスト・デプロイを自動化しようとしました。しかし、後述するDataflowジョブのデプロイ時の制約により、Cloud Buildだけではデプロイを完結できず、GKE上にKubernetes jobをデプロイし、VPC内からDataflowジョブをデプロイするという、ちょっとした工夫を入れる必要がありました。

本記事では、表題の通り、VPC内リソースにアクセスするDataflowジョブのデプロイをCloud BuildとGKEで自動化した際に苦労した点・各種TIPSなどを紹介します。

(注)

  • 本記事で紹介している内容は、DataflowがVPC内リソースにアクセスする際に考慮が必要なものであり、GCS、BigQuery、PubSubなどインターネットに公開されたサービスを利用する場合は、関係ないことにご注意ください。
  • 記事執筆時点でCloud BuildはVPC内へのアクセスに対応していません。Cloud Buildが対応した場合、GKEを利用する必要は無くなります。

記事の背景:DataflowジョブがVPC内リソースにアクセスする際、デプロイもVPC内でやる必要があった

まず、記事の背景となる、私が開発したDataflowジョブの概要を説明します。

Dataflowジョブデプロイ時の制約
  • 本ジョブは、Apache Beam Java SDKを使ってScalaで実装されており、VPC内に立てたKafkaクラスタ(GKE上に構築)に接続し、外部システムから受け取ったメッセージを一部加工し、BigQueryテーブルにストリーミングデータとして挿入するという処理を行います。
  • このKafkaクラスタは、インターネットに外部エンドポイント(Network Load Balancer)、VPC内部に内部エンドポイント(Internal TCP load balancer)を公開しています。外部システムはインターネット越しに外部エンドポイントへTLS接続し、Dataflowなど外部に公開する必要のないリソースはVPC内の内部エンドポイントに接続します。
  • ここで、Apache Beamのライブラリを用いて実装されたDataflowジョブは、デプロイされる際に、まずパイプラインのDAG(タスクの依存関係の有向グラフ)情報を構築し、各種エラーを確認した上で、DataflowのサービスにDAG情報が送信されて、GCEのワーカノードにジョブがデプロイされます。この時のエラーチェックの過程で、パイプライン上で依存しているリソース(本件の場合は、GCSバケット、BigQueryテーブル、KafkaブローカをデプロイしたGKEノード)に一通り接続が可能なことを確認する作業を行うため、Datflowジョブが内部エンドポイントを利用する場合、VPC外部からデプロイ不可となります。
  • (参考)Dataflowのデプロイの詳細については公式ドキュメントを参照ください。
  • 実際にデプロイ時の振る舞いを確認するため、VPC内外でデプロイできるを確認しています。まず、VPC内にGCEインスタンスを立て、その上でデプロイする場合は、内部エンドポイントおよび外部エンドポイント両方ともデプロイできました。VPC外部(例えば、ラップトップ)からデプロイする場合は、外部エンドポイントにしか接続できませんでした。

Dataflowジョブのデプロイプロセスの確認:Jarを実行する所だけVPC内で実行が必要

上記の通り、Kafkaブローカの内部エンドポイントに接続するには、VPC内からデプロイが必要であることが分かりました。デプロイの方法を検討する前にDataflowジョブのビルド・デプロイプロセスのうち、実際にどの部分はVPC外でも良くて、どこからVPC内でやる必要があるのか以下の通り、整理しました。
このプロセスにおいて、(1)と(2) はKafkaの内部エンドポイントに接続しないため、VPC外部でも実行できます。(3)は、エラーチェックの過程でVPC内にアクセスが必要なため、VPC内で実行する必要があります。
(注)もし、Apache Beamを用いたDataflowジョブのビルド・デプロイについて詳しくない場合は、Apache Beamのドキュメントを参照ください。

(1) コードをビルドする 不要

  • デプロイに必要な実行可能なJarファイルが生成される。

(2) 単体テストを実行する。

(3) (コンパイルと単体テストに成功した場合)実行可能Jarを実行する。 必要

  • Apache Beamを使ったアプリケーション開発では、依存ライブラリを全て梱包し、実行可能なJarを生成する
  • このJarファイルを実行すると、DAG情報(タスクの依存関係をグラフ表現したもの)を構築し、各種エラーチェックが行われる。
    エラーチェックには、依存するサービス(今回はGCS、BigQuery、GKE上のKafka)への接続確認も含まれる。
  • この部分はVPC内部で実行する必要がある。

(4) (エラーチェックに問題ない場合)DAG情報がGCP上のDataflowサービスに送信される。

  • 3が成功すれば4以降はDataflowサービス内で行われるため、ユーザは気にする必要はなし。

(5) Dataflowサービス内でDAGの最適化や実際のデプロイ処理が行われる(詳細は省略)。

VPC内でデプロイする方法の整理

一連のデプロイプロセスの整理により、ビルド結果の実行可能Jarファイルを実行する所だけ、VPC内部で実施する必要があることを確認しました。ここから実際のデプロイ方法として検討した案と最終的にGKEを使うことを選んだ理由を説明します。

(1) Jenkinsジョブでビルド・デプロイ

  • VPC内にJenkinsサーバを立てる。Github webhookでジョブを起動し、一連のビルド・デプロイを実施する。
  • チームが別のプロジェクトでJenkinsを使っており、Jenkinsに慣れていた。
  • しかし、サーバを保守する必要がある。

(2) デプロイ処理(jar実行)だけKubernetes jobで実施

  • Cloud Buildでビルド・単体テストを実行し、実行可能Jarファイルを生成する。
  • JarファイルをDockerイメージに入れて、Container Registryにpushする。
  • Cloud BuildからKubernetes jobを実行し、ジョブの中でJarを実行する。

まず、安直な方法として、VPC内のGCE上にJenkinsを立て、Webhookでジョブを起動し、一連のビルド・デプロイをやろうと考えました。しかし、せっかくマネージドサービスを駆使して、サーバ管理の労力を排除しようとしているのだから、それはやめようとなり、この案はボツとなりました。

次に、GKE上で単発のジョブを実行させる案を検討しました。Dataflowのデプロイを検討する時点で、すでにKafkaブローカをGKE上にデプロイしていたため、Kubernetesについては概要を把握しており、Kubernetesにはサーバプロセスをデプロイするほか、単発のジョブを実行させる機能があることも把握していました(リンクを参照)。この機能を利用すると、Jarを実行する部分をVPC内で実行できる上、ジョブを実行するときだけリソースを確保するため、サーバプロセスを監視する必要もありません。

また、本件以外にアプリケーションのデプロイをVPC内で行う必要が出た場合に同じ方法を採用できそうだと考え、Kubernetes jobでデプロイする案を採用しました。

Cloud BuildとGKEを使ったデプロイ方法

ここからは実際にどうやってデプロイしているかご紹介します。全体の流れは以下の図の通りです。本アプローチはGKEのチュートリアル「Cloud Build を使用した GitOps スタイルの継続的デリバリー」を参考にしています。

  • (1〜3)Cloud Buildのジョブトリガーでブランチの変更を検知すると、ビルドジョブが実行され、Dataflowジョブのデプロイに必要な成果物一式がDockerイメージ化し、Container Registryにpushされる。
  • (4〜6)kubectlにKubernetesマニフェストファイルを指定し、Dataflowジョブデプロイ用のKubernetesジョブをGKEクラスタ上にデプロイする。
  • (7〜9)Kubernetesジョブ内では、実行可能Jar(Apache Beamのビルド結果)を実行し、Dataflowジョブが実際にデプロイされる。
Cloud BuildとGKEを使ったDataflowのデプロイプロセス

次に各ステップでのジョブ実行に必要な成果物を紹介します。

(1) PRをマージする〜(2)ジョブトリガーでジョブを起動する

Cloud Buildは、ジョブトリガーという機能を使い、リポジトリのブランチに対する変更を検知して、ジョブを起動することができます。ジョブトリガーについては、GCPのドキュメントを参照ください。

筆者は、terraformでジョブトリガーを作成しています。その場合、GCPのプラグインを用いて、まず以下のようなモジュールを作成します。

  • 複数のジョブ(DAG)に対応するため、各ジョブの設定を配列としてまとめたdag_listを渡し、各ジョブに対してトリガーを作成するようにしています。
  • trigger_templateが、ジョブトリガーの設定箇所です。ここでは、トリガーを設定したい、リポジトリとブランチを指定しています。
  • substitutionsは、環境に応じて可変の箇所を変数値で置き換えるために使います。開発環境・環境・本番環境と同じビルド手順を使いたい場合は、CloudBuildのビルド手順を記述するYAMLファイルは共通にし、substitutionsで変数値を置き換えると良いでしょう。詳細はGCPのドキュメントを参照ください。
resource "google_cloudbuild_trigger" "composer_build_trigger" {
count = length(var.dag_list)
description = "Push to ${var.dag_list[count.index]["branch_name"]} branch"
included_files = var.triggerlist

trigger_template {
branch_name = var.dag_list[count.index]["branch_name"]
repo_name = var.dag_list[count.index]["repo_name"]
}

substitutions = {
_BOOTSTRAP_SERVERS = var.bootstrap_servers
_PROJECT = var.dag_list[count.index]["project"]
_BUILD_IMAGE_NAME = "${var.dag_list[count.index]["dataflow_job_name"]}-build"
_CLOUDSDK_COMPUTE_ZONE = var.zone
_CLOUDSDK_CONTAINER_CLUSTER = var.gke_cluster_name
_NETWORK = var.network
_SUBNETWORK = var.subnetwork
_DATAFLOW_JOB_NAME = var.dag_list[count.index]["dataflow_job_name"]
_K8S_JOB_NAME = "${var.dag_list[count.index]["dataflow_job_name"]}-deploy"
_SCHEMA_REGISTRY_URL = var.schema_registry_url
_SERVICE_ACCOUNT = var.dag_list[count.index]["sevice_account"]
_SOURCE_BUCKET = var.dag_list[count.index]["source_bucket"]
}

filename = "cloudbuild.yaml"
}

環境に合わせた値を指定している箇所は以下の通りです。DAGによって可変の箇所はリストで指定し、共通設定は配列外で指定しています。

module "cloudbuild-dataflow-stream-dags-deployment" {
source = "path/to/modules/cloudbuild-dataflow-stream-dags-deployment"
service_name = "dags-sync"
zone = "asia-northeast1"
gcp_project = local.gcp_project
env = "dev"
triggerlist = ["**"]
gke_cluster_name = "kafka"
bootstrap_servers = "kafka.example.com\\:9090"
schema_registry_url = "https\\:\\/\\/schema.example.co,"
network = "default"
subnetwork = "default"
kms_location = local.region
dag_list = [
{
repo_name = "your-repo-name1"
branch_name = "develop"
project = "your-project"
dataflow_job_name = "your-job-name1"
sevice_account = "sa-name@your-project.iam.gserviceaccount.com"
source_bucket = "you-bucket"
},
{
repo_name = "your-repo-name2"
branch_name = "develop"
project = "your-project"
dataflow_job_name = "your-job-name2"
sevice_account = "sa-name@your-project.iam.gserviceaccount.com"
source_bucket = "you-bucket"
}
]
}

(3)Dataflowジョブのデプロイ用のDockerイメージをpushする〜(6)実際にKubernetesジョブをデプロイする

Cloud Buildのジョブトリガーの設定で、Cloud Buildのビルド手順の設定ファイルをcloudbuild.yamlと指定しました。これはビルド対象のリポジトリのルートフォルダに左記のファイル名で設定ファイルを置くこと指定しています。

  1. デプロイ用スクリプトを生成する・・・リポジトリには、Jarファイルを実行する際のオプションをテンプレート化したデプロイスクリプトを用意しています。環境ごとに可変な変数値はCloud Buildのsubstitutionsで提供するため、ビルド手順の中で実際の値に置き換えます。
  2. Dataflowジョブデプロイ用のイメージをビルドする・・・Kubernetesジョブとしてデプロイするには、コンテナイメージに固める必要があるため、デプロイに必要なファイル一式を含むイメージを作成します。
  3. Container Registryへイメージをpushする・・・ビルドしてできたイメージをpushします。
  4. Kubernetesジョブのマニフェストファイルを生成する・・・Kubernetesジョブのマニフェストファイルはジョブに関する設定ファイルです。マニフェストファイルもリポジトリにはテンプレートのみ置いており、ここでは環境あるいはコミットによって異なる値を置き換える処理をやり、実際のデプロイに使えるマニフェストファイルを生成しています。
  5. DataflowジョブをデプロイするKubernetesジョブを起動する・・・kubectl applyにマニフェストファイルを指定し、Kubernetesジョブをデプロイします。
  6. ジョブが完了するまで待機する
  7. ジョブのリソースを削除する・・・ジョブ実行完了後もPodが残り続けるため、完了後は明示的にジョブを削除します。
steps:
# デプロイ用のスクリプト(後述)を生成する
- name: 'gcr.io/cloud-builders/gcloud'
id: Generate deploy script
entrypoint: /bin/sh
args:
- '-c'
- |
sed "s/PROJECT/${PROJECT_ID}/g" scripts/dataflow/deploy-update.sh.tpl | \
sed "s/DATAFLOW_JOB_NAME/${_DATAFLOW_JOB_NAME}/g" | \
sed "s/SERVICE_ACCOUNT/${_SERVICE_ACCOUNT}/g" | \
sed "s/SOURCE_BUCKET/${_SOURCE_BUCKET}/g" | \
sed "s/BOOTSTRAP_SERVERS/${_BOOTSTRAP_SERVERS}/g" | \
sed "s/SUBNETWORK/${_SUBNETWORK}/g" | \
sed "s/NETWORK/${_NETWORK}/g" | \
sed "s/SCHEMA_REGISTRY_URL/${_SCHEMA_REGISTRY_URL}/g" > scripts/dataflow/deploy-update.sh
# Dataflowジョブデプロイ用のイメージをビルドする
- name: 'gcr.io/cloud-builders/docker'
id: Build image
args:
- 'build'
- '-t'
- 'gcr.io/$PROJECT_ID/${_BUILD_IMAGE_NAME}:$SHORT_SHA'
- '.'

# Container Registryへイメージをpushする
- name: 'gcr.io/cloud-builders/docker'
id: Push image
args:
- 'push'
- 'gcr.io/$PROJECT_ID/${_BUILD_IMAGE_NAME}:$SHORT_SHA'

# Kubernetesジョブのマニフェストファイルを生成する
- name: 'gcr.io/cloud-builders/gcloud'
id: Generate manifest
entrypoint: /bin/sh
args:
- '-c'
- |
sed "s/GCP_PROJECT/${PROJECT_ID}/g" kubernetes.yaml.tpl | \
sed "s/BUILD_IMAGE_NAME/${_BUILD_IMAGE_NAME}/g" | \
sed "s/COMMIT_SHA/${SHORT_SHA}/g" > kubernetes.yaml

# DataflowジョブをデプロイするKubernetesジョブを起動する
- name: 'gcr.io/cloud-builders/kubectl'
id: Deploy
args:
- 'apply'
- '-f'
- 'kubernetes.yaml'
env:
- 'CLOUDSDK_COMPUTE_ZONE=${_CLOUDSDK_COMPUTE_ZONE}'
- 'CLOUDSDK_CONTAINER_CLUSTER=${_CLOUDSDK_CONTAINER_CLUSTER}'

# ジョブが完了するまで待機する
- name: 'gcr.io/cloud-builders/kubectl'
id: Wait
args:
- 'wait'
- '--for=condition=complete'
- '--timeout=600s'
- 'job/${_K8S_JOB_NAME}'
env:
- 'CLOUDSDK_COMPUTE_ZONE=${_CLOUDSDK_COMPUTE_ZONE}'
- 'CLOUDSDK_CONTAINER_CLUSTER=${_CLOUDSDK_CONTAINER_CLUSTER}'

# ジョブのリソースを削除する
- name: 'gcr.io/cloud-builders/kubectl'
id: Clean up
args:
- 'delete'
- 'job'
- '${_K8S_JOB_NAME}'
env:
- 'CLOUDSDK_COMPUTE_ZONE=${_CLOUDSDK_COMPUTE_ZONE}'
- 'CLOUDSDK_CONTAINER_CLUSTER=${_CLOUDSDK_CONTAINER_CLUSTER}'

cloudbuild.yamlの最初に生成しているデプロイ用のスクリプトのテンプレートは以下のようなものを使っています。英大文字で書かれた箇所が環境によって可変であり、Cloud Buildによって実際の値に置換されます。Apache BeamコードのDataflowへのデプロイの詳細については、Apache Beamの公式ドキュメントを参照ください。

#!/usr/bin/env bash
#dataflow-update.sh.tpl
region=asia-northeast1
runner=DataflowRunner
appVersion=0.1.0
scalaVersion=2.12
java -jar target/scala-${scalaVersion}/app-name-${appVersion}.jar \
--region=${region} \
--runner=${runner} \
--project=DATAFLOW_PROJECT \
--jobName=DATAFLOW_JOB_NAME \
--serviceAccount=SERVICE_ACCOUNT \
--sourceBucket=SOURCE_BUCKET \
--tempLocation=gs://SOURCE_BUCKET/JOB_NAME/tmp \
--bootstrapServers=BOOTSTRAP_SERVERS \
--network=NETWORK \
--subnetwork=https://www.googleapis.com/compute/v1/projects/DATAFLOW_PROJECT/regions/${region}/subnetworks/SUBNETWORK \
--schemaRegistryUrl=SCHEMA_REGISTRY_URL \
--streaming=true \
--experiments=enable_stackdriver_agent_metrics \
--update

そしてKubenetesジョブで使うDockerイメージをビルドするDockerfileは以下の通りです。筆者はScalaでDataflowのコードを書いており、ベースイメージとしてsbtを使っていますが、公式のサンプルと同様にMavenを使うことももちろん可能です。

FROM mozilla/sbt:8u212_1.2.8
COPY ./project/ ./project
COPY ./scripts/ ./scripts
COPY ./src/ ./src
COPY ./build.sbt .
RUN sbt clean assembly
FROM openjdk:8u222-jdk
ARG SCALA_VERSION="2.12"
ARG APP_VERSION="0.1.0"
COPY --from=0 /target/scala-$SCALA_VERSION/app-name-$APP_VERSION.jar ./target/scala-$SCALA_VERSION/
COPY --from=0 /scripts/ ./scripts

最後にKubernetesジョブのデプロイに使うマニフェストファイルのテンプレートは以下の通り。英大文字の箇所はCloud Buildによって実際の値に置換されます。実際にやっていることは、前述で紹介したシェルスクリプトを実行することのみです。

apiVersion: batch/v1
kind: Job
metadata:
name: your-job-name1-deploy
spec:
# automatically clean up finished job
ttlSecondsAfterFinished: 0
template:
metadata:
name: your-job-name1-deploy
spec:
containers:
- name: your-job-name1-deploy
image: gcr.io/GCP_PROJECT/BUILD_IMAGE_NAME:COMMIT_SHA
command: ["bash"]
args: ["scripts/dataflow/deploy-update.sh"]
# Do not restart containers after they exit
restartPolicy: Never

まとめ

本記事では、VPC内のリソースにアクセスするDataflowジョブはVPC内でしかデプロイを完結できないという制約を克服するため、Cloud BuildからVPC内にKubernetesジョブを起動し、そのジョブからデプロイを行うというアプローチを取ることで、JenkinsなどCIサーバを管理せずに、比較的に簡単なCI/CDパイプラインを構築できることを紹介しました。

今回のアプローチは、Dockerイメージさえ作ってしまえば、Kubernetes上で好きなジョブを実行させることができるため、かなり自由度の高いアプローチだと言えます。読者のみなさまも、ぜひ活用ください!もし本記事がみなさまの日々の開発の助けになれば幸いです。

--

--