NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。
安装
本机测试时使用的是 windows 环境就独自编译了 nsq 的各模块
go get github.com/nsqio/nsq cd apps nsqd go build nsqd.go
nsq 可以搭建 mq 集群,通过 nsqlookupd 发现管理 nsqd 实例,nsqadmin 以 web 的方式管理 nsqd
1.运行 nsqlookupd
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqlookupd>nsqlookupd.exe [nsqlookupd] 2017/11/07 17:52:46.063484 INFO: nsqlookupd v1.0.0-compat (built w/ go1.8rc2) [nsqlookupd] 2017/11/07 17:52:46.084485 INFO: TCP: listening on [::]:4160 [nsqlookupd] 2017/11/07 17:52:46.088485 INFO: HTTP: listening on [::]:4161
2.运行 nsqld
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqd>nsqd --lookupd-tcp-address=127.0.0.1:4160 [nsqd] 2017/11/07 17:55:17.983173 INFO: nsqd v1.0.0-compat (built w/go1.8rc2) [nsqd] 2017/11/07 17:55:18.010175 INFO: ID: 710 [nsqd] 2017/11/07 17:55:18.011175 INFO: TOPIC(test): created [nsqd] 2017/11/07 17:55:18.012175
3.运行 nsqadmin
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqadmin>nsqadmin --lookupd-http-address=127.0.0.1:4161 [nsqadmin] 2017/11/07 17:58:30.405179 INFO: nsqadmin v1.0.0-compat (built w/go1.8rc2) [nsqadmin] 2017/11/07 17:58:30.426180 INFO: HTTP: listening on [::]:4171
使用
1.管理
我们可以访问 http://127.0.0.1:4171/ 来管理我们的 nsq
2.创建消息
除了客户端连接创建消息之外我们还可以通过 http 提交消息
3.消费消息
nsq 的 topic 可以创建多个消费 channel,一条消息可以多个通道消费使用:
package main import ( "fmt" "github.com/nsqio/go-nsq" "time" ) // 消费者 type ConsumerT struct{} // 主函数 func main() { InitConsumer("test", "test-channel", "127.0.0.1:4161") for { time.Sleep(time.Second * 10) } } // 处理消息 func (*ConsumerT) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } // 初始化消费者 func InitConsumer(topic string, channel string, address string) { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second //设置重连时间 c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者 if err != nil { panic(err) } c.SetLogger(nil, 0) //屏蔽系统日志 c.AddHandler(&ConsumerT{}) // 添加消费者接口 //建立NSQLookupd连接 if err := c.ConnectToNSQLookupd(address); err != nil { panic(err) } }
运行返回:
receive SC-201612261725:4150 message: test receive SC-201612261725:4150 message: test receive SC-201612261725:4150 message: test receive SC-201612261725:4150 message: message1 receive SC-201612261725:4150 message: message2 receive SC-201612261725:4150 message: message2
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于