Node discovery and health checking in a peer-to-peer network using ZeroMQ

prashun javeri
8 min read · Feb 1, 2019


Sensor networks have been the subject of extensive research in distributed systems, unreliable-communications handling, and autonomy. Sensor data can be processed in two ways: collect all of the reports on one server and process them centrally, or share the processing across the network. Originally the former was considered most feasible, but the latter has since proven promising. We address the problem with a context-driven strategy that minimizes the information sent across the network, combined with a network-aware delivery mechanism, so that only the needed information is delivered, pre-processed locally on a sensor node wherever possible. Beyond discovery and integration, another important feature for field-deployed sensors is the ability to leverage each other to achieve some level of information sharing. For example, if one or more sensor nodes are destroyed, the remaining nodes in the same group might autonomously designate other suitable sensors to resume the coverage or sensor service whenever possible.

To support this adaptive behavior among sensor nodes, peer-to-peer dynamic discovery is essential. It allows the rest of the sensors in the network to take proper recovery action, including moving to new locations, when one or more sensor nodes within the sensor group lose communication contact or are destroyed.

To build the network we divide its nodes into two types: routing nodes and sensor nodes.

  • Routing nodes: act as the node registry. They help discover new sensor nodes joining the network, check them for liveness, and help with data recovery when a sensor node goes inactive. Ideally, each sensor node should be connected to at least two routing nodes (proxies).
  • Sensor nodes: send registration and ping messages to the routing nodes, supporting liveness tracking and data recovery.

These are connected in a pub-sub-with-proxy configuration to allow asynchronous communication. Publish-subscribe is a messaging pattern in which senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers. Instead, published messages are characterized into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and receive only messages of interest, without knowledge of which publishers, if any, there are.
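
As a minimal sketch of the pattern in pyzmq (each half runs in its own process; the topic and port here are arbitrary, not the ones used later in this article):

import time
import zmq

def publisher():
    # Bind, then send messages tagged with a topic frame
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("tcp://*:5556")
    time.sleep(1)  # PUB silently drops messages sent before subscribers join
    pub.send_multipart([b"temperature", b"21.5"])

def subscriber():
    # Connect and filter on the topic prefix; run in a separate process
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://localhost:5556")
    sub.setsockopt(zmq.SUBSCRIBE, b"temperature")
    topic, payload = sub.recv_multipart()
    print(topic, payload)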

Publishers and subscribers in this pattern are so loosely coupled that:

  • Publishers can't tell when subscribers are successfully connected, both on initial connections, and on reconnections after network failures.
  • Subscribers can't tell publishers anything that would allow publishers to control the rate of messages they send. Publishers only have one setting, which is full-speed, and subscribers must either keep up or lose messages.
  • Publishers can't tell when subscribers have disappeared due to processes crashing, networks breaking, and so on.

We overcome these limitations by introducing a proxy between the publishers and the subscribers. Publishers publish messages to the proxy, which forwards them to the subscribers. This loosely coupled pattern lets multiple publishers and subscribers communicate asynchronously while giving us a central point at which to track connected publishers and subscribers, enabling node discovery and liveness tracking.
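
ZeroMQ ships a built-in helper for exactly this shape of proxy; a minimal sketch (the ports here are illustrative, not the ones from the router-node code below):

import zmq

ctx = zmq.Context()

# XSUB faces the publishers, XPUB faces the subscribers. Unlike plain
# SUB/PUB, the X variants expose subscription messages, which is what
# lets the proxy (and our router node) observe who is connected.
xsub = ctx.socket(zmq.XSUB)
xsub.bind("tcp://*:6000")

xpub = ctx.socket(zmq.XPUB)
xpub.bind("tcp://*:6001")

# Blocks forever, shuttling data downstream and subscriptions upstream
zmq.proxy(xsub, xpub)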

In the wireless sensor network, the router nodes act as the proxies while the sensor nodes act as publishers, subscribers, or both. The next section describes how node discovery works at the network level.

Node Discovery

The network uses a gossip protocol to disseminate information. The gossip-based data dissemination protocol performs three primary functions on the network (a rough sketch of the core loop follows the list):

  1. Manages peer discovery and channel membership by continually identifying available member peers and eventually detecting peers that have gone offline.
  2. Disseminates data across all peers on a channel. Any peer with data that is out of sync with the rest of the channel identifies the missing blocks and syncs itself by copying the correct data.
  3. Brings newly connected peers up to speed by allowing peer-to-peer state-transfer updates of data.
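
None of this gossip machinery appears verbatim in the code below, so here is a rough sketch of the core loop. The peer list, fan-out, and the send/recv transport hooks are illustrative assumptions, not part of the article's network:

import random
import time

# Illustrative peer list and fan-out; a real node would learn its peers
# from the service registry described below
PEERS = ["10.0.0.2", "10.0.0.3", "10.0.0.4"]
FANOUT = 2

state = {"last_seen": {}}  # node_id -> timestamp of the last sighting

def merge(local, remote):
    # Keep the freshest "last seen" timestamp for every known node
    for node, ts in remote.get("last_seen", {}).items():
        if ts > local["last_seen"].get(node, 0):
            local["last_seen"][node] = ts

def gossip_round(send, recv):
    # Push our state to a random subset of peers and merge what they know;
    # send/recv are hypothetical transport hooks (e.g. the pub-sub above)
    state["last_seen"]["my-node-id"] = time.time()
    for peer in random.sample(PEERS, FANOUT):
        send(peer, state)
        merge(state, recv(peer))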

For gossip messaging to be reliable we need a service registry so that nodes can be tracked. A service registry is a phone book for your nodes: each node registers itself with the registry and tells it where it lives (host, port, node name), along with other service-specific metadata that other services can use to make informed decisions about it. Clients can then ask questions about the service topology ("are there any 'fulfillment-services' available, and if so, where?") and about service capabilities.
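
In this network the registry lives in a Redis key-value store. A minimal sketch of the phone-book idea (the helper names are hypothetical; the pnode_/snode_ key layout matches the proxy code later in this article):

import ast
import redis

r = redis.Redis(decode_responses=True)

def register_node(kind, meta):
    # kind is "pnode" (publisher) or "snode" (subscriber)
    r.set(kind + "_" + meta["header"]["node_id"], str(meta))

def lookup_nodes(kind):
    # "Are there any publishers, and if so, where?"
    return [ast.literal_eval(r.get(key)) for key in r.keys(kind + "_*")]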

There are several popular options for service registries. Netflix built and then open-sourced their own, Eureka. Another newer but increasingly popular option is Consul. While these are great for datacenter-to-datacenter communication, they are not built for a peer-to-peer system, so we need to build a peer-to-peer service discovery tool for the wireless sensor network.

Each sensor node can be a publisher, a subscriber, or both. When acting as a publisher:

  • The sensor node sends a discovery message containing its metadata and IP address to the proxy.
  • The proxy registers the node by saving its metadata in a global key-value store.
  1. Code for the publisher side of node discovery on the sensor node
import time
import json
import socket

import zmq


def main():
    """Publish this node's discovery message so the proxy can register it."""

    # Prepare our context and publisher
    context = zmq.Context()

    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5563")

    # Wait for the proxy to connect; PUB drops messages sent before then
    time.sleep(1)

    # Import host info
    host_name, host_ip = get_host_name_and_ip()

    # The discovery message carries the publisher's metadata:
    # node id, node name, ip, port, group, and channel name
    discovery_message = {
        "discovery": {
            "publisher": {
                "header": {
                    "node_id": "213sdfsfs",
                    "node_name": host_name,
                    "node_ip": host_ip,
                    "node_port": 5563,
                    "node_group": "/device",
                    "channel_name": "premium"
                },
                "payload": {
                    "message": "publisher_discovery_message"
                }
            }
        }
    }

    json_string = json.dumps(discovery_message)
    print(json_string)

    # Four frames: channel envelope, host name, host ip, JSON metadata
    publisher.send_multipart([
        b"channel-one",
        bytes(host_name, "utf-8"),
        bytes(host_ip, "utf-8"),
        bytes(json_string, "utf-8"),
    ])

    # Clean up
    publisher.close()
    context.term()


def get_host_name_and_ip():
    try:
        host_name = socket.gethostname()
        host_ip = socket.gethostbyname(host_name)
        return [host_name, host_ip]
    except socket.error:
        print("Unable to get Hostname and IP")
        raise


if __name__ == "__main__":
    main()

2. Code for the proxy on the router node

import json

import zmq
from org.monkeybrain.routernode import global_state

context = zmq.Context()

# Socket facing the subscribers (they connect here)
frontend = context.socket(zmq.XPUB)
frontend.bind("tcp://*:8100")

# Socket facing the publishers (the sensor-node publisher binds port 5563)
backend = context.socket(zmq.XSUB)
backend.connect("tcp://localhost:5563")

# REP socket that receives subscriber discovery messages
rep = context.socket(zmq.REP)
rep.bind("tcp://*:5555")

# Global key-value store (Redis) holding the node registry
redis = global_state.instance()

# Initialize poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
poller.register(rep, zmq.POLLIN)

# Switch messages between sockets
while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        # Subscription message from a subscriber; forward it upstream
        message = frontend.recv_multipart()
        print("subscriber: " + str(message))
        backend.send_multipart(message)

    if socks.get(backend) == zmq.POLLIN:
        # Data from a publisher; register its metadata, then forward
        message = backend.recv_multipart()
        print("publisher: " + str(message))
        meta = json.loads(str(message[3], "utf-8"))
        redis.set("pnode_" + meta["discovery"]["publisher"]["header"]["node_id"], str(meta))
        frontend.send_multipart(message)

    if socks.get(rep) == zmq.POLLIN:
        # Discovery message from a subscriber; register it and acknowledge
        [channel_name, host_name, host_ip, contents] = rep.recv_multipart()
        print("subscriber: [ %s %s %s %s ]" % (channel_name, host_name, host_ip, contents))
        meta = json.loads(str(contents, "utf-8"))
        redis.set("snode_" + meta["discovery"]["subscriber"]["header"]["node_id"], str(meta))
        rep.send_string("ACK")

# We never get here…
frontend.close()
backend.close()
context.term()

3. Code for the subscriber side of node discovery on the sensor node

"""

Pubsub envelope subscriber

Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>

"""
import zmq
import socket
import json


def main():
""" main method """

# Prepare our context and publisher
context = zmq.Context()
subscriber = context.socket(zmq.SUB)

req = context.socket(zmq.REQ)
req.connect("tcp://localhost:5555")

subscriber.connect("tcp://localhost:8100")
subscriber.setsockopt(zmq.SUBSCRIBE, b"channel-one")

[host_name, host_ip] = get_host_name_and_ip()

discovery_message = {
"discovery": {
"subscriber": {
"header": {
"node_id": "213sdfsfs",
"node_name": host_name,
"node_ip": host_ip,
"node_port": 8100,
"node_group": "/device",
"channel_name": "premium"
},
"payload": {
"message": "subscriber_discovery_message"
}
}
}
}

json_string = json.dumps(discovery_message)

while True:
req.send_multipart([b"channel-one", bytes(host_name, 'utf-8'),bytes(host_ip, 'utf-8'), bytes(str(json_string), 'utf-8')])
message = req.recv()
[channel, publisher_name , publisher_ip, contents ] = subscriber.recv_multipart()
print("publisher : [%s %s %s %s]" % (channel, publisher_name , publisher_ip, contents))

# We never get here but clean up anyhow
subscriber.close()
context.term()

def get_host_name_and_ip():
try:
host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name)
return [host_name, host_ip]
except:
print("Unable to get Hostname and IP")

if __name__ == "__main__":
main()

Health checks

Health checks are performed to identify and evict nodes that are down or unreachable. A node instance's network location is registered with the service registry when it starts up and removed when the instance terminates; in between, the registration is typically refreshed periodically using a heartbeat mechanism. The node registry is a key part of service discovery: it is a database containing the status of node instances, and it must be highly available and up to date for messaging to be reliable.
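
One common way to keep the registry current is to give each status entry a time-to-live and refresh it on every heartbeat, so entries for dead nodes expire on their own. A sketch (the key naming follows the pinger below; the interval and TTL values are assumptions):

import time
import redis

r = redis.Redis()
HEARTBEAT_INTERVAL = 10   # seconds between refreshes
STATUS_TTL = 30           # a node is dropped after three missed heartbeats

def refresh_status(node_type, node_id):
    # setex writes the value and its expiry in one atomic call
    r.setex(node_type + "_status_" + node_id, STATUS_TTL, "ACTIVE")

while True:
    refresh_status("publisher", "213sdfsfs")
    time.sleep(HEARTBEAT_INTERVAL)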

Heartbeating solves the problem of knowing whether a peer is alive or dead. This is not an issue specific to ZeroMQ: TCP has a long timeout (30 minutes or so), which can make it impossible to tell whether a peer has died or been disconnected. In the simplest scheme, one peer sends a ping command to the other, which replies with a pong command; neither command has any payload. Below is the code for the ping-pong heartbeat used to check node liveness. It uses ZeroMQ's request-reply pattern.

  1. Pinger from the router nodes
from org.monkeybrain.routernode import global_state
import ast
import time

import zmq

redis = global_state.instance()
ctx = zmq.Context()

uri = 'tcp://127.0.0.1:10111'

# How long to wait for a pong before declaring a node inactive
PING_TIMEOUT_MS = 1000


def ping(node_type, node_id):
    """Ping a node and record its ACTIVE/INACTIVE status in the registry."""
    # A fresh REQ socket per ping: a REQ socket that never receives a
    # reply is stuck mid request-reply cycle and has to be discarded
    req = ctx.socket(zmq.REQ)
    req.setsockopt(zmq.LINGER, 0)
    req.connect(uri)
    req.send_string('ping ' + node_type)
    redis.delete(node_type + "_status_" + node_id)
    if req.poll(PING_TIMEOUT_MS):
        req.recv_string()
        redis.set(node_type + "_status_" + node_id, "ACTIVE")
    else:
        redis.set(node_type + "_status_" + node_id, "INACTIVE")
    req.close()


while True:
    time.sleep(10)
    # Walk every registered node and probe it
    for key in redis.keys(pattern='*node_*'):
        value = redis.get(key)
        meta = ast.literal_eval(value)

        if meta["discovery"].get('publisher'):
            node_id = meta["discovery"]["publisher"]["header"]["node_id"]
            ping('publisher', node_id)
        else:
            node_id = meta["discovery"]["subscriber"]["header"]["node_id"]
            ping('subscriber', node_id)

2. Ponger from the sensor nodes

#!/usr/bin/env python

import time

import zmq
from zmq import devices

ctx = zmq.Context()

# A FORWARDER device with a single REP socket echoes each request back
# to its sender, so it keeps answering pings from the router node even
# while the main thread below is blocked
dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
dev.bind_in('tcp://127.0.0.1:10111')
dev.setsockopt_in(zmq.IDENTITY, b"whoda")
dev.start()

# Wait for connections
time.sleep(1)

print("starting blocking loop")
while True:
    # Simulate the sensor node's main thread being busy; the device
    # thread keeps replying to pings regardless
    tic = time.time()
    time.sleep(1)
    print("blocked for %.3f s" % (time.time() - tic))

The end result of this exercise is a map of the active status of each node, which can then be used to provide resilient messaging across the network. Together, these two pieces give us node discovery and monitoring.
