Listening to Data Change Events in Apache Gossip

Mirage Abeysekara
Jul 10, 2017 · 3 min read

Apache Gossip now supports listening for per-node data and shared data change events. This is useful when a user wants to be notified of changes to managed objects without polling. The following figure illustrates how a module in node 2 can receive a notification whenever the value for key “a” is updated.

The GossipManager has the following methods to register and unregister event listeners:

  • registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler)
  • registerSharedDataSubscriber(UpdateSharedDataEventHandler handler)
  • unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler)
  • unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler)
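One practical detail of register/unregister pairs like these: to unregister a lambda later, the same handler instance that was registered generally has to be kept in a variable. The sketch below illustrates this with a stand-in registry. SharedDataHandler, SubscriberRegistry and the onUpdate method name are hypothetical and only mimic the shape of the Gossip API; this is not Gossip's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class RegistryDemo {
    // Stand-in for UpdateSharedDataEventHandler; the method name is an assumption.
    interface SharedDataHandler {
        void onUpdate(String key, Object oldValue, Object newValue);
    }

    // Hypothetical registry that mimics register/unregister semantics.
    static class SubscriberRegistry {
        private final List<SharedDataHandler> handlers = new CopyOnWriteArrayList<>();

        void register(SharedDataHandler handler) {
            handlers.add(handler);
        }

        void unregister(SharedDataHandler handler) {
            handlers.remove(handler);
        }

        // Delivers one change event to every registered handler.
        void publish(String key, Object oldValue, Object newValue) {
            for (SharedDataHandler h : handlers) {
                h.onUpdate(key, oldValue, newValue);
            }
        }
    }

    // Publishes once while registered and once after unregistering,
    // then returns the events the handler actually saw.
    static List<String> run() {
        SubscriberRegistry registry = new SubscriberRegistry();
        List<String> seen = new ArrayList<>();

        // Keep a reference to the lambda so it can be unregistered later.
        SharedDataHandler handler =
            (key, oldValue, newValue) -> seen.add(key + "=" + newValue);

        registry.register(handler);
        registry.publish("a", null, 1);   // delivered
        registry.unregister(handler);
        registry.publish("a", 1, 2);      // not delivered
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Passing a fresh lambda with the same body to the unregister call would not match the registered instance in this sketch, which is why the reference is stored.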

Listening to Per-Node Data Changes

To be notified when per-node data changes, the event listener should implement UpdateNodeDataEventHandler, or you can simply use a Java 8 lambda as follows:

gossipManager.registerPerNodeDataSubscriber(
    (nodeId, key, oldValue, newValue) -> {
        // do something
    }
);
  • nodeId: String identifier of the node that owns the data object
  • key: Key of the object
  • oldValue: Value of the object before the update (This value is null when the data is newly added)
  • newValue: Value of the object after the update
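As an alternative to a lambda, the handler can be a named class, which is convenient when the logic needs to tell a newly added value (oldValue is null) apart from a change. The following is a minimal sketch under assumptions: NodeDataHandler is a local stand-in for UpdateNodeDataEventHandler, and its onUpdate method name and Object value types are guesses at the real interface, not taken from the library.

```java
class PerNodeHandlerSketch {
    // Stand-in for UpdateNodeDataEventHandler; method name and types are assumptions.
    interface NodeDataHandler {
        void onUpdate(String nodeId, String key, Object oldValue, Object newValue);
    }

    // Builds a description of the event; a null oldValue means the key was newly added.
    static String describe(String nodeId, String key, Object oldValue, Object newValue) {
        if (oldValue == null) {
            return "node " + nodeId + " added " + key + " = " + newValue;
        }
        return "node " + nodeId + " changed " + key
            + " from " + oldValue + " to " + newValue;
    }

    // A named implementation that can be passed wherever a lambda would be.
    static class LoggingHandler implements NodeDataHandler {
        @Override
        public void onUpdate(String nodeId, String key, Object oldValue, Object newValue) {
            System.out.println(describe(nodeId, key, oldValue, newValue));
        }
    }

    public static void main(String[] args) {
        NodeDataHandler handler = new LoggingHandler();
        handler.onUpdate("1", "a", null, "x"); // first time the key is seen
        handler.onUpdate("1", "a", "x", "y");  // later update
    }
}
```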

Listening to Shared Data Changes

A shared data change event listener should implement the UpdateSharedDataEventHandler interface, or use a Java 8 lambda as follows:

gossipManager.registerSharedDataSubscriber(
    (key, oldValue, newValue) -> {
        // do something
    }
);
  • key: Key of the object
  • oldValue: Value of the object before update (This value is null when the data is newly added)
  • newValue: Value of the object after update
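Because a shared data listener receives the key with every event, a listener that cares about a single key has to filter on it. One way to keep that check out of the business logic is a small wrapper, sketched here. SharedDataHandler and forKey are hypothetical names used for illustration; they are not part of Apache Gossip.

```java
import java.util.ArrayList;
import java.util.List;

class KeyFilterSketch {
    // Stand-in for UpdateSharedDataEventHandler; the method name is an assumption.
    interface SharedDataHandler {
        void onUpdate(String key, Object oldValue, Object newValue);
    }

    // Wraps a handler so that it only fires for events on the wanted key.
    static SharedDataHandler forKey(String wanted, SharedDataHandler delegate) {
        return (key, oldValue, newValue) -> {
            if (wanted.equals(key)) {
                delegate.onUpdate(key, oldValue, newValue);
            }
        };
    }

    // Feeds three events through a filtered handler and returns what got through.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        SharedDataHandler handler = forKey("myKey",
            (key, oldValue, newValue) -> seen.add(oldValue + " -> " + newValue));

        handler.onUpdate("otherKey", null, "ignored");      // filtered out
        handler.onUpdate("myKey", null, "myValue");         // newly added
        handler.onUpdate("myKey", "myValue", "myNewValue"); // updated
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```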

The following example demonstrates how to set up a gossip cluster and listen for shared data changes.

Set up the gossip cluster with two nodes:

// Disable persistence of ring and data state for this demo
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();

// Seed members handed to every node at startup (here: node 1 only)
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
    URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
    startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}

// Build and start two nodes on ports 50001 and 50002
final List<GossipManager> nodes = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
    URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
    GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
        .id(i + "").gossipMembers(startupMembers)
        .gossipSettings(settings).build();
    nodes.add(gossipService);
    gossipService.init();
}

Add a shared data event listener in node 2 for the key named “myKey”:

nodes.get(1).registerSharedDataSubscriber(
    (key, oldValue, newValue) -> {
        if (key.equals("myKey")) {
            System.out.println("myKey updated from " + oldValue +
                " to " + newValue);
        }
    }
);

Create a shared data message with the key “myKey” and add it to node 1:

SharedDataMessage message = new SharedDataMessage();
message.setExpireAt(Long.MAX_VALUE);
message.setKey("myKey");
message.setPayload("myValue");
message.setTimestamp(System.currentTimeMillis());

nodes.get(0).gossipSharedData(message);
Thread.sleep(2000); // give some time to gossip

After the message is added to node 1, the shared data event listener on node 2 is notified and prints “myKey updated from null to myValue” to standard output.
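The fixed two-second sleep is simple, but a test or tool may prefer to wait only until the listener has actually fired. One possible approach is a CountDownLatch that the listener releases, sketched below. This is not taken from the Gossip code base: SharedDataHandler is a local stand-in, and the background thread merely simulates gossip delivering the update, on the assumption that listeners are invoked from a thread other than the caller's.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchWaitSketch {
    // Stand-in for UpdateSharedDataEventHandler; the method name is an assumption.
    interface SharedDataHandler {
        void onUpdate(String key, Object oldValue, Object newValue);
    }

    // Waits until an update for "myKey" arrives or the timeout elapses.
    // Returns the new value, or null if nothing arrived in time.
    static Object awaitUpdate(long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Object> received = new AtomicReference<>();

        // In a real program this handler would be registered with the GossipManager.
        SharedDataHandler handler = (key, oldValue, newValue) -> {
            if ("myKey".equals(key)) {
                received.set(newValue);
                latch.countDown();
            }
        };

        // Simulated delivery: stands in for gossip reaching node 2 a bit later.
        Thread delivery = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            handler.onUpdate("myKey", null, "myValue");
        });
        delivery.start();

        try {
            boolean notified = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return notified ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + awaitUpdate(2000));
    }
}
```

The timeout keeps the upper bound of the original sleep, while the latch lets the caller continue as soon as the event is seen.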

Let’s update the “myKey” value:

SharedDataMessage newMessage = new SharedDataMessage();
newMessage.setExpireAt(Long.MAX_VALUE);
newMessage.setKey("myKey");
newMessage.setPayload("myNewValue");
newMessage.setTimestamp(System.currentTimeMillis());

nodes.get(0).gossipSharedData(newMessage);

Thread.sleep(2000); // give some time to gossip

Node 2 will now print “myKey updated from myValue to myNewValue” to standard output.
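To see why the listener reports null as the old value the first time and “myValue” the second time, it can help to picture a store that keeps the latest message per key and hands the previous payload to the listener. The following is a simplified model only, not Gossip's actual implementation: Message, Store and the newer-timestamp-wins rule are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SharedStoreSketch {
    // Stand-in for UpdateSharedDataEventHandler; the method name is an assumption.
    interface SharedDataHandler {
        void onUpdate(String key, Object oldValue, Object newValue);
    }

    // Hypothetical message carrying only the fields used in the walkthrough.
    static final class Message {
        final String key;
        final Object payload;
        final long timestamp;

        Message(String key, Object payload, long timestamp) {
            this.key = key;
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // Keeps the latest message per key and notifies the handler on change.
    static final class Store {
        private final Map<String, Message> latest = new HashMap<>();
        private final SharedDataHandler handler;

        Store(SharedDataHandler handler) {
            this.handler = handler;
        }

        // Assumed rule: a message replaces the stored one only if it is newer.
        void accept(Message incoming) {
            Message previous = latest.get(incoming.key);
            if (previous != null && previous.timestamp >= incoming.timestamp) {
                return; // stale or duplicate delivery, no event
            }
            latest.put(incoming.key, incoming);
            Object oldValue = (previous == null) ? null : previous.payload;
            handler.onUpdate(incoming.key, oldValue, incoming.payload);
        }
    }

    // Replays the two messages from the walkthrough plus one stale re-delivery.
    static List<String> run() {
        List<String> out = new ArrayList<>();
        Store store = new Store(
            (k, o, n) -> out.add(k + " updated from " + o + " to " + n));
        store.accept(new Message("myKey", "myValue", 1L));
        store.accept(new Message("myKey", "myNewValue", 2L));
        store.accept(new Message("myKey", "myValue", 1L)); // older, ignored
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```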

Complete code

public static void main(String[] args)
        throws URISyntaxException, InterruptedException {
    // setup the cluster
    GossipSettings settings = new GossipSettings();
    settings.setPersistRingState(false);
    settings.setPersistDataState(false);
    String cluster = UUID.randomUUID().toString();
    int seedNodes = 1;
    List<Member> startupMembers = new ArrayList<>();
    for (int i = 1; i < seedNodes + 1; ++i) {
        URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
        startupMembers.add(new RemoteMember(cluster, uri, i + ""));
    }
    final List<GossipManager> nodes = new ArrayList<>();
    final int clusterMembers = 2;
    for (int i = 1; i < clusterMembers + 1; ++i) {
        URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
        GossipManager gossipService = GossipManagerBuilder.newBuilder()
            .cluster(cluster).uri(uri)
            .id(i + "").gossipMembers(startupMembers)
            .gossipSettings(settings).build();
        nodes.add(gossipService);
        gossipService.init();
    }

    // Add shared data event listener
    nodes.get(1).registerSharedDataSubscriber(
        (key, oldValue, newValue) -> {
            if (key.equals("myKey")) {
                System.out.println("myKey updated from " + oldValue
                    + " to " + newValue);
            }
        });

    // Create shared data message for node 1
    SharedDataMessage message = new SharedDataMessage();
    message.setExpireAt(Long.MAX_VALUE);
    message.setKey("myKey");
    message.setPayload("myValue");
    message.setTimestamp(System.currentTimeMillis());

    nodes.get(0).gossipSharedData(message);
    Thread.sleep(2000);

    // Update the shared data message
    SharedDataMessage newMessage = new SharedDataMessage();
    newMessage.setExpireAt(Long.MAX_VALUE);
    newMessage.setKey("myKey");
    newMessage.setPayload("myNewValue");
    newMessage.setTimestamp(System.currentTimeMillis());

    nodes.get(0).gossipSharedData(newMessage);
    Thread.sleep(2000);

    // shutdown the nodes
    for (GossipManager node : nodes) {
        if (node != null) {
            node.shutdown();
        }
    }
}

View source on GitHub: https://github.com/apache/incubator-gossip
