BigQuery Connector for AWS Glueを使用したS3からBigQueryへのデータ書き込み
Google BigQuery Connector for AWS Glue を使って AWS 上にあるデータを BigQuery に書き込む要件があり、公式ドキュメントなどはBigQueryからAWS環境へのデータ転送の内容が多く、手順が少し異なる為記載します。
要件
・S3に配置されたCSVファイルをBigQueryに書き込む。
・書き込み時カラムの型を指定する
・S3に配置ファイルは処理後する
実施概要
- BigQuery Coonnectorを使用したGlue Connectionを作成
- GCSバケットの準備
- Glue Jobの作成
なお、GCPのサービスアカウントおよびI AMロールの準備が
必要となりますが、大まかな説明のみとし詳細は割愛します。
BigQuery 接続用のクレデンシャルを登録
BigQuery 接続に必要なサービスアカウントのクレデンシャル JSON を Secrets Manager に登録します。
サービスアカウントで発行したJSONをBASE64にエンコードし登録を行います。
BigQuery 用の AWS Glue Connection 作成
Connector や Glue のバージョンは以下となります
・AWS Glue 4.0 — Supports spark 3.3, Scala 2, Python 3
・Google BigQuery Connector 0.24.2 for AWS Glue 3.0
Marketplaceから取得する方法とconnectorの作成方法は
公式ページを参照し作成します。
GCSバケットの作成
サービスアカウントがGCSバケットを操作する権限を有している事、
リージョンがBigQueryと同じであることが必要です。
すでに別用途で使用しているバケットの場合、オブジェクトの削除保護が
設定されていない事を確認をします。
Glue Jobの作成
Visual Editor でジョブを作成します。
CSVのカラムはGlueでは全てstringと認識されるため、BigQueryにデータを入れる前に型変換を行います。
デフォルトでApply Mappingが設定されるのでそのまま使用します
S3バケット設定
S3のバケットにすでにCSVファイルが配置されていれば、読み込みを行い
OutPut Shemaタブに反映されます。なお、CSVは文字コードがUTF-8以外
はBigQueryに書き込みが行なえない為、ファイル配置より前に文字コード
の変換が必要です。
ApplyMapping設定
BigQueryはテーブルが存在しない場合は作成してくれますが、
書き込む予定のテーブルが存在する場合は型をあわせる必要があります。
また、parquetファイルに変換し書き込みを行う事が原因で、
DATITIME型が使用できないため、DATE型もしくはTIMESTAMP型を
選択します。この部分については深く調査していない為解決方法は
あるかもしれません。
BigQuery Connector設定
作成したconnectionを選択し、Connection optionsに以下を登録します
・parentObject・・・BigQueryプロジェクト名
・temporaryGcsBucket・・・GSCバケット名
・table ・・・「データセット.テーブル名 」の形式で入力
Job detailsタブでジョブ名前を入力し、IAMロールに作成した
ジョブロールを選択し保存します。
スクリプトの修正
Visual Editorで作成したジョブのスクリプトをさらに直接編集し完成させます。
■修正前に
BigQuery への 書き込みはサービスアカウントのクレデンシャル
を配置した Secrets Manager を使ってくれません。別途 S3 に配置する
JSON ファイルをなぜか参照します。
connectorを作成する為に Secrets Managerに便宜上配置しただけとなり
IAMで許可しているS3にBase64にエンコードしない状態でアップロード
しておきます。
■Glue修正
スクリプトを直接編集し保存すると、Visual Editorは使用できなくなります。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
★追加from pyspark.conf import SparkConf
★追加conf = SparkConf()
★追加conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
★追加conf.set("spark.hadoop.fs.gs.auth.service.account.enable", "true")
★追加conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "S3に配置したjsonファイル名")
★追加sc = SparkContext.getOrCreate(conf=conf)
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
★削除sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
以下省略
spark.hadoop.google.cloud.auth.service.account.json.keyfile内の
S3に配置するjsonファイルについてはパスはスクリプト内には記載せず、
Job Detail内のAdvanced propertiesにあるReferenced files pathに記載
します。
また今回はBigQueryに書き込み後、使用したファイルを削除する為、
末尾に以下を追加します。(削除するファイルがあるディレクトリを指定)
glueContext.purge_s3_path("s3://hoge/huga/", options={"retentionPeriod":0}, transformation_ctx="")
最後に
AWSとBigQueryのデータ転送は様々な場面で使用されますが、転送状況の
管理が煩雑になりがちで。AWSとBigQurery別々で処理を監視しなければ
ならない事もあります。
今回の転送方式を使用し、S3へのCSVファイル配置処理からGlueの処理
までの一連の流れをStepFunctionで実行する事で、処理エラー時の検知や
復旧が容易になります。