Go之一步步学习RabbitMQ(一)

2023-10-30 15:44:28 浏览数 (2)

写在前面的话:最近笔者在学习RabbitMQ,便尝试着通过下面的学习过程,来尽量还原RabbitMQ为什么如此设计,以及它是如何解决这些问题的。当中如有不对或者理解偏差的地方,还请大家不吝赐教,多多留言。如果你觉得这篇文章真的帮到了你,还请你顺手转发下。


背景知识:

在学习RabbitMQ之前,我们需要对下面的知识有些概念,

生产者(producer):产生并发送消息的程序。

队列(queue):存在RabbitMQ中的邮筒,虽然消息是在应用程序和RabbitMQ中进行传递,但队列才是唯一能够存储消息的地方。队列的大小取决于宿主机器的内存和磁盘容量,它本质上是一个巨大的消息缓存池。多个生产者可以发送消息给同一个队列,多个消费者也可以从同一个队列中读取消息。这个队列有一个特点,先进先出。

消费者(consuming):等待接收消息的程序。

参考知识: 消息队列基础知识,还请参考笔者的另外两篇文章: https://mp.weixin.qq.com/s/uFL6a52FwAAneSJ4GniP5Q https://mp.weixin.qq.com/s/F0DbjgavwH3MUmPlRc9sDg Go 语言中与RabbitMQ交互的客户端包go-amqp: https://mp.weixin.qq.com/s/ALjCxEGrNOBjGV7vn5_SHQ


问题一:RabbitMQ如何解决生产者生产过快,消费者消费过慢的问题?

在看这个问题之前,我们先看下这个问题:网络中,如果一个机器(producer)想把数据发送给另外一台机器(consumer),那么它应该怎么做?

答案是:它们之间需要建立一个连接,如下图所示,这样貌似就解决了生产者与消费者之间传递数据的问题。不过这样以来producer与consumer之间就绑定了,这个连接也要一直存在,要不然它们之间就没有办法通讯。

如果它们都很闲,或者它们的处理速度差不多(备注:生产者生产数据的速度和消费者消费数据的速度相当)的情况下,这都不是问题。

可是,一旦生产者生产数据过快,或者消费者消费数据过慢,这样就会出问题,生产者产生的数据没有办法被及时处理完。这样就会导致这些数据被丢弃掉,或者生产者只能暂时停止继续生产数据,但是生产者又被绑死在这个消费者上面,也没有办法去干别的事情。

要解决上面的问题,我们该怎么办呢?一般有两种办法:

方法一:新增消费者并让生产者与它再建立连接,然后生产者自己决策如何给这么多的消费者分配数据。这样的话会有两个结果:

第一,生产者需要与另外一个消费者再建立一条连接。第二,生产者需要自己添加数据分发策略,这样会导致生产者的逻辑变得复杂了很多。

方法二:将生产者产生的数据放到缓存中(也就是消息队列中),而消费者也从这个缓存中获取数据,如下图所示,这也是RabbitMQ的实现方式。这样的话,会有两个好处:

第一,生产者不需要与消费者绑定,它们只需要与消息队列绑定就好了,生产者和消费者成功完成解耦操作。第二,生产者和消费者的速度,可以不一致,就算生产者很快,消费者很慢也没有问题,只要它们能够保证消息队列不满的话,消费者就可以慢慢处理,生产者可以不停的去生产数据。

下面我们来看一下go-amqp例子,是如何实现的这一步操作:

左边是生产者的核心代码部分,右边是消费者的核心代码部分。

运行的时候,我们需要按照下面的步骤来操作:

首先,启动rabbitmq服务器

$ rabbitmq-server

........

Starting broker... completed with 6 plugins // 表示启动成功

其次,启动消费者

$ ./receive

最后,启动生产者,分别发送三次数据给rabbitmq-server

$ ./send hello 2019/11/03 16:32:45 [x] Sent hello

$ ./send world 2019/11/03 16:32:53 [x] Sent world

$ ./send I love U 2019/11/03 16:33:13 [x] Sent I love U

说明:通过上面的消费者的输出,我们可以看出,生产者每生产一个数据,消费者都会立即取走一个数据进行处理。

问题二:RabbitMQ如何解决多个消费者调度的问题?

当一个消费者怎么都处理不过来的时候,最终还是应该新增消费者来处理,如下图所示。在新增消费者的时候后,RabbitMQ的优势就体现出来了,新增消费者的时候,消费者只是与消息队列建立了新的连接,并且也不会增加生产者的代码复杂度。

不过这样也带来了一个新的问题:消息队列怎么决定,同一时刻哪一个消费者来消费这个消息?

RabbitMQ最简单的方式就是时间轮询策略,也就是保证队列先进先出,本时刻哪一个消费者来消息数据,就给到哪一个消费者。

下面是多个消费者调度的展示例子, 我们启动两个消费者,一个生产者,如下图所示: 消费者一:$ ./receive

消费者二:$ ./receive

生产者:

备注:通过消费者一和二输出的结果来看,对于生产者生产的数据,两个消费者按照时间顺序,依次轮询输出。

问题三:RabbitMQ如何保证消息队列中的数据,确实被消费者已经处理掉了?

在真实的网络中,网络往往不可靠。也就是说有可能会存在消息被消费者拿走之后,因为网络原因导致消息并没有真正发送到消费者。

RabbitMQ采用的是消息确认机制,也就是消费者取走消息之后,在处理完了这个消息,需要要主动发送ACK给消息队列,消息队列在收到这个ACK之后,才可以删除这个消息。例子如下所示:

消息确认机制的代码,只要是在对消费者设置的时候,auto-ack设置成false,也就是需要消费者主动回复Ack。 1. 消费者主动回复ACK的过程,与上面的例子类似,并无特别之处。 2. 消费者不回复ACK消息,会发现生产者发送的消息,一直在rabbitmq-server上面保留着,只要有消费者启动,就会将这些数据再消费一次。

生产者发送的消息内容:

消费者消费了第一和第二次数据:

不过,这也要求消息队列必须在ACK回来的这段时间内保证不删除,可是如果ACK一直不来呢?

这样就会导致这个消息一直放在消息队列中不被处理,进而导致RabbitMQ上面的内存泄漏。

我们先来看下消息丢失的场景,一般有三种:第一种,消费者真的就没有回复;第二种,消费者回复了,但是网络原因给丢弃了;第三种,网络断开或者连接关掉。第一种和第三种最为常见,第二种,其实并不是很常见。

第一种情况,其实是消费者那端的代码问题,需要消费者修复才行。

第二种情况,往往是因为使用的底层通讯库有bug导致的,因为在连接不断开的前提下,只要消息发出去了,TCP协议会保证消息到达对端的。RabbitMQ并没有对这种场景做处理,因为RabbitMQ并不知道,这个消息是消费者丢失了,还是网络丢失了,当然了它也不应该关心这也业务场景。不过在设计的时候,我们到可以让消费者根据自身业务,添加超时处理机制,例如:消费者在长时间得不到RabbitMQ新的消息的时候,可以尝试去重发上一个消息的ACK消息。

第三种情况,RabbitMQ在监测到网络断开或者连接关掉的时候,会主动将这个消息再一次放回到消息队列里面,让后续消费者可以再取一次消息。

问题四:RabbitMQ如何保证消费者处理的公平性?

上面讨论的消息内容都是相同或者相似大小的情况下,一旦者消息的大小不同,在RabbitMQ的轮询策略下,就很有可能导致大任务的消息被分配给同一个消费者,导致这个消费者很忙,而其他的消费者却比较闲。

基于这个问题,RabbitMQ采用公平策略做了处理,大体就是在消费者没有将分配到的消息处理完的时候,不在分配新的消息给他,这样就能够让闲一点的消费者去消息队列继续拿新的消息,而忙的消费者一心一意的处理拿到的这个大任务消息。例子如下所示:

代码主要涉及在消费者里,新增的ch.Qos中prefetch count的设定,我们这里设定的数值是1,也就是当消费者拿走数据之后,一直没有回复ACK给rabbitmq-server,那么rabbitmq-server就一直不在给这个消费者分配新的消息。

消费者一,拿到了一个处理时间比较久的数据,所以一直在处理这个消息。

消费者二,拿到了比较短的数据,所以可以很快的处理完,便可以很快的分配到别的数据。

问题五:一旦RabbitMQ挂掉了,该怎么办呢?

基于这个问题,RabbitMQ也做了处理,叫做消息持久化,在RabbitMQ挂掉之前的那些消息队列中的消息,它都会存到硬盘里面,等到RabbitMQ重启之后,会将这些数据重新恢复出来。

当然,对于生产者已经发送,却没有收到确认的消息,需要生产者单独做异常处理。

这一部分操作代码里面主要是一个消息持久化flag的设定,生产者和消费者里面都需要设置,对于效果的展示这里就不做介绍了。

总结:

本文只是对rabbitmq的基本使用,碰到的问题以及解决方法做了详解和举例说明,希望对你有所帮助。对于rabbitmq的路由部分,是另外一类内容,笔者会在后面一篇给出

0 人点赞