Writing A Simple Distributed Pub/Sub Server With The Typelevel Toolkit and Http4s Ember Server
This tutorial is for those familiar with Scala looking for an introduction to the Typelevel stack.
This will not be a full tutorial on each library used here but more of a quick-start guide for some libraries in the Typelevel stack, such as http4s, fs2, Cats Effect, decline and munit-cats-effect. In this tutorial we’re going to create a small REST server that lets a client publish to and subscribe to a queue in a distributed manner. We’ll then write a test to showcase the functionality. Our server is going to have four endpoints:
- /ping — this endpoint is for testing whether the server is running.
- /createQueue/$queueName — this endpoint will create a Queue (https://typelevel.org/cats-effect/docs/std/queue) that will be the underlying mechanism for our application. This queue will be stored in a shared Ref (https://typelevel.org/cats-effect/docs/std/ref) to be accessed by other endpoints.
- /publish/$queueName — this endpoint will take a list of messages as a request and publish them to the specified queue.
- /subscribe/$queueName — this endpoint will serve a stream that continuously pulls elements off of the specified queue.
First let’s pull in our dependencies. We’re going to use the Typelevel toolkit (https://typelevel.org/toolkit/) which contains most of the dependencies that we need for this as well as the http4s (https://http4s.org/v0.23/) ember server library for our REST server. Go ahead and add the following to your build.sbt settings:
libraryDependencies ++= Seq(
  "org.typelevel" %% "toolkit" % "0.1.17",
  "org.typelevel" %% "toolkit-test" % "0.1.17" % Test,
  "org.http4s" %% "http4s-ember-server" % "0.23.23",
  "org.http4s" %% "http4s-dsl" % "0.23.23"
)
Next, let’s set up a skeleton for our HTTP routes, made easy with the http4s DSL:
import cats.effect.IO
import org.http4s.HttpRoutes
import org.http4s.dsl.io._
def routes = HttpRoutes.of[IO] {
  case GET -> Root / "ping" => Ok("pong")
  case POST -> Root / "createQueue" / queueName => ???
  case request @ POST -> Root / "publish" / queueName => ???
  case GET -> Root / "subscribe" / queueName => ???
}
Now let’s fill in the blanks. We need to create a Queue that will persist within our app and can be accessed by various clients asynchronously. We’ll use a shared Ref instance and, to make this a little more extensible, instead of a single queue we’ll store a Map where the keys are queue names and the values are the queues associated with those names.
Let’s create a function that takes a queue name and a Ref. This function will create a Queue and access the Map[String, Queue] inside the Ref. If the map already contains a key equal to the queue name we’ll essentially do nothing; otherwise we’ll update the map within the Ref to include (queue name -> queue):
import cats.effect.std.Queue
import cats.effect.Ref

def createQueue(queueName: String, ref: Ref[IO, Map[String, Queue[IO, String]]]): IO[Unit] =
  Queue.unbounded[IO, String].flatMap { defaultQueue =>
    ref.update { queues =>
      // Keep the existing queue if the name is already taken, otherwise register the new one
      val queue = queues.getOrElse(queueName, defaultQueue)
      queues + (queueName -> queue)
    }
  }
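To see what this buys us, here’s a quick sanity check (a hypothetical snippet with made-up names, not part of the server): creating the same queue twice leaves the original Queue instance in place.
// Hypothetical demo: the second createQueue call is a no-op for an existing name
val createDemo: IO[Boolean] = for {
  ref   <- Ref[IO].of(Map.empty[String, Queue[IO, String]])
  _     <- createQueue("jobs", ref)
  first <- ref.get.map(_("jobs"))
  _     <- createQueue("jobs", ref)
  again <- ref.get.map(_("jobs"))
} yield first eq again // expected: true, the original queue survives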
We have our Queue and our Ref to store it; now we need a way to push elements onto the queue. We’ll create a function that takes a list of elements, a queue name and a Ref. This function will get the Queue associated with the queue name from the Ref and push each element in the list to the Queue via Queue.offer:
import cats.implicits._

def publish(elements: List[String], queueName: String, ref: Ref[IO, Map[String, Queue[IO, String]]]): IO[List[Unit]] =
  ref.get.map(_(queueName)).flatMap { queue =>
    elements.parTraverse(queue.offer)
  }
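For illustration (again hypothetical, not part of the server), after publishing three elements the queue should report a size of three. Note that parTraverse offers the elements in parallel, so the order in which they land on the queue isn’t guaranteed, and that the Map lookup will fail the IO if the queue name was never created, so clients should hit /createQueue first.
// Hypothetical demo: publish three elements and check the queue's size
val publishDemo: IO[Int] = for {
  ref  <- Ref[IO].of(Map.empty[String, Queue[IO, String]])
  _    <- createQueue("jobs", ref)
  _    <- publish(List("a", "b", "c"), "jobs", ref)
  size <- ref.get.flatMap(_("jobs").size)
} yield size // expected: 3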
The last thing we’ll need for our server is a way to subscribe to the queue forever, until the client closes the stream. To this end we’ll create a function that takes a queue name and a Ref. To handle streams we’re going to use fs2 (https://fs2.io). This function will get the Queue associated with the queue name from the Ref and use fs2’s Stream.fromQueueUnterminated method to pull elements from the Queue indefinitely:
import fs2.Stream

def subscribe(queueName: String, ref: Ref[IO, Map[String, Queue[IO, String]]]): Stream[IO, String] =
  Stream.eval(ref.get.map(_(queueName)))
    .flatMap(Stream.fromQueueUnterminated(_))
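Putting the three functions together (another hypothetical snippet), we can publish a few elements and pull them straight back off through the subscription stream:
// Hypothetical round trip: publish three elements, then take three from the subscription stream
val roundTrip: IO[List[String]] = for {
  ref <- Ref[IO].of(Map.empty[String, Queue[IO, String]])
  _   <- createQueue("jobs", ref)
  _   <- publish(List("a", "b", "c"), "jobs", ref)
  out <- subscribe("jobs", ref).take(3).compile.toList
} yield out // expected: "a", "b" and "c", in some order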
Let’s revisit our HTTP routes with our newly created functions. Http4s integrates very nicely with Cats Effect and fs2, so we barely have to manage our effects at all, even when returning a Stream. We’ll use the built-in circeEntityDecoder from http4s to decode our publish request into a list of strings:
import org.http4s.circe.CirceEntityCodec.circeEntityDecoder

def routes(ref: Ref[IO, Map[String, Queue[IO, String]]]) = HttpRoutes.of[IO] {
  case GET -> Root / "ping" => Ok("pong")
  case POST -> Root / "createQueue" / queueName =>
    createQueue(queueName, ref) *> Ok("Queue created")
  case request @ POST -> Root / "publish" / queueName =>
    request.as[List[String]].flatMap(publish(_, queueName, ref)) *> Ok("Published")
  case GET -> Root / "subscribe" / queueName => Ok(subscribe(queueName, ref))
}
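Because HttpRoutes is essentially a function from requests to responses, we can also exercise the routes in memory, without binding a socket. Here’s a hypothetical check of the /ping endpoint (the uri literal and orNotFound syntax come from org.http4s.implicits):
import org.http4s.{Method, Request}
import org.http4s.implicits._

// Hypothetical in-memory check of the /ping route, no server required
val pingCheck: IO[String] = for {
  ref      <- Ref[IO].of(Map.empty[String, Queue[IO, String]])
  app       = routes(ref).orNotFound
  response <- app.run(Request[IO](Method.GET, uri"/ping"))
  body     <- response.as[String]
} yield body // expected: "pong"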
Finally, let’s set up the full application to run our server. We need to configure the server with the host and port we want it to run on. For easy command-line configuration we’re going to extend CommandIOApp from decline (https://ben.kirw.in/decline/):
import cats.effect.ExitCode
import com.comcast.ip4s.{Host, Port}
import com.monovore.decline.Opts
import com.monovore.decline.effect.CommandIOApp
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.implicits._
import org.http4s.server.Router

object QueueServer extends CommandIOApp("queue-server", "a distributed queue server") {
  // createQueue, publish, subscribe and routes from above live here

  val flags = (Opts.option[String]("host", "host for the server to run on"),
    Opts.option[String]("port", "port for the server to run on"))

  override def main: Opts[IO[ExitCode]] = flags.mapN { (host, port) =>
    Ref[IO].of(Map[String, Queue[IO, String]]()).flatMap { ref =>
      val server = Router("/" -> routes(ref)).orNotFound
      EmberServerBuilder.default[IO]
        .withHost(Host.fromString(host).get)
        .withPort(Port.fromString(port).get)
        .withHttpApp(server)
        .build
        .useForever // keep the server running until the process is shut down
    }.map(_ => ExitCode.Success)
  }
}
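Assuming a standard sbt setup, the server can then be started with something like sbt "run --host localhost --port 8080", and decline gives us a generated --help message for free.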
And that’s it: using libraries in the Typelevel stack we’re able to write powerful abstractions with very little code! Now we’re going to look at a test where we’ll harness the power of our remote queue to calculate the sum of many numbers. Because we’re working with types from Cats Effect, we’ll extend CatsEffectSuite from munit-cats-effect (https://typelevel.org/munit-cats-effect/) to handle running our effects, specifically IO (https://typelevel.org/cats-effect/docs/2.x/datatypes/io), in tests.
Our test is going to use fs2 to coordinate running our server and a “client” stream concurrently. The client is going to do 5 main things:
- Check that the server is running using the /ping endpoint.
- Create two queues using the /createQueue endpoint: a worker queue and a collector queue.
- Publish a stream of numbers to the worker queue using the /publish/$workerQueue endpoint.
- Create a concurrent cluster of worker streams that each do the following:
  - Consume a stream of numbers from the response of the /subscribe/$workerQueue endpoint.
  - Gather the stream into chunks of numbers.
  - Find the sum of all numbers in the chunk.
  - Publish the sum to the collector queue using the /publish/$collectorQueue endpoint.
- Create a single collector stream that does the following:
  - Consume a stream of numbers from the response of the /subscribe/$collectorQueue endpoint.
  - Gather the stream into chunks of two numbers, or a single number if a certain amount of time passes without another element arriving on the queue.
  - Publish the sum of the chunk back to the worker queue using the /publish/$workerQueue endpoint.
  - Drop all chunks of size 2 from the stream.
  - Take one element from the stream and stop streaming. This element should be the total sum, though since this is timing-based it may be flaky.
The test itself will calculate the sum of the stream of numbers locally and compare it to the result of the collector stream:
import cats.effect.IO
import cats.effect.unsafe.implicits.global // provides the IORuntime needed for unsafeRunSync below
import munit.CatsEffectSuite
import org.http4s.{Method, Request, Uri}
import org.http4s.client.Client
import fs2.{Stream, text}
import org.http4s.circe.CirceEntityCodec.circeEntityEncoder
import org.http4s.ember.client.EmberClientBuilder
import scala.concurrent.duration.DurationLong
import scala.util.Random
import cats.implicits._
class QueueServerSuite extends CatsEffectSuite {
  val host = "localhost"
  val port = "8080"

  def waitForServer(client: Client[IO]) = {
    val uri = Uri.fromString(s"http://$host:$port/ping").toOption.get
    val request = Request[IO](Method.GET, uri)
    Stream.eval(client.expect[String](request)).attempt.repeat.dropWhile(_.isLeft).take(1)
  }

  def createQueue(client: Client[IO], queueName: String) = {
    val uri = Uri.fromString(s"http://$host:$port/createQueue/$queueName").toOption.get
    val request = Request[IO](Method.POST, uri)
    client.expect[String](request)
  }

  def publish(client: Client[IO], queueName: String, list: List[Long]) = {
    val uri = Uri.fromString(s"http://$host:$port/publish/$queueName").toOption.get
    val request = Request[IO](Method.POST, uri).withEntity(list.map(_.toString))
    client.expect[String](request)
  }
  def initialPublish(client: Client[IO], initialStream: Stream[IO, Long], queueName: String): Stream[IO, String] =
    initialStream.groupWithin(100, 100.milliseconds)
      .map(_.toList).covary[IO]
      .mapAsyncUnordered(Int.MaxValue)(publish(client, queueName, _))

  def workerStream(client: Client[IO], workerQueueName: String, collectorQueueName: String) = {
    val subscribeRequest = Request[IO](Method.GET, Uri.fromString(s"http://$host:$port/subscribe/$workerQueueName").toOption.get)
    client.stream(subscribeRequest)
      .flatMap(_.body)
      .through(text.utf8.decode)
      .map(_.toLong)
      .groupWithin(100, 500.milliseconds)
      .map(_.toList.sum)
      .mapAsyncUnordered(Int.MaxValue)(long => publish(client, collectorQueueName, List(long)))
  }

  def concurrentStreams[X](stream: Stream[IO, X], n: Int = 0): Stream[IO, X] =
    Stream.range(0, n).covary[IO]
      .mapAsyncUnordered(n)(_ => IO.pure(stream))
      .parJoinUnbounded

  def collectorStream(client: Client[IO], workerQueueName: String, collectorQueueName: String): Stream[IO, Long] = {
    val subscribeRequest = Request[IO](Method.GET, Uri.fromString(s"http://$host:$port/subscribe/$collectorQueueName").toOption.get)
    client.stream(subscribeRequest)
      .flatMap(_.body)
      .through(text.utf8.decode)
      .map(_.toLong)
      .groupWithin(2, 2.second)
      .evalMap(chunks => publish(client, workerQueueName, List(chunks.toList.sum)) *> IO.pure(chunks))
      .dropWhile(_.size > 1)
      .take(1)
      .map(_.head.get)
  }
  def clientStream(client: Client[IO], numbers: Stream[IO, Long], queueName: String, collectorQueueName: String): Stream[IO, Long] = {
    val createQueues = Stream.eval(createQueue(client, queueName)) *> Stream.eval(createQueue(client, collectorQueueName))
    val initialPublishStream = initialPublish(client, numbers, queueName)
    val workerClusterStream = concurrentStreams(workerStream(client, queueName, collectorQueueName), 10)
    val collector = collectorStream(client, queueName, collectorQueueName)
    val streams = collector concurrently workerClusterStream concurrently initialPublishStream
    waitForServer(client) *> createQueues *> streams
  }

  test("distributedServerTest") {
    val queueName = Random.alphanumeric.take(10).mkString + "-queue"
    val collectorQueueName = Random.alphanumeric.take(10).mkString + "-collectorQueue"
    val numbers = Stream.range[IO, Long](0, 10000)
    val expectedSum = numbers.compile.toList.map(_.sum).unsafeRunSync()
    val result = EmberClientBuilder.default[IO].build.use { client =>
      val sumStream = clientStream(client, numbers, queueName, collectorQueueName) concurrently
        Stream.eval(QueueServer.run(List("--port", port, "--host", host)))
      sumStream.compile.toList.map(_.head)
    }
    assertIO(result, expectedSum)
  }
}
That’s all folks. The full code can be found at https://github.com/SethLasky/queue-server. Feel free to leave a note in the comments if you’d like me to go over anything here in more detail.