ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] > NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异 ## 安装NSQ服务端 ### 下载地址 > https://nsq.io/deployment/installing.html 下载linux最新稳定版 ### 启动nsqlookupd > 主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态 ~~~ ./nsqlookupd ~~~ ### 启动nsqd > 负责接收消息,存储队列和将消息发送给客户端 ~~~ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 ~~~ ### 启动nqsadmin > nsqadmin是一个web管理界面,访问地址 http://localhost:4171/ ~~~ ./nsqadmin --lookupd-http-address=127.0.0.1:4161 ~~~ ## 生产者 (nsq-send.go) ~~~ package main import ( "github.com/nsqio/go-nsq" "fmt" "strconv" ) var ( //nsqd的地址,使用了tcp监听的端口 tcpNsqdAddrr = "127.0.0.1:4150" ) func main() { //初始化配置 config := nsq.NewConfig() for i := 0; i < 100; i++ { //创建100个生产者 tPro, err := nsq.NewProducer(tcpNsqdAddrr, config) if err != nil { fmt.Println(err) } //主题 topic := "topic_demo" //主题内容 tCommand := "new data!" + strconv.Itoa(i) //发布消息 err = tPro.Publish(topic, []byte(tCommand)) if err != nil { fmt.Println(err) } } } ~~~ ## 消费者 (nsq-receive.go) > 测试服务器上可执行`go run nsq-receive.go`进行测试 > 正式服务器上,需先进行打包`go build nsq-receive.go`后,再使用`nohup ./nsq-receive &`或者`supervisor`进行运行 ~~~ package main import ( "fmt" "time" "github.com/nsqio/go-nsq" ) // 消费者 type ConsumerT struct{} // 主函数 func main() { InitConsumer("topic_demo", "test-channel", "127.0.0.1:4161") for { time.Sleep(time.Second * 10) } } //处理消息 func (*ConsumerT) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } //初始化消费者 func InitConsumer(topic string, channel string, address string) { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second //设置重连时间 c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者 if err != nil { panic(err) } c.SetLogger(nil, 0) //屏蔽系统日志 c.AddHandler(&ConsumerT{}) // 添加消费者接口 //建立NSQLookupd连接 if err := c.ConnectToNSQLookupd(address); err != nil { panic(err) } } ~~~