rabbitmq 本身不支持延迟队列,但提供了实现延迟队列的必备条件。
原理
- queue可以通过
x-message-ttl
参数设置过期时间,到了过期时间的消息就会被标记为dead letter
状态。 - 过期的消息可以通过
x-dead-letter-exchange
和x-dead-letter-routing-key
参数转发到另一个exchange
中去。
在management 中测试延迟队列
docker 部署测试rabbitmq
- 如果已经有现成的rabbitmq,本小节跳过。
docker-compose.yml
代码语言:javascript复制version: "3"
services:
rabbitmq:
image: rabbitmq:3.7.12-management-alpine
container_name: rabbitmq
ports:
- 15672:15672
- 5671-5672:5671-5672
启动:
代码语言:javascript复制docker-compose up -d
创建出队队列
- 选择
Queues
面板,打开Add a new queue
栏目。 - 输入如下队列数据:
Name: delay_queue_deque
创建exchange
- 选择
Exchanges
面板,打开Add a new exchange
栏目。 - 输入如下队列数据:
Name: delay_exchange
- 添加 Binding:
点击新创建的 delay_exchange;
打开 Bindings 栏目;
选择 "To queue" ,并输入值 "delay_queue_deque";
"Routing key:" 中输入 "delay_routing_key"
To queue
要和前面的出队队列配置的Name
的值相同。
创建入队队列
- 选择
Queues
面板,打开Add a new queue
栏目。 - 输入如下队列数据:
Name: delay_queue_enque
Arguments:
x-message-ttl 30000
x-dead-letter-exchange delay_exchange
x-dead-letter-routing-key delay_routing_key
- 添加参数时,可点击下面add 工具栏里的链接,会自动填充前面的参数名。
x-dead-letter-exchange
要和前面的 exchange 配置的Name
的值相同。- 参数
x-dead-letter-routing-key
要和 exchange 添加的Binding 的Routing key
的值相同。
测试
在 Queues 面板中,点击 delay_queue_enque
队列。
找到 Publish message
,在 payload
中输入测试内容:"hello-001",点击 Publish message
按钮。
点击 "Queues" 面板按钮,就会看到delay_queue_enque
队列中有了一条数据,Ready:1。
等待30秒,消息就会转到 delay_queue_deque
中去了。
点击 delay_queue_deque
,找到 Get messages
栏目,点击 Get messages
按钮,检查取出的数据,为publish的message,测试无误。
- 测试时会发现,get messages之后,消息还在队列里。这是因为默认的应答策略是:
Nack message requeue true
,改为Ack message requeue false
并再次获取消息 ,消息就会真的被消费掉了。
java 测试
在management 中的测试理解了,在java中的代码就容易理解了。
创建springboot项目
引入 RabbitMQ
组件,或者手动加入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置队列
代码语言:javascript复制private static final String DEQUE_QUEUE_NAME = "dequeQueue";
private static final String ENQUE_QUEUE_NAME = "enqueQueue";
private static final String DEQUE_QUEUE_NAME_KEY = "dequeQueueKey";
private static final String DELAY_EXCHANGE="exchange_delay";
//死信路由
@Bean
DirectExchange exchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
//用于延时消费的队列
@Bean
public Queue dequeQueue() {
Queue queue = new Queue(DEQUE_QUEUE_NAME,true,false,false);
return queue;
}
//绑定exchange 到出队队列
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(dequeQueue()).to(exchange()).with(DEQUE_QUEUE_NAME_KEY);
}
//配置死信队列,即入队队列
@Bean
public Queue deadLetterQueue() {
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl", 20000);
args.put("x-dead-letter-exchange", DELAY_EXCHANGE);
args.put("x-dead-letter-routing-key", DEQUE_QUEUE_NAME_KEY);
return new Queue(ENQUE_QUEUE_NAME, true, false, false, args);
}
发送消息
代码语言:javascript复制package com.pollyduan.rabbitmq;
import java.time.LocalDateTime;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private AmqpTemplate rabbitTemplate;
@GetMapping("send")
public String send() {
String msg="hello-001";
System.out.println("发送消息:" LocalDateTime.now().toString() " 内容:" msg);
rabbitTemplate.convertAndSend("enqueQueue", msg);
return "ok";
}
}
接收消息
代码语言:javascript复制package com.pollyduan.rabbitmq;
import java.time.LocalDateTime;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageHandler {
@RabbitListener(queues = "dequeQueue")
public void process(String msg) {
System.out.println("接收消息:" LocalDateTime.now().toString() " 内容:" msg);
}
}
测试java实例
访问入队实例地址: http://localhost:8080/send
查看运行日志,可以看到:
代码语言:javascript复制发送消息:2018-12-08T19:18:05.730 msg内容:hello-001
观察日志界面不动,随后会打印一条:
代码语言:javascript复制接收消息:2018-12-08T19:18:25.762 接收内容:hello-001
比较两条日志的时间差:20 秒,测试无误。
单条消息的过期
单条消息发送时也可以指定本消息的过期时间,那么队列过期时间和消息过期时间同时配置的时候,以时间短的限制为准。
我们要在消息中附带一些参数,使用String 类型消息就不够了。
修改出队服务:
代码语言:javascript复制package com.pollyduan.rabbitmq;
import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageHandler {
@RabbitListener(queues = "dequeQueue")
public void process(Message message) throws UnsupportedEncodingException {
String msg=new String(message.getBody(),"UTF-8");
System.out.println("接收消息:" LocalDateTime.now().toString() " 接收内容:" msg);
}
}
修改入队接口:
代码语言:javascript复制package com.pollyduan.rabbitmq;
import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private AmqpTemplate rabbitTemplate;
@GetMapping("send")
public String send() throws UnsupportedEncodingException {
Message message = MessageBuilder.withBody("单独指定过期时间的消息".getBytes("UTF-8")).build();
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setExpiration("10000");
rabbitTemplate.convertAndSend("enqueQueue", message);
System.out.println("发送消息:" LocalDateTime.now().toString() " 内容:" new String(message.getBody(),"UTF-8"));
return "ok";
}
}
那么队列的过期时间是20秒,这里设置的消息过期时间是10 秒,测试就会发现10秒的时候,消息就被消费掉了。
同样,修改消息过期时间为30秒,则会在20秒的时候消息过期。
单条消息过期是不可靠的
如果为单条消息设置过期,实际上是不可靠的。比如:
消息1: hello-001 过期时间:10 秒 消息2: hello-002 过期时间:5 秒
我们希望的是:消息2 先过期,消息1 后过期,那么预期的目标是在 dequeue里先拿到 消息2
,后拿到 消息1
。经过测试你会发现这是做不到的。
因为实际上enqueue 还是按照FIFO顺序处理的,就是说,直到 消息1
到期,判断为死信,处理;然后才会处理 消息2
,因此即便后发的消息过期时间短,也不会被提前处理。
综上,为单条消息设置过期时间是不可靠的。优先选择使用队列的延迟机制。
典型应用
订单到期自动取消 消息延时同步 延迟检查状态