关于吞吐量的一些思考
- 写入消息队列吞吐量取决于以下两个方面
* 网络带宽
* 消息队列(比如Kafka)写入速度
最佳吞吐量是让其中之一打满,而一般情况下内网带宽都会非常高,不太可能被打满,所以自然就是讲消息队列的写入速度打满,这就就有两个点需要平衡
代码语言:txt复制* 批量写入的消息量大小或者字节数多少
* 延迟多久写入
go-zero 的 PeriodicalExecutor
和 ChunkExecutor
就是为了这种情况设计的
- 从消息队列里消费消息的吞吐量取决于以下两个方面
* 消息队列的读取速度,一般情况下消息队列本身的读取速度相比于处理消息的速度都是足够快的
* 处理速度,这个依赖于业务
这里有个核心问题是不能不考虑业务处理速度,而读取过多的消息到内存里,否则可能会引起两个问题:
代码语言:txt复制* 内存占用过高,甚至出现OOM,`pod` 也是有 `memory limit` 的
* 停止 `pod` 时堆积的消息来不及处理而导致消息丢失
解决方案和实现
借用一下 Rob Pike
的一张图,这个跟队列消费异曲同工。左边4个 gopher
从队列里取,右边4个 gopher
接过去处理。比较理想的结果是左边和右边速率基本一致,没有谁浪费,没有谁等待,中间交换处也没有堆积。
我们来看看 go-zero
是怎么实现的:
Producer
端
for {
代码语言:txt复制 select {
代码语言:txt复制 case <-q.quit:
代码语言:txt复制 logx.Info("Quitting producer")
代码语言:txt复制 return
代码语言:txt复制 default:
代码语言:txt复制 if v, ok := q.produceOne(producer); ok {
代码语言:txt复制 q.channel <- v
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 }
没有退出事件就会通过 produceOne
去读取一个消息,成功后写入 channel
。利用 chan
就可以很好的解决读取和消费的衔接问题。
Consumer
端
for {
代码语言:txt复制 select {
代码语言:txt复制 case message, ok := <-q.channel:
代码语言:txt复制 if ok {
代码语言:txt复制 q.consumeOne(consumer, message)
代码语言:txt复制 } else {
代码语言:txt复制 logx.Info("Task channel was closed, quitting consumer...")
代码语言:txt复制 return
代码语言:txt复制 }
代码语言:txt复制 case event := <-eventChan:
代码语言:txt复制 consumer.OnEvent(event)
代码语言:txt复制 }
代码语言:txt复制 }
这里如果拿到消息就去处理,当 ok
为 false
的时候表示 channel
已被关闭,可以退出整个处理循环了。同时我们还在 `redis
queue上支持了
pause/resume,我们原来在社交场景里大量使用这样的队列,可以通知
consumer` 暂停和继续。
- 启动
queue
,有了这些我们就可以通过控制producer/consumer
的数量来达到吞吐量的调优了
func (q *Queue) Start() {
代码语言:txt复制 q.startProducers(q.producerCount)
代码语言:txt复制 q.startConsumers(q.consumerCount)
代码语言:txt复制 q.producerRoutineGroup.Wait()
代码语言:txt复制 close(q.channel)
代码语言:txt复制 q.consumerRoutineGroup.Wait()
代码语言:txt复制}
这里需要注意的是,先要停掉 producer
,再去等 consumer
处理完。
到这里核心控制代码基本就讲完了,其实看起来还是挺简单的,也可以到 [https://github.com/tal-tech/go-
zero/tree/master/core/queue](https://links.jianshu.com/go?to=https://github.com/tal-
tech/go-zero/tree/master/core/queue) 去看完整实现。
如何使用
基本的使用流程:
- 创建
producer
或consumer
- 启动
queue
- 生产消息 / 消费消息
对应到 queue
中,大致如下:
创建 queue
代码语言:txt复制// 生产者创建工厂
代码语言:txt复制producer := newMockedProducer()
代码语言:txt复制// 消费者创建工厂
代码语言:txt复制consumer := newMockedConsumer()
代码语言:txt复制// 将生产者以及消费者的创建工厂函数传递给 NewQueue()
代码语言:txt复制q := queue.NewQueue(func() (Producer, error) {
代码语言:txt复制 return producer, nil
代码语言:txt复制}, func() (Consumer, error) {
代码语言:txt复制 return consumer, nil
代码语言:txt复制})
我们看看 NewQueue
需要什么参数:
producer
工厂方法consumer
工厂方法
将 producer & consumer
的工厂函数传递 queue
,由它去负责创建。框架提供了 Producer
和 Consumer
的接口以及工厂方法定义,然后整个流程的控制 queue
实现会自动完成。
生产 message
我们通过自定义一个 mockedProducer
来模拟:
type mockedProducer struct {
代码语言:txt复制 total int32
代码语言:txt复制 count int32
代码语言:txt复制 // 使用waitgroup来模拟任务的完成
代码语言:txt复制 wait sync.WaitGroup
代码语言:txt复制}
代码语言:txt复制// 实现 Producer interface 的方法:Produce()
代码语言:txt复制func (p *mockedProducer) Produce() (string, bool) {
代码语言:txt复制 if atomic.AddInt32(&p.count, 1) <= p.total {
代码语言:txt复制 p.wait.Done()
代码语言:txt复制 return "item", true
代码语言:txt复制 }
代码语言:txt复制 time.Sleep(time.Second)
代码语言:txt复制 return "", false
代码语言:txt复制}
queue
中的生产者编写都必须实现:
Produce()
:由开发者编写生产消息的逻辑AddListener()
:添加事件listener
消费 message
我们通过自定义一个 mockedConsumer
来模拟:
type mockedConsumer struct {
代码语言:txt复制 count int32
代码语言:txt复制}
代码语言:txt复制func (c *mockedConsumer) Consume(string) error {
代码语言:txt复制 atomic.AddInt32(&c.count, 1)
代码语言:txt复制 return nil
代码语言:txt复制}
启动 queue
启动,然后验证我们上述的生产者和消费者之间的数据是否传输成功:
代码语言:txt复制func main() {
代码语言:txt复制 // 创建 queue
代码语言:txt复制 q := NewQueue(func() (Producer, error) {
代码语言:txt复制 return newMockedProducer(), nil
代码语言:txt复制 }, func() (Consumer, error) {
代码语言:txt复制 return newMockedConsumer(), nil
代码语言:txt复制 })
代码语言:txt复制 // 启动panic了也可以确保stop被执行以清理资源
代码语言:txt复制 defer q.Stop()
代码语言:txt复制 // 启动
代码语言:txt复制 q.Start()
代码语言:txt复制}
以上就是 queue
最简易的实现示例。我们通过这个 core/queue
框架实现了基于 redis
和 kafka
等的消息队列服务,在不同业务场景中经过了充分的实践检验。你也可以根据自己的业务实际情况,实现自己的消息队列服务。
整体设计
整体流程如上图:
- 全体的通信都由
channel
进行 Producer
和Consumer
的数量可以设定以匹配不同业务需求Produce
和Consume
具体实现由开发者定义,queue
负责整体流程