Data Replication Control in Apache Gossip
Data replication control lets a user constrain how data is shared between nodes. For example, a user may wish to replicate data (per-node data or shared data) only within a data center. Apache Gossip has built-in data replication control for the following use cases:
- Data should replicate to all the nodes (This is the default behavior if an object does not specify any replication control mechanism)
- Data should not replicate to any node
- Data should replicate only to a given list of nodes
- Data should not replicate to a given list of nodes
- Data should only replicate within the data center
A replication controller can be added to a shared or per-node data message using message.setReplicable(Replicable<T> replicable), where T is the message type (SharedDataMessage or PerNodeDataMessage).
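The five use cases listed above each reduce to a yes/no decision about a (sender, destination) pair. The following is a minimal, self-contained sketch of those decisions. It does not use Apache Gossip's classes: Node, Policy, and the factory method names are hypothetical stand-ins chosen for illustration, and the data center label is assumed to be a plain string.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ReplicationPolicies {
    /** Hypothetical stand-in for a cluster member: an id plus a data center label. */
    public static final class Node {
        final String id;
        final String dataCenter;

        public Node(String id, String dataCenter) {
            this.id = id;
            this.dataCenter = dataCenter;
        }
    }

    /** Hypothetical stand-in for the decision a replication controller makes. */
    public interface Policy {
        boolean shouldReplicate(Node me, Node destination);
    }

    /** Replicate to every node (the default behavior described above). */
    public static Policy all() {
        return (me, destination) -> true;
    }

    /** Replicate to no node. */
    public static Policy none() {
        return (me, destination) -> false;
    }

    /** Replicate only to the listed node ids. */
    public static Policy whiteList(String... ids) {
        Set<String> allowed = new HashSet<>(Arrays.asList(ids));
        return (me, destination) -> allowed.contains(destination.id);
    }

    /** Replicate to every node except the listed node ids. */
    public static Policy blackList(String... ids) {
        Set<String> blocked = new HashSet<>(Arrays.asList(ids));
        return (me, destination) -> !blocked.contains(destination.id);
    }

    /** Replicate only when sender and destination share a data center label. */
    public static Policy sameDataCenter() {
        return (me, destination) -> me.dataCenter.equals(destination.dataCenter);
    }

    public static void main(String[] args) {
        Node n1 = new Node("1", "dc-east");
        Node n2 = new Node("2", "dc-east");
        Node n3 = new Node("3", "dc-west");

        System.out.println(blackList("2").shouldReplicate(n1, n2));  // false
        System.out.println(blackList("2").shouldReplicate(n1, n3));  // true
        System.out.println(whiteList("2").shouldReplicate(n1, n3));  // false
        System.out.println(sameDataCenter().shouldReplicate(n1, n2)); // true
        System.out.println(sameDataCenter().shouldReplicate(n1, n3)); // false
    }
}
```

In this sketch the black list and white list are exact complements of each other for the same id set, which is why the library can offer both as separate controllers without any extra configuration.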
Black Listing a Node using Replication Control
The following example demonstrates how to create a shared data message that is not replicated to a given node.
Create a gossip cluster with 3 nodes
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
  URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
  startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> nodes = new ArrayList<>();
final int clusterMembers = 3;
for (int i = 1; i < clusterMembers + 1; ++i) {
  URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
  GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
      .id(i + "").gossipMembers(startupMembers)
      .gossipSettings(settings).build();
  nodes.add(gossipService);
  gossipService.init();
}
Make a black list and add node 2
List<LocalMember> blackList = new ArrayList<>();
blackList.add(nodes.get(1).getMyself());
Create a shared data message with black list replication control and add it to node 1
SharedDataMessage message = new SharedDataMessage();
message.setExpireAt(Long.MAX_VALUE);
message.setKey("myKey");
message.setPayload("I am only visible in Node 1 and 3");
message.setTimestamp(System.currentTimeMillis());
// set the black list replicable
message.setReplicable(new BlackListReplicable<>(blackList));
nodes.get(0).gossipSharedData(message);
Thread.sleep(2000); // give some time to gossip
Find the message with "myKey" in all nodes and print the value to the standard output (findSharedGossipData returns null if no data matches the given key)
for (GossipManager node : nodes) {
  SharedDataMessage replicatedMessage = node
      .findSharedGossipData("myKey");
  if (replicatedMessage == null) {
    System.out.println("Node " + (node.getMyself().getId())
        + ", no value for the myKey");
  } else {
    System.out.println("Node " + (node.getMyself().getId())
        + ", value for the myKey = " + replicatedMessage.getPayload());
  }
}
Complete Code
public static void main(String[] args)
    throws URISyntaxException, InterruptedException {
  // setup cluster
  GossipSettings settings = new GossipSettings();
  settings.setPersistRingState(false);
  settings.setPersistDataState(false);
  String cluster = UUID.randomUUID().toString();
  int seedNodes = 1;
  List<Member> startupMembers = new ArrayList<>();
  for (int i = 1; i < seedNodes + 1; ++i) {
    URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
    startupMembers.add(new RemoteMember(cluster, uri, i + ""));
  }
  final List<GossipManager> nodes = new ArrayList<>();
  final int clusterMembers = 3;
  for (int i = 1; i < clusterMembers + 1; ++i) {
    URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
    GossipManager gossipService = GossipManagerBuilder.newBuilder()
        .cluster(cluster).uri(uri)
        .id(i + "").gossipMembers(startupMembers)
        .gossipSettings(settings).build();
    nodes.add(gossipService);
    gossipService.init();
  }
  // create a black list which includes node 2
  List<LocalMember> blackList = new ArrayList<>();
  blackList.add(nodes.get(1).getMyself());
  // create a shared data message for node 1
  SharedDataMessage message = new SharedDataMessage();
  message.setExpireAt(Long.MAX_VALUE);
  message.setKey("myKey");
  message.setPayload("I am only visible in Node 1 and 3");
  message.setTimestamp(System.currentTimeMillis());
  // set the black list replicable
  message.setReplicable(new BlackListReplicable<>(blackList));
  nodes.get(0).gossipSharedData(message);
  Thread.sleep(2000); // give some time to gossip
  for (GossipManager node : nodes) {
    SharedDataMessage replicatedMessage = node
        .findSharedGossipData("myKey");
    if (replicatedMessage == null) {
      System.out.println("Node " + (node.getMyself().getId())
          + ", no value for the myKey");
    } else {
      System.out.println("Node " + (node.getMyself().getId())
          + ", value for the myKey = "
          + replicatedMessage.getPayload());
    }
  }
  // shutdown the nodes
  for (GossipManager node : nodes) {
    if (node != null) {
      node.shutdown();
    }
  }
}
The above program prints the following output to the standard output:
Node 1, value for the myKey = I am only visible in Node 1 and 3
Node 2, no value for the myKey
Node 3, value for the myKey = I am only visible in Node 1 and 3
Writing a Custom Replication Controller
All replication controllers implement the Replicable interface, which declares one method to implement: shouldReplicate(LocalMember me, LocalMember destination, T message).
- me: the node the message is transmitted from
- destination: the node that would receive the message
- message: the message object itself
The return value of this method determines whether the message should replicate from me to destination.
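To make the three parameters concrete, here is a minimal, self-contained sketch of one possible custom rule: replicate only messages whose key starts with a given prefix, and never send a message back to its sender. The types Node, KeyedMessage, and the local Replicable interface are hypothetical stand-ins that only mirror the shape of the method described above. They are not Apache Gossip's LocalMember, SharedDataMessage, or Replicable, and the prefix rule itself is an assumption chosen for illustration.

```java
import java.util.Objects;

public class PrefixReplicableSketch {
    /** Hypothetical stand-in for a cluster member (not Apache Gossip's LocalMember). */
    public static final class Node {
        final String id;

        public Node(String id) {
            this.id = id;
        }
    }

    /** Hypothetical stand-in for a keyed message (not Apache Gossip's SharedDataMessage). */
    public static final class KeyedMessage {
        final String key;

        public KeyedMessage(String key) {
            this.key = key;
        }
    }

    /** Same shape as the method described above: sender, receiver, message. */
    public interface Replicable<T> {
        boolean shouldReplicate(Node me, Node destination, T message);
    }

    /** Replicates only messages whose key starts with the prefix, and never to the sender itself. */
    public static final class PrefixReplicable implements Replicable<KeyedMessage> {
        private final String prefix;

        public PrefixReplicable(String prefix) {
            this.prefix = Objects.requireNonNull(prefix, "prefix");
        }

        @Override
        public boolean shouldReplicate(Node me, Node destination, KeyedMessage message) {
            if (me.id.equals(destination.id)) {
                return false; // nothing to gain from sending to ourselves
            }
            return message.key.startsWith(prefix);
        }
    }

    public static void main(String[] args) {
        Node n1 = new Node("1");
        Node n2 = new Node("2");
        Replicable<KeyedMessage> rule = new PrefixReplicable("public.");

        System.out.println(rule.shouldReplicate(n1, n2, new KeyedMessage("public.motd")));  // true
        System.out.println(rule.shouldReplicate(n1, n2, new KeyedMessage("private.token"))); // false
        System.out.println(rule.shouldReplicate(n1, n1, new KeyedMessage("public.motd")));  // false
    }
}
```

Because the decision depends only on its three arguments, a rule like this is easy to unit test without starting a cluster.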
Example:
public class MyReplicable<T extends Base> implements Replicable<T> {
  @Override
  public boolean shouldReplicate(LocalMember me,
      LocalMember destination, T message) {
    // Placeholder value: replace with your own replication logic.
    // The variable must be assigned before it is returned, otherwise the class does not compile.
    boolean result = true;
    return result;
  }
}
View Source at GitHub: https://github.com/apache/incubator-gossip
