Hunting the fleeting flow …

How to create real-time metrics monitoring system?

Kian Jalali
Sanjagh
15 min readMay 12, 2020

--

This is a story about an experience on integrating (Akka-Stream, Alpakka-Kafka, Kafka, Aplakka-Slick, TimescaleDB, Grafana) — Part 2/2

Mongolian Eagle Hunters — by Vitya_maly

In the previous part, we answered an important question that was “Why we choose these tools & Why we choose to stream?”For reading about the previous part, just see “Fledge over the flow …”.

And now we try to answer “How to create a real-time metrics monitoring system? by integrating Akka-Stream, Alpakka-Kafka, Apache-Kafka, Alpakka-Slick, TimescaleDB, Grafana …”

Many metrics are published by each service every day at sanjagh.pro that we need to analyze & measure them. In some cases, we are forced to have real-time graphs because some of these metrics are vital to be analyzed and on the other hand, we have data which it’s enough for us to just store them for future queries. But the main point is that we need a mechanism to store these metrics and as I mentioned in the previous section we chose TimescaleDB (Because our metrics are based on time and all of them have a “time” field that is typed with Unix-Epoch milliseconds).

//In Scala we calculate Epoch-Milliseconds as below:
val millis: Long = System.currentTimeMillis()

Note that: here we had a challenge that we’ll mention in another part! (the challenge of choosing Epoch time)

For example, we have this data model as a metric that represents one simple message in our chat application:

sealed trait Event { def eventTime: Long }case class NewMessageEvent(eventTime: Long, messageType: MessageType) extends Event//...MessageType...object MessageType extends Enumeration {
type MessageType = Value
val TEXT = Value("Text")
val LOCATION = Value("Location")
val IMAGE = Value("Image")
val AUDIO = Value("Audio")
val VIDEO = Value("Video")
}

So from now on for each sent message in our chat service, we should publish one of these objects. but where? but how?

Note that:

metrics should contain just measurable attributes (or attributes that may affect our filters and groupBy) which are useful to be measured, not everything. For example here in “NewMessageEvent” we do not need message content but in some cases maybe message length is useful.

Note that:

It is not our message aggregator service job to store message content because we could not measure anything from that.

So in this step of answering our questions (Where/How), we need the simplest architect and a todo list to design our pipeline. Here you are:

And now we need a TODO list for moving step by step:

1- How to design an event publisher client?

  • Which Kafka client should we use?
  • Which Serialization pattern should we use?
  • How many publishers (producers) do we need?
  • How to design our publisher clients?

2- How to design our Kafka cluster?

  • How to design our Kafka topics?
  • How to manage our topic partitions?
  • How many brokers do we need?
  • Which retention policies are for your case?
  • Which compaction policies are good for your case?

3- How to design the event aggregator service?

  • Which toolkit should we choose to connect Kafka?
  • How to design Kafka subscribers?
  • Which toolkit should we choose to connect Slick?
  • What process do we apply to our events?

4- How to design our timescale database?

  • How to design our Hypertables?
  • How much time interval should be configured for each table to create its partitions?
  • Which retention policy strategy is good for your case?

These are the most important questions which should be answered at first. but we don’t answer all of these questions, because some of them have too many answers for being mentioned here…

Let us speak about part 1 and 3 today and leave other parts to you because there are enough posts and articles about those two. Design event-publisher-client:

Step-1: How to design event-publisher:

So as you know from the previous part of this post, we try to follow the reactive manifesto and we prefer to use reactive toolkits such as Akka-Stream. As our Kafka client, we decided to use Alpakka-Kafka.

Many ways you can implement how to produce your events into Apache Kafka. As the simplest way you can add Alpakka library dependency to each project that you want to produce events by them; this may take some effort from you. Imagine that you have 20 services that each must produce their events independently, so if you want to use pure Alpakka maybe you should write many boilerplate codes or copy/paste some codes over your services and imagine that you have some common events you must publish from all of your services, so if you want to modify some event model that is common in some projects you should change many different services all time.

To fix this problem we start to create a library (‘serviceName’-publisher) for each service and one for all common events. you can do this in scala as SBT-Multi-Projects feature and add publisher clients as subprojects into a service you want and after that, you can publish your library as a private artifact in a private repository like maven, artifactory, GitLab, …

Let see an example of how to do this:

Project structure

lazy val commonPublisher = project
.settings(commonSettings: _*)
.settings(
name := "common-event-publisher",
version := "1.0.1",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaLibVersion
"com.typesafe.akka" %% "akka-stream" % akkaLibVersion
"com.typesafe.akka" %% "akka-http" % akkaHttpLibVersion
"com.typesafe.akka" %% "akka-http-spray-json" % "*.*.*"
)
)
//**********************************************************
lazy val eventAggregator = project
.settings(commonSettings: _*)
.settings(
version := "1.0.1",
name := "event-aggregator",
libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.scala-lang.modules" %% "scala-parser-combinators" % "**",
"com.typesafe.akka" %% "akka-actor" % akkaLibVersion,
"com.typesafe.akka" %% "akka-stream" % akkaLibVersion,
"com.typesafe.akka" %% "akka-http" % akkaHttpLibVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % "x.x.x",
"com.typesafe.akka" %% "akka-slf4j" % akkaLibVersion,
"net.codingwell" %% "scala-guice" % "4.2.5",
"com.typesafe.akka" %% "akka-testkit" % "*.*.*" % "test",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"com.typesafe.slick" %% "slick" % "3.3.1",
"org.slf4j" % "slf4j-nop" % "1.7.26",
"com.typesafe.slick" %% "slick-hikaricp" % "3.3.1",
"org.postgresql" % "postgresql" % "42.2.6",
),
)
.dependsOn(commonPublisher)
//**********************************************************
lazy val root = (project in file("."))
.aggregate(eventAggregator, commonPublisher)
.settings(
)

As you see we have created “common-event-publisher” as a module inside of our event-aggregator service but we can use it all around of our cluster as a library dependency. like this:

libraryDependency += domain.org %% "common-event-publisher" % "1.0.1"

Step 2: How to Implement event-publisher-client:

First, we had to determine our Events so we create a model. Ex:

Event

sealed trait Event { def eventTime: Long }case class NewMessageEvent(eventTime: Long, messageType: MessageType) extends Event//...MessageType...object MessageType extends Enumeration {
type MessageType = Value
val TEXT = Value("Text")
val CARD = Value("Card")
val LOCATION = Value("Location")
val IMAGE = Value("Image")
val AUDIO = Value("Audio")
val VIDEO = Value("Video")
}

This model should be accessible to all modules that include our “common-event-publisher” dependency. Then, we need simple function calls to serialize and to deserialize our models. we choose Spray-JSON, (it’s good to take a look at Avro and Protocol-Buffer, in your case choose more fittable object serialization combinator) so at the second step we write simple functions for our Events:

Serialization

import spray.json.DefaultJsonProtocol._
import spray.json._
object ImplicitJsonParsersCommonEventPublisher {private
implicit val formatter: RootJsonFormat[NewMessageEvent] =
jsonFormat2(NewMessageEvent.apply)
//for enums you should go diffrent way. it`s not our case...def parse2Event[T](msg: String)(
implicit jsonReader: JsonReader[T]
): T = {
msg.parseJson.convertTo[T]
}
def pars2Json[T](obj: T)
(implicit writer: JsonWriter[T]): JsValue = {
obj.toJson
}
}

Note that: here we had a challenge that we’ll mention in the other part! (the challenge of parsing hierarchical Events)

*Note: It’s very important to “name” your class and objects perfectly because very soon you had many different kinds of publisher and parser and etc. that make you confused. Sometimes duplicate names may occur some compile errors. so pay attention to “names”! (for example CommonEventPublisher has its own json parser and ChatEventPublihser has its own json parser — they are in different places but maybe both of them include in a project at same time!)

Third: we should write our general event producer class:

Producer

import event.publisher.ImplicitJsonParsersCommonEventPublisher._class CommonEventPublisher()(implicit val system: ActorSystem) {
.
.
.
private val ref: ActorRef = {
Source
.actorRef(1000, OverflowStrategy.dropNew)
.to(Producer.plainSink(producerSettings))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.run
}

private def produce[T <: Event](topic: String, event: T)(
implicit writer: JsonWriter[T]
): Unit = {
val record: ProducerRecord[Nothing, String] =
new ProducerRecord(topic, pars2Json(event).toString)
private def produce[T <: Event](topic: String, event: T)(
implicit writer: JsonWriter[T]
): Unit = {
val record: ProducerRecord[Nothing, String] =
new ProducerRecord(topic, pars2Json(event).toString)
ref ! rec
}
def newMessage(
eventTime: Long,
messageType: Int
): Unit = {
produce[NewMessageEvent](
"message-topic",
NewMessageEvent(
eventTime,
messageType
)
)
}
.
.
.
}

Here we are our client is ready. To use it we just need it… need to make an instance from CommonEventPublisher class (It’s better to take a singleton object and inject to the target class that wants to publish events) like this:

val commonEventPublisher = new CommonEventPublisher()commonEventPublisher.newMessage(123L, 1)
commonEventPublisher.newMessage(123L, 2)
...

This simple sync function call sends new “message-events” to the “message-topic” topic. but I had to tell you about some issues in our CommonEventPublisher class…

1- Some types of producers you can find here. but why we choose this way?(Source.actorRef): 1- if you had a huge bulk of facts that should be produced for Kafka? Kafka, it’s good to use a buffer [ but be careful about your buffer size. Because if the buffer size is too big the more data inconsistency may affect you or memory overflow may happen for you]

2- In our case, consistency does not matter that much. fire & forget pattern is much faster than other kinds of producers that return acknowledge to us for each produce.

point: if consistency and also returning a type instead of “Unit” matter to you, choose another producer which is suggested in the Alpakka document.

2- It’s necessary to handle errors while you’re producing stream floats. and I suggest Deciders. take a look at this Ex:

private val decider: Supervision.Decider = {
case e: spray.json.JsonParser.ParsingException =>
println(e.getStackTrace.toString)
Supervision.Resume

case e: spray.json.SerializationException =>
println(e.getStackTrace.toString)
Supervision.Resume
}
//and use decider in your producer
//.withAttributes(ActorAttributes.supervisionStrategy(decider))

3- produce method get an event (event should be a subtype of Event treat), produce and the ref should be private because the purpose of our class is to publish events into Kafka by just a simple function call. (note that if your produce message return something you calling async methods so if it’s possible to use Future or other IO/Monads)

In the end, you can write another publisher for other services. don’t forget that if your publisher does not expose to other services your views/models should be exposed to others because we will use them on the consumer side which all gather in our metric aggregator service.

Design event-aggregator-service:

When we talk about the Streaming programming paradigm, we are talking about networks of “black box” processes, which exchange data across predefined connections by message passing.

So it is hard to explain layers of our application step by step like previous parts. we should explain different black box scope of our codes that will make sense in the future(when we compose them together to build our stream). This is more difficult because we try to write general methods that use Type-Parameters and some functional style. but I try to explain them as easy I can:

1- consume from Kafka

2- process facts(records)

3- insert into the timescale.

and all these parts should execute in a stream…

Let me first design our repository layer so I could introduce it as an isolated part from other parts of our stream. (using Alpakka-Slick)

Entity

trait EntityModel {  def eventTime: Long  }

case class
NewMessageEntity(
eventTime: Long,
messageType: String
) extends EntityModel
/*Note that NewMessageEvent is our view and this(NewMessageEntity) is our Entity*/

So here you are the first step is to design our data access layer. As you see for the first step we design our Entity layer and here is an example that is clear enough to explain.

Schema

@Singleton
class NewMessageSchema @Inject()(val configRepo: ConfigRepo)
extends BaseTable[NewMessageEntity] {

import profile.api._

override def tableQuery: TableQuery[TableDef] = TableQuery[TableDef]
override val schemaName: String = "NewMessage"

class TableDef(tag: Tag) extends Table[NewMessageEntity](tag, schemaName) with TableModel {

override def eventTime: Rep[Long] =
column[Long]("eventTime")
def messageType: Rep[String] = column[String]("messageType")

def * : ProvenShape[NewMessageEntity] =
(
eventTime,
messageType
) <> (NewMessageEntity.tupled, NewMessageEntity.unapply)
}
}
//...TableModel...
import slick.lifted.Rep
trait TableModel { def eventTime: Rep[Long] }
//...BaseTable...trait BaseTable[A <: EntityModel] {
val profile: JdbcProfile = PostgresProfile
val configRepo: ConfigRepo
val
schemaName: String
import profile.api._
type TableDef <: Table[A] with TableModel
def
tableQuery: TableQuery[TableDef]
}

The second step is to design the schema layer. Here we need Slick knowledge. The important point of this example for others is our table schema explained in the “TableDef” class that is a simple table and the SQL types are :(“eventTime”: bigint, “messageType”: varchar).

Also, I should say each class extends “BaseTable[A]” (A is a type parameter that should replace with Event and Entity or any class extends EntityModel) and force that class to implement 1- configRepo (it’s our config file repository) 2- TableDef that must implement as a class 3- schemaName that is a String type value we use it many times in the future. (is equal to the table name) 4- tableQuery — and gave the target class some abilities such as PostgresProfile.api…

About tableQuery:

/** Represents a database table. Profiles add extension methods to TableQuery
* for operations that can be performed on tables but not on arbitrary
* queries, e.g. getting the table DDL. */

Repo

@Singleton
class NewMessageRepo @Inject()(
val baseTable: NewMessageSchema,
val dbDefProvider: DatabaseDefinitionProvider
)(
implicit val ec: ExecutionContext
) extends BaseRepo[NewMessageEvent, NewMessageEntity] {

override def toEntity(e: NewMessageEvent): Future[NewMessageEntity] = {
Future.successful(
NewMessageEntity(
e.eventTime,
e.messageType.toString
)
)
}

}
//...‌Repo...
trait Repo {
def createSchemaIfNotExists(): Future[Unit]
def schemaName: String
}
//...‌BaseRepo...
trait BaseRepo[E, A <: EntityModel] extends Repo {
val baseTable: BaseTable[A]
import baseTable.profile.api._

protected val tableQuery
: baseTable.profile.api.TableQuery[baseTable.TableDef] =
baseTable.tableQuery

protected val dbDefProvider: DatabaseDefinitionProvider
protected val db: PostgresProfile.backend.Database = dbDefProvider.dbDef

override def
schemaName: String = baseTable.schemaName

def dBIOInsert(entity: A): DBIO[Int] = {
tableQuery += entity
}

def dBIOInsert(event: E): Future[DBIO[Int]] = {
toEntity(event).map(e => dBIOInsert(e))
}

protected def toEntity(e: E): Future[A]
override def createSchemaIfNotExists(): Future[Unit] = {
db.run(tableQuery.schema.createIfNotExists)
}
}

The third step is to design our Repo. As we’ve spoken before NewMessageRepo extends BaseRepo.. but another point is NewMessageRepo override toEntity method that returns Future[NewMessageEntity] and it seems that is not necessary to return Future. and doesn’t need to return any Future but in our suggested design, the only way to calculate some of our entity attributes is to make some change in this toEntity method.

for example, in calculating message receive time (if we need it someday) we should ask from external sources which may force us to return Future or other IO/Monads. Another important method in this NewMessageRepo that is implemented implicitly is dBIOInsert methods& these methods are concrete methods implement in the BaseRepo trait.

Each class extends BaseRepo must implement 2 abstract variables (baseTable/dbDefProvider) and implicitly own dBIOInsert() /createSchemaIfNotExists methods()/db() variables. from now each class that takes an instance from NewMessageRepo could insert NewMessageEvent to NewMessageTable with a simple function call…

Until here everything we do is a normal application development process and from here we want to dive into the streaming world…

Consumers

@Singleton
class ConsumerProvider @Inject()(
configRepo: ConfigRepo //configs instance
)(
implicit val system: ActorSystem,
implicit val ec: ExecutionContext
){
.
.
.
private implicit val session: SlickSession =
SlickSession.forConfig(configRepo.dbProfile)
//***Slick Session***private val consumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(
system.settings.config.getConfig("akka.kafka.consumer"),
new StringDeserializer,
new StringDeserializer
).withGroupId("common-group-id")
//***Consumer Settings***def sourceCreator(
topic: String,
consumerSetting: Option[ConsumerSettings[String, String]] = None
)
:Source[ConsumerMessage.CommittableMessage[String, String], Control] =
Consumer
.committableSource(
consumerSetting.getOrElse(consumerSettings()),
Subscriptions.topics(topic)
)
//***Source Creator***def control(
source: Source[
ConsumerMessage.CommittableMessage[String, String],
Control
],
f:ConsumerMessage.CommittableMessage[String, String] => Future[DBIO[Int]]
): DrainingControl[immutable.Seq[Int]] = {
source
.via(
Flow[ConsumerMessage.CommittableMessage[String, String]]
.mapAsync(3) {
message =>
f(message) flatMap { dBIO =>
session.db.run(dBIO)
}
}
)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
}
//***Control***
}
  • The session has db.run() method. it’s for Alpakka-Slick.
  • The consumerSettings() get Kafka consumer properties and these properties must be written in config like files and the pattern to choose Deserialization pattern which would be String/Array of bytes/etc for Deserialize Key/Value and with groupId that is important for consuming independent with different offset. if you don’t know about these properties read about them here.
  • The sourceCreator(): our source write as a black box method because of the control() method. control should be capable of using other sources and each source should be capable of following each topic that they want, including each setting they need. so in the call() method, we give the source as a function parameter. so we need to create a source creator method that is able to create a basic source that we want. this method gives settings and topic-name and returns a source — this method writes based on committableSource this is one of many types of sources you can create for consuming from Kafka. *(here source creator has an optional setting parameter that means that if we don’t pass any parameter to it, it has a default value (consumerSettings()))
  • The control(): control get a source of committable messages and a function that gets a message and return Future[DBIO[Int]] this function is an insert() function that gets a String message and process it and insert it to Timescale.

In Scala when we have “f: A => B” pattern in any function parameter its means when you call ”f” you should pass an A-type value to it and it will return B-type value to you. more about call by name parameter in Scala.

So in the control body, we get the flow of committable messages and run an async map with parallelism degree 3 on them and pass the emitted message to our f method and execute our DBIO query with db.run(). the control function has a lot of points and issues that take us a lot of time to describe them. it’s better to read Akka stream documents to understand better what we did in this method.

See more about inserting with Alpakka-Slick here.

Note that: here we had a challenge that we mention it at the next part ASP! (the challenge of using db.stream() function insted of run())

Aggregators

trait EventAggregator {
def insertEvent(
msg: ConsumerMessage.CommittableMessage[String, String]
): Future[DBIO[Int]]

val topicName: String
}
.
.
.
@Singleton
class NewMessageEventAggregator @Inject()(
newMessageRepo: NewMessageRepo,
kafkaTopicConfig: KafkaTopicConfig
) extends EventAggregator {
override val topicName: String = kafkaTopicConfig.messageTopic

override def insertEvent(
msg: ConsumerMessage.CommittableMessage[String, String]
): Future[DBIO[Int]] = {
newMessageRepo.dBIOInsert(
parse2Event[NewMessageEvent](msg.record.value)
)
}
}

Aggregators are responsible for override an insert function for each repo class that emitted the dBIOInsert() and toEntity() function and each Entity has pars2Event() method the insert method should take a committable message and parse it to an event and pass the event to repo dBIOInsert().

So each aggregator class extends EventAggregator trait must implement the insertEvent() method and a topic-name that we talk about it, in the next part

Subscribers Supervisor

Let’s solve the puzzle by composing each black box piece we talked about :

Here we go. We know that each event aggregator has an insert method and a topic name and we now that consumer provider has a control method that executes the stream by getting topic-name and an insert function and now we have a list of event aggregators that each of them need to run control for itself and we act like this:

class SubscribersSupervisorActor(
kafkaTopicConfig: KafkaTopicConfig,
eventAggregators: List[EventAggregator],
consumerProvider: ConsumerProvider
)(implicit val materializer: ActorMaterializer)
extends Actor
with ActorLogging
{

override def receive: Receive = {
case SubscribersSupervisorActor.Start =>
eventAggregators foreach { agg =>
consumerProvider.control(
consumerProvider.sourceCreator(agg.topicName),
agg.insertEvent
)
}
}
}

object SubscribersSupervisorActor {
sealed trait SubscribersSupervisorMessages

case object
Start extends SubscribersSupervisorMessages
}

The important thing is that I choose to execute all of my consumers on an actor because in the future I could easily handle the failures in an actor with actor supervisor strategies. good to delegate some jobs to another thread because of the system resilience principles we read in the reactive manifesto.

Run

val newMessageEventAggregator = new NewMessageEventAggregator(...)val eventAggregators = List(
xEventEventAggregator,
yEventEventAggregator,
.
.
newMessageEventAggregator
)
val ref = Props(
new SubscribersSupervisorActor(
kafkaTopicConfig,
eventAggregators,
consumerProvider
)
ref ! SubscribersSupervisorActor.Start

In the end, we are able to run the whole stream as easy as sending a Start object to our SubscribersSupervisorActor and our stream executes well.

From now all of your data will persist into the timescale and for monitoring them you just need to set up visualization tools like Grafana and run a query on a timescale and see how Grafana monitors your real-time time-series data.

for example, we would run a query like this:

SELECT
time_bucket('1d',to_timestamp("eventTime" / 1000 )) AS "time",
"messageType" AS metric,
count(1) AS "value"
FROM "NewMessage"
WHERE
to_timestamp("eventTime" / 1000 )
BETWEEN $__timeFrom() and $__timeTo()
GROUP BY 1,2
ORDER BY 1,2

and Grafana shows us something like this:

produced by Grafana with mocked data

and each time you refresh your query you could see the number of texts and … text and images are produced in a day (because in the query we gave time_bucket ‘1d’ . it means that chunk our data to days and if you need you could see more here, somethings like ‘1h’).

Conclusion

This blog post was a little demonstration of a way to create a real-time monitoring system using Akka-Stream, Alpakka-Kafka, Kafka, Aplakka-Slick, TimescaleDB, Grafana which are powerful tools for streaming cases . to how to do this job and do it well in your case you should read the specific documents of each of these tools. The goal of sharing this post is to share an experience.

In the next parts of this blog, we talk about some exciting challenges we had:

1- The challenge of choosing Epoch time

2- The challenge of parsing hierarchical Events

3- The challenge of using SlickSession.db.stream()

4- The challenge of ordering events in Kafka

I hope you enjoy this post. if you enjoy it please share the post and if you had a question or found an issue please give feedback to me by leaving comments.

Tnx!

--

--

Kian Jalali
Sanjagh
Writer for

In type we trust! — A Scala fan & Data Engineer at @Smartech_ir