1、RabbitMQ与Spring的框架整合之Spring Boot实战。
首先创建maven项目的RabbitMQ的消息生产者rabbitmq-springboot-provider项目,配置pom.xml配置文件,如下所示:
代码语言:javascript复制 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
4 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6 <groupId>com.bie</groupId>
7 <artifactId>rabbitmq-springboot-provider</artifactId>
8 <version>0.0.1-SNAPSHOT</version>
9
10 <parent>
11 <groupId>org.springframework.boot</groupId>
12 <artifactId>spring-boot-starter-parent</artifactId>
13 <version>2.0.2.RELEASE</version>
14 <relativePath /> <!-- lookup parent from repository -->
15 </parent>
16
17 <properties>
18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
20 <java.version>1.8</java.version>
21 </properties>
22
23 <dependencies>
24 <dependency>
25 <groupId>org.springframework.boot</groupId>
26 <artifactId>spring-boot-starter</artifactId>
27 </dependency>
28
29 <dependency>
30 <groupId>org.springframework.boot</groupId>
31 <artifactId>spring-boot-starter-test</artifactId>
32 <scope>test</scope>
33 </dependency>
34 <dependency>
35 <groupId>org.springframework.boot</groupId>
36 <artifactId>spring-boot-starter-amqp</artifactId>
37 </dependency>
38 </dependencies>
39
40 <build>
41 <plugins>
42 <plugin>
43 <groupId>org.springframework.boot</groupId>
44 <artifactId>spring-boot-maven-plugin</artifactId>
45 </plugin>
46 </plugins>
47 </build>
48
49 </project>
修改rabbitmq-springboot-provider的配置文件application.yml,如下所示:
代码语言:javascript复制 1 spring:
2 rabbitmq:
3 addresses: 192.168.110.133:5672 # rabbitmq服务器的ip地址和端口号
4 username: guest # rabbitmq服务器的账号
5 password: guest # rabbitmq服务器的密码
6 virtual-host: / # rabbitmq服务器的虚拟主机
7 connection-timeout:
8 15000 # rabbitmq服务器连接超时时间
9 # publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求。RabbitTemplate.ConfirmCallback。
10 publisher-confirms: true # 消息确认模式
11 # publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,
12 # 则使用监听器对不可达的消息进行后续处理,保证消息的路由成功,RabbitTemplate.ReturnCallback
13 publisher-returns: true # 消息返回模式
14 template:
15 mandatory: true # 配置mandatory=true保证监听有效。
16
创建配置类,可以将bean添加到容器中,开启注解扫描。
代码语言:javascript复制 1 package com.bie.config;
2
3 import org.springframework.context.annotation.ComponentScan;
4 import org.springframework.context.annotation.Configuration;
5
6 /**
7 *
8 * @author biehl
9 *
10 */
11 @Configuration // 配置类,可以将bean添加到容器中
12 @ComponentScan(basePackages = { "com.bie.*" }) // 扫描包注解
13 public class RabbitMQProducerConfig {
14
15 }
创建实体类,用于测试消息的发送。
代码语言:javascript复制 1 package com.bie.po;
2
3 import java.io.Serializable;
4
5 /**
6 *
7 * @author biehl
8 *
9 */
10 public class Order implements Serializable {
11
12 /**
13 *
14 */
15 private static final long serialVersionUID = 1L;
16
17 private String id;
18 private String name;
19
20 public String getId() {
21 return id;
22 }
23
24 public void setId(String id) {
25 this.id = id;
26 }
27
28 public String getName() {
29 return name;
30 }
31
32 public void setName(String name) {
33 this.name = name;
34 }
35
36 public Order(String id, String name) {
37 super();
38 this.id = id;
39 this.name = name;
40 }
41
42 @Override
43 public String toString() {
44 return "Order [id=" id ", name=" name "]";
45 }
46
47 public Order() {
48 super();
49 }
50
51 }
创建RabbitMQ的生产者,用于消息的发送。
代码语言:javascript复制 1 package com.bie.producer;
2
3 import java.text.SimpleDateFormat;
4 import java.util.Date;
5 import java.util.Map;
6 import java.util.UUID;
7
8 import org.springframework.amqp.rabbit.core.RabbitTemplate;
9 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
10 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
11 import org.springframework.amqp.rabbit.support.CorrelationData;
12 import org.springframework.beans.factory.annotation.Autowired;
13 import org.springframework.messaging.Message;
14 import org.springframework.messaging.MessageHeaders;
15 import org.springframework.messaging.support.MessageBuilder;
16 import org.springframework.stereotype.Component;
17
18 import com.bie.po.Order;
19
20 /**
21 *
22 * @author biehl
23 *
24 */
25 @Component
26 public class RabbitMQProducerMessage {
27
28 @Autowired
29 private RabbitTemplate rabbitTemplate;
30
31 // publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求。RabbitTemplate.ConfirmCallback。
32 // publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续处理,保证消息的路由成功,RabbitTemplate.ReturnCallback
33 // 注意:在发送消息的时候对template进行配置mandatory=true保证监听有效。
34 // 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等等。
35
36 // 回调函数,confirm确认
37 final ConfirmCallback confirmCallback = new ConfirmCallback() {
38
39 @Override
40 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
41 System.out.println("correlationData : " correlationData);
42 System.out.println("ack : " ack);
43 if (!ack) {
44 System.out.println("异常处理,将后续继续处理.......");
45 }
46 System.out.println();
47 }
48
49 };
50
51 // 回调函数,return返回
52 final ReturnCallback returnCallback = new ReturnCallback() {
53
54 @Override
55 public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
56 String exchange, String routingKey) {
57 System.err.println("return exchange: " exchange ", routingKey: " routingKey ", replyCode: "
58 replyCode ", replyText: " replyText);
59 }
60
61 };
62
63 /**
64 * 发送消息的方法
65 *
66 * @param message
67 * @param properties
68 */
69 public void send(Object message, Map<String, Object> properties) {
70 // 设置消息头信息
71 MessageHeaders messageHeaders = new MessageHeaders(properties);
72 // 创建消息
73 Message msg = MessageBuilder.createMessage(message, messageHeaders);
74 // 消息确认和消息返回机制的回调
75 rabbitTemplate.setConfirmCallback(confirmCallback);
76 rabbitTemplate.setReturnCallback(returnCallback);
77 // id 时间戳的格式,保证全局唯一性
78 CorrelationData correlationData = new CorrelationData();
79 String id = UUID.randomUUID().toString();
80 // 唯一性id,做ack可靠性投递的时候、补偿策略的时候,根据该id可以找到唯一条消息。
81 correlationData.setId(id);
82 String exchange = "exchange-1"; // 交换机名称。需要自己创建好该交换机,然后创建一个队列,使用路由键将该交换机和队列进行绑定即可。
83 String routingkey = "springboot.helloRabbitmq"; // 路由键
84 rabbitTemplate.convertAndSend(exchange, routingkey, msg, correlationData);
85 }
86
87 /**
88 *
89 * @param order
90 */
91 public void sendOrder(Order order) {
92 // 消息确认和消息返回机制的回调
93 rabbitTemplate.setConfirmCallback(confirmCallback);
94 rabbitTemplate.setReturnCallback(returnCallback);
95 // id 时间戳的格式,保证全局唯一性
96 CorrelationData correlationData = new CorrelationData();
97 String id = UUID.randomUUID().toString();
98 correlationData.setId(id);
99 rabbitTemplate.convertAndSend("exchange-1", "springboot.def", order, correlationData);
100 }
101
102 }
创建主启动类,进行项目的启动,如下所示:
代码语言:javascript复制 1 package com.bie;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6 /**
7 *
8 * @author biehl
9 *
10 */
11 @SpringBootApplication
12 public class SpringBootRabbitMQProviderApplication {
13
14 public static void main(String[] args) {
15 SpringApplication.run(SpringBootRabbitMQProviderApplication.class, args);
16 }
17
18 }
创建生产者的测试类,进行生产者消息的发送。
代码语言:javascript复制 1 package com.bie.springboot;
2
3 import java.text.SimpleDateFormat;
4 import java.util.Date;
5 import java.util.HashMap;
6 import java.util.Map;
7
8 import org.junit.Test;
9 import org.junit.runner.RunWith;
10 import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.boot.test.context.SpringBootTest;
12 import org.springframework.test.context.junit4.SpringRunner;
13
14 import com.bie.po.Order;
15 import com.bie.producer.RabbitMQProducerMessage;
16
17 @RunWith(SpringRunner.class)
18 @SpringBootTest
19 public class ApplicationTests {
20
21 @Autowired
22 private RabbitMQProducerMessage rabbitMQProducerMessage;
23
24 private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
25
26 @Test
27 public void rabbitMQProducerMessage() throws Exception {
28 Map<String, Object> properties = new HashMap<>();
29 properties.put("idCard", "410725195545815685x");
30 properties.put("createdate", simpleDateFormat.format(new Date()));
31 for (int i = 0; i < 1000; i ) {
32 rabbitMQProducerMessage.send("Hello RabbitMQ For Spring Boot!" i, properties);
33 }
34
35 // 线程休眠,消息ack确认
36 // Thread.sleep(500000);
37 }
38
39 @Test
40 public void rabbitMQProducerOrder() throws Exception {
41 Order order = new Order("001", "第一个订单");
42 rabbitMQProducerMessage.sendOrder(order);
43 }
44
45 }
生产者发送消息,可以在RabbitMQ的管控台进行观察效果的。上面这种方式,需要手动创建交换机,队列,以及使用路由键将交换机和队列进行绑定。可以在管控台进行交换机、队列、以及使用路由键将交换机和队列进行绑定的。
2、首先创建maven项目的RabbitMQ的消息消费者rabbitmq-springboot-consumer项目,配置pom.xml配置文件,如下所示:
代码语言:javascript复制 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
4 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6 <groupId>com.bie</groupId>
7 <artifactId>rabbitmq-springboot-consumer</artifactId>
8 <version>0.0.1-SNAPSHOT</version>
9
10 <parent>
11 <groupId>org.springframework.boot</groupId>
12 <artifactId>spring-boot-starter-parent</artifactId>
13 <version>2.0.2.RELEASE</version>
14 <relativePath /> <!-- lookup parent from repository -->
15 </parent>
16
17 <properties>
18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
20 <java.version>1.8</java.version>
21 </properties>
22
23 <dependencies>
24 <dependency>
25 <groupId>org.springframework.boot</groupId>
26 <artifactId>spring-boot-starter</artifactId>
27 </dependency>
28
29 <dependency>
30 <groupId>org.springframework.boot</groupId>
31 <artifactId>spring-boot-starter-test</artifactId>
32 <scope>test</scope>
33 </dependency>
34 <dependency>
35 <groupId>org.springframework.boot</groupId>
36 <artifactId>spring-boot-starter-amqp</artifactId>
37 </dependency>
38 </dependencies>
39
40 <build>
41 <plugins>
42 <plugin>
43 <groupId>org.springframework.boot</groupId>
44 <artifactId>spring-boot-maven-plugin</artifactId>
45 </plugin>
46 </plugins>
47 </build>
48
49 </project>
由于生产者端和消费者端是分项目开发的,但是配置类RabbitMQProducerConfig和实体类Order都一样,主启动类修改一下名称即可,这里就省略了。
代码语言:javascript复制 1 package com.bie.consumer;
2
3 import java.util.Map;
4
5 import org.springframework.amqp.rabbit.annotation.Exchange;
6 import org.springframework.amqp.rabbit.annotation.Queue;
7 import org.springframework.amqp.rabbit.annotation.QueueBinding;
8 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
9 import org.springframework.amqp.rabbit.annotation.RabbitListener;
10 import org.springframework.amqp.support.AmqpHeaders;
11 import org.springframework.messaging.Message;
12 import org.springframework.messaging.handler.annotation.Headers;
13 import org.springframework.messaging.handler.annotation.Payload;
14 import org.springframework.stereotype.Component;
15
16 import com.bie.po.Order;
17 import com.rabbitmq.client.Channel;
18
19 /**
20 *
21 * @author biehl
22 *
23 * 1、签收模式,首先配置手动确认模式,用于ack的手工处理,这样我们可以保证消息的可靠性送达,
24 * 或者在消费端消费失败的时候可以做到重回队列,根据业务记录日志等处理。
25 *
26 * 2、可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况
27 *
28 * 3、消费端最重要的就是注解@RabbitListener注解的使用。消息端监听注解。
29 *
30 * 该注解是一个组合注解,里面可以注解配置@QueueBinding、@Queue、@Exchange。
31 *
32 * 直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等等。
33 *
34 */
35 @Component
36 public class RabbitMQConsumerMessage {
37
38 @RabbitListener(bindings = @QueueBinding(
39
40 value = @Queue(value = "${order.queue.name}", durable = "${order.queue.durable}"),
41
42 exchange = @Exchange(value = "${order.exchange.name}", durable = "${order.exchange.durable}", type = "${order.exchange.type}", ignoreDeclarationExceptions = "${order.exchange.ignoreDeclarationExceptions}"),
43
44 key = "${order.key}"
45
46 )
47
48 )
49 @RabbitHandler
50 public void onMessage(Message message, Channel channel) throws Exception {
51 System.out.println("消费者: " message.getPayload());
52 Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
53 // 手工ack
54 channel.basicAck(deliveryTag, false);
55 }
56
57 /**
58 *
59 * order.queue.name=queue-2 order.queue.durable=true
60 * order.exchange.name=exchange-1 order.exchange.durable=true
61 * order.exchange.type=topic order.exchange.ignoreDeclarationExceptions=true
62 * order.key=springboot.*
63 *
64 * @param order
65 * @param channel
66 * @param headers
67 * @throws Exception
68 */
69 @RabbitListener(bindings = @QueueBinding(
70
71 value = @Queue(value = "${order.queue.name}", durable = "${order.queue.durable}"),
72
73 exchange = @Exchange(value = "${order.exchange.name}", durable = "${order.exchange.durable}", type = "${order.exchange.type}", ignoreDeclarationExceptions = "${order.exchange.ignoreDeclarationExceptions}"),
74
75 key = "${order.key}"
76
77 )
78
79 )
80 @RabbitHandler // @Payload指定实际消息体内容,可以定义到形参上。
81 public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers)
82 throws Exception {
83 System.out.println("消费端order: " order.getId());
84 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
85 // 手工ACK
86 channel.basicAck(deliveryTag, false);
87 }
88
89 }
直接启动消费者的启动类,然后在生产者测试类开始发送消息,消费端就可以监听到了消息。