golang 开源的消息队列 nsq 安装使用介绍

本贴最后更新于 2267 天前,其中的信息可能已经渤澥桑田

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

安装

本机测试时使用的是 windows 环境就独自编译了 nsq 的各模块

go get github.com/nsqio/nsq
cd apps nsqd
go build nsqd.go

nsq 可以搭建 mq 集群,通过 nsqlookupd 发现管理 nsqd 实例,nsqadmin 以 web 的方式管理 nsqd

1.运行 nsqlookupd
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqlookupd>nsqlookupd.exe
[nsqlookupd] 2017/11/07 17:52:46.063484 INFO: nsqlookupd v1.0.0-compat (built w/
go1.8rc2)
[nsqlookupd] 2017/11/07 17:52:46.084485 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2017/11/07 17:52:46.088485 INFO: HTTP: listening on [::]:4161
2.运行 nsqld
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqd>nsqd --lookupd-tcp-address=127.0.0.1:4160                                                                       [nsqd] 2017/11/07 17:55:17.983173 INFO: nsqd v1.0.0-compat (built w/go1.8rc2)   [nsqd] 2017/11/07 17:55:18.010175 INFO: ID: 710                                 [nsqd] 2017/11/07 17:55:18.011175 INFO: TOPIC(test): created                    [nsqd] 2017/11/07 17:55:18.012175 
3.运行 nsqadmin
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqadmin>nsqadmin --lookupd-http-address=127.0.0.1:4161                                                              [nsqadmin] 2017/11/07 17:58:30.405179 INFO: nsqadmin v1.0.0-compat (built w/go1.8rc2)                                                                           [nsqadmin] 2017/11/07 17:58:30.426180 INFO: HTTP: listening on [::]:4171

使用

1.管理

我们可以访问 http://127.0.0.1:4171/ 来管理我们的 nsq

nsq_glpng

nsq_lppng

2.创建消息

除了客户端连接创建消息之外我们还可以通过 http 提交消息

1510107941206982115png

3.消费消息

nsq 的 topic 可以创建多个消费 channel,一条消息可以多个通道消费使用:

package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"time"
)

// 消费者
type ConsumerT struct{}

// 主函数
func main() {
	InitConsumer("test", "test-channel", "127.0.0.1:4161")
	for {
		time.Sleep(time.Second * 10)
	}
}

// 处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}

// 初始化消费者
func InitConsumer(topic string, channel string, address string) {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = time.Second          //设置重连时间
	c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0)        //屏蔽系统日志
	c.AddHandler(&ConsumerT{}) // 添加消费者接口

	//建立NSQLookupd连接
	if err := c.ConnectToNSQLookupd(address); err != nil {
		panic(err)
	}
}

运行返回:

receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: message1
receive SC-201612261725:4150 message: message2
receive SC-201612261725:4150 message: message2
  • NSQ
    2 引用 • 3 回帖
  • 消息队列
    40 引用 • 52 回帖 • 2 关注
  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    497 引用 • 1387 回帖 • 294 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...