gRPC integration for Siddhi

Sahan Dilshan · Published in siddhi-io · Dec 9, 2019 · 14 min read

Siddhi is a fully open-source, cloud-native stream processor. With Siddhi's SQL-like streaming language, we can write queries that implement real-time analytics and streaming data integration use cases. Read more about the Siddhi Query Language in the Siddhi documentation.

gRPC (gRPC Remote Procedure Calls) is a high-performance, open-source remote procedure call (RPC) system initially developed at Google. It uses HTTP/2 for transport and Protocol Buffers as the interface description language, and provides features such as authentication, bidirectional streaming, and flow control, with blocking and non-blocking bindings. It can generate cross-platform client and server bindings for multiple languages. It is mostly used for connecting services in microservices-style architectures, and for connecting mobile devices and browser clients to backend services. Learn more about gRPC from the gRPC documentation.

This blog explains how Siddhi can be integrated with gRPC to consume and publish events via its sources and sinks. As this blog does not cover the basics of using Siddhi or gRPC, I strongly recommend reading their documentation first if you are not already familiar with them.

Note: This blog refers to siddhi-io-grpc 1.0.8 and siddhi-map-protobuf 1.0.4 versions.

siddhi-io-grpc

The siddhi-io-grpc extension enables Siddhi to receive and publish events via the gRPC protocol. As you may already know, Siddhi uses sinks to publish events and sources to receive events. With siddhi-io-grpc, you can achieve the following functionalities via its sinks and sources:

  • Publish events and expect only empty responses (grpc sink)
  • Call external services and consume their responses (grpc-call sink and grpc-call-response source)
  • Receive events and send only empty responses (grpc source)
  • Serve incoming requests with synchronous responses (grpc-service source and grpc-service-response sink)

The message formatting for all the above features can be achieved through the below implementations.

  • Default gRPC service (EventService) implementation
  • Generic (custom) gRPC service implementation

In the default gRPC implementation, Siddhi uses a predefined gRPC service class called EventService, which can receive and publish events as strings (e.g., JSON, XML, or Text) (explained in more detail below).
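For reference, the EventService definition that ships with siddhi-io-grpc has roughly the following shape; this is a simplified sketch rather than the exact file, shown only to make the later examples easier to follow:

syntax = "proto3";

import "google/protobuf/empty.proto";

package org.wso2.grpc;

// Approximate sketch of the default EventService used by siddhi-io-grpc.
service EventService {
    // used by grpc-call / grpc-service: request in, response out
    rpc process(Event) returns (Event);
    // used by grpc sink / grpc source: fire and forget
    rpc consume(Event) returns (google.protobuf.Empty);
}

message Event {
    string payload = 1;              // the event serialized as a JSON/XML/Text string
    map<string, string> headers = 2; // optional transport headers
}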

In the generic gRPC implementation, we can define a custom gRPC service and use that service with siddhi-io-grpc (more information below). But to do that, we have to create a jar from the defined proto file (service file) and add it to the project classpath.

Generate a jar from the proto file for a custom gRPC service implementation

If you are familiar with Gradle or Maven, you can auto-generate the jar file by building the proto file. Follow the steps below to create the jar file using Gradle or Maven. (You can skip this section if you are going to use siddhi-io-grpc with the default gRPC service implementation.)

  • Maven
  1. Execute the following command in the terminal to create a maven project.
    mvn archetype:generate -DgroupId=com.siddhi -DartifactId=siddhi -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
  2. Navigate to the ./siddhi directory and replace the pom.xml file with this pom.xml file.
  3. Create a directory named proto in the ./siddhi/src/main directory.
  4. Create your desired .proto file inside that directory.
  5. Finally, from the ./siddhi directory, run the following command (through the command line):
    mvn clean install
  6. You will be able to get the generated jar file from the ./siddhi/target directory.
  • Gradle
  1. Create a project structure by executing the following commands on the terminal.
    mkdir -p siddhi/src/main/java/
    mkdir -p siddhi/src/main/proto/
  2. Download build.gradle from here.
  3. Copy the build.gradle file into the ./siddhi directory.
  4. Create your desired .proto file inside the ./siddhi/src/main/proto/ directory.
  5. Navigate to the siddhi directory through the command line.
  6. Execute the following command from the siddhi directory (through the command line):
    ./gradlew build
  7. You will be able to get the generated jar file from the siddhi/build/libs directory.

To use the generated jar in the Siddhi Editor (tooling) or Siddhi Runner distributions, add it to the {SiddhiDistribution_Home}/jars folder. To use it within a Java project, add it to the project classpath.

Details on using gRPC features supported by Siddhi

This section discusses how to use siddhi-io-grpc to receive events, publish events, perform service calls, and serve incoming gRPC messages. To do this, we have to understand each gRPC source and sink. As mentioned above, each source and sink supports two different data mapping implementations.

  • Default gRPC service (EventService) implementation
  • Generic (custom) gRPC service implementation

(In this section, we will use the sample.proto file for the generic gRPC service examples. As mentioned above, you can define your own gRPC service definition and use it with the siddhi-io-grpc generic implementation. A sketch of what such a file might look like is given below.)
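The following is a hypothetical sketch of such a sample.proto, reconstructed from the attribute and method names used in the examples below; the field numbers and the exact message layout are assumptions, not the original file:

syntax = "proto3";

import "google/protobuf/empty.proto";

package io.siddhi.extension.io.grpc.proto;

service MyService {
    // fire-and-forget method used with the grpc sink and grpc source
    rpc send(Request) returns (google.protobuf.Empty);
    // request/response method used with grpc-call and grpc-service
    rpc process(Request) returns (Response);
    // method whose request message carries a map field
    rpc testMap(RequestWithMap) returns (google.protobuf.Empty);
}

message Request {
    string stringValue = 1;
    int32 intValue = 2;
    int64 longValue = 3;
    bool booleanValue = 4;
    float floatValue = 5;
    double doubleValue = 6;
}

message Response {
    string stringValue = 1;
    int32 intValue = 2;
    int64 longValue = 3;
    bool booleanValue = 4;
    float floatValue = 5;
    double doubleValue = 6;
}

message RequestWithMap {
    string stringValue = 1;
    int32 intValue = 2;
    map<string, string> map = 3;
}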

1. Publish event (and expect empty responses) (via grpc Sink)

Figure: How the grpc sink works (publish events and expect empty responses)

The grpc sink publishes events to a given gRPC service. We can use this sink when we only want to publish events from Siddhi and expect nothing but empty responses back from the server (fire and forget). Let's discuss it with an example.

@sink(type="grpc", publisher.url="<STRING>", @map(…)))

This is the basic configuration of the grpc sink, where we only define the publisher.url, which takes the following form:

grpc://<host-address>:<port>/<serviceClassName_with_package>/<methodName>

In addition, we can use headers, timeout, retry attempts, etc., as additional parameters. Refer to the siddhi-io-grpc documentation for the full list of parameters.

  • With default implementation
@sink(type='grpc', publisher.url = 'grpc://134.23.43.35:8080/org.wso2.grpc.EventService/consume', 
@map(type='json'))
define stream FooStream (message String);

The above configuration will publish events to a gRPC server that runs on host 134.23.43.35, port 8080. As we are using the default gRPC service, we also use a JSON mapper (we could also use the Text or XML mappers) to convert the Siddhi events into JSON strings (e.g. something like {"event":{"message":"hello"}}) and send them to the gRPC server through the message attribute of the default gRPC service. Look at the EventService protobuf definition sketched above for details.

  • With generic implementation
@sink(type='grpc', publisher.url = 'grpc://134.23.43.35:8080/io.siddhi.extension.io.grpc.proto.MyService/send', 
@map(type='protobuf'))
define stream FooStream(stringValue string, intValue int,
longValue long, booleanValue bool, floatValue float,
doubleValue double);

The above configuration will publish events to the send method of the MyService service running on host 134.23.43.35:8080. Here, when we define the service name, we have to define it together with the package name, i.e. io.siddhi.extension.io.grpc.proto.MyService. Earlier, in the default gRPC implementation, we used the json mapper. But in the generic gRPC implementation, we have to use the protobuf mapper, because when we publish events to the gRPC service, the protobuf mapper is what converts the Siddhi events into the relevant protobuf message type and sends them through the grpc sink. (You don't have to learn about the Siddhi protobuf mapper separately; how to use it with the siddhi-io-grpc generic implementation is covered here.)

You may have already noticed that the attribute names of the stream definition (stringValue string, intValue int, ...) and the field names of the Request protobuf message definition (see the sample.proto sketch above) are the same. This helps the protobuf mapper map each stream attribute to the correct protobuf message field. You don't always have to use the same attribute names as the protobuf message definition; if you want to use different names in the stream definition, you can do that too, thanks to the siddhi-map-protobuf mapper. See the following example:

@sink(type='grpc', publisher.url = 'grpc://134.23.43.35:8080/io.siddhi.extension.io.grpc.proto.MyService/send', 
@map(type='protobuf', @payload( stringValue='a', longValue='b',
intValue='c', booleanValue='d', floatValue='e', doubleValue='f')))
define stream FooStream (a string, b long, c int, d bool,
e float, f double);

Here, we used names in the stream definition that differ from those in the protobuf message definition, hence we use the @payload annotation to map the stream attributes to the correct protobuf fields.

In the above two examples, I used attributes such as stringValue and intValue to show which attribute types you can send and receive with siddhi-io-grpc. Here is the list of types that you can use with the siddhi-io-grpc implementation:

  • String
  • Integer
  • Double
  • Float
  • Boolean
  • Long
  • Map
  • List
  • Nested and Other message types

Now you might wonder what Map, List, and nested and other message types are, and how to use them; let me explain with the following example.

@sink(type='grpc', publisher.url = 'grpc://134.23.43.35:8080/io.siddhi.extension.io.grpc.proto.MyService/testMap', 
@map(type='protobuf'))
define stream FooStream (stringValue string, intValue int,
map object);

As above, this sink will publish events to the testMap method of the MyService service running on host 134.23.43.35:8080. But this time it uses an attribute called map of type object. This attribute is used to send a map to the given gRPC server. If you check the sample.proto, there is a protobuf message type called RequestWithMap with a field map<string, string> map = 3; for publishing map objects to the server. To utilize this, we have to create an event whose map attribute holds a map object (one that extends the java.util.AbstractMap class) and send it to the gRPC server through the grpc sink. Similarly, you can also use lists, nested messages, and other message types with protobuf, for example as sketched below. (Note: you cannot create or send map, list, nested, or other message type objects using the Event Simulator provided in the Siddhi Editor. You can pass these objects when you use Siddhi as a Java library, or build them via relevant extensions such as siddhi-execution-map.)
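For instance, hypothetical message types carrying a list (a repeated field) or a nested message could be declared alongside the messages in the sample.proto sketch above; the names RequestWithList and RequestWithNested are illustrative and not part of the linked file:

// Illustrative only: a repeated field maps to a java.util.List attribute,
// and a message-typed field maps to a nested object attribute in Siddhi.
message RequestWithList {
    string stringValue = 1;
    repeated string list = 2;
}

message RequestWithNested {
    string stringValue = 1;
    Request nested = 2; // Request as defined in the sample.proto sketch above
}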

Additionally, you can define a client-streaming gRPC method and use it with the grpc sink; it will be faster than using unary gRPC methods. The way to use a client-streaming method is the same as in the above two examples; only the rpc declaration in the proto file differs, as sketched below.
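Assuming the same Request message as in the sample.proto sketch above, such a client-streaming method could be declared like this (the method name sendStream is just an illustration):

// Hypothetical client-streaming method: Siddhi sends a stream of Request
// messages and the server replies once with an empty response.
rpc sendStream(stream Request) returns (google.protobuf.Empty);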

2. Call external services (via grpc-call Sink and grpc-call-response Source)

Figure: Call external services

In the earlier topic, the grpc sink published events to a gRPC service and only got empty responses back from the server. But there might be scenarios where we want to publish events to a server, get responses back, and process them or make decisions based on them. We can achieve this by using grpc-call (sink) and grpc-call-response (source).

As shown in the above image, grpc-call (sink) will publish events to the gRPC server, and when the gRPC server sends responses, the grpc-call-response (source) will catch the responses correlated to the published events and send them for further processing. When we define a grpc-call (sink), we always have to define a corresponding grpc-call-response (source) too. Here, the source and sink are correlated using the sink.id property, as shown below.

@sink(type="grpc-call", publisher.url="<STRING>", sink.id="<STRING>", @map(…)@source(type="grpc-call-response", sink.id="<STRING>", @map(...)))

These are the basic sink and source configurations to publish events to a gRPC endpoint and receive responses. Refer to the documentation to learn about the other parameters that can be used with the grpc-call (sink) and grpc-call-response (source).

  • With default implementation
@sink(type='grpc-call', publisher.url='grpc://194.23.98.100:8080/EventService/process',  sink.id= '1', 
@map(type='json'))
define stream FooStream (message String);
@source(type='grpc-call-response', sink.id= '1', @map(type='json'))
define stream BarStream (message String);

Here, a stream named FooStream is defined with the grpc-call sink to send requests to a gRPC server running at 194.23.98.100 on port 8080. Both the sink and the source are defined with the same sink.id: the sink.id of the grpc-call sink is set to 1, and hence the sink.id of the grpc-call-response source is also set to 1, so that it listens to the responses corresponding to the requests published via the grpc-call sink. Note that since we are calling EventService/process, the sink will be operating in the default mode.

  • With generic gRPC implementation
@sink(type='grpc-call', publisher.url='grpc://194.23.98.100:8888/io.siddhi.extension.io.grpc.proto.MyService/process', sink.id='1', @map(type='protobuf')) 
define stream FooStream(stringValue string, intValue int,
longValue long, booleanValue bool,
floatValue float, doubleValue double);
@source(type='grpc-call-response', receiver.url='grpc://localhost:8888/io.siddhi.extension.io.grpc.proto.MyService/process', sink.id='1', @map(type='protobuf'))
define stream BarStream(stringValue string, intValue int,
longValue long, booleanValue bool,
floatValue float, doubleValue double);

As with the default implementation, we have to define the same sink.id in both the sink and the source, so that the source listens to responses for the requests published from the sink defined on FooStream. Here, as mentioned for the grpc sink, the protobuf mapper converts each event into the relevant protobuf message type and publishes it to the server through the grpc-call sink. Apart from that, in the source we additionally have to provide a parameter called receiver.url, which refers to the same service and method as the publisher.url of the sink. The receiver.url is used by the protobuf mapper to correctly identify the type of response it receives from the gRPC server, so that it can convert the protobuf messages into Siddhi events. As with the grpc sink, if you want to use different names in the stream definitions, you can use @payload (in the sink) and @attributes (in the source) to map them. An example is as follows:

@sink(type='grpc-call', publisher.url='grpc://194.23.98.100:8888/io.siddhi.extension.io.grpc.proto.MyService/process', sink.id='1', 
@map(type='protobuf', @payload(stringValue='a', longValue='c', intValue='b', booleanValue='d', floatValue='e', doubleValue='f')))
define stream FooStream (a string, b int, c long, d bool,
e float, f double);
@source(type='grpc-call-response', receiver.url='grpc://localhost:8888/io.siddhi.extension.io.grpc.proto.MyService/process', sink.id='1', @map(type='protobuf', @attributes(a='stringValue', b='intValue',
c='longValue', d='booleanValue', e='floatValue', f='doubleValue')))
define stream BarStream (a string, b int, c long, d bool,
e float, f double);

As you can see, the grpc-call sink is similar to the grpc sink. The difference is that we have to define a corresponding grpc-call-response source along with the grpc-call sink definition; also, unlike the grpc sink, the grpc-call sink does not support streaming calls, so we can only publish and receive events via unary gRPC methods.

3. Receive events (and only send empty responses) (via grpc Source)

In the above two topics, we discussed how to publish events to gRPC endpoints. From this topic onwards, let's learn how to receive requests from gRPC clients.

Figure: Receive events (and only send empty responses)

As you may already know, Siddhi sources can receive events from different endpoints. The grpc source is responsible for listening to gRPC clients and passing the requests into the relevant Siddhi streams for processing. Once the grpc source gets a request from a client, it sends an empty response back, as shown in the above image. We should use this source when we just need to get requests into the Siddhi app and no information needs to be sent back as a response (just like the grpc sink, but in the other direction).

@source(type="grpc", receiver.url="<STRING>", @map(...))

This is the basic configuration needed to create a grpc source. Here we only need to define the receiver.url parameter (please refer to the documentation to learn about the other parameters you can use with this source). Once you deploy a Siddhi app with this grpc source, it will start a gRPC server on the given port. Look at the following examples to get a better understanding.

  • With default gRPC implementation
@source(type='grpc', receiver.url='grpc://0.0.0.0:8888/org.wso2.grpc.EventService/consume', @map(type='json')) 
define stream BarStream (message String);

Here, the receiver.url is given as grpc://0.0.0.0:8888/org.wso2.grpc.EventService/consume. We give 0.0.0.0:8888 to start the gRPC server on port 8888, bound to all network interfaces. As you already know, EventService is the default gRPC service class, and consume is the default method used by the grpc source.

  • With generic implementation
@source(type='grpc',     receiver.url='grpc://0.0.0.0:8888/io.siddhi.extension.io.grpc.proto.MyService/send', @map(type='protobuf')) 
define stream BarStream (stringValue string, intValue int,
longValue long, booleanValue bool,
floatValue float, doubleValue double);

Same as above, the gRPC server will be started on port 8888, and gRPC clients can send requests to it through the send method of the MyService gRPC service. As mentioned earlier, you can use your own custom protobuf definition with this source too. But here, when you define an rpc method in the proto file, you have to define it as follows:

rpc <method_name>(<request_message_type>) returns(google.protobuf.Empty);

Here the return type should be google.protobuf.Empty, because the client does not expect any information back from the grpc source.
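Note that google.protobuf.Empty is one of the protobuf well-known types, so the proto file has to import it. A minimal sketch of such a method, reusing the hypothetical Request message from the sample.proto sketch above:

import "google/protobuf/empty.proto";

service MyService {
    // the grpc source only acknowledges the request; nothing is returned
    // (Request is defined as in the sample.proto sketch above)
    rpc send(Request) returns (google.protobuf.Empty);
}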

Additionally, if needed, you can use different attribute names in the stream definition and map them to the correct protobuf field names with the help of the Siddhi protobuf mapper. Look at the following example for details:

@source(type='grpc', receiver.url='grpc://0.0.0.0:8888/io.siddhi.extension.io.grpc.proto.MyService/send', 
@map(type='protobuf', @attributes(a='stringValue', b='intValue', c='longValue', d='booleanValue', e='floatValue', f='doubleValue')))
define stream BarStream (a string, c long, b int,
d bool, e float, f double);

You can also use streaming methods (client streaming) with this source, and publish requests from the clients as a stream of requests. An example of this is as follows:

@source(type='grpc', receiver.url='grpc://localhost:8888/org.wso2.grpc.StreamService/clientStream', @map(type='protobuf')) 
define stream BarStream (stringValue string, intValue int,
longValue long, booleanValue bool,
floatValue float, doubleValue double);

With the grpc source, clients can send requests either as a stream or as individual (unary) requests, and the grpc source will automatically identify what kind of requests it receives and handle them accordingly.

4. Serve incoming requests with synchronous responses (via grpc-service Source and grpc-service-response Sink)

The grpc source discussed earlier only consumes requests from clients and sends empty responses back. But if we want to receive requests from clients and also send some meaningful responses back, then we have to use the grpc-service source and the grpc-service-response sink.

Figure: Serve incoming requests with synchronous responses

As shown in the above image, the grpc-service source will consume requests from gRPC clients and respond back using the grpc-service-response sink. The configuration to achieve this is as follows:

@source(type='grpc-service', receiver.url='<STRING>', source.id='<STRING>', @map(...))
@sink(type='grpc-service-response', source.id='<STRING>', @map(...))

Please refer to the documentation to learn about the other parameters that you can use with the grpc-service source and grpc-service-response sink. Similar to the grpc-call sink, when you define a grpc-service source you also have to define a grpc-service-response sink along with it. Look at the following examples for details:

  • With default gRPC implementation
@source(type='grpc-service', receiver.url='grpc://134.23.43.35:8080/org.wso2.grpc.EventService/process', source.id='1',  
@map(type='json', @attributes(messageId='trp:messageId', message='message')))
define stream FooStream (messageId String, message String);
@sink(type='grpc-service-response', source.id='1', message.id='{{messageId}}',
@map(type='json'))
define stream BarStream (messageId String, message String);
from FooStream
select *
insert into BarStream;

Here, a gRPC server will be started on port 8080, listening on the IP 134.23.43.35, and the process method of EventService will be exposed to clients. The same source.id, 1, is configured in both the grpc-service source and the corresponding grpc-service-response sink, so that the sink is responsible for sending the responses to the requests received by the source. It is also necessary to pass the transport property called messageId from the source to the sink in order to correlate each request message with the corresponding response message.

  • With generic gRPC implementation
@source(type='grpc-service', receiver.url='grpc://134.23.43.35:8888/io.siddhi.extension.io.grpc.proto.MyService/process', source.id='1', 
@map(type='protobuf', @attributes(
messageId='trp:message.id',
a='stringValue', b='intValue', c='longValue',
d='booleanValue',e='floatValue', f='doubleValue')))
define stream FooStream (a string, b int, c long, d bool, e float,
f double, messageId string);
@sink(type='grpc-service-response',
publisher.url='grpc://134.23.43.35:8888/io.siddhi.extension.io.grpc.proto.MyService/process', source.id='1', message.id='{{messageId}}',
@map(type='protobuf', @payload(
stringValue='a', intValue='b', longValue='c',
booleanValue='d', floatValue='e', doubleValue='f')))
define stream BarStream(a string, b int, c long, d bool,
e float, f double, messageId string);
from FooStream
select *
insert into BarStream;

Here, a gRPC server will be started on port 8888, and the process method of MyService will be exposed to clients. The source.id is set to 1 on both the grpc-service source and the grpc-service-response sink. Unlike with the grpc source, here it is mandatory to map the attribute names of the stream definition to the protobuf message fields, as we need to retrieve the messageId transport property (which is not part of the protobuf message definition) to correlate each request message with its response. In addition, we also have to pass the publisher.url parameter to the grpc-service-response sink, which is the same as the receiver.url parameter in the grpc-service source definition, so that the protobuf mapper can map the responses correctly.

In this article, we looked at how Siddhi can be integrated with gRPC services and clients using the siddhi-io-grpc extension, in four different ways. I hope this overview was useful for understanding how gRPC can be used with Siddhi. If you have any questions related to siddhi-io-grpc, feel free to open an issue in the Git repo or contact the dev team via Slack.
