go-micro broker with RabbitMQ
之前有简单了解过go-micro的broken以及默认的http实现(参考:),目前因为消息数量以及稳定性的需要,想引用消息队列,初步了解了下RabbitMQ的在go-micro中的应用。
安装RabbitMQ
我这边是macOS直接使用brew命令安装的:
brew update
brew install rabbitmq
安装完成后,还需要把rabbitmq的路径加入到PATH
中,方便之后命令行使用CLI tools执行相应的用户和权限的操作。
brew自动安装完成后会在/usr/local/sbin
下面创建rabbitmq可执行程序的软链,所以只需要确保/usr/local/sbin
在环境的PATH
即可
rabbitmq的安装完成后的目录/usr/local/Cellar/rabbitmq/
其他的方式或者系统可参考RabbitMQ的官方文档Installation Guides
RabbitMQ实现的go-micro broker plugin
github上已经有实现好的plugin,我们直接使用即可,地址:rabbitmq for go-micro broker plugin
- go.mod中添加plugin
github.com/micro/go-micro/v2 v2.5.0
github.com/micro/go-plugins/broker/rabbitmq/v2 v2.5.0
这里使用的时候需要注意plugin的version和go-micro使用的version匹配。 我本来使用的是go-micro v2.4.0的版本,但是plugin没有对应匹配的版本,所以我升级到了v2.5.0,最终go-micro和plugin都使用了v2.5.0的版本。
- main.go中初始化RabbitMQ
b := rabbitmq.NewBroker(
broker.Addrs("amqp://username:password@localhost:5672"),
)
if err := b.Init(); err != nil {
log.Fatalf("Broker Init error: %v", err)
}
if err := b.Connect(); err != nil {
log.Fatalf("Broker Connect error: %v", err)
}
Publish/Subscribe
中这部分初始化是一样的
- main.go中设置service中的broker实现
// New Service
service := micro.NewService(
micro.Name("ioridy.micro.api.hello"),
micro.Version("latest"),
micro.Broker(b), //默认是http实现,需要执行替换
)
这里有个小点需要提醒下,如果你直接使用broker中的接口做
Publish/Subscribe
,是不需要上面这样设置service中broker的实现的,这个替换是针对使用micro下封装的Event
以上就已经完成了RabiitMQ在go-micro中的使用,之后只需要按照broker中的接口使用即可。
Publish/Subscribe
最后还是放个Publish/Subscribe
的实际例子供参考下
- Publish
const DemoTopic = "ioridy.micro.msg.hello"
var count = 1
func MessagePublish(msg string) {
message := &broker.Message{
Header: map[string]string{
"count": fmt.Sprintf("%d", count),
},
Body: []byte("this is event body"),
}
count += 1
if err := broker.Publish(DemoTopic, message); err != nil {
log.Infof("[Hello pub] failed: %v", err)
} else {
log.Infof("[Hello pub] pubbed message:", string(message.Body))
}
}
- Subscribe
const DemoTopic = "ioridy.micro.msg.hello"
// Example of a shared subscription which receives a subset of messages
func sharedSub() {
_, err := broker.Subscribe(DemoHelloTopic, func(p broker.Event) error {
fmt.Println("[Word sharedSub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
}, broker.Queue("consumer"))
if err != nil {
fmt.Println(err)
}
}
// Example of a subscription which receives all the messages
func sub() {
_, err := broker.Subscribe(DemoHelloTopic, func(p broker.Event) error {
fmt.Println("[World sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
})
if err != nil {
fmt.Println(err)
}
}
以上部分的Subscribe需要放在main.go中初始化的