ZIO with http4s and doobie

ZIO

"dev.zio" %% "zio" %  ZIOVersion

Http4s

"org.http4s" %% "http4s-blaze-server" % "0.21.1",
"org.http4s" %% "http4s-circe" % "0.21.1",
"org.http4s" %% "http4s-dsl" % "0.21.1"

Doobie

"org.tpolecat" %% "doobie-core" % "0.8.8",
"org.tpolecat" %% "doobie-h2" % "0.8.8"

How could you use ZIO with Http4s and Doobie?

"dev.zio" %% "zio-interop-cats" % "2.0.0.0-RC12"
case class Config(api: ApiConfig, dbConfig: DbConfig)
case class ApiConfig(endpoint: String, port: Int)
case class DbConfig(url: String, user: String, password: String)
api {
endpoint = "127.0.0.1"
port = 8083
}

db-config {
url = "jdbc:h2:~/test;DB_CLOSE_DELAY=-1"
user = ""
password = ""
}
object Configuration {
trait Service {
val load: Task[Config]
}
}
package configuration {
type Configuration = zio.Has[Configuration.Service]
...
}
val database: ZIO[Configuration, Throwable, DB] = ???
val database: ZIO[Has[Configuration.Service], Throwable, DB] = ???
package configuration {
val load:ZIO[Configuration, Throwable, AppConfig] =
ZIO.accessM(_.get.load)
...
}
"com.github.pureconfig" %% "pureconfig" % "0.11.0"
trait Live extends Configuration.Service {
val load: Task[Config] =
Task.effect(loadConfigOrThrow[Config])
}
object Configuration {
...
val
live: ZLayer[Any, Nothing, Configuration] =
ZLayer.succeed(new Live {} )
}
object Persistence {
trait Service[A] {
def get(id: Int): Task[A]
def create(user: User): Task[A]
def delete(id: Int): Task[Boolean]
}
}
import doobie.{ Query0, Transactor, Update0 }
import zio._
import doobie.implicits._
import zio.interop.catz._
final class UserPersistenceService(tnx: Transactor[Task]) extends Persistence.Service[User] {
import UserPersistenceService._

def get(id: Int): Task[User] =
SQL
.get(id)
.option
.transact(tnx)
.foldM(
err => Task.fail(err),
maybeUser => Task.require(UserNotFound(id))(Task.succeed(maybeUser))
)

def create(user: User): Task[User] =
SQL
.create(user)
.run
.transact(tnx)
.foldM(err => Task.fail(err), _ => Task.succeed(user))

def delete(id: Int): Task[Boolean] =
SQL
.delete(id)
.run
.transact(tnx)
.fold(_ => false, _ => true)
}
object UserPersistenceService {

object SQL {

def get(id: Int): Query0[User] =
sql"""SELECT * FROM USERS WHERE ID = $id """.query[User]

def create(user: User): Update0 =
sql"""INSERT INTO USERS (id, name) VALUES (${user.id}, ${user.name})""".update

def delete(id: Int): Update0 =
sql"""DELETE FROM USERS WHERE id = $id""".update
}
...
}
import doobie.h2.H2Transactor
import scala.concurrent.ExecutionContext
import zio.Task
import zio.interop.catz._
object UserPersistenceService {def mkTransactor(
conf: DbConfig,
connectEC: ExecutionContext,
transactEC: ExecutionContext
): Managed[Throwable, H2Transactor[Task]] = {
H2Transactor
.newH2Transactor[Task](conf.url,
conf.user,
conf.password,
connectEC,
Blocker.liftExecutionContext(transactEC)
)
.toManagedZIO
.map(new UserPersistenceService(_))
}
Managed(res).map(new UserPersistenceService(_))
object UserPersistenceService {
val live:
ZLayer[Configuration, Throwable, Has[UserPersistenceService]] =
ZLayer.fromManaged (
(for {
config <- configuration.loadConfig.toManaged_
connectEC <- ZIO.descriptor.map(_.executor.asEC).toManaged_
blockingEC <- blocking { ZIO.descriptor.map(_.executor.asEC)
}.toManaged_
managed <- mkTransactor(config.dbConfig, connectEC, blockingEC)
} yield managed).provideSomeLayer[Configuration](Blocking.live)
)
}
type UserPersistence = Has[UserPersistenceService]
val live: ZLayer[Configuration with Blocking, Throwable, UserPersistence] =
ZLayer.fromManaged (
for {
config <- configuration.loadConfig.toManaged_
connectEC <- ZIO.descriptor.map(_.executor.asEC).toManaged_
blockingEC <- blocking { ZIO.descriptor.map(_.executor.asEC)
}.toManaged_
managed <- mkTransactor(config.dbConfig, connectEC, blockingEC)
} yield managed
)
case class Test(users: Ref[Vector[User]]) extends Service {
def get(id: Int): Task[User] =
users.get.flatMap(users => Task.require(UserNotFound(id))(
Task.succeed(users.find(_.id == id))))
def create(user: User): Task[User] =
users.update(_ :+ user).map(_ => user)
def delete(id: Int): Task[Boolean] =
users.modify(users => true -> users.filterNot(_.id == id))
}
def test(users: Ref[Vector[User]]): Layer[Nothing, UserPersistence] =
ZLayer.fromEffect(Ref.make(Vector.empty[User]).map(Test(_)))
import io.circe.{Decoder, Encoder}
import org.http4s.{EntityDecoder, EntityEncoder, HttpRoutes}
import org.http4s.dsl.Http4sDsl
import zio._
import org.http4s.circe._
import zio.interop.catz._
import io.circe.generic.auto._
type UserTask[A] = RIO[R, A]def routes: HttpRoutes[UserTask] =
HttpRoutes.of[UserTask] {
case GET -> Root / IntVar(id) =>
getUser(id).foldM(_ => NotFound(), Ok(_))
case request @ POST -> Root =>
request.decode[User] { user =>
Created(createUser(user))
}
case DELETE -> Root / IntVar(id) =>
(get(id) *> deleteUser(id)).foldM(_ => NotFound(), Ok(_))
}
implicit def circeJsonDecoder[A](implicit decoder: Decoder[A]): 
EntityDecoder[UserTask, A] = jsonOf[UserTask, A]
implicit def circeJsonEncoder[A](implicit decoder: Encoder[A]):
EntityEncoder[UserTask, A] = jsonEncoderOf[UserTask, A]
object Main extends zio.App {
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = ???
}
type AppEnvironment = Configuration with Clock with UserPersistence
val program = for {
config <- configuration.load

httpApp = Router[AppTask](
"/users" -> Api(s"${conf.api.endpoint}/users").route
).orNotFound

server <- ZIO.runtime[AppEnvironment].flatMap { implicit rts =>
db.createTable *>
BlazeServerBuilder[AppTask] //requires a Clock environment
.bindHttp(conf.api.port, "0.0.0.0")
.withHttpApp(CORS(httpApp))
.serve
.compile[AppTask, AppTask, ExitCode]
.drain
} yield server
val userPersistence = (Configuration.live ++ Blocking.live) >>> UserPersistence.live
val io = program.provideSomeLayer[ZEnv](Configuration.live ++ userPersistence)
io.fold(_ => 1, _ => 0)
type Configuration = Has[ApiConfig] with Has[DbConfig]
val live: Layer[Throwable, Configuration] = ZLayer.fromEffectMany(
Task
.effect(loadConfigOrThrow[Config])
.map(config => Has(config.api) ++ Has(config.dbConfig)))
}
val apiConfig: ZIO[Has[ApiConfig], Nothing, ApiConfig] = 
ZIO.access(_.get)
val dbConfig: ZIO[Has[DbConfig], Nothing, DbConfig] =
ZIO.access(_.get)
val live: ZLayer[Has[DbConfig] with Blocking, Throwable, UserPersistence] =
ZLayer.fromManaged (
for {
config <- configuration.dbConfig.orDie.toManaged_
connectEC <- ZIO.descriptor.map(_.executor.asEC).toManaged_
blockingEC <- blocking.blocking {
ZIO.descriptor.map(_.executor.asEC) }.toManaged_
managed <- mkTransactor(config, connectEC, blockingEC)
} yield managed
)
val userPersistence = (Configuration.live ++ Blocking.live) >>> UserPersistenceService.live
val program = for {
api <- configuration.apiConfig
httpApp = Router[AppTask](
"/users" -> Api(s"${api.endpoint}/users").route).orNotFound
...
} ...
program.provideSomeLayer[ZEnv](Configuration.live ++ userPersistence)

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store