Micro In Action(四):Pub/Sub

Che Dan
9 min readFeb 14, 2020
Micro In Action

本文是Micro系列文章的第四篇。我们将以实际开发微服务为主线,顺带解析相关功能。从最基本的话题开始,逐步转到高级特性。

接下来谈谈异步消息处理。要构建一个可伸缩、高容错、高并发的系统, 异步消息处理是一个关键技术。这种技术虽然强大, 但开发起来也相当麻烦, 远没有同步请求那样简单直接。

好在Micro对这个编程模型作了非常好的抽象与封装,供我们便利地使用。

除此之外, 借助Micro的接口抽象, 我们可以透明(或者说几乎透明)地支持各种消息服务器。Micro 默认提供了基于内嵌的Nats消息服务器。同时也以插件形式提供了多种主流消息服务系统的支持。包括 Kafka,RabbitMQ,MQTT,NSQ,Amazon SQS 等。你可以到插件主页了解更多详细说明。 这使得我们在因业务需要而切换消息服务时,可以几乎不修改任何业务代码。

Micro 支持以两种不同方式处理异步消息, 一种是Pub/Sub,另一种是使用micro.Broker接口进行消息收发。 前者相对简单,后者则能提供更大灵活度。

Micro 内置的 Pub/Sub 功能统一并简化了异步消息的收、发、编码和解码。这把开发者从底层技术细节中解放出来,去专注于创造业务价值。多数情况下我们应优先选择此方式。

下面我们将以实例解析一套Pub/Sub系统的开发和运行。

Sub,订阅消息

在本系列第一篇文章中, 我们创建了一个示例项目,其中已经包含了订阅相关的代码。

首先定义消息处理Handler, ./subscriber/hello.go 代码如下:

package subscriber

import (
"context"
log "github.com/micro/go-micro/v2/logger"

hello "hello/proto/hello"
)

type Hello struct{}

func (e *Hello) Handle(ctx context.Context, msg *hello.Message) error {
log.Info("Handler Received message: ", msg.Say)
return nil
}

接收消息的代码是对象的方法, 其签名为 func(context.Context, v interface{}) error

注意在示例中方法的第二个参数是 *hello.Message, 此类型在.proto 文件中定义。Micro框架会自动完成消息的解码。我们在Handler中可以直接使用。

--

--