Apache Hadoop のデータを BigQuery で分析するための移行手順

Keiji Yoshida
google-cloud-jp
Published in
24 min readDec 1, 2019

--

この記事は Google Cloud Japan Customer Engineer Advent Calendar 2019 の 2 日目の記事です。

TL;DR

  • Apache Hadoop のデータを BigQuery で分析できるようにするための移行手順をご紹介します。
  • Google Cloud が提供する、フルマネージドでサーバレスなデータ ウェアハウスである BigQuery を活用することで、インフラやミドルウェアの運用保守作業を行う必要がなく、データ分析作業に専念できるようになります。
  • (個人的な意見ですが)オンプレミスで Apache Hadoop クラスタを運用している場合、サーバの調達や、ミドルウェアのインストール、各種リソースの使用率のモニタリング、パフォーマンス チューニングなどの運用保守作業が定期的に発生し、効率的にデータ分析環境を運用することができない、といった課題があるかと思います。もし、そのような課題感をお持ちでしたら、今回ご紹介する方法で、BigQuery を活用したフルマネージドでサーバレスなデータ分析環境を Google Cloud で構築することを、ご検討いただけましたら幸いです。

はじめに

Google Cloud の Data Analytics Specialist の吉田 啓二と申します。お客様のデータ分析に関する技術的な、あるいはビジネス上の課題を、BigQuery を中心に構成される Google Cloud のデータ分析ソリューションを提供して解決することを、主な業務内容として行っております。

今年、数社のお客様に対して、Apache Hadoop のデータを、Google Cloud のデータ ウェアハウスである BigQuery で分析できるようにするための支援を行っておりました。本記事では、その内容をまとめまして、Apache Hadoop のデータを BigQuery で分析できるようにするための移行手順をご紹介します。

BigQuery の特徴

Apache Hadoop から BigQuery への移行手順をご紹介する前に、まずは、BigQuery の主な特徴をご説明します。

BigQuery の主な特徴

エンタープライズ データ ウェアハウス

BigQuery は Google Cloud が提供するエンタープライズ向けのデータ ウェアハウスです。様々な構造化データをテーブル形式で蓄積し、SQL を実行してデータの集計や分析を行ったり、また、スプレッドシートデータポータルLooker などのサービスと連携してデータの可視化を行ったりすることができます。

ペタバイト規模のスケール

BigQuery では、データを保管するストレージと、クエリを実行するコンピュート クラスタが、それぞれ独立して分かれています。ストレージとコンピュート クラスタはそれぞれ、データのボリュームや処理負荷に応じてスケールするようになっており、ペタバイト規模のデータを蓄積したり、検索したりすることが可能になっています。

BigQuery のアーキテクチャの概要

機密性、可用性、耐久性

BigQuery に保存されるデータは全て、自動的に暗号化されるようになっています。また、Cloud Data Loss Prevention を使用して、BigQuery に機密情報が保存されていないかを定期的、自動的に検査することができます。

BigQuery のコンピュート クラスタでは、クエリの実行時に複数のワーカー コンテナが起動し、それらのワーカー コンテナで分散して、クエリの処理が実行されるようになっています。万一、一部のゾーンやワーカー コンテナで障害が発生しても、自動的に別のワーカー コンテナに処理が割り当てられ、全体の処理としては継続するようになっており、それによって高い可用性が実現されています。

BigQuery のクエリ実行

BigQuery のストレージでは、データは全て、ゾーンをまたいで複製されて保存されるようになっています。それによって高い耐久性が実現されています。さらに、データセットを別のリージョンに定期的にコピーすることも可能になっています。また、BigQuery では、データの変更履歴が一定期間保存されるようになっており、特定の時点のテーブルのデータを参照することができるようになっています。

BigQuery のストレージ

フルマネージドでサーバレス

前述の通り、BigQuery のストレージおよびコンピュート クラスタは、データのボリュームや処理負荷に応じて、自動的にスケールするようになっています。そのため、BigQuery のユーザは、クラスタやインフラ、ミドルウェアなどを管理する必要がなく、データの分析作業に専念することができるようになっています。

従来のデータウェア ハウスと BigQuery の比較

ストリーミング データのリアルタイム分析

BigQuery には、ストリーミング データをリアルタイムに登録する機能があります。登録されたデータは、すぐにクエリで検索できるようになっており、ストリーミング データをリアルタイムに集計、分析することが可能になっています。

ML 機能の標準搭載

BigQuery には BigQuery ML という、BigQuery のデータおよび SQL を使用して、機械学習モデルを構築して予測を行うことができる機能があります。また、AutoML Tables という、BigQuery のデータセットをもとに、GUI の操作だけで機械学習モデルを構築して予測を行うことができるサービスもあります。これらの機能やサービスを利用することで、機械学習モデルの構築と予測を、BigQuery 上で完結して行うことが可能となっています。

BigQuery ML の概要

高速インメモリの BI エンジン

BigQuery には、BigQuery BI Engine という、BigQuery のデータをメモリにキャッシュして、高速にクエリの結果を返す機能があります。データポータルなどの BI ツールにおいて、複数人のユーザが同時にアクセスした場合にも、レポートやダッシュボードを高速に描画することが可能になります。

BigQuery BI Engine の概要

BigQuery が解決する課題

前述の通り、BigQuery を使用することの主なメリットとしては、データ ウェアハウスのクラスタやインフラ、ミドルウェアを管理する必要なく、大規模データの処理、分析作業に専念できる、ということがあります。

オンプレミスで Apache Hadoop クラスタを運用している場合は、サーバの調達、ミドルウェアのインストール、各種リソースの使用率のモニタリング、パフォーマンス チューニング、データの管理や整理などの、運用保守作業が定期的に発生します。特に、特定の部署で、全社横断のデータ分析基盤を Apache Hadoop クラスタで構築、運用している場合は、限られた人手で、これらの運用保守作業を継続して実施し続けることが難しい、という課題があります。

このような課題を抱えているお客様に対して、BigQuery を活用したフルマネージドでサーバレスなデータ分析環境を Google Cloud で構築する、というソリューションの提供を、今年行っておりました。

BigQuery を活用した Google Cloud でのデータ分析環境の構築例

Apache Hadoop から BigQuery への移行手順

Apache Hadoop のデータを BigQuery で分析できるようにするためには、以下のステップで移行作業を進める必要があります。

  1. Cloud Storage へのファイルの移行
  2. BigQuery へのファイルのロード
  3. BI ツールのクエリの書き換え
  4. GCP への Hive Metastore の移行
  5. GCP への ETL バッチ処理の移行

1. Cloud Storage へのファイルの移行

まず最初に、Google Cloud のオブジェクト ストレージである Cloud Storage へ、ファイルを移行する必要があります。移行元の環境および移行方法によって、以下の 3 つの移行パターンがあります。

  1. オンプレミス HDFS からの移行 、push モデル
  2. オンプレミス HDFS からの移行、pull モデル
  3. S3 からの移行

1.1. オンプレミス HDFS からの移行、push モデル

オンプレミス環境の Hadoop クラスタの HDFS 上のファイルを、Cloud Storage へ移行するパターンになります。オンプレミス環境の Hadoop クラスタ上で distcp ジョブを実行することで、オンプレミス環境の Hadoop クラスタから Cloud Storage へ直接ファイルを送信します。

Push モデル

オンプレミス環境の Hadoop クラスタから Cloud Storage へ接続するために、Hadoop クラスタの各ノードに Cloud Storage コネクタをインストールする必要があります。その後、以下のようなコマンドをオンプレミス環境の Hadoop クラスタで実行して、distcp ジョブによりファイルを Cloud Storage へ送信します。

$ hadoop distcp hdfs://xxxx gs://xxxx

push モデルでは、簡潔な方法でファイルを Cloud Storage へ送信可能ですが、ファイルの送信にオンプレミス環境の Hadoop クラスタのリソースを使用する点が、主な特徴としてあります。

1.2. オンプレミス HDFS からの移行、pull 型

オンプレミス環境の Hadoop クラスタの HDFS 上のファイルを、Cloud Dataproc 上の Hadoop クラスタ経由で Cloud Storage へ移行するパターンになります。Cloud Dataproc 上の Hadoop クラスタで distcp ジョブを実行し、オンプレミス環境の Hadoop クラスタの HDFS 上のファイルを、 Cloud Storage へ送信します。

Pull モデル

pull モデルでは、distcp ジョブが Cloud Dataproc 上の Hadoop クラスタで実行されるため、ファイル移行時の、オンプレミス環境の Hadoop クラスタのリソースの使用を必要最低限にすることができる、distcp ジョブが実行される Cloud Dataproc 上の Hadoop クラスタのサイズを柔軟に変更できる、などの利点があります。ただし、Cloud Dataproc 上の Hadoop クラスタの各ノードから、オンプレミス環境の Hadoop クラスタの各ノードへアクセスできるようにする必要があるため、Cloud VPNCloud Interconnect の導入が必要になります。

1.3. S3 からの移行

S3 上のファイルを Cloud Storage へ移行するためには、Storage Transfer Service を使用します。GUI もしくはプログラムから、S3 上のファイルを Cloud Storage へ移行するジョブを実行することが可能です。また、定期的にファイル移行を実施するスケジューリングも可能となっています。以下は GUI で Storage Transer Service のジョブを実行する際のスクリーンショットです。

Storage Transer Service の GUI のスクリーンショット

また、以下は Storage Transer Service のジョブを実行する Python のサンプルコードです。

credentials = GoogleCredentials.get_application_default()
service = discovery.build('storagetransfer', 'v1',
credentials=credentials)
dt = 'yyyy-mm-dd'
now = datetime.datetime.now()
transfer_job_body = {
'description': 's3_to_gcs_' + dt,
'project_id': 'gcp-project',
'transfer_spec': {
'object_conditions': {
'include_prefixes': [
'tbl/',
'tblp1/dt=' + dt,
'tblp2/dt=' + dt
]
},
'aws_s3_data_source': {
'bucket_name': 'awsbucket',
'aws_access_key': {
'access_key_id': 'xxxx',
'secret_access_key': 'xxxx'
}
},
'gcs_data_sink': {'bucket_name': 'gcsbucket'}
},
'schedule': {
'schedule_start_date': {
'year': now.year,
'month': now.month,
'day': now.day
},
'schedule_end_date': {
'year': now.year,
'month': now.month,
'day': now.day
}
}
}
service.transferJobs().create(body=transfer_job_body).execute()

2. BigQuery へのファイルのロード

Cloud Storage 上の ORC または Parquet ファイルを BigQuery へロードするには、bq load コマンドを使用します。また、Cloud Storage 上の Hive パーティション パス(gs://bucket/key1=val1/key2=val2/... など)からのデータロードにも対応しており、パスの Hive パーティションの key, value を読み取りながら、BigQuery へデータをロードすることが可能になっています。

ORC または Parquet ファイルを BigQuery へロードする場合、スキーマ情報は、ロード対象のファイルから自動的に抽出されるため、初回のロード実行時に、事前に BigQuery のテーブルを作成する必要はありません。(参考:ORC ファイルのロードParquet ファイルのロード

この作業が完了すると、データが全て BigQuery 上に存在するようになるため、BigQuery でのデータ分析が実施可能になります。

パーティションが無い Hive テーブルのロード

以下は、Cloud Storage 上の、Hive パーティションの無い Hive テーブルのデータ(ORC ファイル)を BigQuery へロードする際の bq load コマンドの実行例です。

$ bq load --replace --source_format=ORC test.tbl gs://bucket/tbl/*

gs://bucket/tbl/ 配下の ORC ファイルが、BigQuery のテーブル test.tbl へロードされます。replace オプションが指定されているため、コマンド実行の度に test.tbl テーブルのデータが洗い替えられます。前述の通り、ロード対象の ORC ファイルからスキーマ情報が自動的に抽出されるため、初回ロード時に事前に test.tbl テーブルを作成する必要はありません。

1 階層のパーティションが有る Hive テーブルのロード

以下は、Cloud Storage 上の、Hive パーティション dt=yyyy-mm-dd が有る Hive テーブルのデータ(ORC ファイル)を BigQuery へロードする際の bq load コマンドの実行例です。

$ bq load --replace --source_format=ORC \
--hive_partitioning_source_uri_prefix=gs://bucket/tblp1/ \
--hive_partitioning_mode=AUTO \
--time_partitioning_field=dt \
'test.tblp1$20191202' gs://bucket/tblp1/dt=2019-12-02/*

hive_partitioning_source_uri_prefix=gs://buckt/tblp1/ が指定されていることにより、この prefix 以降のパスの部分が Hive パーティションで構成されている(/key1=val1/... など)と、bq load コマンドでは認識されます。

hive_partitioning_mode=AUTO が指定されていることにより、パスの Hive パーティションの値の型が、自動的に推論されます。今回の例では、パーティションが dt=2019–12–02 となっており、値 2019–12–02 の型が日付型であると、自動的に推論されます。

time_partitioning_field=dt が指定されていることにより、BigQuery のテーブルのパーティションとして dt カラムの値が使用されます。

replace オプションが指定されており、また、ロード先の BigQuery のテーブル名に test.tblp1$20191202 という、パーティション デコレータ付きのテーブル名が指定されているため、コマンド実行の度に、BigQuery テーブルの単一パーティション (20191202) のデータが洗い替えられます。

2 階層のパーティションが有る Hive テーブルのロード

以下は、Cloud Storage 上の、Hive パーティション (dt=yyyy-mm-dd, service=xxxx) が有る Hive テーブルのデータ(ORC ファイル)を BigQuery へロードする際の bq load コマンドの実行例です。

$ bq load --replace --source_format=ORC \
--hive_partitioning_source_uri_prefix=gs://bucket/tblp2/ \
--hive_partitioning_mode=AUTO \
--time_partitioning_field=dt \
--clustering_fields=service \
'test.tblp2$20191202' gs://bucket/tblp2/dt=2019-12-02/*

先ほどの、1 階層のパーティションが有る Hive テーブルのロードの場合と基本的には同じですが、今回は、2 階層目の Hive パーティション キー service を BigQuery のテーブルのクラスタリング カラムに指定しています。BigQuery では、テーブルのパーティション カラムには、一つのカラムしか設定できないため、ロード元の Hive テーブルの Hive パーティション キーが複数存在する場合は、一つのカラムを BigQuery のテーブルのパーティション カラムに指定して、残りのカラムをクラスタリング カラムに指定することで、BigQuery のテーブルの検索効率を向上させることができます。

Hive テーブルのデータを BigQuery へロードするツール

基本的には、前述の通り、bq load コマンドを実行することで、Cloud Storage 上の Hive テーブルのデータを BigQuery へロードすることが可能です。しかし、「Hive パーティションのキーと値がパスに含まれない場合」や「Hive Metastore のスキーマ定義と ORC ファイル内のスキーマ定義が異なる場合」など、bq load コマンドで正常に BigQuery へデータをロードできない場合がいくつかあります。その場合は、Hive Metastore の情報を参照しながら BigQuery へデータをロードする OSS のツール BigQuery Hive External Table Loader がありますので、こちらのツールを使用して BigQuery へのデータロードを行う必要があります。

Cloud Storage 上の Hive テーブルのデータに対する検索

これまで、Cloud Storage 上の Hive テーブルのデータを BigQuery へロードする方法をご紹介しましたが、BigQuery へファイルをロードせずに、BigQuery から直接 Cloud Storage 上の Hive テーブルのデータを検索する方法もあります。Cloud Storage 上の Hive テーブルのデータを参照する BigQuery の外部データソースを作成して、その外部データソースに対してクエリを実行する、という流れになります。詳細な手順につきましては、こちらのドキュメントを参照いただければと思います。ただし、「データの整合性が保証されない」、「ネイティブの BigQuery のテーブルに対するクエリ実行よりも、性能が劣る可能性がある」などの制約がいくつかありますので(外部データソースに関する制約事項につきましては、こちらのドキュメントを参照してください)、BigQuery へデータをロードするか、それとも、外部データソースを利用するかを、状況に応じて判断いただければと思います。

3. BI ツールのクエリの書き換え

BigQuery への移行後も、既存の BI ツールを継続して使用される場合は、BI ツールの接続先を BigQuery へ切り替える際に、BI ツール上の既存のクエリの書き換えが必要となる場合があります。BigQuery の標準 SQL は SQL:2011 に準拠していますが、関数など、データ ウェアハウスや検索エンジン独自の機能を使用しているクエリについては、書き換えが必要となる場合があります。BigQuery でクエリをドライランすることによって、クエリの構文が正しいものであるかを確認することができます。

4. GCP への Hive Metastore の移行

ETL バッチ処理を GCP へ移行するための準備作業として、Hive Metastore を GCP へ移行する必要があります。GCP では、Hive Metastore を Cloud Dataproc と Cloud SQL を使用して構築することができます(詳細な手順はこちらのドキュメントを参照してください)。GCP への Hive Metastore のデータの移行方法としては、以下の 2 つの方法があります。

  1. Hive テーブルの DDL を移行
  2. Hive Metastore Database のデータを移行

4.1. Hive テーブルの DDL を移行

こちらの方法は、各 Hive テーブルの DDL をベースとして、移行する方法になります。以下のような手順で作業を実施します。

  1. show create table を実行して、各 Hive テーブルの DDL を取得する。
  2. 各 DDL の location を hdfs://xxxxs3://xxxx から gs://xxxx へ変更する。
  3. 各 DDL を Cloud Dataproc の Hive サーバで実行する。
  4. パーティションがある Hive テーブルについては msck repair table を実行する。

4.2. Hive Metastore Database のデータを移行

こちらの方法は、移行元の Hive Metastore Database のデータをエクスポートして、それを Cloud SQL へインポートする方法になります。MySQL を Hive Metastore Database として使用している場合は、例えば、以下のような手順で作業を実施します。

# 1. 移行元のデータベースのデータをエクスポートする。
$ mysqldump -u hive OLD_METASTORE_DB -p > hive_metadata.sql
# 2. hive_metadata.sql を修正し、各 Hive テーブルの location を `hdfs://xxxx` や `s3://xxxx` から `gs://xxxx` へ変更する。# 3. Cloud SQL へデータをインポートする。
$ mysql -u hive NEW_METASTORE_DB -p < hive_metadata.sql

5. GCP への ETL バッチ処理の移行

最後に、ETL バッチ処理を GCP へ移行します。前述の「1. Cloud Storage へのファイルの移行」の作業の結果、データやファイルは既に Cloud Storage 上に存在するため、HDFS の代わりに Cloud Storage をストレージとして使用します。これにより、GCP では Hadoop クラスタ上にデータやファイルを保存する必要がないため、ETL バッチ処理を実行するときのみ、Hadoop クラスタを起動して処理を実行するようにします。Cloud Dataproc では、平均して 90 秒未満で Hadoop クラスタを起動することが可能であるため、Cloud Dataproc を使って、ジョブ単位で(あるいは、関連する一連のジョブの単位で)Hadoop クラスタを起動して ETL バッチ処理を実行し、処理完了後にその Hadoop クラスタを破棄するようにします。GCP では、このような、ジョブ単位で起動、破棄される Hadoop クラスタを「エフェメラル クラスタ」と呼んでいます。また、Hive Metastore は、前述の「4. GCP への Hive Metastore の移行」で、Cloud Dataproc と Cloud SQL を使って GCP 上に構築した Hive Metastore を使用します。

GCP での ETL バッチ処理の実行環境

Cloud Storage をストレージとして使用するため、ETL バッチ処理のプログラムに記述されているパス hdfs://xxxxs3://xxxx などを全て gs://xxxx へ変更してから、GCP へ ETL バッチ処理を移行します。

GCP への ETL バッチ処理の移行が完了すると、全ての処理が GCP 上で完結して実行されるようになるため、前述の「1. Cloud Storage へのファイルの移行」で定期的にファイルを HDFS や S3 から Cloud Storage へ連携する仕組みを導入している場合は、それを撤去します。

おわりに

本記事では、Apache Hadoop のデータを BigQuery で分析できるするようにするための移行手順をご紹介しました。「クラスタやインフラ、ミドルウェアなどの運用保守に関する作業負荷を極力減らして、データ分析環境を社内に展開したい」というような課題感がもしありましたら、ぜひ、今回ご紹介した方法で、BigQuery を活用したフルマネージドでサーバレスなデータ分析環境を Google Cloud で構築することをご検討いただけましたら幸いです。Google Cloud が提供している、オンプレミス環境の Hadoop クラスタの移行に関するドキュメントもありますので、もしよろしければ、ご参照いただければと思います。

明日は Yuki Iwanari による「GKE のアップグレード戦略を改めて確認しよう」です。お楽しみに。

--

--