Gossip Gloomers — A Detailed Walkthrough — Challenge #2
Unique ID Generation
In this challenge, you’ll need to implement a globally-unique ID generation system that runs against Maelstrom’s unique-ids
workload. Your service should be totally available, meaning that it can continue to operate even in the face of network partitions.
You can find the problem here
Understanding the problem
The problem is straightforward. We need to return a unique id (any type) for every RPC call (think of it as a GET request). The incoming `msg.Body["type"]` is equal to `generate`. We need to reply with `msg.Body["type"]="generate_ok"` and `msg.Body["id"]=<unique-id>`.
If this were a normal HTTP server deployed on a single node, it would be easy: simply maintain a counter value on the node, and on each request, return the counter and increment it.
Let’s try that out:
```
cd gossip-gloomers
mkdir maelstrom-unique-ids && cd maelstrom-unique-ids
touch main.go
# setup your go module like the previous challenge
# cp /path/to/maelstrom/binary .
```
Add the simple boilerplate code to `main.go`:
```go
package main

import (
	"encoding/json"
	"os"

	maelstrom "github.com/jepsen-io/maelstrom/demo/go"
	log "github.com/sirupsen/logrus"
)

var COUNTER = 0
var logger *log.Logger

func initLogger() {
	logger = log.New()
	logFile, _ := os.OpenFile("./maelstrom.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
	logger.SetOutput(logFile)
}

func getCounter() int {
	return COUNTER
}

func incrementCounter() {
	COUNTER++
}

func main() {
	initLogger()
	n := maelstrom.NewNode()
	n.Handle("generate", func(msg maelstrom.Message) error {
		var body map[string]any
		if err := json.Unmarshal(msg.Body, &body); err != nil {
			return err
		}
		body["id"] = getCounter()
		incrementCounter()
		body["type"] = "generate_ok"
		return n.Reply(msg, body)
	})
	if err := n.Run(); err != nil {
		panic(err)
	}
}
```
I have added a log file for the logger because maelstrom uses os.Stdout and os.Stderr to exchange protocol messages. If you log directly to these streams, you corrupt the protocol and the whole run breaks. So I write logs to a file instead, and would recommend you do the same.
The code above looks simple and correct, right? If I run it on a single node, would it cause any issues? Seems simple enough. Let's run the following command:
```
./maelstrom test -w unique-ids --bin ~/go/bin/maelstrom-unique-ids --time-limit 10 --rate 100 --node-count 1
```
And, we get
Everything looks good! ヽ(‘ー`)ノ
Now let's add a new flag to the command: `--concurrency 2n`.
This tells the maelstrom binary: hey, please test my system with concurrency `2n`, i.e. twice as many concurrent clients as nodes. That means even though you have a single server, it will be serving 2 different requests concurrently in separate goroutines.
```
./maelstrom test -w unique-ids --bin ~/go/bin/maelstrom-unique-ids --time-limit 10 --rate 100 --node-count 1 --concurrency 2n
```
Aaaahhhhhh!
Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
So, what's the problem?
For every request, the handler passed to `n.Handle()` is called in a separate goroutine. You can think of goroutines as subprocesses running in parallel (even though, strictly, they are not parallel; concurrency vs parallelism is a topic in itself).
`COUNTER` is a global variable. Two goroutines might call `getCounter()` at the same time and get the same value. Then each goroutine increments the value (order doesn't matter), so the counter is incremented by 2, and both goroutines return the same value they read.
So, suppose the initial state of the counter is 0:
- Goroutine 1 reads 0
- Goroutine 2 reads 0
- Both increment it by one (order doesn’t matter)
- The value of counter is now 2
- Both return the id 0.
This is a very common mistake that developers make. When we code, we often don't think about how the code behaves when several goroutines run it. These problems are difficult to spot while coding and even harder to debug when you see unexpected errors.
To fix this, we use a mutex lock from Go's `sync` package. We define a mutex variable:
```go
// keep it global for now
var lock = sync.Mutex{}
```
Then we tweak our handler a bit:
```go
lock.Lock()
body["id"] = getCounter()
incrementCounter()
lock.Unlock()
```
When `lock.Lock()` is called by a goroutine, any other goroutine that calls `.Lock()` on the same variable (here, `lock`) waits until the first goroutine calls `.Unlock()`.
Hence, the first goroutine locks -> getCounter() -> incrementCounter() -> unlocks, and only then does the second goroutine lock -> getCounter(), and so on…
A simple insight, but it makes you so much better than the rest of the developers :)
Just a heads up, we have not even delved into Distributed Systems yet :)
Now coming to the actual problem (even bigger one) …
The first approach that often comes to mind is to create a global counter that is consistent across all the nodes. This is an extension of the solution we proposed for the single-node system.
If we read the problem carefully it states 2 more requirements:
- The system should have total availability, i.e. it should continue to work (serve requests) at all times.
- The system might face network partitions, i.e. network failures: communication between certain nodes can be halted, or certain nodes might stop working altogether.
So, we want to create a system where:
- There is a global counter that is consistent across all nodes (consistency).
- The system is totally available (availability).
- The system keeps working even under network partitions (partition tolerance).
If you try to think of a solution to this problem, you will realise that you can never achieve all three at the same time. We will see this more clearly as we go through these challenges. This is exactly what the CAP Theorem says.
The CAP theorem says that a distributed system can deliver only two of three desired characteristics: consistency, availability and partition tolerance (the ‘C,’ ‘A’ and ‘P’ in CAP).
So, we will try to solve the problem for each possible combination, in the following order:
- Availability + Partition Tolerance (The actual problem)
- Availability + Consistency
- Consistency + Partition Tolerance
Availability + Partition Tolerance
Since no consistency is possible, we can't expect all nodes to maintain a consistent global counter. Let's understand why this happens:
a. Partition tolerance means that some nodes might not be able to communicate with each other at any given time, or there can be other network failures.
b. Total availability guarantees that every request will be served by the distributed system at any given time, even during network failures.
This means that each node should be capable of handling a request without communicating with the other nodes. Therefore, each node must generate a set of ids that is guaranteed never to collide with the ids generated by any other node.
There are several solutions to the problem:
Solution 1-A: One of the easiest is to use the number of nodes in the cluster, which you can get with `len(n.NodeIDs())`. Let's say this is 5. Then the node at index 0 always returns 0, 5, 10, …, the node at index 1 always returns 1, 6, 11, and so on. You can find a node's own id using `n.ID()`. Personally, I don't find this a very good solution even though it passes the tests: it is not well suited to clusters whose size is dynamic, although that can be handled. You can find the solution here.
Solution 1-B: Another simple solution is to assign `time.Now().String()` as the id. This mostly works, since goroutines on a single node don't run at exactly the same time. It passes the tests mentioned in the challenge, but it starts failing if you increase the rate from 1000 to 1000000, because two requests can land at exactly the same instant across 2 nodes (especially when the request rate is high). This can be mitigated using the nodeID: simply append the nodeID to the timestamp string, and all the returned ids become unique. You can find the solution here.
There can be more solutions to this problem, but the objective here is to give you an idea. Here, a single node can independently serve the request without collaborating with the other nodes.
Availability + Consistency
Here, we can assume the system has no network failures. That means all nodes can communicate with each other at all times, so they should be able to maintain a consistent global counter.
Generally, to maintain a consistent state of a variable in a distributed network, we select a master node responsible for managing the value of the counter. It has 2 responsibilities:
- When the counter is to be incremented, any node sends an increment request to the master node, which accepts or rejects it (real systems use a consensus protocol here).
- It returns the current value of the counter.
- You might instead use an independent service, like a distributed database or key-value store, that provides this functionality.
Hence, for consistency to be maintained, all nodes should be able to communicate with the master node and vice versa. A consistent system can therefore be totally available only when no network partitions are present.
You can view a simple solution using `maelstrom.NewLinKV` (maelstrom's built-in linearizable key-value store service) here:
We will also try implementing the master node algorithm (`n0` is considered the master in our examples). For simplicity, we won't use any consensus algorithm. Rather, we will use a precondition value: instead of implementing `Read` and `Write` functions for the counter, we will implement `Read` and `CAS` (compare-and-swap) functions.
CAS functions are an important part of distributed systems. The idea is simple:
```go
Write(key, newValue)
CAS(key, fromValue, newValue)
```
`CAS` accepts an extra argument, `fromValue`: the key is updated only if the `fromValue` provided matches the existing value. This verifies that the caller only updates the state it expects. If the key's value has already been updated by another process (node or goroutine), the write must not go through.
Let's understand the problem with the simple `Write` operation in more detail:
```go
func getCounter(n *maelstrom.Node) (int, error) {
	lock.Lock()
	defer lock.Unlock()
	if n.ID() == "n0" {
		return COUNTER, nil
	}
	resp, err := n.SyncRPC(context.TODO(), "n0", map[string]any{
		"type": "get_counter",
	})
	if err != nil {
		return 0, err
	}
	var body map[string]any
	if err := json.Unmarshal(resp.Body, &body); err != nil {
		return 0, err
	}
	return int(body["id"].(float64)), nil
}
```
```go
func writeCounter(n *maelstrom.Node, newValue int) error {
	lock.Lock()
	defer lock.Unlock()
	if n.ID() == "n0" {
		COUNTER = newValue
		return nil
	}
	if err := n.Send("n0", map[string]any{
		"type": "write_counter",
		"id":   newValue,
	}); err != nil {
		return err
	}
	return nil
}
```
```go
n.Handle("generate", func(msg maelstrom.Message) error {
	var body map[string]any
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		return err
	}
	newID, err := getCounter(n)
	if err != nil {
		logger.Errorf("error getting counter: %v", err)
		return err
	}
	if err := writeCounter(n, newID+1); err != nil {
		logger.Errorf("error writing counter: %v", err)
		return err
	}
	body["id"] = newID
	body["type"] = "generate_ok"
	return n.Reply(msg, body)
})
```
```go
n.Handle("get_counter", func(msg maelstrom.Message) error {
	body := map[string]any{
		"id": COUNTER,
	}
	return n.Reply(msg, body)
})
```
```go
n.Handle("write_counter", func(msg maelstrom.Message) error {
	var body map[string]any
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		return err
	}
	COUNTER = int(body["id"].(float64))
	return nil
})
```
The problem:
Let's say 2 nodes, `n1` and `n2`, simultaneously read the counter value as 0. Then they both make a write request for 0+1 = 1. The counter is updated to 1, and both nodes return the id 0.
This is a common problem faced in distributed systems while using key/value stores. To update a value based on the existing value:

1. We read the value of `key` as `existingVal`
2. Calculate the value `newVal` based on `existingVal` (here, adding 1)
3. Update the value of `key` to `newVal`

However, another process (say `x`) in the distributed system might change the value of `key` between (1) and (3); when (3) occurs, it silently overwrites the change made by `x`, causing inconsistencies.
Changing to CAS:
```go
func CASCounter(n *maelstrom.Node, existing, newValue int) error {
	lock.Lock()
	defer lock.Unlock()
	if n.ID() == "n0" {
		// the master must check the precondition too
		if COUNTER != existing {
			return maelstrom.NewRPCError(maelstrom.PreconditionFailed, "precondition failed")
		}
		COUNTER = newValue
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1)
	defer cancel()
	resp, err := n.SyncRPC(ctx, "n0", map[string]any{
		"type":     "cas_counter",
		"id":       newValue,
		"existing": existing,
	})
	if err != nil {
		return err
	}
	if resp.Type() == "cas_counter_failed" {
		return maelstrom.NewRPCError(maelstrom.PreconditionFailed, "precondition failed")
	}
	return nil
}
```
```go
n.Handle("generate", func(msg maelstrom.Message) error {
	var body map[string]any
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		return err
	}
	counterValue, err := getCounter(n)
	if err != nil {
		logger.Errorf("error getting counter: %v", err)
		return err
	}
	for {
		if err := CASCounter(n, counterValue, counterValue+1); err != nil {
			logger.Errorf("error cas counter: %v", err)
			counterValue++
			continue
		} else {
			break
		}
	}
	body["id"] = counterValue + 1
	body["type"] = "generate_ok"
	return n.Reply(msg, body)
})
```
```go
n.Handle("cas_counter", func(msg maelstrom.Message) error {
	var body map[string]any
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		return err
	}
	lock.Lock()
	defer lock.Unlock()
	existingValue := int(body["existing"].(float64))
	if COUNTER != existingValue {
		body["type"] = "cas_counter_failed"
		return n.Reply(msg, body)
	}
	COUNTER = int(body["id"].(float64))
	// reply on success too, otherwise the caller's SyncRPC times out
	body["type"] = "cas_counter_ok"
	return n.Reply(msg, body)
})
```
CAS solves the above problem. When a caller tries to change the value of `key` from `existingVal` to `newVal`, it asks the store to first compare, and only then swap.
Looking at the previous process:

1. We read the value of `key` as `existingVal`
2. Calculate the value `newVal` based on `existingVal` (here, adding 1)
3. Update the value of `key` to `newVal`, only if the current value equals `existingVal`
This makes sure that the variable state does not change between (1) and (3), and if it does, it invalidates the process.
So if `n1` and `n2` try to update the counter from 0 to 1 concurrently, they both make the CAS request 0->1. Let's say `n2` updates the counter first (the master node uses a lock, so it serves one request at a time). The counter changes to 1. `n1`'s CAS request is served after this, and it fails since the existing value is no longer 0. So `n1` retries with a new CAS request, 1->2.
All the nodes keep retrying CAS requests with incrementing values until they succeed. This ensures that each node gets a unique id.
You can find the solution here.
This is a totally available system with some added latency (retries until the CAS request succeeds). You might see maelstrom report it as not totally available due to request timeouts.
We can reduce the number of CAS failures by using sparse ids (e.g. `rand.Int()`) instead of an incrementing counter, since an incrementing counter causes far more collisions between nodes.
Consistency + Partition Tolerance
When network partitions are introduced into the system, some nodes in the above system might not be able to communicate with `n0`. Hence, they cannot serve their requests, and the system above becomes non-available.
All the nodes that can still communicate with `n0`, and `n0` itself, would continue to serve requests, so the system is partition tolerant. The system's consistency is also maintained.
Now, can you gain more conviction on why the CAP Theorem stands?
If you read to this point, I hope you gained a deep understanding of the challenges present in a distributed system. The objective of this writeup wasn't just to give you the solution to pass the tests, but to take you through multiple approaches and the limitations of each of them.
We’ll discuss the Broadcast service in the next article. You can check it out here
You can find the final solutions here:
Totally Available with Partition Tolerance: Solution1-A | Solution1-B
Consistency with Availability without Partition Tolerance (additional solution): Solution2
Until Next Time
Burnerlee