Gossip Gloomers — A Detailed Walkthrough — Challenge #3b and #3c

Aviral Jain
8 min read · Aug 3, 2024


In the last post of this series, we saw how to implement a single-node broadcast system. You can check out that article here.

Now let’s extend the solution to a multi-node broadcast system, shall we?

To better understand this article and the setup, I recommend skimming through the solution to #3a; the link to that article is here.

TL;DR: You can find the solutions here:
Solution #3b | Solution #3c

Understanding Topology

Let’s start with implementing the topology RPC. In a single node system, the topology didn’t matter. But here, it is a good idea to save the information about neighbouring nodes of any given node. Let us look at a sample topology.

A sample topology

The topology simply describes how the nodes in a distributed system are arranged. Even though in these challenges any two nodes can communicate with each other directly (for example, n1 can communicate with n5), n2, n6 and n7 are the neighbouring nodes of n1, and it is best if n1 communicates only with them.

We can assume (for the scope of these challenges) that all nodes are connected as a single system and don't form isolated groups like the one in the example below.

A distributed system with 2 isolated networks of nodes

Implementing the Topology RPC

We will implement a neighbourStore for storing information about the neighbours of a node, using a map as a set. We create a new file neighbours.go :

package main

import "sync"

type nodeNeighbours struct {
    neighbours map[string]bool
    lock       sync.Mutex
}

func (n *nodeNeighbours) addNeighbour(neighbour string) {
    n.lock.Lock()
    defer n.lock.Unlock()

    n.neighbours[neighbour] = true
}

func (n *nodeNeighbours) getNeighbours() []string {
    n.lock.Lock()
    defer n.lock.Unlock()

    var neighbours []string
    for neighbour := range n.neighbours {
        neighbours = append(neighbours, neighbour)
    }

    return neighbours
}

Note the use of mutex locks in the above implementation. They make the store safe for concurrent access, since multiple handler goroutines may read and write the neighbour set at the same time.

Now implementing the topology handler:

The topology info is provided in the following format:

{
  "topology": {
    "n1": ["n2", "n3"],
    "n2": ["n1"],
    "n3": ["n1"]
  }
}

At the start of the testing, the maelstrom system makes a topology RPC call to each node. Say the call is made to n2 : then n2 finds its neighbours using topology["n2"] , which equals ["n1"] , and saves this information with itself. Similarly, n1 stores n2 and n3 , and n3 stores n1 .

The topology for the above system looks like this. We will call it System A.

Topology system for example provided above

The following code implements this logic:

n.Handle("topology", func(msg maelstrom.Message) error {
    var body map[string]any

    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    topologyInterface := body["topology"].(map[string]interface{})
    topologyForNode := topologyInterface[n.ID()].([]interface{})

    for _, neighbour := range topologyForNode {
        neighbourStore.addNeighbour(neighbour.(string))
    }

    body = map[string]any{
        "type": "topology_ok",
    }
    return n.Reply(msg, body)
})

Extending the Broadcast RPC

Now that each node knows about its neighbours, we can extend the broadcast RPC to a multi-node system.

The logic remains similar: read the message from request -> save this message in store -> broadcast this message to other nodes (new step) -> return broadcast_ok response.

Since all the nodes form a single connected network, this strategy ensures that if a message arrives on any node, it is communicated to all the other nodes.

Let me write the code down:

n.Handle("broadcast", func(msg maelstrom.Message) error {
    var body map[string]any

    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    message := body["message"].(float64)
    logger.Infof("Received message %f", message)
    store.addMessage(message)

    // broadcasting the message to other nodes
    for _, neighbour := range neighbourStore.getNeighbours() {
        body["type"] = "broadcast"
        body["message"] = message
        n.Send(neighbour, body)
    }
    // broadcast ends

    body["type"] = "broadcast_ok"
    delete(body, "message")

    return n.Reply(msg, body)
})

Does this look good? Take a pause, and give yourself a minute to find any issues with this implementation.

Think about how this would play out for System A.

Say n1 receives message = 1:

  • it saves the message with itself.
  • it broadcasts the message to n2 and n3.
  • n2 receives the message -> saves it with itself -> broadcasts it to its neighbour n1.
  • n3 receives the message -> saves it with itself -> broadcasts it to its neighbour n1.
  • and then n1 repeats from step 1.

This would be a never ending cycle, and the reason is simple. If A is a neighbour of B, B is also a neighbour of A. So, if we send a message A->B, B should not broadcast the message back to A, but only to other neighbours.

Thus, we need to avoid broadcasting a message back to the sender i.e. skip sending the message to a neighbour, if the neighbour == sender.

We handle this case by tweaking the code a bit:

for _, neighbour := range neighbourStore.getNeighbours() {
    if neighbour == msg.Src {
        // skip sending back to the sender
        continue
    }

    body["type"] = "broadcast"
    body["message"] = message
    n.Send(neighbour, body)
}

There might be another issue:

A 4 node cyclic cluster

Consider a message received on n1 in the above cluster. It broadcasts the message to n2 and n3 . Then n2 sends it to n4 , and n3 sends it to n4 .

  • Now n4 forwards the message received from n3 to n2
  • And n4 also forwards the message received from n2 to n3

Do you see the problem here? Then, n2 sends to n1 and n3 also sends to n1 and the cycle goes on. We are stuck in a cycle!

Therefore, it is not sufficient to avoid broadcasting the message back to the sender alone. When a broadcast message is received on a node, the node must check whether it has already seen this message. If yes, the node already has the message stored and has already broadcast it to its neighbours when it first saw it. Hence, we can simply ignore the message in this case.

Hence, we need to add this logic to broadcast.go before the store.addMessage() call:

alreadyExists := store.checkMessage(message)
if alreadyExists {
    logger.Infof("Message %f already exists", message)
    body["type"] = "broadcast_ok"
    delete(body, "message")
    return n.Reply(msg, body)
}

The Read RPC

The read RPC is simple — return the values present in the node's store. Since we have already ensured that all values reach every node, we can simply read the values from the store and return them, like in the previous solution.

n.Handle("read", func(msg maelstrom.Message) error {
    messages := store.getMessages()

    var body map[string]any
    if err := json.Unmarshal(msg.Body, &body); err != nil {
        return err
    }

    body["type"] = "read_ok"
    body["messages"] = messages

    return n.Reply(msg, body)
})

You can find the entire code here

Compile the code and run the maelstrom tests; the system should run successfully!

Handling Network Partitions (#3c)

Understanding Network Partitions

A network partition means that some nodes in the system may be unable to communicate with each other for some time. So if you try to broadcast a message from, say, n1 -> n2 while there is a network partition, the request will fail. So, what do you do?
What would you do if you were trying to get a message to a friend you can't reach? Well, you keep trying until you succeed. (Unless you have an ego :) Fortunately, machines don't have one :P)

That's what we do: we replace n.Send() with a better function, sendMessageWithRetry() .

for _, neighbour := range neighbourStore.getNeighbours() {
    if neighbour == msg.Src {
        // skip sending back to the sender
        continue
    }

    body["type"] = "broadcast"
    body["message"] = message

    msg_id := body["msg_id"].(float64)
    sendMessageWithRetry(n, neighbour, message, msg_id)
}

We define this function as:

func sendMessageWithRetry(n *maelstrom.Node, neighbour string, message, msg_id float64) {
    body := map[string]any{
        // reuse the message id from the incoming request
        "msg_id":  msg_id,
        "type":    "broadcast",
        "message": message,
    }

    for {
        ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
        _, err := n.SyncRPC(ctx, neighbour, body)
        cancel() // release the context now; a defer here would pile up across iterations
        if err != nil {
            logger.Errorf("Failed to send message to %s: %s", neighbour, err)
            continue
        }
        return
    }
}

We keep sending the message to the neighbour, until we can send it successfully.

We add a context with a deadline so the request times out if it doesn't complete within 1s. Without a deadline, the request never times out on its own and hangs during a network partition; even after the partition heals, the hung request doesn't resume. So it's better to time out the request and try again. The right timeout value depends on the typical latency of your system.

And we should be good to go!

Or wait, should we? Again, can you identify an issue here?

Well, sendMessageWithRetry only returns once the message has been sent successfully. That means our for loop stays stuck on each neighbour until the message gets through, which can cause the broadcast RPC call itself to time out.

To elaborate, if n1 receives a broadcast request from a client c1 (the maelstrom workers), it will not return a broadcast_ok response until it successfully broadcasts the message to n2 and n3 , and c1 will time out the request, marking it as failed.

Considering what we discussed about the CAP theorem in the 2nd challenge's solution, we want the system to be available (n1 should return success even while the system is facing a network partition). How can we do that?
It's simple: we call sendMessageWithRetry in a new goroutine. This ensures the function is called, while not blocking the execution of the main RPC handler.

for _, neighbour := range neighbourStore.getNeighbours() {
    if neighbour == msg.Src {
        // skip sending back to the sender
        continue
    }

    body["type"] = "broadcast"
    body["message"] = message

    msg_id := body["msg_id"].(float64)
    go sendMessageWithRetry(n, neighbour, message, msg_id)
}

And now, finally, we are good to go :) No more surprises.

You can find the entire source code here

Please note that the system we created is Available and Partition Tolerant, but not Consistent. Other nodes hold stale information until the originating node communicates the message after the network partition heals.

Things get more exciting in the next part of this challenge, where we will try to make our system more efficient.

What do we exactly mean by efficiency? Give it a thought, and we will discuss this in the next article. (Will post the link here soon)

(Updated: Solution to #3d)

Until Next Time…

Burnerlee
