RabbitMQ 入门系列(二)

2020-07-17 12:16:31 浏览数 (1)

上篇文章 RabbitMQ 入门系列(一) 讲述了 RabbitMQ 有关的基本概念。本文将会给出具体的示例继续讲解,这些示例均来源于官方文档,但其使用的是传统的回掉函数的写法,我将其改写成了 async/await 的形式,同时对内容做了部分微调。

01

流程

我们先来了解一下 RabbitMQ 的一般使用流程。

1、建立到 RabbitMQ 的连接。

2、创建信道。

3、声明交换器。

4、声明队列。

5、绑定交换器和队列。

6、消息操作。生产者:生成并发布消息;消费者:订阅并消费消息。

7、关闭信道。

8、关闭连接。

不论是生产者投递消息,还是消费者接受消息一般都遵循以上步骤,但针对具体的情况仍会有调整,比如声明交换器、声明队列、绑定交换器和队列,我们只需要在生产者或消费者其中之一进行,甚至隔离出来独立维护,只要保证在发布或消费消息之前交换器、队列、绑定等是有效的即可。

02

Hello World

第一个示例,实现基本的投递和接收消息。

生产者投递消息(send.js):

消费者接收消息(receive.js):

对比上述流程,你会发现为什么没有交换器 Exchange 存在的身影呢?这是因为 RabbitMQ 存在一个默认交换器,类型为 direct (直连),每个新建的队列会自动绑定到默认交换器上,并且以队列的名称作为绑定路由规则。

声明队列时,同一个队列其属性前后相同时,重复声明不会有任何影响,反之其属性前后不相同时,重复声明会抛出一个错误,这种情况要注意不得重复声明,当然如果这个队列被声明有效了也不需要再次声明。

从上图中我们也了解到了队列的一个属性 durable,这个属性表明是否对队列进行持久化,也就是保存到磁盘上,一旦 RabbitMQ 服务器重启,持久化的队列可以被重新恢复。

消费者 consume 订阅接收消息时使用了另一个属性 noAck,这个属性表明消费者在接收到消息后是否需要向 RabbitMQ 服务器确认收到该消息。与之相对的是发后即忘模式,也就是 RabbitMQ 服务器向消费者发送完消息后即认为成功,无需等待消费者确认接收应答,这种模式吞吐量更高,但可靠性显然不如确认应答模式,而确认应答模式,我们需要注意的是, RabbitMQ 服务器若没有接收到 ack 确认会一直将该消息保存,如果消费者挂了就会造成消息持续堆叠不断占用内存的情况,极端情况下资源过载会造成 RabbitMQ 服务器重启,同时未被 ack 确认的消息会被尝试重新发送给消费者。

03

Work queues

第二个示例,向多个消费者分发投递消息。

生产者投递消息(new_task.js):

消费者接收消息(worker.js):

我们在 shell 中运行多个 worker.js 会发现消息被一个一个分发到了不同的 worker 消费者,且同一条消息不会被重复发送给多个 worker 。

在这个示例中,我们对队列进行了持久化,并且在消费端使用了 ack 确认接收消息。发送消息时,我们使用了 persistent 属性,这个属性表明是否将消息持久化。另外,对消费者而言,还使用了 ch.prefetch() 方法,这个方法表明该消费者每次最多接收的消息数量,这样做是因为某些情况下消费消息是一个很耗时的业务操作,某些 worker 可能处于繁忙状态,而另外一些 worker 则很空闲,通过 prefetch 和 ack 其实是实现了类似于负载均衡的功能,也就是将消息分发给空闲的 worker 消费。

04

结语

本文给出的两个示例,实现了基本的消息投递与接收功能,并对某些属性方法进行了简单的描述,读着可以将文中的示例与官方教程对比查看并加深理解。

下一篇文章将会通过三个具体的示例重点介绍交换器的不同类型。

0 人点赞