RabbitMQ 模型和死信队列

2021-03-22 12:29:48 浏览数 (1)

RabbitMQ 模型

RabbitMQ 是一个生产者/消费者模型,生产者生产消息到队列中,而消费者从队列中拿消息进行消费,两者并不直接交互。

我们首先来看看 RabbitMQ 的模型结构

在图中,我们可以看到,整个结构包括:生产者 Producer、交换机 Exchange、队列 Queue,以及消费者 Consumer。

其中,生产者和消费者与 MQ 连接时会创建 TCP 连接和信道,生产者生产消息,根据其指定的 RoutingKey 已经交换机连接 Queue 的 BindingKey,两者共同决定将消息发送到哪个队列中。

下面我们逐一分析各个部分的功能。

Channel

每个生产者或消费者都需要与 RabbitMQ Broker 建立 TCP 连接,即 Connection。Connection 建立起来之后,客户端会创建一个 AMQP 信道,即 Channel,这是基于 Connection 的虚拟连接,多条信道复用一条 TCP 连接,不仅减少性能开销,同时也便于管理。

用 Python 的 pika 包实现 TCP 连接和信道创建:

代码语言:javascript复制
class MqClient:
    def __init__(self, *, mq_host, mq_port, username, password):
        credentials = pika.PlainCredentials(username, password)
        conn_params = pika.ConnectionParameters(
            host=mq_host,
            port=mq_port,
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()

Exchange

生产者通常将消息发送给交换机,而交换机再将消息路由到队列中,若路由不到,要么返回队列要么丢弃。

用 Python 实现交换机:

代码语言:javascript复制
def create_exchange(self, change_name):
        self.channel.exchange_declare(
            exchange=change_name,
            exchange_type='topic',
            passive=False,
            durable=True,
            auto_delete=False
        )

交换机通常分四种类型:

  1. fanout:将所有发送到该交换机的消息路由到所有与该交换机绑定的队列中;
  2. direct:将消息路由到 BindingKey 与 RoutingKey 完全一致的队列中;
  3. topic:将消息路由到 BindingKey 与 RoutingKey 匹配的队列中,匹配的规则包括以 '.' 为分割、以 '*' 和 '#' 做模糊匹配;
  4. headers:该类型的交换机根据消息内容中国的 headers 属性进行匹配。

Queue

队列是 RabbitMQ 中用以存储消息的对象。多个消费者可以订阅同一个队列,而队列中的消息会被均摊到各个消费者,而不是每个消费者都收到所有的消息。

代码语言:javascript复制
def create_queue(self, exchange_name, queue_name, routing_key):
        self.channel.queue_declare(
            queue=queue_name,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

RoutingKey & BindingKey

RoutingKey 即路由键,通常是生产者发送消息时指定的。BindingKey 即绑定键,通常用于交换机与队列绑定。

二者通常配合起来使用,比如 direct 和 topic 类型的交换机在路由消息时,都是看这两个键是否匹配。某种情况下,RoutingKey 和 BindingKey 可以看做同一个东西。

代码语言:javascript复制
def queue_bind(self, exchange_name, queue_name, routing_key):
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key,
        )

Publish/Subscribe 机制

RabbitMQ 消息的消费模式通常分为推模式和拉模式。

推模式采用的是订阅的方式,使用的是 basic_consume 方法 ;而拉模式采用的是从队列中获取消息的方式,使用的是 basic_get 方法。拉模式通常运用于获取单挑消息的场合,对于持续获取消息或者需要实现高吞吐量的场合,推模式更适合。

下面是一个推模式的例子:

代码语言:javascript复制
def msg_consumer(channel, method_frame, header_frame, body):
    try:
        print("[Consumer] Receive message:")
        print("           {}: {}".format(method_frame.routing_key, body))
        time.sleep(1)
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    except:
        print("[Consumer] Reject message and return it to queue!")
        channel.basic_nack(delivery_tag=method_frame.delivery_tag,
                           multiple=False, requeue=True)
    return


def msg_publisher(channel, *, exchange, routing_key):
    # Send a message
    data = "hahahahhahahahaha! I'm a bug and you can't catch me!"
    if channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=data,
            properties=pika.BasicProperties(
                content_type='text/plain',
                delivery_mode=1),
            mandatory=True):
        print('[Producer] Message was published')
    else:
        print('[Producer] Message was returned')

if __name__ == "__main__":
    client = MqClient(
        mq_host="172.16.110.17",
        mq_port=5672,
        username="guest",
        password="guest",
    )
    channel = client.get_channel()

    # 设置生产者
    msg_publisher(channel,
                  exchange=EXCHANGE_NAME,
                  routing_key="hdls.miao.message")
    # 设置消费者
    channel.basic_consume(
        msg_consumer,
        queue=QUEUE_NAME,
        no_ack=False,
        consumer_tag="hdls-consumer"
    )
    # 开始消费
    channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    client.connection.close()

消费者收到消息后进行收到 ack,得到的结果如下:

代码语言:javascript复制
[Producer] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"

若对于该条消息,消费者不消费,而是拒绝: basic_nack,而拒绝的同时将参数设置为 requeue=True,即将消息打回队列,则得到的结果如下:

代码语言:javascript复制
[Producer] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
...

此时该条消息就会一直被打回队列,就一直堵在队列中:

死信

当一个消息被拒绝而被打回队列,而此后该消息没有消费者接收,成了死信,就会堵住队列,当队列中死信越来越多时,队列的性能会受到影响。对于死信的处理,设置死信队列是个很好的选择。

死信通常有下面几种情况:

  1. 消息被拒绝(通过 basic.reject 方法或 basic.nack 方法),同时被打回队列;
  2. 消息本身设置了 TTL 或队列设置了 TTL,且达到了过期时间;
  3. 队列可持有消息数量达到了上限。

死信交换机

当消息在一个队列中成为死信时,就能够被发送到另一个交换机中,也就是死信交换机。死信交换机其实就是普通的交换机,不过绑定的是死信队列,其声明和使用与普通交换机一致。

死信队列

死信队列就是用来接收死信的队列,但其本质与普通队列一样。只不过在设置普通队列的时候需要给其定义死信交换机是哪个,当消息成为死信时,以什么样的 routing_key 来路由到死信队列里去。这样所有的死信就可以被路由到对应的死信队列中去了。

需要注意的是,在声明普通队列的死信设置之前,死信交换机和死信队列需要先存在。

根据定义将上面的普通队列做修改:

代码语言:javascript复制
def create_queue(self, exchange_name, queue_name, routing_key,
                     is_dead=False):
        arguments = {}
        if not is_dead:
            arguments = {
                "x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
                "x-dead-letter-routing-key": DEAD_ROUTING_KEY,
            }
        self.channel.queue_declare(
            queue=queue_name,
            arguments=arguments,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

在声明队列时,需要声明两个参数即可: x-dead-letter-exchangex-dead-letter-routing-key。同时在声明普通队列之前声明死信队列:

代码语言:javascript复制
def connect(self):
        self.create_exchange(DEAD_EXCHANGE_NAME)
        self.create_queue(DEAD_EXCHANGE_NAME,
                          DEAD_QUEUE_NAME, DEAD_ROUTING_KEY, is_dead=True)

        self.create_exchange(EXCHANGE_NAME)
        self.create_queue(EXCHANGE_NAME, QUEUE_NAME, ROUTING_KEY)

如果同时加上死信队列的消费者,就可以统一处理死信了:

代码语言:javascript复制
def dead_msg_consumer(channel, method_frame, header_frame, body):
    try:
        print("[DEAD CSM] Dead Message! It's time to put it to dead queue.")
        print("           {}: {}".format(method_frame.routing_key, body))
        print("           ACK dead message!")
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    except:
        channel.basic_nack(delivery_tag=method_frame.delivery_tag,
                           multiple=False, requeue=False)
    return

万事俱备!但运行后居然报错:

代码语言:javascript复制
pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'hdls.mq.queue' in vhost '/': received the value 'dead.exchange' of type 'longstr' but current is none")

这是因为之前我们已经声明过不加死信设置的队列了,声明 queue 时试图设定一个 x-dead-letter-exchange 参数,当前服务器上该 queue 的该参数为 none,服务器不允许所以报错。

此时有两种解决方法:一是在服务器上将之前的 queue 删除,加上死信参数,再次声明队列;二是通过 policy 来设置这个参数。

policy 可以用 rabbitmqctl set_policy设置,也可以在 RabbitMQ 的前端页面进行:

需要注意的是,通过 policy 方法设置时参数为 dead-letter-exchange dead-letter-routing-key,而用第一种方法中的死信参数需要加上 x- 前缀。

将死信设置加上后,再次启用 consumer:

代码语言:javascript复制
[Publisher] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[DEAD CSM] Dead Message! It's time to put it to dead queue.
           dead.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
           ACK dead message!

可以看到,消息在打回队列后就被路由到了死信队列。

延迟队列

所谓延迟队列,指的是消息发送后,并不想立即被消费者拿到,希望在指定时间后,消费者才拿到消息。

延迟队列可以用死信队列来实现。利用队列或消息的 TTL 特性,可以做到消息在指定时间内超时后被路由到死信队列,而此时死信队列就可以当做延迟队列来做消息处理。

代码语言:javascript复制
def create_queue(self, exchange_name, queue_name, routing_key,
                     is_dead=False):
        arguments = {}
        if not is_dead:
            arguments = {
                "x-message-ttl": 3000,
                "x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
                "x-dead-letter-routing-key": DEAD_ROUTING_KEY,
            }
        self.channel.queue_declare(
            queue=queue_name,
            arguments=arguments,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

在普通队列的死信设置里加上一条 x-message-ttl 就可以设置消息的 TTL。

代码语言:javascript复制
[Publisher] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[DELAY CSM] Delay queue receive the message!
            dead.message: b"I'm not bug, but you can only receive me in 3 seconds."
            ACK delay message!

0 人点赞