Data Replication Control in Apache Gossip

Mirage Abeysekara
Jul 10, 2017 · 3 min read

Data replication control enables user to constrain the data sharing between nodes. For example a user may wish to replicate data (per-node data or shared data) only within the data center. Apache Gossip has built-in data replication control for the following use cases,

  • Data should replicate to all the nodes (This is the default behavior if an object does not specify any replication control mechanism)
  • Data should not replicate to any node
  • Data should replicate only to a given list of nodes
  • Data should not replicate to a given list of nodes
  • Data should only replicate within the data center

A replication controller can be added for shared or per-node data message using message.setReplicable(Replicable<T> replicable) where T is the message type (SharedDataMessage or PerNodeDataMessage)

Black Listing a Node using Replication Control

The following example demonstrates how to create a shared data message which does not replicate on a given node.

Create a gossip cluster with 3 nodes

GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> nodes = new ArrayList<>();
final int clusterMembers = 3;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers)
.gossipSettings(settings).build();
nodes.add(gossipService);
gossipService.init();
}

Make a black list and add node 2

List<LocalMember> blackList = new ArrayList<>();
blackList.add(nodes.get(1).getMyself());

Create a shared data message with black list replication control and add it to the node 1

SharedDataMessage message = new SharedDataMessage();
message.setExpireAt(Long.MAX_VALUE);
message.setKey("myKey");
message.setPayload("I am only visible in Node 1 and 3");
message.setTimestamp(System.currentTimeMillis());
// set the black list replicable
message.setReplicable(new BlackListReplicable<>(blackList));

nodes.get(0).gossipSharedData(message);
Thread.sleep(2000); // give some time to gossip

Find the message with “myKey” in all nodes and print the value to the standard output (Find will return null if there are no matching data for the given key)

for (GossipManager node : nodes) {
SharedDataMessage replicatedMessage = node
.findSharedGossipData("myKey");
if (replicatedMessage == null) {
System.out.println("Node " + (node.getMyself().getId())
+ ", no value for the myKey");
} else {
System.out.println("Node " + (node.getMyself().getId())
+ ", value for the myKey = " + replicatedMessage.getPayload());
}
}

Complete Code

public static void main(String[] args)
throws URISyntaxException, InterruptedException {
// setup cluster
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> nodes = new ArrayList<>();
final int clusterMembers = 3;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder()
.cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers)
.gossipSettings(settings).build();
nodes.add(gossipService);
gossipService.init();
}

// create a black list which include node 2
List<LocalMember> blackList = new ArrayList<>();
blackList.add(nodes.get(1).getMyself());

// Create shared data message for node 1
SharedDataMessage message = new SharedDataMessage();
message.setExpireAt(Long.MAX_VALUE);
message.setKey("myKey");
message.setPayload("I am only visible in Node 1 and 3");
message.setTimestamp(System.currentTimeMillis());
// set the black list replicable
message.setReplicable(new BlackListReplicable<>(blackList));

nodes.get(0).gossipSharedData(message);
Thread.sleep(2000);

for (GossipManager node : nodes) {
SharedDataMessage replicatedMessage = node
.findSharedGossipData("myKey");
if (replicatedMessage == null) {
System.out.println("Node " + (node.getMyself().getId())
+ ", no value for the myKey");
} else {
System.out.println("Node " + (node.getMyself().getId())
+ ", value for the myKey = "
+ replicatedMessage.getPayload());
}
}

// shutdown the nodes
for (GossipManager node : nodes) {
if (node != null) {
node.shutdown();
}
}
}

The above program prints the following output to the standard output

Node 1, value for the myKey = I am only visible in Node 1 and 3
Node 2, no value for the myKey
Node 3, value for the myKey = I am only visible in Node 1 and 3

Writing a Custom Replication Controller

All the replication controllers are implemented using Replicable interface which has shouldReplicate(LocalMember me, LocalMember destination, T message) method to implement.

  • me: node in which the message is to be transmitted from
  • destination: node which receive the message
  • message: message object it-self

Return value of this method determines whether the message should replicate from me to destination

Example:

public class MyReplicable<T extends Base> implements Replicable<T> {

@Override
public boolean shouldReplicate(LocalMember me,
LocalMember destination, T message) {
boolean result;
// do the replication logic
return result;
}
}

View Source at GitHub : https://github.com/apache/incubator-gossip

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade