Announcing ScalarDB Analytics with Spark

Akihiro Okuno
Published in Scalar Engineering
7 min read · Jun 11, 2024

In ScalarDB 3.12, we launched ScalarDB Analytics with Spark, a new feature that enables you to run analytical queries with Apache Spark on databases managed by ScalarDB. This article provides an overview of ScalarDB Analytics with Spark and a guide to how to use it.

What is ScalarDB Analytics with Spark?

ScalarDB is a universal transaction manager that enables cross-database transactions over various databases, including RDBMSs and NoSQL databases. It primarily targets workloads involving many simple transactions, and the operations it supports are limited to key-based CRUD and simple scans.

ScalarDB Analytics enhances the query capabilities over databases managed by ScalarDB to include joins and aggregations, facilitating complex ad-hoc analyses. As part of the ScalarDB Analytics feature set, we released ScalarDB Analytics with PostgreSQL last year, which utilizes the foreign data wrapper feature to execute queries on ScalarDB-managed databases in PostgreSQL. The newly released ScalarDB Analytics with Spark enables querying ScalarDB-managed databases from Apache Spark, a distributed processing engine designed for analytical workloads.

Overview of ScalarDB Analytics with Spark

While ScalarDB Analytics with PostgreSQL is available as open-source software (OSS), ScalarDB Analytics with Spark is an enterprise version of the product, which requires a paid license. Given Spark’s ability to handle distributed processing across multiple nodes, it is expected to perform analytical queries more scalably compared to PostgreSQL.

How ScalarDB Analytics with Spark works

ScalarDB Analytics with Spark is implemented as a plugin for Apache Spark. It consists mainly of the catalog plugin, which handles ScalarDB metadata such as namespaces and tables, and the connector plugin, which reads data from external data sources via the ScalarDB library.

The catalog plugin uses the database connection information in the ScalarDB configuration file to retrieve the metadata (namespaces, tables, and columns) managed by ScalarDB and exposes it as a catalog in Spark. By specifying the catalog implementation provided by ScalarDB Analytics with Spark, you can use ScalarDB's catalog in Spark. The specific configuration is described in the next section.

Catalog implementation

There are several ways to implement a catalog in Spark. For applications such as ScalarDB Analytics with Spark, where all of the catalog information exists externally, you can implement your own catalog through the CatalogPlugin interface. ScalarDB Analytics with Spark uses this method to implement a catalog plugin that exposes ScalarDB catalog information as a Spark catalog.
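As a rough illustration of what implementing such a catalog involves, the sketch below implements Spark's TableCatalog interface (which extends CatalogPlugin) for a read-only external metadata source. The class name and its behavior are hypothetical; this is not the actual ScalarDB Analytics with Spark implementation.

import java.util

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical, read-only catalog backed by an external metadata store.
class ExternalMetadataCatalog extends TableCatalog {
  private var catalogName: String = _

  // Called by Spark with the catalog name and the options set under
  // spark.sql.catalog.<name>.* (for example, a path to a configuration file).
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
  }

  override def name(): String = catalogName

  // List and load tables by querying the external metadata store.
  override def listTables(namespace: Array[String]): Array[Identifier] =
    Array.empty[Identifier] // placeholder: fetch the table names for the namespace here

  override def loadTable(ident: Identifier): Table =
    throw new UnsupportedOperationException("placeholder: return a Table backed by the external source")

  // Write-side operations are rejected because the catalog is read-only.
  override def createTable(ident: Identifier, schema: StructType, partitions: Array[Transform],
                           properties: util.Map[String, String]): Table =
    throw new UnsupportedOperationException("read-only catalog")

  override def alterTable(ident: Identifier, changes: TableChange*): Table =
    throw new UnsupportedOperationException("read-only catalog")

  override def dropTable(ident: Identifier): Boolean =
    throw new UnsupportedOperationException("read-only catalog")

  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
    throw new UnsupportedOperationException("read-only catalog")
}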

The catalog plugin is responsible not only for providing catalog information but also for invoking the appropriate connector plugin to initiate the read process when a user runs a query. Most catalog plugin implementations use only a single connector plugin for a single external data source. ScalarDB, however, supports many types of databases, so a suitable connector plugin must be chosen for each database. Since preparing separate connector plugins for every database supported by ScalarDB is time-consuming, ScalarDB Analytics with Spark provides its own ScalarDB connector plugin in addition to several database-specific connector plugins.

The ScalarDB connector plugin internally uses the ScalarDB library to read data from external databases. Because the ScalarDB library is better suited to scanning smaller amounts of data or running simple queries, it is generally more efficient to use a database-specific connector plugin when one is available. For this reason, the database-specific plugins are used when available, and the ScalarDB connector plugin is used in the remaining cases so that ScalarDB Analytics with Spark can support all ScalarDB-supported databases. As of ScalarDB 3.12, ScalarDB Analytics with Spark uses only the JDBC connector, included with Spark, as a database-specific connector.
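For reference, reading a table directly through Spark's built-in JDBC data source looks roughly like the following. ScalarDB Analytics with Spark wires this up for you; the host, credentials, and table name below are hypothetical placeholders, and the snippet assumes an existing SparkSession (for example, in the Spark Shell).

// Illustration only: reading a table with Spark's built-in JDBC data source.
// ScalarDB Analytics with Spark performs this kind of wiring automatically;
// the connection details below are placeholders.
val ordersDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://<HOST>:<PORT>/<DATABASE>")
  .option("dbtable", "postgresns.orders")
  .option("user", "<USER>")
  .option("password", "<PASSWORD>")
  .load()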

For historical reasons, there are also several ways to implement a connector plugin. The ScalarDB connector plugin is implemented by using the Table interface.
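As a rough sketch (hypothetical class name, not the actual connector code), a read-only implementation of the Table interface looks like this:

import java.util

import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical, read-only table returned by a catalog's loadTable().
class ExternalTable(tableName: String, tableSchema: StructType) extends Table with SupportsRead {
  override def name(): String = tableName

  override def schema(): StructType = tableSchema

  // Advertise batch reads only; writes are not supported.
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  // The ScanBuilder produces the Scan/Batch that actually reads rows from the
  // external database (for example, through the ScalarDB library).
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    throw new UnsupportedOperationException("placeholder: return a ScanBuilder here")
}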

The catalog and connector plugins described so far allow Spark to read data from ScalarDB-managed databases. However, the raw data in these databases contains transaction metadata columns that ScalarDB needs to process transactions. In addition, because the data would be read without considering this transaction metadata, transactional consistency cannot be guaranteed; for example, Spark could read uncommitted data that is about to be written by another running transaction. Therefore, on top of the raw tables, ScalarDB Analytics with Spark creates WAL-Interpreted Views, which interpret the transaction metadata contained in the raw data and expose only the user data. The WAL-Interpreted Views allow users to see, in Spark, tables equivalent to those in ScalarDB.

In addition, accessing data through the WAL-Interpreted Views guarantees a certain degree of consistency (Read Committed) by interpreting the transaction metadata. For this reason, users generally access data through the WAL-Interpreted Views rather than through the raw tables provided by the catalog plugin.
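To give a rough idea of what such a view does, the sketch below projects only user columns and falls back to the before image for a record whose latest write is not yet committed. This is a simplified illustration, not the view definition that ScalarDB Analytics with Spark actually generates: the committed tx_state value is a placeholder, and the catalog name scalardb_catalog and the postgresns.orders table are the ones used later in this article.

// Simplified illustration of the idea behind a WAL-Interpreted View; the real views
// are generated automatically and interpret the metadata more thoroughly.
val COMMITTED = 3 // placeholder for ScalarDB's committed tx_state value, not taken from the sources

spark.sql(s"""
  CREATE OR REPLACE TEMPORARY VIEW orders_interpreted AS
  SELECT
    o_orderkey,
    o_custkey,
    CASE WHEN tx_state = $COMMITTED THEN o_orderstatus
         ELSE before_o_orderstatus END AS o_orderstatus
  FROM scalardb_catalog.postgresns.orders
""")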

Running a query by using ScalarDB Analytics with Spark

To use ScalarDB Analytics with Spark, you must first activate the catalog plugin. Add the following settings to your Spark configuration (spark.conf):

spark.sql.catalog.scalardb_catalog = com.scalar.db.analytics.spark.datasource.ScalarDbCatalog       (1)
spark.sql.catalog.scalardb_catalog.config = /<PATH_TO_YOUR_SCALARDB_PROPERTIES>/config.properties (2)
spark.sql.catalog.scalardb_catalog.namespaces = <YOUR_NAMESPACE_NAME_1>,<YOUR_NAMESPACE_NAME_2> (3)
spark.sql.catalog.scalardb_catalog.license.key = {"your":"license", "key":"in", "json":"format"} (4)
spark.sql.catalog.scalardb_catalog.license.cert_path = /<PATH_TO_YOUR_LICENSE>/cert.pem (5)

In (1), you specify the implementation class of the catalog plugin. This must always be the class shown in the sample, ScalarDbCatalog. The rightmost part of the key, scalardb_catalog, is an arbitrary string that becomes the name of the catalog in Spark; scalardb_catalog is used throughout this article.

In (2), you specify the path to the ScalarDB configuration file from which you want to load data in ScalarDB Analytics with Spark.

In (3), you list the ScalarDB namespaces from which you want to read data in ScalarDB Analytics with Spark. When specifying multiple namespaces, separate them with commas.

In (4) and (5), you specify the license key (JSON) and the path to the license certificate, respectively. ScalarDB Analytics with Spark checks the validity of this license information when executing a query, and if the check fails, the query execution will also fail.
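Alternatively, the same settings can be supplied programmatically when building the SparkSession, which can be convenient for self-contained applications. The following is a minimal sketch; the application name is arbitrary, and the paths, namespaces, and license key are placeholders, as above.

import org.apache.spark.sql.SparkSession

// The property keys are the same as in the spark.conf example above.
val spark = SparkSession.builder()
  .appName("scalardb-analytics-example")
  .config("spark.sql.catalog.scalardb_catalog",
    "com.scalar.db.analytics.spark.datasource.ScalarDbCatalog")
  .config("spark.sql.catalog.scalardb_catalog.config",
    "/<PATH_TO_YOUR_SCALARDB_PROPERTIES>/config.properties")
  .config("spark.sql.catalog.scalardb_catalog.namespaces",
    "<YOUR_NAMESPACE_NAME_1>,<YOUR_NAMESPACE_NAME_2>")
  .config("spark.sql.catalog.scalardb_catalog.license.key",
    """{"your":"license", "key":"in", "json":"format"}""")
  .config("spark.sql.catalog.scalardb_catalog.license.cert_path",
    "/<PATH_TO_YOUR_LICENSE>/cert.pem")
  .getOrCreate()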

When you launch Spark with the above correctly configured, you will be able to view ScalarDB tables with the specified catalog name. As an example, when using the ScalarDB Analytics with Spark sample application and executing the Spark Shell, the catalog information will be displayed as follows:

scala> sql("SHOW NAMESPACES in scalardb_catalog").show() // (1)
+-----------+
| namespace|
+-----------+
| dynamons|
| postgresns|
|cassandrans|
+-----------+


scala> sql("SHOW TABLES in scalardb_catalog.postgresns").show() // (2)
+----------+---------+-----------+
| namespace|tableName|isTemporary|
+----------+---------+-----------+
|postgresns| orders| false|
+----------+---------+-----------+


scala> sql("DESCRIBE scalardb_catalog.postgresns.orders").show() // (3)
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| o_orderkey| int| NULL|
| o_custkey| int| NULL|
| o_orderstatus| string| NULL|
| o_totalprice| double| NULL|
| o_orderdate| string| NULL|
| o_orderpriority| string| NULL|
| o_clerk| string| NULL|
| o_shippriority| int| NULL|
| o_comment| string| NULL|
| tx_id| string| NULL|
| tx_state| int| NULL|
| tx_version| int| NULL|
| tx_prepared_at| bigint| NULL|
| tx_committed_at| bigint| NULL|
| before_tx_id| string| NULL|
| before_tx_state| int| NULL|
| before_tx_version| int| NULL|
|before_tx_prepare...| bigint| NULL|
|before_tx_committ...| bigint| NULL|
|before_o_orderstatus| string| NULL|
+--------------------+---------+-------+
only showing top 20 rows

(1) shows the list of namespaces in ScalarDB, (2) shows the list of tables in the namespace named `postgresns`, and (3) shows detailed information about the table, `postgresns.orders`.

As you can see from the table information, it contains transaction metadata columns in addition to the user-defined columns. Since the raw tables are difficult to use as they are, we will create WAL-Interpreted Views for them. ScalarDB Analytics with Spark provides a helper class to create WAL-Interpreted Views. By executing the following code, you can create WAL-Interpreted Views for all tables in the catalog at once:

scala> import com.scalar.db.analytics.spark.view.SchemaImporter
scala> new SchemaImporter(spark, "scalardb_catalog").run()

The first argument of new SchemaImporter is an instance of SparkSession, and the second argument is the catalog name specified in the configuration. Calling run() on this instance creates the WAL-Interpreted Views in Spark's current catalog. Normally, the current catalog is Spark's default catalog. You can see the details of the created WAL-Interpreted Views as follows:

scala> sql("SHOW NAMESPACES").show() // (1)
+-----------+
| namespace|
+-----------+
|cassandrans|
| default|
| dynamons|
| postgresns|
+-----------+


scala> sql("SHOW TABLES in postgresns").show() // (2)
+----------+---------+-----------+
| namespace|tableName|isTemporary|
+----------+---------+-----------+
|postgresns| orders| false|
+----------+---------+-----------+


scala> sql("DESCRIBE postgresns.orders").show() // (3)
+---------------+---------+-------+
| col_name|data_type|comment|
+---------------+---------+-------+
| o_orderkey| int| NULL|
| o_custkey| int| NULL|
| o_orderstatus| string| NULL|
| o_totalprice| double| NULL|
| o_orderdate| string| NULL|
|o_orderpriority| string| NULL|
| o_clerk| string| NULL|
| o_shippriority| int| NULL|
| o_comment| string| NULL|
+---------------+---------+-------+

In (1) and (2), we see that namespaces and tables (more precisely, views) with the same names as those in scalardb_catalog have been created in the current catalog. The detailed information of the view shown in (3) confirms that it contains only the user-defined columns, without the transaction metadata. By accessing the data through these views, you can execute any query supported by Spark on tables that have the same schemas as in ScalarDB.

For example, it is possible to execute a query that contains multiple table joins, complex selections, and aggregations, as shown below:

scala> sql("""
| SELECT
| l_orderkey,
| sum(l_extendedprice * (1 - l_discount)) AS revenue,
| o_orderdate,
| o_shippriority
| FROM
| dynamons.customer,
| postgresns.orders,
| cassandrans.lineitem
| WHERE
| c_mktsegment = 'AUTOMOBILE'
| AND c_custkey = o_custkey
| AND l_orderkey = o_orderkey
| AND o_orderdate < '1995-03-15'
| AND l_shipdate > '1995-03-15'
| GROUP BY
| l_orderkey,
| o_orderdate,
| o_shippriority
| ORDER BY
| revenue DESC,
| o_orderdate,
| l_orderkey
| LIMIT 10;
| """).show()
+----------+------------------+-----------+--------------+
|l_orderkey| revenue|o_orderdate|o_shippriority|
+----------+------------------+-----------+--------------+
| 1071617|128186.99915996166| 1995-03-10| 0|
| 1959075| 33104.51278645416| 1994-12-23| 0|
| 430243|19476.115819260962| 1994-12-24| 0|
+----------+------------------+-----------+--------------+
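Because the WAL-Interpreted Views are ordinary views in the current catalog, you can also query them through the DataFrame API instead of SQL. For instance, the following small illustrative query (not taken from the sample application) counts orders per status:

// Count orders per status via the DataFrame API (output omitted).
spark.table("postgresns.orders")
  .groupBy("o_orderstatus")
  .count()
  .show()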

Wrap-up

This article introduces the newly released ScalarDB Analytics with Spark. For more details, please refer to the documentation. We’ll continue to expand the possibilities of Scalar products to meet a wider range of data management needs, in line with our mission, “Making Data Management More Reliable.”


Akihiro Okuno
Scalar Engineering

Software Engineer, Ph.D., specializing in database engineering.