How To Implement Streaming Microservices Using ZIO and Kafka

Scalac
Feb 9, 2021

The design and implementation of distributed and highly concurrent applications is something we do every day at Scalac. The adoption of distributed systems is a growing trend, and it’s not going to stop. Alongside Kubernetes, Apache Kafka is surely one of the main reasons for this.

Here at Scalac, we use Apache Kafka as the main component for asynchronous communication between microservices. Ingestion of large-scale data, resilience to data loss and corruption, replication, and easily achievable parallelism via consumer groups are some of the main reasons why Kafka is one of the most important tools for building distributed systems.

Scala, ZIO and finally Apache Kafka

It is no secret that we love Scala, and we have fully embraced functional programming, so this article assumes some familiarity with functional effect systems like ZIO and a basic understanding of Apache Kafka. Although this article is aimed at developers who are already familiar with ZIO and want to know how to integrate Kafka into ZIO-based projects, I will cover the basics of ZIO, ZIO Streams, and finally the process of implementing streaming microservices using functional programming techniques.

Let’s get started.

ZIO Basics

ZIO is a zero-dependency Scala library for asynchronous and concurrent programming. Powered by non-blocking fibers that never waste or leak resources, ZIO lets you build scalable, resilient, and reactive applications that meet the needs of your business. To learn more about ZIO and its benefits, check the official documentation or our latest blog post.

ZIO Streams

Functional streams, as an abstraction, are commonly used to elegantly solve problems in the area of concurrency and the processing of unbounded data, making resource safety and CPU utilization the priority.

The central type of ZIO Streams is very similar to the ZIO type, ZIO[-R, +E, +A]. It looks like this:

ZStream[-R, +E, +O]

ZStream is a description of a program that requires an environment of type R to be evaluated, may fail with an error of type E, and produces zero or more values of type O. You can also think of a ZStream as a ZIO value that can emit multiple values instead of a single one.

ZStream is a purely functional pull-based stream, which means that it offers inherent laziness and backpressure, relieving users of the need to manage or write any backpressure handling code manually.
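
To make the laziness concrete, here is a minimal sketch, assuming ZIO 1.x with zio-streams on the classpath (the API generation used in the rest of this article). The object and value names are hypothetical and exist only for illustration. The source stream is infinite, yet only the elements demanded downstream are ever produced.

```scala
// Assumption: ZIO 1.x and zio-streams are available as dependencies.
import zio.{Chunk, Runtime, UIO}
import zio.stream.ZStream

object ZStreamBasics {
  // An infinite stream of 1, 2, 3, ...; building it evaluates nothing.
  val naturals: ZStream[Any, Nothing, Int] = ZStream.iterate(1)(_ + 1)

  // Downstream demand drives evaluation: take(3) stops pulling
  // from the infinite source once three elements have passed the filter.
  val firstEvenSquares: UIO[Chunk[Int]] =
    naturals
      .filter(_ % 2 == 0)
      .map(n => n * n)
      .take(3)
      .runCollect

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(firstEvenSquares))
}
```

Because the stream is only a description, nothing happens until the resulting effect is run, here with the default runtime.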

It’s no surprise that ZIO Kafka builds on ZIO and ZIO Streams to provide an elegant interface for consuming, processing, and committing Kafka records using the operators that ZIO Streams already offers. Once the input ZStream is configured, it makes no difference whether elements come from Kafka, the file system, or any other data source; the operators for processing elements are exactly the same. Of course, some operations are specific to Kafka records, such as manual offset committing.

ZIO Kafka

Applications based on an event-driven architecture often use Apache Kafka as their data backbone, the unified point in the system where all events are stored. Since Kafka topics are meant to grow continuously as new events arrive, applications that follow event-driven principles are built as infinite loops that consume events and process them in some way, whether by transforming an event and producing it to Kafka again, or by writing it into a data store to build a materialized view or some kind of aggregate.

Using a plain Kafka client is a valid option, of course, but in many situations it can be really hard to implement common streaming workflows such as buffering, aggregating batches of records up to a specified timeout, or controlling the number of messages emitted per time unit. These are not trivial tasks; they distract a developer and delay the implementation of business requirements. Production code often needs to support all of these patterns, yet they are hard to implement, hard to test, and hard to extend. Now imagine that you would like to implement all of this in a completely asynchronous and non-blocking manner. It sounds extremely hard, because it is. Fortunately, all of these patterns are available in ZIO Streams, and that is probably the main reason to avoid using the plain Kafka client directly.
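
As a rough illustration of two of these patterns, the sketch below uses the groupedWithin and throttleShape operators from ZIO Streams, assuming ZIO 1.x. The object name, the batch size, and the rate limits are arbitrary values chosen for the example, not settings taken from the project described later.

```scala
// Assumption: ZIO 1.x and zio-streams are available as dependencies.
import zio.{Chunk, Runtime, ZIO}
import zio.clock.Clock
import zio.duration._
import zio.stream.ZStream

object BatchingSketch {
  // A small finite stream standing in for records coming from Kafka.
  val events: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)

  // Batching with a timeout: emit a batch once it holds 4 elements,
  // or once 1 second has passed, whichever comes first.
  val batched: ZStream[Clock, Nothing, Chunk[Int]] =
    events.groupedWithin(4, 1.second)

  // Rate limiting: let roughly 5 elements per second pass downstream,
  // where the cost of each chunk is its number of elements.
  val throttled: ZStream[Clock, Nothing, Int] =
    events.throttleShape(5, 1.second)(_.size.toLong)

  val collectBatches: ZIO[Clock, Nothing, Chunk[Chunk[Int]]] =
    batched.runCollect

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(collectBatches))
}
```

Both operators need only a Clock in the environment, and neither requires any hand-written timers, queues, or threads.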

So to summarize, if you want to focus more on business logic and let the library manage all of the hard work for you, such as resource safety and a level of parallelism for your stream processing pipeline, you should definitely consider ZIO Streams. And if your system is built on top of Apache Kafka, then ZIO Kafka is surely a library you will enjoy using.

Microservices with ZIO and Kafka

Now that we’ve explained the basics of ZIO, ZIO Streams, and ZIO Kafka, it is time to go through an implementation of a system that uses all of these technologies.

Our system consists of two services. The first is a producer service, which is responsible for producing events to Kafka. The second is a processor service, which consumes the raw events produced by the producer service, enriches them by calling an external API, and finally produces the enriched records to Kafka again. The full implementation is here, but we will explain it step by step in the next few parts.

The project is implemented as a multi-module sbt project. We have a kafka module, which is used just to bootstrap an embedded Kafka instance, for development purposes. The second module is the protocol module, where we have defined events in our system, which are modeled as case classes. We are going to use the JSON format for our Kafka messages, so in the protocol module, JSON codecs are also implemented.

The protocol module contains a few case classes. The first is TransactionRaw:

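import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec

final case class TransactionRaw(userId: Long, country: String, amount: BigDecimal)

object TransactionRaw {
  // JSON encoder and decoder derived semi-automatically by circe
  implicit val codec: Codec[TransactionRaw] = deriveCodec[TransactionRaw]
}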

TransactionRaw models the events published by the producer service. The next event in our system is TransactionEnriched:

final case class TransactionEnriched(userId: Long, country: Country, amount: BigDecimal)

object TransactionEnriched {
  implicit val codec: Codec[TransactionEnriched] = deriveCodec[TransactionEnriched]
}

final case class Country(
  name: String,
  capital: String,
  region: String,
  subregion: String,
  population: Long
)

object Country {
  implicit val codec: Codec[Country] = deriveCodec[Country]
}

TransactionEnriched contains the same fields as TransactionRaw, except that the country name is replaced with full Country details. The processor service is responsible for fetching these country details from the external API and producing the enriched transactions to a new Kafka topic.
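
To show what these codecs give us, here is a small round-trip sketch. It assumes circe-core, circe-generic, and circe-parser are on the classpath; the wrapping object and the sample values are made up for the example. The JSON string is what ends up as the value of a Kafka record, and decoding returns an Either so that malformed messages can be handled explicitly.

```scala
// Assumption: circe-core, circe-generic and circe-parser are available.
import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec
import io.circe.parser.decode
import io.circe.syntax._

object ProtocolRoundTrip {
  final case class TransactionRaw(userId: Long, country: String, amount: BigDecimal)

  object TransactionRaw {
    implicit val codec: Codec[TransactionRaw] = deriveCodec[TransactionRaw]
  }

  // Sample event; the values are illustrative only.
  val transaction: TransactionRaw = TransactionRaw(1L, "Serbia", BigDecimal(100))

  // Encode to the compact JSON string used as the Kafka record value.
  val json: String = transaction.asJson.noSpaces

  // Decoding yields Right on success and Left with a circe error otherwise.
  val decoded: Either[io.circe.Error, TransactionRaw] =
    decode[TransactionRaw](json)

  // A payload with a wrong field type and missing fields fails to decode.
  val broken: Either[io.circe.Error, TransactionRaw] =
    decode[TransactionRaw]("""{"userId": "oops"}""")

  def main(args: Array[String]): Unit = {
    println(json)
    println(decoded)
    println(broken.isLeft)
  }
}
```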

Let’s review the code of our producer application.

object ProducerApp extends App {
  override def run(args: List[String]) =
    program.provideSomeLayer[Any with Blocking](appLayer).exitCode

  // Streams the generated transactions to Kafka, logging each record as it is produced
  private lazy val program =
    ZStream
      .fromIterable(EventGenerator.transactions)
      .map(toProducerRecord)
      .mapM { producerRecord =>
        log.info(s"Producing $producerRecord to Kafka...") *>
          Producer.produce[Any, Long, String](producerRecord)
      }
      .runDrain

  // Logging and Kafka producer capabilities required by `program`
  private lazy val appLayer = {
    val producerSettings = ProducerSettings(List("localhost:9092"))
    val producerLayer = Producer.make[Any, Long, String](
      producerSettings, Serde.long, Serde.string
    ).toLayer

    val loggingLayer = Slf4jLogger.make { (context, message) =>
      val correlationId = LogAnnotation.CorrelationId.render(
        context.get(LogAnnotation.CorrelationId))
      "[correlation-id = %s] %s".format(correlationId, message)
    }

    loggingLayer ++ producerLayer
  }

  // The record key is the user id; the value is the transaction serialized as JSON
  private def toProducerRecord(transactionRaw: TransactionRaw): ProducerRecord[Long, String] =
    new ProducerRecord("transactions.raw", transactionRaw.userId, transactionRaw.asJson.toString)
}

The core logic is represented by a value called program. First we create a ZStream from a hardcoded list of raw transactions, then we transform each element of the stream into a ProducerRecord so that it can be sent with the Kafka Producer. To use ZIO Kafka and add logging to our program, we need to construct a ZIO layer that provides both capabilities. If you have ever been exposed to a stream-centric style of programming, this code should look familiar. Operators like mapM or runDrain may be confusing, so I will explain them now. The standard map operator transforms the elements of the stream using the supplied function; mapM does the same, but the function you supply must be effectful.

For example, if map requires a function A => B, mapM requires a function A => F[B], where F is an effect type; in ZIO Streams that means ZIO or one of its aliases, such as Task or UIO. Since the main method of a ZIO application must return a ZIO value, we need to transform the ZStream into a ZIO. That is the purpose of the runDrain operator, which runs a stream only for its effects and discards the emitted elements. For more details check the source code.
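
The difference between map, mapM, and runDrain can be seen in a small standalone sketch, assuming ZIO 1.x (in later ZIO versions mapM was renamed). The object and value names are hypothetical. A Ref plays the role of the external side effect that, in the producer, is the call to Kafka.

```scala
// Assumption: ZIO 1.x and zio-streams are available as dependencies.
import zio.{Chunk, Ref, Runtime, UIO}
import zio.stream.ZStream

object MapVsMapM {
  val numbers: ZStream[Any, Nothing, Int] = ZStream.fromIterable(List(1, 2, 3))

  // map takes a pure function Int => Int.
  val doubled: UIO[Chunk[Int]] = numbers.map(_ * 2).runCollect

  // mapM takes an effectful function Int => UIO[Unit]; here each element
  // is added to a Ref. runDrain runs the stream for its effects only
  // and discards the emitted elements.
  val sumViaEffects: UIO[Int] =
    for {
      total <- Ref.make(0)
      _     <- numbers.mapM(n => total.update(_ + n)).runDrain
      sum   <- total.get
    } yield sum

  def main(args: Array[String]): Unit = {
    println(Runtime.default.unsafeRun(doubled))
    println(Runtime.default.unsafeRun(sumViaEffects))
  }
}
```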

We are approaching the final section of this article, where the main service, the processor, will be explained. Since the processor service is the largest in our system, we will not examine every piece of code, but I will go through as much as possible, focusing on the most important parts.

As we said, event-driven microservices are designed as infinite loops that constantly poll messages from Kafka and react to them in some way. The main component in our service is Pipeline, a ZLayer whose service exposes a run method that runs indefinitely.

lazy val live: ZLayer[PipelineEnvironment, Nothing, Pipeline] =
  ZLayer.fromFunction { env =>
    new Service {
      override def run(): IO[Throwable, Unit] =
        (log.info("Starting processing pipeline") *>
          Consumer
            .subscribeAnd(Subscription.topics("transactions.raw"))
            .plainStream(Serde.long, Serde.string)
            .mapM { cr =>
              val parsed = decode[TransactionRaw](cr.value)

              parsed match {
                // Valid record: enrich it and produce the result back to Kafka
                case Right(transactionRaw) =>
                  Enrichment
                    .enrich(transactionRaw)
                    .map(toProducerRecord)
                    .flatMap(Producer.produce[Any, Long, String](_))
                    .as(cr)
                // Invalid record: log it and pass it on so its offset is still committed
                case Left(error) =>
                  (log.info(s"Deserialization error $error")
                    *> ZIO.succeed(cr))
              }
            }
            // Commit offsets in batches rather than one by one
            .map(_.offset)
            .aggregateAsync(Consumer.offsetBatches)
            .mapM(_.commit)
            .runDrain)
          .provide(env)
    }
  }

This service encapsulates the whole processing logic. Once the Consumer is subscribed to the specified topic, it exposes a ZStream-based abstraction over the Kafka topic, so you can process the records one by one. Each consumed record is parsed from a JSON string into a TransactionRaw case class. We then use the Enrichment service to call the external API, fetch the country details, and build a TransactionEnriched event, which is produced back to Kafka. If the message can’t be parsed from JSON, we just log the error and pass the original record downstream. In both cases the record’s offset is batched and committed at the end of the pipeline.

The next step is to review the Enrichment service:

lazy val live: ZLayer[
  Logging with CountryCache with SttpClient, Nothing, Enrichment
] = ZLayer.fromFunction { env =>
  new Service {
    override def enrich(
      transactionRaw: TransactionRaw
    ): IO[ProcessorError, TransactionEnriched] =
      (for {
        _       <- log.info(s"Getting country details from cache for ${transactionRaw.country}")
        country <- CountryCache.get(transactionRaw.country)
        // On a cache miss, fall back to the external API and cache the answer
        result  <- country.fold(
                     fetchAndCacheCountryDetails(transactionRaw.country)
                   )(ZIO.succeed(_))
      } yield toTransactionEnriched(
        transactionRaw, result
      )).provide(env)
  }
}

And here are some helper methods for this service:

private def fetchAndCacheCountryDetails(
  countryName: String
): ZIO[Logging with CountryCache with SttpClient, ProcessorError, Country] =
  for {
    _       <- log.info("Cache miss. Fetching details from external API.")
    country <- fetchCountryDetails(countryName)
    _       <- CountryCache.put(country)
  } yield country

private def fetchCountryDetails(
  countryName: String
): ZIO[SttpClient, ProcessorError, Country] =
  for {
    req <- ZIO.succeed(
             basicRequest.get(urlOf(countryName)).response(asJson[List[Country]])
           )
    res <- SttpClient.send(req).orElseFail(CountryApiUnreachable)
    // Fail with a typed error on an unparsable body and on an empty result list,
    // instead of throwing from List.head
    country <- res.body.fold(
                 _ => ZIO.fail(ResponseExtractionError),
                 countries =>
                   ZIO.fromOption(countries.headOption).orElseFail(ResponseExtractionError)
               )
  } yield country

You may have noticed that the Enrichment service uses CountryCache, a simple layer that is responsible for caching HTTP responses from the external API.
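
The CountryCache implementation is not shown in this article, so the following is only a guess at what such a cache could look like: a minimal in-memory sketch backed by a ZIO Ref, assuming ZIO 1.x. The class name, the trimmed-down Country type, and the choice of a Map keyed by country name are all assumptions made for illustration; the real layer in the repository may differ.

```scala
// Assumption: ZIO 1.x is available. This is NOT the project's CountryCache,
// only a hypothetical sketch of the same idea.
import zio.{Ref, Runtime, UIO}

object CountryCacheSketch {
  // Trimmed-down stand-in for the protocol's Country case class.
  final case class Country(name: String, capital: String)

  // Ref gives atomic, purely functional access to the underlying Map.
  final class InMemoryCache(ref: Ref[Map[String, Country]]) {
    def get(name: String): UIO[Option[Country]] =
      ref.get.map(_.get(name))

    def put(country: Country): UIO[Unit] =
      ref.update(_ + (country.name -> country))
  }

  val make: UIO[InMemoryCache] =
    Ref.make(Map.empty[String, Country]).map(new InMemoryCache(_))

  // A lookup before and after storing a value: a miss, then a hit.
  val demo: UIO[(Option[Country], Option[Country])] =
    for {
      cache  <- make
      before <- cache.get("Serbia")
      _      <- cache.put(Country("Serbia", "Belgrade"))
      after  <- cache.get("Serbia")
    } yield (before, after)

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(demo))
}
```

Returning an Option from get is what lets the Enrichment service fold over the result and call the external API only on a miss.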

For the full implementation, take a look at this repository.

Summary

In this blog post, I’ve explained what ZIO is and how to use it together with ZIO Streams and ZIO Kafka to write purely functional event-driven microservices. I hope the advantages of functional streaming libraries presented in this article are enough for you to start introducing them into your projects.

Originally published at https://scalac.io on February 9, 2021.

Scalac is a web & software development company with 122 people including Backend, Frontend, DevOps, Machine Learning, Data Engineers, QA’s and UX/UI designers