Etcd watch API integration with Scala and cats-effect

Happy devOps

λ.eranga
Jan 16 · 3 min read

Background

etcd v3 provides a streaming API, known as the watch API, for listening to changes on etcd keys. etcd publishes change events (key events) to a gRPC stream, and clients subscribed to that stream continuously receive updates for the keys they watch.

In my previous post I discussed building a Scala etcd client with the etcd4s library and the cats-effect IO monad stack. In this post I'm going to discuss interacting with the etcd watch API using Scala and the cats-effect IO monad. To interact with the watch API I have used the etcd-java library, which is developed by IBM. All the source code related to this post is available on GitLab. Please clone the repo and follow along with the post.

Run etcd

First I need to run the etcd key/value store. I have run etcd with Docker. Following is the way to run etcd.

Sbt dependency

I have used IntelliJ IDEA as my IDE for working with Scala applications. I need to create an sbt project and add a build.sbt file with etcd-java, cats-effect and the other dependencies. Following is the build.sbt dependency file.

Load config

I have used the cats-effect IO monad to build this application. To load the etcd configuration into IO I have used the circe-config library. Following is the way to parse the application.conf configuration into cats-effect IO.
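As a rough illustration of this step, here is a minimal sketch that decodes a config section into a case class and lifts the result into IO. The `EtcdConf` case class, its `host` and `port` fields, and the inline HOCON are assumptions made for the example, not the project's real settings. It also assumes circe-generic is on the classpath for decoder derivation.

```scala
import cats.effect.IO
import com.typesafe.config.ConfigFactory
import io.circe.config.syntax._
import io.circe.generic.auto._

object ConfigSketch {
  // Hypothetical settings; the real field names live in the project's application.conf.
  final case class EtcdConf(host: String, port: Int)

  // Inline HOCON stands in for application.conf so the sketch is self-contained.
  private val config = ConfigFactory.parseString(
    """etcd {
      |  host = "localhost"
      |  port = 2379
      |}""".stripMargin)

  // A decoding failure becomes a failed IO instead of a thrown exception.
  val load: IO[EtcdConf] = IO.fromEither(config.as[EtcdConf]("etcd"))
}
```

Keeping the result inside IO means a broken configuration shows up as a failed effect when the program is run, alongside the other etcd calls.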

Etcd repository

I have used etcd to store microservice health statuses. The etcd keys that correspond to services start with the services/ prefix and follow the services/<service-name> format. Assume I need to create a service named octopus with the health status running. Creating this service adds the key/value pair services/octopus: running to the etcd store.
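The key convention can be captured in a couple of small helpers. This is only a sketch: `ServiceKeys`, `keyFor` and `serviceOf` are hypothetical names that do not come from the project.

```scala
object ServiceKeys {
  // Prefix shared by all service keys, as described above.
  val Prefix = "services/"

  // Build the etcd key for a service, e.g. "octopus" becomes "services/octopus".
  def keyFor(service: String): String = Prefix + service

  // Recover the service name from a key, or None if the key is outside the prefix.
  def serviceOf(key: String): Option[String] =
    if (key.startsWith(Prefix) && key.length > Prefix.length) Some(key.stripPrefix(Prefix))
    else None
}
```

With these helpers, `ServiceKeys.keyFor("octopus")` gives the key used in the example above.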

I have created EtcdRepo to interact with the etcd store through the etcd-java library. EtcdRepo contains the init, get, getRange, put and delete functions. init adds the service information defined in services.conf to etcd. get returns the value of a key from etcd. getRange returns all keys with a given prefix. put adds a key to etcd. delete removes a key from etcd.

EtcdRepoImpl is the cats-effect based implementation of EtcdRepo. All the functions in EtcdRepoImpl are wrapped with IO.fromFuture to handle the Future-based API calls.
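To make the five operations concrete, here is a minimal in-memory sketch of a repository with the same set of functions. The trait name `KvRepo`, its signatures and the `InMemoryKvRepo` class are assumptions for illustration. The real EtcdRepo talks to etcd through etcd-java and its signatures may differ.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical shape of the repository; not the project's actual trait.
trait KvRepo {
  def init(services: Map[String, String]): Future[Unit]
  def get(key: String): Future[Option[String]]
  def getRange(prefix: String): Future[Map[String, String]]
  def put(key: String, value: String): Future[Unit]
  def delete(key: String): Future[Unit]
}

// In-memory stand-in so the semantics can be tried without a running etcd.
class InMemoryKvRepo(ec: ExecutionContext) extends KvRepo {
  private implicit val executor: ExecutionContext = ec
  private val store = TrieMap.empty[String, String]

  // Store every service under the services/ prefix.
  def init(services: Map[String, String]): Future[Unit] =
    Future(services.foreach { case (name, status) => store.put(s"services/$name", status) })

  def get(key: String): Future[Option[String]] = Future(store.get(key))

  // Return every entry whose key starts with the prefix.
  def getRange(prefix: String): Future[Map[String, String]] =
    Future(store.filter { case (k, _) => k.startsWith(prefix) }.toMap)

  def put(key: String, value: String): Future[Unit] = Future { store.put(key, value); () }

  def delete(key: String): Future[Unit] = Future { store.remove(key); () }
}
```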
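The wrapping pattern looks roughly like the sketch below. It assumes cats-effect 2.x, where IO.fromFuture requires a ContextShift. `fetchStatus` is a hypothetical Future-based call standing in for a real etcd client request.

```scala
import cats.effect.{ContextShift, IO}
import scala.concurrent.{ExecutionContext, Future}

object FromFutureSketch {
  // cats-effect 2.x uses the ContextShift to shift back after the Future completes.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Hypothetical Future-based call standing in for an etcd client request.
  def fetchStatus(key: String)(implicit ec: ExecutionContext): Future[String] =
    Future(if (key == "services/octopus") "running" else "unknown")

  // Suspending the Future inside IO(...) delays its start until the IO is run.
  def fetchStatusIO(key: String): IO[String] =
    IO.fromFuture(IO(fetchStatus(key)(ExecutionContext.global)))
}
```

The Future is created inside `IO(...)` because a Future starts running as soon as it is constructed. Suspending it keeps the call lazy until the IO is run.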

Etcd watcher

I have created EtcdWatcher and EtcdWatcherImpl to interact with the etcd watch API. The function init(services: List[String]) registers the service keys in etcd with the watch API. Updates to those keys are delivered to a StreamObserver.
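The callback style of a StreamObserver can be sketched without any etcd or gRPC dependency. In the sketch below, `Observer` is a local trait that mirrors the three callbacks of io.grpc.stub.StreamObserver (onNext, onError, onCompleted). `KeyEvent` and `PrefixObserver` are hypothetical names, and the real update type delivered by etcd-java is different.

```scala
import scala.collection.mutable.ListBuffer

object WatchSketch {
  // Hypothetical event type; etcd-java delivers its own update objects.
  final case class KeyEvent(key: String, value: String)

  // Local trait mirroring the three callbacks of io.grpc.stub.StreamObserver.
  trait Observer[A] {
    def onNext(value: A): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Collects the events whose key falls under the given prefix.
  class PrefixObserver(prefix: String) extends Observer[KeyEvent] {
    val seen: ListBuffer[KeyEvent] = ListBuffer.empty

    def onNext(e: KeyEvent): Unit =
      if (e.key.startsWith(prefix)) { seen += e; () }

    def onError(t: Throwable): Unit = println(s"watch failed: ${t.getMessage}")

    def onCompleted(): Unit = println("watch completed")
  }
}
```

In the real application the observer is handed to the etcd-java watch call, and onNext fires each time a watched key changes.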

Execute watcher

Finally I have run the functions of EtcdStore and EtcdWatcher in the Main class. The functions of EtcdStore and EtcdWatcher are wrapped in the IO monad. To unwrap and run the side-effecting code inside the IO monad I have invoked unsafeRunSync().
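A minimal sketch of this last step, assuming cats-effect 2.x (where unsafeRunSync() needs no implicit runtime). `initRepo` and `initWatcher` are hypothetical stand-ins for the real initialisation functions.

```scala
import cats.effect.IO

object MainSketch {
  // Hypothetical stand-ins for the repository and watcher initialisation steps.
  def initRepo: IO[String] = IO("repo ready")
  def initWatcher: IO[String] = IO("watcher ready")

  // Composing IO values only describes the program; nothing runs yet.
  val program: IO[List[String]] =
    for {
      a <- initRepo
      b <- initWatcher
    } yield List(a, b)

  def main(args: Array[String]): Unit =
    // unsafeRunSync() executes the effects and blocks until the result is available.
    println(program.unsafeRunSync())
}
```

Calling unsafeRunSync() only once, at the edge of the program, keeps the rest of the code free of direct side effects.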

Test watcher

All the changes happening to etcd keys are delivered to EtcdWatcher. To test this I have changed the etcd keys with the etcdctl command line tool. Read more about etcdctl in my previous post. Once a key is changed, the new value is delivered to the StreamObserver.

Rahasak

Have less, be more
