Orchestrating startup and shutdown in Scala
A set of abstractions to manage the lifecycle of any Scala process, from small console apps to large-scale distributed Akka cluster applications
--
Co-authored with Grégory Marti
Starting and stopping an application properly is an often overlooked challenge. Tackling domain complexity is the obvious priority, and we often postpone such mundane concerns for later. Yet, in today’s highly available distributed systems, robust, fail-safe application initialization and termination are essential to maintain service quality and data integrity during updates.
Here are some aspects to getting a service off and on the ground:
- initialize and terminate internal components in a specific order
- flush buffers to persistent storage to avoid loss of data
- close database connections and more generally release resources to avoid timeouts and starvation
- stop accepting new requests while finalizing in-flight responses
- coordinate cluster membership and node hand-off
This article describes a set of simple abstractions that we use to start and shut down our services. This accelerates our development, ensures reliable continuous delivery operations, and makes our codebase more uniform.
We also describe how to take advantage of these abstractions with Akka to define application startup, while delegating shutdown to Akka’s built-in CoordinatedShutdown mechanism.
Stopping
The main abstraction in our library is Stoppable[T]. It represents some process that can be stopped or that can stop of its own volition. You might be surprised that we start this article by describing stop 😉. As it turns out, stopping is indeed slightly more complicated to describe than starting, precisely because processes can stop on their own (while they typically do not start on their own).
Here’s how we define a Stoppable process:
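The original trait definition is not reproduced here, so the following is a hedged sketch reconstructed from the description below (the promise-based implementation is mentioned later in the text; member names such as notifyStopped follow the prose, other details are assumptions):

```scala
import scala.concurrent.{Future, Promise}

// Hedged sketch of the Stoppable trait, reconstructed from the description
// below; the original definition may differ in its details.
trait Stoppable[T] {
  private val stopTrigger = Promise[Unit]()
  private val stopPromise = Promise[T]()

  /** Completed once a stop has been requested; the implementing component
    * subscribes to this to carry out its tear-down actions. */
  def stopTriggered: Future[Unit] = stopTrigger.future

  /** Requests the stop of this process (idempotent). */
  def triggerStop(): Unit = stopTrigger.trySuccess(())

  /** Completed once the shutdown is complete, with a value of type T. */
  def stopped: Future[T] = stopPromise.future

  /** Called by the implementing component when tear-down is finished. */
  protected def notifyStopped(result: T): Unit = stopPromise.trySuccess(result)
}
```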
This trait bears a type parameter T which represents the type of value returned upon stop process completion. The typical candidate for this parameter is Either[E, Unit] where E is some error type informing about a failure during graceful shutdown.
Reacting to stopped
Let’s first look at the stopped method: it returns a Future[T], precisely representing an asynchronous completion value delivered once the shutdown is complete. Making this signal readily accessible with an idempotent method allows subscribers to react to the process stop event.
Triggering a stop is done via a call to triggerStop(). Here as well, this trigger is handled by subscribing to the future returned by stopTriggered. The subscriber, typically the very same component extending Stoppable, is expected to carry out the necessary tear-down actions and call notifyStopped upon completion (which in turn signals anyone listening to stopped, as described above). Note that we are using simple promises to implement these notification contracts, but other mechanisms such as Monix's Subject would work just as well.
This simple set of definitions already allows describing the asynchronous interactions between a process triggering the stop of another and observers of its termination. Compared to a simpler version where a single future is used to terminate a process, this reactive solution has the advantage that the concern of triggering the stop is decoupled from that of tracking shutdown completion. Also, the stop can be triggered from several points in the program flow without leading to competing shutdown sequence executions.
Starting
To have an application to stop, we first need to start it! In many scenarios, bringing the application up is a question of letting the dependency graph of instantiations resolve itself. But more often than not, certain components are not ready right after instantiation and require some initialization phase.
For such cases, we introduce an explicit asynchronous method, unsurprisingly named start(), which returns a Future[Either[E, Unit]]. Unlike stop, start is a direct process: the caller gets a Future on the result of the start operation. This simpler approach fits the common use case since, as mentioned previously, processes typically do not start on their own but according to some controllable stimulus (e.g. running the application). This call is also usually located in a single place within the code (the object extending App, for instance).
Start and stop define a Service
We have made the start() method part of a Service trait, extending Stoppable:
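Again the original definition isn't shown here; a minimal sketch, assuming an error type ServiceError and a name member (used later for logging), could look like this:

```scala
import scala.concurrent.Future

// Hedged sketch of the Service trait; ServiceError and the name member are
// assumptions based on how services are used later in the article.
trait Service extends Stoppable[Either[ServiceError, Unit]] {
  /** Human-readable identifier, useful when logging start/stop progress. */
  def name: String

  /** Asynchronous start, reporting an explicit error in case of failure. */
  def start(): Future[Either[ServiceError, Unit]]
}
```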
This trait extends Stoppable with the notion of asynchronous start. We could also have defined start() in some separate Startable trait, but the need hasn't arisen until now. It is indeed often the case that components featuring asynchronous initialization also require the corresponding asynchronous tear-down. Nonetheless, we still support an easy way to create start-only services via our builder, as explained below.
Service builder
Implementing a trait is suitable when we choose to describe a service with a class, but it’s often more convenient to use plain functions. For these cases, we use a small builder DSL which allows us to assemble services quickly. Here are some examples, sketched in code after the list:
- a “startable”, for instance an in-memory cache which takes time to load:
- a service with error-free initialization and shut-down:
- a service requiring a specific Kafka topic and for which initialization can fail:
- a service with only stop logic, described using the previously mentioned Stoppable abstraction: Service.withName("stop-only").withStoppable(theStoppable).build
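Only the stop-only form above is quoted from the original; the following sketch illustrates what the other builder calls might look like, with the method names withStart, withStartAndStop and withFailableStart being assumptions for illustration:

```scala
// a "startable": an in-memory cache which takes time to load
Service.withName("cache").withStart(() => cache.load()).build

// a service with error-free initialization and shut-down
Service
  .withName("metrics")
  .withStartAndStop(() => metricsServer.bind(), () => metricsServer.unbind())
  .build

// a service requiring a specific Kafka topic, whose initialization can fail
Service
  .withName("topic-provisioning")
  .withFailableStart(() => kafkaAdmin.ensureTopic("commands")) // Future[Either[E, Unit]]
  .build

// a service with only stop logic, wrapping an existing Stoppable (from the text)
Service.withName("stop-only").withStoppable(theStoppable).build
```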
Composite Service
We have found that the real value of this Service concept is in the "gluing" of both start and stop aspects in a single abstraction: this grants the ability to scaffold ordered and layered startup and shutdown sequences using the composite pattern.
When composing services together, we define the composed start as the ordered sequence of initialization steps of each service. By extension, the composed termination is the sequence of corresponding stop operations, applied in the reverse order. With these definitions, the composition itself can be considered a service, with start and stop implementing sequencing for “child” services.
Although this generic composite behavior seems trivial at first sight, there are some interesting challenges:
- services can stop at any point in time, in which case we want to stop the whole composition in turn
- services can stop while the composition is still initializing, in which case we want to abort and stop only the services that were already started
- similarly, one of the services may fail to start, in which case we also need to abort and stop already started services
- triggering a stop of the composite while it is starting should abort the ongoing sequence and stop already started services
- we should have a way to define timeouts to make sure stop methods do not exceed some maximal acceptable duration, to avoid freezing processes
The required orchestration to satisfy these constraints involves time and events, so we make use of the functional reactive operators available in Monix to express it compactly. Note that only the internals of the composite service require this machinery; the outer API of the composite is (by definition) the same as any other service and is thus expressed with Future. Akka Streams would also be fine for this job, but Monix has an edge when it comes to testing, as we'll illustrate further below.
Composite start
The composite start flow is orchestrated using:
- startTriggered: Task[Unit]: the start signal, represented with a Monix Task (tasks are in essence lazy Futures)
- stopTriggered: Task[Unit]: the stop signal
- triggerStop: () => Unit: trigger for the stop signal
- services: List[Service]: the list of services
- def startService(service: Service): Task[Service]: wraps a call to the start method of the service within a Task, failing the task with ServiceStartFailed in case of error
- case class StartResults(startedServices: List[Service], failure: Option[ServiceStartError]): data structure to capture the results of the composite start operation
- def registerForServiceStopOrError(service: Service, triggerStop: () => Unit) = service.stopped.foreach(_ => triggerStop()): calls triggerStop mentioned above when a child service stops
Here’s the expression of the start sequence:
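The original gist is not reproduced here; the snippet below is a reconstruction that follows the operator walk-through in the next list, so its exact shape (and the final aggregation into a Task) is an approximation rather than the authors' code:

```scala
import cats.syntax.either._
import monix.eval.Task
import monix.reactive.Observable

// Reconstructed sketch of the composite start sequence.
val compositeStart: Task[StartResults] =
  Observable
    .fromTask(startTriggered)
    .ambWith(Observable.fromTask(stopTriggered))       // short-circuit if stop comes first
    .flatMap { _ =>
      Observable
        .fromIterable(services)                        // back-pressured stream of services
        .takeUntil(Observable.fromTask(stopTriggered)) // abort the sequence on early stop
        .mapEval(startService)                         // start services one after the other
        .map(_.asRight[ServiceStartError])
        .onErrorHandleWith {                           // surface a failed start as a Left value
          case failure: ServiceStartFailed => Observable(failure.error.asLeft[Service])
        }
        .takeWhileInclusive(_.isRight)                 // stop starting after the first failure
    }
    .scan(StartResults(Nil, None)) {
      case (acc, Right(service)) =>
        registerForServiceStopOrError(service, triggerStop) // observe child service stops
        acc.copy(startedServices = acc.startedServices :+ service)
      case (acc, Left(error)) =>
        acc.copy(failure = Some(error))
    }
    .lastOptionL
    .map(_.getOrElse(StartResults(Nil, None)))
```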
Let’s review each involved operator (for visual description, you can refer to the marble diagrams further below):
- Observable.fromTask: converts a task to an Observable. Observable is an abstraction for a back-pressured stream.
- ambWith(Observable.fromTask(stopTriggered)): amb picks the first stream which emits a value and “sticks” to it. We use this here to short-circuit the start if a stop is triggered before even starting.
- Observable.fromIterable(services): iterates the collection of services into a stream, with built-in back-pressure, i.e. the next value is only delivered with the agreement of the downstream operator. Combined with mapEval (see below), this implements a sequential service start.
- takeUntil(Observable.fromTask(stopTriggered)): takeUntil stops the stream whenever a value is emitted on the other parameter stream. We use it here to implement early stop.
- mapEval(startService): calls the startService method described above.
- map(_.asRight).onErrorHandleWith { case failure: ServiceStartFailed => Observable(failure.error.asLeft) }: captures success or failure into an either value.
- takeWhileInclusive(_.isRight): maintains the stream while initializations are successful; in case of failure, aborts and includes the failure as the last value.
- scan: for each value in the stream, emits a new consolidated StartResults element. We also make a side-effecting call to registerForServiceStopOrError when the service starts successfully, to enable observation of the child service's stopped signal.
Composite stop
For the shutdown, we will make use of the following elements:
- stopTriggered: Task[Unit]: the stop signal
- startResults: Task[StartResults]: the results of the start operation
Stopping is about triggering the stop of each started service in reverse order, sequentially.
Stopping a service is implemented with:
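A minimal sketch of such a helper, based on the explanation that follows (the ServiceStopResult cases and the stopTimeout value are assumptions introduced for illustration):

```scala
import monix.eval.Task

// Hedged sketch of stopService: the stop is triggered lazily, we then wait
// on the service's stopped signal, and a timeout bounds the wait.
def stopService(service: Service): Task[ServiceStopResult] =
  Task
    .deferFuture {
      service.triggerStop()                 // only triggered when the task actually runs
      service.stopped                       // completes once the service has shut down
    }
    .map(result => ServiceStopped(service.name, result))
    .timeoutTo(stopTimeout, Task.now(ServiceStopTimedOut(service.name)))
```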
Task.deferFuture turns the passed future into a lazy one, so that we trigger the service stop only upon running the task.
We rely on the stopped signal of the service to continue on to the next, with the addition of a timeout mechanism. This prevents one service from holding up the stop of others indefinitely and limits the scope of any possible resource leak.
This stopService function is used in the following flow:
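As for the start sequence, the snippet below is a reconstruction following the line-by-line review that comes next (here stopTriggered is assumed to be consumed as the underlying future of the stop signal, and ServiceStopResult is the assumed result type of stopService):

```scala
import monix.eval.Task
import monix.reactive.Observable

// Reconstructed sketch of the composite stop flow.
val compositeStop: Task[List[ServiceStopResult]] =
  Task
    .fromFuture(stopTriggered)                          // wait for the stop signal...
    .flatMap(_ => startResults)                         // ...and for the start to have settled
    .flatMap { results =>
      Observable
        .fromIterable(results.startedServices.reverse)  // stop in reverse start order
        .mapEval(stopService)                           // one service at a time
        .foldLeftL(List.empty[ServiceStopResult]) {     // aggregate results for logging
          (acc, result) => acc :+ result
        }
    }
```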
Let’s review it line-by-line:
- Task.fromFuture(stopTriggered).flatMap(_ => startResults): the stop signal is chained with the asynchronous result of start, so the shutdown is delayed until initialization has completed. This isn't an issue, however, since the start is short-circuited in case of an early stop. The idea is that even in case of abort, stopping only happens after the ongoing start step terminates, so that the system state is always fully defined.
- Observable.fromIterable(results.startedServices.reverse).mapEval(stopService): initiates the stream of sequential stop operations, in reverse order, for services that were successfully started (and only these)
- foldLeftL(...): the results of the stop operations are aggregated into a list so that we can provide informative logging feedback about the shutdown
Testing
Monix, and more generally any functional reactive framework which features a scheduler abstraction, allows for “virtual time” testing — in other words, simulating instantaneously what happens over longer periods. The most obvious illustration of this ability is to exercise timeouts, like in the following test:
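The original test isn't reproduced here, so here is a minimal sketch at the Monix level only: a stop task that never completes is guarded by the timeout mechanism described above, and the TestScheduler lets the timeout fire instantly by advancing virtual time:

```scala
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution.schedulers.TestScheduler

// Virtual-time testing with Monix's TestScheduler.
implicit val scheduler: TestScheduler = TestScheduler()

val stuckStop   = Task.never[Unit]                             // a stop that never finishes
val guardedStop = stuckStop.timeoutTo(5.seconds, Task.now(())) // the timeout mechanism
val result      = guardedStop.runToFuture                      // nothing runs yet: time is virtual

scheduler.tick(5.seconds)                                      // advance virtual time instantly
assert(result.isCompleted)                                     // the timeout fired without real waiting
```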
The scheduler abstraction allows taking full control of flow during testing, which can become invaluable with complex streams.
Managing a distributed Akka application lifecycle
In actor-based systems running with Akka, and when running a cluster application in particular, it is recommended to delegate the entire application shutdown to Akka. Akka’s CoordinatedShutdown defines several logical stages of termination which are universal to every Akka application. User code can be hooked up to form part of the shutdown, while Akka itself registers its internal termination procedures. For instance, the HTTP server stops accepting incoming connections, the sharding coordinator initiates message buffering and hand-off, the cluster extension downs the node, etc.
Even with the help of CoordinatedShutdown, bringing a distributed HTTP application down safely requires some careful orchestration, as not even one HTTP request is supposed to be lost when an instance goes down. To achieve this lofty goal, we’ll first need some integration with the container orchestrator.
Liveness and readiness probes
The container orchestrator needs to know when it should start or terminate an instance and, more importantly for our purpose here, when traffic can be directed to the node. In the case of Kubernetes, this is achieved via liveness and readiness probes:
- liveness: indicates whether the application should be left running. The orchestrator will restart the container if the probe fails, to avoid nodes hanging forever in a limbo state;
- readiness: indicates that the application is ready to receive traffic.
When operating a cluster with Akka management extensions, we can take advantage of the built-in support for health checks. In particular, two endpoints are exposed out of the box by the Akka management server:
- /alive: meant to be used as a liveness probe (the default implementation simply returns 'OK');
- /ready: meant to be used as a readiness probe. When using the Akka management cluster HTTP module, the result depends on cluster membership matching a configurable value (typically, that the node has joined the cluster).
Akka also lets us define custom health checks to be added to (or to replace) the built-in ones (when enabling more than one check, the probe only succeeds if all checks are green).
Not one HTTP request lost!
Shutting down safely is a little more involved. One fundamental property of a highly available service is that not even one HTTP request can be lost when an instance is shut down. When exercising continuous delivery, this happens quite often as rolling updates keep the system up to date. Obviously, the capacity to maintain a seamless quality of service during updates is critical to allow for frequent releases. Here’s the downing tactic we follow to minimize the possibility of lost requests:
- We start by stopping message consumption from Kafka. The local consumer is generally part of a consumer group, leaving it up to the other consumers in the group to continue handling incoming data
- The readiness probe is disabled so that Kube eventually stops directing us further traffic
- We allow ongoing requests to complete and give Kube time to find out that the probe is disabled. This phase supports a timeout as we can’t afford to wait indefinitely either
- After this request “draining” period, it is time to leave the cluster, which is entirely driven by CoordinatedShutdown. Any cluster singleton instance running on the node is respawned somewhere else. Incoming messages to actors residing on the downing node are buffered during hand-off so that they can be recovered on another node without losing a single message (this obviously requires persistent actors; the state itself isn't transferred)
- After leaving the cluster, we can take care of closing any remaining resources before the application exits
Here’s a sequence diagram of this orchestration (the terminology is explained further down):
Akka’s Coordinated Shutdown
Studying CoordinatedShutdown gives us terminology to designate the different phases involved in this downing flow. Rather than including the original documentation here, we show some extension methods that we have defined to make registration of our custom tasks easier (the scaladoc annotations are taken directly from Akka's documentation):
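The original extension methods aren't reproduced here; a hedged sketch covering a few of the user-facing phases could look like the following (the method names are assumptions, the phase constants and addTask signature are standard Akka):

```scala
import scala.concurrent.Future
import akka.Done
import akka.actor.CoordinatedShutdown

object CoordinatedShutdownSyntax {

  implicit class CoordinatedShutdownOps(shutdown: CoordinatedShutdown) {

    /** Phase in which the HTTP server should stop accepting new connections. */
    def addServiceUnbindTask(name: String)(task: () => Future[Done]): Unit =
      shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, name)(task)

    /** Phase in which in-flight requests are given time to complete ("drain"). */
    def addServiceRequestsDoneTask(name: String)(task: () => Future[Done]): Unit =
      shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, name)(task)

    /** Phase running after cluster exit, just before the actor system terminates:
      * last chance to release remaining resources. */
    def addBeforeActorSystemTerminateTask(name: String)(task: () => Future[Done]): Unit =
      shutdown.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, name)(task)
  }
}
```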
The methods above expose only the phases which are meant to accommodate user-defined tasks.
Custom readiness probe
As mentioned previously, we want our readiness indicator to reflect both that the cluster is up and that we are ready to absorb incoming requests. More specifically, some time must be allotted to drain the requests while we are still part of the cluster. The probe is therefore disabled as soon as the ServiceUnbind phase begins. We had to define a custom health check for this, as Akka's default only reflects cluster membership. A health check is defined with a function returning Future[Boolean] (and referencing it in the configuration):
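A minimal sketch of such a check, assuming it is backed by a simple flag that our shutdown tasks flip during the service-unbind phase (the flag and class names are assumptions; Akka management only requires a class taking an ActorSystem and extending () => Future[Boolean]):

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future
import akka.actor.ActorSystem

// Flag flipped by a CoordinatedShutdown task when draining starts (assumed).
object Readiness {
  val acceptingRequests = new AtomicBoolean(true)
}

// Custom readiness check, referenced from the configuration.
class ReadyToServeCheck(system: ActorSystem) extends (() => Future[Boolean]) {
  override def apply(): Future[Boolean] =
    Future.successful(Readiness.acceptingRequests.get())
}

// application.conf (illustrative):
// akka.management.health-checks.readiness-checks {
//   ready-to-serve = "com.example.ReadyToServeCheck"
// }
```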
Along with this probe, we also register some custom tasks with CoordinatedShutdown to orchestrate the request drain. The parameters to this logic are:
- binding: the Akka HTTP binding
- pendingRequestsHardDeadline: hard deadline after which open connections are force-closed
- unbindingDelay: duration before rejection of incoming connections (the time allowed for traffic redirection)
Here’s the code, in the form of an extension method:
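Since the original gist isn't reproduced here, the following is a hedged sketch of such an extension method; the method name and the Readiness flag (from the readiness-check sketch above) are assumptions, while the CoordinatedShutdown phases and ServerBinding calls are standard Akka HTTP:

```scala
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.http.scaladsl.Http
import akka.pattern.after

object HttpServerShutdown {

  implicit class ServerBindingOps(binding: Http.ServerBinding) {

    def addToCoordinatedShutdown(
        unbindingDelay: FiniteDuration,
        pendingRequestsHardDeadline: FiniteDuration
    )(implicit system: ActorSystem, ec: ExecutionContext): Unit = {
      val shutdown = CoordinatedShutdown(system)

      // 1. Fail the readiness probe so that traffic gets redirected, then wait
      //    for the unbinding delay before refusing new connections.
      shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "disable-readiness-and-unbind") { () =>
        Readiness.acceptingRequests.set(false)
        after(unbindingDelay, system.scheduler)(binding.unbind())
      }

      // 2. Give in-flight requests time to complete, force-closing connections
      //    still open after the hard deadline.
      shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "drain-pending-requests") { () =>
        binding.terminate(hardDeadline = pendingRequestsHardDeadline).map(_ => Done)
      }
    }
  }
}
```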
Application Builder DSL
As has now become our weird habit, we have started our description with the shutdown. Initializing an Akka application also requires some care. Application startup can broadly be divided into the following four stages:
- Start services dependent on the actor system but which do not require cluster membership
- Bind the HTTP routes so that we are ready to receive traffic whenever the Akka Cluster readiness probe indicates we have joined the cluster
- Join the Akka Cluster
- Start services depending on cluster membership
We can implement each phase as a service and sequence them using CompositeService. To capture these common patterns and make it easier to bootstrap new Akka applications, we have created a small higher-level DSL which takes care of scaffolding the application, while also hooking up the termination of its components to coordinated shutdown phases.
Here’s an example composition of such an application:
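The actual DSL isn't shown here, so the snippet below is a purely hypothetical sketch: the builder and constructor names are invented for illustration, and only the four-stage structure follows the description in the list that follows:

```scala
// Hypothetical sketch only: builder names are assumptions.
val application: CompositeService =
  AkkaClusterApplication("my-app")
    .withServicesBeforeClusterJoin(            // 1. no cluster membership required
      SomeEtcdService(etcdSettings)            //    stop hooked to beforeActorSystemTerminate
    )
    .withHttpServices(                         // 2. bind routes; readiness probe gates traffic
      HttpServiceDefinition(apiRoutes, port = 8080)
    )
    .withClusterJoin()                         // 3. join the Akka Cluster
    .withServicesAfterClusterJoin(             // 4. require cluster membership
      SomeEntityService(sharding),
      SomeKafkaConsumerService(kafkaSettings)
    )
    .build
```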
- SomeEtcdService: this imaginary service doesn't require cluster membership to start pulling data from ETCD using Akka Streams, so we initialize it first. Its stop operation is hooked up with beforeActorSystemTerminate.
- HttpServiceDefinition: this type captures the relevant information to expose HTTP endpoints.
- SomeEntityService: a service making use of sharded persistent actors. This is typically best initialized once cluster membership has been confirmed.
- SomeKafkaConsumerService: once the system is ready to send commands to persistent actors, it's time to start consuming from Kafka sources and committing offsets.
If any of these services fails during startup, CoordinatedShutdown is triggered. Since our services have hooked their stops into its various phases upon startup, the shutdown is entirely driven by Akka.
The composite service returned from the builder can even be composed further, which typically allows for describing an initial migration phase before the creation of the actor system. This can be beneficial in a cluster configuration, since Akka already takes steps towards joining the cluster upon instantiation. Note however that by default CoordinatedShutdown will shut down the JVM, so care must be taken not to rely on outer stoppers (or to override this setting).
Summary
In this article, we have given an overview of how we orchestrate application startup and shutdown with the help of three core concepts: Stoppable (relying on a synchronous stop trigger combined with the asynchronous stopTriggered and stopped futures), Service (exposing an asynchronous start method with explicit possible failure) and CompositeService (allowing services and stoppables to be composed easily). We have also illustrated how we integrate these mechanisms with Akka's own CoordinatedShutdown to achieve high availability.
How do you deal with those challenges in your applications? We are curious to find out, please let us know!