1. 目标模型#
- 多个生产者对应一个消费者,即 N:1
- 消费者处理生产者发送过来的消息时是并行处理的,但是有速率限制,最大为5qps
- 消费者处理完了某个生产者的消息后,通知对应的生产者
- 当某个生产者发送的所有消息都收到处理完成的消息后,执行后续逻辑
2. 实现分析#
根据上面的条件,可知:
- 需要开启goroutine来实现并发处理
- 使用带缓存的channel控制并发量
- 使用for-select结构实现挂起等待
- 使用单独的channl实现通知机制
- 使用sync.WaitGroup保证goroutine执行完成
3. 代码实现#
3.1 初始变量#
代码语言:javascript复制const rateLimit = 5 // 每秒5个
type msgChan struct { // 用于通信的结构体
Id int64
Text string
DoneChan chan int64
}
var ch chan msgChan
func InitChan() {
ch = make(chan msgChan, 10)
}
func SendToChan(msg msgChan) {
ch <- msg
}
func GetFromChan() chan msgChan {
return ch
}
3.2 生产者#
代码语言:javascript复制func producer(i int64) *msgChan {
var msg msgChan
msg.Id = i
msg.Text = fmt.Sprintf("消息文本%v", i)
msg.DoneChan = make(chan int64)
SendToChan(msg) // 发送到channel
fmt.Println("producer: ", i)
return &msg
}
3.3 消费者#
代码语言:javascript复制func consumer() {
for {
select {
case msg, ok := <-GetFromChan():
if ok {
fmt.Printf("consumer %v processing ..., time: %vn",
msg.Id, time.Now().Format("2006-01-02 15:04:05"))
// 因为 rateLimit = 5,所以模拟每个go大概需运行多长时间
// 也可以在外部采用time.tick的方式
time.Sleep((1000 / rateLimit) * time.Millisecond)
msg.DoneChan <- msg.Id // 通知生产者已经消费完了
}
}
}
}
3.4 主函数#
代码语言:javascript复制func main() {
InitChan()
// todo 消费者
go consumer()
// todo 生产者
var wg sync.WaitGroup
wg.Add(20) // 确保每个go都能运行完
for i := 0; i < 20; i {
msg := producer(int64(i))
// todo 等待消费者消费完的通知(哪个先消费完就接收哪个)
go func(m *msgChan, w *sync.WaitGroup) {
defer w.Done()
if v, ok := <-m.DoneChan; ok {
fmt.Println("receive done: ", v)
close(m.DoneChan) // 通信完后关闭channel
}
}(msg, &wg)
}
wg.Wait()
close(ch) // 结束的时候关闭channel
fmt.Println("后续操作...")
}
3.5 运行结果#
根据时间间隔可知,实现了一秒钟最多运行5个go协程
代码语言:javascript复制macBook-Pro-8 go_learning % go build test12.go
macBook-Pro-8 go_learning % ./test12
producer: 0
producer: 1
producer: 2
producer: 3
producer: 4
producer: 5
producer: 6
producer: 7
producer: 8
producer: 9
producer: 10
consumer 0 processing ..., time: 2023-06-12 22:45:31
receive done: 0
consumer 1 processing ..., time: 2023-06-12 22:45:31
producer: 11
consumer 2 processing ..., time: 2023-06-12 22:45:31
producer: 12
receive done: 1
consumer 3 processing ..., time: 2023-06-12 22:45:32
producer: 13
receive done: 2
consumer 4 processing ..., time: 2023-06-12 22:45:32
producer: 14
receive done: 3
consumer 5 processing ..., time: 2023-06-12 22:45:32
receive done: 4
producer: 15
consumer 6 processing ..., time: 2023-06-12 22:45:32
receive done: 5
producer: 16
consumer 7 processing ..., time: 2023-06-12 22:45:32
receive done: 6
producer: 17
consumer 8 processing ..., time: 2023-06-12 22:45:33
producer: 18
receive done: 7
consumer 9 processing ..., time: 2023-06-12 22:45:33
receive done: 8
producer: 19
consumer 10 processing ..., time: 2023-06-12 22:45:33
receive done: 9
consumer 11 processing ..., time: 2023-06-12 22:45:33
receive done: 10
consumer 12 processing ..., time: 2023-06-12 22:45:33
receive done: 11
consumer 13 processing ..., time: 2023-06-12 22:45:34
receive done: 12
consumer 14 processing ..., time: 2023-06-12 22:45:34
receive done: 13
consumer 15 processing ..., time: 2023-06-12 22:45:34
receive done: 14
consumer 16 processing ..., time: 2023-06-12 22:45:34
receive done: 15
consumer 17 processing ..., time: 2023-06-12 22:45:34
receive done: 16
consumer 18 processing ..., time: 2023-06-12 22:45:35
receive done: 17
consumer 19 processing ..., time: 2023-06-12 22:45:35
receive done: 18
receive done: 19
后续操作...