Making your Akka Streams robust

Steven Vroonland
Published in Codestar blog
9 min read · Aug 17, 2018

Typical stream processing applications need to be running all the time. We want our streams to keep on running and be resilient to unexpected errors.

I have been using Akka Streams for stream processing in several projects. It’s a very powerful streaming library that offers you many ways to configure your streams. My experience over the past year has been that it takes several iterations to get the stream error handling right and ensure the stream keeps on running.

In this article I want to present some examples and guidelines for making your streams more robust that have worked for me. There are several ways to add error handling to your stream and you may already be using some of the techniques presented below. I want to show how they complement each other and fit together.

We will look at:

  1. How to be informed of stream completion and failures
  2. Restarting the stream or parts of it upon failure
  3. Resuming the stream on failures
  4. How to shut down your stream gracefully

A basic stream

We’ll start with a basic stream and then gradually make it more robust.

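val source = Source(1 to 1000)
// The element type is given explicitly; without it the compiler cannot infer the lambda's parameter type
val sink = Sink.foreach[Any](println)
val stream = source
  .mapAsync(1)(i => callSomeExternalService(i))
  .to(sink)
stream.run() // We're done! All good, right?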

What happens when we run() our stream? The stream is created in the background by creating one or more actors on the ActorSystem. Those actors will then run the stream, at which point we no longer have any control over the process.

Hopefully all goes well, but callSomeExternalService may fail, causing our stream to fail.

Clearly we need some more control over this background process.

Monitoring stream completion

The first step to gaining control over our streams is to add some logging when our stream terminates.

The result of calling run() on our stream graph is called the materialized value of the stream. This value provides us some grasp over the 'background process' that runs our stream after we have created it.

To log stream completion, we would like our stream to materialize to a Future[Done] that completes together with the stream (Future[Any] is fine as well). We can then log completion and failure like this:

val stream: RunnableGraph[Future[Done]] = ...
val streamComplete: Future[Done] = stream.run()
streamComplete.onComplete {
  case Success(_) => println("The stream has completed successfully")
  case Failure(e) => println(s"The stream has completed with an error: ${e}")
}

If an exception occurs in any stage, streamComplete will be a failed Future.

Most of the built-in Sinks like Sink.ignore and Sink.foreach materialize to Future[Done] and are therefore suited for monitoring completion. If your Sink does not materialize to Future[Done] but something else, you can still monitor stream completion, at least up to the point where you connect to your Sink. Do this by using watchTermination, which materializes to a Future[Done] that completes when the stream stage terminates.

For example:

val source: Source[Int, NotUsed] = Source(1 to 10)
val sink: Sink[Int, NotUsed] = ???
val streamComplete: Future[Done] =
  source
    .watchTermination()(Keep.right) // Keep Future[Done]
    .toMat(sink)(Keep.left) // Keep Future[Done]
    .run()

Note on propagating materialized values

Sources, Sinks and Flows may all produce materialized values. Besides wiring the stream itself, keep in mind that we also have to ensure all of these materialized values are propagated ‘downstream’ correctly, as they are important for control over the stream once it has started running.

When writing your stream, you might naturally use the .to(sink) or .runWith(sink) methods to wire the source to a sink. However, these operators preserve only one of the materialized values. And confusingly enough, to keeps the Source's materialized value (left), while runWith keeps the Sink's value (right)!

So, for example, if you connect a Source[T, Future[Done]] with a Sink[T, Future[Done]] like source.to(sink), the materialized value will be a Future[Done], but it may not be the one you were interested in.

I therefore recommend being careful with operators that combine materialized values themselves, like Source.to, Source.via and Source.runWith. Instead, use the variants that require you to combine the materialized values explicitly, like toMat and viaMat followed by run(). Both take a combine function that decides which materialized values to keep. Akka Streams offers the convenience functions Keep.left, Keep.right and Keep.both for this.
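To make the difference concrete, here is a minimal sketch. It assumes Akka 2.5.x with an explicit ActorMaterializer (on newer versions an implicit ActorSystem should be enough); the object name KeepExample is made up for illustration.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of any library
object KeepExample {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("keep-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val source: Source[Int, NotUsed] = Source(1 to 3)
    val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

    // `to` silently keeps the left (Source) value: the Sink's Future[Done] is lost
    val keepsSource: RunnableGraph[NotUsed] = source.to(sink)

    // `toMat` forces us to state which value we want
    val keepsSink: RunnableGraph[Future[Done]] = source.toMat(sink)(Keep.right)
    val keepsBoth: RunnableGraph[(NotUsed, Future[Done])] = source.toMat(sink)(Keep.both)

    // Only the graph that kept the Sink's value can be observed for completion
    val done: Future[Done] = keepsSink.run()
    Await.result(done, 5.seconds)
    Await.result(system.terminate(), 5.seconds)
  }
}
```

The type annotations are the point here: with toMat the compiler tells you which materialized value you ended up with, so a wrong choice shows up as a type error rather than as a missing completion signal at runtime.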

Restarting the stream on failure

Now we are logging when our stream terminates on failure, but we want it to keep running. We should always expect our streams to fail at some point and take action to restart them.

There are several ways to restart your stream: we can restart it in its entirety or restart only parts of it.

Restarting the entire stream on failure can be done as follows.

// Keep.right keeps the Sink's Future[Done]; source.to(sink) would keep the Source's value instead
val stream: RunnableGraph[Future[Done]] = source.toMat(sink)(Keep.right)

def runAndRestartStream(): Future[Done] = {
  stream.run()
    .recoverWith {
      case NonFatal(e) =>
        println(s"Restarting the stream because of ${e}")
        runAndRestartStream()
    }
}
val streamComplete: Future[Done] = runAndRestartStream()

We can then add logging as shown earlier.

There are two things to be said about this approach:

  1. It can be expensive to reconstruct the entire stream. This may involve setting up connections to external services, database connections, etc.
  2. There is no strategy to ensure that we don’t overload external systems by continually retrying a failing operation.

For these purposes, Akka provides ways to restart only your Sources, Flows and Sinks together with a backoff strategy.

Restarting sources

Akka provides a RestartSource that can restart a source on failure. Backoff settings can be configured so that restarting does not happen 100 times per second but with an exponential backoff time.

By wrapping our Source with RestartSource.onFailuresWithBackoff we can restart that part of the stream on failure.

This does not have to be the actual ‘initial source’ of our stream. All parts of the stream up to the Sink are of type Source[T, ..] and can be restarted as a whole with RestartSource.

Here is an example that will restart the source 5 times, each time taking a longer pause than the previous.

val source: Source[Int, NotUsed] =
  Source(scala.collection.immutable.Seq(1, 2, 3, 4, 5))
    .map(i => if (i == 3) {
      throw new IllegalArgumentException("I don't like the number 3")
    } else {
      i
    })
val sink: Sink[Int, Future[Done]] = Sink.foreach(println(_))

val stream: RunnableGraph[(NotUsed, Future[Done])] =
  RestartSource.onFailuresWithBackoff(
    minBackoff = 100.millis,
    maxBackoff = 1.second,
    randomFactor = 0.2,
    maxRestarts = 5
  )(() => source)
    .toMat(sink)(Keep.both)
val (_, streamComplete) = stream.run()

streamComplete.onComplete {
  case Success(_) => println("The stream has completed successfully")
  case Failure(e) => println(s"The stream has completed with an error: ${e}")
}

The output will be 5 times the numbers 1 and 2, followed by the IllegalArgumentException, and finally:

The stream has completed with an error: java.lang.IllegalArgumentException: I don't like the number 3
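To get a feel for the pauses such backoff settings produce, the sketch below computes the delays of a plain exponential backoff: the delay doubles on every restart and is capped at maxBackoff. It illustrates the general idea only and is not Akka's actual implementation; the names BackoffSketch and delayFor are made up, and the random factor that Akka adds on top is left out.

```scala
import scala.concurrent.duration._

// Hypothetical illustration of exponential backoff, not Akka's own code
object BackoffSketch {
  // Delay before restart number `restartCount` (0-based):
  // minBackoff * 2^restartCount, capped at maxBackoff. No random jitter.
  def delayFor(restartCount: Int, minBackoff: FiniteDuration, maxBackoff: FiniteDuration): FiniteDuration = {
    val uncapped = minBackoff.toMillis * math.pow(2, restartCount)
    math.min(uncapped, maxBackoff.toMillis.toDouble).toLong.millis
  }

  def main(args: Array[String]): Unit = {
    // Same minBackoff and maxBackoff as in the RestartSource example
    val delays = (0 until 5).map(delayFor(_, 100.millis, 1.second))
    println(delays.mkString(", "))
  }
}
```

With minBackoff = 100.millis and maxBackoff = 1.second this gives delays of 100, 200, 400 and 800 milliseconds, after which every further delay is capped at one second.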

Restarting and the materialized value

Restarting a source has consequences for the materialized value of the stream. Restarting means that the source is recreated and so is its materialized value.

Because of that, RestartSource produces Source[T, NotUsed] and not Source[T, Mat] with the Mat of the source: it cannot deliver the materialized values of the sources that are recreated in the future.

If necessary, we can tack watchTermination onto the RestartSource, so we have access to a Future[Done] materialized value again which we can observe for completion.
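A minimal sketch of that combination could look as follows. It assumes Akka 2.5.x, where RestartSource.onFailuresWithBackoff takes the backoff parameters directly; the object name RestartSourceTermination is made up for illustration.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of any library
object RestartSourceTermination {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("restart-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // RestartSource materializes to NotUsed, whatever the wrapped source materializes to
    val restarting: Source[Int, NotUsed] =
      RestartSource.onFailuresWithBackoff(
        minBackoff = 100.millis,
        maxBackoff = 1.second,
        randomFactor = 0.2,
        maxRestarts = 5
      )(() => Source(1 to 5))

    // watchTermination gives us a Future[Done] for the restarting source as a whole
    val watched: Source[Int, Future[Done]] =
      restarting.watchTermination()(Keep.right)

    // Keep.left keeps the Future[Done] from watchTermination rather than the Sink's value
    val sourceDone: Future[Done] =
      watched.toMat(Sink.foreach[Int](println))(Keep.left).run()

    Await.result(sourceDone, 10.seconds)
    Await.result(system.terminate(), 10.seconds)
  }
}
```

The Future completes only when the restarting source finally completes or gives up, not every time the wrapped source is restarted.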

Restarting flows and sinks

RestartFlow allows us to restart a flow when exceptions occur in that part of the stream. As with RestartSource, it materializes to NotUsed. If you want to keep the materialized value of the Source, use the viaMat(flow)(combine) method with Keep.left as the combine parameter.

As an example use case, the Reactive Kafka library offers a Flow for producing messages to Kafka. Creating this Flow is expensive, but since the connection to Kafka may fail, wrapping it in a RestartFlow makes the stream more robust to failure.

There is also RestartSink which provides restarting behavior for sinks.
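As a rough sketch of how RestartFlow fits into a stream, assuming Akka 2.5.x and its RestartFlow.withBackoff variant: expensiveFlow below is a made-up stand-in for a flow that talks to an external system, and RestartFlowExample is a hypothetical name.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RestartFlow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of any library
object RestartFlowExample {
  // Stand-in for a flow that is expensive to create, e.g. one holding a connection
  def expensiveFlow(): Flow[Int, String, NotUsed] =
    Flow[Int].map(i => s"processed $i")

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("restart-flow-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // The factory is called again on every restart, so the flow is rebuilt from scratch
    val restartingFlow: Flow[Int, String, NotUsed] =
      RestartFlow.withBackoff(
        minBackoff = 100.millis,
        maxBackoff = 1.second,
        randomFactor = 0.2
      )(() => expensiveFlow())

    val streamComplete: Future[Done] =
      Source(1 to 5)
        .viaMat(restartingFlow)(Keep.left) // keep the Source's materialized value
        .toMat(Sink.foreach[String](println))(Keep.right) // keep the Sink's Future[Done]
        .run()

    Await.result(streamComplete, 10.seconds)
    Await.result(system.terminate(), 10.seconds)
  }
}
```

Passing a factory function instead of a flow instance is what allows the wrapped flow, and any connection it holds, to be recreated after a failure.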

I suggest looking at the excellent Akka Streams documentation on this subject for more information.

Resuming the stream on failure

In many stream processing applications, restarting the stream may not have the desired effect of continuing the stream processing. Let’s say you are processing elements from some durable queue and processing of one element fails. If you restart the stream, the next element that will be processed will be the same element that caused the stream to fail earlier. This happens for example when we’re consuming from a Kafka topic and committing offsets only after processing.

In such a case we want our application to skip processing that element and resume the stream with the next element.

Akka Streams makes use of a supervision strategy attribute on each stream stage to decide what to do when exceptions occur. By default the stage is stopped and the stream completes with an exception. We can alter this behavior to:

  • drop the current stream element and resume processing the next
  • restart the stream stage and resume processing the next element

Resuming is what we want in most of the cases. Restarting can be useful if we want to reset some accumulated state, when we’re using a scan or statefulMapConcat operator for example.

If we’re dealing with some external system, it’s better to make use of the exponential backoff provided by RestartSource/RestartFlow/RestartSink than the (immediate) restart supervision strategy.

You can configure the supervision strategy on the ActorMaterializer or in the stream itself using withAttributes. Put withAttributes as close to the end of the stream as possible, so that the supervision strategy applies to all stream stages before it.

The example below configures resuming instead of restarting like the earlier example:

val source: Source[Int, NotUsed] =
  Source(scala.collection.immutable.Seq(1, 2, 3, 4, 5))
    .map(i => if (i == 3) {
      throw new IllegalArgumentException("I don't like the number 3")
    } else {
      i
    })
val sink: Sink[Int, Future[Done]] = Sink.foreach(println(_))

// Keep.right leaves only the Sink's value, so the graph materializes to Future[Done]
val stream: RunnableGraph[Future[Done]] =
  source
    .withAttributes(ActorAttributes.withSupervisionStrategy {
      case NonFatal(e) =>
        println(s"Resuming after non fatal exception ${e}")
        Resume
    })
    .toMat(sink)(Keep.right) // Keep the Sink's Future[Done]
val streamComplete = stream.run()
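A decider does not have to treat every exception the same way. The sketch below, which assumes Akka 2.5.x and uses the Scala API ActorAttributes.supervisionStrategy, resumes only on the exception we expect and stops the stream on anything else. The object name SelectiveSupervision is made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of any library
object SelectiveSupervision {
  // Skip elements that fail with the exception we know about, fail the stream on anything else
  val decider: Supervision.Decider = {
    case _: IllegalArgumentException => Supervision.Resume
    case _                           => Supervision.Stop
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("supervision-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val result: Future[immutable.Seq[Int]] =
      Source(1 to 5)
        .map(i => if (i == 3) throw new IllegalArgumentException("I don't like the number 3") else i)
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
        .toMat(Sink.seq[Int])(Keep.right)
        .run()

    // The element 3 is dropped; the other elements pass through
    println(Await.result(result, 5.seconds))
    Await.result(system.terminate(), 5.seconds)
  }
}
```

Being selective like this keeps unexpected errors visible: a bug that throws something other than IllegalArgumentException still fails the stream instead of being skipped silently.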

Shutting down a stream correctly

Finally, let’s talk a little bit about how to shut down a stream gracefully. Now that we have nice monitoring of our stream failures, the last thing we want is all kinds of error messages being logged unnecessarily during system shutdown or restart.

To properly shut down our stream, we basically need to do two things:

  1. Trigger the Source to complete
  2. Await completion of the stream

Complete the source

When the source completes, a ‘finished’ message will propagate down the stream (internally in Akka Streams). Depending on the source, we have some control over its completion. Sources that connect to external services often materialize to some sort of Cancellable or other means of control.

If we don’t have this control, we can insert a KillSwitch into our stream. When calling shutdown on a KillSwitch, it will propagate a cancellation message upstream, thereby completing the source. It will also send a completion message downstream.

In the example below we have a source that produces elements for about 10 seconds. We want to terminate it gracefully after 1 second.

val source: Source[Int, NotUsed] = Source(1 to 100).throttle(1, 100.milliseconds)
val sink: Sink[Int, Future[Done]] = Sink.foreach(println(_))

val (killSwitch: UniqueKillSwitch, streamComplete: Future[Done]) = source
  .viaMat(KillSwitches.single)(Keep.right) // Keep the KillSwitch
  .toMat(sink)(Keep.both) // Keep both the KillSwitch and the Sink's Future[Done]
  .run()
streamComplete.onComplete {
  case Success(_) => println("The stream has completed successfully")
  case Failure(e) => println(s"The stream has completed with an error: ${e}")
}

Thread.sleep(1000)
killSwitch.shutdown()

This will output roughly the elements 1 to 10, followed by The stream has completed successfully.

Await stream completion

Triggering the source to complete is not enough. Processing each source element may take some time and we should wait for the last source element to be fully completed. ‘Draining’ the stream, if you will.

To make sure that the last source element is fully processed by our stream, we need to await stream completion. We can await the Future[Done] of our Sink for this purpose.

When integrating our stream with the Play framework, we can make use of the ApplicationLifecycle's stop hook. This looks as follows:

val lifecycle: ApplicationLifecycle = ...
lifecycle.addStopHook { () =>
  killSwitch.shutdown()
  streamComplete
}

Play will then wait for our stream to shut down gracefully.
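Outside of Play, the same two steps can be hooked into Akka's CoordinatedShutdown. The following is only a sketch under a few assumptions: Akka 2.5.8 or later (for run with a reason), the service-requests-done phase as a reasonable place to drain, and the made-up names GracefulShutdownExample and "drain-stream".

```scala
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of any library
object GracefulShutdownExample {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("shutdown-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val (killSwitch, streamComplete) =
      Source(1 to 100)
        .throttle(1, 100.milliseconds)
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.foreach[Int](println))(Keep.both)
        .run()

    // Step 1: complete the source. Step 2: hand back the Future that signals the stream has drained.
    CoordinatedShutdown(system).addTask(
      CoordinatedShutdown.PhaseServiceRequestsDone, "drain-stream") { () =>
      killSwitch.shutdown()
      streamComplete
    }

    Thread.sleep(1000)

    // Runs all registered tasks phase by phase; with default settings the actor system is terminated at the end
    val shutdownDone: Future[Done] =
      CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason)
    Await.result(shutdownDone, 10.seconds)
  }
}
```

Each shutdown phase has a configurable timeout, so a stream that takes long to drain may need a larger timeout for the chosen phase.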

Summary

I have shown you some techniques to make sure your streams keep running, and how they fit together. Your stream processing logic may never be completely bug free and exceptions will occur sometimes, but these techniques will help you to be prepared.

There is much more to writing good stream processing systems, stuff like dealing with backpressure and optimizing throughput. But at least I hope the techniques I wrote about will help you to make your streams more robust to failures.

In summary:

  • Make your streams materialize to a Future[Done] for completion monitoring
  • Restart your stream or parts of it with exponential backoff using RestartSource, RestartFlow and RestartSink
  • Add resume logic using a supervision strategy to skip over faulty elements
  • Properly drain your streams on application shutdown
