Network Pipelines of the Node

A client/server application generally binds to a port so that it can listen for messages and act on them. In our system the Node behaves as both a client and a server, whereas the Wallet behaves only as a client. This article explains how the networking pipelines of the MasterGenesis Node were built.

First we need to bind to a UDP port to listen for messages; in our case the port is 9999. Below are our channel options:

// UDP server
bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
    .channel(NioDatagramChannel.class)
    .option(ChannelOption.SO_REUSEADDR, true);

We use SO_REUSEADDR so that several channels, each served by its own thread, can be bound to the same port. Listening on multiple threads reduces the chance of a UDP packet being dropped because a thread is busy. On Unix machines we can further load-balance our sockets by using the channel option SO_REUSEPORT.

//Udp Server
for (int i = 0; i < THREAD_AMOUNT; i++) {
    channels.add(bootstrap
        .bind(MasterGenesisConfiguration.NIC_IP, MasterGenesisConfiguration.SERVER_UDP_PORT)
        .syncUninterruptibly());
}

The above code shows the multiple channels created to listen to the same port. The THREAD_AMOUNT value above is currently a hard-coded value. In the future we will be using calculations based on the hardware and performance of the hosting device. Some of these channels will be dedicated to sending the synchronisation message for the ledger.
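As a rough illustration of what a hardware-based calculation for THREAD_AMOUNT could look like, the sketch below derives a listener count from the number of available cores. The class name ListenerThreads, the method forCores and the "half of the cores, capped" policy are all assumptions made for this example; they are not taken from the MasterGenesis code base.

```java
// Hypothetical sketch: derive the number of listener channels from the host
// instead of hard-coding THREAD_AMOUNT. The policy below is an assumption.
final class ListenerThreads {
    private ListenerThreads() {
    }

    /**
     * Returns a listener count between 1 and maxListeners, using half of the
     * available cores so the remaining cores stay free for packet execution.
     */
    static int forCores(int availableCores, int maxListeners) {
        int half = Math.max(1, availableCores / 2);
        return Math.min(half, Math.max(1, maxListeners));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("listener channels: " + forCores(cores, 8));
    }
}
```

The cap keeps a machine with many cores from opening more sockets than the workload needs; the right values would have to come from measurements on the hosting device.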

Our messages are called Packets. Because there are several message types, every message starts with a one-byte header called an ‘Id’ that identifies the message. For example, the message with Id 1 is ‘send transaction’ and carries the transaction information as its payload. A reply uses the negated Id of the message it answers: if a Node receives a message with Id 1, it replies to the Client with Id -1. The mapping of Packets to Ids is defined by our PacketWrapper class.
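The Id convention can be tried out without Netty. The following is a minimal sketch, assuming a message is laid out as one Id byte followed by the payload; the class PacketIdSketch and its methods are hypothetical names used only for illustration, and java.nio.ByteBuffer stands in for Netty's ByteBuf.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the Id convention: positive Ids are requests,
// and the reply to a request carries the negated Id.
final class PacketIdSketch {
    static final byte SEND_TRANSACTION = 1; // Id 1, as in the example above

    /** Returns the Id a reply to the given request should carry. */
    static byte replyIdFor(byte requestId) {
        if (requestId <= 0) {
            throw new IllegalArgumentException("request Ids must be positive: " + requestId);
        }
        return (byte) -requestId;
    }

    /** Frames a message as [1-byte Id][payload]. */
    static byte[] frame(byte id, byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(1 + payload.length);
        out.put(id);
        out.put(payload);
        return out.array();
    }

    public static void main(String[] args) {
        byte[] request = frame(SEND_TRANSACTION, "tx".getBytes(StandardCharsets.US_ASCII));
        byte[] reply = frame(replyIdFor(request[0]), new byte[0]);
        // prints "request Id = 1, reply Id = -1"
        System.out.println("request Id = " + request[0] + ", reply Id = " + reply[0]);
    }
}
```

Because the Id is a signed byte and the reply uses its negation, this scheme leaves room for at most 127 request types.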

So far we have made multiple threads listen on a port for messages, but nothing happens with those messages yet. This is where we build the pipelines and handlers that act on them. Our Packet class is fairly simple: as shown below, a Packet has a getId() method that looks up its Id in the packet wrapper, a reply method and an execute method. The execute method holds the logic that runs when the node receives the message. If the Packet is marked as multi-threaded it is executed on a multi-threaded pool; otherwise it runs on a single-threaded pool. Either way, messages are processed on different threads from the listener threads, because a blocked listener thread increases the chance of the Node dropping a Packet.

/*
* @author Sanjay Patel
*/
package org.mastergenesis.server.packet;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.mastergenesis.client.UdpClient;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
/**
* The Class Packet.
*
* @param <T>
* the generic type
*/
public abstract class Packet<T> {
/** The id. */
private byte id;
/** The wrapper. */
private PacketWrapper wrapper;
/**
* Instantiates a new packet.
*
* @param wrapper
* the wrapper
*/
public Packet(PacketWrapper wrapper) {
this.wrapper = wrapper;
}
/**
* Gets the id.
*
* @return the id
*/
public byte getId() {
int count = 0;
for (Packet p : wrapper.PACKETS) {
if (p != null && p == this) {
return (byte) count;
}
count++;
}
return -1;
}
/**
* Reply.
*
* @param buffer
* the buffer
* @param ctx
* the ctx
* @param sender
* the sender
*/
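public void reply(ByteBuf buffer, ChannelHandlerContext ctx, InetSocketAddress sender) {
    // Size the reply by the readable bytes; array().length is the capacity of
    // the backing array and can exceed what was actually written.
    int payloadLength = (buffer == null) ? 0 : buffer.readableBytes();
    ByteBuf replyBuffer = Unpooled.buffer(1 + payloadLength);
    // A reply carries the negated Id of the packet it answers.
    replyBuffer.writeByte(-this.getId());
    if (buffer != null) {
        replyBuffer.writeBytes(buffer);
    }
    DatagramPacket packet = new DatagramPacket(replyBuffer, sender);
    // isSync() and getSyncChannel() are not part of this excerpt.
    if (isSync()) {
        wrapper.getSyncChannel().channel().writeAndFlush(packet);
    } else {
        ctx.writeAndFlush(packet);
    }
}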
/**
* Execute packet.
*
* @param ctx
* the ctx
* @param buf
* the buf
* @param sender
* the sender
*/
public abstract void executePacket(ChannelHandlerContext ctx, T buf, InetSocketAddress sender);
/**
* Gets the packet type.
*
* @return the packet type
*/
public PacketType getPacketType() {
return PacketType.SINGLE_THREADED;
}
/**
* The Enum PacketType.
*/
public enum PacketType {
/** The multi threaded pool. */
MULTI_THREADED,
/** The single threaded pool. */
SINGLE_THREADED
}
}

Now below is our generic packet wrapper which can be extended for both Client and Server use:

/*
* @author Sanjay Patel
*/
package org.mastergenesis.server.packet;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.log4j.Logger;
import org.mastergenesis.chain.storage.cache.CacheFile;
import org.mastergenesis.server.packet.Packet.PacketType;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
/**
* The Class PacketWrapper.
*
* @param <T>
* the generic type
*/
public abstract class PacketWrapper<T extends ByteBuf> {
/** The packet mapping. */
public final Packet<T>[] PACKETS = new Packet[Byte.MAX_VALUE];
/** The Constant logger. */
private static final Logger logger = Logger.getLogger(PacketWrapper.class);
/** The multi thread pool. */
private static ExecutorService multiThread;
/** The single thread pool. */
private static ExecutorService singleThread = Executors.newSingleThreadExecutor();
/**
* Instantiates a new packet wrapper.
*/
public PacketWrapper() {
initPackets();
for (Packet p : PACKETS) {
if (p == null) {
continue;
}
if (p.getPacketType() == PacketType.MULTI_THREADED) {
multiThread = Executors.newCachedThreadPool();
break;
}
}
}
/**
* Inits the packets.
*/
public abstract void initPackets();
/**
* Gets the packet.
*
* @param id
* the id
* @return the packet
*/
public Packet<T> get(byte id) {
    // PACKETS has Byte.MAX_VALUE (127) slots, so valid indices are 1 to 126.
    if (id <= 0 || id >= PACKETS.length) {
        return null;
    }
    return PACKETS[id];
}
/**
* Handle packet.
*
* @param id
* the id
* @param ctx
* the ctx
* @param buf
* the buf
* @param sender
* the sender
*/
public void handlePacket(byte id, ChannelHandlerContext ctx, T buf, InetSocketAddress sender) {
    Packet<T> p = get(id);
    if (p == null) {
        // Unknown Id: release the buffer here, because no task will do it.
        buf.release();
        return;
    }
    // Pick the pool that matches the packet type.
    ExecutorService pool = (p.getPacketType() == PacketType.MULTI_THREADED) ? multiThread : singleThread;
    try {
        pool.execute(() -> {
            try {
                execute(p, ctx, buf, sender);
            } finally {
                // Release even if the packet logic fails.
                buf.release();
            }
        });
    } catch (Exception e) {
        // The task was never queued, so the buffer must be released here.
        buf.release();
        logger.error("Exception: ", e);
    }
}
/**
* Executes the packet
*
* @param p
* the p
* @param ctx
* the ctx
* @param buf
* the buf
* @param sender
* the sender
*/
private void execute(Packet p, ChannelHandlerContext ctx, T buf, InetSocketAddress sender) {
try {
p.executePacket(ctx, buf, sender);
} catch (Exception e) {
e.printStackTrace();
logger.error("Exception: ", e);
}
}
}
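The dispatch idea in handlePacket, where the listener thread hands the work to a pool chosen by packet type, can be seen in isolation in the sketch below. It is a simplified stand-in that uses only java.util.concurrent; DispatchSketch and its dispatch method are hypothetical names, and the returned thread name exists only to make the hand-off visible.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: run packet logic on a worker pool chosen by type,
// never on the thread that received the packet.
final class DispatchSketch {
    enum PacketType { MULTI_THREADED, SINGLE_THREADED }

    private final ExecutorService singleThread = Executors.newSingleThreadExecutor();
    private final ExecutorService multiThread = Executors.newCachedThreadPool();

    /** Runs the work on the matching pool and reports the worker thread's name. */
    Future<String> dispatch(PacketType type, Runnable work) {
        ExecutorService pool = (type == PacketType.MULTI_THREADED) ? multiThread : singleThread;
        return pool.submit(() -> {
            work.run();
            return Thread.currentThread().getName();
        });
    }

    /** Lets queued tasks finish, then stops both pools. */
    void shutdown() {
        singleThread.shutdown();
        multiThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        DispatchSketch sketch = new DispatchSketch();
        String worker = sketch.dispatch(PacketType.SINGLE_THREADED, () -> { }).get();
        System.out.println("listener=" + Thread.currentThread().getName() + " worker=" + worker);
        sketch.shutdown();
    }
}
```

Packets sent to the single-threaded pool execute one at a time in submission order, which suits logic that must not run concurrently; the cached pool trades that ordering for throughput.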

For our server to accept messages we need a Server Packet Wrapper that extends the packet wrapper above. For this example we will make a packet wrapper with just one message: message 1, a GetBalance request from a client.

/*
 * @author Sanjay Patel
 */
package org.mastergenesis.server;
import org.mastergenesis.server.packet.PacketWrapper;
import org.mastergenesis.server.packet.impl.GetBalancePacket;
import io.netty.buffer.ByteBuf;
/**
 * The Class UdpServerPacketWrapper.
 */
public class UdpServerPacketWrapper extends PacketWrapper<ByteBuf> {
    /** The lazily created shared instance. */
    private static UdpServerPacketWrapper instance;

    /**
     * Instantiates a new udp server packet wrapper.
     */
    UdpServerPacketWrapper() {
        super();
    }

    /* (non-Javadoc)
     * @see org.mastergenesis.server.packet.PacketWrapper#initPackets()
     */
    @Override
    public void initPackets() {
        // Id 1 maps to the GetBalance request.
        this.PACKETS[1] = new GetBalancePacket(this);
    }

    /**
     * Returns the shared instance, creating it on first use.
     * Synchronized so that concurrent callers cannot create two instances.
     */
    public static synchronized PacketWrapper<ByteBuf> getInstance() {
        if (instance == null) {
            instance = new UdpServerPacketWrapper();
        }
        return instance;
    }
}

We need to connect the UdpServerPacketWrapper to our bootstrap object. To do this we use a class provided by Netty, the SimpleChannelInboundHandler. The UdpServerHandler holds both the Server Packet Wrapper and the Client Packet Wrapper, because a Node can receive both client and server messages. When a node receives a message, the handler reads the first byte of the message (the Id) and executes the matching Packet. Note in the channel read method below that the client packet wrapper is used whenever the packet Id is not greater than 0. Note also that the generic get() shown earlier returns null for Ids that are not positive, so the client wrapper has to translate a negative Id back to its positive index (for example by overriding get()) before it can look up the Packet.

/*
* @author Sanjay Patel
*/
package org.mastergenesis.server;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.mastergenesis.server.packet.PacketWrapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
/**
* The Class UdpServerHandler.
*/
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

/** The udp server. */
private UdpServer udpServer;

/** The client wrapper. */
private PacketWrapper clientWrapper;
/**
* Instantiates a new udp server handler.
*
* @param udpServer the udp server
* @param clientWrapper the client wrapper
*/
public UdpServerHandler(UdpServer udpServer, PacketWrapper clientWrapper) {
this.udpServer = udpServer;
this.clientWrapper = clientWrapper;
}
/* (non-Javadoc)
* @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext, java.lang.Object)
*/
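@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
    if (!packet.content().isReadable()) {
        // Empty datagram: there is no Id byte to read.
        return;
    }
    // Copy the content, because SimpleChannelInboundHandler releases the original
    // packet when this method returns while the worker pools still need the data.
    // handlePacket() releases the copy once the Packet has executed.
    ByteBuf buf = packet.content().copy();
    byte packetId = buf.readByte();
    if (packetId > 0) {
        udpServer.getPacketWrapper().handlePacket(packetId, ctx, buf, packet.sender());
    } else {
        clientWrapper.handlePacket(packetId, ctx, buf, packet.sender());
    }
}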
/* (non-Javadoc)
* @see io.netty.channel.ChannelInboundHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println(cause.getMessage());
ctx.close();
}
}
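The routing decision made in channelRead0 comes down to the sign of the first byte. The sketch below shows that decision on its own, with java.nio.ByteBuffer standing in for Netty's ByteBuf. RoutingSketch and the Route enum are hypothetical names, and treating Id 0 as ignored is an assumption of this sketch (the handler above passes Id 0 to the client wrapper).

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: choose a wrapper from the sign of the leading Id byte.
final class RoutingSketch {
    enum Route { SERVER, CLIENT, IGNORED }

    /** Reads the leading Id byte and picks the wrapper that should handle the datagram. */
    static Route route(ByteBuffer datagram) {
        if (!datagram.hasRemaining()) {
            return Route.IGNORED;   // empty datagram: there is no Id byte to read
        }
        byte id = datagram.get();   // consumes the Id; the payload stays in the buffer
        if (id > 0) {
            return Route.SERVER;    // request addressed to the node
        }
        if (id < 0) {
            return Route.CLIENT;    // reply to a request this node sent earlier
        }
        return Route.IGNORED;       // Id 0 is unassigned in this sketch
    }

    public static void main(String[] args) {
        System.out.println(route(ByteBuffer.wrap(new byte[] {1, 42})));   // prints SERVER
        System.out.println(route(ByteBuffer.wrap(new byte[] {-1, 42})));  // prints CLIENT
        System.out.println(route(ByteBuffer.wrap(new byte[0])));          // prints IGNORED
    }
}
```

After route returns, the buffer's position sits just past the Id, so the chosen Packet can read its payload directly.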

Finally to connect this handler to the bootstrap object (shown in the first steps), the UDP bootstrap will look similar to this:

bootstrap.group(group)
    .channel(NioDatagramChannel.class)
    .option(ChannelOption.SO_REUSEADDR, true)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new UdpServerHandler(UdpServer.this, clientWrapper));
        }
    });

For this example we use the GetBalance packet, which is shown in the code below:

/*
* @author Sanjay Patel
*/
package org.mastergenesis.server.packet.impl;
import java.net.InetSocketAddress;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.PublicKey;
import java.security.spec.InvalidKeySpecException;
import org.mastergenesis.Main;
import org.mastergenesis.blockchain.helper.ByteBufferHelper;
import org.mastergenesis.blockchain.helper.WalletCrypto;
import org.mastergenesis.chain.storage.StorageContext;
import org.mastergenesis.chain.storage.StorageFactory;
import org.mastergenesis.configuration.MasterGenesisConfiguration;
import org.mastergenesis.server.packet.ICallbackPacket;
import org.mastergenesis.server.packet.Packet;
import org.mastergenesis.server.packet.PacketWrapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
// TODO: Auto-generated Javadoc
/**
* The Class GetBalancePacket.
*/
public class GetBalancePacket extends Packet<ByteBuf> {

/**
* Instantiates a new GetBalance packet.
*
* @param wrapper the wrapper
*/
public GetBalancePacket(PacketWrapper wrapper) {
super(wrapper);
}
/* (non-Javadoc)
* @see org.mastergenesis.server.packet.Packet#executePacket(io.netty.channel.ChannelHandlerContext, java.lang.Object, java.net.InetSocketAddress)
*/
@Override
public void executePacket(ChannelHandlerContext ctx, ByteBuf buf, InetSocketAddress sender) {
String walletPublicKey = ByteBufferHelper.readString(buf);
String storageHash = ByteBufferHelper.readFixedString(buf, MasterGenesisConfiguration.HASH_LENGTH);
StorageContext context = StorageFactory.getOrCreate(storageHash);

ByteBuf buffer = Unpooled.buffer(8 + MasterGenesisConfiguration.HASH_LENGTH);
buffer.writeDouble(context.getStorage().getTxInOutList().getBalance(walletPublicKey));
ByteBufferHelper.writeFixedString(buffer, storageHash);
reply(buffer, ctx, sender);
}
}
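The reply built by executePacket is an 8-byte double followed by a fixed-length storage hash. The sketch below encodes and decodes that layout with java.nio.ByteBuffer so the format can be checked without a running node. BalanceReplySketch is a hypothetical name, the HASH_LENGTH of 64 is a placeholder for the real value in MasterGenesisConfiguration, and the ASCII encoding of the hash is an assumption about what ByteBufferHelper.writeFixedString does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the GetBalance reply payload: [8-byte balance][fixed-length hash].
final class BalanceReplySketch {
    // Placeholder value; the real constant lives in MasterGenesisConfiguration.
    static final int HASH_LENGTH = 64;

    /** Encodes the balance followed by the storage hash (ASCII is assumed here). */
    static byte[] encode(double balance, String storageHash) {
        byte[] hash = storageHash.getBytes(StandardCharsets.US_ASCII);
        if (hash.length != HASH_LENGTH) {
            throw new IllegalArgumentException("storage hash must be " + HASH_LENGTH + " bytes");
        }
        return ByteBuffer.allocate(Double.BYTES + HASH_LENGTH)
                .putDouble(balance)
                .put(hash)
                .array();
    }

    /** Reads the balance from the first 8 bytes of the payload. */
    static double decodeBalance(byte[] payload) {
        return ByteBuffer.wrap(payload).getDouble();
    }

    /** Reads the fixed-length hash that follows the balance. */
    static String decodeHash(byte[] payload) {
        return new String(payload, Double.BYTES, HASH_LENGTH, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        String hash = new String(new char[HASH_LENGTH]).replace('\0', 'a');
        byte[] payload = encode(12.5, hash);
        System.out.println(payload.length + " bytes, balance " + decodeBalance(payload));
    }
}
```

Echoing the storage hash back in the reply lets a client that queried several storages match each balance to its request, since UDP replies may arrive in any order.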

The current development version of the MGEN node has many messages. Below are all our current messages:

Server Messages:

Client Messages (these use the same Ids, but with negative values):

This article aimed to briefly explain how we handle and route the messages in our network. The code shown here is taken from earlier versions of the MasterGenesis Node.