Communicating with Kafka using Akka actors

Avi Levi
Mar 8, 2017

Let’s write a simple example of communicating with Apache Kafka using Akka actors. Our application will demonstrate two actors playing Ping Pong with each other. If you are familiar with the activator template minimal-akka-scala-seed, then yes, it is the same app with a little twist: the actors will NOT send messages directly to each other, but will communicate by placing and reading messages on Kafka topics.

To make our life easier, rather than using the official Apache Kafka Java driver directly, I will use Cakesolutions' scala-kafka-client library, which gives a nice API and good control over the Kafka client. I will not go into much detail, but you can read all about it, with a quick overview, in their blog post: getting started with scala-kafka-client.

A full running example of this code can be found here, or you can use this activator template.

A very short and unsatisfying intro to Kafka

Kafka stores streams of records in topics. Producers write records to topics, and consumers read them back; we will focus on the latter two.

Game Rules

After 3 messages, the PingActor will terminate the game by sending itself a PoisonPill and submitting a "GameOver" message to the PongActor's topic.

Setup

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.3.11"
  , "com.typesafe.akka" %% "akka-testkit" % "2.3.11" % "test"
  , "org.scalatest" %% "scalatest" % "2.2.4" % "test"
  , "net.cakesolutions" %% "scala-kafka-client" % "0.10.1.2"
  , "net.cakesolutions" %% "scala-kafka-client-akka" % "0.10.1.2"
  , "net.cakesolutions" %% "scala-kafka-client-testkit" % "0.10.1.2" % "test"
  , "org.slf4j" % "log4j-over-slf4j" % "1.7.21" % "test"
  , "com.typesafe.play" % "play-json_2.11" % "2.5.12"
)
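
Note: the original post does not mention resolvers, but at the time the cakesolutions artifacts were published to Bintray, so depending on your sbt setup you may also need a resolver along these lines (this is an assumption based on the library's own docs, not something from the post):

// assumed resolver for the cakesolutions artifacts (Bintray-hosted at the time)
resolvers += Resolver.bintrayRepo("cakesolutions", "maven")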

Configuration:

val config: Config = ConfigFactory.parseString(
  s"""
     | bootstrap.servers = "localhost:9092"
     | group.id = "$randomString"
     | enable.auto.commit = false
     | auto.offset.reset = "earliest"
     | schedule.interval = 1 second
     | unconfirmed.timeout = 3 seconds
     | max.redeliveries = 3
      """.stripMargin
)
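
The $randomString used for the group.id (and later as a record key) comes from the example repo and isn't shown in the post. Purely as an illustration, a helper of that shape could look like this (the repo's actual implementation may differ):

// hypothetical sketch of the helper used above, e.g. in a shared test/helpers trait
import scala.util.Random

def randomString(length: Int): String = Random.alphanumeric.take(length).mkString

// parameterless variant, handy for a unique consumer group id as in the config above
def randomString: String = randomString(8)

A fresh group.id for every run, combined with auto.offset.reset = "earliest", means the consumer re-reads the topic from the beginning, which is convenient for tests.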

Testing

"net.cakesolutions" %% "scala-kafka-client-testkit" % "0.10.1.2" % "test"

scala-kafka-client provides a nice test kit that includes an embedded Kafka server, and more goodies.

val kafkaServer = new KafkaServer()
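
The embedded server has to be started before the tests run and shut down afterwards. The exact scaffolding lives in the example repo; a sketch of the lifecycle hooks, assuming the spec extends akka-testkit's TestKit and mixes in ScalaTest's BeforeAndAfterAll, and using the testkit's startup()/close() methods:

// start the embedded Kafka server before the tests
override def beforeAll(): Unit = kafkaServer.startup()

// stop the embedded Kafka server and the actor system once the tests are done
override def afterAll(): Unit = {
  kafkaServer.close()
  TestKit.shutdownActorSystem(system)
}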

We need a producer to place messages on our queue. We will use KafkaProducer, which is a thin, lightweight wrapper around the Kafka Java driver.

val producer =
  KafkaProducer(KafkaProducer.Conf(new StringSerializer(), new JsonSerializer[PingPongMessage], bootstrapServers = s"localhost:${kafkaServer.kafkaPort}"))

(We will discuss some of this configuration later on.)

And a little helper method to place messages on Kafka:

def submitMsg(times: Int, topic: String, msg: PingPongMessage) = {
  for(i <- 1 to times) {
    producer.send(KafkaProducerRecord(topic, randomString, msg))
    producer.flush()
  }
}

Requirement: our PingActor should terminate after it gets 3 messages. Let's write our first test:

"A Ping actor" must {
  "terminate after 3 messages" in {
    val pingActor = system.actorOf(PingActor.props(config), "PingTest")
    val tester = TestProbe()
    tester.watch(pingActor)
    submitMsg(3, PingActor.topics.head, PingPongMessage("PING"))
    tester.expectTerminated(pingActor, 10 seconds)
  }
}

Requirement: the PingActor should place a "PONG" message when it gets a "PING" message, and a "GameOver" message before it terminates. KafkaServer provides a consume method with a built-in assertion that we can use to verify that the correct number and type of messages are placed on the queue:

"should place 2 Pong messages and GameOver on Pong topic message after 3 Ping messages" in {
  val pingActor = system.actorOf(PingActor.props(config), "PingTest")
  submitMsg(3, PingActor.topics.head, PingPongMessage("PING"))
  val tester = TestProbe()
  tester.watch(pingActor)
  tester.expectTerminated(pingActor, 10 seconds)
  val results: Seq[PingPongMessage] = kafkaServer.consume(PongActor.topics.head, 3, 2000, keyDeserializer, valueDeserializer).map(_._2)
  results.take(2) should contain theSameElementsAs Seq.fill(2)(PingPongMessage("PONG"))
  results.last shouldEqual PingPongMessage("GameOver")
}

We should also write tests for the PongActor, but I don't want to make this post too long, so I'll leave that for you to practice; a rough starting point is sketched below. Now that our tests are set, we can move on to…
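
Here is that rough starting point for the exercise (an illustration only; it reuses the same kafkaServer, keyDeserializer and valueDeserializer as above, and Start comes from the PongActor companion object shown later):

"A Pong actor" must {
  "place a ping message on the Ping topic after receiving Start" in {
    val pongActor = system.actorOf(PongActor.props(config), "PongTest")
    pongActor ! PongActor.Start
    // the PongActor should kick off the game by submitting a "ping" message
    val results: Seq[PingPongMessage] =
      kafkaServer.consume(PingActor.topics.head, 1, 2000, keyDeserializer, valueDeserializer).map(_._2)
    results.head shouldEqual PingPongMessage("ping")
  }
}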

Coding:

case class PingPongMessage(text: String)

The PingPong message will be sent to and from Kafka and needs to be serialized and deserialized:

object PingPongProtocol {
  case class PingPongMessage(text: String)
  implicit val PingPongMsgFrmt: Format[PingPongMessage] = Json.format[PingPongMessage]
}

class JsonDeserializer[A: Reads] extends Deserializer[A] {
  private val stringDeserializer = new StringDeserializer

  override def configure(configs: util.Map[String, _], isKey: Boolean) = stringDeserializer.configure(configs, isKey)

  override def deserialize(topic: String, data: Array[Byte]) = Json.parse(stringDeserializer.deserialize(topic, data)).as[A]

  override def close() = stringDeserializer.close()
}

class JsonSerializer[A: Writes] extends Serializer[A] {
  private val stringSerializer = new StringSerializer

  override def configure(configs: util.Map[String, _], isKey: Boolean) = stringSerializer.configure(configs, isKey)

  override def serialize(topic: String, data: A) = stringSerializer.serialize(topic, Json.stringify(Json.toJson(data)))

  override def close() = stringSerializer.close()
}
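
These are also, presumably, the keyDeserializer and valueDeserializer passed to kafkaServer.consume in the tests above (a StringDeserializer for keys, a JsonDeserializer[PingPongMessage] for values). A quick round-trip sanity check, just as an illustration:

val serializer   = new JsonSerializer[PingPongMessage]
val deserializer = new JsonDeserializer[PingPongMessage]

// serialize to JSON bytes and read the message back
val bytes   = serializer.serialize("ping", PingPongMessage("PING"))
val decoded = deserializer.deserialize("ping", bytes) // PingPongMessage("PING")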

Kafka producer/consumer actors:

val kafkaConsumerActor = context.actorOf(
  KafkaConsumerActor.props(config, new StringDeserializer(), new JsonDeserializer[PingPongMessage], self),
  "ConsumerActor")

val kafkaProducerConf = KafkaProducer.Conf(
  bootstrapServers = config.getString("bootstrap.servers"),
  keySerializer = new StringSerializer(),
  valueSerializer = new JsonSerializer[PingPongMessage])

val kafkaProducerActor = context.actorOf(KafkaProducerActor.props(kafkaProducerConf))

The consumer actor accepts an ActorRef parameter as its downstream actor, which means it will pipe all messages from Kafka to that actor. In this case I am using the parent actor itself, hence I pass self as the downstream ActorRef. We will use these underlying actors for consuming messages from, and submitting messages to, Kafka.

For convenience, let's provide an extractor that will make pattern matching in our receive method easier. We will use the ConsumerRecords class, which represents a batch of key-value records consumed from Kafka (scala-kafka-client docs), with a nice extractor method for pattern matching:

val msgExtractor = ConsumerRecords.extractor[java.lang.String, PingPongMessage]

def receive: Receive = {
  case msgExtractor(consumerRecords) =>
    consumerRecords.pairs.foreach {
      case (_, pongMessage) => // ... handle the message ...
    }
}
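
One detail worth spelling out: the consumer actor redelivers any batch that is not acknowledged (that is what unconfirmed.timeout and max.redeliveries in the configuration above control), so after handling a batch we confirm its offsets, as the PongActor below does:

// acknowledge the processed batch, otherwise it will be redelivered
kafkaConsumerActor ! Confirm(consumerRecords.offsets)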

Now that we have our consumer and producer actors set up, we can send them subscribe and produce (submit) messages:

kafkaConsumerActor ! Subscribe.AutoPartition(List("ping"))
kafkaProducerActor ! ProducerRecords(List(KafkaProducerRecord("pong", randomString(3), msg)))

And that's it. Now we can play Ping Pong:

trait PingPongConsumer extends KafkaConfig{
  this: Actor =>

  //for pattern matching in our receive method
  val msgExtractor = ConsumerRecords.extractor[java.lang.String, PingPongMessage]

  val kafkaConsumerActor = context.actorOf(
    KafkaConsumerActor.props(config,new StringDeserializer(), new JsonDeserializer[PingPongMessage], self),
    "PingKafkaConsumerActor")

  def subscribe(topics: List[String]) =
     kafkaConsumerActor ! Subscribe.AutoPartition(topics)
}

trait PingPongProducer  extends KafkaConfig{
  this: Actor =>

  val kafkaProducerConf = KafkaProducer.Conf(
    bootstrapServers = config.getString("bootstrap.servers"),
    keySerializer = new StringSerializer(),
    valueSerializer = new JsonSerializer[PingPongMessage])

  val kafkaProducerActor = context.actorOf(KafkaProducerActor.props( kafkaProducerConf))

  def submitMsg(topics: List[String], msg: PingPongMessage) = {
    topics.foreach(topic => kafkaProducerActor ! ProducerRecords(List(KafkaProducerRecord(topic, randomString(3), msg))))
  }
}

class PongActor(val config: Config) extends Actor
  with ActorLogging with PingPongConsumer with PingPongProducer {
  import PingPongProtocol._
  import PongActor._

  override def postStop() = {
    kafkaConsumerActor ! Unsubscribe
    super.postStop()
  }

  override def preStart(): Unit = {
    super.preStart()
    subscribe(topics)
  }

  def receive = {
    case Start =>
      log.info("In PongActor - received start message - let the games begin")
      submitMsg(PingActor.topics, PingPongMessage("ping"))
      context.become(playingPingPong)
  }

  def playingPingPong: Receive = {
    case msgExtractor(consumerRecords) =>
      consumerRecords.pairs.foreach {
        case (_, PingPongMessage("GameOver")) =>
          kafkaConsumerActor ! Confirm(consumerRecords.offsets)
          log.debug(s"Bye Bye ${self.path.name}")
          self ! PoisonPill

        case (None, msg) =>
          log.error(s"Received unkeyed submit sample command: $msg")

        case (Some(id), pongMessage) =>
          submitMsg(PingActor.topics, PingPongMessage("ping"))
          kafkaConsumerActor ! Confirm(consumerRecords.offsets)
          log.info(s"In PongActor - id:$id, msg: $pongMessage, offsets ${consumerRecords.offsets}")
      }

    case unknown =>
      log.error(s"PongActor got Unknown message: $unknown")
  }
}

object PongActor {
  def props(config: Config) = Props(new PongActor(config))
  val topics = List("pong")
  case object Start
}
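
The PingActor itself is not listed in the post (it lives in the linked repo), but based on the game rules and the tests above, a rough sketch of it could look like this (imports omitted as in the snippets above; the repo's implementation may differ in the details):

class PingActor(val config: Config) extends Actor
  with ActorLogging with PingPongConsumer with PingPongProducer {
  import PingPongProtocol._

  // hypothetical: count incoming messages and stop the game after three
  var received = 0

  override def preStart(): Unit = {
    super.preStart()
    subscribe(PingActor.topics)
  }

  override def postStop(): Unit = {
    kafkaConsumerActor ! Unsubscribe
    super.postStop()
  }

  def receive: Receive = {
    case msgExtractor(consumerRecords) =>
      consumerRecords.pairs.foreach { case (_, msg) =>
        received += 1
        if (received >= 3) {
          // third message: end the game and terminate, per the game rules
          submitMsg(PongActor.topics, PingPongMessage("GameOver"))
          self ! PoisonPill
        } else {
          submitMsg(PongActor.topics, PingPongMessage("PONG"))
        }
      }
      kafkaConsumerActor ! Confirm(consumerRecords.offsets)
  }
}

object PingActor {
  def props(config: Config) = Props(new PingActor(config))
  val topics = List("ping")
}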

As I wrote, a full running example can be found here, or you can use this activator template.

Hope you enjoyed it.

Your inputs are always welcome

Cheers

Avi

Useful links:

Source code: https://github.com/123avi/kafka-akka-example

Getting started with scala-kafka-client by cakesolutions

Quick start to Kafka

Kafka monitoring - a nice article by Datadog with a good intro to Kafka

You are always welcome to contact me directly via LinkedIn or by email.
