golang 开源的消息队列 nsq 安装使用介绍

本贴最后更新于 2414 天前,其中的信息可能已经渤澥桑田

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

安装

本机测试时使用的是 windows 环境就独自编译了 nsq 的各模块

go get github.com/nsqio/nsq cd apps nsqd go build nsqd.go

nsq 可以搭建 mq 集群,通过 nsqlookupd 发现管理 nsqd 实例,nsqadmin 以 web 的方式管理 nsqd

1.运行 nsqlookupd
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqlookupd>nsqlookupd.exe [nsqlookupd] 2017/11/07 17:52:46.063484 INFO: nsqlookupd v1.0.0-compat (built w/ go1.8rc2) [nsqlookupd] 2017/11/07 17:52:46.084485 INFO: TCP: listening on [::]:4160 [nsqlookupd] 2017/11/07 17:52:46.088485 INFO: HTTP: listening on [::]:4161
2.运行 nsqld
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqd>nsqd --lookupd-tcp-address=127.0.0.1:4160 [nsqd] 2017/11/07 17:55:17.983173 INFO: nsqd v1.0.0-compat (built w/go1.8rc2) [nsqd] 2017/11/07 17:55:18.010175 INFO: ID: 710 [nsqd] 2017/11/07 17:55:18.011175 INFO: TOPIC(test): created [nsqd] 2017/11/07 17:55:18.012175
3.运行 nsqadmin
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqadmin>nsqadmin --lookupd-http-address=127.0.0.1:4161 [nsqadmin] 2017/11/07 17:58:30.405179 INFO: nsqadmin v1.0.0-compat (built w/go1.8rc2) [nsqadmin] 2017/11/07 17:58:30.426180 INFO: HTTP: listening on [::]:4171

使用

1.管理

我们可以访问 http://127.0.0.1:4171/ 来管理我们的 nsq

nsq_glpng

nsq_lppng

2.创建消息

除了客户端连接创建消息之外我们还可以通过 http 提交消息

1510107941206982115png

3.消费消息

nsq 的 topic 可以创建多个消费 channel,一条消息可以多个通道消费使用:

package main import ( "fmt" "github.com/nsqio/go-nsq" "time" ) // 消费者 type ConsumerT struct{} // 主函数 func main() { InitConsumer("test", "test-channel", "127.0.0.1:4161") for { time.Sleep(time.Second * 10) } } // 处理消息 func (*ConsumerT) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } // 初始化消费者 func InitConsumer(topic string, channel string, address string) { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second //设置重连时间 c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者 if err != nil { panic(err) } c.SetLogger(nil, 0) //屏蔽系统日志 c.AddHandler(&ConsumerT{}) // 添加消费者接口 //建立NSQLookupd连接 if err := c.ConnectToNSQLookupd(address); err != nil { panic(err) } }

运行返回:

receive SC-201612261725:4150 message: test receive SC-201612261725:4150 message: test receive SC-201612261725:4150 message: test receive SC-201612261725:4150 message: message1 receive SC-201612261725:4150 message: message2 receive SC-201612261725:4150 message: message2
  • NSQ
    2 引用 • 3 回帖
  • 消息队列
    40 引用 • 52 回帖 • 2 关注
  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    498 引用 • 1395 回帖 • 254 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...