F1 Query: Declarative Querying at Scale」を読んだ

Kimura, Sotaro
20 min readJan 27, 2019

--

こんにちは。移行直後でEditorと悪戦苦闘している今日この頃です。

元のメモをMarkdownで書いてしまったので、階層が限られるのが微妙に厳しい・・・

ともあれ、「F1 Query: Declarative Querying at Scale」を読んだので概要をまとめます。

読み違えている個所などもありそうですが、一旦のまとめということで。アーキテクチャを読むのが目的のため、論文の後半はまとめていません。

F1の焦点

  • ワークロードを単一のクエリエンジン上で扱うこと
  • 断片化したデータセットへの対応
  • 実行環境は「データセンター」
  • 小規模から大規模へ両対応
  • UDFs、UFAs、TVFs等による拡張性担保

ワークロードを単一のクエリエンジン上で扱うこと

以下のワークロードに対応することを目的とする。

  • 小数データにアクセスするOLTPクエリ
  • 大量データに対するOLAPクエリ
  • 大量データの変換に用いるETLパイプライン

断片化したデータセットへの対応

Googleでは多くのデータ保存オプションが存在しており、あるアプリケーションに対するデータでも複数のデータストアに分散していることが多い。

分析ワークロードを実行するにあたり、このような複数のデータストアをまたいだ統一ビューを提供する必要がある。

実行環境は「データセンター」

特定のホストやクラスタに適合するのではなく、データセンター単位上での実行に焦点を置く。
クエリ処理やデータ保存をすべて自前で実施するRDBとは対照的に、F1はデータストアとクエリ処理を分離し、その結果データセンターの全データに各クエリがアクセス可能な前提が整う。
Colossus File SystemやSpannerではデータが広域分散されるため、特定のボトルネックは存在しない。ただし、このような技術の進歩はあったものの、リクエストへの遅延はデータセンターの環境次第で大きく変動する。この変動を緩和することも目標の一つとなっている。

小規模から大規模へ両対応

以下のような実行モードを切り替え可能とし、各モードにおいて、スケールアウトによる性能追加を可能とする。

  • シングルノード上での小規模・短時間クエリ
  • 短時間で完了するOLAPクエリで、チェックポイント取得を行わずに信頼性をおさえて低レイテンシを提供
  • ETLのような大規模クエリをチェックポイントを保持して高信頼性を担保して実行

システム構成

以下の要素から構成される。

F1 Master

あるデータセンターにおいてクエリ実行の計測やF1 Serverの管理を行うコンポーネント。

F1 Server

Clientからクエリを受け付け、応答を返すコンポーネント。

  • 小規模クエリやトランザクションクエリはF1 Serverがリクエストを受信したら即実行
  • 分散実行が必要な大規模クエリは必要に応じてWorkerプールからリソースを確保して実行
  • ETLクエリは高信頼性として設定され、MapReduce上で実行
  • F1 ServerとWorkerはステートレスなため、クライアントは任意のF1 Serverにアクセスすればいい
  • F1 ServerとWorkerデータを持たないため、再配分コストなどもない
  • F1 ServerとWorkerを追加すればその分性能をスケールさせることが可能

DataSource

各種データストアへの接続コンポーネント。
抽象化レイヤを挟んで、処理レイヤと分離することで、幅広いデータソースに対応可能となっている。
F1 Catalog Serviceに定義されたスキーマか、クエリでCREATE TABLEとして生成したスキーマを利用可能。例として以下のクエリがある。

DEFINE TABLE People(
format = ‘csv’,
path = ‘/path/to/peoplefile’,
columns = ‘name:STRING,DateOfBirth:DATE’);
SELECT Name, DateOfBirth FROM People
WHERE Name = ‘John Doe’;

以下の情報が必要で、Table Valued Function(TVFs)という形で対応可能なコネクタを追加可能となっている。

  • コネクタ種別
  • 存在場所
  • スキーマ
  • コネクタごとの固有設定(接続パスワードetc)

DataSink

DataSourceと同じく、F1 Catalog Serviceやクエリ上での定義が可能。

F1 Catalog Service

クエリ上で利用可能なテーブルや、テーブルのスキーマを管理するコンポーネント。

クエリ実行時のフロー

クエリ実行時は以下の流れで実行される。

  • F1 Serverにクエリを送信するとLocal Datacentorに該当のデータがない場合は存在データセンターを返す。
  • クエリをDAGに変換してLogical/Physicalの最適化を適用し、以下の図のように実行モードを選択
  • インタラクティブのシングルノード実行の場合はそのホストで実行
  • インタラクティブの分散実行の場合はClientからクエリを受けたF1 Serverは仲介役として稼働
  • バッチ実行の場合は、実行情報をジョブリポジトリに登録し、そこから非同期でMapReduceを稼働させて実行

インタラクティブ実行: シングルノード実行

DAGを末端に降りて行って、Scanすれば取得可能なものに帰着後、Scanして実行を開始する。
データをイテレータのように読み込むことで巨大データをメモリ上に確保することを回避している。
各種DataSourceにはPushdownに対応しているためデータ取得を効率化可能なものもある。
いわゆるJoinの他にLookup Join等の各種オペレータを利用可能

インタラクティブ実行:分散実行

PhysicalPlanにおいて各ノードをFragmentsという形でPartition(StartDate)分割して分散実行。
Join等がある場合は利用するキーでハッシュ化する形でWorker間の移動が行われる。

Worker間は基本RPCで連携する。インタラクティブモードなのでスピード優先
データをScanしたWorkerは下流の対象PartitionWorkerに対してデータを送信する。
自己HashJoinをするときなどは各PartitionがScanを終えてからデータの出力・交換が実行されるなど、複雑なケースも存在する。

以下のような性能的な対処もうっている。特に、HashPartition時のデータの偏りが問題になることが多い。データのJoin等を行う際にデータを維持する必要がある場合、FileSpillが走って性能がガタ落ちするケースもある。

  • Join時に片方が小さい場合はBroadcast Hashjoin方式を採用
  • 複数キー指定で複数回読み込む必要があるScanの中間データを共用などして最適化
  • データストア次第では統計情報や範囲絞り込みの機能も提供されるため、それに応じて動的Repartitionを行う機能も組み込まれている
  • Partial aggregationを行うことでデータ送信量を低減
  • LookupJoinの際はLookup先のデータソースからより大きな単位で読込を行った方が複数のキーに対してマッチする可能性も生じるため、効率が良くなる。このLookupRequestの並列度が向上し、DataSourceの限界並列数を超過するとDataSourceをTailする際の遅延もアクセスモデルで隠蔽され問題にならなくなる
  • DataSourceがSpanner等のようにRangeで区切られている場合はRangePartitinoningで対応することで、キー分散が限定でき、状況次第では他のデータソースからの読み込みPartition設定と同じにできるとJoin時に単一のFragmentにデータを送信すればいいため、効率が向上する。このRange情報を交換することで明示的なPartition指定も可能だが、データの偏りに弱いのは変わらない。
  • 上記のRangePartition時の偏りへの対処として、ローカルのデータで取得されたデータ統計を基に動的Repartitionを行うケースもある。(ある部分集合の傾向は全体の傾向と似るケースが多いため)
  • インタラクティブモードのため、極力チェックポイントは省いてメモリを活用し、レスポンスを早くすることに焦点を置いている。(信頼性が求められる場合はバッチ実行モードを使用)

実行例としては以下。

SELECT Clicks.Region, COUNT(*) ClickCount FROM Ads JOIN Clicks USING (AdId)
WHERE Ads.StartDate > ‘2018-05-14’ AND Clicks.OS = ‘Chrome OS
GROUP BY Clicks.Region
ORDER BY ClickCount DESC;

バッチ実行モード

MapReduce/FlumeJavaは幅広い範囲の処理を記述できるので効果的だが、PushDownやカラムの絞り込んでの読込など、SQLでは自動的に可能な最適化を手動で行う必要が出てくる。
加えて、開発時のメンテナンスコストも高いこともあり、SQLでETLを組めると有効であり、以下のような問題に対処の検討を行った。

  • インタラクティブモードでは常に全FragmentがActiveだったが、バッチ実行モードでは各Fragmentは非同期に実行され、実行時間が被らないケースもあるため、RPCでの同期は困難
  • 長時間の実行となる場合一部のコンポーネントで障害が発生することも多いため、中間状態を保存して復旧可能である必要がある
  • バッチ実行モードで実行中のクエリを管理して、最後まで実行されることを保証するためのService Frameworkが必要

MapReduceの中間状態はColossusに出力され、障害発生時に復旧する。シンプルに考えるとF1クエリがMapReduceに変換されるわけだが、F1クエリオプティマイザはFlumeJavaのMSCR Fusionと同様の最適化を適用。
これはつまり、MapReduceのTreeはDAG=グラフの部分集合であるため、より効率的な実行プランを組めるということ。
LeafNodeをMapOperationと抽象化し、内部NodeをReduceOperationと抽象化して認識する。
Map-Reduce-Reduceの場合、基本のMapReduceで実行すると、以下の流れとなる。

  • 前半のMap-Reduceを実施し、中間結果ファイルをファイルに出力する。
  • 中間結果ファイルを読み込み、結果を出力する。

ただし、Map-ReduceとMap(Identity)-Reduceという2ステージに分割することで、前半のReduce処理の結果をそのまま後段のMap(Identity)の入力とし、中間結果ファイルの出力をスキップ可能になる。
実際の最適化例は以下の通り。6StageのMapReduceを3Stageで実行できるようになっている

この最適化自体は素でDAGをサポートするCloud Dataflow等では不要なため、後継プロダクトに入れ込まれてはいない。

また、データのExchangeを削減するための手段として、Hash Joinを小規模データセットに対するLookupJoinに変更するという対応がある。
この際、小規模な方がBroadcast Hash joinを行うには大きすぎたり、偏りが大きい場合、バッチ実行モードにおいては小規模データセットをディスクベースのLookuupTable(Sorted STring Tables、SSTables)を生成する。
大規模データセット側をそこに対してLookupをかければ、大規模データセット側をHash Join用にRepartitionするコストを削減することが可能

バッチ実行サービスの設計ポイントは以下の通り。

Registory以外はステートレスで冗長化されているため復旧などが行いやすい。実行対象はデータの存在などでデータセンター単位で選ばれるが、データセンター内で実行し、データセンター内で完全に失敗したらまた最初から別データセンターでやり直す方式をとっている。

クエリオプティマイザ

クエリオプティマイザの開発は非常に複雑だが、以下の再利用によってそれを提言している。

  • 複数の実行モードで共通の実行計画を使用。

実行するフレームワークも違うものの、実行計画までは共通にできる。これによってオプティマイザを更新すると自動的に両実行モードに反映。

  • Spark Catalystチームと交流があったため、設計思想は一部Spark Catalyst Plannerと共通

最適化は以下の流れで実施

  • SQLをパースしてAbstract syntax tree(AST)に変換
  • ASTをRelational algebla planに変換
  • Relational algebla planに対して多数のルールが適用され、経験より決まった最適な形態まで変換を継続
  • 上記結果のRelational algebla planをデータソースへのアクセスパスや実行アルゴリズムも含めた物理実行プランに変換
  • ここまででクエリオプティマイザの処理は完了し、QueryCoordinatorに結果が渡って処理が実行

F1クエリオプティマイザは経験則から出しているルールを適用している個所が多い。
他に統計データも使用するが、データソースの多様性のため、典型的なF1クエリは、事前に収集可能なものに応じて特定のソースからの統計のみを使用している。

オプティマイザの構成

変換時はイミュータブルな構造を基に変換を行い、変換過程をもさかのぼれるようになっている。それによってサブツリーの再利用といったことも可能となる
オプティマイザのコードベースは3K程のPythonコードと5K程のJinja2テンプレートからなる。
上記を基に、数百もの変換ルールを構成するC++のコードが生成される。
このC++のコードがクエリ計画生成やFlagment毎のハッシュ算出、Treeの同一性判定などを行う。
テンプレートから生成されるC++のコードの方が人が実装するよりやっている処理内容などがわかりやすく書かれるため、改造も行いやすくなっている。

論理計画最適化

FilterPushDown、定数畳み込み、属性の選定等が行われる
DataSourceはProtobufを使っていることが多いため、Protobufのフィールド絞りこみもDataSourceによってはPushDownされることもある。
それによってディスクからの読込やネットワーク転送を低減可能。

物理計画最適化

論理Relational algeblaに対して複数の「Strategy」を適用して物理計画に落とし込む。
各Strategyは単一機能を持っており、例えばLookup Joinとインデックスなどの条件がそろった場合にSSTableを生成するよう計画したりする。
Strategyを適用時に各Nodeには各種プロパティが付与される。
付与されるものは分散状況、オーダー制御、ユニークキーの是非、推測カーディナリティなど
上記の付与されたプロパティを基に次Stage用にExchangeOperatorを挟むか否かや、クエリの実行モードがシングルノードモードか分散モードかの決定なども行われる。

物理実行Flagment生成

物理実行計画を基に、実際にそれを実行するFlagmentに変換する。並列度等、このフェーズで決定される値もある。
それらの値はこの時点で末端のScanからTopレベルまで反映される。

拡張性の担保

F1は以下のような機構をユーザが個別で利用可能とすることによって拡張性を担保している。

UDF

F1 ServerがUDF ServerにRPCでデータを送り、UDF ServerがUDFを適用して返すというモデル。
UDF Server自体は共通で管理されているものを使ってもいいし、自前で立ててもいい。ただし、自前でたてる場合はClient > Serverに送信時のRPCにアドレスを含める。
自前で管理することによって、JavaやC++のようなコンパイルが必要な言語でもUDFが書けるようになっている。
UDF Serverを切り離すことで、F1のコアとUDFの障害を分離することも可能になっている。
UDF server/F1サーバ間でテーブル丸ごと転送(Google network だと問題にならないらしい)

SQLやLua scripted functionによるUDFはクエリの定義の中に記述でき、UDF Serverを介さずに実行する。
ClientがUDFの本体も送信するため、それによってアドホックに入れ込んで実行できる
別ファイルに定義してある場合はそれの処理部分を抽出して送信
UDFをmoduleとしてまとめる機構も持っており、ビジネスロジックを自前で組んで再利用性を高めるために各チームで使用されている。moduleも同様にリクエストに乗って送信される。

Scalar Functions

基本はProjecton operatorの中で実行されるが、サイズ上限を超過した場合はUDF ServerにWorkerがデプロイして利用している。

Aggregate Functions

UDAを利用する際にはInitialize/Accumulate/Finalize部を開発し、Aggregate同士を集約できる場合はReaccumulate(Partial aggregation等に使用)も実装する。
基本はAggregation operatorの中で実行されるが、サイズ上限を超過した場合はUDF ServerにWorkerがデプロイして利用
ただし、UDF Server自体はステートレスのため、適切なUDFServerに分散するのはF1 Server側の役目。

Table-Valued Functions

クエリ実行時において、データの取得と予測を同時に実行するようなことが可能な機構をさす。
`SELECT * FROM EventsFromPastDays(3, TABLE Clicks);` のようにフルのTableを引数として渡し、処理を適用したテーブルを返す動作となる。
上記の例の場合、ClicksというテーブルにEventsFromPastDaysを3を指定して実行した結果のテーブルに対してSELECTを実行した結果が返る。
UDFやUDAと同じくSQLでTVFは定義可能だが、TVFはパラメータがテーブルとなる関係上、クエリ最適化の前に展開され、オプティマイザはフルに最適化が実行可能となっている。

TVFは以下のように定義可能

CREATE TABLE FUNCTION EventsFromPastDays(num_days INT64, events ANY TABLE) AS
SELECT * FROM events
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL num_days DAY);

上記のクエリは、出力スキーマは動的に決定されるものの、実行中は固定となる。
あらかじめ入力テーブルのスキーマがクエリ解析時にわかるように限定させることも可能。

より複雑なTVFはUDFServerを用いて定義することも可能
その中で、出力スキーマは入力テーブルからだけでなく、パラメータで決まるパターンもある。
その場合は、その段階でオプティマイザは固定値パラメータを解釈してPartitioning等の最適化を行う。
クエリオプティマイザはUDF Serverと通信してPartition方式を取得し、それを基に並列実行する場合は並列実行するなどの対応を行う。
リモートでTVFを評価する際のRPCプロトコルは以下の図のように双方向のストリーム接続を元にUDFにRowを送信して応答を並列して受け取る方式となる。

一定以上のパラメータの場合は全部ディスクに出力してShuffle、一定以下ならメモリ上という形で対応していると、データ量が増える際に以下グラフのように急に増える個所が発生し得る。
そのため、急な劣化を防ぐために、F1では1/0で急に遷移するのではなく、段階的な推移を行うようになっている。

例えば、以下のような対処をとっている。

  • ディスクに出力する際に、メモリをあふれる分だけ出力
  • lognのデータ数によって段階が遷移する際に、段階を軽減

マージソートの場合1つのデータが増えるとマージの回数が増えるというケースがあり得るが、これらの急な劣化を集約やソートから除去している。
SmoothScanやHash join dynamic destaging等を用いている。
ただし、動的最適化はあるデータが大量に実行失敗するなどがあると逆に悪化するケースもある。

上記の適用によって、以下のように性能の急激な劣化を防げるようになっている。

その他

Protobuf形式のRowデータ保持によってRow型データストアでも効率的にPushdownを行うなどの対応も行っている。

クエリエンジンの開発メンバは40名程らしい。

読んでみての感想

SQLで何でもやるのは厄介だとは感じていましたが、それを背後で何とかしてしまっているのは流石と思うのと同時に面白いとは感じました。
拡張のポイントもこのくらい用意されていれば大丈夫そうという一つの基準としても面白かったです。

Jinja2テンプレートを用いて生成する箇所や、自動生成することでコードが人が書くより読みやすいなどについても、実際にお決まりのパターンが複数別個出力する必要があるコードを開発するケースなどで適用できそうな考えではありました。

--

--

Kimura, Sotaro

千葉茨城(ちばらぎ)出身、東京在住のIT系雑用係。現在高度雑用係に昇格すべく邁進中です。家ではペンギンぬいぐるみと戯れてます。