How to build a bi-directional streaming gRPC service with Node.js and Java

Murat Kilic
7 min readDec 3, 2019

--

gRPC is a modern open source high performance RPC framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication.

We have explored how we can use gRPC in various exercises before such as “gRPC Java Function Service” or “Secure Browser Communication with Containerd Using gRPC, Envoy and OAuth 2.0” .

gRPC lets you define four kinds of service method:
1. Unary RPCs where the client sends a single request to the server and gets a single response back, just like a normal function call.

2. Server streaming RPCs where the client sends a request to the server and gets a stream to read a sequence of messages back. The client reads from the returned stream until there are no more messages. gRPC guarantees message ordering within an individual RPC call.

3. Client streaming RPCs where the client writes a sequence of messages and sends them to the server, again using a provided stream. Once the client has finished writing the messages, it waits for the server to read them and return its response. Again gRPC guarantees message ordering within an individual RPC call.

4. Bidirectional streaming RPCs where both sides send a sequence of messages using a read-write stream. The two streams operate independently, so clients and servers can read and write in whatever order they like. The order of messages in each stream is preserved.

In this post, I would like to dive into bidirectional streaming. For this exercise we will build a chat service, with a gRPC server acting as the hub and the clients connecting to the hub and sending and receiving messages over gRPC. Also for this exercise I will use two different languages to show gRPC’s interoperability.

All the code created is on my repo @ GitHub

Service Definition

As always, we begin with a service descriptor file. We’ll call this grpc_chat.proto. It has only one service ‘ChatService’ with one rpc method ‘chat’. This rpc has ‘stream’ on both request and response, meaning it uses bi-directional streaming. Our message ‘ChatMessage’ is a simple one with one property.

service ChatService {
rpc chat(stream ChatMessage) returns (stream ChatMessage) {}
}
message ChatMessage {
string message = 1;
}

Node.js gRPC Chat Server

There are two ways to generate the code needed to work with protocol buffers in Node.js — one approach uses Protobuf.js to dynamically generate the code at runtime, the other uses code statically generated using the protocol buffer compiler protoc. We will use dynamic way here. So we start our gRPC server with code below.

var PROTO_PATH = __dirname + '/grpc_chat.proto';
var fs = require('fs');
var parseArgs = require('minimist');
var path = require('path');
var _ = require('lodash');
var grpc = require('grpc');
var protoLoader = require('@grpc/proto-loader');
var packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
var protoDescriptor = grpc.loadPackageDefinition(packageDefinition);

After this, we define our grpcChat variable and also define a Map to keep client connection data.

var grpcChat = protoDescriptor.io.mark.grpc.grpcChat;
var clients = new Map();

the bottom of the code is also boilerplate code, we define server, listen on a port and add ‘chat’ service to it. We will not use TLS on this connection to keep things simple.

var server = new grpc.Server();
server.addService(grpcChat.ChatService.service, {
chat:chat
});
server.bind('0.0.0.0:50050', grpc.ServerCredentials.createInsecure());
server.start();

‘chat:chat’ line above maps the chat rpc method to our function ‘chat’ function in the code.

Time to code out chat function:

function chat(call) {
call.on('data', function(chatRequest{
user=call.metadata.get('username');
msg=chatRequest.message;

In the code above, we are listening to ‘data’ event, which is emitted when other party sends data. In this case when a client sends data, we extract message and also a metadata/header which contains username. Of course ideally this should be a JWT or Authorization header we can authenticate the user, but for simplicity’s sake, we’ll trust client is sending correct username.

After this we broadcast message from client to all other clients connected. ‘clients’ map is a map of username as key and ‘call’ object as value. We loop through ‘clients’ map and if username does not match, we write to the call. If this is first time we are receiving data from the client, value is undefined, so we add the mapping to ‘clients’ to use in the future.

for (let [msgUser, userCall] of clients) {
if (msgUser != username) {
userCall.write(
{
fromName: username,
message : msg
});
}
}
if (clients.get(user) === undefined) {
clients.set(user, call);
}

After this we listen for the ‘end’ event which is when client completes the request . In this case we will wait until client types something like ‘quit’ as a signal to leave the chat room.

call.on('end', function() {
call.write({
fromName: 'Chat server',
message : 'Nice to see ya! Come back again...'
});
call.end();
});

Before we start our server, we need to install the modules we use in our code. So we run:
Node.js gRPC Chat Server

npm i grpc @grpc/proto-loader

Then we start the server:

node grpcServer
gRPC Chat Server started...

Node.js gRPC Chat Client

As with the server, there are two ways to generate the client code needed,so we’ll use dynamic way again.

var grpcChat = protoDescriptor.io.mark.grpc.grpcChat;
var client = new grpcChat.ChatService('localhost:50050',
grpc.credentials.createInsecure());

Above we create the client based on service name and host:port of server. We would like to read messages input from terminal, so we use ‘readline’ module.

const readline = require("readline");
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});

We want client to provide username as command line argument, so we user ‘process.argv[2]’ and send this information as part of initial Metadata along with the call to the serve

var user = process.argv[2];
var metadata = new grpc.Metadata();
metadata.add('user', user);
var call = client.chat(metadata);

We listen for ‘data’ event and whenever it is emitted, it means server sent data to client and we write the username who sent the message and the message itself.

call.on('data', function(ChatMessage) {
console.log(`${ChatMessage.from} ==> ${ChatMessage.message}`);
});

And we code how client sends message as below. For each ‘line’ event of readline, meaning user inputs a message, we write that message to the call unless the message is ‘quit’ which is our signal for user to exit the chat. Since we send the username in the metadata, we do not need to send it over and over again with each call.write():

rl.on("line", function(line) {if (line === "quit") {call.end();} else {call.write({message : line});}});

Now it’s time to run the client, and send a message. Open a second terminal and run the client:

>node grpcClient Mark
Enter your messages below:
First msg

After we send the first message, we can see on the server terminal:

>node grpcServer
gRPC Chat Server started...
Mark ==> First msg

Now let’s open a third terminal and run client again as a different user and send 2 messages:

>node grpcClient Mel
Enter your messages below:
> Mel's first msg
> Mel's second msg

Let’s check terminal of server :

Mark ==> First msg
Mel ==> Mel's first msg
Mel ==> Mel's second msg

and client 1(Mark)

>node grpcClient Mark
Enter your messages below:
First msg
==> Mel's first msg
==> Mel's second msg

Voila! So now we have 2 gRPC clients connected to gRPC chat server and communicating two ways both with the server.

Java gRPC Chat Client

Why don’t we take it a bit further and see how a gRPC client generated from same proto file in one language can communicate effectively with a gRPC server generated from the same proto file in a different language. For this, let’s add a Java gRPC client to talk to our Node.js gRPC server

We create a maven project and update pom.xml as usual for gRPC dependencies and plugins. After this we copy the same proto file under src/main/proto directory.

then we start coding our client as /src/main/java/io/mark/grpc/grpcChat/GRPCChatClient.java. We create an asyncStub using ‘newStub’ method.

channel = channelBuilder.build();
asyncStub = ChatServiceGrpc.newStub(channel);

We use ‘username’ system property and pass it to ‘chat(username)’ function. Then we set Metadata and attach it to our stub:

public void chat(String username) {
System.out.println("Enter your messages below:");

Metadata header=new Metadata();
Metadata.Key<String> key =
Metadata.Key.of("username", Metadata.ASCII_STRING_MARSHALLER);
header.put(key,username);
asyncStub = MetadataUtils.attachHeaders(asyncStub, header);

After this we create our requestObserver with StreamObserver<ChatMessage> type and we overwrite methods. Important one is ‘onNext’ where we code what to do when we receive message from server. Server sends a ChatMessage object, so we print it on stdout:

StreamObserver<ChatMessage> requestObserver =
asyncStub.chat(new StreamObserver<ChatMessage>() {
public void onNext(ChatMessage res) {
System.out.println(res.getFrom()+" ==> "+ res.getMessage());
}

The part where Java client sends message works as below.

InputStream is = System.in;
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String line = null;
while ((line = br.readLine()) != null) {
if (line.equalsIgnoreCase("q")) {
break;
}
requestObserver.onNext(ChatMessage.newBuilder().setMessage(line).build());
}

We read from the System.in inputSttream, set it to ‘line’ variable. After this we build our ChatMessage object with message as ‘line’. Then we send it with ‘onNext’ method of requestObserver.

In the main method we create an instance of our client class and initialize it

public static void main(String[] args) {
GRPCChatClient grpcChatClient=new GRPCChatClient("localhost",50050);
grpcChatClient.chat(System.getProperty("username"));
}

At this point we are ready to run our Java gRPC client. Let’s maven package it first to compile classes and create our JAR file:

mvn package
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

Now’ let’s run it as :

>java -cp target/io.mark.grpc.gRPCChat-1.0-SNAPSHOT.jar -Dusername=Tim  io.mark.grpc.grpcChat.GRPCChatClient
Enter your messages below:
First msg by Tim

Time to check Mark’s screen (Node.js)

C:\sw\Projects\gRPC\GRPCChat\NodeJS>node grpcClient Mark
Enter your messages below:
First msg
Tim ==> First msg by Tim

Let’s try other direction. On Mark’s scren(Node.js)

Tim ==> First msg by Tim
Hello Tim!

Tim’s screen(Java) shows

Enter your messages below:
First msg by Tim
Mark ==> Hello Tim!

Excellent!

Summary

So in this post, we create a bi-directional streaming RPC using gRPC on Node.js and Java. We created our gRPC chat server on Node, while we created both a Node and Java gRPC chat client at the same time. Clients are connected to the gRPC server and continuously exchanging messages with the server using same request in a streaming fashion. Once a client sends a message to the server, server is broadcasting this to the other clients using its own stream.

--

--

Murat Kilic

Tech enthusiast and leader. Love inspiring people to follow their dreams in tech. Coded all the way from BASIC to Go.