Episode 6: The one with coordinator and workers.

Concurrency and Distributed Systems: A guide with Kubernetes

--

So far we have seen only one distributed system architecture: replicating a single codebase across different nodes. All the nodes run essentially the same code, and all the instances are more or less the same in terms of logic.
However, there is another important architecture that is used in many critical software projects (such as many database systems): the coordinator (master)/workers architecture.

In this architecture, not all nodes run the same code and logic. There is usually a single coordinator (or master) node and many worker nodes. The overall goal is to divide big requests, which would require lots of CPU or RAM on a single machine, into smaller tasks so that they can be done faster and more reliably in a distributed fashion.

The coordinator node is responsible for:

  • Handling client requests
  • Dividing up requests into smaller tasks (if necessary)
  • Sending small tasks to worker nodes
  • Keeping track of which tasks are done
  • Retrying failed tasks
  • Keeping track of the currently running worker nodes
  • Bundling up the final response from responses received from worker nodes
  • Communicating the response to the client

Worker nodes are responsible for:

  • Handling a defined task (e.g. running a query over S3 files, or computing the sum of primes over a small range)
  • Sending the result back to the coordinator node
  • Updating their status (number of currently running tasks, available CPU, RAM, etc.) with the coordinator node

The main goal of this episode is to introduce this architecture and to demonstrate the implementation details of it using our Sum of Primes example in K8s.

Worker

We are going to start with the worker node’s logic in our Sum of Primes project. In this project, a worker node should:

  • Announce itself to the coordinator
  • Serve the sum of primes tasks sent from the coordinator

Announcing oneself to the coordinator

On the coordinator’s side, we should have an API hosted on a port which accepts incoming requests for registering a worker. For now we assume that this API exists and is running on port 80 of the server. We have to supply the coordinator’s address to the workers upon their creation as a command line argument or as an environment variable.

The API for registering a worker looks like this:

PUT /worker/register/{workerId}

Usually this sort of internal-facing API is hosted on a specific, non-public port on the server. In our case, for simplicity, both the API for clients to send requests and the API for workers to register themselves are hosted on port 80. Interested readers can explore how to host two APIs on two different ports in the language or framework of their choice.

We create a class called ServerStub which is responsible for abstracting away the details of communication with the coordinator like so:
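As a rough illustration of what such a stub could look like, here is a minimal sketch built on the JDK's own `java.net.http.HttpClient`. Only the class name `ServerStub` and the `PUT /worker/register/{workerId}` route come from the text; the constructor argument, the method names, and the one second timeouts are assumptions made for the example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Sketch of a worker-side stub for the coordinator's registration API.
// Field and method names are assumptions, not the project's actual code.
class ServerStub {
    private final String coordinatorAddress; // host:port, e.g. "coordinator:80" (hypothetical)
    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(1))
            .build();

    ServerStub(String coordinatorAddress) {
        this.coordinatorAddress = coordinatorAddress;
    }

    // Builds the URI for PUT /worker/register/{workerId}.
    URI registerUri(String workerId) {
        return URI.create("http://" + coordinatorAddress + "/worker/register/" + workerId);
    }

    // Sends the registration request; returns true on a 2xx response.
    boolean register(String workerId) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(registerUri(workerId))
                .timeout(Duration.ofSeconds(1))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<Void> response =
                client.send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode() / 100 == 2;
    }
}
```

Keeping the URI construction in its own method makes the stub easy to check without a running coordinator.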

The worker should implement logic for periodically registering itself with the coordinator. We don’t want to do that too often though; once every second is a reasonable choice. The Spring framework has a declarative way of running a periodic task without having to deal with threads.
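In Spring the declarative route is the `@Scheduled` annotation (for example `@Scheduled(fixedRate = 1000)` on a method, with scheduling enabled). To keep the example runnable without Spring, the sketch below swaps in a plain `ScheduledExecutorService` instead. All names here (`RegistrationLoop`, `tick`, `setRegisterCall`) are hypothetical; the guard mirrors the idea of skipping registration while the stub is not yet available or the worker is shutting down.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Sketch of the periodic registration loop (all names are hypothetical).
class RegistrationLoop {
    private final String workerId;
    private volatile Consumer<String> registerCall; // null until the stub is ready
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    RegistrationLoop(String workerId) {
        this.workerId = workerId;
    }

    void setRegisterCall(Consumer<String> call) {
        this.registerCall = call;
    }

    // One registration attempt; skipped if the stub is missing or we are shutting down.
    boolean tick() {
        Consumer<String> call = registerCall;
        if (call == null || shuttingDown.get()) {
            return false;
        }
        try {
            call.accept(workerId);
            return true;
        } catch (RuntimeException e) {
            return false; // coordinator unreachable; try again on the next tick
        }
    }

    // Runs tick() once per second, matching the interval chosen in the text.
    void start() {
        scheduler.scheduleAtFixedRate(this::tick, 0, 1, TimeUnit.SECONDS);
    }

    void shutdown() {
        shuttingDown.set(true);
        scheduler.shutdownNow();
    }
}
```

Swallowing the failure inside `tick()` matters with `scheduleAtFixedRate`: an exception escaping the task would cancel all later runs.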

The worker, as can be seen in the above code, registers its IP address with the coordinator. This way the coordinator can use the worker’s IP address to send tasks to it. In the above code we also have an if condition which prevents this process from registering if the serverStub is still not available or if the worker is shutting down.
Note: Before sending the IP address we are replacing dots with dashes. This is because the local DNS address of containers in K8s looks like the following:
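For reference, Kubernetes documents the pod DNS record as the pod IP with dots replaced by dashes, followed by the namespace and `pod.<cluster-domain>`, for example `172-17-0-3.default.pod.cluster.local`. The helper below is a small sketch of that conversion; the class and method names are hypothetical, and `cluster.local` is assumed as the cluster domain (it is the usual default but can be configured differently).

```java
// Sketch: derive a worker id and a pod DNS name from a pod IP.
// Assumes the default cluster domain "cluster.local".
class PodDns {
    // "10.1.0.15" -> "10-1-0-15"
    static String workerId(String ip) {
        return ip.replace('.', '-');
    }

    // "10-1-0-15", "default" -> "10-1-0-15.default.pod.cluster.local"
    static String podDnsName(String workerId, String namespace) {
        return workerId + "." + namespace + ".pod.cluster.local";
    }
}
```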

Serving the sum of prime requests on the worker side

Previously in the simple replicated version of the sum of prime numbers we exposed an API to the clients which directly computed the sum of primes. In this new architecture, that very API will be implemented on the worker. On the coordinator node, however, we have to tweak the implementation to divide up the requests into smaller ones and send them to the workers.

Here we focus on the worker’s logic:
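The core of the worker is the computation itself. A minimal sketch of it, stripped of the REST layer, might look like the following; the names are hypothetical, the range is assumed to be inclusive on both ends, and trial division is used only because it is short, not because it is what the project does.

```java
// Sketch of the computation a worker performs for one task (names are hypothetical).
class PrimeSum {
    // Trial division up to sqrt(n); adequate for a sketch, not tuned for speed.
    static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n < 4) return true; // 2 and 3
        if (n % 2 == 0) return false;
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Sum of all primes in the inclusive range [a, b].
    static long sumPrimes(long a, long b) {
        long sum = 0;
        for (long n = Math.max(a, 2); n <= b; n++) {
            if (isPrime(n)) sum += n;
        }
        return sum;
    }
}
```

For example, `PrimeSum.sumPrimes(1, 10)` adds 2, 3, 5 and 7.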

This is pretty much all we have on the worker side. For simplicity, the worker listens on port 80. In the K8s world each pod has its own IP address, so a container can listen on practically any port it wants without clashing with other pods.

Now, let’s move on to the Coordinator side.

Coordinator

On the coordinator side, we want to focus on two end-points:

  • Worker registration: internal facing for workers to use.
  • Sum of primes: external facing for clients to use.

The coordinator (master) node needs to talk to the workers via their exposed REST API. To do so, similar to the worker side, we create a stub for the worker API as follows:

The coordinator talks to each of the workers using its associated stub. This abstracts away the details of communication with the workers.

As we explained before, the coordinator should expose an endpoint for workers to register themselves. This is simply a REST PUT endpoint which accepts the worker’s IP address as its parameter. Upon receiving a request on this endpoint, the coordinator stores the worker’s IP address and starts sending requests to it. The endpoint implementation looks like this in Spring:

In the register implementation, we have two major data structures for keeping track of workers. One is workerLastUpdate, a concurrent map from worker ids to a timestamp (milliseconds since epoch). The timestamp represents the last time we received a ping from the worker. This lets us remove a worker from the registered pool if we have not received a ping from it for a certain amount of time.

The other important data structure is a blocking queue of worker stubs. For each active worker we create a worker stub and put it in the blocking queue. When we receive a request we take a worker from the queue and send a task to it. Once we receive a response from the worker we put it back in the queue. This ensures that we don’t overwhelm workers with requests while they are busy serving one already. We are not limited to one task per worker: we could keep a counter and cap the number of concurrent tasks per worker at 2, 3, or any other value. However, for simplicity we limit it to 1 for now.
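The two data structures can be sketched in plain Java as follows. The field name `workerLastUpdate` comes from the text; everything else (`WorkerRegistry`, `freeWorkers`, `acquire`, `release`) is hypothetical, and the queue holds worker ids rather than stub objects to keep the example short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the coordinator's bookkeeping (names other than
// workerLastUpdate are assumptions).
class WorkerRegistry {
    final ConcurrentMap<String, Long> workerLastUpdate = new ConcurrentHashMap<>();
    // Holds worker ids here; the text queues stub objects instead.
    final BlockingQueue<String> freeWorkers = new LinkedBlockingQueue<>();

    // Called on every PUT /worker/register/{workerId}.
    void register(String workerId, long nowMillis) {
        // put() returns null only the first time we see this worker.
        boolean isNew = workerLastUpdate.put(workerId, nowMillis) == null;
        if (isNew) {
            freeWorkers.add(workerId);
        }
    }

    // Blocks until some worker is free, then hands it out.
    String acquire() throws InterruptedException {
        return freeWorkers.take();
    }

    // Returns a worker to the pool, unless housekeeping has dropped it meanwhile.
    void release(String workerId) {
        if (workerLastUpdate.containsKey(workerId)) {
            freeWorkers.add(workerId);
        }
    }
}
```

Because a worker is in the queue at most once, it can serve at most one task at a time, which is the limit of 1 described above.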

We also added an extra utility API endpoint to get the current list of active workers from the coordinator’s standpoint. This is useful for debugging purposes.

To refresh the list of workers and eliminate the non-active ones, we should run a housekeeping process every so often. This housekeeping process can be run as a scheduled job every second on the coordinator and fix up the two data structures we presented above.

In the above code, we go over the workers’ last-update map and remove any worker that has not sent an update for more than 5 seconds. Alongside removing the worker from the map, we also remove it from the free worker queue.
It is possible that we send a request to a worker and the worker dies shortly after that. Hence, on the coordinator side we need proper timeout and retry mechanisms to handle such failures. That will be the topic of another episode.
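A minimal sketch of such a housekeeping pass is shown below. It assumes the map and the queue are keyed by worker id, takes the current time as a parameter so it can be exercised deterministically, and uses hypothetical names throughout; only the 5 second threshold comes from the text.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;

// Sketch of the housekeeping pass (names are hypothetical).
class Housekeeping {
    static final long TIMEOUT_MILLIS = 5_000;

    // Removes every worker whose last ping is older than the timeout,
    // from both the map and the free-worker queue.
    // Returns how many workers were evicted.
    static int evictStale(Map<String, Long> workerLastUpdate,
                          Queue<String> freeWorkers,
                          long nowMillis) {
        int evicted = 0;
        Iterator<Map.Entry<String, Long>> it = workerLastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String workerId = entry.getKey();
            if (nowMillis - entry.getValue() > TIMEOUT_MILLIS) {
                it.remove();
                freeWorkers.remove(workerId);
                evicted++;
            }
        }
        return evicted;
    }
}
```

A scheduled job on the coordinator could call `evictStale(..., System.currentTimeMillis())` once per second.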

Sum of primes endpoint implementation

The implementation of this endpoint is different from the one we saw in episode 1. Here we divide the request into smaller tasks and distribute them among the workers.

The coordinator splits the input range into smaller ranges (with no overlaps). The split size is 2,500,000. For example, if the input is a = 10 and b = 3,700,000, then we will have two tasks:

Task 1: a = 10, b = 2,500,010
Task 2: a = 2,500,011, b = 3,700,000
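One way to produce exactly these two tasks is sketched below. It assumes inclusive ranges where each task ends at `start + 2,500,000` (or at `b`, whichever is smaller) and the next task starts one past that end; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the range split (names are hypothetical).
class RangeSplitter {
    static final long SPLIT_SIZE = 2_500_000L;

    // Splits the inclusive range [a, b] into consecutive, non-overlapping
    // sub-ranges; each is returned as {start, end}.
    static List<long[]> split(long a, long b) {
        List<long[]> tasks = new ArrayList<>();
        long start = a;
        while (start <= b) {
            long end = Math.min(start + SPLIT_SIZE, b);
            tasks.add(new long[] {start, end});
            start = end + 1;
        }
        return tasks;
    }
}
```

Calling `RangeSplitter.split(10, 3_700_000)` yields `{10, 2500010}` and `{2500011, 3700000}`, matching the example above.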

These two tasks are then sent to free workers to compute. The coordinator then waits for the responses to come back from the workers, sums them up, and sends the final result back to the client.
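The fan-out and fan-in step can be sketched with `CompletableFuture` and a blocking queue of free workers. This is an illustration under assumptions, not the project's code: the `Worker` interface stands in for the worker stub, `Dispatcher` and its methods are hypothetical names, and there are no timeouts or retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the worker stub: computes one sub-range remotely.
interface Worker {
    long sumPrimes(long a, long b);
}

class Dispatcher {
    private final BlockingQueue<Worker> freeWorkers;
    private final ExecutorService pool = Executors.newCachedThreadPool();

    Dispatcher(BlockingQueue<Worker> freeWorkers) {
        this.freeWorkers = freeWorkers;
    }

    // Sends each {start, end} task to a free worker and adds up the partial sums.
    long sumAll(List<long[]> tasks) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (long[] task : tasks) {
            futures.add(CompletableFuture.supplyAsync(() -> runOnFreeWorker(task), pool));
        }
        long total = 0;
        for (CompletableFuture<Long> f : futures) {
            total += f.join(); // waits for this partial result
        }
        return total;
    }

    private long runOnFreeWorker(long[] task) {
        Worker worker;
        try {
            worker = freeWorkers.take(); // blocks until a worker is free
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a worker", e);
        }
        try {
            return worker.sumPrimes(task[0], task[1]);
        } finally {
            freeWorkers.add(worker); // hand the worker back for the next task
        }
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

With fewer free workers than tasks, the extra tasks simply wait in `take()` until a worker is returned to the queue.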

The following code shows the implementation of this endpoint with relevant comments inline:

This sums up the important details of this new computation model for the sum of primes service.

Now, let’s see how to configure the service in Kubernetes.

Kubernetes Configuration

First, we need two different Dockerfiles. We could also use a single Dockerfile, but I found it easier to have separate ones for the coordinator and the workers.

The coordinator’s Dockerfile looks like this:

The worker’s Dockerfile is as follows:

As you can see, the coordinator’s address is passed to the worker container via an environment variable which we will set in the K8s config file.

StatefulSets

Since the coordinator has state, it is appropriate to deploy it as a K8s StatefulSet. A StatefulSet offers stable naming and ordering guarantees. Many databases have primary and secondary instances, where a secondary instance can become primary when the primary instance is down. In these scenarios a StatefulSet is a very useful concept.
In our simple example service we do not have secondary instances. However, we can still benefit from the properties of a StatefulSet. For example, its deterministic naming convention will be helpful in our application.

We only have a single coordinator and hence we use a StatefulSet with a single instance. The K8s deployment file looks like this:

In the above config we have allocated 1 CPU and 1GB of RAM to the coordinator. Ideally, we would want to allocate more resources to the coordinator node, but for our simple service running on my personal computer this is enough.

The workers’ deployment follows a simple replicated deployment model, which is stateless. The important thing to notice here is how we pass the coordinator’s address to the worker.

We are addressing the coordinator by its name. The K8s DNS will take care of resolving this name as long as both the workers and the coordinator are in the same K8s namespace. Namespaces are basically free in K8s and you can create as many as you like.

Also note that we have allocated 1 CPU to each worker and initially there are 10 worker replicas.

Finally, we should create a K8s Service on top of our coordinator so that we can connect to it. In order to easily connect to this service, we opt for the LoadBalancer service type. This also enables us to possibly add more coordinators in the future for high availability (the topic of a future episode).

Running the Application

To run this application follow the instructions in the README file. We ran a load test at 15 QPS with the above configurations and here are the results:

The run at 15 QPS was not throttled at all, and the results are impressive: even better than the replicated model we saw in episode 2.

But the true power of this architecture is in handling large requests. In the code for this episode you will find a new batch of queries, the medium ones. The medium batch is on average twice as big as the small one. We ran the medium workload at 5 QPS against both the replicated architecture with no coordinator and the coordinator/workers architecture, and here are the results:

The simply replicated architecture:

And the result for the coordinator/workers architecture is:

While both of these serve a similar number of requests, the coordinator/workers architecture has a much better response time. We truly see the power of divide and conquer in action.

In future episodes, we will talk about some improvements to this architecture. In particular we will see how we can extend the design to a multi-coordinator one for higher availability.
