前言
消息队列在现今数据量超大,并发量超高的系统中是十分常用的。本文将会对现时最常用到的几款消息队列框架 ActiveMQ、RabbitMQ、Kafka 进行分析对比。
详细介绍 RabbitMQ 在 Spring 框架下的结构及实现原理,从Producer 端的事务、回调函数(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器进行详细的分析。通过对 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用类型介绍,深入剖析在消息处理各个传输环节中的原理及注意事项。
并举以实例对死信队列、持久化操作进行一一介绍。
目录
一、RabbitMQ 与 AMQP 的关系
二、RabbitMQ 的实现原理
三、RabbitMQ 应用实例
四、Producer 端的消息发送与监控
五、Consumer 端的消息接收与监控
六、死信队列
七、持久化操作
六、死信队列
死信队列(Dead-Letter-Exchange) 可被看作是死信交换器。当消息在一个队列中变成死信后,它能被重新被发送到特定的交换器中,这个交换器就是DLX ,绑定DLX 的队列就称之为死信队列。消息变成死信一般是由于以下几种情况:
·消息被拒绝,requeue 被设置为 false, 可通过上一介绍的 void basicReject (deliveryTag, requeue) 或 void basicNack(deliveryTag,multiple, requeue) 完成设置 ;
·消息过期;
·队列超出最大长度。
其实死信队列 DLX 也是一个正常的交换器,和一般的交换器没有什么区别,我们可以用一般建立队列的方法,建立一个死信队列。然后建立一个正常的队列,在正常队列中加入参数 x-dead-letter-exchange、x-dead-letter-routing-key 与死信队列进行绑定,完成绑定后在管理界面 Features 选项中 direct.queue.first 会显示 DLX DLK。这时当被绑定的队列出现超时,超长,或被拒绝时(注意requeue被设置为false时,对会激发死信),信息就会流入死信队列被处理。
具体的例子Producer端:
代码语言:javascript复制 1 @Configuration
2 public class BindingConfig {
3 public final static String Queue_First="direct.queue.first";
4 public final static String Exchange_Name="directExchange";
5 public final static String Routing_Key_First="directKey1";
6
7 @Bean
8 public Queue queueFirst(){
9 return new Queue(this.Queue_First);
10 }
11
12 @Bean
13 public DirectExchange directExchange(){
14 return new DirectExchange(this.Exchange_Name);
15 }
16
17 @Bean
18 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
19 return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
20 }
21 }
22
23 @Configuration
24 public class ConnectionConfig {
25 @Value("${spring.rabbitmq.host}")
26 public String host;
27
28 @Value("${spring.rabbitmq.port}")
29 public int port;
30
31 @Value("${spring.rabbitmq.username}")
32 public String username;
33
34 @Value("${spring.rabbitmq.password}")
35 public String password;
36
37 @Value("${spring.rabbitmq.virtual-host}")
38 public String virtualHost;
39
40 @Bean
41 public ConnectionFactory getConnectionFactory(){
42 CachingConnectionFactory factory=new CachingConnectionFactory();
43 System.out.println(host);
44 factory.setHost(host);
45 factory.setPort(port);
46 factory.setUsername(username);
47 factory.setPassword(password);
48 factory.setVirtualHost(virtualHost);
49 return factory;
50 }
51 }
52
53 @Controller
54 @RequestMapping("/producer")
55 public class ProducerController {
56 @Autowired
57 private RabbitTemplate template;
58
59 @RequestMapping("/send")
60 public void send() {
61 for(int n=0;n<10;n ){
62 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World! "
63 String.valueOf(n),getCorrelationData());
64 }
65 }
66
67 private CorrelationData getCorrelationData(){
68 return new CorrelationData(UUID.randomUUID().toString());
69 }
70 }
Customer 端
代码语言:javascript复制 1 @Configuration
2 public class BindingConfig {
3 //普通队列参数
4 public final static String Queue_First="direct.queue.first";
5 public final static String Exchange_Name="directExchange";
6 public final static String Routing_Key_First="directKey1";
7 //死信队列参数
8 public final static String Queue_Dead="direct.queue.dead";
9 public final static String Exchange_Dead="directDead";
10 public final static String Routing_Key_Dead="directDeadKey";
11
12 @Bean
13 public Queue queueFirst(){
14 Map<String, Object> args=new HashMap<String,Object>();
15 //声明当前死信的 Exchange
16 args.put("x-dead-letter-exchange", this.Exchange_Dead);
17 //声明当前队列的死信路由key
18 args.put("x-dead-letter-routing-key", this.Routing_Key_Dead);
19 //把死信队列的参数绑定到当前队列中
20 return QueueBuilder.durable(Queue_First).withArguments(args).build();
21 }
22
23 @Bean
24 public DirectExchange directExchange(){
25 return new DirectExchange(this.Exchange_Name);
26 }
27
28 @Bean
29 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
30 return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
31 }
32
33 @Bean
34 public Queue queueDead(){
35 return new Queue(this.Queue_Dead);
36 }
37
38 @Bean
39 public DirectExchange directExchangeDead(){
40 return new DirectExchange(this.Exchange_Dead);
41 }
42
43 @Bean
44 public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){
45 return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead);
46 }
47 }
48
49 @Configuration
50 public class ConnectionConfig {
51 @Value("${spring.rabbitmq.host}")
52 public String host;
53
54 @Value("${spring.rabbitmq.port}")
55 public int port;
56
57 @Value("${spring.rabbitmq.username}")
58 public String username;
59
60 @Value("${spring.rabbitmq.password}")
61 public String password;
62
63 @Value("${spring.rabbitmq.virtual-host}")
64 public String virtualHost;
65
66 @Bean
67 public ConnectionFactory getConnectionFactory(){
68 CachingConnectionFactory factory=new CachingConnectionFactory();
69 factory.setHost(host);
70 factory.setPort(port);
71 factory.setUsername(username);
72 factory.setPassword(password);
73 factory.setVirtualHost(virtualHost);
74 return factory;
75 }
76 }
77
78 @Configuration
79 public class DirectMessListener {
80 @Autowired
81 private ConnectionConfig connectionConfig;
82 @Autowired
83 private RabbitTemplate template;
84 private int index=0,normalIndex=0,deadIndex=0;
85
86 @Bean
87 public DirectMessageListenerContainer messageContainer(){
88 DirectMessageListenerContainer container=new DirectMessageListenerContainer();
89 container.setConnectionFactory(connectionConfig.getConnectionFactory());
90 // 设置每个队列的 consumer 数量
91 container.setConsumersPerQueue(4);
92 // 设置每个 consumer 每次的接收的消息数量
93 container.setPrefetchCount(10);
94 // 使用MANUAL手动确认
95 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
96 // 监听队列
97 container.addQueueNames(BindingConfig.Queue_First);
98 container.addQueueNames(BindingConfig.Queue_Dead);
99 container.setConsumerTagStrategy(queue -> "consumer" ( index));
100
101 container.setMessageListener(new ChannelAwareMessageListener(){
102 @Override
103 public void onMessage(Message message, com.rabbitmq.client.Channel channel)
104 throws Exception {
105 MessageProperties prop=message.getMessageProperties();
106 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){
107 System.out.println("This is a normal queue! " ( normalIndex));
108 //把当前的队列转送到死信队列中
109 channel.basicReject(prop.getDeliveryTag(), false);
110 }
111 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){
112 System.out.println("This is a dead queue! " ( deadIndex));
113 //模拟对死信队列处理
114 Thread.currentThread().sleep(5000);
115 .......
116 //处理完毕
117 channel.basicAck(prop.getDeliveryTag(), false);
118 }
119
120 }
121 });
122 return container;
123 }
124 }
通过管理界面可以看,信息会先发送到 direct.queue.first,然后被放进死信队列作处理。
运行结果
添加描述
死信队列最常用的场景可以在订单支付,流程审批等环节。例如在 京*、淘* 等平台,当下单成功后,客户要在一定的时间内完成支付操作,否则订单被视作无效,这些业务流程就可以使用死信队列来处理。
七、持久化操作
RabbitMq 的持久化操作包含有 Queue 持久化、Message 持久化和 Exchange 持久化三类。
7.1 Queue 的持久化
队列持久化只需要在 Queue 的构造函数 public Queue(String name, boolean durable) 把 durable 参数置为 true 就可实现。如果队列不设置持久化( (durable 默认为 false), 那么在RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
7.2 Message 持久化
设置了Queue 持久化以后,当 RabbitMQ 服务重启之后,队列依然存在,但消息已经消失,可见单单设置队列的持久化而不设置消息持久化显得毫无意义,所以通常列队持久化会与消息持久化共同使用。
在 RabbitMQ 原生态的框架下,需要把信息属性设置为 MessageProperties.PERSISTENT TEXT PLAIN 才会实现消息的持久化。
而在 Spring 框架下,由于在使用回调函数时需要把 Message 重新返回队列再进行处理,所以 Message 默认已经是持久化的。
7.3 Exchage 的持久化
交换器持久化可通过构造函数 public DirectExchange(String name, boolean durable, boolean autoDelete) 把 durable 参数置为 true 就可实现,而 autoDelete 则是指在所在消费者都解除订阅的情况下自动删除。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是消息不再发送到该 Exchange 。对一个长期使用的交换器来说,持久化还是有其必要性的。
本章总结
RabbitMQ 发展至今,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是密不可分的。
相比于传统的 ActiveMQ 和分布式 Kafka,它具有自己独有的特点。
希望文章有帮于大家对 RabbitMQ 消息队列方面有更深入的了解,在不同的开发环境中灵活运用。
由于时间仓促,文章当中有不明确的地方或有错漏敬请点明。