Episode 8: The one with Asynchronous Distributed Systems (Part 2).

Concurrency and Distributed Systems: A guide with Kubernetes


In the previous part of this episode, we looked at the basic concepts of asynchronous distributed systems. We saw the general architecture and, towards the end, the design of an asynchronous system for our Sum of Primes service. In this part, we will look at the detailed implementation of this new system. So, expect a lot of code! :)

The following figure shows how request handling is done in the server:

Sum of Primes Asynchronous System Architecture

Each server instance has an internal concurrent queue in which it stores the requests. As soon as the request is put into the queue, it is also put into the response store and a response is returned to the client immediately.

The response store, as mentioned in the previous part, can be Redis, MongoDB, or CouchDB. For no particular reason (OK, I just wanted to learn it), we chose CouchDB as the response store. To abstract away the choice, the server uses the following interface to talk to any response store:

Response Store Interface
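
As a rough sketch of what such an abstraction could look like: the type and method names below (ResponseStore, StoredResponse, save, find, delete) and the QUEUED status are assumptions made for illustration, not necessarily the names used in the repo. An in-memory implementation is included only so the sketch runs without a database.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class ResponseStoreSketch {

    /** Hypothetical response document: an id, a status, and a result once computed. */
    static final class StoredResponse {
        final String id;
        final String status;   // "FINISHED" comes from the article; "QUEUED" is an assumed name
        final Long result;     // null until the computation finishes

        StoredResponse(String id, String status, Long result) {
            this.id = id;
            this.status = status;
            this.result = result;
        }
    }

    /** Hypothetical interface the server could code against, whatever the backing store is. */
    interface ResponseStore {
        void save(StoredResponse response);        // create or overwrite
        Optional<StoredResponse> find(String id);  // empty if the id is unknown
        void delete(String id);
    }

    /** In-memory stand-in; a CouchDB-backed class would implement the same methods. */
    static final class InMemoryResponseStore implements ResponseStore {
        private final Map<String, StoredResponse> responses = new ConcurrentHashMap<>();

        @Override public void save(StoredResponse response) { responses.put(response.id, response); }
        @Override public Optional<StoredResponse> find(String id) { return Optional.ofNullable(responses.get(id)); }
        @Override public void delete(String id) { responses.remove(id); }
    }

    public static void main(String[] args) {
        ResponseStore store = new InMemoryResponseStore();
        store.save(new StoredResponse("42", "QUEUED", null));   // request accepted
        store.save(new StoredResponse("42", "FINISHED", 17L));  // worker done
        System.out.println(store.find("42").map(r -> r.status + " " + r.result).orElse("not found"));
    }
}
```

Because the server only sees the interface, swapping CouchDB for another store should only require a new implementing class.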

And here is a CouchDB implementation of this interface using the Ektorp library:

CouchDB Implementation of the Response Store

The implementation class’s full name for the response store is passed to the server as an argument. For example:

This way, we decouple the response store implementation from the server’s code. Here is how the server is initialized and the response store instance is created:

Server initialization
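
A minimal sketch of this kind of initialization, assuming the class name arrives as the first program argument and that the implementation has a public no-argument constructor. The names ServerInitSketch, ResponseStore, NoOpResponseStore and createStore are hypothetical; only Class.forName, reflection-based instantiation and Executors.newWorkStealingPool are standard JDK calls.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ServerInitSketch {

    /** Stand-in for the response store interface (hypothetical, reduced to one method). */
    public interface ResponseStore {
        String describe();
    }

    /** Trivial implementation so the sketch runs without a database. */
    public static class NoOpResponseStore implements ResponseStore {
        public NoOpResponseStore() {}
        @Override public String describe() { return "no-op store"; }
    }

    /** Loads the named class and instantiates it through its public no-arg constructor. */
    static ResponseStore createStore(String className) throws ReflectiveOperationException {
        Class<? extends ResponseStore> clazz =
                Class.forName(className).asSubclass(ResponseStore.class);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // Fall back to the bundled stand-in when no class name is given.
        String className = args.length > 0 ? args[0] : NoOpResponseStore.class.getName();
        ResponseStore store = createStore(className);

        // Work-stealing pool sized to the available processors; it owns the task queue.
        ExecutorService executor = Executors.newWorkStealingPool();
        executor.submit(() -> System.out.println("worker sees: " + store.describe()));

        // Work-stealing threads are daemons, so wait for the task before exiting.
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Loading the class by name keeps the server free of any compile-time dependency on a particular store; the trade-off is that a wrong class name only fails at startup rather than at compile time.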

The server has a pool of threads running inside the executor service. These threads grab work from the queue, compute the responses, and then update the response store with the right status and, when available, the response. In our implementation, the queue is hidden inside the executor service. As can be seen in the above code, we have instantiated a work-stealing pool for the executor service.
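
The accept-then-compute flow can be sketched roughly as follows. This is an illustration under stated assumptions: a plain map stands in for the response store, the status strings other than FINISHED are made up, and the workload is assumed to be "sum of all primes up to a limit".

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WorkerSketch {

    /** Stand-in for the response store: request id -> status string. */
    static final Map<String, String> responses = new ConcurrentHashMap<>();

    /** Sum of all primes <= limit, by trial division (assumed workload definition). */
    static long sumOfPrimes(int limit) {
        long sum = 0;
        for (int n = 2; n <= limit; n++) {
            boolean prime = true;
            for (int d = 2; (long) d * d <= n; d++) {
                if (n % d == 0) { prime = false; break; }
            }
            if (prime) sum += n;
        }
        return sum;
    }

    /** Records the request, hands the work to the pool, and returns the id right away. */
    static String accept(ExecutorService executor, int limit) {
        String id = UUID.randomUUID().toString();
        responses.put(id, "QUEUED");  // visible to pollers before any work starts
        executor.execute(() -> responses.put(id, "FINISHED:" + sumOfPrimes(limit)));
        return id;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newWorkStealingPool();
        String id = accept(executor, 10);
        System.out.println("right after accept: " + responses.get(id));

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("after the worker ran: " + responses.get(id));
    }
}
```

The caller gets the id back without waiting for the computation, which is what lets the server answer the client immediately.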

API Implementation

Here is an example of the POST operation by which a client sends a request to the server.

Here is the implementation of the POST:

POST API Implementation

The GET API, which is used for polling responses, is as follows:

The implementation of GET is fairly simple. It only needs to make a call to the response store:

GET API Implementation

Notice how we throw an exception in case the response is not found. This particular exception is handled by the Spring Framework and returned to the user as a NOT_FOUND response.

This concludes our super simple API. Next we will look at the K8s deployment.

Deployment in K8s

The components we need to deploy are:

  • Servers (replicated, StatefulSet)
  • CouchDB (using Helm)

Server Setup

Since the servers carry some state (the in-memory queue), we found it appropriate to deploy them as a StatefulSet. A regular Deployment would also work, but with a StatefulSet we can make one of the servers, for example the zeroth one, responsible for cleaning up CouchDB. The cleanup involves removing all the responses that are past their TTL.
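
One way the "zeroth server cleans up" idea could be sketched relies on the fact that StatefulSet pods are named with the set name followed by an ordinal index, starting at 0. The sketch below assumes the pod name is readable from the HOSTNAME environment variable, uses a made-up pod name (sumprime-server-0) as a fallback, and uses a map of creation times in place of CouchDB; all method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CleanupSketch {

    /** Returns the ordinal suffix of a StatefulSet pod name, or -1 if there is none. */
    static int ordinalOf(String podName) {
        int dash = podName.lastIndexOf('-');
        if (dash < 0 || dash == podName.length() - 1) return -1;
        try {
            return Integer.parseInt(podName.substring(dash + 1));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Removes entries created more than ttl before now; returns how many were removed. */
    static int removeExpired(Map<String, Instant> createdAt, Duration ttl, Instant now) {
        int before = createdAt.size();
        createdAt.values().removeIf(created -> created.plus(ttl).isBefore(now));
        return before - createdAt.size();
    }

    public static void main(String[] args) {
        // Assumption: the pod name is visible as HOSTNAME; otherwise use a sample name.
        String podName = System.getenv().getOrDefault("HOSTNAME", "sumprime-server-0");

        Map<String, Instant> createdAt = new ConcurrentHashMap<>();
        Instant now = Instant.now();
        createdAt.put("old-response", now.minus(Duration.ofHours(2)));
        createdAt.put("fresh-response", now);

        if (ordinalOf(podName) == 0) {
            // Only the zeroth replica cleans up, so replicas do not race each other.
            System.out.println("removed " + removeExpired(createdAt, Duration.ofHours(1), now));
        } else {
            System.out.println("not the cleaner: " + podName);
        }
    }
}
```

In a real server this check would presumably run on a schedule rather than once at startup.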

The Dockerfile for the server is fairly simple. You can see the repo for the code.

The K8s deployment for the server is as follows:

The important thing to notice is how we pass the CouchDB-related environment variables from the secret store to the container. We also have a load balancer set up in front of the StatefulSet servers.

CouchDB Setup

To set up CouchDB inside the K8s cluster, do this:

CouchDB Setup Script

This will set up CouchDB servers with a dummy username and password, which is good enough for the purpose of this guide. The username and password are stored in K8s as a Secret resource. That way, we can later reference those secrets from other K8s deployment YAML files and pass them to the server.

To deploy the servers, run this:

If everything goes well, we should have servers up and running at this point. Try to get the load balancer’s address:

Let’s grab the load balancer IP address and try to send a request to the server:

Given the _id, let’s get the status of the response:

LGTM! :)

Scale Testing

It would not be a good build if we didn’t scale test it! Scale testing an asynchronous system is less straightforward than scale testing a synchronous one. We cannot just measure how quickly one call to the POST method returns. We have to measure the latency of the whole flow, from the POST to the final GET that returns the result with the status FINISHED. This flow is as follows:

As you can see in the above test flow, every request actually turns into at least two requests (most likely more than 3 in most cases). So, if we set the arrival rate to 15 requests per second, we are throwing at least 30 requests per second at the servers (we will in fact see this when we run Artillery and look at its report). This is an important consideration when comparing a synchronous system with its asynchronous counterpart. Keeping this in mind, and given the hardware we have, we will set the arrival rate to 10 for our test.

Another important point in the above test flow is how long we wait before each poll. If we poll too often, the servers are overloaded with GET requests and spend their resources responding to those instead of computing results. On the other hand, if we poll too rarely, that hurts the latency of each individual request. It is a fine line, and we probably need to find it by experiment. In addition, it is essential to have a good throttling mechanism in place to guard against bad clients.
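
To make the trade-off concrete, here is a small client-side sketch of the poll loop with a configurable wait, plus the arithmetic for how polling multiplies the load. It is an illustration only: the status supplier fakes the GET call, and the names pollUntilFinished and effectiveRate, the QUEUED status and the 50 ms interval are assumptions.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class PollingSketch {

    /** Total requests per second hitting the servers: one POST plus the GET polls per flow. */
    static int effectiveRate(int arrivalRatePerSec, int pollsPerFlow) {
        return arrivalRatePerSec * (1 + pollsPerFlow);
    }

    /**
     * Calls fetchStatus until it returns "FINISHED", sleeping between attempts.
     * Returns the number of polls used, or -1 if maxPolls was exhausted.
     */
    static int pollUntilFinished(Supplier<String> fetchStatus, Duration interval, int maxPolls)
            throws InterruptedException {
        for (int polls = 1; polls <= maxPolls; polls++) {
            if ("FINISHED".equals(fetchStatus.get())) return polls;
            if (polls < maxPolls) Thread.sleep(interval.toMillis());
        }
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Fake server: reports FINISHED from the third GET onwards.
        Supplier<String> fakeGet = () -> calls.incrementAndGet() >= 3 ? "FINISHED" : "QUEUED";

        long start = System.nanoTime();
        int polls = pollUntilFinished(fakeGet, Duration.ofMillis(50), 10);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        // A longer interval means fewer GETs but a later discovery of the result.
        System.out.println("polls=" + polls + ", end-to-end ms=" + elapsedMs);
        System.out.println("10 flows/sec with 2 polls each: " + effectiveRate(10, 2) + " req/sec");
    }
}
```

With one POST and two polls per flow, an arrival rate of 10 flows per second becomes 30 requests per second at the servers; a shorter interval raises the number of polls per flow and therefore the total load.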

We are going to use some advanced features in artillery to do the scale testing. Read more about that here. Here is the artillery test config:

Artillery test configuration

Notice the flow in the Async sumPrime load test scenario. First it does a POST. It then captures the requestId from the response. Then it enters a loop where it sends GET requests to the server until it sees a FINISHED status. notFinished is a JS function which is defined as follows:

The JS function used in artillery test config.

To run the load test, run the following command:

After the load test is done, we see the following report:

Notice how the arrival rate says 30/sec, even though we asked for 10 requests per second. Overall, there is more network traffic when we are dealing with asynchronous systems. The performance numbers are not bad, but they are not as good as the synchronous ones. Some of this is due to the increased network traffic and queuing. As we mentioned before, an asynchronous system has its own advantages and disadvantages.

Summary

We went through the implementation of an asynchronous distributed system for the Sum of Primes service. We decided that, of the three optional components of the distributed system, we only need the response store.

We scale tested the system and saw how an asynchronous architecture affects performance.

We can further explore and expand this very system: use a queue service such as Kafka, test some faulty scenarios, and observe the behavior of the asynchronous system. That will be a story for another episode. See you then.
