YuHaibo
YuHaibo
Mar 3 · 20 min read

Go-Kit 是一个微服务开发工具集. 本文演示传输层的使用.

准备

需要先安装 protobuf 3, 可以直接下载二进制包或者从源码编译

brew install autoconf automake libtool
git clone https://github.com/google/protobuf
cd protobuf
./autogen.sh ; ./configure ; make ; make install

Proto

创建一个新项目, 并且新建 add.proto 文件, 使用 pb3 定义服务, 本文的示例文件来自 go-kit 的例子:

syntax = "proto3";

package pb;

// The Add service definition.
service Add {
// Sums two integers.
rpc Sum (SumRequest) returns (SumReply) {}

// Concatenates two strings
rpc Concat (ConcatRequest) returns (ConcatReply) {}
}

// The sum request contains two parameters.
message SumRequest {
int64 a = 1;
int64 b = 2;
}

// The sum response contains the result of the calculation.
message SumReply {
int64 v = 1;
}

// The Concat request contains two parameters.
message ConcatRequest {
string a = 1;
string b = 2;
}

// The Concat response contains the result of the concatenation.
message ConcatReply {
string v = 1;
}

这里定义名为 Add 的服务, 里面有 Sum 和 Concat 两个方法, 本 🌰 中就实现这两个方法.

使用以下命令编译服务定义, 得到 add.pb.go 文件

protoc add.proto --go_out=plugins=grpc:.

定义服务

新建 service.go 定义并实现服务

package server

import "context"

//interface
type AddService interface {
Sum(_ context.Context, a, b int) (v int)
Concat(_ context.Context, a, b string) (v string)
}

//service struct
type addService struct{}

//returns a implementation
func New() AddService {
return addService{}
}

func (addService) Sum(_ context.Context, a, b int) (v int) {
return a + b
}

func (addService) Concat(_ context.Context, a, b string) (v string) {
return a + b
}

创建 Endpoints

新建 endpoints.go 实现需要的两个 endpoint

package server

import (
"context"
"github.com/go-kit/kit/endpoint"
)

//all endpoints required by AddService.
type Endpoints struct {
SumEndpoint endpoint.Endpoint
ConcatEndpoint endpoint.Endpoint
}

type sumRequest struct {
A int
B int
}

type sumResponse struct {
V int
}

// MakeSumEndpoint returns an endpoint that invokes Sum on the AddService
// for server
func MakeSumEndpoint(svc AddService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(sumRequest)
v := svc.Sum(ctx, req.A, req.B)
return sumResponse{v}, nil
}
}

// Sum implements AddService
//for client
func (e Endpoints) Sum(ctx context.Context, a, b int) int {
req := sumRequest{A:a, B:b}
res, err := e.SumEndpoint(ctx, req)
if err != nil {
return sumResponse{0}.V
}
return res.(sumResponse).V
}

type concatRequest struct {
A string
B string
}

type concatResponse struct {
V string
}

// MakeConcatEndpoint returns an endpoint that invokes Sum on the AddService
// for server
func MakeConcatEndpoint(svc AddService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(concatRequest)
v := svc.Concat(ctx, req.A, req.B)
return concatResponse{v}, nil
}
}

// Concat implements AddService
//for client
func (e Endpoints) Concat(ctx context.Context, a, b string) string {
req := concatRequest{A:a, B:b}
res, err := e.ConcatEndpoint(ctx, req)
if err != nil {
return concatResponse{"error"}.V
}
return res.(concatResponse).V
}

HTTP 传输层

对于 http 传输层比较简单, 实现 http handler 和响应的 encode 和 decode. 新建 http_transport.go

package server

import (
"context"
"encoding/json"
httptransport "github.com/go-kit/kit/transport/http"
"net/http"
)

func MakeHTTPHandler(endpoints Endpoints) http.Handler {
m := http.NewServeMux()
m.Handle("/sum",
httptransport.NewServer(
endpoints.SumEndpoint,
DecodeHTTPSumRequest,
EncodeHTTPResponse,
))

m.Handle("/concat",
httptransport.NewServer(
endpoints.ConcatEndpoint,
DecodeHTTPConcatRequest,
EncodeHTTPResponse,
))

return m
}

func DecodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request sumRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}

func DecodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
var request concatRequest
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
return nil, err
}
return request, nil
}

func EncodeHTTPResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}

gRPC 传输层

gRPC 也类似, 实现 server 和响应的 encode 和 decode 方法即可, 但需要对所有的请求和相应定义 encode 和 decode 方法, 部分供 gRPC 服务端使用, 部分供 gRPC 客户端使用. 新建 grpc_transport.go

此处 encode / decode 比较繁琐, 注意不要写错

package server

//Server-side bindings for the gRPC transport

import (
"context"
grpctransport "github.com/go-kit/kit/transport/grpc"
"go-kit-demo/pb"
)

//returns a set of handlers available as a gRPC AddServer.
func MakeGRPCServer(endpoints Endpoints) pb.AddServer {
return &grpcServer{
sum: grpctransport.NewServer(
endpoints.SumEndpoint,
DecodeGRPCSumRequest,
EncodeGRPCSumResponse,
),
concat: grpctransport.NewServer(
endpoints.ConcatEndpoint,
DecodeGRPCConcatRequest,
EncodeGRPCConcatResponse,
),
}
}

type grpcServer struct {
sum grpctransport.Handler
concat grpctransport.Handler
}

func (s *grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumReply, error) {
_, resp, err := s.sum.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return resp.(*pb.SumReply), nil
}

func (s *grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
_, resp, err := s.concat.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return resp.(*pb.ConcatReply), nil
}

//Encode & Decode Func
//https://github.com/go-kit/kit/blob/master/transport/grpc/encode_decode.go

// DecodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc
// for server
func DecodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
req := request.(*pb.SumRequest)
return sumRequest{int(req.A), int(req.B)}, nil
}

// EncodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc
// for client
func EncodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
req := request.(sumRequest)
return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
}

// EncodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc
// for server
func EncodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(sumResponse)
return &pb.SumReply{V: int64(resp.V)}, nil
}

// DecodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc
// for client
func DecodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(*pb.SumReply)
return sumResponse{int(resp.V)}, nil
}

func DecodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
req := request.(*pb.ConcatRequest)
return concatRequest{req.A, req.B}, nil
}

func EncodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
req := request.(concatRequest)
return &pb.ConcatRequest{A: req.A, B: req.B}, nil
}

func EncodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(concatResponse)
return &pb.ConcatReply{V: resp.V}, nil
}

func DecodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
resp := response.(*pb.ConcatReply)
return concatResponse{resp.V}, nil
}

Server

最后实现 HTTP 和 gRPC 服务即可, 新建 main.go, 例子中对服务同时实现了 HTTP 传输层与 gRPC 传输层. 运行 main.go 即可启动服务.

package main

import (
"flag"
"fmt"
"github.com/go-kit/kit/log"
"go-kit-demo/pb"
"go-kit-demo/server"
"google.golang.org/grpc"
"net"
"net/http"
"os"
"os/signal"
"syscall"
)

func main() {
httpAddr := flag.String("HTTP", ":8890", "HTTP server")
gRPCAddr := flag.String("gRPC", ":8891", "gRPC server")
flag.Parse()

var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
logger.Log("msg", "Server Start...")
defer logger.Log("msg", "Closed")

svc := server.New()

endpoints := server.Endpoints{
SumEndpoint: server.MakeSumEndpoint(svc),
ConcatEndpoint: server.MakeConcatEndpoint(svc),
}

// Error channel.
errc := make(chan error)

// Interrupt handler.
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errc <- fmt.Errorf("%s", <-c)
}()

// HTTP transport.
go func() {
logger := log.With(logger, "transport", "HTTP")
logger.Log("addr", *httpAddr)

handler := server.MakeHTTPHandler(endpoints)
errc <- http.ListenAndServe(*httpAddr, handler)
}()

// gRPC transport.
go func() {
logger := log.With(logger, "transport", "gRPC")
logger.Log("addr", *gRPCAddr)

listener, err := net.Listen("tcp", *gRPCAddr)
if err != nil {
errc <- err
return
}

srv := server.MakeGRPCServer(endpoints)
s := grpc.NewServer()
pb.RegisterAddServer(s, srv)
errc <- s.Serve(listener)
}()

logger.Log("exit", <-errc)
}

Client

对于 HTTP Server, 我们可以直接使用 curl 进行调试

#!/usr/bin/env sh -v

#Sum Response
curl -d '{"a":11111, "b":22222}' http://127.0.0.1:8890/sum

#Concat Response
curl -d '{"a":"11111", "b":"22222"}' http://127.0.0.1:8890/concat

同时实现一个 gRPC Client, 此处需要用到 grpc_transport.go 中定义的 encode / decode 方法.

package grpc

import (
grpctransport "github.com/go-kit/kit/transport/grpc"
"go-kit-demo/pb"
"go-kit-demo/server"
"google.golang.org/grpc"
)

func New(conn *grpc.ClientConn) server.AddService {
sumEndpoint := grpctransport.NewClient(
conn, "pb.Add", "Sum",
server.EncodeGRPCSumRequest,
server.DecodeGRPCSumResponse,
pb.SumReply{},
).Endpoint()

concatEndpoint := grpctransport.NewClient(
conn, "pb.Add", "Concat",
server.EncodeGRPCConcatRequest,
server.DecodeGRPCConcatResponse,
pb.ConcatReply{},
).Endpoint()

return server.Endpoints{
SumEndpoint: sumEndpoint,
ConcatEndpoint: concatEndpoint,
}
}
package main

import (
"context"
"flag"
grpcclient "go-kit-demo/client/grpc"
"go-kit-demo/server"
"google.golang.org/grpc"
"log"
"time"
)

func main() {
gRPCAddr := flag.String("gRPC", ":8891", "gRPC client")
flag.Parse()

conn, err := grpc.Dial(
*gRPCAddr, grpc.WithInsecure(),
grpc.WithTimeout(time.Second),
)

if err != nil {
log.Fatalln("gRPC dial error:", err)
}
defer conn.Close()

addService := grpcclient.New(conn)

println("Sum Response:")
sum(context.Background(), addService, 11111, 22222)

println("Concat Response:")
concat(context.Background(), addService, "11111", "22222")
}

func sum(ctx context.Context, svc server.AddService, a, b int) {
output := svc.Sum(ctx, a, b)
println(output)
}

func concat(ctx context.Context, svc server.AddService, a, b string) {
output := svc.Concat(ctx, a, b)
println(output)
}

运行客户端, 应该能得到如下响应

➜ ~/GitHub/go-kit-demo git:(master)>go run client/main.go 
Sum Response:
33333
Concat Response:
1111122222

一个简单的 go-kit 微服务就实现了, 并且实现了两种传输层.

本文所有的代码可在 Github 上找到 (tag: v1.0)

Next

接下来将实现更多的功能.

YuHaibo

Written by

YuHaibo

Code & Life. 🏠 https://yuhaibo.com/

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade