RabbitMQ 模型
RabbitMQ 是一个生产者/消费者模型,生产者生产消息到队列中,而消费者从队列中拿消息进行消费,两者并不直接交互。
我们首先来看看 RabbitMQ 的模型结构
在图中,我们可以看到,整个结构包括:生产者 Producer、交换机 Exchange、队列 Queue,以及消费者 Consumer。
其中,生产者和消费者与 MQ 连接时会创建 TCP 连接和信道,生产者生产消息,根据其指定的 RoutingKey 已经交换机连接 Queue 的 BindingKey,两者共同决定将消息发送到哪个队列中。
下面我们逐一分析各个部分的功能。
Channel
每个生产者或消费者都需要与 RabbitMQ Broker 建立 TCP 连接,即 Connection。Connection 建立起来之后,客户端会创建一个 AMQP 信道,即 Channel,这是基于 Connection 的虚拟连接,多条信道复用一条 TCP 连接,不仅减少性能开销,同时也便于管理。
用 Python 的 pika 包实现 TCP 连接和信道创建:
代码语言:javascript复制class MqClient:
def __init__(self, *, mq_host, mq_port, username, password):
credentials = pika.PlainCredentials(username, password)
conn_params = pika.ConnectionParameters(
host=mq_host,
port=mq_port,
credentials=credentials
)
self.connection = pika.BlockingConnection(conn_params)
self.channel = self.connection.channel()
Exchange
生产者通常将消息发送给交换机,而交换机再将消息路由到队列中,若路由不到,要么返回队列要么丢弃。
用 Python 实现交换机:
代码语言:javascript复制def create_exchange(self, change_name):
self.channel.exchange_declare(
exchange=change_name,
exchange_type='topic',
passive=False,
durable=True,
auto_delete=False
)
交换机通常分四种类型:
- fanout:将所有发送到该交换机的消息路由到所有与该交换机绑定的队列中;
- direct:将消息路由到 BindingKey 与 RoutingKey 完全一致的队列中;
- topic:将消息路由到 BindingKey 与 RoutingKey 匹配的队列中,匹配的规则包括以 '.' 为分割、以 '*' 和 '#' 做模糊匹配;
- headers:该类型的交换机根据消息内容中国的 headers 属性进行匹配。
Queue
队列是 RabbitMQ 中用以存储消息的对象。多个消费者可以订阅同一个队列,而队列中的消息会被均摊到各个消费者,而不是每个消费者都收到所有的消息。
代码语言:javascript复制def create_queue(self, exchange_name, queue_name, routing_key):
self.channel.queue_declare(
queue=queue_name,
)
self.queue_bind(exchange_name, queue_name, routing_key)
RoutingKey & BindingKey
RoutingKey 即路由键,通常是生产者发送消息时指定的。BindingKey 即绑定键,通常用于交换机与队列绑定。
二者通常配合起来使用,比如 direct 和 topic 类型的交换机在路由消息时,都是看这两个键是否匹配。某种情况下,RoutingKey 和 BindingKey 可以看做同一个东西。
代码语言:javascript复制def queue_bind(self, exchange_name, queue_name, routing_key):
self.channel.queue_bind(
queue=queue_name,
exchange=exchange_name,
routing_key=routing_key,
)
Publish/Subscribe 机制
RabbitMQ 消息的消费模式通常分为推模式和拉模式。
推模式采用的是订阅的方式,使用的是 basic_consume
方法 ;而拉模式采用的是从队列中获取消息的方式,使用的是 basic_get
方法。拉模式通常运用于获取单挑消息的场合,对于持续获取消息或者需要实现高吞吐量的场合,推模式更适合。
下面是一个推模式的例子:
代码语言:javascript复制def msg_consumer(channel, method_frame, header_frame, body):
try:
print("[Consumer] Receive message:")
print(" {}: {}".format(method_frame.routing_key, body))
time.sleep(1)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
except:
print("[Consumer] Reject message and return it to queue!")
channel.basic_nack(delivery_tag=method_frame.delivery_tag,
multiple=False, requeue=True)
return
def msg_publisher(channel, *, exchange, routing_key):
# Send a message
data = "hahahahhahahahaha! I'm a bug and you can't catch me!"
if channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=data,
properties=pika.BasicProperties(
content_type='text/plain',
delivery_mode=1),
mandatory=True):
print('[Producer] Message was published')
else:
print('[Producer] Message was returned')
if __name__ == "__main__":
client = MqClient(
mq_host="172.16.110.17",
mq_port=5672,
username="guest",
password="guest",
)
channel = client.get_channel()
# 设置生产者
msg_publisher(channel,
exchange=EXCHANGE_NAME,
routing_key="hdls.miao.message")
# 设置消费者
channel.basic_consume(
msg_consumer,
queue=QUEUE_NAME,
no_ack=False,
consumer_tag="hdls-consumer"
)
# 开始消费
channel.start_consuming()
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
client.connection.close()
消费者收到消息后进行收到 ack,得到的结果如下:
代码语言:javascript复制[Producer] Message was published
[Consumer] Receive message:
hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
若对于该条消息,消费者不消费,而是拒绝: basic_nack
,而拒绝的同时将参数设置为 requeue=True
,即将消息打回队列,则得到的结果如下:
[Producer] Message was published
[Consumer] Receive message:
hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
...
此时该条消息就会一直被打回队列,就一直堵在队列中:
死信
当一个消息被拒绝而被打回队列,而此后该消息没有消费者接收,成了死信,就会堵住队列,当队列中死信越来越多时,队列的性能会受到影响。对于死信的处理,设置死信队列是个很好的选择。
死信通常有下面几种情况:
- 消息被拒绝(通过 basic.reject 方法或 basic.nack 方法),同时被打回队列;
- 消息本身设置了 TTL 或队列设置了 TTL,且达到了过期时间;
- 队列可持有消息数量达到了上限。
死信交换机
当消息在一个队列中成为死信时,就能够被发送到另一个交换机中,也就是死信交换机。死信交换机其实就是普通的交换机,不过绑定的是死信队列,其声明和使用与普通交换机一致。
死信队列
死信队列就是用来接收死信的队列,但其本质与普通队列一样。只不过在设置普通队列的时候需要给其定义死信交换机是哪个,当消息成为死信时,以什么样的 routing_key 来路由到死信队列里去。这样所有的死信就可以被路由到对应的死信队列中去了。
需要注意的是,在声明普通队列的死信设置之前,死信交换机和死信队列需要先存在。
根据定义将上面的普通队列做修改:
代码语言:javascript复制def create_queue(self, exchange_name, queue_name, routing_key,
is_dead=False):
arguments = {}
if not is_dead:
arguments = {
"x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
"x-dead-letter-routing-key": DEAD_ROUTING_KEY,
}
self.channel.queue_declare(
queue=queue_name,
arguments=arguments,
)
self.queue_bind(exchange_name, queue_name, routing_key)
在声明队列时,需要声明两个参数即可: x-dead-letter-exchange
, x-dead-letter-routing-key
。同时在声明普通队列之前声明死信队列:
def connect(self):
self.create_exchange(DEAD_EXCHANGE_NAME)
self.create_queue(DEAD_EXCHANGE_NAME,
DEAD_QUEUE_NAME, DEAD_ROUTING_KEY, is_dead=True)
self.create_exchange(EXCHANGE_NAME)
self.create_queue(EXCHANGE_NAME, QUEUE_NAME, ROUTING_KEY)
如果同时加上死信队列的消费者,就可以统一处理死信了:
代码语言:javascript复制def dead_msg_consumer(channel, method_frame, header_frame, body):
try:
print("[DEAD CSM] Dead Message! It's time to put it to dead queue.")
print(" {}: {}".format(method_frame.routing_key, body))
print(" ACK dead message!")
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
except:
channel.basic_nack(delivery_tag=method_frame.delivery_tag,
multiple=False, requeue=False)
return
万事俱备!但运行后居然报错:
代码语言:javascript复制pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'hdls.mq.queue' in vhost '/': received the value 'dead.exchange' of type 'longstr' but current is none")
这是因为之前我们已经声明过不加死信设置的队列了,声明 queue 时试图设定一个 x-dead-letter-exchange 参数,当前服务器上该 queue 的该参数为 none,服务器不允许所以报错。
此时有两种解决方法:一是在服务器上将之前的 queue 删除,加上死信参数,再次声明队列;二是通过 policy 来设置这个参数。
policy 可以用 rabbitmqctl set_policy
设置,也可以在 RabbitMQ 的前端页面进行:
需要注意的是,通过 policy 方法设置时参数为 dead-letter-exchange
dead-letter-routing-key
,而用第一种方法中的死信参数需要加上 x-
前缀。
将死信设置加上后,再次启用 consumer:
代码语言:javascript复制[Publisher] Message was published
[Consumer] Receive message:
hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[DEAD CSM] Dead Message! It's time to put it to dead queue.
dead.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
ACK dead message!
可以看到,消息在打回队列后就被路由到了死信队列。
延迟队列
所谓延迟队列,指的是消息发送后,并不想立即被消费者拿到,希望在指定时间后,消费者才拿到消息。
延迟队列可以用死信队列来实现。利用队列或消息的 TTL 特性,可以做到消息在指定时间内超时后被路由到死信队列,而此时死信队列就可以当做延迟队列来做消息处理。
代码语言:javascript复制def create_queue(self, exchange_name, queue_name, routing_key,
is_dead=False):
arguments = {}
if not is_dead:
arguments = {
"x-message-ttl": 3000,
"x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
"x-dead-letter-routing-key": DEAD_ROUTING_KEY,
}
self.channel.queue_declare(
queue=queue_name,
arguments=arguments,
)
self.queue_bind(exchange_name, queue_name, routing_key)
在普通队列的死信设置里加上一条 x-message-ttl
就可以设置消息的 TTL。
[Publisher] Message was published
[Consumer] Receive message:
hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[DELAY CSM] Delay queue receive the message!
dead.message: b"I'm not bug, but you can only receive me in 3 seconds."
ACK delay message!