Gossip Gloomers — A Detailed Walkthrough — Challenge #2

Aviral Jain
11 min read · Jul 19, 2024


Unique ID Generation

In this challenge, you’ll need to implement a globally-unique ID generation system that runs against Maelstrom’s unique-ids workload. Your service should be totally available, meaning that it can continue to operate even in the face of network partitions.

You can find the problem specification here.

Understanding the problem

The problem is straightforward. We need to return a unique id (of any type) for every RPC call (think of it as a GET request). Incoming messages have msg.Body["type"] equal to "generate". We need to reply with msg.Body["type"] = "generate_ok" and msg.Body["id"] = <unique-id>.

If this were a normal HTTP server deployed on a single node, it would be easy: simply maintain a counter on the node, and on each request return the counter and increment it.

Let’s try that out:

cd gossip-gloomers
mkdir maelstrom-unique-ids && cd maelstrom-unique-ids
touch main.go
# setup your go module like the previous challenge
# cp /path/to/maelstrom/binary .

Add the simple boilerplate code to main.go:

package main

import (
    "encoding/json"
    "os"

    maelstrom "github.com/jepsen-io/maelstrom/demo/go"
    log "github.com/sirupsen/logrus"
)

var COUNTER = 0
var logger *log.Logger

func initLogger() {
    logger = log.New()
    // Error ignored for brevity.
    logFile, _ := os.OpenFile("./maelstrom.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
    logger.SetOutput(logFile)
}

func getCounter() int {
    return COUNTER
}

func incrementCounter() {
    COUNTER++
}

func main() {
    initLogger()

    n := maelstrom.NewNode()

    n.Handle("generate", func(msg maelstrom.Message) error {
        var body map[string]any
        if err := json.Unmarshal(msg.Body, &body); err != nil {
            return err
        }

        body["id"] = getCounter()
        incrementCounter()
        body["type"] = "generate_ok"
        return n.Reply(msg, body)
    })

    if err := n.Run(); err != nil {
        panic(err)
    }
}

I have added a log file for the logger because Maelstrom uses STDOUT and STDERR for its message protocol (mimicking HTTP requests). If you log directly to those streams, the run would break. So I use a log file instead, and would recommend you do the same.

The code above looks simple and correct, right? If I run it on a single node, would it cause any issues? Seems simple enough. Let’s run the following command:

./maelstrom test -w unique-ids --bin ~/go/bin/maelstrom-unique-ids --time-limit 10 --rate 100 --node-count 1

And, we get

Everything looks good! ヽ(‘ー`)ノ

Now let’s add a new flag to the command: --concurrency 2n

This tells the Maelstrom binary: hey, please test my system with 2 concurrent requests per node. That means that even though you have a single server, it will be serving 2 different requests concurrently, in separate goroutines.

./maelstrom test -w unique-ids --bin ~/go/bin/maelstrom-unique-ids --time-limit 10 --rate 100 --node-count 1 --concurrency 2n

Aaaahhhhhh!

Analysis invalid! (ノಥ益ಥ)ノ ┻━┻

So, what’s the problem?

For every request, the handler passed to n.Handle() is called in a separate goroutine. You can think of goroutines as subprocesses running in parallel (even though, strictly speaking, they are not parallel; concurrency vs. parallelism is a topic of its own).

COUNTER is a global variable. Two goroutines might call getCounter() at the same time and get the same value. Then each goroutine increments the value (the order doesn’t matter), so the counter goes up by 2, but both goroutines return the same value they read.

So, with the counter initially at 0:

  1. Goroutine 1 reads 0
  2. Goroutine 2 reads 0
  3. Both increment it by one (order doesn’t matter)
  4. The value of the counter is now 2
  5. Both goroutines return 0

This is a very common mistake. When we write code, we often don’t think about how it behaves when several goroutines are running it at once. These problems are difficult to spot while coding and even harder to debug when you see unexpected errors.
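To see the lost updates for yourself outside Maelstrom, here is a small standalone sketch (not part of the challenge code) that hammers the same read-then-increment pattern from many goroutines; running it with go run -race will also make Go’s race detector flag it:

package main

import (
    "fmt"
    "sync"
)

var counter = 0

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Unsynchronized read-then-increment: a data race.
            v := counter
            counter = v + 1
        }()
    }
    wg.Wait()
    // Usually prints less than 1000 because some increments are lost.
    fmt.Println(counter)
}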

To fix this, we use a mutex lock from Go’s sync package. We define a mutex variable:

// keep it global for now
var lock = sync.Mutex{}

Then we tweak our handler a bit:

lock.Lock()
body["id"] = getCounter()
incrementCounter()
lock.Unlock()

When lock.Lock() is called by a goroutine, any other goroutine that calls .Lock() on the same variable (here, lock) will wait until the first goroutine calls .Unlock().

Hence, now the first goroutine locks -> getCounter() -> incrementCounter() -> unlocks -> and then the second goroutine locks -> getCounter() and so on…
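Putting the two snippets together, the handler with the mutex looks roughly like this (same names as before; remember to add "sync" to the imports):

// lock is the sync.Mutex defined above.
n.Handle("generate", func(msg maelstrom.Message) error {
    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    // Only one goroutine at a time may read and increment the counter.
    lock.Lock()
    body["id"] = getCounter()
    incrementCounter()
    lock.Unlock()

    body["type"] = "generate_ok"
    return n.Reply(msg, body)
})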

A simple insight, but it puts you ahead of a lot of developers :)

Just a heads up, we have not even delved into Distributed Systems yet :)

Now coming to the actual problem (even bigger one) …

The first approach that often comes to mind is to create a global counter that is consistent across all the nodes. This is an extension of the solution we proposed for the single-node system.

If we read the problem carefully, it states two more requirements:

  1. The system should have total availability, i.e. it should continue to work (serve requests) at all times.
  2. The system might face network partitions, i.e. network failures: communication between certain nodes can be cut off, or certain nodes might stop working.

So, we want to create a system where:

  1. There is a global counter that is consistent across all nodes.
  2. A totally available server.
  3. A server that might face network partitions.

If you try to think about a solution to this problem, you will realise that you can never achieve all three of these at the same time. We will see this more clearly as we go through these challenges. This is exactly what the CAP theorem says.

The CAP theorem says that a distributed system can deliver only two of three desired characteristics: consistency, availability and partition tolerance (the ‘C,’ ‘A’ and ‘P’ in CAP).

So, we will try to solve the problem for each possible combination. We will go in the following order:

  1. Availability + Partition Tolerance (The actual problem)
  2. Availability + Consistency
  3. Consistency + Partition Tolerance

Availability + Partition Tolerance

Since no consistency is possible, we can’t expect all nodes to maintain a consistent global counter. Let’s understand why this happens:

a. Partition Tolerance means that some nodes might not be able to communicate with each other at any given time, or there can be other network failures.

b. Total Availability guarantees that an HTTP request will be served by the distributed system at any given time (even during network failures in the system).

This means that each node should be able to handle requests without communicating with the other nodes, and therefore each node must generate a set of ids that is guaranteed never to collide with the ids generated by any other node.

There are several solutions to the problem:

Solution 1-A: One of the easiest solutions is to use the number of nodes in the cluster, which you can get with len(n.NodeIDs()). Let’s say it is 5. Then the first node always returns 0, 5, 10, …, the second node always returns 1, 6, 11, …, and so on. You can find the id of a node using n.ID(). Personally, I don’t find this a very good solution even though it passes the tests: it is not well suited to clusters whose size is dynamic, although that can be handled. You can find the solution here, and a sketch of the idea below.
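A minimal sketch of this idea, assuming Maelstrom’s usual "n0", "n1", … node IDs (my own illustration; the linked solution may differ). It needs "strconv" and "sync" in the imports:

var (
    lock    sync.Mutex
    counter = 0
)

// inside main():
n.Handle("generate", func(msg maelstrom.Message) error {
    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    // Node "n3" -> index 3; the cluster size comes from the init message.
    index, err := strconv.Atoi(n.ID()[1:])
    if err != nil {
        return err
    }
    total := len(n.NodeIDs())

    // The node with index i hands out i, i+total, i+2*total, ...
    // so the per-node sequences can never overlap.
    lock.Lock()
    id := index + counter*total
    counter++
    lock.Unlock()

    body["id"] = id
    body["type"] = "generate_ok"
    return n.Reply(msg, body)
})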

Solution 1-B: Another simple solution is to use time.Now().String() as the id. This works because goroutines don’t run at exactly the same instant. It passes the tests mentioned in the challenge, but it starts failing if you increase the rate from 1000 to 1000000, because two requests may be handled at exactly the same time across 2 nodes (especially when the request rate is high). This can be mitigated using the node ID: simply append the node ID to the timestamp string, and all returned ids become unique. You can find the solution here, and a sketch below.
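A sketch of the timestamp-plus-node-ID variant (again my own illustration; I use UnixNano instead of String() purely for compactness, and it needs "fmt" and "time" in the imports):

// inside main():
n.Handle("generate", func(msg maelstrom.Message) error {
    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    // The timestamp alone can collide across nodes; suffixing the node ID
    // removes cross-node collisions. Two goroutines on the same node would
    // still need the exact same nanosecond timestamp to collide.
    body["id"] = fmt.Sprintf("%d-%s", time.Now().UnixNano(), n.ID())
    body["type"] = "generate_ok"
    return n.Reply(msg, body)
})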

There can be more solutions to this problem, but the objective here is to give you an idea. Here, a single node can independently serve the request without collaborating with the other nodes.

Availability + Consistency

Here, we can assume the system has no network failures. That means all nodes can communicate with each other at all times, so they should be able to maintain a consistent global counter.

Generally, to maintain a consistent state of a variable in a distributed network, we select a master node which is responsible for managing the value of the counter. It has 2 responsibilities:

  1. When the value of the counter is to be incremented, any node sends the increment request to the master node. The master node then accepts or rejects this request (using a consensus protocol).
  2. It returns the current value of the counter.

You might also create an independent service, like a distributed database or key-value store, which provides this functionality.

This shows that, for consistency to be maintained, all nodes must be able to communicate with the master node and vice versa. Hence, a consistent system can be totally available only when no network partitions are present.

You can view a simple solution using maelstrom.NewLinKV (Maelstrom’s built-in linearizable key-value store) here.
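For reference, the core of a lin-kv based handler looks something like this (a sketch using the Go library’s KV helpers; the linked solution may differ in details, and it needs "context" and "time" in the imports):

kv := maelstrom.NewLinKV(n)

n.Handle("generate", func(msg maelstrom.Message) error {
    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    for {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        // Treat a missing key as 0 so the very first request can create it.
        current, err := kv.ReadInt(ctx, "counter")
        if err != nil {
            current = 0
        }
        // createIfNotExists=true lets the first CAS create the key.
        err = kv.CompareAndSwap(ctx, "counter", current, current+1, true)
        cancel()
        if err == nil {
            body["id"] = current
            break
        }
        // Another node won the race (or the call timed out): retry.
    }

    body["type"] = "generate_ok"
    return n.Reply(msg, body)
})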

We will also try implementing the master-node algorithm ourselves (n0 is considered the master in our examples). For simplicity, we won’t use any consensus algorithm. Rather, we will use a precondition value: instead of implementing Read and Write functions for the counter, we will implement Read and CAS (compare-and-swap) functions.

CAS functions are an important part of distributed systems. The idea is simple:

Write(key, newValue)

CAS(key, fromValue, newValue)

CAS accepts an extra argument, fromValue: the key is only updated if the provided fromValue matches the key’s existing value. This verifies that the caller only updates the state they expect. If the value of the key has been updated in the meantime by another process (node or goroutine), the write operation should not go through.
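Stripped of all networking, the semantics boil down to something like this (a local sketch; the cas_counter handler later in the article performs the same check on the master node):

// compareAndSwap updates *value to newValue only if it still equals fromValue.
// It reports whether the swap happened.
func compareAndSwap(mu *sync.Mutex, value *int, fromValue, newValue int) bool {
    mu.Lock()
    defer mu.Unlock()
    if *value != fromValue {
        // Someone else changed the value since we read it: reject the write.
        return false
    }
    *value = newValue
    return true
}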

Let’s understand the problem with the simple Write operation in more detail:

func getCounter(n *maelstrom.Node) (int, error) {
    lock.Lock()
    defer lock.Unlock()
    // The master node (n0) owns the counter and can read it locally.
    if n.ID() == "n0" {
        return COUNTER, nil
    }

    // Every other node asks n0 for the current value.
    resp, err := n.SyncRPC(context.TODO(), "n0", map[string]any{
        "type": "get_counter",
    })
    if err != nil {
        return 0, err
    }

    var body map[string]any
    if err := json.Unmarshal(resp.Body, &body); err != nil {
        return 0, err
    }
    return int(body["id"].(float64)), nil
}

func writeCounter(n *maelstrom.Node, newValue int) error {
    lock.Lock()
    defer lock.Unlock()
    if n.ID() == "n0" {
        COUNTER = newValue
        return nil
    }

    // Fire-and-forget write to the master node.
    return n.Send("n0", map[string]any{
        "type": "write_counter",
        "id":   newValue,
    })
}

n.Handle("generate", func(msg maelstrom.Message) error {
    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    newID, err := getCounter(n)
    if err != nil {
        logger.Errorf("error getting counter: %v", err)
        return err
    }
    if err := writeCounter(n, newID+1); err != nil {
        logger.Errorf("error writing counter: %v", err)
        return err
    }
    body["id"] = newID
    body["type"] = "generate_ok"
    return n.Reply(msg, body)
})

n.Handle("get_counter", func(msg maelstrom.Message) error {
    // Guard the read: other goroutines on n0 may be updating COUNTER.
    lock.Lock()
    body := map[string]any{
        "id": COUNTER,
    }
    lock.Unlock()
    return n.Reply(msg, body)
})

n.Handle("write_counter", func(msg maelstrom.Message) error {
    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    lock.Lock()
    COUNTER = int(body["id"].(float64))
    lock.Unlock()
    return nil
})

The problem:

Let’s say 2 nodes, n1 and n2, simultaneously read the counter value as 0. Then they both make a write request for 0+1 = 1. The value of the counter is now 1, and both nodes return 0 as the id.

This is a common problem in distributed systems when using key/value stores. To update a value based on its existing value:

  1. We read the value of key as existingVal
  2. Calculate the value newVal based on existingVal (here adding 1)
  3. Update the value of key to newVal

However, another process (say x) in the distributed system might change the value of the key between (1) and (3); when (3) happens, it overwrites the change made by x, causing inconsistencies.

Changing to CAS:

func CASCounter(n *maelstrom.Node, existing, newValue int) error {
    lock.Lock()
    defer lock.Unlock()
    // On the master node itself, do the compare-and-swap locally.
    if n.ID() == "n0" {
        if COUNTER != existing {
            return maelstrom.NewRPCError(maelstrom.PreconditionFailed, "precondition failed")
        }
        COUNTER = newValue
        return nil
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    resp, err := n.SyncRPC(ctx, "n0", map[string]any{
        "type":     "cas_counter",
        "id":       newValue,
        "existing": existing,
    })
    if err != nil {
        return err
    }
    if resp.Type() == "cas_counter_failed" {
        return maelstrom.NewRPCError(maelstrom.PreconditionFailed, "precondition failed")
    }
    return nil
}

n.Handle("generate", func(msg maelstrom.Message) error {
    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    counterValue, err := getCounter(n)
    if err != nil {
        logger.Errorf("error getting counter: %v", err)
        return err
    }
    // Keep proposing counterValue -> counterValue+1 until a CAS succeeds.
    for {
        if err := CASCounter(n, counterValue, counterValue+1); err != nil {
            logger.Errorf("error cas counter: %v", err)
            counterValue++
            continue
        }
        break
    }
    body["id"] = counterValue + 1
    body["type"] = "generate_ok"
    return n.Reply(msg, body)
})

n.Handle("cas_counter", func(msg maelstrom.Message) error {
    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    lock.Lock()
    defer lock.Unlock()
    existingValue := int(body["existing"].(float64))
    if COUNTER != existingValue {
        body["type"] = "cas_counter_failed"
        return n.Reply(msg, body)
    }
    COUNTER = int(body["id"].(float64))
    // Reply so the caller's SyncRPC doesn't time out on success.
    body["type"] = "cas_counter_ok"
    return n.Reply(msg, body)
})

CAS solves the above problem. Here, when the caller tries to change the value of the key from existingVal to newVal, they ask the store to first compare and then swap.

Looking at the previous process:

  1. We read the value of key as existingVal
  2. Calculate the value newVal based on existingVal (here adding 1)
  3. Update the value of key to newVal only if currentVal is equal to existingVal

This makes sure that if the variable’s state changes between (1) and (3), the update is rejected rather than silently overwriting it.

So if n1 and n2 try to update the counter from 0 to 1 concurrently, they both make the CAS request 0->1. Let’s say n2 updates the value of the counter first (the master node uses a lock to serve one request at a time). The value of the counter changes to 1. Now n1’s CAS request is served after this, and it fails since the existing value is not 0 anymore. So n1 retries with a new CAS request, 1->2.

So all the nodes keep retrying their CAS requests with incremented values until they succeed. This ensures that each node gets a unique id.

You can find the solution here.

This is a totally available system with some added latency (retries until the CAS request passes). You might still see Maelstrom report it as not totally available due to request timeouts.

We can reduce the number of CAS failures by using sparse ids (e.g. via rand.Int()) instead of an incremental counter, since incrementing by one causes more collisions between nodes. A sketch of this idea follows.
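One way to read this suggestion (my interpretation, not code from the linked solutions): inside the generate handler, instead of always proposing counter+1, propose a random jump forward, so two nodes that read the same value rarely propose the same successor. It needs "math/rand" in the imports:

// Sketch: random stride instead of +1 to reduce CAS contention.
// getCounter and CASCounter are the helpers from the snippets above.
for {
    current, err := getCounter(n)
    if err != nil {
        return err
    }
    candidate := current + 1 + rand.Intn(1000) // hypothetical stride range
    if err := CASCounter(n, current, candidate); err != nil {
        // Lost the race: re-read and try again with a fresh random jump.
        continue
    }
    body["id"] = candidate
    break
}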

Consistency + Partition Tolerance

When network partitions are introduced into the system, some nodes in the system above might not be able to communicate with n0, so they would not be able to serve their requests. Hence, the system above becomes a non-available system.

All the nodes that can communicate with n0, and n0 itself, would continue to serve requests. So the system is partition tolerant.

Also, the system’s consistency is maintained.

Now, do you have more conviction about why the CAP theorem holds?

If you have read to this point, I hope you have a deeper understanding of the challenges present in a distributed system. The objective of this writeup wasn’t just to give you a solution that passes the tests, but to take you through multiple approaches and the limitations of each of them.

We’ll discuss the Broadcast service in the next article. You can check it out here

You can find the final solutions here:

Totally Available with Partition Tolerance: Solution1-A | Solution1-B

Consistency with Availability without Partition Tolerance (additional solution): Solution2

Until Next Time

Burnerlee
