Orchestrating startup and shutdown in Scala

A set of abstractions to manage the lifecycle of any Scala process, from small console apps to large-scale distributed Akka cluster applications

Jonas Chapuis
Bestmile
May 10, 2020

Co-authored with Grégory Marti

Starting and stopping an application properly is an often overlooked challenge. Tackling domain complexity is the obvious priority, and we often postpone such mundane concerns. Yet, in today’s highly available distributed systems, robust and fail-safe application initialization and termination are essential to maintain service quality and data integrity during updates.

Here are some aspects of bringing a service up and taking it down:

  • initialize and terminate internal components in a specific order
  • flush buffers to persistent storage to avoid loss of data
  • close database connections and more generally release resources to avoid timeouts and starvation
  • stop accepting new requests while completing in-flight responses
  • coordinate cluster membership and node hand-off

This article describes a set of simple abstractions that we use to start and shut down our services. This accelerates our development, ensures reliable continuous delivery operations, and makes our codebase more uniform.

We also describe how to take advantage of these abstractions with Akka to define application startup, while delegating shutdown to Akka’s built-in CoordinatedShutdown mechanism.

Stopping

The main abstraction in our library is Stoppable[T]. It represents some process that can be stopped or that can stop of its own volition. You might be surprised that we start this article by describing stop 😉. As it turns out, stopping is slightly more complicated to describe than starting, precisely because processes can stop on their own (while they typically do not start on their own).

Here’s how we define a Stoppable process:

This trait bears a type parameter T which represents the type of the value returned once the stop process completes. The typical candidate for this parameter is Either[E, Unit], where E is some error type describing a failure during graceful shutdown.

Reacting to stopped

Let’s first look at the stopped method: this returns a Future[T], precisely representing an asynchronous completion value delivered once the shutdown is complete. Making this signal readily accessible with an idempotent method allows for subscribers to react to the process stop event.

Triggering a stop is done via a call to triggerStop(). Here as well, this trigger is handled by subscribing to the future returned by stopTriggered. The subscriber, typically the very same component extending Stoppable, is expected to carry out the necessary tear-down actions and call notifyStopped upon completion (which in turn signals anyone listening to stopped, as described above). Note that we are using simple promises to implement these notification contracts, but other mechanisms such as Monix's Subject would work just as well.

Example of stoppable flow (dashed arrows represent Future callbacks).

This simple set of definitions is already enough to describe the asynchronous interactions between a process that triggers the stop of another process and the observers of its termination. Compared to a simpler version where a single future is used to terminate a process, this reactive solution decouples the concern of triggering the stop from that of keeping track of the shutdown. In addition, the stop can be triggered from several points in the program flow without leading to competing executions of the shutdown sequence.
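
As an illustration, a trait following this description could look like the sketch below. It is a minimal reconstruction from the prose, assuming plain scala.concurrent promises: the member names triggerStop, stopTriggered, notifyStopped and stopped come from the text, while the private fields, the exact signatures and the Worker example are assumptions, so the actual library may differ.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch of a process that can be stopped, or that can stop on its own.
// T is the type of the value delivered once the shutdown has completed.
trait Stoppable[T] {
  private val stopTriggeredPromise = Promise[Unit]()
  private val stoppedPromise       = Promise[T]()

  // Requests a stop. trySuccess makes repeated calls harmless:
  // only the first one completes the promise.
  final def triggerStop(): Unit = { stopTriggeredPromise.trySuccess(()); () }

  // Completes when a stop has been requested; the component subscribes
  // to this future to run its tear-down logic.
  final def stopTriggered: Future[Unit] = stopTriggeredPromise.future

  // Called by the component once tear-down has finished
  // (or when it stops of its own accord).
  final protected def notifyStopped(result: T): Unit = { stoppedPromise.trySuccess(result); () }

  // Completes with the stop result; observers subscribe here.
  final def stopped: Future[T] = stoppedPromise.future
}

// Hypothetical component: it reacts to the trigger and reports completion.
class Worker extends Stoppable[Either[String, Unit]] {
  stopTriggered.foreach(_ => notifyStopped(Right(())))
}
```

Calling triggerStop() twice on a Worker runs the tear-down callback only once, because a promise can be completed a single time.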

Starting

Before we can stop an application, we first need to start it! In many scenarios, bringing the application up is a question of letting the dependency graph of instantiations resolve itself. But more often than not, certain components are not ready right after instantiation and require some initialization phase.

For such cases, we introduce an explicit asynchronous method, unsurprisingly named start(), which returns a Future[Either[E, Unit]]. Unlike stop, start is a direct process: the caller gets a Future holding the result of the start operation. This simpler approach fits the common use case since, as mentioned previously, processes typically do not start on their own but in response to some controllable stimulus (e.g. running the application). This call is also usually located in a single place within the code (the object extending App for instance).

Start and stop define a Service

We have made the start() method part of a Service trait, extending Stoppable:

This trait extends Stoppable with the notion of asynchronous start. We could also have defined start() in some separate Startable trait, but the need hasn't arisen until now. It is indeed often the case that components featuring asynchronous initialization also require the corresponding asynchronous tear-down. Nonetheless, we still support an easy way to create start-only services via our builder, as explained below.
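
A Service along these lines might be sketched as follows. This is an assumption-laden illustration rather than the library's code: the type parameter E on Service, the name member and the CacheService example are hypothetical, and a condensed Stoppable is repeated so that the block compiles on its own.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

// Condensed Stoppable contract, repeated so this block compiles alone.
trait Stoppable[T] {
  private val triggered = Promise[Unit]()
  private val done      = Promise[T]()
  final def triggerStop(): Unit = { triggered.trySuccess(()); () }
  final def stopTriggered: Future[Unit] = triggered.future
  final protected def notifyStopped(result: T): Unit = { done.trySuccess(result); () }
  final def stopped: Future[T] = done.future
}

// A Service adds an explicit asynchronous start to a Stoppable.
// E is the error type reported when start or stop fails (assumed here).
trait Service[E] extends Stoppable[Either[E, Unit]] {
  def name: String
  def start(): Future[Either[E, Unit]]
}

// Hypothetical example: a cache that must be loaded before it can be used.
final class CacheService(load: () => Map[String, String])(implicit ec: ExecutionContext)
    extends Service[String] {
  @volatile private var entries = Map.empty[String, String]
  val name = "cache"

  // Loads the entries asynchronously; a loading exception becomes a Left.
  def start(): Future[Either[String, Unit]] =
    Future[Either[String, Unit]] { entries = load(); Right(()) }
      .recover { case e: Exception => Left(s"cache load failed: ${e.getMessage}") }

  def get(key: String): Option[String] = entries.get(key)

  // Tear-down: drop the entries, then signal completion to observers of `stopped`.
  stopTriggered.foreach { _ => entries = Map.empty; notifyStopped(Right(())) }
}
```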

Service builder

Implementing a trait is suitable when we choose to describe a service with a class, but it’s often more convenient to use plain functions. For these cases, we use a small builder DSL which allows us to assemble services quickly. Here are some examples:

  • a “startable”, for instance an in-memory cache which takes time to load:
  • a service with error-free initialization and shut-down:
  • a service requiring a specific Kafka topic and for which initialization can fail:
  • a service with only stop logic, described using the previously mentioned Stoppable abstraction: Service.withName("stop-only").withStoppable(theStoppable).build

Composite Service

We have found that the real value of this Service concept is in the "gluing" of both start and stop aspects in a single abstraction: this grants the ability to scaffold ordered and layered startup and shutdown sequences using the composite pattern.

When composing services together, we define the composed start as the ordered sequence of initialization steps of each service. By extension, the composed termination is the sequence of corresponding stop operations, applied in the reverse order. With these definitions, the composition itself can be considered a service, with start and stop implementing sequencing for “child” services.

Although this generic composite behavior seems trivial at first sight, there are some interesting challenges:

  • services can stop at any point in time, in which case we want to stop the whole composition in turn
  • services can stop while the composition is still initializing, in which case we want to abort and stop only the services that were already started
  • similarly, one of the services may fail to start, in which case we also need to abort and stop already started services
  • triggering a stop of the composite while it is starting should abort the ongoing sequence and stop already started services
  • we should have a way to define timeouts to make sure stop methods do not exceed some maximal acceptable duration, to avoid freezing processes
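
To make the core of this behavior concrete, here is a deliberately simplified sketch using only standard-library futures. It covers sequential start, abort on the first start failure, and reverse-order stop of the services that were actually started. It does not handle services stopping on their own, stop requests arriving during startup, or timeouts. SimpleService and the Composite object are hypothetical names introduced for this illustration, and StartResults carries a plain String failure here.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Simplified stand-in for a service: a name plus start and stop functions.
final case class SimpleService(
    name: String,
    start: () => Future[Either[String, Unit]],
    stop: () => Future[Unit]
)

// Outcome of a composite start: what was started, and the failure
// that aborted the sequence, if any.
final case class StartResults(startedServices: List[SimpleService], failure: Option[String])

object Composite {
  // Starts services one after the other; the first failure aborts the sequence.
  def startAll(services: List[SimpleService])(implicit ec: ExecutionContext): Future[StartResults] =
    services.foldLeft(Future.successful(StartResults(Nil, None))) { (acc, service) =>
      acc.flatMap {
        case results if results.failure.isDefined =>
          Future.successful(results) // already aborted: skip remaining services
        case results =>
          service.start().map {
            case Right(_)    => results.copy(startedServices = results.startedServices :+ service)
            case Left(error) => results.copy(failure = Some(s"${service.name}: $error"))
          }
      }
    }

  // Stops the started services sequentially, in reverse start order.
  // Returns the service names in the order in which they were stopped.
  def stopAll(results: StartResults)(implicit ec: ExecutionContext): Future[List[String]] =
    results.startedServices.reverse.foldLeft(Future.successful(List.empty[String])) { (acc, service) =>
      acc.flatMap(stopped => service.stop().map(_ => stopped :+ service.name))
    }
}
```

The remaining challenges in the list are precisely what makes a reactive formulation attractive, since they involve racing several signals against the ongoing sequence.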

The orchestration required to satisfy these constraints involves time and events, so we will make use of the functional reactive operators available in Monix to express it compactly. Note that only the internals of the composite service require this machinery: the outer API of the composite is (by definition) the same as that of any other service and is thus expressed with Future. Akka Streams would also be fine for this job, but Monix has an edge when it comes to testing, as we'll illustrate further below.

Composite start

The composite start flow is orchestrated using:

  • startTriggered: Task[Unit]: start signal, represented with a Monix Task (tasks are in essence lazy Futures)
  • stopTriggered: Task[Unit]: stop signal
  • triggerStop: () => Unit: trigger for stop signal
  • services: List[Service]: list of services
  • def startService(service: Service): Task[Service]: wraps a call to the start method of the service within a Task, failing the task with ServiceStartFailed in case of error
  • case class StartResults(startedServices: List[Service], failure: Option[ServiceStartError]): data structure to capture the results of the composite start operation
  • def registerForServiceStopOrError(service: Service, triggerStop: () => Unit) = service.stopped.foreach(_=> triggerStop()): calls triggerStop mentioned above when a child service stops

Here’s the expression of the start sequence:

Let’s review each involved operator (for visual description, you can refer to the marble diagrams further below):

  • Observable.fromTask: convert a task to an Observable. Observable is an abstraction for a back-pressured stream
  • .ambWith(Observable.fromTask(stopTriggered)): amb picks the first stream which emits a value and "sticks" to it. We use this here to short-circuit the start if a stop is triggered before even starting
  • Observable.fromIterable(services): iterate the collection of services into a stream, with built-in back-pressure, i.e. the next value is only delivered with the agreement of the downstream operator. Combined with mapEval (see below), this implements a sequential service start
  • takeUntil(Observable.fromTask(stopTriggered)): takeUntil stops the stream whenever a value is emitted on the other parameter stream. We use it here to implement early stop
  • mapEval(startService): call the startService method described above
  • .map(_.asRight).onErrorHandleWith { case failure: ServiceStartFailed => Observable(failure.error.asLeft)}: capture success or failure into an either value
  • .takeWhileInclusive(_.isRight): maintain the stream while initializations are successful - in case of failure, abort and include the failure as last value
  • scan: for each value in the stream, emit a new StartResults consolidated element. We also make a side-effecting call to registerForServiceStopOrError when the service starts successfully to enable observation of the child service stopped signal

Composite start marbles for various scenarios.

Composite stop

For the shutdown, we will make use of the following elements:

  • stopTriggered: Task[Unit]: stop signal
  • startResults: Task[StartResults]: results of the start operation

Stopping is about triggering the stop of each started service in reverse order, sequentially.

Stopping a service is implemented with:

Task.deferFuture turns the passed future into a lazy one, so that we trigger service stop only upon running the task.

We rely on the stopped signal of the service to continue on to the next, with the addition of a timeout mechanism. This prevents one service from holding up the stop of others indefinitely and limits the scope of any possible resource leak.
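
The timeout idea can be illustrated without Monix. The sketch below, which is not the article's stopService, races the stopped signal against a timer using a standard-library promise and a ScheduledExecutorService. The StopOutcome type and the function parameters are hypothetical names chosen for this example.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

object StopWithTimeout {
  // Possible outcomes of stopping one service.
  sealed trait StopOutcome
  case object Stopped  extends StopOutcome
  case object TimedOut extends StopOutcome

  // Triggers the stop, then waits for the `stopped` signal for at most `timeout`.
  // `triggerStop` and `stopped` stand for the corresponding members of a service.
  // A `stopped` future that fails is also reported as Stopped in this sketch.
  def stopService(triggerStop: () => Unit, stopped: Future[Any], timeout: FiniteDuration)(
      implicit scheduler: ScheduledExecutorService,
      ec: ExecutionContext
  ): Future[StopOutcome] = {
    val outcome = Promise[StopOutcome]()
    // Whichever happens first completes the promise; the later one is ignored.
    val timer = scheduler.schedule(
      new Runnable { def run(): Unit = { outcome.trySuccess(TimedOut); () } },
      timeout.toMillis,
      TimeUnit.MILLISECONDS
    )
    stopped.onComplete { _ =>
      timer.cancel(false)
      outcome.trySuccess(Stopped)
    }
    triggerStop()
    outcome.future
  }
}
```

Returning TimedOut as a value rather than failing the future lets the caller carry on with the next service while still recording which stop did not complete in time.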

This stopService function is used in the following flow:

Let’s review it line-by-line:

  • Task.fromFuture(stopTriggered).flatMap(_ => startResults): the stop signal is chained with the asynchronous result of start. The shutdown is thus delayed until initialization is completed. This isn't an issue however since the start is short-circuited in case of an early stop. The idea is that even in case of abort, stopping only happens after the ongoing start step terminates - so that the system state is always fully defined.
  • Observable.fromIterable(results.startedServices.reverse).mapEval(stopService): initiates the stream of sequential stop operations in reverse order for services that were successfully started (and only these)
  • foldLeftL(...): results of stop operations are aggregated into a list so that we can have informative logging feedback about the shutdown

Marble diagram for the successful stop scenario.

Testing

Monix, and more generally any functional reactive framework which features a scheduler abstraction, allows for “virtual time” testing — in other words, simulating instantaneously what happens over longer periods. The most obvious illustration of this ability is to exercise timeouts, like in the following test:

The scheduler abstraction allows taking full control of flow during testing, which can become invaluable with complex streams.
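
As a small illustration of virtual time, the sketch below exercises a 30-second timeout without waiting. It assumes Monix 3 on the classpath and uses its TestScheduler. The stuckStop task and the VirtualTimeDemo object are hypothetical and stand in for a service whose stop never completes.

```scala
import monix.eval.Task
import monix.execution.schedulers.TestScheduler
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.Failure

object VirtualTimeDemo {
  // A stop that never completes, guarded by a 30-second timeout.
  val stuckStop: Task[Unit] = Task.never[Unit].timeout(30.seconds)

  def run(): Boolean = {
    implicit val scheduler: TestScheduler = TestScheduler()
    val result = stuckStop.runToFuture

    // Advance the virtual clock; no real time elapses.
    scheduler.tick(29.seconds)
    val pendingBefore = !result.isCompleted

    // Cross the 30-second mark: the timeout fires.
    scheduler.tick(1.second)
    val timedOut = result.value match {
      case Some(Failure(_: TimeoutException)) => true
      case _                                  => false
    }
    pendingBefore && timedOut
  }
}
```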

Managing a distributed Akka application lifecycle

In actor-based systems running with Akka, and when running a cluster application in particular, it is recommended to delegate the entire application shutdown to Akka. Akka’s CoordinatedShutdown defines several logical stages to termination which are universal to every Akka application. User code can be hooked up to form part of the shutdown, while Akka itself registers its internal termination procedures. For instance, the HTTP server stops accepting incoming connections, the sharding coordinator initiates message buffering and hand-off, the cluster extension downs the node, etc.

Even with the help of CoordinatedShutdown, bringing a distributed HTTP application down safely requires some careful orchestration, as not even one HTTP request is supposed to be lost when an instance goes down. To achieve this lofty goal, we’ll first need some integration with the container orchestrator.

Liveness and readiness probes

The container orchestrator needs to know when it should start or terminate an instance and, more importantly for our purpose here, when traffic can be directed to the node. In the case of Kubernetes, this is achieved via liveness and readiness probes:

  • liveness: indicates whether the application should be left running. The orchestrator will restart the container if the probe fails, to avoid nodes hanging forever in a limbo state;
  • readiness: indicates that the application is ready to receive traffic.

When operating a cluster with Akka management extensions, we can take advantage of the built-in support for health checks. In particular, two endpoints are exposed out of the box by the Akka management server:

  • /alive: meant to be used as a liveness probe (the default implementation simply returns 'OK');
  • /ready: meant to be used as a readiness probe. When using the Akka management cluster HTTP module, the result depends on cluster membership matching a configurable value (typically, that the node has joined the cluster).

Akka also lets us define custom health checks to be added to (or to replace) the built-in ones (when enabling more than one check, the probe only succeeds if all checks are green).

Not one HTTP request lost!

Shutting down safely is a little more involved. One fundamental property of a highly available service is that not even one HTTP request can be lost when an instance is shut down. With continuous delivery, this happens quite often as rolling updates keep the system up to date. Obviously, the capacity to maintain a seamless quality of service during updates is critical to allow for frequent releases. Here’s the downing tactic we follow to minimize the possibility of losing requests:

  1. We start by stopping message consumption from Kafka. The local consumer is generally part of a consumer group, leaving it up to the other consumers in the group to continue handling incoming data
  2. The readiness probe is disabled so that Kubernetes eventually stops directing traffic to us
  3. We allow ongoing requests to complete and give Kubernetes time to find out that the probe is disabled. This phase supports a timeout as we can’t afford to wait indefinitely either
  4. After this request “draining” period, it is time to leave the cluster, which is all driven by CoordinatedShutdown. Any cluster singleton instance running on the node is respawned somewhere else. Incoming messages to actors residing in the downing node are buffered during hand-off so that they can be recovered in another node without losing a single message (this obviously requires persistent actors, since the state itself isn't transferred)
  5. After leaving the cluster, we can take care of closing any remaining resources before the application exits

Here’s a sequence diagram of this orchestration (the terminology is explained further down):

Sequence diagram for Akka Cluster HTTP application shutdown

Akka’s Coordinated Shutdown

Studying CoordinatedShutdown gives us terminology to designate the different phases involved in this downing flow. Rather than including the original documentation here, we show some extension methods that we have defined to make registration of our custom tasks easier (the scaladoc annotations are taken directly from Akka's documentation):

The methods above expose only the phases which are meant to accommodate user-defined tasks.
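
Extension methods of this kind could be sketched as below, assuming Akka (classic actors) on the classpath. Only CoordinatedShutdown.addTask and the Phase* constants are Akka's API. The CoordinatedShutdownSyntax object and its method names are hypothetical and need not match the ones used in our codebase.

```scala
import akka.Done
import akka.actor.CoordinatedShutdown
import scala.concurrent.Future

object CoordinatedShutdownSyntax {
  // Hypothetical helpers: one method per phase meant for user-defined tasks.
  implicit class CoordinatedShutdownOps(private val cs: CoordinatedShutdown) extends AnyVal {

    // Runs before the service is unbound, e.g. to stop consuming from Kafka.
    def beforeServiceUnbind(taskName: String)(task: () => Future[Done]): Unit =
      cs.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, taskName)(task)

    // Stop accepting new incoming connections.
    def serviceUnbind(taskName: String)(task: () => Future[Done]): Unit =
      cs.addTask(CoordinatedShutdown.PhaseServiceUnbind, taskName)(task)

    // Wait for in-flight requests to complete.
    def serviceRequestsDone(taskName: String)(task: () => Future[Done]): Unit =
      cs.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, taskName)(task)

    // Final shutdown of service endpoints.
    def serviceStop(taskName: String)(task: () => Future[Done]): Unit =
      cs.addTask(CoordinatedShutdown.PhaseServiceStop, taskName)(task)

    // Runs before the node starts leaving the cluster.
    def beforeClusterShutdown(taskName: String)(task: () => Future[Done]): Unit =
      cs.addTask(CoordinatedShutdown.PhaseBeforeClusterShutdown, taskName)(task)

    // Last chance to release resources before the actor system terminates.
    def beforeActorSystemTerminate(taskName: String)(task: () => Future[Done]): Unit =
      cs.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, taskName)(task)
  }
}
```

With the implicit class imported, a task is registered with a call such as CoordinatedShutdown(system).beforeServiceUnbind("stop-kafka-consumer")(() => stopConsumer()), where stopConsumer is a placeholder for the application's own logic.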

Custom readiness probe

As mentioned previously, our readiness probe should indicate both that the node has joined the cluster and that we are ready to absorb incoming requests. More specifically, some time must be allotted to drain the requests while we are still part of the cluster. The probe is therefore disabled as soon as the ServiceUnbound phase begins. We had to define a custom health check for this, as Akka's default only reflects cluster membership. A health check is defined as a function returning Future[Boolean], which is then referenced in the configuration:
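
A check of this shape could be sketched as follows, using only the standard library. The Readiness flag and the DrainAwareReadinessCheck class are hypothetical names. The cluster membership part of the check is left out, and so is the Akka Management wiring: the configuration entry, and the constructor signature that Akka Management expects when it instantiates the class (which may need to accept the ActorSystem).

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future

// Shared flag, flipped by a shutdown task when draining begins.
object Readiness {
  private val accepting = new AtomicBoolean(true)
  def disable(): Unit = accepting.set(false)
  def isAccepting: Boolean = accepting.get
}

// Health check in the shape described above: a function returning Future[Boolean].
class DrainAwareReadinessCheck extends (() => Future[Boolean]) {
  override def apply(): Future[Boolean] = Future.successful(Readiness.isAccepting)
}
```

A shutdown task registered for the unbind phase would then call Readiness.disable(), after which the probe reports the node as not ready.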

Along with this probe, we also register some custom tasks to CoordinatedShutdown to orchestrate request drain. Parameters to this logic are:

  • binding: the Akka-HTTP binding
  • pendingRequestsHardDeadline: hard deadline after which opened connections are force-closed
  • unbindingDelay: duration before rejection of incoming connections (the time allowed for traffic redirection)

Here’s the code, in the form of an extension method:

Application Builder DSL

As has now become our weird habit, we have started our description with the shutdown. Initializing an Akka application also requires some care. Application startup can broadly be divided into the following four stages:

  1. Start services dependent on the actor system but which do not require cluster membership
  2. Bind the HTTP routes so that we are ready to receive traffic whenever the Akka Cluster readiness probe indicates we have joined the cluster
  3. Join the Akka Cluster
  4. Start services depending on cluster membership

We can implement each phase as a service and sequence it using CompositeService. To capture these common patterns and make it easier to bootstrap new Akka applications, we have created a small higher-level DSL which takes care of scaffolding the application, while also hooking up the termination of its components to coordinated shutdown phases.

Here’s an example composition of such an application:

  • SomeEtcdService: this imaginary service doesn't require cluster membership to start pulling data from etcd using Akka Streams, so we initialize it first. Its stop operation is hooked up with beforeActorSystemTerminate.
  • HttpServiceDefinition: this type captures relevant information to expose HTTP endpoints:
  • SomeEntityService: a service making use of sharded persistent actors. This is typically best initialized when cluster membership has been confirmed
  • SomeKafkaConsumerService: once the system is ready to send commands to persistent actors, it is time to start consuming from Kafka sources and committing offsets

If any of these services fail during startup, CoordinatedShutdown is triggered. Since our services have hooked their stops into its various phases upon startup, the shutdown is entirely driven by Akka.

The composite service returned from the builder can even be composed further, which typically allows for describing an initial migration phase before the creation of the actor system. This can be beneficial in a cluster configuration since Akka already takes steps towards joining the cluster upon instantiation. Note however that by default CoordinatedShutdown will shut down the JVM, so care must be taken not to rely on outer stoppers (or to override this setting).

Summary

In this article, we have given an overview of how we orchestrate application startup and shutdown with the help of three core concepts: Stoppable (which relies on a synchronous stop trigger combined with the asynchronous stopTriggered and stopped futures), Service (which exposes an asynchronous start method with an explicit possible failure) and CompositeService (which makes it easy to compose services and stoppables). We have also illustrated how we integrate these mechanisms with Akka's own CoordinatedShutdown to achieve high availability.

How do you deal with those challenges in your applications? We are curious to find out. Please let us know!
