MQ教程 | 基于RabbitMQ消息延时队列

2020-04-01 16:48:55 浏览数 (1)

▍延迟任务应用场景

  • 物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。
  • 订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。
  • 过1分钟给新注册会员的用户,发送注册邮件等。

▍RabbitMQ延迟队列实现的方式有两种

  • 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能。
  • 使用rabbitmq-delayed-message-exchange 插件实现延迟功能(注意:延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的)

▍死信机制实现延迟队列

RabbitMQ没有直接去实现延迟队列这个功能。而是需要通过消息的TTL(Time To Live))和死信交换机(Dead Letter Exchanges)这两者的组合来实现。

▍消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。

超过了这个时间,认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的 expiration 字段或者队列 x-message-ttl 属性来设置时间,两者是一样的效果。

下面例子是通过设置消息的 expiration 字段实现死信,针对每条消息设置 TTL 是在发送消息的时候设置 expiration 参数,单位为毫秒

代码语言:javascript复制
$body = 'Tinywan expiration!';$msg = new AMQPMessage($body);$msg->set("delivery_mode", AMQPMessage::DELIVERY_MODE_PERSISTENT); // 设置超时时间$msg->set("expiration", 30000); // ms 1000ms = 1s

上面的代码在向队列发送消息的时候,通过传递 { expiration: '30000'} 将这条消息的过期时间设为了 30秒,对消息设置 30秒 钟过期,这条消息并不一定就会在30秒钟后被丢弃或进入死信,只有当这条消息到达队首即将被消费时才会判断其是否过期,若未过期就会被消费者消费,若已过期就会被删除或者成为死信。

▍死信交换器(Dead Letter Exchanges)

RabbitMQ中有一种交换器叫 死信交换器,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 死信交换器,绑定在 死信交换器 上的队列就称之为 死信队列

队列中的消息在以下三种情况下会变成死信:

  • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
  • 上面的消息的TTL到了,消息过期了。
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

死信交换器可以在程序中设置,也可以使用rabbitmqctl工具进行设置,关于死信交换器的介绍请参考RabbitMQ官网 https://www.rabbitmq.com/dlx.html

▍死信队列设置

1. 首先需要设置死信队列的exchange和queue,然后进行绑定:

代码语言:javascript复制
Exchange: dlx.exchangeQueue: dlx.queueRoutingKey: ##表示只要有消息到达了Exchange,那么都会路由到这个queue上

2. 然后需要有一个监听,去监听这个队列进行处理

3. 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可

代码语言:javascript复制
arguments.put(" x-dead-letter-exchange","dlx.exchange");

这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!

▍定时任务

因为队列中的消息过期后会成为死信,而死信又会被发布到该消息所在的队列的 DLX 上去,所以通过为消息设置过期时间,然后再消费该消息所在队列的 DLX 所绑定的队列,从而来达到定时处理一个任务的目的。

简单的讲就是当有一个队列 queue1,其 DLX 为 deadEx1,deadEx1 绑定了一个队列 deadQueue1,当队列 queue1 中有一条消息因过期成为死信时,就会被发布到 deadEx1 中去,通过消费队列 deadQueue1 中的消息,也就相当于消费的是 queue1 中的因过期产生的死信消息。

▍参考案例

发送消息

代码语言:javascript复制
public static function delayQueueSend($param = []){    $connection = RabbitMqConnection::getConnection();    $channel = $connection->channel();//定义等待exchange// Fanout:该类型路由规则非常简单,会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,相当于广播功能。    $channel->exchange_declare('waitSendExchange', 'fanout', false, false, false);//定义过期exchange    $channel->exchange_declare('expireExchange', 'fanout', false, false, false);//定义过期queue    $channel->queue_declare("expireQueue", false, false, false, false, false);//定义等待queue    $channel->queue_declare("waitSendQueue", false, false, false, false, false, new AMQPTable(["x-dead-letter-exchange" => "expireExchange"]));    $channel->queue_bind("waitSendQueue", "waitSendExchange");    $channel->queue_bind("expireQueue", "expireExchange");
    $body = 'Tinywan expiration!';    $msg = new AMQPMessage($body);    $msg->set("delivery_mode", AMQPMessage::DELIVERY_MODE_PERSISTENT); // 设置超时时间    $msg->set("expiration", 30000); // ms 1000ms = 1s
// 向等待exchage发布消息    $channel->basic_publish($msg, 'waitSendExchange');echo ' [x] Sent '. date('Y-m-d H:i:s') .': ', $body, "n";    $channel->close();    $connection->close();}

接受消息(这里为阻塞模式)

代码语言:javascript复制
public static function delayQueueReceive(){    $connection = RabbitMqConnection::getConnection();    $channel = $connection->channel();
//定义等待exchange    $channel->exchange_declare('waitSendExchange', 'fanout', false, false, false);//定义过期exchange    $channel->exchange_declare('expireExchange', 'fanout', false, false, false);
//定义过期queue    $channel->queue_declare("expireQueue", false, false, false, false, false);//定义等待queue    $channel->queue_declare("waitSendQueue", false, false, false, false, false, new AMQPTable(["x-dead-letter-exchange" => "expireExchange"]));    $channel->queue_bind("waitSendQueue", "waitSendExchange");    $channel->queue_bind("expireQueue", "expireExchange");
echo ' [*] Waiting for message. To exit press CTRL C '.PHP_EOL;    $callback = function ($msg) {echo ' [x] Receive '. date('Y-m-d H:i:s') .':', $msg->body, "n";    };// 订阅超时queue    $channel->basic_consume("expireQueue", "", false, true, false, false, $callback);
while (count($channel->callbacks)) {      $channel->wait();    }    $channel->close();    $connection->close();}

注意:如果声明的 expireExchange 是 direct 类型,那么在为其绑定队列的时候一定要指定 BindingKey,即这里的 deadLetterRoutingKey,如果不指定 Bindingkey,则需要将 expireExchange 声明为 fanout 类型。

fanout 类型路由规则非常简单,会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,相当于广播功能)。

▍运行结果

发送消息

接受消息

0 人点赞