ZIO with http4s and doobie

Wiem Zine
Jun 16

If you’re using Scala and you’d like to implement reactive applications with a functional programming approach, there are many open source FP libraries for Scala that let you write purely functional code.

But you might ask: why would you use functional programming libraries?

Usually, when you write a bunch of statements, they are executed right away. That makes the code harder to test, because you cannot control the interactions it performs: sometimes you need external libraries to test it, and sometimes it isn’t possible to test your program at all! Every interaction with something outside the context of your program or function is called a “side effect”.

A functional programming approach enables you to control these effects!

How?

Functional programming is a declarative paradigm: programs describe what you want to achieve, and data is kept separate from the functions that operate on it. You pass the data through different functions in order to transform it.

Functional programming deals with pure functions, which are:

  • Easy to reason about.
  • Testable, because for given inputs you get an output, and if you pass the same inputs you will get the same result.
  • Free of side effects, which means the output has to be computed from the inputs without interacting with the outside world!
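To make the three properties concrete, here is a minimal sketch in plain Scala. The names Purity, add and addToTotal are made up for illustration and do not come from any library.

```scala
object Purity {
  // Pure: the result depends only on the arguments,
  // so the same inputs always give the same output.
  def add(a: Int, b: Int): Int = a + b

  // Impure: reads and mutates state that lives outside the function,
  // so the same input can give a different result on every call.
  private var total: Int = 0
  def addToTotal(a: Int): Int = { total += a; total }
}
```

Calling Purity.add(1, 2) twice gives the same answer both times, while two calls to Purity.addToTotal(1) return different values, which is exactly what makes the second function harder to test.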

Functional programmers deal with effectful programs by describing them as immutable data types. Any interaction (with an API, a database, the console and so on) is an effectful program, described as an immutable value that you can compose, test and pass through different functions. You can also compute one effectful program from another and return it, which is impossible in the imperative paradigm: you cannot do that with statements.
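The idea of an effect as a value can be sketched in a few lines of plain Scala. This is a deliberately tiny model, not how ZIO is implemented; Effect, EffectDemo and counter are hypothetical names used only here.

```scala
// A description of a computation: building one runs nothing.
final case class Effect[A](unsafeRun: () => A) {
  def map[B](f: A => B): Effect[B] =
    Effect(() => f(unsafeRun()))
  def flatMap[B](f: A => Effect[B]): Effect[B] =
    Effect(() => f(unsafeRun()).unsafeRun())
}

object EffectDemo {
  // An observable side effect, used only to show when things actually run.
  var counter: Int = 0

  // Describes "increment the counter and return it"; nothing happens yet.
  val increment: Effect[Int] = Effect { () => counter += 1; counter }

  // Composing descriptions gives a bigger description, still not executed.
  val twice: Effect[Int] =
    for {
      a <- increment
      b <- increment
    } yield a + b
}
```

After EffectDemo is initialized, counter is still 0. Only EffectDemo.twice.unsafeRun() performs the two increments, returning 3 and leaving counter at 2.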

ZIO

ZIO is a zero-dependency Scala library that helps you describe effectful programs and build asynchronous, concurrent applications with resource safety. ZIO enables us to write purely functional code.

We can describe different types of effects using ZIO:

  • UIO[A]: a description of an effectful program that computes a value of type A and never fails.
  • Task[A]: a description of an effectful program that might fail with a Throwable or produce a value of type A.
  • IO[E, A]: a description of an effectful program that might fail with any value of type E or produce a value of type A.
  • ZIO[R, E, A]: a description of an effectful program that uses an external service/context/environment of type R, which can contain modules for production and for test, and that might fail with an error of type E or produce a value of type A.
  • TaskR[R, A]: the same as ZIO[R, E, A], but this program might fail with an error of type Throwable.

At the end of the day, after building your effectful programs, you can interact with the real world using ZIO’s runtime system: create a DefaultRuntime and call unsafeRun.
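As a rough sketch of the effect types above and of running them, assuming the ZIO release candidate this post targets (where TaskR and DefaultRuntime are available under the zio package; later ZIO releases changed some of these names). The object and value names are made up for illustration.

```scala
import zio._

object EffectTypes {
  // Cannot fail.
  val always: UIO[Int] = UIO.succeed(42)

  // Wraps code that may throw; the exception becomes a failed Task.
  val mightThrow: Task[Int] = Task.effect("not a number".toInt)

  // Fails with a typed, non-Throwable error.
  val typedFailure: IO[String, Int] = IO.fail("boom")

  // Needs an environment of type Int before it can run.
  val needsEnv: ZIO[Int, Nothing, Int] = ZIO.environment[Int].map(_ + 1)

  // Needs an environment and may fail with a Throwable.
  val parsesEnv: TaskR[String, Int] =
    ZIO.environment[String].flatMap(s => Task.effect(s.toInt))
}

object EffectTypesMain {
  def main(args: Array[String]): Unit = {
    val runtime = new DefaultRuntime {}

    println(runtime.unsafeRun(EffectTypes.always))               // 42
    println(runtime.unsafeRun(EffectTypes.needsEnv.provide(41))) // 42
    // `either` moves the failure into the value, so unsafeRun does not throw.
    println(runtime.unsafeRun(EffectTypes.typedFailure.either))  // Left(boom)
    println(runtime.unsafeRun(EffectTypes.mightThrow.either).isLeft)
  }
}
```

Nothing is executed until unsafeRun is called, and an effect that still needs an environment has to receive one with provide first.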

In this blog post, you will learn how to use ZIO to interact with an HTTP API and a database using http4s and doobie.

Motivation:

Http4s

http4s is type-safe, functional, streaming HTTP for Scala. Its servers and clients share an immutable model of requests and responses. http4s deals with I/O using Cats Effect.

Doobie

doobie is a pure functional JDBC layer for Scala and Cats. It provides a functional way to construct programs that use JDBC.

How could you use ZIO with http4s and doobie?

ZIO has a separate module called zio-interop-cats, which contains instances for the Cats Effect library and allows you to use ZIO with any library that relies on Cats Effect, such as http4s and doobie in our case.


Example

Let’s implement a simple application that can create, read and delete a user in a database via HTTP calls:

1. Configuration:

  • The API endpoint and port
  • Database configuration
case class Config(api: ApiConfig, dbConfig: DbConfig)
case class ApiConfig(endpoint: String, port: Int)
case class DbConfig(url: String, user: String, password: String)

Using the ZIO environment, we can describe how we will load our configuration:

trait Configuration extends Serializable {
  val config: Configuration.Service[Any]
}
object Configuration {
  trait Service[R] {
    val load: TaskR[R, Config]
  }
}

In production we can use pureconfig to load the configuration file that contains the configuration of the API and the Database.

"com.github.pureconfig" %% "pureconfig" % "0.11.0"

This is the production module for our Configuration:

trait Live extends Configuration {
  val config: Service[Any] = new Service[Any] {
    // loadConfigOrThrow comes from pureconfig and throws on invalid configuration
    val load: Task[Config] = Task.effect(loadConfigOrThrow[Config])
  }
}

You can also add a Configuration module to use in your tests:

trait Test extends Configuration {
  val config: Service[Any] = new Service[Any] {
    val load: Task[Config] = Task.effectTotal(Config(???, ???))
  }
}
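To see how such a module is consumed, here is a self-contained sketch. It assumes the same ZIO release candidate as the rest of the post; the load helper, the wrapping ConfigSketch object and the fixed test values are assumptions added for illustration.

```scala
import zio._

object ConfigSketch {
  final case class ApiConfig(endpoint: String, port: Int)
  final case class DbConfig(url: String, user: String, password: String)
  final case class Config(api: ApiConfig, dbConfig: DbConfig)

  trait Configuration extends Serializable {
    val config: Configuration.Service[Any]
  }

  object Configuration {
    trait Service[R] {
      val load: TaskR[R, Config]
    }

    // Test module with hypothetical fixed values instead of a config file.
    trait Test extends Configuration {
      val config: Service[Any] = new Service[Any] {
        val load: Task[Config] = Task.effectTotal(
          Config(
            ApiConfig("localhost", 8080),
            DbConfig("jdbc:h2:mem:test", "sa", "")
          )
        )
      }
    }
    object Test extends Test
  }

  // Assumed helper: reads the Configuration module from the environment,
  // so callers depend on the interface and not on a concrete module.
  val load: TaskR[Configuration, Config] = ZIO.accessM(_.config.load)

  // Choosing the module happens at the edge, with `provide`.
  val loadForTests: Task[Config] = load.provide(Configuration.Test)
}
```

Swapping Configuration.Test for a production module changes where the values come from without touching any code that calls load.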

2. Database:

Using the ZIO environment, we can describe the operations that we can use to persist the user data in our database:

trait Persistence extends Serializable {
  val userPersistence: Persistence.Service[Any]
}

object Persistence {
  trait Service[R] {
    val createTable: TaskR[R, Unit]
    def get(id: Int): TaskR[R, User]
    def create(user: User): TaskR[R, User]
    def delete(id: Int): TaskR[R, Unit]
  }
}

Now we can implement the methods of Persistence.Service for the production module using doobie. Every query needs to be executed through a transactor, which wraps the connection pool of the database we choose (H2 in our example) and specifies the target effect type that will run the effectful computation; in our case, zio.Task. When we perform the transaction by calling transact, we want to get back a zio.Task, but transact requires an implicit ev: Bracket[M, Throwable]. You can provide it by importing zio.interop.catz.taskConcurrentInstances.

trait Live extends Persistence {

  protected def tnx: Transactor[Task]

  val userPersistence: Service[Any] = new Service[Any] {

    val createTable: Task[Unit] =
      SQL.createTable.run.transact(tnx)
        .foldM(err => Task.fail(err), _ => Task.succeed(()))

    def get(id: Int): Task[User] =
      SQL
        .get(id)
        .option
        .transact(tnx)
        .foldM(
          Task.fail,
          // fail with UserNotFound when the query returns no row
          maybeUser => Task.require(UserNotFound(id))(
            Task.succeed(maybeUser))
        )

    def create(user: User): Task[User] =
      SQL
        .create(user)
        .run
        .transact(tnx)
        .foldM(err => Task.fail(err), _ => Task.succeed(user))

    def delete(id: Int): Task[Unit] =
      SQL
        .delete(id)
        .run
        .transact(tnx)
        .unit
        .orDie
  }

  object SQL {

    def createTable: Update0 = sql"""CREATE TABLE IF NOT EXISTS
      Users (id int PRIMARY KEY, name varchar)""".update

    def get(id: Int): Query0[User] =
      sql"""SELECT * FROM USERS WHERE ID = $id """.query[User]

    def create(user: User): Update0 =
      sql"""INSERT INTO USERS (ID, NAME) VALUES (${user.id},
      ${user.name})""".update

    def delete(id: Int): Update0 =
      sql"""DELETE FROM USERS WHERE ID = $id""".update
  }

}

How could we make a transactor?

We need to acquire the transactor, use it in the application and clean it up when the application shuts down! How can we do that? We can define a zio.Managed, which is built from a Reservation with acquire and release actions. In doobie, creating a transactor returns a cats.effect.Resource, which is similar to zio.Managed.
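The acquire/use/release idea can be sketched with ZIO alone, before bringing doobie in. This assumes the same ZIO release candidate as the rest of the post; FakePool is a hypothetical stand-in for a connection pool, and a Ref records the order of events.

```scala
import zio._

object ManagedSketch {
  // Hypothetical resource standing in for a transactor / connection pool.
  final class FakePool(log: Ref[Vector[String]]) {
    val query: UIO[Unit] = log.update(_ :+ "query").unit
  }

  // Managed.make pairs the acquire action with its release action.
  def pool(log: Ref[Vector[String]]): Managed[Nothing, FakePool] =
    Managed.make(
      log.update(_ :+ "open").map(_ => new FakePool(log))
    )(_ => log.update(_ :+ "close"))

  // `use` runs the body, then always runs the release action.
  val program: UIO[Vector[String]] =
    for {
      log    <- Ref.make(Vector.empty[String])
      _      <- pool(log).use(_.query)
      events <- log.get
    } yield events
}
```

Running ManagedSketch.program with a DefaultRuntime yields Vector("open", "query", "close"): the resource is released as soon as use completes.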

If you want to use H2Transactor, here is what you can do:

def mkTransactor(
  conf: DbConfig,
  connectEC: ExecutionContext,
  transactEC: ExecutionContext
): Managed[Throwable, H2Transactor[Task]] = {
  import scalaz.zio.interop.catz._

  // a cats.effect.Resource that describes how to build the transactor
  val xa = H2Transactor
    .newH2Transactor[Task](
      conf.url,
      conf.user,
      conf.password,
      connectEC,
      transactEC)

  // turn the Resource into a ZIO Reservation: acquire + release
  val res = xa.allocated.map {
    case (transactor, cleanupM) =>
      Reservation(ZIO.succeed(transactor), cleanupM.orDie)
  }.uninterruptible

  Managed(res)
}

Cool! You can use the resource later, when you start the application.

You can also implement a Test module for Persistence, using a zio.Ref to hold the data:

case class Test(users: Ref[Vector[User]]) extends Persistence {
  override val userPersistence: Service[Any] = new Service[Any] {
    val createTable: Task[Unit] =
      Ref.make(Vector.empty[User]).unit
    def get(id: Int): Task[User] =
      users.get.flatMap(users => Task.require(UserNotFound(id))(
        Task.succeed(users.find(_.id == id))))
    def create(user: User): Task[User] =
      users.update(_ :+ user).map(_ => user)
    def delete(id: Int): Task[Unit] =
      users.update(users => users.filterNot(_.id == id)).unit
  }
}

3. Http Api

Using Http4s, let’s implement our HttpRoutes with the following HTTP calls:

  • GET → get the user for a given id
  • POST → create a new user
  • DELETE → delete a user with a given id

HttpRoutes returns its result inside our effect type, which uses the Persistence service for database interaction and could use other services as well.

First of all, let’s think: how could we generalize over the Persistence environment so that it works with both Test and Live?

We can create a helper that accesses the Persistence environment:

package object db extends Persistence.Service[Persistence] {
  val createTable: TaskR[Persistence, Unit] =
    ZIO.accessM(_.userPersistence.createTable)
  def get(id: Int): TaskR[Persistence, User] =
    ZIO.accessM(_.userPersistence.get(id))
  def create(user: User): TaskR[Persistence, User] =
    ZIO.accessM(_.userPersistence.create(user))
  def delete(id: Int): TaskR[Persistence, Unit] =
    ZIO.accessM(_.userPersistence.delete(id))
}

Now in our API, we can use db.get(???), db.create(???) and db.delete(???).
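Here is a self-contained sketch of a program written against these helpers and then run with an in-memory module. It assumes the same ZIO release candidate as the rest of the post. The shapes of User and UserNotFound are assumptions (the post does not show them), createTable is left out for brevity, and program and runInMemory are made-up names.

```scala
import zio._

object PersistenceSketch {
  // Assumed shapes: the post does not show these definitions.
  final case class User(id: Int, name: String)
  final case class UserNotFound(id: Int) extends Exception(s"User $id not found")

  trait Persistence extends Serializable {
    val userPersistence: Persistence.Service[Any]
  }

  object Persistence {
    trait Service[R] {
      def get(id: Int): TaskR[R, User]
      def create(user: User): TaskR[R, User]
      def delete(id: Int): TaskR[R, Unit]
    }

    // In-memory module backed by a Ref, in the spirit of the Test module.
    final case class Test(users: Ref[Vector[User]]) extends Persistence {
      val userPersistence: Service[Any] = new Service[Any] {
        def get(id: Int): Task[User] =
          users.get.flatMap { all =>
            all.find(_.id == id) match {
              case Some(user) => Task.succeed(user)
              case None       => Task.fail(UserNotFound(id))
            }
          }
        def create(user: User): Task[User] =
          users.update(_ :+ user).map(_ => user)
        def delete(id: Int): Task[Unit] =
          users.update(_.filterNot(_.id == id)).unit
      }
    }
  }

  // Plays the role of the `db` helper: it only knows the interface.
  object db extends Persistence.Service[Persistence] {
    def get(id: Int): TaskR[Persistence, User] =
      ZIO.accessM(_.userPersistence.get(id))
    def create(user: User): TaskR[Persistence, User] =
      ZIO.accessM(_.userPersistence.create(user))
    def delete(id: Int): TaskR[Persistence, Unit] =
      ZIO.accessM(_.userPersistence.delete(id))
  }

  // Business logic: create, read, delete, then check the user is gone.
  val program: TaskR[Persistence, (User, Boolean)] =
    for {
      _       <- db.create(User(1, "Ada"))
      found   <- db.get(1)
      _       <- db.delete(1)
      missing <- db.get(1).either
    } yield (found, missing.isLeft)

  // The module is chosen only here, at the edge of the program.
  val runInMemory: Task[(User, Boolean)] =
    Ref.make(Vector.empty[User]).flatMap(ref => program.provide(Persistence.Test(ref)))
}
```

The same program value could be given a database-backed module instead; only the argument to provide would change.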

Let’s describe the routes:

type UserTask[A] = TaskR[R, A]

def routes: HttpRoutes[UserTask] =
  HttpRoutes.of[UserTask] {
    case GET -> Root / IntVar(id) =>
      get(id).foldM(_ => NotFound(), Ok(_))
    case request @ POST -> Root =>
      request.decode[User] { user =>
        Created(create(user))
      }
    case DELETE -> Root / IntVar(id) =>
      (get(id) *> delete(id)).foldM(_ => NotFound(), Ok(_))
  }

But in order to return a response that contains a User wrapped in UserTask, we have to define an implicit EntityEncoder[UserTask, A].

And in order to decode the body of the POST request, we need to define an implicit EntityDecoder[UserTask, A].

We can use http4s-circe for that:

implicit def circeJsonDecoder[A](implicit decoder: Decoder[A]):
  EntityDecoder[UserTask, A] = jsonOf[UserTask, A]
implicit def circeJsonEncoder[A](implicit encoder: Encoder[A]):
  EntityEncoder[UserTask, A] = jsonEncoderOf[UserTask, A]

Don’t forget to import zio.interop.catz.taskConcurrentInstances! http4s deals with I/O using Cats Effect, and the interop module provides the Cats Effect instances that let it work with zio.Task.

Cool!

Now, in Main, we can interact with the real world by providing the Live modules. Our application is also easy to test using the Test modules, where we can check our zio.Ref instead of interacting with the database.

4. Interaction with the real World!

Load the configuration from Configuration.Live :

configuration.load.provide(Configuration.Live)

In order to make the transactor we need two execution contexts: one for the connection and one for the transactions. For the first we can use the execution context of the zio.DefaultRuntime, and for the DB transactions we can use the execution context of blocking.blockingExecutor. Then you can make the transactor:

for {

  conf <- configuration.load.provide(Configuration.Live)
  blockingEC <-
    blocking.blockingExecutor.map(_.asEC).provide(Blocking.Live)

  transactorR = Persistence.mkTransactor(
    conf.dbConfig,
    Platform.executor.asEC,
    blockingEC
  )
  ...

To open the API connection we will use BlazeServerBuilder to serve the requests. It requires cats.effect.Timer and cats.effect.ConcurrentEffect instances; to have them provided in terms of zio.Clock and zio.Task respectively, we can import zio.interop.catz._ and define our types:

type AppEnvironment = Clock with Persistence

type AppTask[A] = TaskR[AppEnvironment, A]

Let’s create the User table and open the connection:

httpApp = Router[AppTask](
  "/users" -> Api(s"${conf.api.endpoint}/users").routes
).orNotFound

server = ZIO.runtime[AppEnvironment].flatMap { implicit rts =>
  db.createTable *>
    BlazeServerBuilder[AppTask] // requires a Clock environment
      .bindHttp(conf.api.port, "0.0.0.0")
      .withHttpApp(CORS(httpApp))
      .serve
      .compile[AppTask, AppTask, ExitCode]
      .drain
}

And finally we need to provide Persistence.Live using the managed Transactor resource:

program <- transactorR.use { transactor =>
  server.provideSome[Environment] { _ =>
    new Clock.Live with Persistence.Live {
      def tnx: doobie.Transactor[Task] = transactor
    }
  }
}

Cool! We did it!


Each step was a description of what we want to achieve. This code is purely functional because:

  • We created effects, composed them together and passed them through different functions.
  • We changed the behavior of our effectful programs by passing them around different functions.

If you’re interested in the code, check out this exercise and zio-todo-backend, which was a good guide for getting started with http4s and doobie using ZIO.

I hope you learned more about the power of ZIO, how it can interoperate with other libraries and, especially, how to use the environment feature, which gives you the capability to describe and test effects. When you look at your code, you can see which external services your application uses and which services you need to provide at the end, and that makes the code easy to reason about.

Thanks to John De Goes for proofreading this post!