typebus: Anatomy of a typebus Service (code in Scala + Akka)

Corey Auger
7 min read · Jan 13, 2019

PART 2

In the last post I gave an introduction to typebus and the types of problems it addresses. I went through an example design process and showed how it maps directly to the typebus service generator.

review: typebus

typebus is my mildly opinionated toolkit for building reactive microservices, abstracting away the publish/subscribe bus implementation. It provides a consistent way to create and advertise service definitions. These services in turn own their API and provide guarantees about how that API may evolve. If we think of the real world as a distributed system, then typebus is the set of shared stories or beliefs that lets us organize our society at scale (e.g. the story of money). typebus aims to provide solutions for:

  • Faster development: templates that map to Domain Driven Design (DDD)
  • Better isolation and API ownership
  • Simpler gateway service design and public API implementations
  • Guarantees around schema evolution
  • Flexible deployment and scaling options
  • An open and composable communication channel
  • Tooling for supporting, visualizing, and debugging services

Anatomy of a typebus Service

Command and Events Layer (Data Transfer Objects, or DTOs)

typebus creates an example Data Transfer Object file, containing some data for a fake Book Store, located in the data package object. This file is where we define all of the Command and Event types that we want to make part of our public API. We should be able to take the work from the Domain Driven Design exercise in the previous post and translate it into case classes. Let's take a closer look at the placeholder code that typebus generated for us.
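The generated file looks roughly like the following. This is a reconstruction from the types referenced later in this post (the PropScope paths in the compiler output below confirm Library.books: Seq[Book] and Book.author: Author); the Book.title field is my assumption.

package io.surfkit.loanservice

package object data {

  // BaseType comes from the generated template; shown here so the sketch is self-contained.
  sealed trait BaseType

  final case class GetLibrary(filter: String) extends BaseType
  final case class Author(name: String, twitter: Option[String] = None) extends BaseType
  final case class Book(title: String, author: Author) extends BaseType
  final case class Library(books: Seq[Book]) extends BaseType
}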

Within it we can see a simple Command for our example Book Store.

final case class GetLibrary(filter: String) extends BaseType

This is a simple query request that would come from someone using our service; it is therefore a DTO that we want to expose through our API. Notice that below it we have a section declaring implicit serializers:

object Implicits extends AvroByteStreams { …

Note that by default typebus gives you Avro byte serialization out of the box. However, it is simple to provide other forms of serialization: you need only define implementations of ByteStreamReader[T] and ByteStreamWriter[T]. For more information, consult the typebus documentation.
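As a minimal sketch (not the real typebus API surface, so treat the import path and method signatures as assumptions and check the docs), a hand-rolled override for GetLibrary could encode its single filter field as UTF-8 text:

import io.surfkit.typebus.{ByteStreamReader, ByteStreamWriter}

// Assumes the traits expose read/write over Array[Byte]; consult the
// typebus documentation for the exact signatures.
implicit val getLibraryReader = new ByteStreamReader[GetLibrary] {
  def read(bytes: Array[Byte]): GetLibrary =
    GetLibrary(filter = new String(bytes, "UTF-8"))
}

implicit val getLibraryWriter = new ByteStreamWriter[GetLibrary] {
  def write(value: GetLibrary): Array[Byte] =
    value.filter.getBytes("UTF-8")
}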

Inside the implicit object we see one of the most important workhorse functions in typebus.

Typebus.declareType[..]

Here is where we declare our intention to add this type to our service definition and expose it as part of our API. typebus does a number of things through this interface, including:

  • Add the type and schema to a service definition
  • Expose this type to code gen for services that want to use the API
  • Guarantee that this type abides by a schema evolution policy that will NOT break the API contract (this is enforced by a macro that throws a compile error if you break your schema)

Schema Evolution

When you expose types, you are telling the world how to use your API. If you then change those types in an undefined manner, you break all the services that depend on you. This is a major source of woe in the distributed systems world. typebus forces you to be aware that you are breaking your API by refusing to compile.

How can you evolve your schema without breaking your API contract? Simple. You can only add new fields that declare a default value, and you can only remove fields that have a default value. All other changes to a type are invalid.

So what do you do if you need to change your API beyond these restrictions? Again, this is simple: you create a new type.

Let's take a look at this in action. Starting with our toy example, I first compile it using sbt.

Next let’s say we want to make changes to our Author type:

final case class Author(name: String, twitter: Option[String] = None) extends BaseType

For example, we will try to add a phoneNumber field, so that it now looks like this:

final case class Author(name: String, twitter: Option[String] = None, phoneNumber: String) extends BaseType

Compiling the program again, we get the following:

loan-service/src/main/scala/com/tally/loanservice/data/data.scala:24:49:
[error] Schema evolution Failed !!
[error] You have added a new field to a typebus type that does not have a default value.
[error] Type failed:
[error] PropScope(/io.surfkit.loanservice.data.Library.books/scala.collection.Seq/io.surfkit.loanservice.data.Book.author/io.surfkit.loanservice.data.Author.phoneNumber:java.lang.String,false)

The compiler error indicates that we broke our schema, and that if we had production services relying on this one, we just broke them as well. One way to fix this is to give the field a default value. Let's first try phoneNumber: String = "".

Hooray, things compile again. But maybe we could have been more explicit that a phone number might be absent from our DTO. How about this?

phoneNumber: Option[String] = None

Again this compiles and we are looking good.

OK, what if we wanted to remove name: String?

loan-service/src/main/scala/com/tally/loanservice/data/data.scala:24:49:
[error] Schema evolution Failed !!
[error] You have removed a field from your schema that did not contain a default value
[error] Type failed:
[error] PropScope(/io.surfkit.loanservice.data.Library.books/scala.collection.Seq/io.surfkit.loanservice.data.Book.author/io.surfkit.loanservice.data.Author.name:java.lang.String,false)

Nope, this again breaks the schema, so let's put that back in. But what if we remove the twitter handle, which had a default: twitter: Option[String] = None?

Yep, we're back to a program that compiles.

So what about before your project is in production, while you are still hacking to figure out your DTO schema? How do you get around the compile-time checking? typebus stores the schema in a binary file inside your resources/typebus/ folder; if you delete the file that corresponds to your type, a fresh evolution file will be generated. At this point you might ask: what is stopping someone from simply deleting the schema, getting a clean compile, and pushing it to production, blowing things up? The answer is nothing. However, doing so is now VERY explicit, compared to the countless careless ways it could have been done in the past.

Lastly, what if we really need authors with addresses now, and those addresses must be present because another service uses them for a geolocation feature? If this service is already running in production (and even if you THINK no other service is still using the current Author type), it is time to declare a NEW type. Think of it this way: if you missed the mark that badly in your Domain Driven Design, perhaps it is time for an entirely new service?

The Service Layer

This file is the entry point for your service. Here is where you expose your service definition to the world. Service definitions are simple functions that handle a DTO and produce a Future[DTO]. Let's look at the toy example that the template provides.

The first thing that happens in the code is starting your UserLoansActor sharded actor. This registers the actor type with the shard coordinator, along with how messages are routed to individual actors. You can then route messages to a shard through the userLoanActorRegion; a sketch of the wiring follows, and a fuller example appears in the coming section.
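The wiring looks something like this, using classic Akka Cluster Sharding (the UserCommand trait, the extractor logic, and the shard count of 100 are placeholders, not the template's actual values):

import akka.actor.{ActorRef, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

// Hypothetical marker for any command carrying the user id we shard on.
trait UserCommand { def userId: String }

// Route each command to the entity identified by its userId...
val extractEntityId: ShardRegion.ExtractEntityId = {
  case cmd: UserCommand => (cmd.userId, cmd)
}

// ...and place that entity on one of 100 shards.
val extractShardId: ShardRegion.ExtractShardId = {
  case cmd: UserCommand => (math.abs(cmd.userId.hashCode) % 100).toString
}

// Assumes an ActorSystem named `system` is in scope.
val userLoanActorRegion: ActorRef = ClusterSharding(system).start(
  typeName        = "UserLoansActor",
  entityProps     = Props[UserLoansActor],
  settings        = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId  = extractShardId
)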

Next you'll see a declaration that imports some implicits:

import io.surfkit.loanservice.data.Implicits._

This imports serialization for your DTO objects, which we covered in the first section, "Command and Events Layer".

Next you can get into the real work of the service. You’ll see an example function called:

def getLibrary(getLibrary: GetLibrary, meta: EventMeta): Future[Library]

Also at the bottom we can see a corresponding:

registerStream(getLibrary _)

These functions make up your service definition (the API that you expose). Each input type (GetLibrary in the above) is pushed onto the bus: registering the function creates a consumer on a Kafka stream whose topic name is the fully qualified name of your DTO type, io.surfkit.loanservice.data.GetLibrary. You also register a producer for the type you create after doing some work, Future[Library]; this produces records to the Kafka topic corresponding to the fully qualified name io.surfkit.loanservice.data.Library.
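Put together, the toy handler and its registration look roughly like this (the empty Library body is a placeholder lookup, and Library's shape is inferred from the compiler output earlier):

import scala.concurrent.Future
import io.surfkit.loanservice.data._
import io.surfkit.loanservice.data.Implicits._

// Consumes GetLibrary from topic io.surfkit.loanservice.data.GetLibrary
// and produces a Library onto topic io.surfkit.loanservice.data.Library.
// EventMeta and registerStream are provided by the typebus service base.
def getLibrary(getLibrary: GetLibrary, meta: EventMeta): Future[Library] =
  Future.successful(Library(books = Seq.empty))  // placeholder lookup

registerStream(getLibrary _)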

Defining your service is as simple as coming up with new DTO objects and registering the functions that transform those types (Commands/Queries to Events).

The other argument to your service function is meta: EventMeta. This contains routing information and metadata about the type moving through the bus. This header is often useful for performing more advanced routing patterns.

Note that you can write and register your service method with or without the meta argument. That is, the following are both legal service functions you can register:

def getLibrary(getLibrary: GetLibrary, meta: EventMeta): Future[Library]

def getLibrary(getLibrary: GetLibrary): Future[Library]

Sharded Actor Layer

As a quick review, our sharded actor corresponds to the Aggregate Root we identified during the design phase. The typebus template wired in the necessary boilerplate and gave us a UserLoansActor sharded actor.

This actor is set up to perform the write side of CQRS. I will dive deeper into CQRS in another blog post; for this one, I will simply provide an example of how one might route messages to and from the sharded actor.

In keeping with our Loans example, let's add a command that adds some interest to the interest ledger. The command is structured around a user identifier and a ledger identifier. Here is an example command:
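The original snippet is not reproduced here, so the following is a hypothetical reconstruction (the name and fields are my assumptions):

// Hypothetical command: add interest to one of a user's loan ledgers.
final case class AddInterestToUserLoanLedger(
  userId: String,    // identifies the user (and the shard to route to)
  ledgerId: String,  // identifies the ledger within the user's loans
  amount: Double     // interest amount to add
) extends BaseType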

This would produce an Event that looks like the following:
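Again hypothetically, the event might carry the same identifiers plus the applied amount:

// Hypothetical event emitted once the interest has been recorded.
final case class InterestAddedToUserLoanLedger(
  userId: String,
  ledgerId: String,
  amount: Double
) extends BaseType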

Finally, we declare the new types and their Avro byte serialization in the data file's Implicits object, just as the template does for GetLibrary with Typebus.declareType.

Inside our service definition we can now add a method that looks like this:
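Using the hypothetical types above, it would have this shape, registered just like getLibrary:

def addInterest(cmd: AddInterestToUserLoanLedger, meta: EventMeta): Future[InterestAddedToUserLoanLedger]

registerStream(addInterest _)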

The only thing left to do is to route the command to our sharded actor. The actor would generally update an event store and return the InterestAddedToUserLoanLedger event. Here is how this might look:
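A sketch using the classic Akka ask pattern against the shard region we started earlier (the 5-second timeout is an arbitrary choice):

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout: Timeout = Timeout(5.seconds)

// Ask the user's UserLoansActor shard to apply the interest and
// reply with the resulting event.
def addInterest(cmd: AddInterestToUserLoanLedger, meta: EventMeta): Future[InterestAddedToUserLoanLedger] =
  (userLoanActorRegion ? cmd).mapTo[InterestAddedToUserLoanLedger]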

The logic that actually records and adjusts ledgers lives inside the UserLoansActor. We will address ways of dealing with state and event stores in a follow-up post. It is important to point out that we have gained strong consistency as well as the ability to scale out this service, with UserLoansActor instances distributed over our cluster nodes.

What’s Next?

In the next post we will explore how to compose our services and get them talking to one another. We will also see how to expose gateway services and public APIs, with demonstrations of both REST and a more modern WebSocket approach.

PART 3: Service Composition and Gateway Services

