DataflowとTFRecordでシームレスなMLパイプライン構築

Hayato Yoshikawa
google-cloud-jp
Published in
12 min readApr 17, 2018

最近では、機械学習を用いた事例が徐々に聞かれるようになってきました。しかし、MLエンジニアの方々からは「モデルを作ったはいいが、プロダクションに持っていくのは難しい」という声をよくいただきます。この要因は様々ですが、システムとして考慮しなければならない構成が多いというのが主だった要因になっています。

モデルはシステム全体のごく一部(黒い四角部分)

中でも特にMLエンジニアを悩ませるのは、データの前処理ではないでしょうか?機械学習では学習と推論時にデータを入力する必要がありますが、プロダクション時のデータは取得してそのままモデルに入力することはできません。なぜなら、データには欠損があるかもしれないし、画像ならモデルに合わせてサイズや階調を変える必要があるからです。さらに、これらのデータが大量にある場合は、1台のマシンでは処理しきれなくなるため、分散環境を構築する必要があります。そして、将来どこかで推論結果がおかしいとなったとき、モデルだけでなく前処理の仕組みや、データになにか問題がないか調査しなければなりません。これらを全て考慮してシステム設計するのは非常に大変です。

実はこの面倒臭さは、DataflowとTFRecordを組み合わせるとほとんど解決してしまいます。Dataflowは前回の記事でも紹介しましたが、TFRecordとは何でしょうか?
TFRecordとは
TensorFlowでのモデルの学習に最適なレコード指向のデータフォーマットです。GCS上にシャーディングして配置することで、モデルが分散環境にあっても効率的にデータをロードすることができます。Dataflowを使うと、データの前処理を簡単に分散環境で実行できるだけでなく、このTFRecord形式のファイルを簡単にGCS上に保存することができます。

DataflowとTFRecordを使ったMLパイプラインの例

それでは例として、GCS上に保存された大量の画像を、Dataflowで加工してTFRecordでGCSに保存する、というDataflowのコードを書いてみましょう。

大量画像の例:衛星画像

GCSの公開データの1つに、ランドサット衛星の衛星画像があります。米国地質調査所(USGS)と NASA から提供された衛星画像のデータセットで、GoogleがGCS上にホスティングしており、GCPユーザーは自由にアクセスすることができます。膨大な数の画像がある上、サイズも大きく、加工しないと通常のモデルには使えないので、実践的な例と言えるでしょう。
これらの画像のインデックスは、BigQueryのデータセットとして用意されています。そこで、BigQueryでインデックスを読み込み、GCSから画像をロードするというパイプラインにしてみましょう。

BigQueryからクエリ結果をパイプラインに流すのはとても簡単です。次のようにBigQuerySourceをパイプラインの先頭に設定するだけです。このクエリは画像ファイルが保存されているGCSのパスが記載されているカラム ( base_url )のみを取得します。

p = beam.Pipeline(options=options)query = "SELECT" \
" base_url " \
"FROM" \
" `bigquery-public-data.cloud_storage_geo_index.landsat_index` " \
"LIMIT" \
" 10"
(p | 'gcs_path' >> beam.io.Read(beam.io.BigQuerySource(
project=PROJECTID,
use_standard_sql=True,
query=query))
| ...次のTransform...

TFRecordで保存

画像を加工したら、TFRecord形式でGCSに保存しましょう。これはWriteToTFRecordを使うだけで実現できますが、与えるデータ(PCollection)は少し工夫が必要です。

# パイプラインにWriteToTFRecordを追加
(p | 'write' >> beam.io.tfrecordio.WriteToTFRecord(
'gs://[BUCKET]/path/img',
file_name_suffix='.tfrecord')
)

WriteToTFRecordに与えるデータは、次の例のようにtf.train.Exampleをシリアライズしたものである必要があります。ここでは画像のrawデータと、縦横の画素サイズを特徴量としてデータ化しています。モデルの学習には(分類問題の場合)ラベルも必要なので、ここで合わせてデータ化しておくとよいでしょう。

def serialize_element(element):
# 何らかの処理
...
# 保存するデータ
height = tf.train.Int64List(value=[256])
width = tf.train.Int64List(value=[256])
img_raw = tf.train.BytesList(value=[img_raw])
# tf.train.Exampleを作成
example = tf.train.Example(features=tf.train.Features(feature={
'height': tf.train.Feature(int64_list=height),
'width': tf.train.Feature(int64_list=width),
'img_raw': tf.train.Feature(bytes_list=img_raw)}))
# シリアライズする
return example.SerializeToString()
実行し終わるとこのようにTFRecordのファイル群が生成される

Dataflowでパイプライン実行

コードをローカルで実行すると、自身のコードをパッケージングしてデプロイとDataflowのjob実行まで自動で行われます。Consoleを見ると実行状況と、どれ位スケールしているかが時系列のチャートで確認する事ができます。

Dataflow実行中の画面。徐々にスケールしていくのがグラフィカルに表示される

TFRecordの読み込み

Dataflowの実行が終わったら、生成されたTFRecordファイルをTensorFlowのコードから読み込んでみましょう。読み込みは簡単で、TFRecordDatasetでGCS上に保存されたファイルを読み込み、あとはイテレータを使って逐次データを取り出すだけです。ここではmapを使ってTFRecordファイル中の特徴量のうち一部だけを取り出すようにしています。

import tensorflow as tf
import numpy as np
from PIL import Image
def _parse_feature(example_proto):
features = {
"height": tf.FixedLenFeature((), tf.int64, default_value=0),
"width": tf.FixedLenFeature((), tf.int64, default_value=0),
"img_raw": tf.FixedLenFeature((), tf.string, default_value="")}
parsed_features = tf.parse_single_example(example_proto, features)
return parsed_features["img_raw"], parsed_features["height"]
# 保存したTFRecordを読み込み、データセットを作成
gcs_path = 'gs://[BUCKET]/tfrecord/'
filenames = [gcs_path + fn for fn in tf.gfile.ListDirectory(gcs_path)]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(_parse_feature)
batched_dataset = dataset.batch(1)with tf.Session() as sess:
# 1回だけデータセットをイテレーションする
iterator = batched_dataset.make_one_shot_iterator()
raw, height = iterator.get_next()
img_string = sess.run(raw)
# 画像を可視化
im = np.fromstring(img_string[0], dtype=np.uint32)
img = Image.fromarray(im.reshape((256, 256)))
img.convert('RGB').save('out.png')

Template化してパイプラインをバージョニング

冒頭で述べたように、MLシステムを運用していると、モデルの推論結果に異常が見られたり、または精度が低下してくる場合があります。このときはモデルやデータを見直すだけでなく、前処理のパイプラインを見直し、場合によっては過去のバージョンにロールバックする必要があります。DataflowにはTemplateという機能があり、パッケージングした処理パイプラインをGCSに保存しておくことで、APIからそのパイプラインにJob実行指示をすることできます。このTemplateを利用することで、前処理のパイプラインをバージョニングすることができます。

データ、前処理、モデルを全てバージョン管理し、いつでもロールバックできるようにする

DataflowのTemplateを作るのは非常に簡単です。オプションの設定に次の1行を追加するだけで、パッケージングからGCSへのアップロードまで自動で行ってくれます。

options = beam.options.pipeline_options.PipelineOptions()google_cloud_options = options.view_as(
beam.options.pipeline_options.GoogleCloudOptions)
google_cloud_options.template_location = "gs://[BUCKET]/path/name"

TemplateをAPIで実行

Template化したら、次はAPIから呼び出してみましょう。REST APIでJobの実行指示を出すだけなので、どの環境(たとえDataflowのSDKが入っていなくても)からでも実行することができます。GCPの環境から実行するなら、Discovery APIを使うと認証も自動で行ってくれるので楽です。GAEのcronで定期実行する、といった使い方もすることができます。

from oauth2client.client import GoogleCredentials
from oauth2client.service_account import ServiceAccountCredentials
from apiclient.discovery import build
credentials = GoogleCredentials.get_application_default()
service = build("dataflow", "v1b3", credentials=credentials)
templates = service.projects().templates()
BODY = {
"jobName": "JOB名",
"gcsPath": "gs://[BUCKET]/path/",
"environment": {
"tempLocation": "gs://[BUCKET]/temp",
}
}
dfrequest = service.projects().templates().create(
projectId=PROJECT, body=BODY)
dfresponse = dfrequest.execute()

最後に

機械学習のシステムは、データに依存して結果が不安定になりがちです。そこにシステムの不安定さが加わると、不具合の真の原因を追求するのが非常に困難になります。今回のようなサーバーレス環境であれば、インフラ部分はクラウドに任せる事ができるので、機能に集中して開発と運用を行うことができます。DataflowならPythonでも書けるので、データ処理系はちょっと苦手・・・というMLエンジニアの方々にとっても簡単です。是非お試しください。

Disclaimer この記事は個人的なものです。ここで述べられていることは私の個人的な意見に基づくものであり、私の雇用者には関係はありません。

--

--

Hayato Yoshikawa
google-cloud-jp

Customer Engineer, Google Cloud @Google. Views are my own.