あなたも出来るakka-clusterとPersistentActorで実現するEvent Sourcing+CQRS

はじめに

開発室のはちわれです。ナイルに転職して約3ヶ月、Scalaの経験も転職してから本格的にコードを書く様になったのでまだ3ヶ月です。普段はScala、Akkaを使って開発を行っています。今回は、Scala試用期間中の私でも出来たakka-clusterとakka-persistentを使った「Event Sourcing+CQRS」を実現する方法について書かせて頂きたいと思います。

今回やること

最初に書かせて頂きましたが、今回はakka-clusterとakka-persistentを組み合わせて
「Event Sourcing+CQRS」を実現する一例を紹介させて頂きます。
本題に移る前に、Event SourcingとCQRSについて軽く触れさせて頂きます。

Event Sourcing

ステート(状態)ではなく、イベントを中心に考えられたアークテクチャ
ステートではなく、全てのイベントを保存、再生することによってステートを表します。

CQRS(コマンドクエリ責任分離)

Command Query Responsibility Segregationの略です。
簡単に説明するとCommand(更新)とQuery(参照)を明確に分離しようという考え方です。
弊社でも取り入れているDDD(ドメイン駆動設計)と一緒に語られることが多いです。
一緒に語られるのが多いのはQuery側はドメインの影響を受けることが少なく、逆にCommand側はドメインの影響を受けることが多い為ドメインの分離により、CommandとQueryの分離が自然に行えることが多い為かと思います。
どの程度CommandとQueryの分離を行うかは、人によって認識や意見が別れることもあるかと思います。今回の記事を執筆する上で調査した限りでも、アプリケーションのレイヤーでドメイン層からQueryを出すべきという方もいればドメイン層の中で分離を行うという意見もあり、この辺は携わっているシステムのドメインによる所かと思います。

この実装によって得られるもの

今回の実装によって、Event sourcingとCQRSを実現することができます。
また、それと伴にakka-clusterとakka-persistentを組み合わせることにより、以下の効果も同時に得ることができます。

  • 耐障害性(Resilient)

ここで言う耐障害性とは、障害が起きないという意味での耐障害性ではありません。
障害が発生した際の高い回復性を指し、リアクティブ宣言で言われるResilientにあたります。この高い回復性は以下のメカニズムによって実現されています。

  • akka-clusterのfailure detector
  • akka-persistentのリカバリー機能
  • ドメインの分離による高い独立性と保守性
    CQRSの概念に基づいてCommandとQueryの分離(ドメインの分離)をすることにより、それぞれを独立したコンポーネントして扱うことができます。
    これにより、CommandとQueryが疎結合となりどちらかに修正を行った際に、もう片方にも修正を行わなければならないという事態を避けることができ
    修正の影響を気にすることなく、作業を行うことができます。
    ドメインの変更があった場合も、Command側だけに影響する内容であればCommand側だけを修正すれば良く工数の短縮と高い保守性を得ることができます。

実装例

それでは、少し前置きが長くなってしまいましたが、これからコードを交えながら
今回の実装例について説明をさせていただきます。まず構成のイメージは下記の図をご覧ください。

構成イメージ

implements_image

簡単に今回の構成について説明します。
クラスターのノードはCommad(更新)を行うノードとQuery(参照)を行うノードの2種類があります。Commandノードは更新のイベント、Queryノードは参照のイベントだけをそれぞれClusterClientから受け取ります。Commandノードはクラスターの中でシングルトンのノードして存在し、このノードが持つデータをマスターのデータとします。Commandノードにて行われたイベントをQueryノードにPublishして、Query側はそのイベントを実行することによりCommandとQueryのノードの状態が同期されます。

※先に断っておきますと、コードの中のsleepやprintln、logなどはデモの為のコードとなります。見やすくする為に、コードしては必要ない空行なども入れていますので予めご了承ください

環境情報

今回の実装例のコードは以下のバージョンを対象としています。
設定は後述しているbuild.sbtを参照して下さい。

  • Scala:2.11.8
  • akka-persistence:2.4.8
  • akka-cluster:2.4.8
  • akka-cluster-tools:2.4.8
  • akka-actor:2.4.8

Write側ノードの実装

import akka.actor.{ActorLogging, Stash}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.persistence._
import ClusterPackage.Protocol._
import scala.collection.mutable
class WriteNodeActor extends PersistentActor
with ActorLogging
with Stash {
override def persistenceId = "cluster_persistent"
var state = mutable.Map.empty[String, String]
val mediator = DistributedPubSub(context.system).mediator
val writer = new EventImplements()
//PersistentActorのレシーブ
override val receiveCommand: Receive = {
case Write(key, value) => {
persist(Write(key, value)) { event =>
log.info("Write node update")
writer.update(key, value, state)
publishToRead(Write(key, value))
}
}
case SaveSnapshotEvent() => {
log.info("Request save snapshot.")
context.become(receiveCommandSaving)
}
case other => {
println(other.getClass)
log.info("Receive other event : " + other.toString())
}
}
//リカバリー時のレシーブ、ノードのリカバリー時はここに呼ばれる
override val receiveRecover: Receive = {
case SnapshotOffer(_, snapshot: String) => {
log.info("Read node recovery start")
}
case event: UpdateEvent =>
self ! event
case RecoveryCompleted =>
log.info("Read node recovery completed")
}
/**
* Write側で行ったイベントをRead側にPublish
*/
def publishToRead(event: UpdateEvent) = {
Thread.sleep(5000)
log.info("Publish event:" + event.toString)
        //"read"という名称のSubscribeにイベントをPublish
mediator ! Publish("read", event)
}
def receiveCommandSaving: Receive = {
case SaveSnapshotEvent() => {
log.warning("Discard event SaveSnapshot, cause previous saving is still running.")
}
case SaveSnapshotSuccess(metadata) =>
log.info("Snapshot saved : " + metadata)
// journal削除 最新のjournalは残す
deleteMessages(metadata.sequenceNr - 1)
unstashAll() // stashしていたイベントを解放。
context.unbecome() // 処理の実行をreceiveCommandに戻す。
case SaveSnapshotFailure(metadata, cause) => {
log.error("Failed to save snapshot(%s) : %s".format(metadata, cause))
unstashAll()
context.unbecome()
}
// Stash any other event
case other => {
log.warning("Stash the event while saving snapshot: " + other)
stash() // イベントをstash。
}
}
}
更新処理の実装
import scala.collection._
class EventImplements {
def update(key: String, value: String, state: mutable.Map[String, String]) = Option {
state += (key -> value)
println(state.toString())
}
}

Write側の実装について説明します。
使っている主な機能としては、以前こちらのブログでも紹介したPersistentActorとakka-cluster-toolsのDistributedPubSub、DistributedPubSubMediatorです。
receiveCommandにて更新イベントであるWriteをレシーブし、persistでイベントをjournalに永続化しています。
更新処理を行った後に、publishToReadでQuery側にイベントを送っています。
Query側へのイベントのpublishには、akka-cluster-toolsのDistributedPubSub、DistributedPubSubMediator(以下、mediator)を使っています。mediatorはクラスターのノード間でメッセージをやり取りする仕組みです。
PubSubという名称からも分かる通りPublish/Subscribeメッセージングモデルであり、Publisher側がメッセージを送りたいSubscribeを指定してメッセージを送信することになります。使い方自体は難しくなくmediatorを生成し、mediatorに対して対象となるSubscribeの名称とイベントを持ったPublishを送ることによってSubscribe側にイベントが送られます。今回の例では"read"という名称でSubscribeしているノードに対してメッセージを送信しています。
実際の更新処理は、WriteNodeActorからは切り離して別のクラスで実装しています。
こうした理由はCommandとQueryはあくまでクラスターのノードとして扱い、実際の更新のドメインとは疎結合とすることによって互いの修正が影響しあわない様にするためです。
スナップショットの保存と保存時のイベントのレシーブについて
スナップショットの保存は、SaveSnapshotEventをレシーブした後の一連処理で行われます。ポイントはcontext.becomeとunbecome、stashとunstashAllです。以下に一連の流れを示します。
スナップショット保存の流れ
  • SaveSnapshotEventをレシーブ。
  • context.becomeで処理の実行をreceiveCommandSavingに移譲。
  • これによりスナップショットの保存が終わるまでのレシーブはreceiveCommandSavingが行う。
  • case other -> 更新のイベントが送られてきた場合、送られてきたイベントをstash。
  • case SaveSnapshotEvent -> 既にスナップショットが保存中である事をログ出力。
  • case SaveSnapshotSuccess (スナップショット保存成功)
  • 新たにスナップショットが保存されたので、それ以前のjournalはリカバリーの際に不要の為削除。
  • unstashAll()でstashしていたイベントを実行。
  • context.unbecome()で処理の実行をreceiveComanndに戻す。
  • case SaveSnapshotFailure(スナップショット保存失敗)
  • エラー情報をログに出力。
  • unstashAll()でstashしていたイベントを実行。
  • context.unbecome()で処理の実行をreceiveComanndに戻す。
cotext.becomeで処理の実行を別の処理に移すことが出来ます。上記の実装例ではスナップショット保存中のレシーブをreceiveCommandSavingに移譲しています。元の処理に戻したい時はcontext.unbecomeによって戻すことが出来ます。
stashはGitのstashと同じで、送られてきたイベントを一時退避させておくことが出来ます。
unstashAllを実行することにより、stashしていたイベントが順次実行されていきます。
また、下記のコードでjournalの削除を行っていますが、sequenceNrの値を−1しているのは最新のjournalを残す為に行っています。最新のjournalを残さなければならない理由は2つあります。
  1. journalの採番をそのまま連番で続ける為。
  2. 起動、リカバリー時にjournalを読み込める様にする為。
1つ目の理由について説明します。journalは保存される際に自動的にシーケンシャルナンバーが採番されますが、ここでjournalを全て消してしまうと次に保存されるjournalのナンバーが再び1から始まってしまいます。それを避けるため最新のjournalを残しておいて、その次の番号が振られる様にしています。新しくスナップショット保存しているんだから、また1から採番するので良いのではないか?と思われるかもしれませんが、そうしてしまうと起動、リカバリー時に問題があります。PersistentActorは起動、リカバリー時にスナップショットから状態を復元した後に、journalを読み込んでリプレイすることによって最新の状態まで自身の状態を戻しますが、この際に使用したスナップショット以降のシーケンシャルナンバーを指定してjournal読み込みます。もしこの時にナンバーがまた1から振られているとPersistentActorはスナップショットの次のナンバーを指定して読み込みに行きますが、journalは1から番号が採番されている為、指定された番号と合わず読み込むことが出来ないため最新の状態に復元が出来なくなってしまいます。これが最新のjournalを残す2つ目の理由です。
journalを削除する処理
deleteMessages(metadata.sequenceNr - 1)
リカバリーなどのPersistentActorの機能については以前のブログでも紹介させて頂いておりますので、今回の記事では割愛させて頂いています。PersistentActorの動作自体の詳細を知りたいという方は、PersistentActorの良いところと使い方をご覧ください。

Read側ノードの実装

import akka.actor.{Actor, ActorLogging}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Subscribe
import akka.persistence._
import ClusterPackage.Protocol._
import scala.collection.mutable
class ReadNodeActor extends PersistentActor with ActorLogging {

override def persistenceId = "cluster_persistent"
var state = mutable.Map.empty[String, String]
//ステートの更新処理を実装
val writer = new EventImplements()
//mediatorのsubscribeに設定
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe("read", self)
//PersistentActorのレシーブ
override val receiveCommand: Receive = {
case Get(key) => {
log.info("Get read node data: " + state.getOrElse(k, "Not found"))
}
case Write(key, value) => {
log.info("Subscribe event read node update")
writer.update(k, v, state)
}
case other => log.info("Receive other event : " + other.toString())
}
//リカバリー時のレシーブ、ノードのリカバリー時はここに呼ばれる
override val receiveRecover: Receive = {
case SnapshotOffer(_, snapshot: String) => {
log.info("Read node recovery start")
}
case event: UpdateEvent => self ! event
case RecoveryCompleted =>
log.info("Read node recovery completed")
}
}
Read側のノードでは情報の参照と、Write側から送られてくるイベントをレシーブする処理を実装します。上記のQuery側のノードでは、状態を更新する処理は持っていません。
状態の更新の処理は前述した通りEventImplementsに実装してあります。
これにより、更新処理の詳細とQuery側の処理を切り離しています。
更新側と同じ様に、mediatorを生成していますが更新側と違うのはSubscribe側は自分自身をmediatorのSubscribeとして登録する必要がある点です。下記のコードで自分自身をreadという名前のTopicsでSubscribeとして登録しています。
これでCommand側のノードから"read"という指定でPublishされるイベントをレシーブすることが出来る様になります。
mediator ! Subscribe("read", self)
参照のイベントであるGetをレシーブした際は、Getイベントのkeyを使って自身のステートであるMapからkeyのvalueを取得しています。Writeイベントのレシーブがありますが、これはWrite側からPublishされてくるWriteイベントをレシーブして自身のステートを更新する為です。今回の実装例では更新のイベントを送ってくるのは、あくまでWrite側のノードだけでありClient側から更新のイベントが送られてくることはありません。
実装例では、Command側とQuery側は同じデータソース、今回の場合はmutable.Mapでステートを保持していますが、もっと突き詰めたCQRSのアーキテクチャの例では、Command側とQuery側のデータソースを別々の物にすることが推奨されていたりしますので、Query側はjournalなどからイベントを読み込んで、リプレイすることによりステートを更新する様にした方が良いかもしれません。
akka-persistentのPersistentQueryを使えば、その様な実装も可能かと思います。今回PersistentQueryを使わず、PubSubMediatorを使う実装を選択したのは、その方が処理のステップが少なく済むからです。
PersistentQueryとPubSubMediatorを使った場合のステップを比較すると、以下の様に違いが出ます。
  • PersistentQueryの場合
  • Command側が更新のイベントがあったことを伝えるメッセージを送信。
  • Query側がメッセージを受け取る。
  • Query側がjournalからイベントを取得。
  • 取得したイベントをリプレイ。
  • PubSubMediatorの場合
  • Commad側が自身で行われたイベント自体を送信。
  • Query側がイベントを受け取る。
  • 取得したイベントをリプレイ。
上記の様に、PubSubMediatorの方が処理のステップが少なくても済む為今回の例では、PubSubMediatorを採用しました。
また上記はあくまで私の理解なので、もしかしたらPersistentQueryでも、もっと処理のステップを少なく済ませる方法があるのかもしれません。
ClusterPackage
object ClusterPackage {
trait Protocol {
trait Event
trait ReadEvent extends Event
trait UpdateEvent extends Event
case class Get(key: String) extends ReadEvent
case class Write(key: String, value: String) extends UpdateEvent
case class SaveSnapshotEvent()
}
case object Protocol extends Protocol
}
上記のClusterPackageはakka-persistent、akka-clusterとは関係ありません。
今回の実装例で使っているWriteやGetなどのイベントをcase classとしてまとめて定義しておき、WriteNodeActor、ReadNodeActorの両方で使える様にしてあるだけです。
良ければ、こういうやり方もあるとの参考にして頂ければと思います。
ClusterClientReceptionistのListenerを実装
import akka.actor.{Actor, ActorRef}
import akka.cluster.client.{ClusterClients, ClusterClientUp, ClusterClientUnreachable, SubscribeClusterClients}
class ClusterServiceListener(targetReceptionist: ActorRef) extends Actor {
override def preStart(): Unit = targetReceptionist ! SubscribeClusterClients
override def receive: Receive = {
def receiveWithClusterClients(clusterClients: Set[ActorRef]): Receive = {
case ClusterClients(cs) =>
context.become(receiveWithClusterClients(cs))
case ClusterClientUp(c) =>
context.become(receiveWithClusterClients(clusterClients + c))
case ClusterClientUnreachable(c) =>
context.become(receiveWithClusterClients(clusterClients - c))
}
receiveWithClusterClients(Set.empty)
}
}

後々クラスターの起動の所で説明しますが、クラスターのノードへメッセージを送る際に今回の例ではClusterClientを使っています。ClusterClientからメッセージを受け取る為にはClusterClientReceptionistにクラスターのノードとなるActorを登録するのと、ClusterClientReceptionistのリスナーとなるActorも必要となり上記はそのActorの実装になります。
ClusterClientの実装
import akka.actor.{ActorPath, ActorSystem}
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import com.typesafe.config.ConfigFactory
import collection.JavaConversions._
import ClusterPackage.Protocol._
class Client {
val clientConf = ConfigFactory.load("client.conf")
/**
      * Clusterのノードにメッセージ送信
      */
def sendEvent(event: Event): Unit = {
Thread.sleep(5000)
val clientSystem = ActorSystem("ClientSystem", clientConf)
val contactSettings = clientConf.getStringList("client.initial-contacts").map(path => ActorPath.fromString(path)).toSet
val c = clientSystem.actorOf(ClusterClient.props(ClusterClientSettings(clientSystem).withInitialContacts(contactSettings)), "client")
c ! ClusterClient.Send(getTargetPath(event), event, localAffinity = true)
}
/**
* イベントからメッセージの送信先のActorのパスを取得
*/
def getTargetPath(event: Event): String = event match {
case e: Write => clientConf.getString("client.target-path-update")
case e: Get => clientConf.getString("client.target-path-read")
}
}
ClusterClientを生成してメッセージを送っています。sendEventに渡されたeventの型からメッセージの送り先のノードを切り替えています。Writeの場合はCommad側ノード、Getの場合はQuery側ノードにメッセージが送られる様になっています。
今回の例や公式ドキュメントを見ると、上記の様に 「ClusterClientのActor ! ClusterClient.Send(...」となっていますがクラスターからの結果を受け取りたい場合は、「!」でメッセージを送ると結果が取得できないので注意して下さい。結果を受け取りたい場合は、「ClusterClient.Send(...」とすれば良いです。(※もちろんノード側に実装もちゃんと結果を返す様になっている必要があります、)
設定ファイル
build.sbt
name := "BlogTest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence" % "2.4.8",
"com.typesafe.akka" %% "akka-cluster" % "2.4.8",
"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.8",
"com.typesafe.akka" %% "akka-actor" % "2.4.8",
)
application.conf
akka {
loglevel = "INFO"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
}
}
cluster {
auto-down-unreachable-after = 10s
seed-nodes = [
"akka.tcp://HogeCluster@127.0.0.1:2551"
"akka.tcp://HogeCluster@127.0.0.1:2552"
]
}
}
akka.persistence.journal.plugin="akka.persistence.journal.inmem"
akka.persistence.snapshot-store.plugin="akka.persistence.snapshot-store.local"
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
client.conf
akka {
loglevel = "INFO"
actor {
provider = akka.cluster.ClusterActorRefProvider
}
remote {
log-remote-lifecycle-events = on
enabled-transports = ["akka.remote.netty.tcp"]
# Default tcp
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}
client {
initial-contacts = [
"akka.tcp://HogeCluster@127.0.0.1:2551/system/receptionist",
"akka.tcp://HogeCluster@127.0.0.1:2552/system/receptionist"
]
target-path-read   = "/user/read"
target-path-update = "/user/masterProxy"
timeout = 10s
}
クラスターの起動とデモ
起動
import akka.actor.{ActorSystem, PoisonPill, Props}
import akka.cluster.client.ClusterClientReceptionist
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
import com.typesafe.config.{Config, ConfigFactory}
class ClusterBoot {
     /**
* Query側のノードの起動
      */
def startReadNode(port: Int, conf: Config) = {
val readConf = ConfigFactory.parseString("""akka.cluster.roles=["read"]""")
.withFallback(ConfigFactory.parseString(s"""akka.remote.netty.tcp.port=${port}"""))
.withFallback(conf)
// Query側のActorSystem開始。
val actorSystem = ActorSystem("HogeCluster", readConf)
// Query側のノードとなるActorを生成。
val readNode = actorSystem.actorOf(Props(classOf[ReadNodeActor]), "read")
// 生成したActorをClusterClientReceptionistに登録。
ClusterClientReceptionist(actorSystem).registerService(readNode)
// Query側のノードを子ActorとしてListenerのActorを生成。
actorSystem.actorOf(Props(classOf[ClusterServiceListener], readNode), "readListener")
}
/**
* Command側のノードの起動
      */
def startUpdateNode(port: Int, conf: Config) = {
// ClusterActorSystem configuration with backend role
val updateConf = ConfigFactory.parseString("""akka.cluster.roles=["update"]""")
.withFallback(ConfigFactory.parseString(s"""akka.remote.netty.tcp.port=${port}"""))
.withFallback(conf)
// Command側のActorSystem起動。
val actorSystem = ActorSystem("HogeCluster", updateConf)
// Command側はシングルトンのノードしてActorを生成。
actorSystem.actorOf(
ClusterSingletonManager.props(
singletonProps = Props(classOf[WriteNodeActor]),
terminationMessage = PoisonPill,
settings = ClusterSingletonManagerSettings(actorSystem).withRole("update")
)
, "master")
// 上記で生成したシングルトンのノードのProxyとなるActorを生成。
val masterProxy = actorSystem.actorOf(ClusterSingletonProxy.props(
singletonManagerPath = "/user/master",
settings = ClusterSingletonProxySettings(actorSystem).withRole("update"))
, "masterProxy")
// 生成したProxyのActorをClusterClientReceptionnistに登録。
ClusterClientReceptionist(actorSystem).registerService(masterProxy)
// ProxyのActorを子ActorとしてListenerのActorを生成。
actorSystem.actorOf(Props(classOf[ClusterServiceListener], masterProxy), "updateListener")
}
}

実行
import com.typesafe.config.ConfigFactory
import ClusterPackage.Protocol._
object ClusterMain {
def main(args: Array[String]): Unit = {
val conf = ConfigFactory.load()
val boot = new ClusterBoot()
List(2552, 2553).map(port => boot.startReadNode(port, conf))
boot.startUpdateNode(2551, conf)
val client = new Client
client.sendEvent(Write("hoge", "fuga"))
Thread.sleep(5000)
client.sendEvent(Get("hoge"))
}
}
それでは、クラスターの起動について説明します。
まずはQuery側のノードの起動から説明していきます。startReadNodeという処理の中で起動を行っています。
行っているのは以下の3つです。
  • QueryのノードとなるActorの生成。
  • 生成したActorをClusterClientReceptionistに登録。
  • QueryノードのActorを子ActorとしてClusterClientReceptionistのリスナーのActorを生成。
Query側のノードなるActorを生成する所までは普通にActorを生成すれば良いですが、ClusterClientからのメッセージを受け取る為には生成したActorをClusterClientReceptionistに登録する必要があります。ClusterClientReceptionistのregisterServiceに生成したActorを渡すことで登録されます。更にClusterClientReceptionistのリスナーが必要となるので、生成したQueryノードのActorを子ActorとしてリスナーのActorを生成しています。
次にCommand側のノードについて説明します。startUpdateNodeという処理の中で起動を行っています。行っているのは以下の4つです。
  • Commadのノードをシングルトンのノードして生成。
  • シングルトンのノードのProxyとなるActorを生成。
  • ProxyのActorをClusterClientReceptionistに登録。
  • ProxyのActorを子ActorとしてClusterClientReceptionistのリスナーのActorを生成。
Actorsystemを起動する所まではQuery側と同じですが、今回Command側のノードはシングルトンのノードなる様に生成します。こうした理由はCommandが持つ状態をmasterの状態として一意の存在にしておきたかったからです。
シングルトンのノードの生成には、ClusterSingletonManager.propsを使います。propsの各引数に値を渡して返されるPropsからActorを生成します。シングルトンのノードはこれで生成出来ましたが、このノードには直接メッセージを送る事は出来ません。ProxyとなるActorを生成し、そのActorを介してメッセージを送る形になります。Proxyとして生成したActorはクラスター上のどこかに在る、シングルトンノードの位置情報を持っておりシングルトンノードへのアクセスを行ってくれます。
ProxyとなるActorは上記のコードの通りClusterSingletonProxy.props関数の引数にシングルトンノードのActorのパス、ロールを設定して返されるPropsからActorを生成します。あと行っているのはQuery側と同じ様にClusterClientReceptionistへの登録と、ClusterClientReceptionistのリスナーの生成です。Query側と違うのはClusterClientReceptionistに登録するのと、リスナーの子ActorとなるActorはProxyのActorとなる点です。
mainメソッドの中で、上記の起動処理2つを呼び出してクラスターの起動を行っています。
Query側のノードは今回の例では2つノードを用意したかったので、それぞれportを2552、2553に指定して起動しています。ノードをもっと増やしたい場合は、ここで指定するポートを増やせばその数の分起動できます。ポートはノード毎に一意でなければならず、同じポートを指定することは出来ません。
Command側はシングルトンのノードなのでportをひとつ指定して起動しています。
そしてClusterClientのsendEventでメッセージ(イベント)を送っています。まずCommandノードに更新のイベントを送ります。Commandノードは受け取ったイベントから自身の状態を更新し、その後Query側にイベントをPublishします。
Query側の2つのノードは、それぞれイベントを受け取ると自分たちの状態を更新します。
コードの中で行っているsleepは起動したActorがクラスターに入ってアクセス可能な状態になるまでタイムラグがあり、その間にメッセージを送ってもアクセスできずdead_letterとなる為、アクセス出来る状態となるまでに待つ為にsleepさせています。通常クラスターを起動する時には、この様なsleepは不要です。
それでは実際に動かした結果を見てみたいと思います。
※クラスターの起動のログは長いので必要な箇所を抜粋して載せてあります。
[akka.cluster.Cluster(akka://HogeCluster)] Cluster Node [akka.tcp://HogeCluster@127.0.0.1:2551] - Node [akka.tcp://HogeCluster@127.0.0.1:2551] is JOINING, roles [update]
[akka.cluster.Cluster(akka://HogeCluster)] Cluster Node [akka.tcp://HogeCluster@127.0.0.1:2551] - Leader is moving node [akka.tcp://HogeCluster@127.0.0.1:2551] to [Up]
[akka.tcp://HogeCluster@127.0.0.1:2551/user/master] Singleton manager starting singleton actor [akka://HogeCluster/user/master/singleton]
[akka.tcp://HogeCluster@127.0.0.1:2551/user/master] ClusterSingletonManager state change [Start -> Oldest]
[akka.tcp://HogeCluster@127.0.0.1:2551/user/masterProxy] Singleton identified at [akka://HogeCluster/user/master/singleton]
     ・
     ・
     ・
[akka.tcp://HogeCluster@127.0.0.1:2551/user/master/singleton] Write node update //Commandノードがイベントをレシーブ。
Map(hoge -> fuga) // Commadの状態が更新される。
     ・
     ・
[akka.cluster.Cluster(akka://HogeCluster)] Cluster Node [akka.tcp://HogeCluster@127.0.0.1:2551] - Node [akka.tcp://HogeCluster@127.0.0.1:2553] is JOINING, roles [read]
[akka.cluster.Cluster(akka://HogeCluster)] Cluster Node [akka.tcp://HogeCluster@127.0.0.1:2551] - Node [akka.tcp://HogeCluster@127.0.0.1:2552] is JOINING, roles [read]
[akka.cluster.Cluster(akka://HogeCluster)] Cluster Node [akka.tcp://HogeCluster@127.0.0.1:2552] - Welcome from [akka.tcp://HogeCluster@127.0.0.1:2551]
[akka.cluster.Cluster(akka://HogeCluster)] Cluster Node [akka.tcp://HogeCluster@127.0.0.1:2553] - Welcome from [akka.tcp://HogeCluster@127.0.0.1:2551]
[akka.cluster.Cluster(akka://HogeCluster)] Cluster Node [akka.tcp://HogeCluster@127.0.0.1:2551] - Leader is moving node [akka.tcp://HogeCluster@127.0.0.1:2552] to [Up]
[akka.cluster.Cluster(akka://HogeCluster)] Cluster Node [akka.tcp://HogeCluster@127.0.0.1:2551] - Leader is moving node [akka.tcp://HogeCluster@127.0.0.1:2553] to [Up]
[akka.tcp://HogeCluster@127.0.0.1:2551/user/master/singleton] Publish event:Write(hoge,fuga) // Query側にイベントをPublish
[akka.tcp://HogeCluster@127.0.0.1:2553/user/read] Subscribe event read node update   // PublishされたイベントをSubscribe
[akka.tcp://HogeCluster@127.0.0.1:2552/user/read] Subscribe event read node update // PublishされたイベントをSubscribe
Map(hoge -> fuga) //Query側の状態が更新される。
Map(hoge -> fuga) //Query側の状態が更新される。
     ・
     ・
     ・
[ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:51162/user/client] Connected to [akka.tcp://HogeCluster@127.0.0.1:2551/system/receptionist]
[HogeCluster-akka.actor.default-dispatcher-3] [akka.tcp://HogeCluster@127.0.0.1:2553/user/read] Get read node data: fuga //Query側から取得
JOININGやUpなど出ていますが、これはクラスターのメンバーシップ(ノード)のライフサイクルを示しています。詳細はAkkaの公式ドキュメントをご覧頂ければと思いますが、簡単なライフサイクルを示すと以下の様になります。
ライフサイクルの流れ
  • JOINING -> クラスターに入ろうとしている
  • Up -> クラスターに入ってアクセス可能になった状態
  • Leaving -> クラスターから正常ぬに抜けようとしている状態
  • Down −> ノードがおち落ちてしまった状態
  • removed ->クラスターから削除された状態
これらの各ノードの状態の遷移や管理はクラスターのLeaderという機能が行っています。上のログでも「Leader is moving node」という様にLeaderがノードを遷移させているのが分かると思います。
実行結果として以下の流れになりイベントが伝播され、ノード間で状態が同期されているのが分かると思います。
  • Commandのノードが起動してクラスタに入り、Upの状態となる。
  • CommandにClusterClientからイベントを送る。
  • Commandがイベントをレシーブ。
  • Commandが自身の状態を更新。
  • Query側にイベントPublish
  • Query側がイベントをSubscribe
  • Query側の2つのノードの状態が更新される
  • Query側に状態を取得するイベントを送る。
  • 更新された状態が取得される。

まとめ

以上で、akka-clusterとPersistentActorの組み合わせによってEvent SourcingとCQRSを実現する実装例の紹介とさせて頂きます。今回紹介したのは、あくまで一例でありakka-clusterやakka-persistentの他の機能を使ったりノードの起動する数や、どの様なノードとして起動するかで柔軟に構成を変更してシステムを構築する事が出来ます。最後に今回の実装例の要点をまとめます。
  • akka-clusterとakka-persistentを組み合わせることによってEvent SourcingとCQRSを実現するシステムを構築する事が出来る。
  • akka-persistentのリカバリー機能等によって耐障害性の高いシステムを構築できる。
  • クラスターのノードの実装や実際の更新処理を明確に分離することによって、ドメインの分離とCQRSを実現でき保守性も高くなる。
  • クラスターのノードを起動の仕方によって様々な構成のクラスター構成を実現することが出来る。