Chaos Experiment: Split Braining Akka Clusters

As part of a series of blog posts on Chaos engineering, this blog post introduces the open source library docker-compose-testkit. Here, we use this library to define a Chaos experiment (using extensible effects) that illustrates the impact that auto-downing can have when an Akka cluster is subjected to network partitioning. The previous post Can Real World Distributed Systems be Proven Correct? motivates and explains the need for performing fault injection on distributed applications. In future posts, we will consider more realistic use cases than the one considered here, so please stay tuned!

Introduction to docker-compose-testkit

docker-compose-testkit is an open source library (implemented in Scala) for defining and automatically running Chaos experiments against Dockerised distributed applications.

The library provides the ability to deploy, orchestrate and instrument distributed Dockerised applications to multiple target environments (e.g. local development, AWS, Azure, GCE, etc.). Deployed applications are treated as a black box and may be monitored (via defined sensor probes) or interacted with (via defined data injection points).

As we shall see, Chaos experiments will be defined by composing runtime monitors using extensible effects. This will enable the reuse of Chaos experiments and runtime monitors across projects.

Worked Example

We shall illustrate how docker-compose-testkit might be used by presenting a simple use case scenario. In our example scenario, we will first launch an Akka cluster. Once we are sure the Akka cluster is stable, we shall partition the network that the cluster uses to communicate. With auto-downing enabled, we will then wait until the Akka cluster split brains. Moreover, we want to do all of this automatically and in real time!

For an extra bit of spice, we will also risk a split brain by simulating a JVM garbage collection pause. As these are stop-the-world events, any JVM garbage collection pause that is too long may also cause the cluster to split brain. We will define a runtime monitor that waits until the cluster node engaging in the JVM garbage collection pause is disassociated from the Akka cluster. At this point in time, we will end the JVM garbage collection pause and allow the cluster to heal itself.

Using docker-compose-testkit involves following at most 4 steps. We define and explain each of these steps in the following subsections.

Optional Step 1: Service and Network Description

In order to be able to observe and interact with a deployed system, we need to describe the services it has available, how these services are configured and the networks these services should connect to. Docker Compose provides a convenient medium for expressing these requirements.

For our working example, we will need to describe 4 Akka cluster nodes (we will show how we do this shortly), and 3 networks as follows:

val yaml = DockerComposeString(
  s"""version: '2'
     |
     |services:
     |  ${clusterNode("left-A", "left", "middle")}
     |  ${clusterNode("left-B", "left", "middle")}
     |  ${clusterNode("right-A", "right", "middle")}
     |  ${clusterNode("right-B", "right", "middle")}
     |
     |networks:
     |  left:
     |  middle:
     |  right:
   """.stripMargin
)

Each Akka node will be configured using environment variables (e.g. to specify actor system ports, hostnames and seed nodes) and will be connected to 2 networks. All our Akka nodes will be built using a single generic Akka image (see Node.scala) as follows:

def clusterNode(name: String, network1: String, network2: String): String =
  s"""$name:
     |  template:
     |    resources:
     |      ...
     |    image: docker-compose-testkit-tests:$version
     |  environment:
     |    AKKA_HOST: $name
     |    AKKA_PORT: 2552
     |    CLUSTER_SEED_NODE: "akka.tcp://TestCluster@left-A:2552"
     |  expose:
     |    - 2552
     |  networks:
     |    - $network1
     |    - $network2
   """.stripMargin

Notice that we have introduced the non-standard Docker Compose YAML template section. As we shall shortly see, this will be used to define any instrumentation layers that we wish to add to deployed services. Deploying and orchestrating our fleet of Dockerised services can then be achieved using:

implicit val compose: DockerCompose = up("chaos-experiment", yaml)

Step 2: System Instrumentation


Ultimately, all interaction between our Chaos experiment and a deployed application will utilise side-effecting code. In future posts, we will want to ensure that Chaos experiments can be code generated. This implies that Chaos experiments will need to be treated as first-class data. Hence we will functionally model these side effects using extensible effects. In particular, we will utilise the Eff monad library to achieve our goals here. Broadly, our aim is to keep all side-effecting code at the external boundaries of our otherwise functionally defined Chaos experiments.

Using extensible effects will boil down to defining a lightweight DSL describing how we observe a sensor or interact with a data injection point. We will then provide an interpreter for that DSL that may utilise side-effecting code (e.g. to repeatedly perform a docker exec).

For our working example, we wish to define a Chaos experiment that is able to simulate a JVM GC pause and to perform a network partition. JVM GC pauses will be simulated here by pausing the Docker container, and network partitions will be implemented by using the tc command to configure the network-attached NICs to drop packets 100% of the time.
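To make this concrete, here is a rough sketch of the shell-level operations that these two fault injections correspond to. This is illustrative only and not necessarily how docker-compose-testkit performs them internally; the container names, the eth0 interface and the use of scala.sys.process are assumptions for the example, and tc must be available inside the container:

import scala.sys.process._

object FaultInjectionSketch extends App {
  // Simulate a JVM GC pause by freezing (and later unfreezing) every process in the container
  "docker pause right-A".!
  "docker unpause right-A".!

  // Partition the network by instructing the container's NIC to drop every packet...
  "docker exec left-A tc qdisc add dev eth0 root netem loss 100%".!
  // ...and later heal the partition again
  "docker exec left-A tc qdisc del dev eth0 root netem".!
}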

Injecting JVM GC Events

For the moment, let us assume that monitors can be implemented using nested partial functions (we will see more of this later). Then a simple DSL for JVM GC events consists of just starting and stopping the JVM GC pause as follows:

sealed trait JvmGCEvent
case object JvmGCStart extends JvmGCEvent
case object JvmGCEnd extends JvmGCEvent
sealed trait JvmGCAction[Result]
final case class JvmGC(name: String, data: JvmGCEvent) extends JvmGCAction[Unit]

Notice how we encode the return type of a JvmGC action in the type parameter of the JvmGCAction trait.

In order to provide a compositional DSL syntax, we need a convenience constructor for injecting JVM GC actions into the Eff monad as follows:

type _jvm[Model] = JvmGCAction |= Model

def jvmGC[Model: _jvm](data: JvmGCEvent)(name: String): Eff[Model, Unit] =
  Eff.send[JvmGCAction, Model, Unit](JvmGC(name, data))

Finally, we can provide an interpreter implementation for our DSL as follows:

implicit class JvmRun[R, A](effects: Eff[R, A]) {
  def runJvm[U](
    db: Map[String, DockerImage]
  )(implicit member: Member.Aux[JvmGCAction, R, U],
    error: _errorOrOk[U],
    system: ActorSystem
  ): Eff[U, A] = {
    translate(effects)(new Translate[JvmGCAction, U] {
      def apply[X](jvm: JvmGCAction[X]): Eff[U, X] = {
        jvm match {
          case JvmGC(name, JvmGCStart) =>
            ErrorEffect.eval(Now {
              db(name).pause().asInstanceOf[X]
            })
          case JvmGC(name, JvmGCEnd) =>
            ErrorEffect.eval(Now {
              db(name).unpause().asInstanceOf[X]
            })
        }
      }
    })
  }
}

Notice how we pass a context (relating DockerImages to string-based identifiers) to the runJvm interpreter method. Also notice how this code wraps up our side-effecting Docker interactions using an error effect.
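As a small illustration of this separation between description and interpretation, a program written against the DSL is just data until an interpreter such as runJvm is applied to it. The helper below is hypothetical and exists purely to show the composition:

// A pure description of a GC pause followed by a resume: nothing is executed
// until the program is run through runJvm (and the remaining interpreters).
def gcPulse[Model: _jvm](node: String): Eff[Model, Unit] =
  for {
    _ <- jvmGC(JvmGCStart)(node)
    _ <- jvmGC(JvmGCEnd)(node)
  } yield ()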

Injecting Network Partitions

Again, as for JVM GC events, we define a simple DSL and convenience constructors as follows:

sealed trait NetworkAction[Result]
final case class PartitionNetwork(name: String) extends NetworkAction[Unit]

type _network[Model] = NetworkAction |= Model

def partition[Model: _network](name: String): Eff[Model, Unit] =
  Eff.send[NetworkAction, Model, Unit](PartitionNetwork(name))

The network partition DSL is then interpreted as follows:

implicit class NetworkingRun[R, A](effects: Eff[R, A]) {
  def runNetwork[U](
    implicit member: Member.Aux[NetworkAction, R, U],
    error: _errorOrOk[U],
    compose: DockerCompose
  ): Eff[U, A] = {
    translate(effects)(new Translate[NetworkAction, U] {
      def apply[X](net: NetworkAction[X]): Eff[U, X] = {
        net match {
          case PartitionNetwork(name) =>
            ErrorEffect.eval(Now {
              compose.network(name).impair(Loss("100%")).asInstanceOf[X]
            })
        }
      }
    })
  }
}

Observing Akka Cluster State

Here we will utilise the Akka JMX console tools to observe the current state of the cluster. It is worth noting that these tools are deprecated and are intended to be replaced by the REST-based Cluster Management interface.
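The monitors we define below will pattern match on an AkkaClusterState value built from this JMX output. As a rough guide, its shape is along the following lines; the field and type names here are inferred from how the value is used later and are not necessarily the library's actual definitions:

import akka.actor.Address
import akka.cluster.MemberStatus

// Illustrative shape only: a snapshot of the cluster state reported over JMX
final case class ClusterMember(address: Address, status: MemberStatus)
final case class UnreachableMember(node: Address, observedBy: Set[Address])
final case class AkkaClusterState(
  selfAddress: Address,
  members: Set[ClusterMember],
  unreachable: Set[UnreachableMember]
)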

As we shall shortly see, we will define runtime monitors using state machines as follows:

sealed trait JmxAction[Result]
final case class Jmx[State](name: String, monitor: Monitor[State, AkkaClusterState]) extends JmxAction[Observable[Notify]]

type _jmx[Model] = JmxAction |= Model

def jmx[State, Model: _jmx](monitor: Monitor[State, AkkaClusterState])(name: String): Eff[Model, Observable[Notify]] =
  Eff.send[JmxAction, Model, Observable[Notify]](Jmx(name, monitor))

Here, these state machines are driven by polling a named Docker container and running the Akka JMX command line tool to observe the node's current cluster state (these observed events are produced into a Monix Observable). On each transition of the state machine, we may optionally emit notification events. Whenever the state machine terminates, we always generate a notification event. These notification events are collected into Monix Observables, and so runtime monitors may be thought of as transforming Observable data types. This leads us to interpret our JMX based cluster sensor DSL as follows:

implicit class JmxRun[R, A](effects: Eff[R, A]) {
  // Currently, it is essential that our first argument to runJmx is not a vararg!
  def runJmx[U](
    db: Map[String, DockerImage]
  )(implicit member: Member.Aux[JmxAction, R, U],
    error: _errorOrOk[U],
    system: ActorSystem,
    scheduler: Scheduler,
    log: Logger
  ): Eff[U, A] = {
    translate(effects)(new Translate[JmxAction, U] {
      def apply[X](jmx: JmxAction[X]): Eff[U, X] = jmx match {
        case Jmx(name, monitor: Monitor[_, AkkaClusterState]) =>
          ErrorEffect.eval(Now {
            monitor.run(db(name).members()).asInstanceOf[X]
          })
      }
    })
  }
}

Docker Container Instrumentation (optional)

Having defined extensible effects to implement our sensors and injection points, we still need to ensure that our deployed Docker containers are correctly built with additional features such as JMX command line scripts and libraries. We achieve this here by extending the Docker Compose YAML file with some syntactic sugar that allows us to express what additional code resources should be layered on top of our immutable base Docker container:

def clusterNode(name: String, network1: String, network2: String): String =
  s"""$name:
     |  template:
     |    resources:
     |      - cakesolutions.docker.jmx.akka
     |      - cakesolutions.docker.network.default.linux
     |    image: docker-compose-testkit-tests:$version
     |  ...
   """.stripMargin

Step 3: Runtime Monitors

Runtime monitors are implemented as state machines which are driven using events observed from our deployed application. On each transition of the state machine we may optionally emit notification events. This leads us to define the following:

sealed trait ObservedEvent[+Event]
case object StateTimeout extends Throwable with ObservedEvent[Nothing]
final case class Observe[Event](event: Event) extends ObservedEvent[Event]
type Behaviour[State, Event] = PartialFunction[State, PartialFunction[ObservedEvent[Event], Action[State]]]

It is worth noting that whenever the state machine terminates, we always generate a notification event. These notification events are collected into Monix Observables. Additionally, at each monitor state, we may optionally specify a duration for being in that state.
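The Behaviour type above returns Action[State] values, and our JMX interpreter produced Observable[Notify] streams; neither type has been spelled out yet. Here is a minimal sketch of what they might look like, inferred from their usage in the monitors below rather than taken from the library itself:

import scala.concurrent.duration.FiniteDuration

// Notifications emitted when a monitor transitions or terminates
sealed trait Notify
final case class Accept() extends Notify
final case class Fail() extends Notify

// Actions a behaviour may take in response to an observed event
sealed trait Action[State]
// Move to a new state, timing out of that state after the given duration
// (the library presumably also allows the duration to be omitted)
final case class Goto[State](state: State, forMax: FiniteDuration) extends Action[State]
// Terminate the monitor, emitting a final notification
final case class Stop[State](notification: Notify) extends Action[State]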

With our working example, we need to be able to determine if a given node has become unreachable (that way we can end the JVM GC pause and allow the cluster to heal before a potential split brain). We achieve this using the following single-state (of type Unit) state machine:

def unreachable(name: String): Monitor[Unit, AkkaClusterState] = {
  val node = Address(akkaProtocol, akkaSystem, name, akkaPort)

  Monitor(()) {
    case _ => {
      case Observe(AkkaClusterState(_, _, unreachable)) if unreachable.exists(_.node == node) =>
        Stop(Accept())
    }
  }
}

We also need to be able to detect that a given set of nodes has successfully joined the Akka cluster and is associated with the cluster in an Up state. As we would like this to be the case for some period of time (that way we might then say that the cluster is stable), we use the slightly more complex runtime monitor:

def inCluster(names: String*): Monitor[Boolean, AkkaClusterState] = {
  val nodes = names.map { nm =>
    Address(akkaProtocol, akkaSystem, nm, akkaPort)
  }

  Monitor(false) {
    case false => {
      case Observe(AkkaClusterState(_, members, unreachable)) if unreachable.isEmpty && nodes.forall(node => members.filter(_.status == MemberStatus.Up).exists(_.address == node)) =>
        Goto(true, 3.seconds)
    }
    case true => {
      case Observe(AkkaClusterState(_, _, unreachable)) if unreachable.nonEmpty =>
        Stop(Fail())
      case Observe(AkkaClusterState(_, members, _)) if nodes.exists(node => members.filter(_.status == MemberStatus.Up).forall(_.address != node)) =>
        Stop(Fail())
      case StateTimeout =>
        Stop(Accept())
    }
  }
}

Step 4: Chaos Experiment

So far we have defined how to instrument and deploy our Dockerised application. We have also described how we can monitor the deployed sensors and interact with data injection points. All we need to do now is to show how we can define experiments that introduce Chaos into the system by injecting faults!

Broadly speaking, we can imagine a Chaos experiment as being a state machine with runtime monitors as states. As each runtime monitor succeeds, the Chaos experiment transitions into a new monitoring state, emitting a data injection event. As sensors and data injection points are implemented as extensible effects in the Eff monad, we can simply combine these effects and so define Chaos experiments as an effect via for-comprehensions.

So, for our working example, we need to monitor:

  • that the initial cluster has correctly formed
  • that the right-A container becomes unreachable (after a JVM GC pause)
  • that the cluster correctly reforms (after the JVM GC pause ends)
  • that the cluster split brains (after a network partition).

This leads us to define the following Chaos experiment template:

def experiment[Model: _jvm: _jmx: _network: _validate: _errorOrOk](implicit compose: DockerCompose, system: ActorSystem, scheduler: Scheduler, log: Logger): Eff[Model, Notify] =
  for {
    obs1 <- jmx(inCluster(leftA, leftB, rightA, rightB))(leftA)
    _ <- check(isAccepting(obs1))
    _ <- note("cluster stable with members: left-A & left-B & right-A & right-B")
    _ <- ???
    _ <- note("right-A GC pause starts")
    obs2 <- jmx(unreachable(rightA))(leftA)
    _ <- check(isAccepting(obs2))
    _ <- note("left-A sees right-A as unreachable")
    _ <- ???
    _ <- note("right-A GC pause ends")
    obs3 <- jmx(inCluster(leftA, leftB, rightA, rightB))(leftA)
    _ <- check(isAccepting(obs3))
    _ <- note("cluster stable and right-A is allowed to rejoin")
    _ <- ???
    _ <- note("partition into left and right networks")
    obs4 <- jmx(inCluster(leftA, leftB))(leftA) && jmx(inCluster(leftA, leftB))(leftB) && jmx(inCluster(rightA))(rightA) && jmx(inCluster(rightB))(rightB)
    _ <- check(isAccepting(obs4))
    _ <- note("cluster split brains into 3 clusters: left-A & left-B; right-A; right-B")
  } yield Accept()

In order to complete the definition of our Chaos experiment, we just need to define the experiment’s data injection (i.e. JVM GC and network partitioning events) as follows:

def experiment[Model: _jvm: _jmx: _network: _validate: _errorOrOk](implicit compose: DockerCompose, system: ActorSystem, scheduler: Scheduler, log: Logger): Eff[Model, Notify] =
  for {
    obs1 <- ???
    _ <- note("cluster stable with members: left-A & left-B & right-A & right-B")
    _ <- jvmGC(JvmGCStart)(rightA)
    _ <- note("right-A GC pause starts")
    obs2 <- ???
    _ <- note("left-A sees right-A as unreachable")
    _ <- jvmGC(JvmGCEnd)(rightA)
    _ <- note("right-A GC pause ends")
    obs3 <- ???
    _ <- note("cluster stable and right-A is allowed to rejoin")
    _ <- partition("middle")
    _ <- note("partition into left and right networks")
    obs4 <- ???
    _ <- note("cluster split brains into 3 clusters: left-A & left-B; right-A; right-B")
  } yield Accept()

Finally, in order to run these experiments, we need to specify the collection of monadic effects that we intend to use. The type constraints in the experiment method dictate which effects must be present in the monadic effect stack (i.e. the type Model). Having done that, we can then run each effect to (hopefully!) generate a successful outcome for the Chaos experiment:

type ExperimentalModel = Fx.fx5[JvmGCAction, JmxAction, NetworkAction, Validate[String, ?], ErrorOrOk]

inside(experiment[ExperimentalModel].runJvm(cluster).runJmx(cluster).runNetwork.runError.runNel) {
  case Pure(Right(Right(Accept())), _) =>
    assert(true)
}

Conclusion

In this post, we have demonstrated the dangers of auto-downing during network partitioning (see screencast). This has been achieved using docker-compose-testkit to specify, control and monitor fault injection testing scenarios (cf. a Chaos experiment). Using extensible effects, we have shown how we may contain side-effecting system interactions within a functional environment. In future posts, we will build upon this work so that we may search for Chaos experiments using formal models of our distributed application.

One limitation with the example that we have presented here is that it makes various closed-world assumptions regarding its knowledge of the system. However, as we outlined in the post Can Real World Distributed Systems be Proven Correct?, real world examples typically operate in highly dynamic and changing environments, and so such assumptions cannot generally be made.

In our next post, we will examine using docker-compose-testkit to investigate and resolve more realistic distributed programming issues. In the meantime, if you would like to learn more, why not attend my talk at the Krakow Scala user group’s ScalaCamp #10?

— Carl Pulley