BigQuery Connector for AWS Glueを使用したS3からBigQueryへのデータ書き込み

永井豪
paronym
Published in
8 min readAug 6, 2023

Google BigQuery Connector for AWS Glue を使って AWS 上にあるデータを BigQuery に書き込む要件があり、公式ドキュメントなどはBigQueryからAWS環境へのデータ転送の内容が多く、手順が少し異なる為記載します。

要件

・S3に配置されたCSVファイルをBigQueryに書き込む。

・書き込み時カラムの型を指定する

・S3に配置ファイルは処理後する

実施概要

  1. BigQuery Coonnectorを使用したGlue Connectionを作成
  2. GCSバケットの準備
  3. Glue Jobの作成

なお、GCPのサービスアカウントおよびI AMロールの準備が

必要となりますが、大まかな説明のみとし詳細は割愛します。

BigQuery 接続用のクレデンシャルを登録

BigQuery 接続に必要なサービスアカウントのクレデンシャル JSON を Secrets Manager に登録します。

サービスアカウントで発行したJSONをBASE64にエンコードし登録を行います。

BigQuery 用の AWS Glue Connection 作成

Connector や Glue のバージョンは以下となります

・AWS Glue 4.0 — Supports spark 3.3, Scala 2, Python 3

・Google BigQuery Connector 0.24.2 for AWS Glue 3.0

Marketplaceから取得する方法とconnectorの作成方法は

公式ページを参照し作成します。

GCSバケットの作成

サービスアカウントがGCSバケットを操作する権限を有している事、

リージョンがBigQueryと同じであることが必要です。

すでに別用途で使用しているバケットの場合、オブジェクトの削除保護が

設定されていない事を確認をします。

Glue Jobの作成

Visual Editor でジョブを作成します。

CSVのカラムはGlueでは全てstringと認識されるため、BigQueryにデータを入れる前に型変換を行います。

デフォルトでApply Mappingが設定されるのでそのまま使用します

S3バケット設定

S3のバケットにすでにCSVファイルが配置されていれば、読み込みを行い

OutPut Shemaタブに反映されます。なお、CSVは文字コードがUTF-8以外

はBigQueryに書き込みが行なえない為、ファイル配置より前に文字コード

の変換が必要です。

ApplyMapping設定

BigQueryはテーブルが存在しない場合は作成してくれますが、

書き込む予定のテーブルが存在する場合は型をあわせる必要があります。

また、parquetファイルに変換し書き込みを行う事が原因で、

DATITIME型が使用できないため、DATE型もしくはTIMESTAMP型を

選択します。この部分については深く調査していない為解決方法は

あるかもしれません。

BigQuery Connector設定

作成したconnectionを選択し、Connection optionsに以下を登録します

・parentObject・・・BigQueryプロジェクト名

・temporaryGcsBucket・・・GSCバケット名

・table ・・・「データセット.テーブル名 」の形式で入力

Job detailsタブでジョブ名前を入力し、IAMロールに作成した

ジョブロールを選択し保存します。

スクリプトの修正

Visual Editorで作成したジョブのスクリプトをさらに直接編集し完成させます。

■修正前に

BigQuery への 書き込みはサービスアカウントのクレデンシャル

を配置した Secrets Manager を使ってくれません。別途 S3 に配置する

JSON ファイルをなぜか参照します。

connectorを作成する為に Secrets Managerに便宜上配置しただけとなり

IAMで許可しているS3にBase64にエンコードしない状態でアップロード

しておきます。

■Glue修正

スクリプトを直接編集し保存すると、Visual Editorは使用できなくなります。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
★追加from pyspark.conf import SparkConf

★追加conf = SparkConf()
★追加conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
★追加conf.set("spark.hadoop.fs.gs.auth.service.account.enable", "true")
★追加conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "S3に配置したjsonファイル名")

★追加sc = SparkContext.getOrCreate(conf=conf)

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
★削除sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

以下省略

spark.hadoop.google.cloud.auth.service.account.json.keyfile内の

S3に配置するjsonファイルについてはパスはスクリプト内には記載せず、

Job Detail内のAdvanced propertiesにあるReferenced files pathに記載

します。

また今回はBigQueryに書き込み後、使用したファイルを削除する為、

末尾に以下を追加します。(削除するファイルがあるディレクトリを指定)

glueContext.purge_s3_path("s3://hoge/huga/", options={"retentionPeriod":0}, transformation_ctx="")

最後に

AWSとBigQueryのデータ転送は様々な場面で使用されますが、転送状況の

管理が煩雑になりがちで。AWSとBigQurery別々で処理を監視しなければ

ならない事もあります。

今回の転送方式を使用し、S3へのCSVファイル配置処理からGlueの処理

までの一連の流れをStepFunctionで実行する事で、処理エラー時の検知や

復旧が容易になります。

--

--