Microservice response aggregate with golang and kafka

λ.eranga
λ.eranga
Jul 29, 2018 · 4 min read

Scenario

I’m building an application based on microservices architecture. In this scenario I have a microservice gateway service. All the incoming requests come to this service, it forward the request to other microservices and wait till response coming. Basically when request comes to gateway service it broadcast the request to other microservices and aggregate the response from them. All the service communications happened via apache kafka. Service discovery done via etcd. Services deployed with docker. Gateway service name is ops. Other services named as chain services, assume I have 3 chain services chain1, chain2 and chain3.

Gateway service has two kafka topics ops.req and ops.resp, all the request messages comes to ops.req topic, response messages from other microservices comes to ops.resp topic. Chain services have request topics chain1.req, chain2.req and chain3.req. When request message comes to chain service it do whatever the operation and send the response message back to ops.resp topic.

All request messages comes with unique id uid. When chain service receives request message with uid, it process that message and send the response to ops.resp topic with same uid.

Setting up kafka produces and consumers

In this scenario each microservice has kafka producer part and consumer part. So first setup is to setup kafka and implement kafka consumers and producers. I have written detailed articles about setting up kafka, writing kafka producer and writing kafka consumer. Please refer them for more information.

Handle request messages in ops service

Ops service request messages comes to ops.req kafka topic. When request message comes to this topic it calls handleReq function.

handleReq extracts uid of the message. Then creates a channel and add the channel to a rchans with the uid key. rchans is a golang map which contains string keys and chan string values. After creating channel it broadcast the message to chain services via kafka and starts to wait for responses in waitResp function(it waits in a goroutine).

When response receives with uid, we redirect it to corresponding channel(channel finds from rchans by using message uid). Then it picked and process by the goroutine that waiting for that channel(goroutine wait inside waitResp function).

waitResp function aggregates the responses receiving to the channel. When all responses receive, it do whatever the operation(may be send a response back to client) and remove the channel from the rchans.

Handle request messages in chain services

Chain service request messages comes to chain<id>.req kafka topic. When request comes it calls execReq function.

It do whatever the operation according to the incoming request. Then it construct response message with request message’s uid. Finally sends that response message to ops service response topic ops.resp.

Handle response messages in ops service

All the responses from chain services comes to ops.resp kafka topic. When response message comes to this topic it calls handleResp function.

Response message comes with a uid, which is the same uid of the request message. We have already created a channel which correspond with the request uid and added that channel to rchans map. Now we can find the channel which correspond with response message uid from rchans and forward the message to it.

When message receives for a channel it will pick up by the goroutine waiting for that channel(goroutine wait in waitResp function). waitResp function which executes in goroutine aggregates all the responses with same uid. When all responses receive, it do whatever the operation and remove the channel from the rchans.

Before the end

In this way we can broadcast multiple requests to chain services and wait form the responses in goroutine. We can handle all the responses from chain services asynchronously. Since multiple goroutines accessing the rchan map its better to use a thread safe map for it.

Same kind of concept can be used in scala akka based services as well. When request message comes, we can create an actor to handle the request and broadcast the message to other services(via kafka). We need to store the actor reference in a thread safe map with uid. When response receives from other services(via kafka), we can find the corresponding actor which waiting for the response from the map(by using response message uid) and send the response back to it.

Reference

  1. https://medium.com/@itseranga/kafka-and-zookeeper-with-docker-65cff2c2c34f
  2. https://medium.com/@itseranga/kafka-consumer-with-golang-a93db6131ac2
  3. https://medium.com/@itseranga/kafka-producer-with-golang-fab7348a5f9a

Rahasak

Have less, be more

λ.eranga

Written by

λ.eranga

Scala, Golang with Vim and Hockey: What else does a man need to be happy :)

Rahasak

Rahasak

Have less, be more

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade