Golang实现生产者-消费者的(N:1)模型

2023-07-11 14:43:09 浏览数 (2)

1. 目标模型#

  • 多个生产者对应一个消费者,即 N:1
  • 消费者处理生产者发送过来的消息时是并行处理的,但是有速率限制,最大为5qps
  • 消费者处理完了某个生产者的消息后,通知对应的生产者
  • 当某个生产者发送的所有消息都收到处理完成的消息后,执行后续逻辑

2. 实现分析#

根据上面的条件,可知:

  • 需要开启goroutine来实现并发处理
  • 使用带缓存的channel控制并发量
  • 使用for-select结构实现挂起等待
  • 使用单独的channl实现通知机制
  • 使用sync.WaitGroup保证goroutine执行完成

3. 代码实现#

3.1 初始变量#
代码语言:javascript复制
const rateLimit = 5 // 每秒5个

type msgChan struct { //  用于通信的结构体
	Id       int64
	Text     string
	DoneChan chan int64
}

var ch chan msgChan

func InitChan() {
	ch = make(chan msgChan, 10)
}

func SendToChan(msg msgChan) {
	ch <- msg
}

func GetFromChan() chan msgChan {
	return ch
}
3.2 生产者#
代码语言:javascript复制
func producer(i int64) *msgChan {
	var msg msgChan
	msg.Id = i
	msg.Text = fmt.Sprintf("消息文本%v", i)
	msg.DoneChan = make(chan int64)
	SendToChan(msg) // 发送到channel
	fmt.Println("producer: ", i)
	return &msg
}
3.3 消费者#
代码语言:javascript复制
func consumer() {
	for {
		select {
		case msg, ok := <-GetFromChan():
			if ok {
				fmt.Printf("consumer %v processing ..., time: %vn",
					msg.Id, time.Now().Format("2006-01-02 15:04:05"))

                // 因为 rateLimit = 5,所以模拟每个go大概需运行多长时间
                // 也可以在外部采用time.tick的方式
				time.Sleep((1000 / rateLimit) * time.Millisecond)

				msg.DoneChan <- msg.Id // 通知生产者已经消费完了
			}
		}
	}
}
3.4 主函数#
代码语言:javascript复制
func main() {
	InitChan()
	
	// todo 消费者
	go consumer()
    
	// todo 生产者
	var wg sync.WaitGroup
	wg.Add(20) // 确保每个go都能运行完
	for i := 0; i < 20; i   {
		msg := producer(int64(i))
		// todo 等待消费者消费完的通知(哪个先消费完就接收哪个)
		go func(m *msgChan, w *sync.WaitGroup) {
			defer w.Done()
			if v, ok := <-m.DoneChan; ok {
				fmt.Println("receive done: ", v)
				close(m.DoneChan) // 通信完后关闭channel
			}
		}(msg, &wg)
	}
	wg.Wait()
	close(ch)	// 结束的时候关闭channel
	fmt.Println("后续操作...")
}
3.5 运行结果#

根据时间间隔可知,实现了一秒钟最多运行5个go协程

代码语言:javascript复制
macBook-Pro-8 go_learning % go build test12.go
macBook-Pro-8 go_learning % ./test12      
producer:  0
producer:  1
producer:  2
producer:  3
producer:  4
producer:  5
producer:  6
producer:  7
producer:  8
producer:  9
producer:  10
consumer 0 processing ..., time: 2023-06-12 22:45:31
receive done:  0
consumer 1 processing ..., time: 2023-06-12 22:45:31
producer:  11
consumer 2 processing ..., time: 2023-06-12 22:45:31
producer:  12
receive done:  1
consumer 3 processing ..., time: 2023-06-12 22:45:32
producer:  13
receive done:  2
consumer 4 processing ..., time: 2023-06-12 22:45:32
producer:  14
receive done:  3
consumer 5 processing ..., time: 2023-06-12 22:45:32
receive done:  4
producer:  15
consumer 6 processing ..., time: 2023-06-12 22:45:32
receive done:  5
producer:  16
consumer 7 processing ..., time: 2023-06-12 22:45:32
receive done:  6
producer:  17
consumer 8 processing ..., time: 2023-06-12 22:45:33
producer:  18
receive done:  7
consumer 9 processing ..., time: 2023-06-12 22:45:33
receive done:  8
producer:  19
consumer 10 processing ..., time: 2023-06-12 22:45:33
receive done:  9
consumer 11 processing ..., time: 2023-06-12 22:45:33
receive done:  10
consumer 12 processing ..., time: 2023-06-12 22:45:33
receive done:  11
consumer 13 processing ..., time: 2023-06-12 22:45:34
receive done:  12
consumer 14 processing ..., time: 2023-06-12 22:45:34
receive done:  13
consumer 15 processing ..., time: 2023-06-12 22:45:34
receive done:  14
consumer 16 processing ..., time: 2023-06-12 22:45:34
receive done:  15
consumer 17 processing ..., time: 2023-06-12 22:45:34
receive done:  16
consumer 18 processing ..., time: 2023-06-12 22:45:35
receive done:  17
consumer 19 processing ..., time: 2023-06-12 22:45:35
receive done:  18
receive done:  19
后续操作...

0 人点赞