Golang整合NSQ消息队列


生产者代码部分


package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"time"
)

// main connects a producer to a local nsqd instance and publishes a fixed
// number of test messages to the "test" topic.
//
// Any failure while creating the producer or reaching nsqd aborts the program
// early; a failure to publish a single message is reported and the loop moves
// on to the next message.
func main() {
	var (
		topic   = "test"
		address = "127.0.0.1:4150"
	)
	// Build the client configuration.
	cfg := nsq.NewConfig()
	cfg.DialTimeout = time.Second * 1
	// NOTE(review): MsgTimeout looks like a consumer-side setting and probably
	// has no effect on a producer — confirm before relying on it.
	cfg.MsgTimeout = time.Second * 1

	producer, err := nsq.NewProducer(address, cfg)
	if err != nil {
		// Without a producer nothing below can work, so stop here instead of
		// dereferencing a nil pointer.
		fmt.Printf("异常:{\n %s \n}\n", err.Error())
		return
	}
	// Close the connection to nsqd on every exit path.
	defer producer.Stop()

	// Verify that nsqd is reachable before attempting to publish.
	if err = producer.Ping(); err != nil {
		// Use an explicit format string: the error text must never be
		// interpreted as formatting directives.
		fmt.Printf("异常:{\n %s \n}\n", err.Error())
		return
	}

	// The loop bounds are kept from the original: 19 messages are published.
	for i := 1; i < 20; i++ {
		if err := producer.Publish(topic, []byte("test message")); err != nil {
			// Report the failure and skip the success line for this message.
			fmt.Printf("异常:{\n %s \n}\n", err.Error())
			continue
		}
		fmt.Print("test message\n")
	}
	fmt.Println("==========")
}

消费者代码部分


package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"time"
)

// main starts an NSQ consumer for the "test" topic, discovers nsqd instances
// through a local nsqlookupd, and then blocks forever while the registered
// handler processes incoming messages.
//
// If the consumer cannot be created or the lookupd connection cannot be set
// up, the error is printed and the program exits instead of continuing with
// an unusable consumer.
func main() {
	// Topic to consume and the nsqlookupd address used for discovery.
	var (
		topic   = "test"
		address = "127.0.0.1:4161"
	)
	// Build the client configuration; nsqlookupd is polled every 3 seconds.
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 3 * time.Second

	// The topic name is reused as the channel name.
	consumer, err := nsq.NewConsumer(topic, topic, cfg)
	if err != nil {
		// A failed constructor leaves nothing usable; bail out rather than
		// calling methods on a nil consumer.
		fmt.Printf("异常:{\n %s \n}\n", err.Error())
		return
	}

	// The handler must be registered before connecting.
	handler := MessageHandler{nsqConsumer: consumer}
	consumer.AddHandler(&handler)

	if err = consumer.ConnectToNSQLookupd(address); err != nil {
		// Without a lookupd connection no messages will ever arrive, so
		// exiting is more useful than blocking forever.
		fmt.Printf("异常:{\n %s \n}\n", err.Error())
		return
	}

	// Block the main goroutine; messages are handled in the background.
	select {}
}

// MessageHandler receives messages delivered by an NSQ consumer through its
// HandleMessage method.
type MessageHandler struct {
	// nsqConsumer is the consumer this handler was created for. It is set
	// when the handler is constructed; HandleMessage does not read it.
	nsqConsumer *nsq.Consumer
}

// HandleMessage prints the payload of a received message to standard output.
// It always returns nil, so every message is reported as handled successfully.
func (m MessageHandler) HandleMessage(message *nsq.Message) error {
	// %s renders a byte slice as text, so no explicit string conversion is
	// needed to print the payload.
	payload := message.Body
	fmt.Printf("接收到消息:{%s}\n", payload)
	return nil
}

代码地址:https://gitee.com/nooft/udp_tcp_demo/tree/master/nsq