Pure and type-safe error handling in Akka Streams
Want to know how to deal with errors in Akka Streams in a type-safe way, rather than using .recover? You’ve come to the right place!
At Kaizo we process millions of ticket events per day. We subscribe to these events through the Zendesk API. Companies use Zendesk to track, prioritize and solve customer support interactions. When companies use the Kaizo app in Zendesk, they can evaluate and improve their team’s performance with unified and actionable real-time insights, QA, and gamification.
A ticket is the means through which end users communicate with our customers’ agents. Real-time processing of ticket-related events is required to provide fair, engaging, and leading metrics, rather than relying on traditional reporting where you’re always playing catch-up.
To that end, we have to be reactive. We want to provide our agents with useful insights, always. That means designing around failure and expecting any kind of error to happen while processing an event.
The problem
We use Akka Streams because of its performance characteristics compared to other streaming libraries. In Akka Streams, you traditionally use .recover to deal with failure:
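import akka.stream.scaladsl.Source

Source(1 to 10)
  .map(n =>
    if (n == 3)
      throw new RuntimeException(s"unexpected value: $n")
    else
      n.toString
  )
  // turns the failure into one last element, then completes the stream
  .recover {
    case e: RuntimeException => e.getMessage
  }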
As you can see, Akka expects you to deal with errors by throwing exceptions (and therefore by relying on side effects). Not only that, the compiler has almost no way of pointing out bugs, because the possibility of failure is not reflected in the types. Imagine accidentally removing .recover. Since the type of the resulting expression doesn’t change, the compiler cannot catch a bug like this.
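It is also worth seeing what .recover does at runtime: it does not resume the stream. When a stage fails, .recover emits one replacement element and then completes the stream, so the elements after 3 are never processed. A minimal runnable sketch, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a stream); the object name RecoverDemo is just for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RecoverDemo {
  def run(): Seq[String] = {
    // Akka 2.6+: an implicit ActorSystem provides the Materializer
    implicit val system: ActorSystem = ActorSystem("recover-demo")
    try {
      val result = Source(1 to 10)
        .map { n =>
          if (n == 3) throw new RuntimeException(s"unexpected value: $n")
          else n.toString
        }
        // emits the error message as a final element, then completes: 4 to 10 are lost
        .recover { case e: RuntimeException => e.getMessage }
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```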
Our first step in improving this snippet is to purify it. Instead of modeling errors by throwing exceptions, we can use Either, which results in an expression of type Source[Either[RuntimeException, Int], _].
Source(1 to 10)
  .map[Either[RuntimeException, Int]](n =>
    if (n == 3)
      Left(new RuntimeException(s"unexpected value: $n"))
    else
      Right(n)
  )
Due to the resulting type, the compiler now forces us to deal with any error at every processing stage. But this also creates another problem: every transformation of the stream now needs boilerplate in the form of a nested map: .map(_.map(n => ...)). While other streaming libraries such as ZIO Streams let you conveniently carry typed errors around by virtue of their type definition (Stream[E, A]), Akka Streams is clearly not designed to do this. The solution we use at Kaizo to overcome this issue originates from Colin Breck’s talk at Scala Days 2018.
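To make that boilerplate concrete, here is a sketch of two further transformations on the Either-typed stream. The doubling and formatting steps are made up for illustration; the point is that each one has to reach inside the Either with its own inner map, and the happy path never gets unwrapped:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object NestedMapDemo {
  def run(): Seq[Either[String, String]] = {
    implicit val system: ActorSystem = ActorSystem("nested-map-demo")
    try {
      val result = Source(1 to 5)
        .map[Either[RuntimeException, Int]] { n =>
          if (n == 3) Left(new RuntimeException(s"unexpected value: $n")) else Right(n)
        }
        .map(_.map(n => n * 2))        // every transformation needs an inner map...
        .map(_.map(n => s"value: $n")) // ...because the values stay wrapped in Either
        .map(_.left.map(_.getMessage)) // errors as strings, to make the result easy to inspect
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```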
Solution: use divertTo and collect
First, we create a Sink specifically for dealing with errors. Second, we use divertTo to send every Left value to that sink, and collect to keep only the Right values in the main stream:
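import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

val errorSink: Sink[RuntimeException, NotUsed] =
  Flow[RuntimeException]
    .log("Error occurred")
    .to(Sink.ignore)

Source(1 to 10)
  .map[Either[RuntimeException, Int]](n =>
    if (n == 3)
      Left(new RuntimeException(s"unexpected value: $n"))
    else
      Right(n)
  )
  .divertTo(
    // unwrap the Left before it reaches errorSink
    errorSink.contramap[Either[RuntimeException, Int]](
      _.left.getOrElse(sys.error("No left value"))
    ),
    _.isLeft
  )
  .collect { case Right(a) => a }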
Here, contramap extracts the Left out of the Either before sending it to errorSink. The resulting expression now has type Source[Int, _], and we can use .map as we are used to. Note that we only use sys.error because the Akka Streams API has no primitive that expresses exactly what we are trying to achieve here. In practice, this error can never occur, because an element is diverted only if _.isLeft holds.
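Because errorSink above ends in Sink.ignore, it is hard to see where the error actually went. The sketch below swaps in Sink.seq on both outputs (purely for inspection) and uses divertToMat, the variant of divertTo that lets you keep the diverted sink's materialized value. It assumes Akka 2.6 or later:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DivertToDemo {
  def run(): (Seq[String], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("divert-to-demo")
    try {
      // Collects errors instead of ignoring them, so we can check what was diverted
      val errorSink: Sink[Either[RuntimeException, Int], Future[immutable.Seq[RuntimeException]]] =
        Sink.seq[RuntimeException].contramap[Either[RuntimeException, Int]](
          _.left.getOrElse(sys.error("No left value"))
        )

      val (errorsF, valuesF) = Source(1 to 10)
        .map[Either[RuntimeException, Int]] { n =>
          if (n == 3) Left(new RuntimeException(s"unexpected value: $n")) else Right(n)
        }
        .divertToMat(errorSink, _.isLeft)(Keep.right) // Lefts go to errorSink; keep its result
        .collect { case Right(a) => a }               // the main stream carries plain Ints
        .toMat(Sink.seq)(Keep.both)
        .run()

      (Await.result(errorsF, 5.seconds).map(_.getMessage), Await.result(valuesF, 5.seconds))
    } finally system.terminate()
  }
}
```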
Another approach is to give errorSink a slightly different definition:
val errorSink: Sink[Either[RuntimeException, _], NotUsed] =
  Flow[Either[RuntimeException, _]]
    .collect { case Left(exception) => exception }
    .log("Error occurred")
    .to(Sink.ignore)
Now we no longer need .contramap (together with the use of .left.getOrElse) before sending errors to the sink. But transforming the errors on their way to errorSink requires a nested .map again. And the type of errorSink is slightly wider than needed: errorSink should only deal with errors, but its type definition does not reflect that.
We can abstract over this pattern by creating a divertLeftTo function:
Source(1 to 10)
  .map[Either[RuntimeException, String]](n =>
    if (n == 3)
      Left(new RuntimeException(s"unexpected value: $n"))
    else
      Right(n.toString)
  )
  .via(divertLeftTo[RuntimeException, String](errorSink))

def divertLeftTo[E, A](
    sink: Sink[E, NotUsed]
): Flow[Either[E, A], A, NotUsed] = {
  // unwrap the Left so the error sink only ever sees E
  val sinkEither: Sink[Either[E, A], NotUsed] = sink.contramap[Either[E, A]](
    _.left.getOrElse(sys.error("No left value"))
  )
  def shouldSendToSink(message: Either[E, A]): Boolean =
    message.isLeft
  Flow[Either[E, A]]
    .divertTo(sinkEither, shouldSendToSink)
    .collect { case Right(a) => a }
}
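Since divertTo keeps the upstream’s materialized value, divertLeftTo discards whatever the error sink materializes (hence the Sink[E, NotUsed] parameter). If you do want that value, for example to assert on errors in a test, a variant built on divertToMat can keep it. The sketch below is our own assumption, not part of Akka; divertLeftToMat and the parse step are hypothetical names used for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object DivertLeftToMatSketch {
  // Hypothetical variant of divertLeftTo: the Flow materializes to the error sink's value
  def divertLeftToMat[E, A, M](sink: Sink[E, M]): Flow[Either[E, A], A, M] =
    Flow[Either[E, A]]
      .divertToMat(
        sink.contramap[Either[E, A]](_.left.getOrElse(sys.error("No left value"))),
        (message: Either[E, A]) => message.isLeft
      )(Keep.right)
      .collect { case Right(a) => a }

  // Hypothetical parsing step: a non-numeric string becomes a Left with a message
  def parse(s: String): Either[String, Int] =
    Try(s.toInt).toEither.left.map(_ => s"not a number: $s")

  def run(): (Seq[String], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("divert-left-to-mat")
    try {
      val (errorsF, valuesF) = Source(List("1", "two", "3"))
        .map(parse)
        .viaMat(divertLeftToMat[String, Int, Future[immutable.Seq[String]]](Sink.seq[String]))(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()
      (Await.result(errorsF, 5.seconds), Await.result(valuesF, 5.seconds))
    } finally system.terminate()
  }
}
```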
Thanks to errorSink, we don’t need to entangle error handling with the main logic. Any error is diverted to this sink and handled separately, so we don’t need to deal with errors at the moment we extract values from Either. Essentially, this is a data-driven approach to error handling that reaps all the benefits of the compiler’s type checking.
Different ways of handling errors
Let’s take a step back and imagine what kind of errorSink we would want. By default (as implemented above with Sink.ignore), sending messages to errorSink is effectively the same as skipping them. But other kinds of sinks are possible: sinks that forward messages to a dead letter queue to be processed again later (when the error contains enough information to do so), or sinks that halt the stream completely and shut down the service.
Skipping a message can make sense for an error that is not (re-)processable at all (think of business constraints). For other types of errors, you definitely want to reprocess the messages. Think of a deserialization error which suggests that the data model has evolved, but this service has not been updated yet.
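As a sketch of the dead letter variant: the error sink itself can use divertToMat to route reprocessable errors to a dead-letter sink and skip the rest. The error types and isReprocessable below are hypothetical, and Sink.seq only stands in for a real dead letter queue (for instance a Kafka producer sink) so the example runs on its own:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object DeadLetterSketch {
  // Hypothetical error hierarchy, for illustration only
  sealed trait MyError
  final case class InvalidUserId(id: String) extends MyError     // business constraint: skip
  final case class UnknownEventType(raw: String) extends MyError // model evolved: reprocess later

  def isReprocessable(e: MyError): Boolean = e match {
    case _: UnknownEventType => true
    case _                   => false
  }

  // Reprocessable errors go to deadLetters; all other errors are logged and skipped
  def errorSink[M](deadLetters: Sink[MyError, M]): Sink[MyError, M] =
    Flow[MyError]
      .divertToMat(deadLetters, isReprocessable)(Keep.right)
      .log("skipping")
      .to(Sink.ignore)

  def run(): Seq[MyError] = {
    implicit val system: ActorSystem = ActorSystem("dead-letter-sketch")
    try {
      val errors: List[MyError] =
        List(InvalidUserId("u1"), UnknownEventType("ticket.merged"), InvalidUserId("u2"))
      Await.result(Source(errors).runWith(errorSink(Sink.seq[MyError])), 5.seconds)
    } finally system.terminate()
  }
}
```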
The following example shows a Sink that either skips an error or halts the stream completely:
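val errorSink: Sink[MyError, NotUsed] =
  Flow[MyError]
    // completes on the first error that must not be skipped; divertTo cancels
    // when any of its outputs cancels, which halts the whole stream
    .takeWhile(shouldSkip)
    .log("skipping")
    .to(Sink.ignore)

def shouldSkip(e: MyError): Boolean =
  e match {
    case _: InvalidUserId => true
    case e =>
      log.error(s"terminating - $e") // `log`: any logger in scope
      false
  }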
Special case: committing on skipped messages using Kafka
We use Apache Kafka as our streaming platform, and Alpakka Kafka to integrate it with Scala. Imagine reading from a topic and skipping the events we cannot process, using only .collect:
Consumer
  .sourceWithOffsetContext[String, String](consumerSettings, topics)
  .map(record => deserializeAs[MyEvent](record.value))
  .collect { case Right(event) => event }
  .toMat(Committer.sinkWithOffsetContext(committerSettings))(Keep.none)
  .run()

def deserializeAs[T](message: String): Either[DeserializationError, T]
Can you spot the bug here?
We are not committing the offsets of the skipped messages! If the service restarts before a later offset on the same partition has been committed, those messages will be read again. Luckily, committing skipped messages using divertLeftTo is a breeze, as long as each offset travels along with its element:
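import akka.kafka.ConsumerMessage.CommittableOffset
import akka.kafka.scaladsl.{Committer, Consumer}

// the committer sink expects (element, offset) pairs, so the error sink does too
val errorSink: Sink[(DeserializationError, CommittableOffset), NotUsed] =
  Flow[(DeserializationError, CommittableOffset)]
    .log("skipping")
    .to(Committer.sinkWithOffsetContext(committerSettings))

Consumer
  .sourceWithOffsetContext[String, String](consumerSettings, topics)
  .map(record => deserializeAs[MyEvent](record.value))
  // hypothetical context-aware variant of divertLeftTo that keeps each offset with its element
  .via(divertLeftToWithContext[DeserializationError, MyEvent, CommittableOffset, NotUsed](errorSink))
  .toMat(Committer.sinkWithOffsetContext(committerSettings))(Keep.none)
  .run()

def deserializeAs[T](message: String): Either[DeserializationError, T]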
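The divertLeftTo defined earlier works on a stream of plain Either values, whereas sourceWithOffsetContext produces a SourceWithContext, whose via accepts a flow over (element, context) pairs. One way to bridge the two is a context-aware variant of divertLeftTo. The sketch below is an assumption of what such a helper could look like (divertLeftToWithContext is a hypothetical name); plain Long values stand in for Kafka offsets so that it runs without a broker:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DivertWithContextSketch {
  // Hypothetical: like divertLeftTo, but every element keeps its context (e.g. an offset),
  // so both the diverted errors and the successful elements can still be committed
  def divertLeftToWithContext[E, A, Ctx, M](
      sink: Sink[(E, Ctx), M]
  ): Flow[(Either[E, A], Ctx), (A, Ctx), M] =
    Flow[(Either[E, A], Ctx)]
      .divertToMat(
        sink.contramap[(Either[E, A], Ctx)] { case (either, ctx) =>
          (either.left.getOrElse(sys.error("No left value")), ctx)
        },
        (element: (Either[E, A], Ctx)) => element._1.isLeft
      )(Keep.right)
      .collect { case (Right(a), ctx) => (a, ctx) }

  def run(): (Seq[(String, Long)], Seq[(Int, Long)]) = {
    implicit val system: ActorSystem = ActorSystem("divert-with-context")
    try {
      // (deserialization result, fake offset) pairs
      val records: List[(Either[String, Int], Long)] =
        List(Right(1) -> 10L, Left("bad") -> 11L, Right(2) -> 12L, Left("worse") -> 13L)
      val skippedSink: Sink[(String, Long), Future[immutable.Seq[(String, Long)]]] = Sink.seq
      val (skippedF, processedF) = Source(records)
        .viaMat(divertLeftToWithContext[String, Int, Long, Future[immutable.Seq[(String, Long)]]](skippedSink))(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()
      (Await.result(skippedF, 5.seconds), Await.result(processedF, 5.seconds))
    } finally system.terminate()
  }
}
```

Every offset (10 to 13) ends up in exactly one of the two sinks. In the Kafka setup this means two independent committers commit offsets for the same partitions; since Kafka accepts a commit of an older offset, it may be worth checking that one committer cannot move a partition’s committed offset backwards.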