Spark+AI Summit 2019参加レポート at San Francisco — Spark3.0/Koalas/MLflow/Delta Lake

Takeshi Yamamuro
nttlabs
Published in
23 min readMay 28, 2019

こんにちは,NTTの山室です.

今回の記事は4/23–25にサンフランシスコで開催されたSpark+AI Summitの参加レポートになります.興味のある情報への良い足がかりになることを目的に,個人的にチェックした内容を浅く広めに取り上げます.

以下の公式サイトに大半の発表資料と動画が公開されていますので,興味がある方はそちらも併せて参照してください.

Spark+AI Summit 2019 Agenda

Summitの翌日に訪問した会場近くのDatabricks社

Spark+AI Summitは現在年に2回アメリカ西海岸とヨーロッパで開催されているDatabricks(Sparkの作者が在籍する企業)主催のイベントです.特に毎年サンフランシスコで開催されるSummitは規模が大きく,今回世界中から5,000人を超える参加者が集まったそうです.

この記事では個人的に興味を持った以下の内容に関して簡単に紹介します.また最後に6/12に予定しているSpark+AI Summitの参加報告会に触れてから終わりたいと思います.

  • Spark v3.0に向けて
  • Koalas: Spark用のpandas DataFrame APIs
  • Delta Lake: データレイクを実現するためのストレージレイヤ
  • MLflow: データ分析のライフサイクル管理ツール
  • 高度化するSparkの最適化・実行処理系
  • イベント告知: 6/12 Spark+AI Summit参加報告会

Spark v3.0に向けて

v2.4系が昨年の11月にリリースされてそろそろ半年になりますが,現段階ではv3.0に向けた具体的なロードマップはアナウンスされていません.夏頃にv3.0用のブランチが作成されると聞いたので,おそらく秋口のリリースになると予想しています.Keynote(上写真)で触れられていたように,v3.0でよく話題になるのは以下のような新機能です.

昨年のSpark+AI Summitで発表されたProject Hydrogenの取り組みの一部です.昨年の発表ではMapReduceの様なMPP(Massively Parallel Processing)とは異なる機械学習の分散処理に適した実行方式を実現するBarrier Executionと,Sparkと他の処理系間のデータ交換を効率化するためのOptimized Data Exchangeに関するものでしたが,今回のKeynoteで取り上げられていた話題はスケジューラに関する取り組みになります.SparkがサポートしているYARNやMesosでは既にGPUなどのアクセラレータを意識したリソース管理が可能です(YARN-6223MESOS-4424).しかしリソースの割り当てができたとしても,Sparkがアクセラレータを意識したタスクのスケジューリングを行わないと効率的な実行を実現できません.現在の実装では,CPUのコア数を前提としたリソース要求を行いますが,この処理においてリソースの種類を考慮できるようになりました.

Project Hydrogenの概要と,去年の発表からの今日までの取り組みに関しては提案者の発表を参照してください.

Spark Graphは宣言的な問い合わせ言語(Cypher)を実装したグラフ処理ライブラリです.Spark SQLにおけるSQLも関係データを効率的に扱うための宣言的な問い合わせ言語の代表例です.Cypherは元々Neo4j向けに開発された問い合わせ言語ですが,その後に仕様が標準化されています.Sparkのグラフ処理ライブラリは歴史的にRDD上に実装されたGraphXや,DataFrame上に実装されたGraphFrameなどがありますが,どちらも宣言的ではなく手続き的な操作を行うこと前提としたライブラリでした.そのためSpark Graphはユーザビリティに重点を置いたライブラリになりそうです.ほとんど実装しか情報がない状況でしたが,今回のSummitの発表でNeo4jの提案者からの発表がありましたので,詳細を知りたい方はそちらを参照してください.

Data SourceとはSparkにおける入出力の対象で,Sparkが標準で読み書きできるData SourceはCSV/JSON/Parquet/ORC/JDBCです.下記の様にformatで指定したData Sourceの実装が内部で呼び出され,Sparkの内部表現と各Data Sourceの外部表現との相互変換を行います.

>>> df = spark.read.format('datasource name').load('path')

この読み書きの操作を抽象化しているものがData Source APIsで,以前までのものをV1,いま議論されているものをV2と呼んでいます.Sparkの成長とともに,Data Sourceが持つべき機能にも様々なニーズへの対応が求められています.具体的なニーズとしては,任意の論理プランのプッシュダウンやトランザクショナルな読み書きなどがありますが,V1で実現することが難しいという結論になっていました.そこで抽象化の見直しを行い,V2として再設計されました.例えばSPARK-23521では論理プランの構造が標準化され,論理プランの組み立てはSparkで行い ,実行はData Source側で行うことも可能になります.

Netflixが開発しているIcebergは,Data Source V2を用いて実装されており,今後も様々なData Sourceの実装が登場することが期待されています.Data Source APIsに関する歴史的な経緯や,V2に関する詳細な仕様に関しては昨年のSpark+AI Summitでの発表がより詳しいので,興味がある方は併せて参照してください.

Intelの開発者が主体となって取り組んでいる実行時の最適化に関する改良です.もともとSparkには実行時にパーティション数を自動調整する機能spark.sql.adaptive.enabled)がありますが,この取り組みは既存の最適化を高度化したもので,実行時にデータの統計情報を確認してJoinの実行方法を変更するなどの物理プランの切り替えも対象としています.具体的な内容に関してはIntelの方々が書いている記事(Spark SQL Adaptive Execution at 100 TB)を参照してください.

Scalaはそろそろ2.13がリリースしそうですが,Sparkがv2.xで主に使用しているのは2.11です.このことがScalaコミュニティで話題になったことを背景にSparkコミュニティでも議論され,v3.0ではデフォルトで2.12が使用されて2.11のサポートは打ち切られます.既にv2.4のリリースでは2.12でコンパイルしたSparkのバイナリが配布されています.

Spark SQLは歴史的な経緯で,ANSI SQL Standardsで規定されている挙動と異なる点(予約語や算術オーバーフロー扱いなど)が多くあります.そのためSQLパーサのエラーが難読化しているなどの問題があります.そこでANSIが規定しているパーサの挙動に準拠するために新たなオプション(spark.sql.parser.ansi.enabled)が追加されました.このオプションを有効にするとSQL:2011に準拠したキーワードが予約されるようになり,予約されたキーワードを含むSQLのエラー文が読みやすくなります.

現在Spark SQL/Catalystに与えることができるヒント情報は,Joinの実行方法に対するヒントであるbroadcast(Broadcast Hash Joins)のみです.しかし,Joinの実行方法は性能に大きな影響を与えるためSparkが実装している他のJoinの実行方法(Shuffle Hash Joins,Sort Merge Joins,Cartesian Product Joins,Broadcast Nested Loop Joins)に対するヒント情報もv3.0のリリースに向けて追加されました.

またメジャーバージョンが上がるので,現時点でも多くの互換性を壊すAPIsやオプション名の変更がされています.これらの致命的な変更に関しては基本的に全てMigration Guidesに記載されていますので,興味がある方はそちらを参照してください.

Koalas: Spark用のpandas DataFrame APIs

pandasは小〜中規模のデータ処理において非常に優れたツールです.しかし,pandasの作者がApache Arrow and the “10 Things I Hate About pandas”で述べているように,データ量の5〜10倍程度のメモリを消費することから大規模データ処理には適していません.そのためAmazon S3やHadoop HDFSなどにあるデータをSparkのDataFrameとして処理を行い,ある程度データ量を減らした後に,pandasのDataFrameへ変換して分析を行うことが現在のベストプラクティスになっています.しかしデータ量に応じてAPIsを使い分けることはユーザからすれば非常に面倒ですし,Sparkのメーリングリストでも3月に似たような議論があったことから今回のプロジェクトの発足に繋がったようです.

Python 3.5+の環境を既に使用しているなら,Koalas(Spark用のpandas DataFrame APIs)を試すことは非常に簡単です.下記のKoalas DataFrameはpandas DataFrame APIsを実現するための,Spark DataFrameのラッパー的な位置づけになります.

// NOTE: Python 3.5+ required
maropu$python -V
Python 3.7.0
// Install Koalas and PySpark
maropu$pip install koalas
maropu$pip install 'pyspark>=2.4'
// Test data
maropu$cat test.csv
c0, c1, c2
1, 'a', 3.8
2, 'b', 0.5
3, 'c', 1.1
maropu$python
>>> import databricks.koalas as ks
# 'kdf' means a DataFrame for Koalas and 'pdf' for pandas
>>> kdf = ks.read_csv('test.csv')
>>> kdf
c0 c1 c2
0 1 'a' 3.8
1 2 'b' 0.5
2 3 'c' 1.1
# Rename the columns
>>> kdf.columns = ['x', 'y', 'z']
>>> kdf
x y z
0 1 'a' 3.8
1 2 'b' 0.5
2 3 'c' 1.1
# Do some operations in place
>>> kdf['z'] = kdf.z*kdf.z
>>> kdf
x y z
0 1 'a' 14.44
1 2 'b' 0.25
2 3 'c' 1.21
# Converts to a pandas DataFrame
>>> pdf = kdf.to_pandas()
>>> pdf
x y z
0 1 'a' 14.44
1 2 'b' 0.25
2 3 'c' 1.21

現在の最新リリースはv0.5.0で絶賛開発中です.GitHubのIssue Trackerを使って開発をしているようなので,試してみて気づいたバグや改善点がある方は奮って報告をお願いします.

関連する発表:

Delta Lake: データレイクを実現するためのストレージレイヤ

Delta Lakeは今回のKeynoteで新しく発表されたSparkの入出力の対象となるストレージレイヤのOSSです.Data Source APIsを用いて実装されているため,以下のようにformatに指定するだけで使用できます.

>>> df = spark.read.format('delta').load('path')

下の画像で列挙されているものが主な特徴ですが,特にトランザクショナルな読み書きと,変化に対して柔軟でスケーラブルなメタデータ管理あたりが重要そうです.

A Deep Dive into Query Execution Engine of Spark SQL, https://bit.ly/2HMVWzb

トランザクショナルな読み書きに関しては,Oracle DatabaseやPostgreSQLのような高いスループット性能を目的としたものではなく,複数ストリームからの粗粒度の書き込みとバッチ的な読み込みが同時に発生しても不整合を起こさずデータの完全性を保証することが目的です.そのため排他制御は楽観的に行われ,書き込みの衝突は低いことを想定しています.またトランザクションログの出力先として,現在3種類の実装(ローカルのファイルシステム,HDFS,Azure)が用意されているようです.

スキーマが変化しながら日々増大するデータを正しく処理をするためには効率的なメタデータ管理が重要で,NetflixのIcebergでも同様の課題を扱っています.つまりDelta Lakeでは,スキーマが変化したデータを読み込み時に正しくマージする機能をもち,データが増大した場合にでもメタデータへの読み書きがボトルネックにならないように設計されています.

関連する発表:

MLflow: データ分析のライフサイクル管理ツール

チームで機械学習を用いたデータ分析を行う場合,他の人が作成した学習モデルを再利用・再現することは非常にコストがかかります.そこでデータ分析のライフサイクルを体系化し,この問題を解決するためにFacebookはFBLearner,UberはMichelangelo,GoogleはTFXをそれぞれ独自に開発しています.この流れを受けてOSSとして開発されたものがMLflowになります.昨年行われたSpark+AI Summitで発表され,今年のKeynoteで5月中にv1.0がリリースされる予定であることが発表されました.

公式からのアナウンス

MLflowは特定のツールに依存せず,学習に用いた条件(データセット,特徴量,ハイパーパラメータなど)と作成した学習モデルのロギング,学習環境の再現(依存関係の解決),また作成した学習モデルをRESTサーバやAmazon SageMakerなどにデプロイなどを行うことを支援します.そのためMLflow自体に前処理や学習を行う機能があるわけではなく,分析によく用いられる既存のツール(pandas,scikit-learn,XGBoost/LightGBM,PyTorch,Spark,TensorFlowなど)を補助する位置づけになります.

MLflowを用いる簡単なサンプルコードを書きました.このデモに含まれる学習用のコードは,Sparkを用いたデータの読み込み,pandas DataFrameへの変換,XGBoostを用いた学習を行います.MLflowはこの学習用コードを実行する前に,設定ファイルに記録されているコードが要求する環境の再現を行った後に,学習用コードの実行を行います.またコード実行中に記録されたログを集約して,実行後にWeb UI(下図)上から確認することが可能です.

$ pip install mlflow==0.9.1
$ git clone https://github.com/maropu/mlflow-blog-sample-201905.git
$ cd mlflow-blog-sample-201905
$ mlflow run . -P max_depth=4,8
$ mlflow ui
... [27805] [INFO] Listening at: http://127.0.0.1:5000 (27805)
MLflow Web UI (http://127.0.0.1:5000)

サンプルコードの詳細に関してはここでは省きますので,興味のある方はREADME.mdを参照するか,下記の6/12の参加報告会のイベントでMLflowに関して話す予定ですのでそちらの参加をお待ちしています.

関連する発表:

高度化するSparkの最適化・実行処理系

RDD上にSpark SQL/DataFrameが実装されてから数年経ち,近年のSpark+AI Summitでは,このレイヤ上に実装されたライブラリやユースケースのほうに注目が当たることが多くなってきました.しかし大半のライブラリやユーザが利用する機能であるため,継続的な効率化・最適化のための改良や,それらの動向を理解することは非常に重要です.

Appleの開発者からの発表では,ネストされたデータ読み込みに関する最適化の取り組みが紹介されていました.AppleではSiriのログ分析のための90%以上のジョブがSparkのものらしく,数百ペタバイトのデータに対して処理を行う必要があるため読み込みの最適が非常に重要なようです.具体的な取り組みとしては,処理中に参照されない不要なネストされたデータを読み込まないように最適化(Nested Column Pruning)するものです.一見すると簡単なように思えますが,遅延評価で最適化を行うSparkにおいては様々な処理の組み合わせを考慮しなければいけないため,処理に不要なデータが読み込まれてしまうケースは依然として多々あります.

例えば,以下のクエリではv3.0向けに追加された最適化オプション(spark.sql.optimizer.nestedSchemaPruning.enabled)のON/OFFで読み込むデータ量が変わります.

// Generates and writes test data
>>> exprs = ['id c0', 'struct(id a, id b) c1']
>>> spark.range(10).selectExpr(exprs).write.parquet('/tmp/t')
// Reads the test data
>>> df = spark.read.load('/tmp/t')
>>> df.printSchema()
root
|-- c0: long (nullable = true)
|-- c1: struct (nullable = true)
| |-- a: long (nullable = true)
| |-- b: long (nullable = true)
// This is the v2.4 behaviour
>>> spark.sql("SET spark.sql.optimizer.nestedSchemaPruning.enabled=false")
>>> df.repartition(1).select('c1.a').explain(True)
== Physical Plan ==
*(2) Project [c1#11.a AS a#93L]
+- Exchange RoundRobinPartitioning(1)
+- *(1) FileScan parquet [c1#11] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:struct<a:bigint,b:bigint>>
// This is the v3.0 behaviour
>>> spark.sql("SET spark.sql.optimizer.nestedSchemaPruning.enabled=true")
>>> df.repartition(1).select('c1.a').explain(True)
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *(1) Project [c1#11.a AS a#99L]
+- *(1) FileScan parquet [c1#11] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:struct<a:bigint>>

これは実装された最適化の一例で限定的なものですが,ネストされたデータを多く扱う場合には試して見る価値があるかもしれません.

Databricksの開発者からは,Spark SQL/DataFrameの内部詳細に関する発表クエリプラン/UIsの読み方に関する発表がありました.前者の発表は物理プラン・コード生成に始まり,メモリ管理,ベクトル化された読み込み,UDFの実行などに関して現状の最新情報を扱った発表であったため,現状の主要箇所を確認したい人には非常に良い資料だと感じました.また後者の発表で扱っているクエリプラン/UIsは内部動作と密接な関わりがあることから,普段開発に関わっている私でも読むことが非常に難しいことが現状です.その中でも頻繁に話題に上がるHiveテーブルの読み書きやキャストに関する話題などを取り上げ,それら確認の仕方に関して紹介しています.DBMSライクな処理系においては今も昔もクエリプランを理解することが,性能理解への第一歩です.

IBMの開発者からはSession Extension APIsに関する紹介がありました.このAPIsを利用することで外部からパーサ,アナライザ,オプティマイザなどに新規の変換ルールを追加することが可能です.以前までもオプティマイザ以降の処理に新規のルールを追加する方法がありましたが,このAPIsはそれをより一般化したものです.大半のユーザが使うことのない機能かとは思われますが,特定ドメインに特化した最適化を組み込むことが可能なので,適したユースケースがあれば効果は非常に高いと思います.

関連する発表:

イベント告知: Spark+AI Summit参加報告会

来月の6/12にNTT SICの田町オフィス(Social Coding Studio)でSpark+AI Summitの参加報告会を行います.Databricks様のご厚意でイベント後に飲み物とお菓子も提供されます.上で紹介したKoalasの主要開発者である上新さんも参加・発表されるので,Koalasの最新の情報,またその他のSpark界隈の情報収集をしたい方は奮って参加をよろしくおねがいします.

Spark Meetup Tokyo #1 (Spark+AI Summit 2019)

おわりに

私たちNTTはオープンソースコミュニティで共に活動する仲間を募集しています.ぜひ弊社 ソフトウェアイノベーションセンタ紹介ページや,採用情報ページをご覧ください.

--

--

Takeshi Yamamuro
nttlabs
Writer for

R&D engineer, Ph.D. in CS (Database Systems) — Apache Spark committer, Apache Hivemall PPMC, PostgreSQL enthusiast — LLVM/C/C++03/Java/Scala/Python