Fault Tolerance in an Asynchronous, Choreographed, Distributed System
It’s a mouthful, I know!
I am building an asynchronous distributed system that serves as a backend for a mobile app. The system does a lot of processing and communicates with a couple of external components. From the start, I decided to design the system as a choreography of micro-services. Beyond this basic constraint, I took an iterative approach to the design: start with something simple and evolve it over time.
In this article, I will walk you through a couple of challenges I had designing the system and how I chose to solve them. To me, it was interesting to see how the system started coming together and how patterns started to emerge. Because I designed the system from scratch, adding layers of complexity over time, I feel I now have a better grasp of some of the concepts and patterns in distributed computing, and I would like to share that experience with you. This is the first article, and it focuses on system resiliency, but I plan on writing several more covering various aspects of the whole system.
Before we get started, let me briefly cover the difference between choreography and orchestration:
Choreography vs Orchestration
Service orchestration represents a single centralized executable business process (the orchestrator) that coordinates the interaction among different services. The orchestrator is responsible for invoking and combining the services.
Service choreography is a global description of the participating services, which is defined by exchange of messages, rules of interaction and agreements between two or more endpoints. Choreography employs a decentralized approach for service composition.
The problem arose when I decided to replace an external API with my own, both as a cost-saving measure and to better cater to my use case. Creating the new API was a challenge of its own (and well worth a separate article: if you ever wonder when you are going to use all those fancy algorithms they taught you in your Computer Science class, you will want to read this!), but in this article I want to focus on a different, equally interesting problem.
This new API is a self-contained system (a set of two micro-services) that provides an interface for the main distributed pipeline to request new processing and receive results. Processing itself is a very resource-intensive task: a single request requires several GB of RAM and a hefty amount of CPU time.
In the first “MVP” version of the system, I did not care much about resource consumption, which quickly led to the service crashing when several requests were processed at the same time.
The most obvious measure, which I chose not to implement at this time, is to scale out. You know: stick a load balancer in front and scale (automatically or manually) the number of machines that run the service. My service is stateless, so there would be no need to implement session affinity.
But I chose not to implement (auto-)scaling primarily to save cost. This subsystem is expensive as it is with a single node.
There were additional constraints on what the system needed to support, so it ended up requiring a custom Docker image running on an Ubuntu distro, and Infrastructure-as-a-Service is never a cheap option. At this phase in the project, we don’t need to scale.
The first measure I took to remedy the problem was to introduce an internal request queue and a throttling mechanism. It uses a notion of “processing tokens”: essentially a configurable number that represents how many requests can be processed in parallel on the system. If I decide to scale up my machine, I can grant more tokens; if I scale down, I can decrease the number of tokens.
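To make the “processing tokens” idea concrete, here is a minimal sketch in Python. It is not the actual implementation: the class name TokenThrottle, the demo function and the sleep that stands in for the heavy processing are all made up for illustration, and it assumes the service runs on asyncio.

```python
import asyncio


class TokenThrottle:
    """Limits how many requests are processed in parallel (illustrative only)."""

    def __init__(self, processing_tokens: int):
        # One semaphore slot per processing token; the count is configurable.
        self._tokens = asyncio.Semaphore(processing_tokens)
        self.active = 0
        self.peak = 0  # highest number of jobs seen running at once

    async def run(self, job):
        # Requests wait here (the internal queue) until a token is free.
        async with self._tokens:
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                return await job()
            finally:
                self.active -= 1


async def demo(tokens: int, requests: int) -> int:
    """Fire several requests at once and report the peak parallelism."""
    throttle = TokenThrottle(tokens)

    async def heavy_job():
        await asyncio.sleep(0.01)  # stand-in for the expensive processing
        return "done"

    await asyncio.gather(*(throttle.run(heavy_job) for _ in range(requests)))
    return throttle.peak
```

Running demo(2, 6) submits six requests but never lets more than two of them run at the same time; granting more tokens after scaling up the machine is just a matter of changing the number.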
Now that internal throttling self-regulates the system and prevents it from running out of memory, we have introduced, or rather accentuated, another problem. Our processing pipeline makes synchronous requests to this API and waits for a response. With the queue in the picture, a request may sit in the queue until a processing token frees up, so it becomes pretty typical for a request to be processed after the caller’s timeout has already expired. To the calling system, the request appears to have failed. Even though we eventually produce a successful response, the calling system has already moved on and handled the request as failed (most likely by retrying, which further increases load on the API).
The logical next step was to switch to asynchronous invocation of this API.
Asynchronicity is often advocated as the best default in distributed systems because it decouples sender and receiver: any message can be sent independently of the availability of the receiver and will be delivered as soon as the service provider becomes available.
In my case, I don’t have to worry about a time limit for the overall transaction. Of course, I still care deeply about the transaction being processed as soon as possible, but there is no inherent problem with a delay caused by a timeout and retry of an operation within the system.
To implement asynchronous processing, I introduced two request parameters. The first, queue, lets the calling system provide a messaging queue URI (with a short-lived access token for increased security). The second, queueMetadata, is a set of arbitrary parameters that the API simply echoes back to the queue along with the result of its operation when it’s ready.
This way, we have created an async invocation that resumes processing in the main pipeline when the result becomes available, by sending the result to a queue instead of responding to a synchronous request. More importantly, we avoided creating tight coupling between the components. The API has no knowledge of the required message structure or of the calling system. The API “knows” how to put a message in a queue, but what the queue is and what other data needs to be put there along with the result of its own operation are completely unknown to the API.
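A rough sketch of the API side of this contract could look like the following. Only queue and queueMetadata come from the design above; the function name handle_request, the “input” field, the flat message layout and the injected send_to_queue callable (a stand-in for whatever queue SDK is in use) are assumptions made for the example.

```python
from typing import Any, Callable, Dict

# Hypothetical sender type: takes the queue URI and the message to enqueue.
QueueSender = Callable[[str, Dict[str, Any]], None]


def handle_request(request: Dict[str, Any],
                   process: Callable[[Any], Any],
                   send_to_queue: QueueSender) -> Dict[str, Any]:
    """Process a request and deliver the outcome to the caller-supplied queue."""
    queue_uri = request["queue"]                 # where the outcome should go
    metadata = request.get("queueMetadata", {})  # opaque to the API
    message = dict(metadata)                     # echoed back untouched
    try:
        message["result"] = process(request["input"])
    except Exception as exc:
        # Naive error reporting: just an error property on the message.
        message["error"] = str(exc)
    send_to_queue(queue_uri, message)
    return message
```

The API never inspects queueMetadata; it only copies it into the outgoing message, which is what keeps it ignorant of the calling system.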
Good solution, let’s call it a day, you might think. But you would be wrong. We still haven’t fully solved the problem, because we now have no good way to handle failures in this asynchronous transaction.
In my naive approach, I return an error property to the queue. The calling system knows to check for errors, but the only thing it can do on error is fail. There is no mechanism to retry an operation, even though we are dealing with errors that are often transient in nature (internal socket timeouts, connection resets, …). Overall, there were about two dozen different error cases in the API, some of them transient, others not.
To recap, here’s the situation:
- We have a micro-service A invoking the API
- API returns result to a queue
- Micro-service B picks up the result for further processing
- Micro-service B sees an error in the message but cannot do anything but fail; it has no knowledge of micro-service A or of the API in between
I considered a couple of solutions:
- Introduce tight coupling between B and A, so that B could invoke A again to retry the operation
- Similarly, introduce coupling between B and the API. B could retry calling the API, but it would have to know which parameters to invoke the API with
Side note: In both cases, we need idempotency to make the operation repeatable. That is something built into most of the micro-services that make up the processing pipeline. However, there are a few checkpoints where I either assume or explicitly check the state of the overall transaction (file exists, DB has been updated, …). Because of this, both approaches would be quite hard to implement without making expensive changes to the system.
Since we do not want to introduce any coupling between services and/or the APIs, I decided to go with a different technique and introduce a retryToken. With retryToken, a service can retry the operation without any need to “understand” the payload required to invoke the API or what the called API even is.
As a prerequisite, I first modified the API to accept the request payload either in the HTTP request body or in the query string. Receiving JSON data in the body was how the system operated until then. To support the query string method, I added a new parameter called “payload” that contains a Base64-encoded JSON object identical to what would be in the body of the request.
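As an illustration, the two ways of receiving the payload could be handled roughly like this. The helper names encode_payload and read_payload are invented for the example, and the URL-safe Base64 alphabet is my assumption here; any Base64 variant works as long as both sides agree on it.

```python
import base64
import json
from typing import Any, Dict, Optional


def encode_payload(payload: Dict[str, Any]) -> str:
    """Base64-encode a JSON payload for use in the "payload" query parameter."""
    raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def read_payload(body: Optional[str], query: Dict[str, str]) -> Dict[str, Any]:
    """Return the payload from the JSON body or, failing that, the query string."""
    if body:
        return json.loads(body)
    # No body: fall back to the Base64-encoded "payload" query parameter.
    return json.loads(base64.urlsafe_b64decode(query["payload"]))
```

Either route yields the same dictionary, so the rest of the request handling does not need to care how the payload arrived.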
Now that I had that, I modified the API to include a “retryToken” in its error response. The retryToken is the full URI of the API endpoint, with the payload query parameter containing the original payload used to invoke the API, plus one additional parameter, “retryCount”, which holds the current retry number. Internally, the API has a configurable parameter that sets how many retries are supported. After the limit is reached, the API no longer returns a retryToken in its response, preventing the external system from retrying the request.
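Building such a token might look like the sketch below. The function name build_retry_token, the MAX_RETRIES value of 3, the transient flag and the example endpoint are assumptions for illustration; I also assume the token carries the number of the upcoming attempt in retryCount.

```python
import base64
import json
from typing import Any, Dict, Optional
from urllib.parse import urlencode

MAX_RETRIES = 3  # assumed value; the real limit is a configuration setting


def build_retry_token(endpoint: str, payload: Dict[str, Any],
                      retry_count: int, transient: bool) -> Optional[str]:
    """Return a retryToken URI, or None when the caller should not retry."""
    if not transient or retry_count >= MAX_RETRIES:
        return None  # permanent error or retry budget used up: no token
    raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    encoded = base64.urlsafe_b64encode(raw).decode("ascii")
    # The token is simply the endpoint URI plus the original payload and
    # the number of the next attempt.
    query = urlencode({"payload": encoded, "retryCount": retry_count + 1})
    return f"{endpoint}?{query}"
```

For example, build_retry_token("https://api.example/process", {"id": 7}, 0, True) gives "https://api.example/process?payload=eyJpZCI6N30%3D&retryCount=1", while the same call with retry_count equal to MAX_RETRIES returns None.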
With retryToken in place, the flow is as follows:
- Micro-service A invokes API
- API returns result to a queue
- Micro-service B picks up the result for further processing
- Micro-service B checks for errors in the message
- B validates the retryToken and, if it is valid, issues an HTTP request to the retryToken URI, effectively repeating step 1
Micro-service B now needs to understand how to make API calls. Arguably, that is a slight violation of the single responsibility principle. But other than the ability to issue HTTP requests, it has no coupling to service A or to the API.
Security is another important aspect of such a design. Without any security in place, the message queue that feeds service B could be compromised, and an attacker could inject arbitrary retryTokens, for example to DoS a third-party system.
Let’s add two new query parameters: validUntil and sig (which stands for “signature”).
First, we need to establish trust between the two services. The way I am doing it here is with a shared secret: a key that is stored securely in Key Vault and accessible by all participating services. When the API creates its retry token on error, it sets a validUntil date (two minutes in the future) and signs the token using the key.
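The signing step can be sketched as follows. The article does not depend on a particular algorithm, so HMAC-SHA256 is an assumption here, as are the function name sign_retry_token, the Unix-timestamp format of validUntil and the choice to sign the whole URI up to (but not including) sig.

```python
import hashlib
import hmac
from urllib.parse import urlencode


def sign_retry_token(unsigned_token: str, secret: bytes, now: int,
                     ttl_seconds: int = 120) -> str:
    """Append validUntil and sig to a retryToken URI (HMAC-SHA256 assumed)."""
    valid_until = now + ttl_seconds  # two minutes from now by default
    separator = "&" if "?" in unsigned_token else "?"
    to_sign = f"{unsigned_token}{separator}{urlencode({'validUntil': valid_until})}"
    # Sign everything that precedes the sig parameter, so that payload,
    # retryCount and validUntil are all covered by the signature.
    sig = hmac.new(secret, to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
    return f"{to_sign}&sig={sig}"
```

In a real deployment the secret would be read from the key vault rather than passed around as a literal, and now would come from the system clock.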
When service B receives a new message from the queue, it first checks whether there is an error. If there is, it checks whether there is a retryToken. The token may be missing if the source does not consider the error transient, or if the internal retry count was exceeded and we should stop trying to process the request. In that case, I stop processing and put the message in a “quarantine” queue to be investigated by a human operator (a pattern usually called a dead-letter queue; capping the retries this way plays a role similar to a circuit breaker).
If, however, there is a retryToken, micro-service B uses the same signing algorithm and the shared key to compute a signature and compares it to the signature provided in the token. If the signatures match, we know that the token hasn’t been tampered with (validUntil, payload content, …) and that it comes from a service that we trust. B also checks that validUntil has not yet passed.
At this point, we can comfortably issue an HTTP request with the retryToken to see if we can continue processing the transaction.
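Put together, the decision service B makes for each message could be sketched like this. It assumes the token was signed with HMAC-SHA256 over everything before the sig parameter and that validUntil is a Unix timestamp; the names verify_retry_token and next_action are hypothetical, and sending a message with an untrusted token to quarantine is my assumption about the sensible default.

```python
import hashlib
import hmac
from typing import Any, Dict
from urllib.parse import parse_qs, urlsplit


def verify_retry_token(token: str, secret: bytes, now: int) -> bool:
    """Check the signature and the expiry of a retryToken."""
    base, found, sig = token.rpartition("&sig=")
    if not found:
        return False  # unsigned token
    expected = hmac.new(secret, base.encode("utf-8"), hashlib.sha256).hexdigest()
    # Constant-time comparison, done on bytes so odd input cannot raise.
    if not hmac.compare_digest(expected.encode("utf-8"), sig.encode("utf-8")):
        return False
    valid_until = parse_qs(urlsplit(base).query).get("validUntil", ["0"])[0]
    return valid_until.isdigit() and now <= int(valid_until)


def next_action(message: Dict[str, Any], secret: bytes, now: int) -> str:
    """Decide what service B does next: 'continue', 'retry' or 'quarantine'."""
    if "error" not in message:
        return "continue"
    token = message.get("retryToken")
    if token and verify_retry_token(token, secret, now):
        return "retry"       # the caller then issues an HTTP request to the token URI
    return "quarantine"      # no token, or a token we cannot trust
```

Note that B never parses the payload inside the token; it only checks the signature and expiry and then calls the URI as-is.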
With all this in place, I finally have a resilient distributed system. It can recover from transient errors and fail fast if errors are permanent. It is asynchronous, so it can deal with long-running processing operations. And last but not least, it is loosely coupled, so I can replace individual components without affecting other parts of the system.