Service Discovery With CRDTs

Service discovery is a mechanism for resolving the address of a service we want to communicate with. In the last few years, with the rise of container technology, service discovery mechanisms have become a key architectural component of production infrastructure. While building infrastructure services for the Thirty Meter Telescope (www.tmt.org), we needed a similar service discovery mechanism. One of our unique requirements was to register and resolve Actor references. We evaluated standard discovery solutions like Zookeeper, etcd and Consul. Because we were already using Akka, we could use Akka’s clustering platform and Conflict-free Replicated Data Types (CRDTs) to implement the service discovery mechanism. It turned out to be generic enough to be used across a set of HTTP/TCP services and Actors. This post surveys the available service discovery frameworks and compares a CRDT-based service discovery mechanism with them.

Service discovery is a simple concept. In the simplest terms, we need to get the connection parameters, such as the IP address and port, of the service we need to connect to. If these connection parameters are statically well known, we can just use configuration files and there is no need for service discovery. In most production environments, and particularly with containers, this is not the case. Services can start with an IP address and port that are known only after deployment (dynamically). So, to communicate with a service, the client always needs to resolve the connection parameters at runtime using just the name of the service.
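As a minimal sketch of that idea, the snippet below models a registry as a map from service name to connection parameters. The names Endpoint and NameRegistry are hypothetical and exist only for illustration; a real registry would also need to be distributed, health-checked and watchable, as discussed in the rest of this post.

```scala
// Hypothetical names for illustration; not the API of any particular registry.
final case class Endpoint(host: String, port: Int)

final class NameRegistry {
  // service name -> connection parameters known only at runtime
  private var entries = Map.empty[String, Endpoint]

  // Called when a service instance comes up.
  def register(name: String, endpoint: Endpoint): Unit =
    synchronized { entries = entries + (name -> endpoint) }

  // Called when a service instance goes away.
  def unregister(name: String): Unit =
    synchronized { entries = entries - name }

  // Clients resolve connection parameters using just the service name.
  def resolve(name: String): Option[Endpoint] =
    synchronized { entries.get(name) }
}
```

A client would call resolve with the service name each time it needs to connect, instead of reading a fixed address from a configuration file.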

For any service discovery mechanism, the following key aspects are important.

  1. Fault Tolerance: The service registry itself should be fault tolerant. This typically means the registry is distributed.
  2. Service Health Check: We need a mechanism to detect that a service instance is down and to remove the record for that instance from the registry.
  3. Watches: Clients using the service registry should be notified whenever a service instance is registered or removed from the registry.

Depending on the tool or framework selected, there are two options for registering a service instance. Each approach has its pros and cons.

  1. Self-registration: a service instance registers itself when it starts.
  2. Third party registration: there is a separate agent which registers/unregisters a given service.

DNS

DNS is the well-known mechanism for resolving the IP address and port for a service name. DNS SRV records are registered with the DNS server and can be resolved using the standard DNS resolution mechanism, without needing any special client. There are two issues with the DNS approach, though.

  1. Updating records in the DNS server as service instances come up is hard. Spotify has published a good write-up of the problems they hit with their DNS infrastructure and the solutions they used (https://labs.spotify.com/2017/03/31/spotifys-lovehate-relationship-with-dns/).
  2. DNS relies heavily on caching, so it is almost impossible for clients to know that a service instance has gone down and a new one has come up with a different IP address and port.

mDNS

mDNS is a zero-configuration alternative which is popular for small intranets and home networks. If each service has a built-in mDNS responder, there is no need for a separate registry and service discovery. We did have a prototype with mDNS working for actor references and other services, but there were a lot of infrastructure issues.

The major problem with mDNS is that it depends on IP multicast, and therefore on the network infrastructure (routers, switches and so on) supporting IP multicast. This can be a blocker in cloud environments. For example, AWS VPC networking does not carry IP multicast traffic by default, which rules out mDNS there. mDNS also does not solve the DNS caching issue.

So DNS is a good fault tolerant registry, but fails to integrate cleanly with service health checks and lacks any notification mechanism. Default caching for DNS services becomes a big issue.

Service Discovery In The Microservices World

In the last few years, three major products have emerged as the service registry/discovery frameworks of choice: Zookeeper, Consul and etcd. The core concept in all three is the same. They have a set of nodes, typically 3 or 5, providing a linearizable key-value store. A set of libraries allows service registration and discovery on top of this key-value store. All these frameworks also provide a feature called ‘watches’, which notifies clients of any change to a particular key. This feature is typically used along with service health check mechanisms to notify clients when new service instances are registered or existing instances fail.

Zookeeper

Zookeeper is a centralized service used to maintain configuration or naming information. It is a linearizable store, typically run as a cluster of 3 to 5 nodes. Running on multiple nodes provides fault tolerance.

Zookeeper has a concept of ephemeral nodes which can be used to implement service registry and basic health checking. A new ephemeral node is added every time a service instance comes up. If a service instance goes down, the ephemeral node is automatically removed by Zookeeper.

Apache Curator is a library which provides a service discovery mechanism on top of Zookeeper. With Curator, a service needs to self-register at startup. Curator adds an ephemeral node for the service instance. If the service goes down, the ephemeral node for that service instance is automatically removed by Zookeeper. If a client of the service has set up a watch on this node, the client is notified of the removal.
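To make the ephemeral-node idea concrete, here is a small in-memory sketch of entries that are tied to a session and disappear when that session is lost, with a callback standing in for a watch. EphemeralRegistry and its methods are hypothetical names; this is not the Zookeeper or Curator API, only a model of the behaviour described above.

```scala
// Hypothetical in-memory model of session-scoped ("ephemeral") entries.
// This is NOT the Zookeeper or Curator API.
final class EphemeralRegistry {
  // path -> (owning session id, payload)
  private var nodes = Map.empty[String, (Long, String)]
  // path -> callbacks to invoke when the node at that path is removed
  private var watchers = Map.empty[String, List[String => Unit]]

  // A service instance registers itself under its session.
  def createEphemeral(sessionId: Long, path: String, data: String): Unit =
    nodes = nodes + (path -> ((sessionId, data)))

  def get(path: String): Option[String] = nodes.get(path).map(_._2)

  // A client asks to be told when the node at `path` goes away.
  def watchRemoval(path: String)(callback: String => Unit): Unit =
    watchers = watchers + (path -> (callback :: watchers.getOrElse(path, Nil)))

  // Called when a session is lost, for example because the service crashed.
  // Every node owned by that session is removed and its watchers are notified.
  def sessionExpired(sessionId: Long): Unit = {
    val removed = nodes.collect { case (path, (owner, _)) if owner == sessionId => path }.toList
    nodes = nodes -- removed
    removed.foreach(path => watchers.getOrElse(path, Nil).foreach(cb => cb(path)))
  }
}
```

The point of the sketch is that the service never has to unregister explicitly: losing the session is enough to remove the entry and notify interested clients.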

The Zookeeper cluster needs to have well-known IP addresses and ports so that clients can communicate with it. We will see that for some of the other options this requirement is minimal.

Etcd

Etcd is a linearizable key-value store based on the Raft consensus protocol. It can be used as a service registry, but we need to build an agent to register and unregister services. Each key in etcd can be assigned a TTL (time to live), after which the key expires. It is the duty of the service itself (if it is doing self-registration) or of the agent registering the service to keep refreshing the TTL. If the service instance goes down, the key corresponding to that instance expires once the TTL that was set has elapsed.
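The TTL mechanism can be sketched as follows. TtlRegistry is a hypothetical in-memory stand-in, not the etcd client API, and the current time is passed in explicitly so that the behaviour is easy to follow.

```scala
// Hypothetical in-memory model of TTL-based registration; not the etcd API.
final class TtlRegistry {
  // key -> (value, expiry time in milliseconds)
  private var entries = Map.empty[String, (String, Long)]

  // Register or refresh a key. The service (or its agent) must call this
  // again before the TTL elapses, otherwise the key is treated as expired.
  def put(key: String, value: String, ttlMillis: Long, nowMillis: Long): Unit =
    entries = entries + (key -> ((value, nowMillis + ttlMillis)))

  // Expired keys are reported as absent.
  def get(key: String, nowMillis: Long): Option[String] =
    entries.get(key).collect { case (value, expiresAt) if nowMillis < expiresAt => value }
}
```

A service that crashes simply stops refreshing its key, so the entry disappears on its own once the TTL runs out. The trade-off is that a shorter TTL detects failures sooner but needs more frequent refreshes.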

SkyDNS is a tool which provides a DNS interface over etcd. With SkyDNS running over etcd, service clients do not need any special library to resolve services.

Etcd, like Zookeeper, gives the basic facilities needed to build a service discovery solution, but you need libraries built on top of it for clients to use these features.

Etcd3 exposes a gRPC API for interacting with etcd, which is optimized for efficiency. A benchmark of watch memory usage is documented here: https://coreos.com/etcd/docs/latest/benchmarks/etcd-3-watch-memory-benchmark.html#overall-memory-usage

An etcd cluster’s configuration is static when the IP addresses and ports of all the etcd nodes are known to clients up front. There is also support for cluster discovery, which is simply a public etcd service running at discovery.etcd.io. This can be used for setting up a new cluster. More options for dynamic discovery of etcd cluster members are described here: https://coreos.com/etcd/docs/latest/op-guide/clustering.html

Consul

Consul is a full-fledged service discovery solution. At its core it has a fault-tolerant, linearizable key-value store based on the Raft consensus protocol (https://raft.github.io/). Consul also provides agents which coordinate with the services to register them with the core. Consul agents support configurable health checks for services, allowing Consul to know when a particular service instance is down. Consul agents run on each node that hosts service instances and automatically pick up the IP address of the node to be registered against the service instance.

Both etcd and Consul work very well with containerized services. The Docker engine emits events when containers are started or stopped. A tool like Registrator can then be used to automatically register or remove service instances in etcd or Consul.

Consul and the use of gossip protocols

Consul’s architecture is interesting. It has two types of components, servers and agents. The service catalog is stored only on servers. Both agents and servers form a cluster. To know which agents and servers are part of the cluster, and to know about their health, Consul uses the Serf gossip protocol. A gossip protocol is a very common way of maintaining a group membership list and a failure detector for the nodes participating in a cluster. Each node joins the cluster through one or more introducer nodes, so an introducer learns about every node that joins through it. Each node (including the introducer itself) periodically sends a message containing the list of nodes it knows about to a randomly selected set of nodes. Eventually every node in the cluster knows about every other node. This might seem surprising, but the information propagated by a gossip protocol reaches all the nodes in the cluster, even in clusters of hundreds of machines, within a few seconds.
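The gossip-based membership described above can be illustrated with a toy simulation. This is only a sketch under simplifying assumptions: rounds are synchronous, no messages are lost, node 0 acts as the single introducer, and the names GossipSimulation, round and roundsToConverge are ours. It is not how Serf or Akka implement gossip.

```scala
import scala.util.Random

// Toy, synchronous simulation of gossip-style membership dissemination.
object GossipSimulation {
  // views(i) is the set of member ids that node i currently knows about.
  type View = Vector[Set[Int]]

  // One round: every node sends its known-member set to `fanout` randomly
  // chosen nodes, and each receiver merges it into its own view by set union.
  def round(views: View, fanout: Int, rng: Random): View = {
    val n = views.length
    var next = views
    for (sender <- 0 until n; _ <- 0 until fanout) {
      val target = rng.nextInt(n)
      next = next.updated(target, next(target) ++ views(sender))
    }
    next
  }

  // Node 0 is the introducer: every node starts out knowing itself and node 0.
  // Returns the number of rounds until every node knows every other node,
  // or `maxRounds` if that did not happen in time.
  def roundsToConverge(n: Int, fanout: Int, seed: Long, maxRounds: Int = 1000): Int = {
    val rng = new Random(seed)
    val all = (0 until n).toSet
    var views: View = Vector.tabulate(n)(i => Set(i, 0))
    var rounds = 0
    while (!views.forall(_ == all) && rounds < maxRounds) {
      views = round(views, fanout, rng)
      rounds += 1
    }
    rounds
  }
}
```

Calling GossipSimulation.roundsToConverge with different cluster sizes gives a feel for how slowly the number of rounds grows as nodes are added, which is the property that makes gossip attractive for large clusters.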

The time and bandwidth consumed by Serf, the gossip protocol implementation used by Consul, are shown on the following page. You can vary the gossip interval, network packet loss and other parameters to see the impact on the cluster.

https://www.serf.io/docs/internals/simulator.html

Frameworks like Akka and databases like Riak and Cassandra use gossip protocols to maintain the cluster membership list and to build a failure detector that tracks member health.

A gossip protocol is the standard way of learning about all the nodes in a cluster and their health, but it has a limitation: it always maintains information at the node level. The information maintained and disseminated across the nodes by the gossip protocol cannot be controlled by users.

One of the primary advantages Consul provides on top of Serf is a ‘Service’ abstraction, with the ability to do service discovery and health checks at the service level as opposed to the node level.

Consul keeps a central service catalog on the servers, which use Raft for consensus, while agent-level membership of the Consul cluster is maintained by the Serf gossip protocol.

Serf does allow external event handler scripts to be configured for events such as members joining or leaving, which can be used for something similar to service discovery. (For more details, have a look at the Serf demo project at https://github.com/hashicorp/serf/tree/master/demo/web-load-balancer)

Service Discovery For Actor References

All these tools work well if the registered services are standard HTTP services. They work for other types of services as well, but in those cases some code or tooling needs to be written for registration and health checks.

One of our key requirements was to support registering and locating Akka actors. So we needed a facility to register actor references, and a mechanism to health-check and remove stale actor references.

We also needed notifications to be sent to client components whenever a specific dependent service or component is registered or crashes. Zookeeper, etcd and Consul all allow programmatic watches. These are typically implemented with long polling or a similar long-lived connection, which requires a connection to the server to be kept open. This also requires us to write some non-trivial code to handle connection losses and the like.

Consul’s use of gossip, discussed in the previous section, was interesting. If we can provide a service abstraction and the ability to propagate custom data structures across a cluster of machines with a gossip protocol, we can provide a service discovery mechanism which meets all our requirements: minimal configuration, health checks, watches and fault tolerance.

The Akka Distributed Data library, which works on top of the Akka Cluster gossip protocol, does exactly this: it gossips custom data structures across the cluster. The data structure used as the registry needs to be able to merge the data it receives as part of gossip requests.

Considering the above requirements, a solution well integrated with Akka suited us better. The Distributed Data extension of Akka Cluster natively supports Conflict-free Replicated Data Types, or CRDTs for short. CRDTs make an excellent choice for maintaining a service registry. They are distributed by nature, which gives fault tolerance. They provide events for every addition and removal of entries, allowing us to have watches.

The Wikipedia definition of a CRDT is: a ‘conflict-free replicated data type (CRDT) is a data structure which can be replicated across multiple computers in a network, where the replicas can be updated independently and concurrently without coordination between the replicas, and where it is always mathematically possible to resolve inconsistencies which might result’.

The key aspect of CRDTs is that they are replicated across multiple computers. They can be updated on different nodes without coordinating with other nodes. The platform implementing CRDTs propagates all the changes done on each node to all the other nodes in the cluster.

CRDTs make a good choice as the data structure maintaining a service registry for the following reasons.

  1. Service registration data can be merged without conflict, because each registered service instance is unique.
  2. Services can be registered or unregistered on different physical nodes, but the information is propagated to all the nodes.
  3. Clients can get notifications for the addition or removal of elements in CRDTs. This is very well suited for implementing ‘Watches’.
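The conflict-free merge behind the first two points can be sketched with the simplest state-based CRDT, a grow-only set whose merge is set union. GrowOnlySet is a hypothetical illustration written for this post, not the Akka Distributed Data implementation.

```scala
// Minimal state-based CRDT sketch: a grow-only set merged by set union.
// Hypothetical illustration, not the Akka Distributed Data implementation.
final case class GrowOnlySet[A](elements: Set[A]) {
  // Local update on one replica; needs no coordination with other replicas.
  def add(a: A): GrowOnlySet[A] = GrowOnlySet(elements + a)

  // Merging is commutative, associative and idempotent, so replicas converge
  // regardless of the order in which gossip messages arrive or are repeated.
  def merge(that: GrowOnlySet[A]): GrowOnlySet[A] =
    GrowOnlySet(elements ++ that.elements)
}
```

Two nodes can each register a different service instance locally, and whichever way the gossip flows, both end up with the same set containing both registrations.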

Service Discovery And Linearizability

One of the key differences between the service discovery solutions discussed above and CRDTs is that all those solutions are built around a linearizable key-value store. Both etcd and Consul use the Raft consensus algorithm to provide linearizability.

CRDTs are, by design, eventually consistent. In Akka, they are replicated using a gossip protocol.

For a service discovery solution, linearizability is not a must-have requirement. An eventually consistent system is good enough as long as the data converges quickly enough. In practice, gossip protocols have been shown to converge very quickly, even across a large cluster.

All the solutions discussed above have the following structure.

  1. A fault-tolerant, distributed key-value store
  2. A client API to register/unregister service instances
  3. A client API to resolve service locations
  4. An API to get notifications for key/value changes

A service registry built with CRDTs is equivalent and supports all of the above features.

Akka Distributed Data supports data types which are ‘convergent’, and it provides various data types out of the box. For the service registry, because each service is going to be registered only once, we chose LWWRegister to hold the service instance details.
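To show what ‘last writer wins’ means for a registry entry, here is a minimal sketch of such a register. LwwRegister below is a hypothetical stand-in written for this post, not Akka’s LWWRegister class, and the tie-break on the node name is an assumption we make so that merging stays deterministic.

```scala
// Minimal last-writer-wins register sketch; not Akka's LWWRegister class.
// `node` identifies the replica that wrote the value and is used only to
// break ties between equal timestamps (an assumption of this sketch).
final case class LwwRegister[A](value: A, timestamp: Long, node: String) {
  def merge(that: LwwRegister[A]): LwwRegister[A] =
    if (that.timestamp > timestamp) that            // newer write wins
    else if (that.timestamp < timestamp) this
    else if (that.node.compareTo(node) < 0) that    // deterministic tie-break
    else this
}
```

Because the winner depends only on the timestamps and node names, every replica picks the same value no matter in which order it receives the updates. The flip side is that a register holds exactly one value, so a later registration under the same name replaces the earlier one.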

Load-balancing requirements will need support for more than one instance registered under the same service name. Our current implementation does not support this, since an LWWRegister holds connection information about a single service. Future versions might change the data structure to ORSet to support this requirement.
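As a rough idea of how a set-based CRDT could hold several instances under one service name, here is a textbook-style observed-remove set that uses unique tags and tombstones. OrSet is a hypothetical sketch; it is not Akka’s ORSet, and it is not part of our current implementation.

```scala
// Textbook-style observed-remove set using unique tags and tombstones.
// Hypothetical sketch; not Akka's ORSet and not part of our implementation.
final case class OrSet[A](adds: Set[(A, String)], removed: Set[(A, String)]) {
  // An element is present if at least one of its add-tags has not been removed.
  def elements: Set[A] = (adds -- removed).map(_._1)

  // Every add carries a tag that must be unique across all replicas.
  def add(a: A, uniqueTag: String): OrSet[A] =
    copy(adds = adds + ((a, uniqueTag)))

  // Remove only the add-tags this replica has observed so far.
  def remove(a: A): OrSet[A] =
    copy(removed = removed ++ adds.filter(_._1 == a))

  def merge(that: OrSet[A]): OrSet[A] =
    OrSet(adds ++ that.adds, removed ++ that.removed)
}

object OrSet {
  def empty[A]: OrSet[A] = OrSet(Set.empty[(A, String)], Set.empty[(A, String)])
}
```

With this structure, two instances of the same service are simply two elements of the set, and an instance that re-registers concurrently with a removal on another node survives the merge because its new tag was never observed by the remover.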

There is also a need for a list of all the registered services, which requires another CRDT. We use an LWWMap to keep a list of all the services registered. Every time a service is registered or removed, the LWWMap is updated as well.

The service registry API looks like the following. As of now, it supports registering ActorRefs as well as HTTP and TCP connection information.

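import akka.Done
import akka.stream.KillSwitch
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

// Registration, Connection, Location, ComponentType, ConnectionType and TrackingEvent are model types of the location service.
trait LocationService {
  // Registration and removal
  def register(registration: Registration): Future[RegistrationResult]
  def unregister(connection: Connection): Future[Done]
  def unregisterAll(): Future[Done]
  // Lookup; resolve takes a time limit (within)
  def find(connection: Connection): Future[Option[Location]]
  def resolve(connection: Connection, within: FiniteDuration): Future[Option[Location]]
  // Listing, optionally filtered
  def list: Future[List[Location]]
  def list(componentType: ComponentType): Future[List[Location]]
  def list(hostname: String): Future[List[Location]]
  def list(connectionType: ConnectionType): Future[List[Location]]
  // Change notifications, as a stream or through a callback
  def track(connection: Connection): Source[TrackingEvent, KillSwitch]
  def subscribe(connection: Connection, callback: TrackingEvent ⇒ Unit): KillSwitch
  def shutdown(): Future[Done]
}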

Akka actors or HTTP services running on the JVM can use the client library to register themselves with the service registry. The information is propagated to all the other JVMs by the Akka Distributed Data mechanism.

For third-party service registration, we have a JVM-based agent which can be run alongside the third-party service. This is very similar to the Consul agent. The agent becomes part of the Akka cluster and propagates the service information to the other nodes in the cluster with the standard Akka CRDT mechanism.

With the CRDT-based solution, we got the following benefits.

  • No other installation if already using Akka. It works “in process”
  • Out of the box efficient serialization/deserialization for ActorRefs
  • If the architecture heavily relies on Akka, most ‘services’ are ActorRefs
  • Easy to implement death-watch for terminated ActorRefs
  • Akka actors can watch for events when an ActorRef dies
  • Fault tolerance through replicated copy of registered connection on each node
  • Fast lookups
  • Clients can get notifications for addition or removal of elements
  • Tracking of location changes is push based (unlike long-polling)

The framework is open sourced and is available at https://github.com/tmtsoftware/csw-prod/tree/master/csw-location

We recently presented on the same topic at Reactive Summit. The slides are here: https://www.slideshare.net/mushtaq.a/service-discovery-using-crdt-81099158