Spring Data JDBC for ScalarDB

Mitsunori Komatsu
Scalar Engineering (JA)
27 min readJul 18, 2023

Spring Data JDBC for ScalarDBはScalarDB 3.8で最初にリリースされ、ScalarDB 3.9で幾つかの改善と新機能がリリースされました。本記事では、この機能が開発された背景や利用方法等について紹介します。

Spring Data JDBC for ScalarDBが生まれた背景

ScalarDBは、複数の様々なデータベースやマイクロサービスをまたいだ分散ACIDトランザクションを実現するための強力なツールです。しかし、直接ScalarDBを用いる場合、トランザクション用のAPI (例:rollback()commit()) を「いつ・どのように」使うかを考慮しながら、多くのコードを書かなければいけないという問題がありました。この問題を解決すべく、より少ないコード量でより安全なScalarDBのアプリケーション開発を可能にするためSpring Data JDBC for ScalarDBの開発を始めました。

我々は「ScalarDBの大部分のユーザーはJavaや他のJVM言語を使って開発している」と想定しています。Spring Frameworkは、Java系言語による開発において最も人気のあるアプリケーション開発基盤の一つです。Spring Framework内で永続化層として利用できるSpring Data JDBC for ScalarDBを開発し、ScalarDBユーザーに使ってもらうことで、ScalarDBを用いたアプリケーション開発をより簡単に安全にできると考えています。

Springアプリケーション開発における永続化のインターフェースとしてはJDBC、Spring JDBC、Spring Data JDBC、Spring Data JPAといった選択肢があります。学習曲線や開発の効率化について考慮した上で、我々はSpring Data JDBCを選択しました。より強力で洗練されたSpring Data JPAも検討しましたが、機能の豊富さからくる複雑性や急勾配な学習曲線を懸念しました。

https://hackmd.io/@ddubson/rkn-sR4wU

全体の構成

ScalarDBにおいては、データベースに直接アクセスするScalarDB Core (CRUD API) に加え、SQLを用いて開発者がScalarDB管理下のデータベースを操作できるScalarDB SQLも開発・提供されています。Spring Data JDBCもまたSQLを通してデータベースを操作する機能をもっており、SQLを経由することで、互いの連携がしやすそうです。しかし、Spring Data JDBCがデータベースに期待している文法や挙動と、実際にScalarDB SQLがどのように動作するか、の間には違いがありました。そのため、Spring Data JDBC for ScalarDB開発の主な目標の一つはこの相違を埋め、Spring Data JDBCとScalarDB SQLの機能を連携させることでした。

この連携により、開発者はSpring Data JDBC APIを呼び出し、ScalarDB層を通してRDBMSやNoSQLデータベースを操作できるようになります。

Spring Data JDBC for ScalarDBの使い方

Spring Data JDBC for ScalarDBの使用方法は、基本的にSpring Data JDBCと同様の用法・文法となります。従って、Spring Data JDBCと同様の設定、使用方法でSpring Data JDBC for ScalarDBを扱うことができます。

単一データベースの操作

SQLを用いてScalarDBとその下層のデータベースに接続するには、まず resources/application.properties 内でJDBC接続URLを設定します。

spring.datasource.driver-class-name=com.scalar.db.sql.jdbc.SqlJdbcDriver
spring.datasource.url=jdbc:scalardb:\
?scalar.db.sql.connection_mode=direct\
&scalar.db.contact_points=jdbc:mysql://localhost:3306/my_app_ns\
&scalar.db.username=root\
&scalar.db.password=mysql\
&scalar.db.storage=jdbc\
&scalar.db.consensus_commit.isolation_level=SERIALIZABLE

続いてJavaのソースコード内で、Spring Data JDBC for ScalarDBを有効にするため@EnableScalarDbRepositories アノテーションを設定します。これはSpring Data JDBCにおける @EnableJdbcRepositories と同等のアノテーションです。

@SpringBootApplication
@EnableScalarDbRepositories
public class MyApplication {
@Autowired private AccountRepository accountRepository;

...
}

次は、エンティティのモデルやレポジトリークラスです。Spring Data JDBCの場合はレポジトリークラスの継承元として CrudRepository (またはPagingAndSortingRepository) を用いますが、Spring Data JDBC for ScalarDBでは代わりに ScalarDbRepository を継承します。

@Table
public class Account {
@Id public int id;
public int balance;

public Account(int id, int balance) {
this.id = id;
this.balance = balance;
}
}
public interface AccountRepository
extends ScalarDbRepository<Account, Integer> {

default void transfer(int srcAccountId, int dstAccountId, int amount) {
Account src = findById(srcAccountId).get();
Account dst = findById(dstAccountId).get();
update(new Account(src.id, src.balance - amount));
update(new Account(dst.id, dst.balance + amount));
}
}

これらの設定や実装により、ACIDトランザクション内で整合性を保ちながら、指定した金額を2つの口座間で移動することができます。

// Transfer 5000 from account:42's balance to account:24's balance
accountRepository.transfer(42, 24, 5000);

口座間の転送処理トランザクションは、処理の成功時に自動的にcommitされ、失敗時にはrollbackされます。

もし、上記と同様の処理をScalarDB APIを用いて書く場合、以下のように、やや冗長な実装になりそうです。

DistributedTransaction tx = transactionManager.start();

try {
Account src = tx.get(Get.newBuilder()
.namespace(NAMESPACE)
.table(ACCOUNT_TABLE)
.partitionKey(Key.ofInt(KEY_ID, 42)).build())
.map(result ->
new Account(
result.getInt(KEY_ID),
result.getInt(KEY_BALANCE))).get();

Account dst = tx.get(Get.newBuilder()
.namespace(NAMESPACE)
.table(ACCOUNT_TABLE)
.partitionKey(Key.ofInt(KEY_ID, 24)).build())
.map(result ->
new Account(
result.getInt(KEY_ID),
result.getInt(KEY_BALANCE))).get();

tx.put(Put.newBuilder()
.namespace(NAMESPACE)
.table(ACCOUNT_TABLE)
.partitionKey(Key.ofInt(KEY_ID, src.id))
.intValue(KEY_BALANCE, src.balance - 5000).build());

tx.put(Put.newBuilder()
.namespace(NAMESPACE)
.table(ACCOUNT_TABLE)
.partitionKey(Key.ofInt(KEY_ID, dst.id))
.intValue(KEY_BALANCE, dst.balance + 5000).build());

tx.commit();
} catch (Throwable e) {
tx.abort();
throw e;
}

以上の例により、Spring Data JDBC for ScalarDBがどのようにSpringアプリケーションの開発を容易にするかの一部をお伝えできたかと思います。

Multi-storage Transactionsを用いた複数データベースの操作

複数データベースの操作をACIDトランザクション内で行いたい場合、resources/application.properties を適切に設定することにより、透過的にScalarDBのMulti-storage Transaction feature in ScalarDBを利用することができます。

spring.datasource.driver-class-name=com.scalar.db.sql.jdbc.SqlJdbcDriver
spring.datasource.url=jdbc:scalardb:\
?scalar.db.sql.connection_mode=direct\
&scalar.db.storage=multi-storage\
&scalar.db.multi_storage.storages=north,south\
&scalar.db.multi_storage.namespace_mapping=north:north,south:south&scalar.db.multi_storage.default_storage=south\
&scalar.db.multi_storage.storages.north.storage=jdbc\
&scalar.db.multi_storage.storages.north.contact_points=jdbc:mysql://localhost:3306/\
&scalar.db.multi_storage.storages.north.username=${north_username}\
&scalar.db.multi_storage.storages.north.password=${north_password}\
&scalar.db.multi_storage.storages.south.storage=jdbc\
&scalar.db.multi_storage.storages.south.contact_points=jdbc:postgresql://localhost:5432/\
&scalar.db.multi_storage.storages.south.username=${south_username}\
&scalar.db.multi_storage.storages.south.password=${south_password}\
&scalar.db.consensus_commit.isolation_level=SERIALIZABLE\
&scalar.db.sql.default_namespace_name=my_app

この設定例では、2つのデータベース(MySQLとPostgreSQL)を扱うようになっています。これらのデータベースはそれぞれ論理的なnamespace northsouth に対応しており、Spring Data JDBC for ScalarDBから両方のnamespace内のテーブルに1トランザクションで操作することができます。

上記設定後は、データベースのテーブルに対応するモデルやレポジトリークラスを実装することにより複数データベース内のテーブルにアクセスすることができます。

// `schema` field is used to specify the underlying database
@Table(schema = "north", value = "account")
public class NorthAccount {
@Id public final int id;
public final Integer balance;
public NorthAccount(int id, Integer balance) {
this.id = id;
this.balance = balance;
}
}

@Table(schema = "south", value = "account")
public class SouthAccount {
@Id public final int id;
public final Integer balance;
public SouthAccount(int id, Integer balance) {
this.id = id;
this.balance = balance;
}
}
@Transactional
@Repository
public interface NorthAccountRepository
extends ScalarDbRepository<NorthAccount, Integer> {

default void transferToSouthAccount(
SouthAccountRepository southAccountRepository,
int fromId, int toId, int value) {
NorthAccount fromEntity =
findById(fromId).orElseThrow(
() -> new AssertionError("Not found: " + fromId));
SouthAccount toEntity =
southAccountRepository
.findById(toId)
.orElseThrow(() -> new AssertionError("Not found: " + toId));
update(new NorthAccount(fromEntity.id, fromEntity.balance - value));
southAccountRepository.update(
new SouthAccount(toEntity.id, toEntity.balance + value));
}
}

@Transactional
@Repository
public interface SouthAccountRepository
extends ScalarDbRepository<SouthAccount, Integer> {}
  @Autowired private NorthAccountRepository northAccountRepository;
@Autowired private SouthAccountRepository southAccountRepository;

...

northAccountRepository.transferToSouthAccount(
southAccountRepository, 42, 24, 5000);

この他にも、​​動作可能なサンプルアプリケーションがありますので、興味のある方はこちらもご覧ください。

各機能の詳細についてはGuide of Spring Data JDBC for ScalarDBをご覧ください。

Spring Data JDBCからの拡張機能

Spring Data JDBC for ScalarDBは、Spring Data JDBCから引き継いでいる機能に加え、新たにいくつかの機能を追加しています。

CrudRepository#save(T t) 利用の抑止、および ScalarDbRepository への insert(T t)・update(T t) の追加

Spring Data JDBCでは、レコードの追加と更新はともに CrudRepository#save(T t) によって処理されていました。このAPIは、引数で渡された当該モデルのインスタンスの @Id 変数がnullかどうかでレコード追加・更新のいずれを行うかを判断していますが、これは利用しているデータベースのautoincrement column機能に依存することになります (ID Generation)。具体的には引数で渡されたインスタンスの @Id 変数がnullの場合レコードの追加を、null以外の場合は当該レコードの更新を行います。しかし、ScalarDBではこのautoincrement column機能をサポートしておらず、CrudRepository#save(T t) が期待通りに動作しません。Spring Data JDBCでは幾つか回避策を用意しているものの、これらの回避策は直感的ではなく開発者に追加実装を強いるものになっています。

そこで我々はSpring Data JDBCの CrudRepository を継承し以下の拡張を取り入れた ScalarDbRepository を追加しました

  • Spring Data JDBC for ScalarDBでは正常に機能しない ScalarDbRepository#save(T t) の呼び出しを、明示的に例外を発生させることにより抑止
  • 新規API insert(T t)update(T t) を追加。両APIとも、nonnullな @Id を持つモデルのインスタンスを引数として受け取る

ScalarDbRepository の利用により、Spring Data JDBC for ScalarDBでも直感的にScalarDBを操作することが可能になります。

Two-phase Commit Transactions向けAPIの追加

ScalarDBはmicroservice向けに、複数アプリケーションを跨いでトランザクションを実行するTwo-phase Commit (2PC) Transactions機能をサポートしています。Spring Data JDBC for ScalarDBからもその機能が使えるよう、以下の機能を追加しています。

Primitive 2PC transaction API

  • ScalarDBの2PC用APIが呼び出せるよう対応した原始的なAPI
  • 2PCトランザクション処理を細かく柔軟に制御できる一方、ユーザーはどのAPIをいつ呼び出すべきか考慮する必要がある

High-level 2PC transaction API

  • 一般的なユースケースを対象にした、使い勝手の良い抽象度の高いAPI。開発者はいつcommitやrollbackを実施すれば良いのか、意識する必要が無い
  • coordinator service開発用API
    - executeTwoPcTransaction
  • participant service開発用API
    - joinTransactionOnParticipant
    - resumeTransactionOnParticipant
    - prepareTransactionOnParticipant
    - validateTransactionOnParticipant
    - commitTransactionOnParticipant
    - rollbackTransactionOnParticipant

Spring Data JDBCに組み込まれている標準transaction manager(JdbcTransactionManager)は、CrudRepository APIの呼び出し完了時に必ずトランザクションをcommitします。しかし、この挙動は、commitタイミングを細かく制御する必要があるScalarDBの2PCトランザクションとは上手く動作しません。そのため、我々は JdbcTransactionManager を拡張した独自transaction managerを用意し、トランザクションのcommitタイミングを内部的に細かく制御できるようにしました。

仮に、ScalarDBユーザーが以下のようなトランザクション処理を、commit・rollbackの漏れなく実装したい場合、

Spring Data JDBC for ScalarDBのHigh-level 2PC APIを以下のように用いることで、開発者は複雑なトランザクション制御を意識せずにビジネスロジックの実装に集中できます。

Account Service (coordinator)

@Transactional(transactionManager = "scalarDbSuspendableTransactionManager")
@Repository
public interface AccountRepository
extends ScalarDbTwoPcRepository<Account, Integer> {
// findBydId(), insert(), update(), executeTwoPcTransaction()
// and some more APIs are prepared automatically.
}
@Autowired private Account accountRepository;

...

accountRepository.executeTwoPcTransaction(
// Business logic (CRUD operations) for local and
// remote participants in execution phase.
txId -> {
// [local] Read the account's balance
Optional<Account> stored = accountRepository.findById(account.id);
if (!stored.isPresent()) {
// Cancel the transaction if the account doesn't exist.
// No need to retry.
throw new ScalarDbNonTransientException(
"The local state doesn't meet the condition.
Aborting this transaction");
}
// [remote] Start a transaction with the transaction ID,
// read the item information and decrement the count
Optional<Integer> price = stockService.purchaseItem(
txId, account.id, itemName);
if (price.isPresent()) {
int currentBalance = stored.get().balance - price.get();
if (currentBalance < 0) {
// Cancel the transaction
// if the global state doesn't meet the condition.
// No need to retry.
throw new ScalarDbNonTransientException(
"The state of local and remote participants doesn't meet
the condition. Aborting this transaction");
}
// [local] Decrease the account's balance for the item
accountRepository.update(new Account2Pc(account.id, currentBalance));
return currentBalance;
}
// Cancel the transaction
// if the global state doesn't meet the condition.
// No need to retry.
throw new ScalarDbTransientException(
"The remote state doesn't meet the condition.
Aborting this transaction");
},

// Remote operations for Prepare/Validate/Commit/Rollback.
// These transactional operations will be automatically executed.
Arrays.asList(
RemotePrepareCommitPhaseOperations.createSerializable(
stockService::callPrepareTransaction,
stockService::callValidateTransaction,
stockService::callCommitTransaction,
stockService::callRollbackTransaction)
);

Stock Service (participant)

@Transactional(transactionManager = "scalarDbSuspendableTransactionManager")
@Repository
public interface StockRepository
extends ScalarDbTwoPcRepository<Stock, String> {
// joinTransactionOnParticipant(), commitTransactionOnParticipant()
// and some more APIs are prepared automatically.
}
@RestController
public class StockController {
@Autowired private StockRepository stockRepository;

@PostMapping("/purchaseItem")
public Optional<Integer> purchaseItem(
@RequestParam("transactionId") String transactionId,
@RequestParam("accountId") int accountId,
@RequestParam("itemName") String itemName) {
// Join the global transaction and execute the CRUD operations in it
return stockRepository.joinTransactionOnParticipant(txId, () -> {
Optional<Item> item = stockRepository.findById(itemName);

...

return Optional.of(item.price);
});
}

@PostMapping("/prepareTransaction")
public void prepareTransaction(
@RequestParam("transactionId") String transactionId) {
return stockRepository.prepareTransactionOnParticipant(txId);
}

@PostMapping("/validateTransaction")
public void validateTransaction(
@RequestParam("transactionId") String transactionId) {
return stockRepository.validateTransactionOnParticipant(txId);
}

@PostMapping("/commitTransaction")
public void commitTransaction(
@RequestParam("transactionId") String transactionId) {
return stockRepository.commitTransactionOnParticipant(txId);
}

@PostMapping("/rollbackTransaction")
public void rollbackTransaction(
@RequestParam("transactionId") String transactionId) {
return stockRepository.rollbackTransactionOnParticipant(txId);
}
}

まとめ

本記事では、Spring Data JDBC for ScalarDBを紹介しました。我々はこの機能によりScalarDBを用いたSpringアプリケーションの開発が簡単に安全になると信じています。現時点で、Spring Data JDBC本体およびScalarDBとの連携由来のいくつかの制限事項がありますが、将来的にはそれらの制限事項を解決し、ScalarDBユーザーがより簡単にアプリケーションを開発できるようにしていきたいです。

--

--