ScalarDB Analytics with Sparkをリリースしました

Akihiro Okuno
Scalar Engineering (JA)
18 min readJun 11, 2024

ScalarDB 3.12において、ScalarDBを用いて管理しているデータベース上でApache Sparkを用いた分析的クエリの実行を可能とするScalarDB Analytics with Sparkを新たにリリースしました。本記事ではScalarDB Analytics with Sparkの概要や利用方法などについて説明します。

ScalarDB Analytics with Sparkとは?

ScalarDBは汎用的なトランザクションマネージャであり、RDBMSやNoSQL等を含む様々なデータベースの上で、複数のデータベースをまたいだトランザクション処理を実行することを可能とします。ScalarDBは多数のシンプルなトランザクションを処理するようなワークロードを主な対象としており、SQLを用いた処理はサポートしているものの、可能な操作としてはキーに基づいたCRUDや一部のScanのみに限られています。

ScalarDB Analyticsは、ScalarDB管理下にあるデータベースに対して結合や集計を含む広範なクエリの実行をサポートし、アドホック分析などの高度な処理を行うことを可能にするものです。ScalarDB Analyticsシリーズとして、昨年ScalarDB Analytics with PostgreSQLをリリースしており、PostgreSQLにおいて外部データの読み込みを可能とする外部データラッパの機能を利用して、PostgreSQL上においてScalarDB管理下にあるデータベースに対してクエリを実行することができるようになりました。今回リリースしたScalarDB Analytics with Sparkは、新たなScalarDB Analyticsシリーズの一つであり、Apache SparkからScalarDB管理下のデータベースに対してクエリを実行することができます。Apache Sparkは分析的な処理を主な対象とする分散処理エンジンであり、様々な分析的クエリの実行をサポートしています。

ScalarDB Analytics with Sparkの概要

ScalarDB Analyticsシリーズの位置付けとしては、ScalarDB Analytics with PostgreSQLがOSSとして提供されており自由に利用できるのに対して、ScalarDB Analytics with Sparkはエンタープライズ版の製品であり、利用するためには有償ライセンスが必要となります。Sparkは複数のノードを用いた分散処理が可能であるため、PostgreSQLと比較して、よりスケーラブルに分析的クエリの実行が可能となることが期待されます。

ScalarDB Analytics with Sparkの仕組み

ScalarDB Analytics with SparkはApache Sparkのプラグインとして実装されています。ScalarDB Analytics with Sparkは、名前空間やテーブルなどのメタデータを扱うCatalogプラグインと、外部のデータソースからScalarDBライブラリを経由してデータを読み取るConnectorプラグインから、主に構成されています。

Catalogプラグインは、ScalarDBの設定ファイルに記載されているデータベースの接続情報などを用い、ScalarDB内にどのようなデータ(名前空間、テーブル、列)が含まれているかを取得して、Sparkのカタログ情報として提供します。Sparkはテーブル情報を管理するためのデフォルトのカタログ実装を内部に持っていますが、ユーザ側で別のカタログ実装を指定することが可能になっています。ScalarDB Analytics with Sparkが提供するカタログ実装を指定することにより、Spark上でScalarDBのカタログを用いることが可能になります。具体的な設定方法については、次の節で説明します。

Catalogプラグインには複数の実装方法がありますが、ScalarDB Analytics with Sparkのように外部に存在するカタログ情報を丸ごと利用するような用途には、CatalogPluginインターフェースを実装することによって一から独自のカタログを実装することが可能となっており、ScalarDB Analytics with Sparkではこの方法によって、ScalarDBのカタログ情報をSparkのカタログとして扱うことを可能にするCatalogプラグインを実装しています。

Catalogプラグインは、カタログ情報を提供するだけではなく、ユーザがクエリを指定した際に、適切なConnectorプラグインを起動して読み取り処理を開始することも役割の一つです。多くのCatalogプラグイン実装では、対象とする外部データソースが一種類であるため、常に単一のConnectorプラグインを用いるようになっていますが、上の図にあるようにScalarDBは多くの種類のデータベースをサポートしているため、それぞれのデータベースに対応したConnectorプラグインを用いる必要があります。ただし、全てのデータベースに対して個別のConnectorプラグインを用意することは手間がかかるため、いくつかの個別のConnectorプラグインに加えて、ScalarDB Analytics with Sparkで独自に提供しているScalarDB Connectorプラグインとを併用する形になっています。

ScalarDB Connectorプラグインは、内部でScalarDBライブラリを用いて外部のデータベースからデータを読み取る実装になっています。ScalarDBは分析的クエリに用いられる大量のデータのスキャンや複雑な絞り込みなどを得意としていないため、利用が可能な場合には個別のConnectorを用いた方が効率的な実行が可能になると考えられます。そのため、独自Connectorプラグインを利用可能なデータベースの場合にはそのConnectorプラグインを用い、それ以外の場合にはScalarDB Connectorプラグインを用いることによって、ScalarDBがサポートしている全てのデータベースからの読み取りを可能とする仕組みになっています。現在のバージョン3.12.0では、JDBCデータベースの場合にはSparkが同梱するJDBC Connectorを利用し、それ以外の場合にはScalarDB Connectorを利用するようになっています。

Connectorプラグインは歴史的な経緯から複数の実装方法があるようですが、ScalarDB ConnectorプラグインではTableインタフェースを実装する方法を用いています。

これまでに説明したCatalogプラグインとConnectorプラグインを用いることで、Spark上でScalarDB管理化のテーブルのデータを読み取ることが可能になりますが、このテーブルの生データにはScalarDBがトランザクション処理を行うためのメタデータが含まれています。また、これらのトランザクションメタデータを考慮せずにデータを読んでしまうことになるため、トランザクションによる一貫性の保証を得ることができません。例えば、別の実行中のトランザクションで書き込みをしようとしている未コミットのデータを読んでしまう可能性があります。そのため、ScalarDB Analytics with Sparkでは生データに含まれているトランザクションメタデータを解釈しつつ、ユーザのデータのみをテーブルのデータとして表示するための、WAL-Interpreted Viewを作成します。WAL-Interpreted Viewによって、ユーザはScalarDB上のテーブルと同等のテーブルがSpark上に存在するかのようにデータにアクセスすることができるようになります。また、WAL-Interpreted Viewを通したデータのアクセスは、トランザクションメタデータを解釈しているため、一定の一貫性(正確にはRead Committed)が保証されています。ユーザは、Catalogプラグインが提供する生のテーブルではなく、こちらのWAL-Interpreted Viewを通してデータにアクセスします。

ScalarDB Analytics with Sparkを用いたクエリの実行

ScalarDB Analytics with Sparkを用いるためには、まずCatalogプラグインを有効化する必要があります。対象とするSparkの設定(`spark.conf`)に以下の設定を追加します。

spark.sql.catalog.scalardb_catalog = com.scalar.db.analytics.spark.datasource.ScalarDbCatalog       (1)
spark.sql.catalog.scalardb_catalog.config = /<PATH_TO_YOUR_SCALARDB_PROPERTIES>/config.properties (2)
spark.sql.catalog.scalardb_catalog.namespaces = <YOUR_NAMESPACE_NAME_2>,<YOUR_NAMESPACE_NAME_2> (3)
spark.sql.catalog.scalardb_catalog.license.key = {"your":"license", "key":"in", "json":"format"} (4)
spark.sql.catalog.scalardb_catalog.license.cert_path = /<PATH_TO_YOUR_LICENSE>/cert.pem (5)

(1)では、Catalogプラグインの実装クラスを指定します。常にScalarDbCatalogのクラスを指定します。また、キーの最右端部のscalardb_catalogはSpark上でのカタログ名を指定する文字列であり、ユニークな値であれば任意の文字列を指定できます。この記事ではscalardb_catalogを用います。

(2)では、ScalarDB Analytics with Sparkでデータを読み込みたいScalarDBの設定ファイルのパスを指定します。

(3)では、ScalarDB Analytics with Sparkでデータを読み込む対象とするScalarDB内の名前空間を列挙します。複数の名前空間を指定する場合にはカンマで区切ります。

(4)(5)では、それぞれライセンスのキー(JSON)と証明書を指定します。ScalarDB Analytics with Sparkでは、クエリの実行時にこれらのライセンス情報の正当性をチェックしており、チェックに失敗した場合にはクエリの実行も失敗するようになっています。

以上が正しく設定された状態でSparkを起動すると、指定したカタログ名でScalarDBのテーブルを表示できるようになります。例として、ScalarDB Analytics with SparkのサンプルアプリケーションのScalarDBを用い、Spark Shellを実行すると、以下のようカタログ内の情報を表示することができます。

scala> sql("SHOW NAMESPACES in scalardb_catalog").show() // (1)
+-----------+
| namespace|
+-----------+
| dynamons|
| postgresns|
|cassandrans|
+-----------+


scala> sql("SHOW TABLES in scalardb_catalog.postgresns").show() // (2)
+----------+---------+-----------+
| namespace|tableName|isTemporary|
+----------+---------+-----------+
|postgresns| orders| false|
+----------+---------+-----------+


scala> sql("DESCRIBE scalardb_catalog.postgresns.orders").show() // (3)
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| o_orderkey| int| NULL|
| o_custkey| int| NULL|
| o_orderstatus| string| NULL|
| o_totalprice| double| NULL|
| o_orderdate| string| NULL|
| o_orderpriority| string| NULL|
| o_clerk| string| NULL|
| o_shippriority| int| NULL|
| o_comment| string| NULL|
| tx_id| string| NULL|
| tx_state| int| NULL|
| tx_version| int| NULL|
| tx_prepared_at| bigint| NULL|
| tx_committed_at| bigint| NULL|
| before_tx_id| string| NULL|
| before_tx_state| int| NULL|
| before_tx_version| int| NULL|
|before_tx_prepare...| bigint| NULL|
|before_tx_committ...| bigint| NULL|
|before_o_orderstatus| string| NULL|
+--------------------+---------+-------+
only showing top 20 rows

(1)ではScalarDB内の名前空間の一覧を、(2)ではpostgresnsという名前空間内のテーブルの一覧を、(3)ではpostgresns.ordersというテーブルの詳細情報を表示しています。

表示されたテーブル情報から分かるように、ユーザが定義した列に加えて、トランザクションメタデータが含まれています。このままでは使いづらいので、上述したようにこれら生のテーブルに対してWAL-Interpreted Viewを作成します。ScalarDB Analytics with SparkではWAL-Interpreted Viewを作成するためのヘルパークラスを提供しており、以下のコードを実行することによって、カタログ内に存在する全てのテーブルに対して一括してWAL-Interpreted Viewを作成することができます。

scala> import com.scalar.db.analytics.spark.view.SchemaImporter
scala> new SchemaImporter(spark, "scalardb_catalog").run()

new SchemaImporterの第一引数にはSparkSessionのインスタンスを、第二引数には設定で指定したカタログ名を渡します。このインスタンスのrun()を呼ぶことによって、WAL-Interpreted Viewを作成することができます。WAL-Interpreted ViewはSparkの現在のカタログ上に作成されます。通常はSparkのデフォルトカタログになります。

scala> sql("SHOW NAMESPACES").show() // (1)
+-----------+
| namespace|
+-----------+
|cassandrans|
| default|
| dynamons|
| postgresns|
+-----------+


scala> sql("SHOW TABLES in postgresns").show() // (2)
+----------+---------+-----------+
| namespace|tableName|isTemporary|
+----------+---------+-----------+
|postgresns| orders| false|
+----------+---------+-----------+


scala> sql("DESCRIBE postgresns.orders").show() // (3)
+---------------+---------+-------+
| col_name|data_type|comment|
+---------------+---------+-------+
| o_orderkey| int| NULL|
| o_custkey| int| NULL|
| o_orderstatus| string| NULL|
| o_totalprice| double| NULL|
| o_orderdate| string| NULL|
|o_orderpriority| string| NULL|
| o_clerk| string| NULL|
| o_shippriority| int| NULL|
| o_comment| string| NULL|
+---------------+---------+-------+

(1)、(2)では現在のカタログ上に、scalardb_catalogのものと同じ名前の名前空間とテーブル(正確にはビュー)が作成されていることが分かります。(3)で表示したビューの詳細情報を見ると、トランザクションメタデータは含まれておらず、ユーザが定義した列のみが含まれています。このビューを通してデータにアクセスすることによって、ScalarDBと同等の構造のテーブルに対して、Sparkがサポートしている任意のクエリを実行することが可能になります。例えば、以下のように複数のテーブルの結合や、複雑な絞り込み、集約が含まれているクエリの実行が可能です。

scala> sql("""
| SELECT
| l_orderkey,
| sum(l_extendedprice * (1 - l_discount)) AS revenue,
| o_orderdate,
| o_shippriority
| FROM
| dynamons.customer,
| postgresns.orders,
| cassandrans.lineitem
| WHERE
| c_mktsegment = 'AUTOMOBILE'
| AND c_custkey = o_custkey
| AND l_orderkey = o_orderkey
| AND o_orderdate < '1995-03-15'
| AND l_shipdate > '1995-03-15'
| GROUP BY
| l_orderkey,
| o_orderdate,
| o_shippriority
| ORDER BY
| revenue DESC,
| o_orderdate,
| l_orderkey
| LIMIT 10;
| """).show()
+----------+------------------+-----------+--------------+
|l_orderkey| revenue|o_orderdate|o_shippriority|
+----------+------------------+-----------+--------------+
| 1071617|128186.99915996166| 1995-03-10| 0|
| 1959075| 33104.51278645416| 1994-12-23| 0|
| 430243|19476.115819260962| 1994-12-24| 0|
+----------+------------------+-----------+--------------+

まとめ

本記事では、ScalarDB Analytics with Sparkの紹介とその簡単な仕組みを紹介しました。より詳細な情報はドキュメントをご参照下さい。Scalarでは、データの管理をより信頼できるというミッションのもと、データ管理に対するより広範なニーズに応えていくために、Scalar製品の可能性を拡げていきます。

--

--

Akihiro Okuno
Scalar Engineering (JA)

Software Engineer, Ph.D., specializing in database engineering.