Nsq 是用 Go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理,对于学习 Go channel的原理和用法,以及如何用 Go 语言来写分布式是一个很不错的入门项目。
我们在上一篇文章整体介绍了 nsq 的组成以及各个模块的功能,本文将会带领大家一起实践 nsq 的安装,并基于 nsq 提供的 API 进行实践。
安装使用
在官网(https://nsq.io/overview/quick_start.html) 下载对应的二进制可执行文件。
代码语言:javascript复制# 启动nsqlookupd
$ nsqlookupd
# 启动 nsqd
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
# 启动 nsqadmin
$ nsqadmin --lookupd-http-address=127.0.0.1:4161
# 创建topic,发送消息
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
# 启动nsq_to_file
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
# 发布消息到 nsqd
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
在本地按照上述步骤就可以跑起来了。
创建生产者
安装好 nsq 的几个服务之后,我们来实现基于 nsq 的生产和消费示例。首先是创建生产者:
代码语言:javascript复制package main
import (
"fmt"
"log"
"time"
"github.com/nsqio/go-nsq"
)
func main() {
config := nsq.NewConfig()
p, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Panic(err)
}
for i := 0; i < 1000; i {
msg := fmt.Sprintf("num-%d", i)
log.Println("Pub:" msg)
err = p.Publish("testTopic", []byte(msg))
if err != nil {
log.Panic(err)
}
time.Sleep(time.Second * 1)
}
p.Stop()
}
生产者的逻辑比较简单,基于 nsq 官方提供的 github.com/nsqio/go-nsq
包,通过调用,循环写 1000 个字符 数字,即 num-n 的形式,通过 p.Publish 发送到消息队列中,等待消费。
消费者
接着,我们创建消费者:consumer.go 来消费刚刚生产的消息。
代码语言:javascript复制package main
import (
"log"
"sync"
"github.com/nsqio/go-nsq"
)
func main() {
wg := &sync.WaitGroup{}
wg.Add(1000)
config := nsq.NewConfig()
c, _ := nsq.NewConsumer("testTopic", "ch", config)
c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Printf("Got a message: %s", message.Body)
wg.Done()
return nil
}))
// 1.直连nsqd
// err := c.ConnectToNSQD("127.0.0.1:4150")
// 2.通过 nsqlookupd 服务发现
err := c.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
log.Panic(err)
}
wg.Wait()
}
可通过两种方式与 nsqd 连接:
- 直连 nsqd,适用于单机(standalone)版;
- 通过 nsqlookupd 服务发现,适用于集群(cluster)版;
消费消息的动作,主要逻辑就是打印出来,实际业务中需要进行其他处理。
运行结果
依次启动生产者和消费者的服务,可以分别看到如下的输出结果:
代码语言:javascript复制$go run producer.go
2020/12/28 20:29:51 Pub:num-0
2020/12/28 20:29:51 INF 1 (127.0.0.1:4150) connecting to nsqd
2020/12/28 20:29:52 Pub:num-1
2020/12/28 20:29:53 Pub:num-2
2020/12/28 20:29:54 Pub:num-3
2020/12/28 20:29:55 Pub:num-4
2020/12/28 20:29:56 Pub:num-5
2020/12/28 20:29:57 Pub:num-6
2020/12/28 20:29:58 Pub:num-7
2020/12/28 20:29:59 Pub:num-8
2020/12/28 20:30:00 Pub:num-9
2020/12/28 20:30:01 Pub:num-10
$ go run consumer.go
2020/12/28 20:30:08 INF 1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2020/12/28 20:30:08 INF 1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
2020/12/28 20:30:08 Got a message: num-0
2020/12/28 20:30:08 Got a message: num-1
2020/12/28 20:30:08 Got a message: num-2
2020/12/28 20:30:08 Got a message: num-3
2020/12/28 20:30:08 Got a message: num-4
2020/12/28 20:30:08 Got a message: num-5
2020/12/28 20:30:08 Got a message: num-6
2020/12/28 20:30:08 Got a message: num-7
2020/12/28 20:30:08 Got a message: num-8
2020/12/28 20:30:08 Got a message: num-9
2020/12/28 20:30:08 Got a message: num-10
通过如上的示例,我们已经成功地实现 NSQ 的应用。下面我们将解析 NSQ 的几个核心部分。
小结
本文主要介绍 nsq 的安装使用,下载好可执行文件之后,依次启动 nsqlookupd、nsqd、nsqadmin 几个服务。接着我们基于官方提供的客户端 API 包实现了生产消费模型的案例。通过简单的案例,我们能够对 nsq 的安装和基本使用有一个了解。
下一篇文章,将会具体分析 nsq 实现的细节。