深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议(下)

2019-08-13 14:37:10 浏览数 (1)

前言

消息队列在现今数据量超大,并发量超高的系统中是十分常用的。本文将会对现时最常用到的几款消息队列框架 ActiveMQ、RabbitMQ、Kafka 进行分析对比。

详细介绍 RabbitMQ 在 Spring 框架下的结构及实现原理,从Producer 端的事务、回调函数(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器进行详细的分析。通过对 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用类型介绍,深入剖析在消息处理各个传输环节中的原理及注意事项。

并举以实例对死信队列、持久化操作进行一一介绍。

目录

一、RabbitMQ 与 AMQP 的关系

二、RabbitMQ 的实现原理

三、RabbitMQ 应用实例

四、Producer 端的消息发送与监控

五、Consumer 端的消息接收与监控

六、死信队列

七、持久化操作

六、死信队列

死信队列(Dead-Letter-Exchange) 可被看作是死信交换器。当消息在一个队列中变成死信后,它能被重新被发送到特定的交换器中,这个交换器就是DLX ,绑定DLX 的队列就称之为死信队列。消息变成死信一般是由于以下几种情况:

·消息被拒绝,requeue 被设置为 false, 可通过上一介绍的 void basicReject (deliveryTag, requeue) 或 void basicNack(deliveryTag,multiple, requeue) 完成设置 ;

·消息过期;

·队列超出最大长度。

其实死信队列 DLX 也是一个正常的交换器,和一般的交换器没有什么区别,我们可以用一般建立队列的方法,建立一个死信队列。然后建立一个正常的队列,在正常队列中加入参数 x-dead-letter-exchange、x-dead-letter-routing-key 与死信队列进行绑定,完成绑定后在管理界面 Features 选项中 direct.queue.first 会显示 DLX DLK。这时当被绑定的队列出现超时,超长,或被拒绝时(注意requeue被设置为false时,对会激发死信),信息就会流入死信队列被处理。

具体的例子Producer端:

代码语言:javascript复制
 1 @Configuration 
 2 public class BindingConfig {
 3 public final static String Queue_First="direct.queue.first";
 4 public final static String Exchange_Name="directExchange";
 5 public final static String Routing_Key_First="directKey1";
 6 
 7 @Bean
 8 public Queue queueFirst(){
 9 return new Queue(this.Queue_First);
10 }
11 
12 @Bean
13 public DirectExchange directExchange(){
14 return new DirectExchange(this.Exchange_Name);
15 }
16 
17 @Bean
18 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
19 return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
20 }
21 }
22 
23 @Configuration
24 public class ConnectionConfig {
25 @Value("${spring.rabbitmq.host}")
26 public String host;
27 
28 @Value("${spring.rabbitmq.port}")
29 public int port;
30 
31 @Value("${spring.rabbitmq.username}")
32 public String username;
33 
34 @Value("${spring.rabbitmq.password}")
35 public String password;
36 
37 @Value("${spring.rabbitmq.virtual-host}")
38 public String virtualHost;
39 
40 @Bean
41 public ConnectionFactory getConnectionFactory(){
42 CachingConnectionFactory factory=new CachingConnectionFactory();
43 System.out.println(host);
44 factory.setHost(host);
45 factory.setPort(port);
46 factory.setUsername(username);
47 factory.setPassword(password);
48 factory.setVirtualHost(virtualHost);
49 return factory;
50 }
51 }
52 
53 @Controller
54 @RequestMapping("/producer")
55 public class ProducerController {
56 @Autowired
57 private RabbitTemplate template;
58 
59 @RequestMapping("/send")
60 public void send() {
61 for(int n=0;n<10;n  ){ 
62 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World! "
63  String.valueOf(n),getCorrelationData());
64 }
65 }
66 
67 private CorrelationData getCorrelationData(){
68 return new CorrelationData(UUID.randomUUID().toString());
69 }
70 }

Customer 端

代码语言:javascript复制
 1 @Configuration
 2 public class BindingConfig {
 3 //普通队列参数
 4 public final static String Queue_First="direct.queue.first";
 5 public final static String Exchange_Name="directExchange";
 6 public final static String Routing_Key_First="directKey1";
 7 //死信队列参数
 8 public final static String Queue_Dead="direct.queue.dead";
 9 public final static String Exchange_Dead="directDead";
 10 public final static String Routing_Key_Dead="directDeadKey";
 11 
 12 @Bean
 13 public Queue queueFirst(){
 14 Map<String, Object> args=new HashMap<String,Object>();
 15 //声明当前死信的 Exchange
 16 args.put("x-dead-letter-exchange", this.Exchange_Dead);
 17 //声明当前队列的死信路由key
 18 args.put("x-dead-letter-routing-key", this.Routing_Key_Dead);
 19 //把死信队列的参数绑定到当前队列中
 20 return QueueBuilder.durable(Queue_First).withArguments(args).build();
 21 }
 22 
 23 @Bean
 24 public DirectExchange directExchange(){
 25 return new DirectExchange(this.Exchange_Name);
 26 }
 27 
 28 @Bean
 29 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 30 return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
 31 }
 32 
 33 @Bean
 34 public Queue queueDead(){
 35 return new Queue(this.Queue_Dead);
 36 }
 37 
 38 @Bean
 39 public DirectExchange directExchangeDead(){
 40 return new DirectExchange(this.Exchange_Dead);
 41 }
 42 
 43 @Bean
 44 public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){
 45 return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead);
 46 }
 47 }
 48 
 49 @Configuration
 50 public class ConnectionConfig {
 51 @Value("${spring.rabbitmq.host}")
 52 public String host;
 53 
 54 @Value("${spring.rabbitmq.port}")
 55 public int port;
 56 
 57 @Value("${spring.rabbitmq.username}")
 58 public String username;
 59 
 60 @Value("${spring.rabbitmq.password}")
 61 public String password;
 62 
 63 @Value("${spring.rabbitmq.virtual-host}")
 64 public String virtualHost;
 65 
 66 @Bean
 67 public ConnectionFactory getConnectionFactory(){
 68 CachingConnectionFactory factory=new CachingConnectionFactory();
 69 factory.setHost(host);
 70 factory.setPort(port);
 71 factory.setUsername(username);
 72 factory.setPassword(password);
 73 factory.setVirtualHost(virtualHost);
 74 return factory;
 75 }
 76 }
 77 
 78 @Configuration
 79 public class DirectMessListener {
 80 @Autowired
 81 private ConnectionConfig connectionConfig;
 82 @Autowired
 83 private RabbitTemplate template;
 84 private int index=0,normalIndex=0,deadIndex=0; 
 85 
 86 @Bean
 87 public DirectMessageListenerContainer messageContainer(){
 88 DirectMessageListenerContainer container=new DirectMessageListenerContainer();
 89 container.setConnectionFactory(connectionConfig.getConnectionFactory());
 90 // 设置每个队列的 consumer 数量
 91 container.setConsumersPerQueue(4);
 92 // 设置每个 consumer 每次的接收的消息数量
 93 container.setPrefetchCount(10);
 94 // 使用MANUAL手动确认
 95 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 96 // 监听队列
 97 container.addQueueNames(BindingConfig.Queue_First);
 98 container.addQueueNames(BindingConfig.Queue_Dead);
 99 container.setConsumerTagStrategy(queue -> "consumer" (  index));
100 
101 container.setMessageListener(new ChannelAwareMessageListener(){
102 @Override
103 public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
104 throws Exception {
105 MessageProperties prop=message.getMessageProperties();
106 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){
107 System.out.println("This is a normal queue! " (  normalIndex));
108 //把当前的队列转送到死信队列中
109 channel.basicReject(prop.getDeliveryTag(), false); 
110 }
111 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){
112 System.out.println("This is a dead queue! " (  deadIndex));
113 //模拟对死信队列处理
114 Thread.currentThread().sleep(5000);
115 .......
116 //处理完毕
117 channel.basicAck(prop.getDeliveryTag(), false);
118 }
119 
120 }
121 });
122 return container;
123 }
124 }

通过管理界面可以看,信息会先发送到 direct.queue.first,然后被放进死信队列作处理。

运行结果

添加描述

死信队列最常用的场景可以在订单支付,流程审批等环节。例如在 京*、淘* 等平台,当下单成功后,客户要在一定的时间内完成支付操作,否则订单被视作无效,这些业务流程就可以使用死信队列来处理。

七、持久化操作

RabbitMq 的持久化操作包含有 Queue 持久化、Message 持久化和 Exchange 持久化三类。

7.1 Queue 的持久化

队列持久化只需要在 Queue 的构造函数 public Queue(String name, boolean durable) 把 durable 参数置为 true 就可实现。如果队列不设置持久化( (durable 默认为 false), 那么在RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。

7.2 Message 持久化

设置了Queue 持久化以后,当 RabbitMQ 服务重启之后,队列依然存在,但消息已经消失,可见单单设置队列的持久化而不设置消息持久化显得毫无意义,所以通常列队持久化会与消息持久化共同使用。

在 RabbitMQ 原生态的框架下,需要把信息属性设置为 MessageProperties.PERSISTENT TEXT PLAIN 才会实现消息的持久化。

而在 Spring 框架下,由于在使用回调函数时需要把 Message 重新返回队列再进行处理,所以 Message 默认已经是持久化的。

7.3 Exchage 的持久化

交换器持久化可通过构造函数 public DirectExchange(String name, boolean durable, boolean autoDelete) 把 durable 参数置为 true 就可实现,而 autoDelete 则是指在所在消费者都解除订阅的情况下自动删除。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是消息不再发送到该 Exchange 。对一个长期使用的交换器来说,持久化还是有其必要性的。

本章总结

RabbitMQ 发展至今,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是密不可分的。

相比于传统的 ActiveMQ 和分布式 Kafka,它具有自己独有的特点。

希望文章有帮于大家对 RabbitMQ 消息队列方面有更深入的了解,在不同的开发环境中灵活运用。

由于时间仓促,文章当中有不明确的地方或有错漏敬请点明。

0 人点赞