RabbitMQ 延迟队列

2019-11-04 15:03:36 浏览数 (1)

rabbitmq 本身不支持延迟队列,但提供了实现延迟队列的必备条件。

原理

  1. queue可以通过 x-message-ttl 参数设置过期时间,到了过期时间的消息就会被标记为 dead letter 状态。
  2. 过期的消息可以通过 x-dead-letter-exchangex-dead-letter-routing-key 参数转发到另一个 exchange 中去。

在management 中测试延迟队列

docker 部署测试rabbitmq

  • 如果已经有现成的rabbitmq,本小节跳过。

docker-compose.yml

代码语言:javascript复制
version: "3"
services:
  rabbitmq:
    image: rabbitmq:3.7.12-management-alpine
    container_name: rabbitmq
    ports:
    - 15672:15672
    - 5671-5672:5671-5672

启动:

代码语言:javascript复制
docker-compose up -d

创建出队队列

  1. 选择 Queues 面板,打开 Add a new queue 栏目。
  2. 输入如下队列数据:
代码语言:javascript复制
Name: delay_queue_deque

创建exchange

  1. 选择 Exchanges 面板,打开 Add a new exchange 栏目。
  2. 输入如下队列数据:
代码语言:javascript复制
Name: delay_exchange
  1. 添加 Binding:
代码语言:javascript复制
点击新创建的 delay_exchange;
打开 Bindings 栏目;
选择 "To queue" ,并输入值 "delay_queue_deque";
"Routing key:" 中输入 "delay_routing_key"
  • To queue 要和前面的出队队列配置的 Name 的值相同。

创建入队队列

  1. 选择 Queues 面板,打开 Add a new queue 栏目。
  2. 输入如下队列数据:
代码语言:javascript复制
Name: delay_queue_enque
Arguments:
  x-message-ttl 30000
  x-dead-letter-exchange delay_exchange
  x-dead-letter-routing-key delay_routing_key
  • 添加参数时,可点击下面add 工具栏里的链接,会自动填充前面的参数名。
  • x-dead-letter-exchange 要和前面的 exchange 配置的 Name 的值相同。
  • 参数 x-dead-letter-routing-key 要和 exchange 添加的Binding 的 Routing key 的值相同。

测试

在 Queues 面板中,点击 delay_queue_enque 队列。

找到 Publish message,在 payload 中输入测试内容:"hello-001",点击 Publish message 按钮。

点击 "Queues" 面板按钮,就会看到delay_queue_enque 队列中有了一条数据,Ready:1。

等待30秒,消息就会转到 delay_queue_deque 中去了。

点击 delay_queue_deque ,找到 Get messages 栏目,点击 Get messages 按钮,检查取出的数据,为publish的message,测试无误。

  • 测试时会发现,get messages之后,消息还在队列里。这是因为默认的应答策略是:Nack message requeue true,改为 Ack message requeue false并再次获取消息 ,消息就会真的被消费掉了。

java 测试

在management 中的测试理解了,在java中的代码就容易理解了。

创建springboot项目

引入 RabbitMQ组件,或者手动加入:

代码语言:javascript复制
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置队列

代码语言:javascript复制
private static final String DEQUE_QUEUE_NAME = "dequeQueue";
private static final String ENQUE_QUEUE_NAME = "enqueQueue";
private static final String DEQUE_QUEUE_NAME_KEY = "dequeQueueKey";
private static final String DELAY_EXCHANGE="exchange_delay";
//死信路由
  @Bean
  DirectExchange exchange() {
      return new DirectExchange(DELAY_EXCHANGE);
  }

  //用于延时消费的队列
  @Bean
  public Queue dequeQueue() {
      Queue queue = new Queue(DEQUE_QUEUE_NAME,true,false,false);
      return queue;
  }

  //绑定exchange 到出队队列
  @Bean
  public Binding  deadLetterBinding() {
      return BindingBuilder.bind(dequeQueue()).to(exchange()).with(DEQUE_QUEUE_NAME_KEY);
  }

  //配置死信队列,即入队队列
  @Bean
  public Queue deadLetterQueue() {
      Map<String,Object> args = new HashMap<>();
      args.put("x-message-ttl", 20000);
      args.put("x-dead-letter-exchange", DELAY_EXCHANGE);
      args.put("x-dead-letter-routing-key", DEQUE_QUEUE_NAME_KEY);
      return new Queue(ENQUE_QUEUE_NAME, true, false, false, args);
  }

发送消息

代码语言:javascript复制
package com.pollyduan.rabbitmq;

import java.time.LocalDateTime;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
	@Autowired
    private AmqpTemplate rabbitTemplate;

	@GetMapping("send")
	public String send() {
		String msg="hello-001";
		System.out.println("发送消息:" LocalDateTime.now().toString() " 内容:" msg);
        rabbitTemplate.convertAndSend("enqueQueue", msg);
        return "ok";
	}
}

接收消息

代码语言:javascript复制
package com.pollyduan.rabbitmq;

import java.time.LocalDateTime;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageHandler {
	@RabbitListener(queues = "dequeQueue")
    public void process(String msg) {
        System.out.println("接收消息:" LocalDateTime.now().toString() " 内容:" msg);
    }
}

测试java实例

访问入队实例地址: http://localhost:8080/send

查看运行日志,可以看到:

代码语言:javascript复制
发送消息:2018-12-08T19:18:05.730 msg内容:hello-001

观察日志界面不动,随后会打印一条:

代码语言:javascript复制
接收消息:2018-12-08T19:18:25.762 接收内容:hello-001

比较两条日志的时间差:20 秒,测试无误。

单条消息的过期

单条消息发送时也可以指定本消息的过期时间,那么队列过期时间和消息过期时间同时配置的时候,以时间短的限制为准。

我们要在消息中附带一些参数,使用String 类型消息就不够了。

修改出队服务:

代码语言:javascript复制
package com.pollyduan.rabbitmq;

import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageHandler {
	@RabbitListener(queues = "dequeQueue")
	public void process(Message message) throws UnsupportedEncodingException {
		String msg=new String(message.getBody(),"UTF-8");
		System.out.println("接收消息:" LocalDateTime.now().toString() " 接收内容:" msg);
	}
}

修改入队接口:

代码语言:javascript复制
package com.pollyduan.rabbitmq;

import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
	@Autowired
    private AmqpTemplate rabbitTemplate;

	@GetMapping("send")
	public String send() throws UnsupportedEncodingException {
		Message message = MessageBuilder.withBody("单独指定过期时间的消息".getBytes("UTF-8")).build();
		message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
		message.getMessageProperties().setExpiration("10000");

        rabbitTemplate.convertAndSend("enqueQueue", message);
        System.out.println("发送消息:" LocalDateTime.now().toString() " 内容:" new String(message.getBody(),"UTF-8"));
        return "ok";
	}
}

那么队列的过期时间是20秒,这里设置的消息过期时间是10 秒,测试就会发现10秒的时候,消息就被消费掉了。

同样,修改消息过期时间为30秒,则会在20秒的时候消息过期。

单条消息过期是不可靠的

如果为单条消息设置过期,实际上是不可靠的。比如:

消息1: hello-001 过期时间:10 秒 消息2: hello-002 过期时间:5 秒

我们希望的是:消息2 先过期,消息1 后过期,那么预期的目标是在 dequeue里先拿到 消息2,后拿到 消息1。经过测试你会发现这是做不到的。

因为实际上enqueue 还是按照FIFO顺序处理的,就是说,直到 消息1 到期,判断为死信,处理;然后才会处理 消息2,因此即便后发的消息过期时间短,也不会被提前处理。

综上,为单条消息设置过期时间是不可靠的。优先选择使用队列的延迟机制。

典型应用

订单到期自动取消 消息延时同步 延迟检查状态

0 人点赞