本文是Micro系列文章的第四篇。我们将以实际开发微服务为主线,顺带解析相关功能。从最基本的话题开始,逐步转到高级特性。
接下来谈谈异步消息处理。要构建一个可伸缩、高容错、高并发的系统, 异步消息处理是一个关键技术。这种技术虽然强大, 但开发起来也相当麻烦, 远没有同步请求那样简单直接。
好在Micro对这个编程模型作了非常好的抽象与封装,供我们便利地使用。
除此之外, 借助Micro的接口抽象, 我们可以透明(或者说几乎透明)地支持各种消息服务器。Micro 默认提供了基于内嵌的Nats消息服务器。同时也以插件形式提供了多种主流消息服务系统的支持。包括 Kafka,RabbitMQ,MQTT,NSQ,Amazon SQS 等。你可以到插件主页了解更多详细说明。 这使得我们在因业务需要而切换消息服务时,可以几乎不修改任何业务代码。
Micro 支持以两种不同方式处理异步消息, 一种是Pub/Sub,另一种是使用micro.Broker
接口进行消息收发。 前者相对简单,后者则能提供更大灵活度。
Micro 内置的 Pub/Sub 功能统一并简化了异步消息的收、发、编码和解码。这把开发者从底层技术细节中解放出来,去专注于创造业务价值。多数情况下我们应优先选择此方式。
下面我们将以实例解析一套Pub/Sub系统的开发和运行。
Sub,订阅消息
在本系列第一篇文章中, 我们创建了一个示例项目,其中已经包含了订阅相关的代码。
首先定义消息处理Handler, ./subscriber/hello.go 代码如下:
package subscriber
import (
"context"
log "github.com/micro/go-micro/v2/logger"
hello "hello/proto/hello"
)
type Hello struct{}
func (e *Hello) Handle(ctx context.Context, msg *hello.Message) error {
log.Info("Handler Received message: ", msg.Say)
return nil
}
接收消息的代码是对象的方法, 其签名为 func(context.Context, v interface{}) error
。
注意在示例中方法的第二个参数是 *hello.Message
, 此类型在.proto 文件中定义。Micro框架会自动完成消息的解码。我们在Handler中可以直接使用。