Pure and type-safe error handling in Akka Streams

Arjun Dhawan
Kaizo Engineering
5 min read · Feb 22, 2021

Want to know how to deal with errors in Akka Streams in a type-safe way, rather than using .recover? You’ve come to the right place!

At Kaizo we process millions of ticket events per day. We subscribe to these events through the Zendesk API. Companies use Zendesk to track, prioritize and solve customer support interactions. When companies use the Kaizo app in Zendesk, they can evaluate and improve their team’s performance with unified and actionable real-time insights, QA, and gamification.

A ticket is a means through which end users communicate with our customers’ agents. Real-time processing of ticket-related events is required to provide fair, engaging, and leading metrics, rather than relying on traditional reporting where you’re always playing catch-up.

To that end, we have to be reactive. We want to provide our agents with useful insights, always. That means designing around failure and expecting any kind of error to happen while processing an event.

The problem

We use Akka Streams because of its performance characteristics compared to other streaming libraries. In Akka Streams, you traditionally use .recover to deal with failure:

Source(1 to 10)
  .map(n =>
    if (n == 3)
      throw new RuntimeException(s"unexpected value: $n")
    else
      n.toString
  )
  .recover {
    case e: RuntimeException => e.getMessage
  }

As you can see, Akka expects you to deal with errors by throwing exceptions (and therefore to use side effects). Not only that, but the lack of type safety leaves the compiler with almost no means of pointing out bugs. Imagine accidentally removing .recover: since the type of the resulting expression doesn’t change, the compiler cannot catch a bug like this.

Our first step toward improving this snippet is to purify it. Instead of modeling errors by throwing exceptions, we can use Either, which results in an expression of type Source[Either[RuntimeException, Int], _].

Source(1 to 10)
  .map(n =>
    if (n == 3)
      Left(new RuntimeException(s"unexpected value: $n"))
    else
      Right(n)
  )

Due to the resulting type, the compiler now forces us to deal with errors at every processing stage. But this also creates another problem: every transformation of the stream now requires a nested map as boilerplate: .map(_.map(n => ...)). While other streaming libraries such as ZIO Streams let you conveniently carry typed errors around by virtue of their type definition (Stream[E, A]), Akka Streams is clearly not designed to do this. The solution we use at Kaizo to overcome this issue originates from Colin Breck’s talk at Scala Days 2018.
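
To make the boilerplate concrete, here is a minimal sketch of what carrying an Either through several stages looks like. It assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer); the validate helper and the object name are hypothetical and only serve the illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object NestedMapSketch {
  // Hypothetical validation step: 3 is the invalid value, as in the snippets above.
  def validate(n: Int): Either[RuntimeException, Int] =
    if (n == 3) Left(new RuntimeException(s"unexpected value: $n"))
    else Right(n)

  def run(): Seq[Either[String, String]] = {
    implicit val system: ActorSystem = ActorSystem("nested-map-sketch")
    try {
      val result = Source(1 to 5)
        .map(validate)
        .map(_.map(_ * 2))             // every stage has to reach inside the Either
        .map(_.map(_.toString))        // ...and the next stage has to do so again
        .map(_.left.map(_.getMessage)) // turn exceptions into messages so results are easy to compare
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The error stays in the stream as a Left and travels all the way to the final sink, which is exactly why each stage needs the extra .map.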

Solution: use divertTo and collect

First of all, we create a Sink specifically for dealing with errors. Secondly, we use divertTo and collect to divert any Left value to it:

val errorSink: Sink[RuntimeException, NotUsed] =
  Flow[RuntimeException]
    .log("Error occurred")
    .to(Sink.ignore)

Source(1 to 10)
  .map(n =>
    if (n == 3)
      Left(new RuntimeException(s"unexpected value: $n"))
    else
      Right(n)
  )
  .divertTo(
    errorSink.contramap(
      _.left.getOrElse(sys.error("No left value"))
    ),
    _.isLeft
  )
  .collect { case Right(a) => a }

Here, contramap extracts the Left out of the Either before sending it to errorSink. The type of the resulting expression now is Source[Int, _] and we can use .map as we are used to. Note that we only use sys.error due to the Akka Streams API not having primitives to express what we are trying to achieve here. In reality, this error can never occur due to diversion happening if and only if _.isLeft.

Another way to approach this is to have an errorSink of slightly different definition:

val errorSink: Sink[Either[RuntimeException, _], NotUsed] =
  Flow[Either[RuntimeException, _]]
    .collect { case Left(exception) => exception }
    .log("Error occurred")
    .to(Sink.ignore)

Now we no longer need .contramap (together with the use of .left.getOrElse) before sending errors to the sink. But we are forced to use a nested .map to transform the error of the errorSink. And we have a type definition for errorSink that is slightly wider than needed: the errorSink should only deal with errors, but that is not reflected in its type definition.

We can abstract over this pattern, by creating a divertLeftTo function:

Source(1 to 10)
  .map(n =>
    if (n == 3)
      Left(new RuntimeException(s"unexpected value: $n"))
    else
      Right(n.toString)
  )
  .via(divertLeftTo(errorSink))

def divertLeftTo[E, A](
  sink: Sink[E, NotUsed]
): Flow[Either[E, A], A, NotUsed] = {

  // Adapt the error sink so that it accepts the whole Either
  val sinkEither: Sink[Either[E, A], NotUsed] = sink.contramap(
    _.left.getOrElse(sys.error("No left value"))
  )

  def shouldSendToSink(message: Either[E, A]): Boolean =
    message.isLeft

  Flow[Either[E, A]]
    .divertTo(sinkEither, shouldSendToSink)
    .collect { case Right(a) => a }

}

Thanks to errorSink, we don't need to entangle error handling with the main logic. Any error gets diverted to this sink and handled separately. That means we don't need to deal with errors immediately when we extract them from Either. Essentially, this is a data-driven approach to error handling, reaping all the benefits of the compiler doing its type checking.
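
The pattern can be tried end to end with the following sketch, which assumes Akka 2.6+. Instead of logging, the error sink here collects errors so they can be inspected afterwards; it is obtained with Sink.seq and preMaterialize, which conveniently yields a Sink[E, NotUsed]. The object name and the explicit type arguments are choices made for this example, not part of the original code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object DivertLeftToSketch {
  // Same idea as divertLeftTo above: Lefts go to `sink`, Rights continue downstream.
  def divertLeftTo[E, A](sink: Sink[E, NotUsed]): Flow[Either[E, A], A, NotUsed] =
    Flow[Either[E, A]]
      .divertTo(
        sink.contramap[Either[E, A]](_.left.getOrElse(sys.error("No left value"))),
        _.isLeft
      )
      .collect { case Right(a) => a }

  // Returns (successfully processed values, messages of the diverted errors).
  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("divert-left-to-sketch")
    try {
      // Error sink that remembers what it received (an assumption for this example).
      val (errorsF, errorSink) = Sink.seq[RuntimeException].preMaterialize()

      val valuesF = Source(1 to 5)
        .map[Either[RuntimeException, Int]] { n =>
          if (n == 3) Left(new RuntimeException(s"unexpected value: $n"))
          else Right(n)
        }
        .via(divertLeftTo[RuntimeException, Int](errorSink))
        .map(n => s"value $n") // plain .map again, no nesting required
        .runWith(Sink.seq)

      val values = Await.result(valuesF, 5.seconds)
      val errors = Await.result(errorsF, 5.seconds).map(_.getMessage)
      (values, errors)
    } finally {
      system.terminate()
    }
  }
}
```

After the diversion, the main stream only ever sees Int values, while the single failing element ends up in the error sink.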

Different ways of handling errors

Let’s take a step back here, and imagine what kind of errorSink we would want to have. By default (as we have implemented in the example above using Sink.ignore), sending messages to errorSink is effectively the same as skipping a message. But one can think of other types of sinks: those that forward messages to a dead letter queue to be processed again later (when the error contains sufficient information to do that), or sinks that halt the stream completely and shut down the service.

Skipping a message could make sense for an error that is not (re-)processable at all (think of business constraints). For other types of errors, you’d definitely want to reprocess your messages. Think of an error that occurs during deserialization suggesting that the used data model has evolved, but this service has not yet been updated.
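
As a rough illustration of the dead letter queue variant, the sketch below wraps each error in a dead letter and forwards it to another sink. Everything here is an assumption made for the example: the ProcessingError and DeadLetter types, the parse helper, and the in-memory Sink.seq that stands in for a real dead letter queue (for instance a Kafka topic). It assumes Akka 2.6+.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object DeadLetterSketch {
  // Hypothetical error type that keeps the original payload for reprocessing.
  final case class ProcessingError(payload: String, reason: String)
  final case class DeadLetter(payload: String, reason: String)

  def parse(raw: String): Either[ProcessingError, Int] =
    Try(raw.toInt).toOption.toRight(ProcessingError(raw, "not a number"))

  // Error sink that forwards errors to a dead letter destination instead of dropping them.
  def deadLetterSink(deadLetters: Sink[DeadLetter, NotUsed]): Sink[ProcessingError, NotUsed] =
    Flow[ProcessingError]
      .map(e => DeadLetter(e.payload, e.reason))
      .to(deadLetters)

  // Returns (parsed values, dead letters that were forwarded).
  def run(): (Seq[Int], Seq[DeadLetter]) = {
    implicit val system: ActorSystem = ActorSystem("dead-letter-sketch")
    try {
      // In-memory stand-in for the real dead letter queue.
      val (deadLettersF, inMemoryQueue) = Sink.seq[DeadLetter].preMaterialize()

      val valuesF = Source(List("1", "x", "3"))
        .map(parse)
        .divertTo(
          deadLetterSink(inMemoryQueue)
            .contramap[Either[ProcessingError, Int]](_.left.getOrElse(sys.error("No left value"))),
          _.isLeft
        )
        .collect { case Right(n) => n }
        .runWith(Sink.seq)

      (Await.result(valuesF, 5.seconds), Await.result(deadLettersF, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

Because the error carries the original payload, whatever consumes the dead letters has enough information to retry the message later.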

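The following example showcases a Sink that either skips a message or halts the stream completely. When takeWhile sees an error that should not be skipped, the sink completes and cancels its upstream; since divertTo cancels the whole stream as soon as any of its downstreams cancels, the stream stops: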

val errorSink: Sink[MyError, NotUsed] =
  Flow[MyError]
    .takeWhile(e => shouldSkip(e))
    .log("skipping")
    .to(Sink.ignore)

def shouldSkip(e: MyError): Boolean =
  e match {
    case _: InvalidUserId => true
    case e =>
      log.error(s"terminating - $e")
      false
  }
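
The halting behaviour can be observed with a small sketch like the one below, assuming Akka 2.6+ and the default fused execution of the stream. The shape of the MyError hierarchy is an assumption: the fields of InvalidUserId and the SchemaMismatch case are hypothetical and exist only to have one skippable and one fatal error.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object HaltingSinkSketch {
  sealed trait MyError
  final case class InvalidUserId(id: Int) extends MyError  // skippable
  final case class SchemaMismatch(id: Int) extends MyError // hypothetical fatal error

  def shouldSkip(e: MyError): Boolean =
    e match {
      case _: InvalidUserId => true
      case _                => false
    }

  // Completes (and thereby cancels its upstream) on the first error that must not be skipped.
  val errorSink: Sink[MyError, NotUsed] =
    Flow[MyError]
      .takeWhile(e => shouldSkip(e))
      .to(Sink.ignore)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("halting-sink-sketch")
    try {
      val valuesF = Source(1 to 10)
        .map[Either[MyError, Int]] { n =>
          if (n == 3) Left(InvalidUserId(n))       // skipped, the stream continues
          else if (n == 6) Left(SchemaMismatch(n)) // fatal, the stream stops here
          else Right(n)
        }
        .divertTo(
          errorSink.contramap[Either[MyError, Int]](_.left.getOrElse(sys.error("No left value"))),
          _.isLeft
        )
        .collect { case Right(n) => n }
        .runWith(Sink.seq)
      Await.result(valuesF, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Element 3 is skipped silently, whereas element 6 ends the stream, so the values 7 to 10 are never processed.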

Special case: committing on skipped messages using Kafka

We use Apache Kafka as our streaming platform, and use Alpakka Kafka to integrate it with Scala. Imagine reading from a topic and, using only .collect, skipping certain events that we cannot process:

Consumer
  .sourceWithOffsetContext[String, String](consumerSettings, topics)
  .map(record => deserializeAs[MyEvent](record.value))
  .collect { case Right(event) => event }
  .toMat(Committer.sinkWithOffsetContext(committerSettings))(Keep.none)
  .run()

def deserializeAs[T](message: String): Either[DeserializationError, T]

Can you spot the bug here?

We are not committing the skipped messages! This means we would re-read these messages if we happened to restart the service while skipping occurs. Luckily, committing skipped messages using divertLeftTo is a breeze:

val errorSink: Sink[DeserializationError, NotUsed] =
  Flow[DeserializationError]
    .log("skipping")
    .toMat(Committer.sinkWithOffsetContext(committerSettings))(Keep.none)

Consumer
  .sourceWithOffsetContext[String, String](consumerSettings, topics)
  .map(record => deserializeAs[MyEvent](record.value))
  .via(divertLeftTo(errorSink))
  .toMat(Committer.sinkWithOffsetContext(committerSettings))(Keep.none)
  .run()

def deserializeAs[T](message: String): Either[DeserializationError, T]

Thanks for reading! If you enjoyed this story, follow our Publication to stay tuned for more stories.

Interested in joining Kaizo? We are hiring (Scala) Software Engineers and Data engineers! Check the recruitment page for our open positions.
