オンライン機械学習サービスとしてGCP Vertex AIのMLOpsを導入した話

Kenji Sugiki
Eureka Engineering
Published in
13 min readDec 16, 2021

この記事は「Eureka Advent Calendar 2021」の16日目の記事です。

Hello! 世界!MLエンジニアのsugikiです。

2021年ももう終わりますね。急に寒くなってきたので急に年末感がでてきました。

うちの犬もかまくらのハウスで丸くなりなかなか出てこなくなりました。

私は、普段はユーザが接するMLサービス全般に関わる業務に従事しています。今回は、GCPでMLOpsを導入したサービス事例や苦労した点について紹介したいと思います。

全体の構成

今回は、検索やレコメンドでのリランキングサービスを例に構成を紹介したいと思います。機械学習を使ったリランキングはlearning-to-rank (LTR)と呼ばれるタスクでアカデミックでも研究が盛んに行われています。

一般的には、以下のように、検索エンジンやベクトル類似度など比較的軽量な処理でcandidate generationとscorinigをして、さらに、先頭のアイテムを意図した指標(前のスコアとは一致しない)に近づけるためにより重い処理でre-rankingするカスケード構成を取ることが多いです。googleのこちらの資料にとてもわかりやすく記載されているので興味のある方は御覧ください。

https://developers.google.com/machine-learning/recommendation/overview/types から転載

さて、今回実装して運用しているリランキングサービスは非同期のアーキテクチャを取り入れており(理由は後ほど説明)、以下のような構成になっています。

独自に実装したfeaturestore、vertex pipelinesベースのモデルの学習パイプライン、非同期キューによるserving環境、コアサービスへの連携部分があります。

Feature Store

feature storeは複数のデータソースからfeatureを集約して安定的に提供することで、online/offlineでのfeature提供に関する責務を一元化するサービスになります。

feature storeがない場合、複数のデータソースから適切なタイミングで取得し専用のtransformerをへてfeatureを生成する必要があり、また、MLの学習時とオンライン提供時でデータソースが異なり、データの整合性が合わないことも多くあり(かつ、オンラインでの検証が困難)、システムも複雑になりメンテナンスや拡張が非常に難しくなってしまうことが多く起きます。実際、弊社の初期のMLサービスではそのようなことが起こっていました。

feature storeを導入することでfeatureを取得でき、オンライン・オフライン、複数のfeature setに対して共通の処理を利用でき、モデル開発に専念することができます。ingest時にtransformを適用することで学習や推論時に変換する必要がなくなるメリットもあります。

デメリットとしては、データエンジニアリングの負荷が非常に高く、entityがentityに依存したり、embeddingなどモデルベースの重めの学習・変換パイプラインの管理(しかもモデルごとに空間が変わってしまう)、個々のモデル学習で利用するヒューリスティックなラベル生成、データソースからデータを取得するタイミング制御など、運用事例も多くなく、仕組みも煩雑で個別のケースが多くなりがちでどう管理していくかが大きな課題となりそうです。

氷山の一角がfeature storeでそこに関しては扱いがシンプルですが、沈んでる部分がデータエンジニアリングや運用の部分で実際はそこが複雑で難しくなっています。

feature storeは氷山の一角

今回は以下のような構成でfeature storeを実装しました。

feature storeのアーキテクチャ

GCPではVertex AI Feature Storeが提供されているのですが、事例がまだ少ない、オンライン取り込みがない、試すにはコスト感が高い、など気になる点があり、今回は小規模でチーム内に限られるため独自に実装することにしました。

  • バッチ学習にはoffline storeとしてBigQuery、予測にはonline storeとしてMemorystoreを利用
  • Pub/Sub経由でfeatureデータを投入すると両方のstoreに取り込まれる
  • 内部ではstreamingモードでdataflowが動作しETLを実行(実装はapache beamのScla APIのscioを利用)
  • protobufでentity typeやinterfaceのスキーマを定義し、githubで管理しPRを生成するとgithub actionでをprotocを実行し各言語のコードやmarkdown形式のドキュメントが生成される(現在はgolang, scala, pythonのコードを生成)
  • protobufのインタフェースでentityのversioning(entityの空間や定義が変わる場合にバージョンを上げる)
  • ingest pipelinesは言語や環境に依存せず、各entity typeでタイミングを制御
  • 最小限でAPIはまだ提供していない
  • どのentity typeのどのversionがいつ取り込まれているかは今後vertex ML Metadata等で管理する予定

インタフェースの例

training pipeline

2021年5月のGoogle I/Oでkubeflow pipelinesのfull managed serverless環境であるverte AI pipelinesがβ版で発表されてから利用・検証しておりますが、早く11月にGAリリースを迎え、安心して利用できるようになりました。Vertex AI以前はGKEでAI Platform pipelinesを利用していましたが、実行が不安定で追加のコストも多くかかっていました。

今回の実装では、Vertex AI Pipelinesを利用して以下のようなcomponentを直列につないだ学習パイプラインを構築しています。

vertex pipelinesのcomponent DAG実装例
  • データセットを生成するコンポーネントはBQからストリーミングしてテーブルをjoinしてバルクで pyarrow.Table → parquetファイルに変換したいためdataflowを利用
  • dataflowやdeploy以外のコンポーネントはfunction baseのコンポーネントとして実装( @component decorator)
  • データセットの結合や学習時はオンメモリで処理する必要があるため、CPUやメモリの指定をしています (実際チューニングが必要でOOMでよく落ちました)
  • ArtifactはすべてVertex ML Metadataにトラッキングされ探索できるので、タスク名や環境やモデルのパラメータなどメタデータを充実させることが大事(Vertex Metadata APIで最新のモデルを取りたいなど)
  • Webコンソールから評価結果を見えるようにした(※ kubeflowのように画面で一覧を見れないのでMetadataから評価のArtifactを取得するコードを書く必要がある)

評価はこのような形で出力でき非常に便利でした。

ROC曲線を表記したい場合メタデータのサイズ制限があるため、以下のように200–300程度に間引く必要がありました。

補足として、モデルについて少し説明しておくと、ランキング学習ではpairwiseやlistwiseで実装するのが一般的でベースラインにはよくGradientBoostのLambdaRankが考えられるのですが(LightGBMなどで実装)、それがうまくいきませんでした。つまり、これらのモデルは、あるクエリを基準にして相対的な差分を表現しますが、弊社のようなオンラインデーティングサービスのドメインでは、ユーザと相手双方の視点が必要で(つまりユーザによって相手がパーソナライズされるべきなので)、pointwiseのモデルを利用しないと相手の相対的な順番のみが考慮される問題が起きるので、そこがユニークな部分だと思いました。

Triggerやscheduling

kfpライブラリで実行される構成 cloud scheduler + cloud function を参考に実装しています。pipelineのコンパイルしたjsonファイルをcloud functionのエンドポイントにpostする形にしてあります。公式の場合、SDK v1 + argoのようにテンプレートやパラメータが動的に設定できない、サービスアカウントが利用できないなどの問題がありましたので独自に実装し、以下のような構成になっています。

このように構成すると、

  • 日次、週次などでスケジューリング
  • BigQueryでdata driftや予測とground truthとの差分の分布が大きければアドホックで実行

など、対象のパイプラインを簡単に起動することができます。

serving

リランキングサービスのオンラインサービング周りのアーキテクチャは以下のようになっています。

  • コアサービスはAWS
  • ストリーミングデータがAWSとGCPを介して流れる
  • GCPではPub/SubでpullするためGKE上で実行(pushであればCloud Runなど)
  • feature storeがあるためクラウド間で流れるデータは最低限のIDとスコアのみで安全
  • GKEでpreemptive nodeで動いているため低コストで予測ができる

今回のリランキングサービスでは以下のような理由でキューによる非同期アーキテクチャを採用しました。

  • リランキングの操作のみでありそこまで重要ではない
  • もし止まってもコアサービスには影響を与えたくない
  • 同期APIだとSLAやlatencyに対してクリティカルになってしまう
  • 非同期のキューを媒介にした実装にし、次アクセスした時点で予測した結果がKVSにあれば処理するようにする
  • Pub/Subだとsubscriptionを増やすことでshadowリクエストの実現、A/Bテストが容易にできる、不安定なモデルを手軽に試しフォールバックできるなどメリットがある
  • BigQueryに予測結果をsinkすることでground truthの性能を確認することができる
  • GCPのドキュメントに記載されているようにPub/SubのメトリックをCloud Monitoringで監視することでアラート対応

AWSとGCP間の通信の遅延がどうなるか気になっていましたが、実際計測してみるとAWSからのpool送信から推定結果をKVS (DynamoDB)へ書き込むまで1,2秒の遅延で収まり、ユーザがオンラインにいる際に介入できており、A/Bテストでも効果がでていたので安心しました。

詰まった点その1: yamlによるコンポーネント作成方法

残りは、vertex AI pipelinesで実装するときに詰まった点を紹介します。

kubeflow pipelines SDK v2がまだ出て間もないこともあり、Artifactの連携やパラメータの受け取り方など、コンテナ用のコンポーネントを作るときのドキュメントがなく仕様が正確にわからず苦労しました。

  • Artifactの渡し方
  • —param=value のような形で書く方法
  • 余談ですが components/[component_name]/component.yamlでディレクトリを切るとPyCharmがスキーマを補完してくれる

色々試してみて以下のような書き方で書けばよいということがわかりました。

k8sへのモデルdeployコンポーネントの実装例

詰まった点その2: アーティファクトを別のパイプラインで再利用する方法

学習パイプラインとデプロイパイプラインを分けたいなど、アーティファクトを他のパイプラインで使いたい場合、どうやればいいかすぐにわかりませんでした。調べてみると、以下のように最新のモデルのArtifactをVertex ML Metadataから探索してアーティファクトを自分で定義するコンポーネントを書くことでアーティファクトが作れるようです(紐付かないのが難点ですが)。

  • Vertex AI Metadataのドキュメントの例のようにコンソール画面から見えるようなアーティファクトをAPI (or REST)経由で探索
  • こちらの公式のnotebookの例のようにImporter 関数を利用してアーティファクトとしてGCSの(単一)ファイルを読み込むことができる

具体的な実装例は申し訳ないですが長くなってしまうので、今回は割愛させていただきましたが、実際叩いてみると便利でおもしろいのでやってみると良いと思います。

まとめ

ざっと駆け足になってしまいましたが、みなさまの参考になりましたでしょうか?

Vertex AI周りは便利なものが多い印象ですが、具体的な実装や組み合わせの例もまだ少なく、どうやって使ったらいいのかユーザ自身の学習コストも高めなのが現状のようです。また、MLOps自体もまだまだ新しい領域でアーキテクチャや実装もすぐ変わりうる状況で、来年の終わりには全く違うことをしているかもしれません。

不確実性が高くチャレンジングなことに挑戦したい方はMLOpsエンジニアを採用中ですのでこちらからご応募ください!

--

--