Decentralized App Development

Peer-to-Peer MongoDB: Part 4

Relaying updates efficiently


Last time, we made our p2p network sparsely connected so that it could scale up to many nodes without the number of messages rising exponentially. However, that introduced a new problem — updates are only sent to direct connections. Today we’ll fix this so updates are propagated to all nodes in the network.

One hop

Suppose we post an update to the peer with port 5001. That peer will apply the update to its copy of the database and call sendToPeers to send a message to its connected peers asking them to do the same.
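To make the hops easier to follow, here is the seven-peer example topology as a simple adjacency map, reconstructed from the hop descriptions below (exampleTopology is an illustrative name; each peer is keyed by its port):

// Example network used in this walkthrough: each peer (keyed by
// port) maps to the peers it is directly connected to.
const exampleTopology: Record<number, number[]> = {
    5001: [5004, 5006, 5007],
    5002: [5003, 5004],
    5003: [5002, 5004, 5005, 5006, 5007],
    5004: [5001, 5002, 5003, 5005],
    5005: [5003, 5004],
    5006: [5001, 5003],
    5007: [5001, 5003]
};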

In the existing implementation, when peers 5004, 5006 and 5007 receive that message, they each apply the change to their database but do not send any further messages to their connected peers. Thus, peers 5002, 5003 and 5005 in this example never receive the update.

Multi-hop

Whether peers should be notified is controlled by the notifyPeers boolean parameter passed to insertOne, which is set to true only for the first hop. Therefore, the first step to allow multi-hop is to remove this boolean so that we always send updates, not just on the first hop.

With that restriction lifted, let’s follow how an update propagates through the network. Now, peers 5004, 5006 and 5007 will send the update to their connected peers.

The first thing to notice is that the update has been sent back to peer 5001 by all three peers 5004, 5006 and 5007. This redundant message is easy to prevent: the address we received the update from is known, so it can simply be excluded from the send.
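A minimal sketch of that rule (relay, cxns and sendTo here are illustrative stand-ins, not the real PubSub API):

// Relay an update to all connections except the peer it came from.
function relay(cxns: string[], receivedFrom: string, sendTo: (addr: string) => void) {
    for (const addr of cxns) {
        if (addr !== receivedFrom)
            sendTo(addr);
    }
}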

Some redundant messages are still being sent — specifically, peer 5003 receives the same update from three peers (5004, 5006 and 5007). Peer 5003 can easily check whether the update has already been applied, so after the first message is processed the others can simply be ignored.
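One way to sketch that idempotency check (illustrative only; later on, the real implementation gets the same guarantee from a unique index):

// Track which entries have been applied so repeats can be ignored.
const seenEntryIds = new Set<string>();

function applyOnce(entryId: string, apply: () => void): boolean {
    if (seenEntryIds.has(entryId))
        return false; // already applied, ignore the repeat
    seenEntryIds.add(entryId);
    apply();
    return true;
}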

At this point, every node has received the update, so any further messages are redundant. Let’s continue and look at the next hop.

These messages are all redundant, in the sense that they are transmitting an update that the target peer has already processed.

For the sake of argument, we assume that peer 5003 receives the message from 5004 before the ones from 5006 and 5007, and in response propagates the update to 5002, 5005, 5006 and 5007. At the same time, peers 5002 and 5005 propagate the update to 5003.

These are the last messages to be sent. All of these messages are received by peers that have already applied the update and are therefore ignored and do not propagate further.

Reducing redundancy

Can we do anything about these redundant messages? If, instead of excluding just the immediate sender, we excluded all known recipients along the path by which the update was received, we could cut the number of messages down significantly.

Let’s start from the beginning again, but this time pass an exclusion list along with each update.

The original update was applied (or sent) to 5001 and then sent on to 5004, 5006 and 5007, so peer 5001 sends those four addresses in its exclusion list. Let’s look at the next hop.

In this hop, peer 5004 sends to 5002, 5003 and 5005 (but not 5001, which was in the exclusion list). In doing so, it adds those addresses to its exclusion list (5001, 5004, 5006, 5007, 5002, 5003, 5005).
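The rule each relaying peer follows is simple: merge the exclusion list it received with the addresses of all of its own connections. A small sketch (nextExclusions is an illustrative helper; bare port numbers stand in for full addresses):

// The relayed exclusion list is the received list merged with
// every peer we are connected to.
function nextExclusions(received: number[], connections: number[]): number[] {
    return Array.from(new Set([...received, ...connections]));
}

// Peer 5004 receives (5001, 5004, 5006, 5007) and merges in its
// own connections before relaying:
console.log(nextExclusions([5001, 5004, 5006, 5007], [5001, 5002, 5003, 5005]));
// [5001, 5004, 5006, 5007, 5002, 5003, 5005]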

Similarly, peers 5006 and 5007 send to 5003 (but not 5001). They add 5003 to the end of the exclusion list (5001, 5004, 5006, 5007, 5003).

Peers 5002 and 5005 receive the update from 5004 and do not propagate further because they have no connections that aren’t in the exclusion list (in this case the exclusion list contains every node in the network).

For peer 5003 there are two possibilities:

  • It receives the update from 5004 first — it has no connections not in the exclusion list so propagation stops.
  • It receives the update from either 5006 or 5007 first — in this case it propagates the update to 5002 and 5005 because they are not in the exclusion list received from 5006 or 5007.

In the best case for this particular network, we have two redundant messages (the duplicate messages sent to 5003) and in the worst case we have four redundant messages (adding in the messages from 5003 to 5002 and 5005). This compares to six redundant messages where we only exclude the immediate sender.

Larger networks

How does this approach scale up to larger networks? Even with 20 peers it becomes difficult to keep track of the state of each peer. Fortunately, though, we can fairly easily construct a program to simulate an update propagating through a network of any size and gather some stats.

The following TypeScript program creates state for a specified number of peers, where each peer establishes a minimum number of connections, and an update is propagated from a given starting peer.

A simplified graph representation is output after each hop, showing the connections and highlighting the nodes that have completed both receiving and propagating the update:

// Holds current state of a peer
class Peer {
    connections: Set<number> = new Set();
    pathPeers: Set<number> = new Set();
    complete: boolean = false;
}

// Parameters
const MAX_PEERS: number = 100;
const MIN_CXNS: number = 2;
const START_PEER: number = 0;
const HOP_LIMIT: number = 100;

// Create the peers
const peers: Peer[] = Array.from(Array(MAX_PEERS), _ => new Peer());

// Connect the nodes at random (skipping self-connections)
peers.forEach((peer, peerIdx) => {
    while (peer.connections.size < MIN_CXNS) {
        const remotePeerIdx = Math.floor(Math.random() * MAX_PEERS);
        if (remotePeerIdx === peerIdx)
            continue;
        peer.connections.add(remotePeerIdx);
        peers[remotePeerIdx].connections.add(peerIdx);
    }
});

// Update starting peer
peers[START_PEER].pathPeers.add(START_PEER);

// Keep track of the number of messages sent
let msgCount = 0;

// Simulate until all peers have completed or hop limit reached
let hop = 0;
for (; hop < HOP_LIMIT; ++hop) {
    // Keep a copy of the peer info from the last hop
    const lastPeers = peers.map(peer => {
        const newPeer = new Peer();
        newPeer.complete = peer.complete;
        peer.connections.forEach(c => newPeer.connections.add(c));
        peer.pathPeers.forEach(p => newPeer.pathPeers.add(p));
        return newPeer;
    });

    lastPeers.forEach((lastPeer, peerIdx) => {
        // If peer complete or nothing to propagate then skip
        if (lastPeer.complete || lastPeer.pathPeers.size === 0)
            return;

        // Determine the peers in the path after this hop
        const pathPeers = new Set(lastPeer.pathPeers);
        lastPeer.connections.forEach(remotePeerIdx => pathPeers.add(remotePeerIdx));

        lastPeer.connections.forEach(remotePeerIdx => {
            // If peer already notified along path then skip
            if (lastPeer.pathPeers.has(remotePeerIdx))
                return;

            ++msgCount;

            // If remote peer received the update, it will ignore it
            if (peers[remotePeerIdx].pathPeers.size > 0)
                return;

            // Update remote peer
            peers[remotePeerIdx].pathPeers = pathPeers;
        });

        peers[peerIdx].complete = true;
    });

    // Generate graph
    let g = `digraph hop_${hop + 1} {\n`;
    g += '    graph [overlap=false]\n';
    g += '    edge [dir=none]\n';
    g += '    node [style=filled]\n';
    peers.forEach((p, pi) => {
        const fontcolor = p.complete ? 'white' : 'black';
        const fillcolor = p.complete ? 'black' : 'white';
        g += `    ${pi} [fontcolor=${fontcolor} fillcolor=${fillcolor}]\n`;
    });
    peers.forEach((p, pi) => {
        p.connections.forEach(rpi => {
            // Emit each undirected edge only once
            if (rpi < pi)
                return;
            g += `    ${pi} -> ${rpi}\n`;
        });
    });
    g += '}';
    console.log(g);

    // If all peers are complete then stop
    if (peers.every(peer => peer.complete))
        break;
}

// Report if we blew the hop limit
if (hop === HOP_LIMIT) {
    console.log(`Hop limit (${HOP_LIMIT}) reached before completion.`);
}
else {
    console.log(`Completed after ${hop + 1} hops.`);
}

// Report total number of messages sent to propagate the update
console.log(`Message count: ${msgCount}.`);

View and run this code at typescriptlang.org.

100 nodes, 2 connections per node

First, let’s run with 100 nodes and a minimum of two connections per node. We see that the simulation completes after 12 hops with 162 messages sent.

The following graphs were rendered by pasting the dot output from the simulator into Edotor using the neato engine:

[Graphs: hops 1 through 12]

100 nodes, 3 connections per node

If we increase the minimum connections per node, we see that the update propagates faster (but less efficiently). In the run I did, completion occurred after 6 hops with 284 messages sent (note: there is some variability in the stats between runs due to the random connections).

[Graphs: hops 1 through 6]

Much larger networks

Now that we have a simulator, we can turn up the number of nodes and evaluate much larger networks.

Unfortunately, as the number of nodes rises, visualization becomes less useful and ultimately fails as the graph layout engine is no longer able to cope with the number of nodes and connections.

Not all is lost though… we can still run the simulation and examine the statistics for the number of hops to completion and the total number of messages sent. Doing this for a range of network sizes and connection densities shows that the algorithm scales quite well:

  • The number of hops scales in proportion to log(number of peers)
  • As the number of peers rises, the number of messages per peer tends toward the minimum number of connections per peer
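One way to gather these statistics is to wrap the simulator in a function and sweep the parameters. The sketch below is a condensed version of the program above with the graph output removed (simulate and SimPeer are illustrative names):

// Condensed version of the simulator, wrapped in a function so
// network size and connection count can be swept.
class SimPeer {
    connections = new Set<number>();
    pathPeers = new Set<number>();
    complete = false;
}

function simulate(maxPeers: number, minCxns: number, hopLimit = 100): { hops: number, msgs: number } {
    const peers = Array.from(Array(maxPeers), () => new SimPeer());

    // Random connections, as before (self-connections skipped)
    peers.forEach((peer, peerIdx) => {
        while (peer.connections.size < minCxns) {
            const remote = Math.floor(Math.random() * maxPeers);
            if (remote === peerIdx)
                continue;
            peer.connections.add(remote);
            peers[remote].connections.add(peerIdx);
        }
    });

    peers[0].pathPeers.add(0);
    let msgs = 0;

    for (let hop = 0; hop < hopLimit; ++hop) {
        // Snapshot the previous hop's state
        const last = peers.map(p => ({
            complete: p.complete,
            connections: new Set(p.connections),
            pathPeers: new Set(p.pathPeers)
        }));
        last.forEach((lp, peerIdx) => {
            if (lp.complete || lp.pathPeers.size === 0)
                return;
            const path = new Set([...lp.pathPeers, ...lp.connections]);
            lp.connections.forEach(remote => {
                if (lp.pathPeers.has(remote))
                    return;
                ++msgs;
                if (peers[remote].pathPeers.size === 0)
                    peers[remote].pathPeers = path;
            });
            peers[peerIdx].complete = true;
        });
        if (peers.every(p => p.complete))
            return { hops: hop + 1, msgs };
    }
    return { hops: -1, msgs }; // hop limit reached
}

// Sweep a few configurations and report messages per peer
for (const n of [100, 1000, 10000]) {
    for (const c of [2, 3, 4]) {
        const { hops, msgs } = simulate(n, c);
        console.log(`${n} peers, ${c} cxns: ${hops} hops, ${msgs} msgs (${(msgs / n).toFixed(2)}/peer)`);
    }
}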

Implementation

Implementing this algorithm requires only a few changes…

Firstly, we need to modify our insertOne method (in src/db-manager.ts) to take the received excluded addresses. We default it to an empty array so that it can be omitted when first initiating an insert:

    async insertOne(name: string, publicKeyOwner: string | null, entry: OptionalId<Document>, excludeAddresses: string[] = []) {

To generate the exclude list that we’ll send to our peers, we simply merge in the addresses of all our peers. In the case where excludeAddresses is empty, we also need to add our own address to prevent transmissions back to us:

        const excludeAddressesNext = Array.from(new Set(excludeAddresses.length > 0 ?
            [...excludeAddresses, ...this._pubSub.cxnAddresses()] :
            [this._pubSub.selfAddress(), ...this._pubSub.cxnAddresses()]));
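As a quick standalone check of the merge (the addresses here are made up), note that the Set removes any overlap between the received exclusions and our own connections:

const received = ['10.0.0.1:5001', '10.0.0.4:5004'];
const cxns = ['10.0.0.4:5004', '10.0.0.3:5003'];

// '10.0.0.4:5004' appears in both lists but only once in the result
console.log(Array.from(new Set([...received, ...cxns])));
// ['10.0.0.1:5001', '10.0.0.4:5004', '10.0.0.3:5003']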

Next, we’ll define a message format to send the insertOne parameters:

enum DbMessageType {
    InsertOne
}

interface DbMessage {
    action: DbMessageType;
    excludeAddresses: string[];
}

interface DbMessageInsertOne extends DbMessage {
    action: DbMessageType.InsertOne;
    name: string;
    publicKeyOwner: string | null;
    entry: any;
}

Now we can populate a DbMessageInsertOne and send it to peers following a successful insert:

            const insertMsg: DbMessageInsertOne = {
                action: DbMessageType.InsertOne,
                excludeAddresses: excludeAddressesNext,
                name,
                publicKeyOwner,
                entry: originalEntry
            };

            this._pubSub.sendToPeers(insertMsg, excludeAddresses);

Note: we pass the received excluded addresses to PubSub.sendToPeers but send the next exclude addresses in the message. That’s because the excluded addresses in the message specify the addresses that the receiver should exclude, not the sender.

In the insert message we set the entry field to originalEntry. That is because insertOne mutates the entry, adding an _id field, and if we were to send the mutated entry, signature validation would fail at the receiving peer. Therefore, we take a copy of entry prior to calling Collection.insertOne:

           const originalEntry = { ...entry };

In the DbManager._onReceive method, we need to handle the new DbMessageInsertOne message type to get the parameters from the message and call DbManager.insertOne:

    private _onReceive(obj: any) {
        const dbMsg = obj as DbMessage;
        switch (dbMsg.action) {
            case DbMessageType.InsertOne: {
                const insertMsg = dbMsg as DbMessageInsertOne;
                this.insertOne(insertMsg.name, insertMsg.publicKeyOwner, insertMsg.entry, insertMsg.excludeAddresses);
                break;
            }
        }
    }

The final thing we need to do in DbManager.insertOne is to use our own id field instead of _id — we’ll call it _entryId. That’s because we want a string field to contain the "publicKey/entryKey" id that is the same for a given entry across the database instances of all the peers. The _id field is constrained to be an ObjectId so can’t be used for this. We need to ensure the collection is uniquely indexed by _entryId to prevent duplicate entries:

        const address = publicKeyOwner ? `${name}/${publicKeyOwner}` : name;
        let col: Collection<Document>;
        try {
            col = this._db.collection(address);
            console.log(`Successfully opened collection '${col.collectionName}'`);
            await col.createIndex({ _entryId: 1 }, { unique: true, background: false });
        }
        catch {
            console.log(`Failed to create index for collection ${address}`);
            return null;
        }

Note that we turn off background indexing. This is crucial because the index is created when the collection is first opened and it’s possible to almost immediately receive a duplicate update from another peer. If the index is created in the background, there’s a window where the unique index is not enforced, a duplicate can be inserted and then the index will fail to build.
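With the unique index in place, a duplicate update shows up at insert time as a duplicate-key error (code 11000 in MongoDB), which can simply be treated as "already applied". A sketch of that handling, assuming col and entry as in the snippet above (the exact error handling in the real insertOne may differ):

try {
    await col.insertOne(entry);
}
catch (err: any) {
    // Duplicate _entryId: this peer already has the entry, so the
    // repeated update can safely be ignored
    if (err.code === 11000)
        return null;
    throw err;
}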

We’re nearly done… we just need to make a couple of changes to the PubSub class in src/pub-sub.ts. First, we need to change PubSub.sendToPeers to take the extra excludeAddresses parameter:

    sendToPeers(obj: any, excludeAddresses: string[] = []) {
        const peerDataMsg: PeerDataMessage = { type: PeerMessageType.PeerData, data: obj };
        const json = JSON.stringify(peerDataMsg);
        const buffer = new TextEncoder().encode(json);
        const exclAddrSet = new Set(excludeAddresses);
        for (const [address, socket] of this._addressToSocket) {
            if (!exclAddrSet.has(address)) {
                console.log(`Sending to peer: ${address}`);
                socket.write(buffer);
            }
            else {
                console.log(`Not sending to excluded peer: ${address}`);
            }
        }
    }

We just test each address against the exclude list and skip those that are marked as excluded. We also add some logging here as it’s interesting to see when a send was skipped by exclusion.

Then we need to add methods to get the addresses of the connected peers and our self address:

    selfAddress(): string {
        return `${this._serverAddress}:${this._serverPort}`;
    }

    cxnAddresses(): IterableIterator<string> {
        return this._addressToSocket.keys();
    }

And that’s it! I tested the implementation by running seven peers and sending in updates to various peers. They relay the updates between themselves and all database replicas are correctly updated. The number of messages sent matches the expectation of our simulation.

The source up to this point is available on github: https://github.com/jeremyorme/bonono-server/tree/release/part-4

Next time

We’ll finally get on to the problem of a peer that joins the party late and therefore has an incomplete replica.
