Listening to Data Change Events in Apache Gossip
Apache Gossip now supports listening for per-node data and shared data change events. This is useful when a user wants to be notified of changes to the managed objects without polling. The following figure illustrates how a module in node 2 can receive a notification whenever the value for key “a” is updated.

The GossipManager has the following methods to register and unregister event listeners:
    registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler)
    registerSharedDataSubscriber(UpdateSharedDataEventHandler handler)
    unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler)
    unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler)
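Because the unregister methods take the handler itself, a lambda has to be kept in a variable if it is ever to be removed again. The following is a minimal, self-contained sketch of that register/publish/unregister cycle. It does not use the Apache Gossip classes: SharedDataHandler and HandlerRegistry are hypothetical stand-ins written for this illustration, and the real GossipManager may manage its listeners differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a shared-data handler; not the Apache Gossip interface.
@FunctionalInterface
interface SharedDataHandler {
    void onUpdate(String key, Object oldValue, Object newValue);
}

// Hypothetical registry that illustrates register/unregister semantics.
class HandlerRegistry {
    // CopyOnWriteArrayList allows handlers to be added or removed while events are dispatched.
    private final List<SharedDataHandler> handlers = new CopyOnWriteArrayList<>();

    void register(SharedDataHandler handler) {
        handlers.add(handler);
    }

    void unregister(SharedDataHandler handler) {
        handlers.remove(handler);
    }

    void publish(String key, Object oldValue, Object newValue) {
        for (SharedDataHandler handler : handlers) {
            handler.onUpdate(key, oldValue, newValue);
        }
    }
}

class RegistryDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        HandlerRegistry registry = new HandlerRegistry();

        // Keep a reference to the lambda so the same instance can be unregistered later.
        SharedDataHandler handler =
            (key, oldValue, newValue) -> seen.add(key + ":" + oldValue + "->" + newValue);

        registry.register(handler);
        registry.publish("a", null, 1);   // delivered to the handler
        registry.unregister(handler);
        registry.publish("a", 1, 2);      // not delivered, the handler was removed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Passing a fresh lambda to unregister would not remove the earlier one, since two lambda expressions are distinct objects even when their bodies are identical.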
Listening to Per-Node Data Changes
To be notified when per-node data changes, the event listener should implement the UpdateNodeDataEventHandler interface, or you can simply use a Java 8 lambda as follows:
    gossipManager.registerPerNodeDataSubscriber(
        (nodeId, key, oldValue, newValue) -> {
            // do something
        }
    );
- nodeId: String identifier of the node that owns the data object
- key: Key of the object
- oldValue: Value of the object before the update (This value is null when the data is newly added)
- newValue: Value of the object after the update
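As an alternative to a lambda, the handler can be a named class, which is convenient when it carries state. The sketch below shows one way to use the null oldValue to tell a newly added entry from an update. To stay self-contained it declares NodeDataHandler, a hypothetical interface that only mirrors the four-argument callback shape described above; LoggingNodeDataHandler is likewise an illustrative name, and neither is part of Apache Gossip.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the four-argument callback; not the Apache Gossip interface.
@FunctionalInterface
interface NodeDataHandler {
    void onUpdate(String nodeId, String key, Object oldValue, Object newValue);
}

// Illustrative handler that records a readable line for every change it sees.
class LoggingNodeDataHandler implements NodeDataHandler {
    final List<String> log = new ArrayList<>();

    @Override
    public void onUpdate(String nodeId, String key, Object oldValue, Object newValue) {
        if (oldValue == null) {
            // A null old value means the entry was newly added.
            log.add("node " + nodeId + " added " + key + "=" + newValue);
        } else {
            log.add("node " + nodeId + " changed " + key
                + " from " + oldValue + " to " + newValue);
        }
    }
}

class NodeDataHandlerDemo {
    public static void main(String[] args) {
        LoggingNodeDataHandler handler = new LoggingNodeDataHandler();
        handler.onUpdate("1", "a", null, "x"); // first time key "a" is seen on node 1
        handler.onUpdate("1", "a", "x", "y");  // subsequent update
        handler.log.forEach(System.out::println);
    }
}
```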
Listening to Shared Data Changes
A shared data change event listener should implement the UpdateSharedDataEventHandler interface or use a Java 8 lambda as follows:
    gossipManager.registerSharedDataSubscriber(
        (key, oldValue, newValue) -> {
            // do something
        }
    );
- key: Key of the object
- oldValue: Value of the object before update (This value is null when the data is newly added)
- newValue: Value of the object after update
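A shared data listener receives the key as an argument, so a listener that cares about a single key has to filter on it. One way to keep that check out of the business logic is a small wrapper, sketched below. SharedUpdateHandler and KeyFilter are hypothetical names introduced for this illustration, and the interface merely mirrors the three-argument callback described above rather than being the Apache Gossip type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the three-argument callback; not the Apache Gossip interface.
@FunctionalInterface
interface SharedUpdateHandler {
    void onUpdate(String key, Object oldValue, Object newValue);
}

class KeyFilter {
    // Wraps a handler so that it is only invoked for updates to one key.
    static SharedUpdateHandler forKey(String wantedKey, SharedUpdateHandler delegate) {
        return (key, oldValue, newValue) -> {
            if (wantedKey.equals(key)) {
                delegate.onUpdate(key, oldValue, newValue);
            }
        };
    }
}

class KeyFilterDemo {
    public static void main(String[] args) {
        List<String> hits = new ArrayList<>();
        SharedUpdateHandler filtered =
            KeyFilter.forKey("myKey", (key, oldValue, newValue) -> hits.add(key + "=" + newValue));

        filtered.onUpdate("otherKey", null, "ignored"); // filtered out
        filtered.onUpdate("myKey", null, "myValue");    // passed to the delegate
        System.out.println(hits);
    }
}
```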
The following example demonstrates how to set up a gossip cluster and listen for shared data changes.
Set up the gossip cluster with two nodes
    GossipSettings settings = new GossipSettings();
    settings.setPersistRingState(false);
    settings.setPersistDataState(false);
    String cluster = UUID.randomUUID().toString();
    int seedNodes = 1;
    List<Member> startupMembers = new ArrayList<>();
    for (int i = 1; i < seedNodes + 1; ++i) {
        URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
        startupMembers.add(new RemoteMember(cluster, uri, i + ""));
    }
    final List<GossipManager> nodes = new ArrayList<>();
    final int clusterMembers = 2;
    for (int i = 1; i < clusterMembers + 1; ++i) {
        URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
        GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
            .id(i + "").gossipMembers(startupMembers)
            .gossipSettings(settings).build();
        nodes.add(gossipService);
        gossipService.init();
    }
Add a shared data event listener in node 2 for the key named “myKey”
    nodes.get(1).registerSharedDataSubscriber(
        (key, oldValue, newValue) -> {
            if (key.equals("myKey")) {
                System.out.println("myKey updated from " + oldValue +
                    " to " + newValue);
            }
        }
    );
Create a shared data message with key “myKey” and add it to node 1
    SharedDataMessage message = new SharedDataMessage();
    message.setExpireAt(Long.MAX_VALUE);
    message.setKey("myKey");
    message.setPayload("myValue");
    message.setTimestamp(System.currentTimeMillis());
    nodes.get(0).gossipSharedData(message);
    Thread.sleep(2000); // give some time to gossip
After the message is added to node 1, the shared data event listener on node 2 is notified and prints “myKey updated from null to myValue” to standard output.
Let’s update the “myKey” value
    SharedDataMessage newMessage = new SharedDataMessage();
    newMessage.setExpireAt(Long.MAX_VALUE);
    newMessage.setKey("myKey");
    newMessage.setPayload("myNewValue");
    newMessage.setTimestamp(System.currentTimeMillis());
    nodes.get(0).gossipSharedData(newMessage);
    Thread.sleep(2000); // give some time to gossip
Node 2 now prints “myKey updated from myValue to myNewValue” to standard output.
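The fixed Thread.sleep(2000) calls are fine for a demonstration, but in a test it is often preferable to wait until the listener has actually fired. One common approach is to count down a CountDownLatch inside the listener and wait on it with a timeout. The sketch below is self-contained and makes several assumptions: ChangeHandler is a hypothetical interface mirroring the shared data callback, and a single-thread executor stands in for the gossip threads that would invoke the listener in a real cluster.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in mirroring the shared data callback; not the Apache Gossip interface.
@FunctionalInterface
interface ChangeHandler {
    void onUpdate(String key, Object oldValue, Object newValue);
}

class LatchWaitDemo {
    // Returns the value seen by the listener, or null if the timeout elapsed first.
    static Object awaitUpdate(long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Object> observed = new AtomicReference<>();

        ChangeHandler handler = (key, oldValue, newValue) -> {
            if ("myKey".equals(key)) {
                observed.set(newValue);
                latch.countDown(); // wake up the waiting thread
            }
        };

        // Simulated asynchronous delivery (assumption: stands in for the gossip threads).
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> handler.onUpdate("myKey", null, "myValue"));
            // Block until the listener fires or the timeout expires, whichever comes first.
            boolean notified = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return notified ? observed.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitUpdate(2000));
    }
}
```

With this pattern the wait returns as soon as the update arrives, and the timeout only bounds the worst case.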
Complete code
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    // Package names as found in the incubator-gossip source tree; adjust if your version differs.
    import org.apache.gossip.GossipSettings;
    import org.apache.gossip.Member;
    import org.apache.gossip.RemoteMember;
    import org.apache.gossip.manager.GossipManager;
    import org.apache.gossip.manager.GossipManagerBuilder;
    import org.apache.gossip.model.SharedDataMessage;

    // The class name is illustrative.
    public class SharedDataListenerExample {

        public static void main(String[] args)
                throws URISyntaxException, InterruptedException {
            // Set up the cluster
            GossipSettings settings = new GossipSettings();
            settings.setPersistRingState(false);
            settings.setPersistDataState(false);
            String cluster = UUID.randomUUID().toString();
            int seedNodes = 1;
            List<Member> startupMembers = new ArrayList<>();
            for (int i = 1; i < seedNodes + 1; ++i) {
                URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
                startupMembers.add(new RemoteMember(cluster, uri, i + ""));
            }
            final List<GossipManager> nodes = new ArrayList<>();
            final int clusterMembers = 2;
            for (int i = 1; i < clusterMembers + 1; ++i) {
                URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
                GossipManager gossipService = GossipManagerBuilder.newBuilder()
                    .cluster(cluster).uri(uri)
                    .id(i + "").gossipMembers(startupMembers)
                    .gossipSettings(settings).build();
                nodes.add(gossipService);
                gossipService.init();
            }
            // Add a shared data event listener on node 2
            nodes.get(1).registerSharedDataSubscriber(
                (key, oldValue, newValue) -> {
                    if (key.equals("myKey")) {
                        System.out.println("myKey updated from " + oldValue
                            + " to " + newValue);
                    }
                });
            // Create a shared data message on node 1
            SharedDataMessage message = new SharedDataMessage();
            message.setExpireAt(Long.MAX_VALUE);
            message.setKey("myKey");
            message.setPayload("myValue");
            message.setTimestamp(System.currentTimeMillis());
            nodes.get(0).gossipSharedData(message);
            Thread.sleep(2000); // give some time to gossip
            // Update the shared data message
            SharedDataMessage newMessage = new SharedDataMessage();
            newMessage.setExpireAt(Long.MAX_VALUE);
            newMessage.setKey("myKey");
            newMessage.setPayload("myNewValue");
            newMessage.setTimestamp(System.currentTimeMillis());
            nodes.get(0).gossipSharedData(newMessage);
            Thread.sleep(2000); // give some time to gossip
            // Shut down the nodes
            for (GossipManager node : nodes) {
                if (node != null) {
                    node.shutdown();
                }
            }
        }
    }
View source at GitHub: https://github.com/apache/incubator-gossip
