最新のApache Spark v2.4にふれてみよう: 概要と新機能の紹介

Takeshi Yamamuro
nttlabs
Published in
10 min readNov 26, 2018

NTT研究所の山室です,Sparkのコミッタを勤めています.

本記事ではSparkの概要と,昨日リリースしたv2.4の新機能の中から(全ては無理なので)一部を簡単に紹介します.全ての新機能を俯瞰したい方はリリースノートを参照してください.

Spark Release 2.4.0

SparkはUC Berkeleyでビックデータ分析に関する研究を行っていたAMPLabの成果を2012年にOSSとして公開したもので,データの前処理から機械学習やグラフ処理などデータ分析に必要な一連の処理を効率的に実現できます.よく挙げられるSparkの特徴は以下3つです.

  • SQL/DataFrame/DatasetなどのAPIを用いた容易なデータ操作
  • 外部プロダクト(下図のPostgreSQLやOracleなどのRDBMS,Hadoop HDFSやAmazon S3などのデータストア,PandasやTensorFlowなどの分析ツール,…)との連携
  • 内部での高度な最適化と効率的な分散・並列実行
Apache Spark 2015 Year In Review, https://databricks.com/blog/2016/01/05/apache-spark-2015-year-in-review.html

Sparkには機械学習やグラフ処理などのデータ分析用のライブラリがデフォルトで組み込まれていますので,RDBMSやデータストア上のデータを読み込み,Spark DataFrame上で欠損値の補完や結合・集約などの前処理を適用し,組み込みのライブラリを用いて容易にデータ分析を行うことが可能です.またSpark DataFrameのAPI(toPandas)を用いることでPandas DataFrameに変換し,NumPyやscikit-learnなどのPython toolchainを用いた分析に橋渡しすることも可能です.

Sparkは上で述べたようにUC Berkeley AMPLabの成果物であることから,AMPLabや関連する研究員が今も開発に参加しています.そのため今までも有用な研究成果をコミュニティで積極的に議論して取り入れてきましたし,これからもそういった傾向が続くと考えています.

それではSpark v2.4の紹介に移っていきたいと思います.Spark v2.4を簡単かつ手早く使う方法の1つはPythonのパッケージ管理ツールpip経由でインストールする方法です.

無事インストールが終わればpysparkコマンドを使いSparkを起動して,DataFrameを用いて任意の処理を行うことできます.

また上記の例と同様の処理をSQLを用いて行うことも可能です.

Sparkは複数マシンを束ねたクラスタ環境上での分散処理だけではなく,CPUのコア数が多くメモリサイズの大きい単体マシン環境でも入力した処理を自動で並列化してくれるため非常に便利です.単体マシン環境上でのSparkの利用や性能に関しては以下の記事が参考になります.

Benchmarking Apache Spark on a Single Node Machine

例えば以下に示すようにローカル環境上にあるファイルサイズの大きいJSONファイルをSparkを用いて並列に読み込み・サンプリングを行った後に,PandasのDataFrameに変換することも簡単です.

それでは次にv2.4の新機能(一部の機能はv2.3から使用可能)をいくつか紹介していきます.

  • 1. PySpark UDF improvement
  • 2. Eager evaluation
  • 3. New built-in functions for complex data types
  • 4. New data source support: AVRO
  • 5. New data source support: Image
  • 6. Optimizer improvement

1. PySpark UDF improvement

PySparkにおいてUDFを実行した際の性能が大幅に改善されました.UDFの処理内容よっては100倍以上の高速化が期待できます(下図).

Introducing Pandas UDF for PySpark, https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

PySparkにおけるUDFの実行ではPythonで記述されたUDFをJVM(Sparkの実行環境)上で実行できないため,Python Workerにデータを渡して実行し,結果を書き戻す必要があります.この一連のUDFの実行において,以前のPySparkではデータの(De-)Serializationとrow-by-rowでの実行がボトルネックであると指摘されていました.この課題を解決するためにApache Arrowを用いた効率的な(De-)Serializationと実行のバッチ化による改善が行われました.以下のようにUDFの定義時にpandas_udfデコレータを明示的に指定する使用する必要があります.

2. Eager evaluation

PySpark/SparkRの対話シェルにおいて遅延評価を行わずに定義時にすぐに結果を表示する動作モードが追加されました.Sparkは当初から一部の操作以外は遅延評価を行うことで最適化の余地を大きくしていました.しかしPandasのように,step-by-stepに結果を確認しながら処理を記述したいユーザにとって不便であるため,この動作モードが追加されました.この機能を使用するためには以下のように明示的に有効化する必要があります.

3. New built-in functions for complex data types

共通的に使用されるArrayやMapなどのデータ型に対する操作が標準の組み込み関数としてSparkに実装されました.この追加されたもの中には以下のような高階関数も含まれています.

これらの追加された新しい関数に関しては以下の記事が詳しいので興味がある方は参照してください.

Introducing New Built-in and Higher-Order Functions for Complex Data Types in Apache Spark 2.4

4. New built-in data source support: AVRO

Sparkが読み書きできるフォーマットにAVROが追加されました.Sparkは既にCSV,JSON,ParquetORCなどの広く使われているフォーマットを利用可能で,これと同様にAVROも使用可能になりました.しかし今回のリリースにはAVROのパッケージが含まれていないため,AVROの読み書きを行うためにはPySparkを起動する際に追加のオプションが必要です.

5. New built-in data source support: Image

主に機械学習向けの用途として画像データ群をDataFrameとして簡単に読み込むことが可能になりました.例えば以下のように手書き文字の画像データMNIST(JPEG )を一括で読み込むことができます.

6. Optimizer improvement

参照しない不要なBucketの読み込みの除去や,サブクエリ内の不要なソート処理の除去などを適用するようにOptimizerが改善されました.Sparkでは遅延評価を活用することで,ユーザが入力した処理をそのまま実行するのではなく,不要な処理の削除,同等でかつより効率的な処理への置換,処理の並び替えなど様々な最適化が適用された後に実行されます.他に比べると地味な改善のようにも感じますが,処理によってはより新しいSparkを使うだけで数倍速くなるケースがあるため重要です.

サブクエリ内の不要なソート処理として以下の具体例を考えます.

上記の例ではsdf1のサブクエリ内のソート処理は不要なのですが,v2.3までは以下の赤線で示すように実際にソート処理を行っていました.

ソート処理は比較的にコストが高いため,このソート処理を除去(以下がv2.4の例)できるだけで大幅に処理時間を短縮可能です.

この例は単純な処理であるため不要な処理に気づきやすいですが,処理が複雑になるほど気づき難くなり,このような遅延評価による最適化が非常に重要になります.

今後のリリースに向けて

Sparkのコミュニティは次期v3.0のリリースに向けて鋭意開発を進行中です.Project Hydrogenと呼ばれる活動下で進行中のMPIベースのジョブ(e.g., 深層学習)との連携機能や,後方互換性維持のために修正できなかった多くの問題解決が実現される予定です.また少し高度な話題ですが,実行時コード生成のための中間表現の導入に関する議論が進行中です.現在のSparkは実行効率化のために,2011年にデータベース研究分野で提案されたProduce/Consumeモデルに従ってユーザが入力したSQL/DataFrame/Datasetの操作を内部で対応するソースコードに変換・コンパイルしてから実行しています.これは優れたモデルですが,後の研究でメンテナンスの困難さや最適化の課題が指摘されおり,現在も研究分野で様々な改善案が提案されています.実際にSparkの開発においても同様の課題に直面しており,このような議論に至った経緯があります.この取り組みがv3.0に含まれるかは現時点では不明ですが,長期的にはこのような研究成果が反映されたより良い実行時のコード生成モデルが実現されると考えています.

終わりに

私たちNTTは,オープンソースコミュニティで共に活動する仲間を募集しています.ぜひ弊社 ソフトウェアイノベーションセンタ紹介ページ及び,採用情報ページをご覧ください.

--

--

Takeshi Yamamuro
nttlabs
Writer for

R&D engineer, Ph.D. in CS (Database Systems) — Apache Spark committer, Apache Hivemall PPMC, PostgreSQL enthusiast — LLVM/C/C++03/Java/Scala/Python